Compare commits

..

282 Commits

Author SHA1 Message Date
twwu
f815310fda Merge branch 'main' into feat/hitl-frontend
Some checks failed
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
2026-02-05 10:08:24 +08:00
JzoNg
2ca03d80f9 Merge branch 'main' into jzh
Some checks failed
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
2026-02-04 12:53:07 +08:00
JzoNg
52b9299e43 Merge branch 'main' into jzh
Some checks failed
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
2026-02-03 16:17:41 +08:00
JzoNg
757317f965 chore: add feat/hitl-frontend to build-push workflow triggers 2026-02-03 15:11:38 +08:00
JzoNg
e119d1a16c fix: do not stop when workflow paused event recieved 2026-02-03 15:08:59 +08:00
JzoNg
0187838a54 Merge branch 'main' into jzh 2026-02-03 14:42:30 +08:00
twwu
68c36655a7 chore: remove feat/hitl-frontend from build-push workflow triggers 2026-01-30 09:46:24 +08:00
twwu
0357530468 Merge branch 'main' into feat/hitl-frontend
Some checks failed
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
2026-01-29 18:55:13 +08:00
twwu
966e308e8b test(billing, rag-pipeline): enhance tests by adding act wrapper and mocking download utility 2026-01-29 18:42:33 +08:00
twwu
e405e801ce test(billing): add human_input_email_delivery_enabled to billing utils tests; remove obsolete text generation result tests 2026-01-29 18:20:26 +08:00
twwu
3170e12b0a Merge branch 'main' into feat/hitl-frontend 2026-01-29 17:27:25 +08:00
twwu
8bbf3ef261 Merge branch 'main' into feat/hitl-frontend
Some checks failed
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
2026-01-29 10:58:26 +08:00
twwu
e09bd3e977 refactor(Result): update setWorkflowProcessData to use useCallback and reset workflow process data on completion 2026-01-29 10:39:40 +08:00
twwu
b1f7602dc8 feat(humanInput): update email configuration defaults and remove validation tooltip
Some checks failed
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
2026-01-28 22:43:58 +08:00
twwu
2ac20ced26 feat(humanInput): add email configuration validation and tooltip for delivery method 2026-01-28 22:22:01 +08:00
twwu
4e34fa3d70 feat(humanInput): enhance web app delivery method handling in trigger mode 2026-01-28 22:03:47 +08:00
twwu
bf161d01a8 Merge branch 'main' into feat/hitl-frontend
Some checks failed
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
2026-01-28 18:29:13 +08:00
twwu
fe5a65b21c test(shortcuts-popup-plugin): add JSDOM mocks for Range methods to improve test reliability 2026-01-28 13:24:55 +08:00
twwu
b3be035c64 fix(userActions): update error messages to include dynamic max length and improve validation logic 2026-01-28 11:51:08 +08:00
twwu
91f118c82c fix(varReferencePicker): update trigger condition to use double negation for clarity 2026-01-28 11:38:52 +08:00
twwu
3eaafccff2 chore(mock): remove unused mock data file for human input form 2026-01-28 11:31:14 +08:00
twwu
9d3b4749ae fix(humanInput): wrap rate limit exceeded error message in return statement for proper rendering 2026-01-28 11:15:10 +08:00
twwu
67bbfd972b feat(humanInput): add rate limit exceeded error handling and localization support
Some checks failed
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
2026-01-28 10:43:13 +08:00
twwu
1968589fdf Merge branch 'main' into feat/hitl-frontend 2026-01-28 10:19:52 +08:00
twwu
fc2acc2c53 refactor(form): streamline error handling by removing redundant expired state management
Some checks failed
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
2026-01-27 21:58:15 +08:00
twwu
e945f3d439 feat(log): add paused status count to log rendering and simplify status rendering logic 2026-01-27 21:20:47 +08:00
twwu
1b0abdf642 feat(humanInput): improve form handling with site info integration and error management
Some checks failed
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
2026-01-27 18:14:24 +08:00
JzoNg
441104ac3c fix: lnit error 2026-01-27 17:55:43 +08:00
twwu
8897633e42 feat(workflow): add human input node formatting to retain latest status 2026-01-27 17:09:00 +08:00
twwu
8a20a59d72 Merge branch 'feat/hitl-frontend' of https://github.com/langgenius/dify into feat/hitl-frontend 2026-01-27 15:57:42 +08:00
twwu
7cd4a7c1de feat: implement human input form timeout handling and enhance expiration time display 2026-01-27 15:57:13 +08:00
JzoNg
b503da0942 fix: validation of inputs in email sender 2026-01-27 15:35:53 +08:00
twwu
94671d42c1 refactor(workflow): optimize input variable generation in useSingleRunFormParams hook 2026-01-27 15:19:45 +08:00
JzoNg
8cf817a5fd fix: empty result judgement 2026-01-27 15:02:45 +08:00
JzoNg
3e0b3fae37 fix: test running tab switch 2026-01-27 14:45:46 +08:00
twwu
19664daeef feat(chat): enhance file handling in chat by converting file types to MIME format 2026-01-27 11:19:07 +08:00
twwu
fe5b5bc4c3 Merge branch 'main' into feat/hitl-frontend 2026-01-27 11:06:50 +08:00
twwu
07d8eaae82 refactor: simplify filteredHumanInputFormDataList computation by removing useMemo
Some checks failed
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
2026-01-26 15:16:55 +08:00
JzoNg
e8130fc509 enhancement: add error style for wrong member in email configure of delivery methods 2026-01-26 14:38:17 +08:00
twwu
4ea3cf46fa fix: update key references in HumanInputFormList and adjust resume URL in useChat hook 2026-01-26 14:23:45 +08:00
JzoNg
824f139b46 fix: variable display in form content preview 2026-01-26 13:52:21 +08:00
twwu
f2d98e832f Merge branch 'feat/hitl-frontend' of https://github.com/langgenius/dify into feat/hitl-frontend 2026-01-26 12:00:37 +08:00
twwu
5f73fed1e7 fix(input-field): update regex for name validation to ensure correct format 2026-01-26 12:00:18 +08:00
JzoNg
2ccef3ac84 fix: human input field dragging 2026-01-26 11:47:51 +08:00
twwu
9341227cf1 fix(i18n): update Chinese translations for human input terminology 2026-01-26 11:11:39 +08:00
twwu
d9982b8dc4 Merge branch 'main' into feat/hitl-frontend 2026-01-26 10:50:25 +08:00
Wu Tianwei
d3d42e3a8e refactor: rename placeholder to default in form input (#31452)
Some checks failed
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
2026-01-23 18:01:44 +08:00
twwu
f73db70cec Merge branch 'main' into feat/hitl-frontend 2026-01-23 15:51:58 +08:00
twwu
bcd6e22735 fix: update key prop in UserActionItem to use index for consistent rendering
Some checks failed
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
2026-01-22 17:15:58 +08:00
twwu
9aa674a04f feat: add jumpToEmailConfigModal function to enhance email configuration navigation in delivery method component 2026-01-22 16:48:29 +08:00
twwu
a4b87be5f4 Merge branch 'feat/hitl-frontend' of https://github.com/langgenius/dify into feat/hitl-frontend 2026-01-22 16:16:52 +08:00
twwu
c8c98519ad refactor: update expiration time handling in human input component with new utility functions and localization support 2026-01-22 16:16:34 +08:00
JzoNg
6aa452d4e3 fix: add default value for form in HITL 2026-01-22 15:41:42 +08:00
JzoNg
45c2167e0f fix: default value of input field in form content editor 2026-01-22 15:35:52 +08:00
JzoNg
b242578b86 enhancement: add email address when input blur 2026-01-22 15:35:52 +08:00
JzoNg
d65523aa0b save draft when delivery method changed 2026-01-22 15:35:52 +08:00
twwu
ac46cf499f refactor: update human input form handling with new hooks and improve error management
Some checks failed
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
2026-01-22 14:52:06 +08:00
twwu
8f780cad4c Merge branch 'feat/hitl-frontend' of https://github.com/langgenius/dify into feat/hitl-frontend
Some checks failed
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
2026-01-21 17:33:15 +08:00
twwu
0389dd459d style: update class names for consistent width handling in chat answer component 2026-01-21 17:31:46 +08:00
JzoNg
a19ae24adb fix: fix typo 2026-01-21 17:27:26 +08:00
JzoNg
42e80659b6 fix: name validation for output variable 2026-01-21 17:02:17 +08:00
JzoNg
59cb447e05 enhancement: add keyboard events handle 2026-01-21 16:52:49 +08:00
twwu
e63f7b2249 fix: include state snapshot in workflow event URLs for chat hooks 2026-01-21 16:45:38 +08:00
twwu
307f0d5827 fix: initialize requiredInputs state to an empty object and improve requestParamsObj handling in useSingleRunFormParams hook 2026-01-21 16:23:42 +08:00
JzoNg
757b1c7190 fix: display node name in form content preview 2026-01-21 16:05:03 +08:00
JzoNg
c42b78f2d2 fix: lint err 2026-01-21 16:05:03 +08:00
twwu
645793c48c refactor: streamline input variable handling and enhance type definitions in human input components 2026-01-21 15:44:14 +08:00
twwu
528e3400da fix: add HumanInput block type to available blocks filter logic
Some checks failed
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
2026-01-21 13:54:19 +08:00
twwu
c69a26b1ca fix: correct spacing in expiration time message in Chinese localization 2026-01-21 12:51:21 +08:00
twwu
663f320d86 style: adjust layout and styling in HITL input component for improved responsiveness 2026-01-21 12:19:12 +08:00
twwu
6e9facd9b5 Merge branch 'feat/hitl-frontend' of https://github.com/langgenius/dify into feat/hitl-frontend 2026-01-21 11:32:20 +08:00
twwu
52e9342af5 feat: refactor user action handling in human input panel and update error messaging 2026-01-21 11:31:09 +08:00
JzoNg
0138dc45b6 fix linit error 2026-01-21 11:03:53 +08:00
JzoNg
aaf6e8f978 use action is for executed action text 2026-01-21 11:03:53 +08:00
JzoNg
55be933342 fix email input handle & email test sender UI 2026-01-21 11:03:51 +08:00
twwu
711cca01b8 Merge branch 'main' into feat/hitl-frontend 2026-01-21 10:44:23 +08:00
twwu
2b28074f4c feat: enhance email configuration and body input components with support for dynamic node variables 2026-01-21 10:30:48 +08:00
twwu
c5721184e9 refactor: implement workflow resumption logic in embedded chat and update chat wrapper functionality
Some checks failed
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
2026-01-20 17:03:06 +08:00
Wu Tianwei
f3ec6ad53c feat: enhance chat functionality with workflow resumption and support regeneration (#31281) 2026-01-20 16:52:04 +08:00
twwu
1014852ebd Merge branch 'main' into feat/hitl-frontend
Some checks failed
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
2026-01-20 10:45:27 +08:00
JzoNg
5e644315e4 add tip modal for email type
Some checks failed
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
2026-01-19 16:40:54 +08:00
JzoNg
e3a22e5027 hide workflow as tool button when DSL contains human input nodes 2026-01-19 14:16:05 +08:00
twwu
977cef5a22 Merge branch 'main' into feat/hitl-frontend 2026-01-19 13:46:07 +08:00
twwu
1353eec9ca Merge branch 'main' into feat/hitl-frontend
Some checks failed
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
2026-01-19 12:40:11 +08:00
twwu
73343f03c1 Merge branch 'main' into feat/hitl-frontend 2026-01-19 10:16:27 +08:00
twwu
d401a29bd9 feat: update test email sender mutation to include inputs and adjust API endpoint for better data handling
Some checks failed
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
2026-01-16 17:52:33 +08:00
twwu
3bf8f19874 fix: reduce BUTTON_TEXT_MAX_LENGTH to 20 in user action component for consistency 2026-01-16 17:12:58 +08:00
twwu
51a7ddba81 refactor: rename placeholder_values to resolved_placeholder_values and adjust expiration_time calculation in human input form 2026-01-16 16:46:09 +08:00
twwu
bd634b165d feat: enhance user action validation in human input form by adding checks for duplicate IDs, empty IDs, and empty titles; update translations accordingly 2026-01-16 15:31:13 +08:00
twwu
a298140d8f Merge branch 'main' into feat/hitl-frontend 2026-01-16 13:49:15 +08:00
twwu
3db3a18eff refactor: move getButtonStyle function to utils for better code organization in human input form 2026-01-16 13:47:34 +08:00
twwu
91c35c2f0a feat: enhance human input form handling by adding placeholder values and new workflow event handlers
Some checks failed
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
2026-01-15 18:22:11 +08:00
twwu
61c7fdc614 feat: add humanInputEmailDeliveryEnabled to provider context and update related components for email delivery handling 2026-01-15 16:13:01 +08:00
twwu
88c2483192 refactor: simplify dependencies in DetailPanel and enhance workflow run components with new props for better handling of paused states 2026-01-15 11:52:33 +08:00
twwu
ca58055a39 Merge branch 'main' into feat/hitl-frontend
Some checks failed
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
2026-01-15 09:58:09 +08:00
twwu
368e38d593 fix(chat): add tracing for workflow process when node status is running
Some checks failed
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
2026-01-14 17:24:16 +08:00
twwu
7463bc9199 feat: integrate workflow hook to handle output variable renaming in useFormContent 2026-01-14 16:36:37 +08:00
twwu
35a707199f refactor: update email configuration modal to use selector for user email and enhance tooltip functionality in delivery method item 2026-01-14 16:28:22 +08:00
twwu
dfb25df5ec Merge branch 'main' into feat/hitl-frontend 2026-01-14 13:24:56 +08:00
JzoNg
cdc24696e4 add validation for output name & add correct placeholder
Some checks failed
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
2026-01-13 14:41:46 +08:00
JzoNg
8f6a0e2d8e step run of human input params 2026-01-13 10:39:35 +08:00
twwu
20ba0de47d Merge branch 'main' into feat/hitl-frontend 2026-01-13 09:43:47 +08:00
twwu
f13ace9dd9 fix(use-available-nodes-meta-data): temporarily exclude human-input nodes from available nodes
Some checks failed
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
2026-01-12 16:10:04 +08:00
twwu
23f5427349 feat(elk-layout): add support for Human Input nodes with ELK ports and enhance edge sorting 2026-01-12 16:00:48 +08:00
JzoNg
7deaab116a fix display of preview 2026-01-12 15:56:09 +08:00
JzoNg
b6db6d9305 revert regex for output variable name 2026-01-12 15:56:09 +08:00
twwu
2fa688cefd refactor(use-checklist, variable-utils): update dependencies in hooks and enhance human-input node support 2026-01-12 15:41:25 +08:00
JzoNg
b24fafe901 fix variable name of input field in chinese 2026-01-12 15:33:37 +08:00
JzoNg
c837def205 Revert "add check valid for human input"
This reverts commit d21cf87bb6.
2026-01-12 15:17:29 +08:00
QuantumGhost
1a7eac192c ci: remove unused GitHub Action file
The `workflow_run` event requires the executed workflow files exist in
the default branch.
2026-01-12 15:14:49 +08:00
JzoNg
d21cf87bb6 add check valid for human input 2026-01-12 15:12:28 +08:00
twwu
ef41325ad1 Merge branch 'feat/hitl-frontend' of https://github.com/langgenius/dify into feat/hitl-frontend 2026-01-12 14:52:09 +08:00
twwu
57c33d5869 refactor(draggable-plugin, hitl-input, human-input): update styles and improve layout for better UI consistency 2026-01-12 14:51:48 +08:00
JzoNg
4484261023 fix judgement of generate button display 2026-01-12 14:21:32 +08:00
JzoNg
72bc396646 step run of human input 2026-01-12 14:16:53 +08:00
twwu
68885afac6 Merge branch 'main' into feat/hitl-frontend 2026-01-12 13:47:36 +08:00
twwu
18e57096d2 Merge branch 'feat/hitl-frontend' of https://github.com/langgenius/dify into feat/hitl-frontend 2026-01-12 13:46:48 +08:00
twwu
b6c6d52725 feat(hitl-input): add readonly prop to HITL input components for enhanced user interaction control 2026-01-12 13:46:04 +08:00
QuantumGhost
bd104557ef chore(web): fix incorrect update to deploy actions 2026-01-12 11:40:38 +08:00
QuantumGhost
55b18bb79e chore(web): update deployment action 2026-01-12 11:38:23 +08:00
QuantumGhost
3f8f158b04 ci(web): add automatically deployment for HITL frontend 2026-01-12 09:43:06 +08:00
JzoNg
471d14f882 step run of human input
Some checks failed
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
2026-01-09 18:31:56 +08:00
twwu
a280df2c07 feat(chat): implement workflow event handling and audio player management in chat hooks 2026-01-09 17:05:38 +08:00
twwu
b479a36273 feat(human-input): add formContent prop to delivery method components for enhanced email configuration
Some checks failed
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
2026-01-09 15:20:07 +08:00
twwu
9136bf48f5 Merge branch 'main' into feat/hitl-frontend 2026-01-09 10:49:24 +08:00
twwu
ac70069847 Merge branch 'feat/hitl-frontend' of https://github.com/langgenius/dify into feat/hitl-frontend 2026-01-09 10:47:52 +08:00
twwu
27da1e72eb refactor: remove unused webAppLogout dependency from effect hooks and enhance chat item structure with extra content handling 2026-01-08 16:18:44 +08:00
JzoNg
467119d186 fix loading state in batch run
Some checks failed
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
2026-01-08 15:24:02 +08:00
JzoNg
1cdbfa2539 human input in workflow apps
Some checks failed
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
2026-01-08 14:45:05 +08:00
twwu
d3299db915 feat: add human input output structure and enhance filtering in human input form components
Some checks failed
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
2026-01-07 18:07:30 +08:00
twwu
ebb816b90b fix: update condition for rendering human input form in Answer component 2026-01-07 15:46:47 +08:00
twwu
0b794ad9cc feat: extend log status to include 'paused' and update related components for improved status rendering
Some checks failed
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
2026-01-07 11:48:42 +08:00
twwu
a19c0023f9 Merge branch 'main' into feat/hitl-frontend 2026-01-07 10:24:51 +08:00
twwu
c7fa7009b1 refactor: update form submission parameter from formID to formToken across chat components
Some checks failed
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
2026-01-06 18:13:42 +08:00
JzoNg
3e1a96ad64 expand filled form content as default 2026-01-06 17:57:35 +08:00
JzoNg
d500a631f0 add node title in content wrapper 2026-01-06 17:57:34 +08:00
twwu
607e77e0a7 feat: add human input form divider background color and enhance chat answer component layout 2026-01-06 15:18:15 +08:00
twwu
46ec24cf8a Merge branch 'main' into feat/hitl-frontend
Some checks failed
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
2026-01-06 10:41:22 +08:00
twwu
f654a7f704 feat: implement human input form handling and display components
Some checks failed
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
2026-01-05 17:05:10 +08:00
twwu
cf9b72d574 Merge branch 'main' into feat/hitl-frontend 2026-01-05 15:31:08 +08:00
twwu
5a92bbdab5 feat: enhance chat component to manage multiple human input forms and their submissions 2026-01-05 15:30:17 +08:00
twwu
d0a713e117 feat: integrate human input form submission handling in chat components
Some checks failed
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
2026-01-04 15:27:40 +08:00
twwu
9ec127ea8f feat: enhance human input handling by adding filled data support
Some checks failed
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
2026-01-04 11:31:09 +08:00
twwu
d478822fe1 refactor: update locale hook in ExpirationTime component 2026-01-04 09:50:08 +08:00
twwu
4eac271adc Merge branch 'main' into feat/hitl-frontend 2026-01-04 09:38:08 +08:00
twwu
0f13b6b26d fix: include workflowStore in dependency array for useChat hook to ensure proper state updates
Some checks failed
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
2025-12-31 16:52:22 +08:00
twwu
52ce46c364 refactor: optimize workflow variable handling and enhance node state management for resumption scenarios 2025-12-31 16:44:43 +08:00
twwu
f0f1ae0b49 fix: enhance workflow node handling by including paused state and improving human input management 2025-12-31 13:45:16 +08:00
twwu
0c69466b0f feat: add workflow resume functionality and enhance handling of HumanInput node in workflow events
Some checks failed
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
2025-12-30 18:30:32 +08:00
twwu
44a688cb81 feat: implement edge source handle change functionality and enhance node interactions for HumanInput node 2025-12-30 16:05:33 +08:00
twwu
0e2b59d661 fix: update placeholder handling in ContentItem and remove unused error messages from HumanInputNode
Some checks failed
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
2025-12-30 10:45:59 +08:00
twwu
501c3bcc94 chore: add i18n namespace to various components in the workflow for consistency 2025-12-30 10:18:31 +08:00
twwu
bf6a2c22eb Merge branch 'main' into feat/hitl-frontend 2025-12-30 09:34:30 +08:00
twwu
8c0365c71e Merge branch 'main' into feat/hitl-frontend
Some checks failed
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
2025-12-29 18:36:35 +08:00
twwu
c0a4f3b715 refactor(human-input): reorganize hooks and improve structure for better maintainability
Some checks failed
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
2025-12-29 13:57:34 +08:00
twwu
d308c34842 refactor(use-common): simplify current workspace query function by removing unnecessary body parameter 2025-12-29 13:37:33 +08:00
twwu
68615eec04 Merge branch 'main' into feat/hitl-frontend 2025-12-29 13:36:39 +08:00
WTW0313
eca3e23af0 Merge branch 'main' into feat/hitl-frontend 2025-12-29 12:54:17 +08:00
twwu
c716c4ccbe Merge branch 'main' into feat/hitl-frontend 2025-12-29 10:35:51 +08:00
QuantumGhost
32366774a1 chore(web): remove temporary workaround for CurrentWorkspace query
Some checks failed
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
2025-12-29 09:55:29 +08:00
twwu
640661983a refactor: update feature branch naming in build-push workflow
Some checks failed
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
2025-12-26 16:40:50 +08:00
twwu
f528f2eafc refactor: streamline chat operation handling and enhance workflow event management 2025-12-26 16:39:17 +08:00
twwu
0994953728 refactor: enhance email delivery method handling and improve debug mode display in HumanInputContent 2025-12-26 15:43:20 +08:00
twwu
d80167d9ec refactor: update UserActionItem component props and improve dependency management in useFormContent 2025-12-26 15:09:04 +08:00
twwu
2969a77b15 refactor: rename 'suspended' status to 'paused' across workflow components and update related styles 2025-12-26 14:50:47 +08:00
twwu
062896cb9c refactor: enhance Markdown component and update FormContentPreview props 2025-12-26 12:55:46 +08:00
twwu
5a1e6269d5 Merge branch 'main' into feat/hitl-frontend 2025-12-26 12:32:29 +08:00
twwu
e744d4de80 feat: replace timeout handling with expiration time in HumanInput form and add ExpirationTime component 2025-12-25 18:16:32 +08:00
twwu
be0f493e61 feat: enhance HumanInput node by adding a Timeout action and restructuring user action rendering 2025-12-25 16:16:24 +08:00
twwu
b8d4f60782 feat: add HumanInput block support with output variable handling and integration 2025-12-25 15:51:19 +08:00
twwu
8b65b689f7 feat: enhance human input panel with structured output and variable handling improvements 2025-12-25 14:53:19 +08:00
twwu
8b9846f52b Merge branch 'main' into feat/hitl-frontend 2025-12-25 13:43:27 +08:00
twwu
1c4c1b5cb1 refactor: streamline HITL input components by consolidating props and enhancing variable handling 2025-12-25 10:50:20 +08:00
twwu
ec0c144eb2 refactor: rename isHideNodeLabel to isShowNodeLabel for clarity in variable label component 2025-12-24 16:21:07 +08:00
twwu
afddc56bb4 refactor: replace GeneratedFormInputItem with FormInputItem across components for consistency 2025-12-24 16:01:05 +08:00
twwu
57dde4a4d8 refactor: update dependency arrays in HITL input components for improved stability 2025-12-24 13:06:52 +08:00
twwu
3c0fd213bf refactor: update human input form handling to support async submission and improve placeholder resolution 2025-12-24 12:13:32 +08:00
twwu
ddfd1cb1f5 refactor: update classnames import style across multiple components 2025-12-22 18:09:11 +08:00
twwu
138922f3f4 Merge branch 'main' into feat/hitl-frontend 2025-12-22 18:07:16 +08:00
twwu
aa9d0bd655 Merge branch 'main' into feat/hitl-frontend 2025-12-15 10:56:43 +08:00
twwu
ccef15aafa Merge branch 'main' into feat/hitl-frontend 2025-12-12 17:59:08 +08:00
JzoNg
202f924d99 fix type error of placeholder 2025-12-03 19:48:24 +08:00
JzoNg
882d2f5b4c Merge branch 'main' into tp 2025-12-02 15:33:41 +08:00
JzoNg
93ec42344c add info panel 2025-12-02 15:33:32 +08:00
JzoNg
330585ec9b merge main 2025-11-24 10:46:20 +08:00
JzoNg
8f2660cea8 fix markdown props 2025-11-17 14:56:45 +08:00
JzoNg
87f6ac78c5 Merge branch 'main' into tp 2025-11-17 10:55:31 +08:00
JzoNg
6c7d58aa11 merge main 2025-11-14 10:51:26 +08:00
JzoNg
8bca22a94b merge main 2025-10-28 19:41:33 +08:00
JzoNg
17aa11da79 fix workflow preview crash 2025-10-28 19:40:10 +08:00
JzoNg
63bbcff496 test email sender 2025-10-27 14:07:23 +08:00
JzoNg
f71a632cdc delivery methods config update 2025-10-27 11:55:57 +08:00
JzoNg
4e3bded902 Merge branch 'main' into tp 2025-10-27 10:54:00 +08:00
JzoNg
e75bbdbc0d add other delivery methods 2025-10-26 18:01:05 +08:00
JzoNg
331a5edbf9 fix: drag icon position 2025-10-20 15:57:39 +08:00
JzoNg
6afc99a5ad fix immer import 2025-10-20 14:25:14 +08:00
JzoNg
a4e2ef6b0c merge main 2025-10-20 14:21:09 +08:00
JzoNg
3632b473df fix: merge error 2025-10-20 11:29:52 +08:00
JzoNg
e35dd14c59 merge main 2025-10-13 14:41:31 +08:00
JzoNg
495f901798 add validation for form content 2025-09-08 11:43:51 +08:00
JzoNg
8703515153 human input step run when no variables 2025-09-06 12:34:25 +08:00
JzoNg
be3c6da654 human input step run 2025-09-05 19:59:33 +08:00
JzoNg
d51db3afb3 single run form data 2025-09-05 18:37:28 +08:00
JzoNg
22683fba3f single run form of human input 2025-09-05 16:14:48 +08:00
JzoNg
ed16265eee form inputs in email sender 2025-09-05 15:13:44 +08:00
JzoNg
463ea14d44 email sender modal 2025-09-05 15:13:44 +08:00
JzoNg
527736b8e4 email debug switch 2025-09-05 15:13:44 +08:00
Joel
f22dcee6d9 fix: preview i18n and ui promblem 2025-09-05 14:45:46 +08:00
Joel
9bdd7e5465 fix: note match and render problem 2025-09-05 14:25:52 +08:00
Joel
ad0e79372f feat: can show notes 2025-09-05 14:19:13 +08:00
Joel
a362114486 chore: can render note 2025-09-05 11:26:13 +08:00
Joel
79ab253c26 fix: match variable error 2025-09-05 11:06:35 +08:00
Joel
783b78cc0a chore: add custom variable 2025-09-04 18:41:48 +08:00
Joel
b1e123c3aa fix: can not choose vars 2025-09-04 16:46:43 +08:00
Joel
84709a7941 temp 2025-09-02 10:06:57 +08:00
Joel
ec07636ce9 feat: preveiw wrap 2025-08-29 16:21:39 +08:00
Joel
4d4c8b21ac chore: some tiny ui fix 2025-08-29 14:35:37 +08:00
Joel
ec8754173f feat: copy and expand 2025-08-29 14:30:57 +08:00
Joel
7920b89714 chore: hotkey tip i18n 2025-08-29 11:07:34 +08:00
Joel
19e152fd0c chore: prompt editor ui 2025-08-28 16:57:12 +08:00
yessenia
076a8ecff4 feat: portal position 2025-08-27 18:46:09 +08:00
Joel
40591b2196 fix: not choose vars 2025-08-26 17:35:25 +08:00
Joel
5156b8f9c9 fix: not auto focus and popup var hide 2025-08-26 16:54:00 +08:00
Joel
286ab0d468 feat: insert the hitl config 2025-08-26 16:23:16 +08:00
Joel
71a511a470 feat: can insert hitl node by / 2025-08-26 15:51:03 +08:00
JzoNg
6b11973151 add workflow run events 2025-08-26 14:27:27 +08:00
JzoNg
949a894f03 preview tip of human input delivery methods 2025-08-26 14:27:27 +08:00
Joel
305b5da764 chore: pre popular placeholder ui 2025-08-25 17:47:45 +08:00
Joel
fda19d3f0e feat: in placeholder choose var 2025-08-25 16:29:58 +08:00
JzoNg
e5a2172a85 human input form display & submit in preview 2025-08-25 16:17:40 +08:00
Joel
2d89d59d74 main 2025-08-25 16:08:24 +08:00
Joel
4f56acd432 chore: fix variable change ux error 2025-08-25 15:36:40 +08:00
Joel
c1b7412465 chore: handle textare to ui 2025-08-25 15:31:21 +08:00
Joel
62b9a20115 chore: fix placeholder ux 2025-08-25 14:40:51 +08:00
Joel
5392401e60 chore: btn text 2025-08-22 18:10:53 +08:00
Joel
baa77d3cda chore: can show popup 2025-08-22 16:59:04 +08:00
Joel
a41176b66d chore: pre poplulte field 2025-08-22 16:24:45 +08:00
yessenia
465e978209 feat: modify shortcutsplugin 2025-08-22 16:13:54 +08:00
yessenia
a9ea8cfd1c feat: shortcut popup unit test 2025-08-21 16:30:19 +08:00
yessenia
c771f4dbc7 feat: shortcut popup 2025-08-20 18:18:42 +08:00
JzoNg
ebbed8f863 form field 2025-08-12 14:17:08 +08:00
JzoNg
bdf1e9ed3b form content 2025-08-12 13:59:28 +08:00
JzoNg
36acd0b9dd form submit 2025-08-12 10:58:23 +08:00
JzoNg
a4049e1ea7 api of human input form 2025-08-12 10:21:02 +08:00
JzoNg
114dfe038c human input form 2025-08-11 17:58:03 +08:00
Joel
81f6344aaa feat: can update var 2025-08-08 16:16:52 +08:00
Joel
89963ecf59 feat: can remove 2025-08-08 15:08:02 +08:00
Joel
a18bcf3957 feat: pass current value to form input 2025-08-08 14:22:30 +08:00
JzoNg
c28720529e add suspended status 2025-08-08 11:27:11 +08:00
JzoNg
242826013e update validation for human input node 2025-08-08 10:38:32 +08:00
JzoNg
05453cb22f user action validation 2025-08-08 10:14:29 +08:00
JzoNg
da211d3009 timeout value validation 2025-08-08 09:42:21 +08:00
JzoNg
f8a249de03 email config validation 2025-08-07 20:03:53 +08:00
Joel
e2e5dedceb feat: add form content editor 2025-08-07 18:28:17 +08:00
JzoNg
bd7ba85471 fix input focus 2025-08-07 18:21:43 +08:00
JzoNg
7d5f6bc255 email input 2025-08-07 18:13:21 +08:00
JzoNg
fcc8789cc3 member selector 2025-08-07 18:13:21 +08:00
Joel
6da1a48cad chore: notes add form content holder 2025-08-07 17:56:37 +08:00
Joel
465ff7838a fix: move style 2025-08-07 17:25:15 +08:00
Joel
a4bf493343 chore: hanele drag ui 2025-08-07 16:56:56 +08:00
Joel
c27f20b4b7 chore: hide tree view 2025-08-07 15:06:13 +08:00
Joel
a9e6140dc6 feat: can drag and drop hitl block 2025-08-07 15:05:44 +08:00
JzoNg
792f28451c timeout new data structure 2025-08-07 10:44:25 +08:00
JzoNg
82530df38f mail body input 2025-08-06 17:04:46 +08:00
JzoNg
ce8325c83c update data structure 2025-08-06 17:02:40 +08:00
JzoNg
3ed561d943 delivery method item 2025-08-06 17:02:40 +08:00
Joel
bb8d54c48b chore: fix hitl not full 2025-08-06 16:59:46 +08:00
Joel
922aeb7c21 feat: node hitl render 2025-08-06 16:56:54 +08:00
Joel
177be06d09 files 2025-08-05 18:31:32 +08:00
Joel
736ec55f86 feat: hitl block 2025-08-05 18:31:18 +08:00
Joel
ab373197f9 feat: i18n 2025-08-05 15:14:17 +08:00
JzoNg
a6d2392c6c method selector 2025-08-01 15:36:02 +08:00
JzoNg
3371989572 method selector 2025-08-01 15:14:27 +08:00
JzoNg
f04daf056d delivery methods 2025-08-01 14:37:48 +08:00
JzoNg
fb6c8fa01f user action add 2025-08-01 14:26:21 +08:00
JzoNg
b02199145e user actions 2025-07-27 16:46:40 +08:00
Joel
e257455c9c feat: input filed form 2025-07-25 16:59:09 +08:00
Joel
63af5305e6 chore: template 2025-07-25 15:47:35 +08:00
JzoNg
95b88a0621 add timeout of human input node 2025-07-25 11:42:50 +08:00
JzoNg
1ab02c6e9a human input node style complete 2025-07-25 10:45:56 +08:00
JzoNg
1099ab5d91 update human input node 2025-07-25 08:54:43 +08:00
JzoNg
6485adae35 add human input node 2025-07-25 08:54:43 +08:00
409 changed files with 2574 additions and 29533 deletions

View File

@@ -0,0 +1 @@
../../.agents/skills/component-refactoring

View File

@@ -0,0 +1 @@
../../.agents/skills/frontend-code-review

View File

@@ -0,0 +1 @@
../../.agents/skills/frontend-testing

View File

@@ -0,0 +1 @@
../../.agents/skills/orpc-contract-first

View File

@@ -79,6 +79,29 @@ jobs:
find . -name "*.py" -type f -exec sed -i.bak -E 's/"([^"]+)" \| None/Optional["\1"]/g; s/'"'"'([^'"'"']+)'"'"' \| None/Optional['"'"'\1'"'"']/g' {} \;
find . -name "*.py.bak" -type f -delete
- name: Install pnpm
uses: pnpm/action-setup@v4
with:
package_json_file: web/package.json
run_install: false
- name: Setup Node.js
uses: actions/setup-node@v6
with:
node-version: 24
cache: pnpm
cache-dependency-path: ./web/pnpm-lock.yaml
- name: Install web dependencies
run: |
cd web
pnpm install --frozen-lockfile
- name: ESLint autofix
run: |
cd web
pnpm lint:fix || true
# mdformat breaks YAML front matter in markdown files. Add --exclude for directories containing YAML front matter.
- name: mdformat
run: |

View File

@@ -8,7 +8,7 @@ on:
- "build/**"
- "release/e-*"
- "hotfix/**"
- "feat/hitl-backend"
- "feat/hitl-frontend"
tags:
- "*"

View File

@@ -4,7 +4,8 @@ on:
workflow_run:
workflows: ["Build and Push API & Web"]
branches:
- "build/feat/hitl"
- "feat/hitl-frontend"
- "feat/hitl-backend"
types:
- completed
@@ -13,7 +14,10 @@ jobs:
runs-on: ubuntu-latest
if: |
github.event.workflow_run.conclusion == 'success' &&
github.event.workflow_run.head_branch == 'build/feat/hitl'
(
github.event.workflow_run.head_branch == 'feat/hitl-frontend' ||
github.event.workflow_run.head_branch == 'feat/hitl-backend'
)
steps:
- name: Deploy to server
uses: appleboy/ssh-action@v1

View File

@@ -39,7 +39,7 @@ jobs:
run: pnpm install --frozen-lockfile
- name: Run tests
run: pnpm test:ci
run: pnpm test:coverage
- name: Coverage Summary
if: always()

View File

@@ -37,7 +37,7 @@
"-c",
"1",
"-Q",
"dataset,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention,workflow_based_app_execution",
"dataset,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention",
"--loglevel",
"INFO"
],

View File

@@ -717,28 +717,3 @@ SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD=21
SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_SIZE=1000
SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS=30
SANDBOX_EXPIRED_RECORDS_CLEAN_TASK_LOCK_TTL=90000
# Redis URL used for PubSub between API and
# celery worker
# defaults to url constructed from `REDIS_*`
# configurations
PUBSUB_REDIS_URL=
# Pub/sub channel type for streaming events.
# valid options are:
#
# - pubsub: for normal Pub/Sub
# - sharded: for sharded Pub/Sub
#
# It's highly recommended to use sharded Pub/Sub AND redis cluster
# for large deployments.
PUBSUB_REDIS_CHANNEL_TYPE=pubsub
# Whether to use Redis cluster mode while running
# PubSub.
# It's highly recommended to enable this for large deployments.
PUBSUB_REDIS_USE_CLUSTERS=false
# Whether to Enable human input timeout check task
ENABLE_HUMAN_INPUT_TIMEOUT_TASK=true
# Human input timeout check interval in minutes
HUMAN_INPUT_TIMEOUT_TASK_INTERVAL=1

View File

@@ -36,8 +36,6 @@ ignore_imports =
core.workflow.nodes.loop.loop_node -> core.workflow.graph_engine
core.workflow.nodes.loop.loop_node -> core.workflow.graph
core.workflow.nodes.loop.loop_node -> core.workflow.graph_engine.command_channels
# TODO(QuantumGhost): fix the import violation later
core.workflow.entities.pause_reason -> core.workflow.nodes.human_input.entities
[importlinter:contract:workflow-infrastructure-dependencies]
name = Workflow Infrastructure Dependencies
@@ -52,14 +50,14 @@ ignore_imports =
core.workflow.nodes.agent.agent_node -> extensions.ext_database
core.workflow.nodes.datasource.datasource_node -> extensions.ext_database
core.workflow.nodes.knowledge_index.knowledge_index_node -> extensions.ext_database
core.workflow.nodes.knowledge_retrieval.knowledge_retrieval_node -> extensions.ext_database
core.workflow.nodes.llm.file_saver -> extensions.ext_database
core.workflow.nodes.llm.llm_utils -> extensions.ext_database
core.workflow.nodes.llm.node -> extensions.ext_database
core.workflow.nodes.tool.tool_node -> extensions.ext_database
core.workflow.graph_engine.command_channels.redis_channel -> extensions.ext_redis
core.workflow.graph_engine.manager -> extensions.ext_redis
# TODO(QuantumGhost): use DI to avoid depending on global DB.
core.workflow.nodes.human_input.human_input_node -> extensions.ext_database
core.workflow.nodes.knowledge_retrieval.knowledge_retrieval_node -> extensions.ext_redis
[importlinter:contract:workflow-external-imports]
name = Workflow External Imports
@@ -124,6 +122,11 @@ ignore_imports =
core.workflow.nodes.http_request.node -> core.tools.tool_file_manager
core.workflow.nodes.iteration.iteration_node -> core.app.workflow.node_factory
core.workflow.nodes.knowledge_index.knowledge_index_node -> core.rag.index_processor.index_processor_factory
core.workflow.nodes.knowledge_retrieval.knowledge_retrieval_node -> core.rag.datasource.retrieval_service
core.workflow.nodes.knowledge_retrieval.knowledge_retrieval_node -> core.rag.retrieval.dataset_retrieval
core.workflow.nodes.knowledge_retrieval.knowledge_retrieval_node -> models.dataset
core.workflow.nodes.knowledge_retrieval.knowledge_retrieval_node -> services.feature_service
core.workflow.nodes.knowledge_retrieval.knowledge_retrieval_node -> core.model_runtime.model_providers.__base.large_language_model
core.workflow.nodes.llm.llm_utils -> configs
core.workflow.nodes.llm.llm_utils -> core.app.entities.app_invoke_entities
core.workflow.nodes.llm.llm_utils -> core.file.models
@@ -133,6 +136,7 @@ ignore_imports =
core.workflow.nodes.llm.llm_utils -> models.provider
core.workflow.nodes.llm.llm_utils -> services.credit_pool_service
core.workflow.nodes.llm.node -> core.tools.signature
core.workflow.nodes.template_transform.template_transform_node -> configs
core.workflow.nodes.tool.tool_node -> core.callback_handler.workflow_tool_callback_handler
core.workflow.nodes.tool.tool_node -> core.tools.tool_engine
core.workflow.nodes.tool.tool_node -> core.tools.tool_manager
@@ -141,9 +145,9 @@ ignore_imports =
core.workflow.nodes.agent.agent_node -> core.agent.entities
core.workflow.nodes.agent.agent_node -> core.agent.plugin_entities
core.workflow.nodes.base.node -> core.app.entities.app_invoke_entities
core.workflow.nodes.human_input.human_input_node -> core.app.entities.app_invoke_entities
core.workflow.nodes.knowledge_index.knowledge_index_node -> core.app.entities.app_invoke_entities
core.workflow.nodes.knowledge_retrieval.knowledge_retrieval_node -> core.app.app_config.entities
core.workflow.nodes.knowledge_retrieval.knowledge_retrieval_node -> core.app.entities.app_invoke_entities
core.workflow.nodes.llm.node -> core.app.entities.app_invoke_entities
core.workflow.nodes.parameter_extractor.parameter_extractor_node -> core.app.entities.app_invoke_entities
core.workflow.nodes.parameter_extractor.parameter_extractor_node -> core.prompt.advanced_prompt_transform
@@ -159,6 +163,9 @@ ignore_imports =
core.workflow.workflow_entry -> core.app.workflow.node_factory
core.workflow.nodes.datasource.datasource_node -> core.datasource.datasource_manager
core.workflow.nodes.datasource.datasource_node -> core.datasource.utils.message_transformer
core.workflow.nodes.knowledge_retrieval.knowledge_retrieval_node -> core.entities.agent_entities
core.workflow.nodes.knowledge_retrieval.knowledge_retrieval_node -> core.entities.model_entities
core.workflow.nodes.knowledge_retrieval.knowledge_retrieval_node -> core.model_manager
core.workflow.nodes.llm.llm_utils -> core.entities.provider_entities
core.workflow.nodes.parameter_extractor.parameter_extractor_node -> core.model_manager
core.workflow.nodes.question_classifier.question_classifier_node -> core.model_manager
@@ -207,6 +214,7 @@ ignore_imports =
core.workflow.nodes.llm.node -> core.llm_generator.output_parser.structured_output
core.workflow.nodes.llm.node -> core.model_manager
core.workflow.nodes.agent.entities -> core.prompt.entities.advanced_prompt_entities
core.workflow.nodes.knowledge_retrieval.knowledge_retrieval_node -> core.prompt.simple_prompt_transform
core.workflow.nodes.llm.entities -> core.prompt.entities.advanced_prompt_entities
core.workflow.nodes.llm.llm_utils -> core.prompt.entities.advanced_prompt_entities
core.workflow.nodes.llm.node -> core.prompt.entities.advanced_prompt_entities
@@ -222,6 +230,7 @@ ignore_imports =
core.workflow.nodes.knowledge_index.knowledge_index_node -> services.summary_index_service
core.workflow.nodes.knowledge_index.knowledge_index_node -> tasks.generate_summary_index_task
core.workflow.nodes.knowledge_index.knowledge_index_node -> core.rag.index_processor.processor.paragraph_index_processor
core.workflow.nodes.knowledge_retrieval.knowledge_retrieval_node -> core.rag.retrieval.retrieval_methods
core.workflow.nodes.llm.node -> models.dataset
core.workflow.nodes.agent.agent_node -> core.tools.utils.message_transformer
core.workflow.nodes.llm.file_saver -> core.tools.signature
@@ -239,7 +248,6 @@ ignore_imports =
core.workflow.nodes.document_extractor.node -> core.variables.segments
core.workflow.nodes.http_request.executor -> core.variables.segments
core.workflow.nodes.http_request.node -> core.variables.segments
core.workflow.nodes.human_input.entities -> core.variables.consts
core.workflow.nodes.iteration.iteration_node -> core.variables
core.workflow.nodes.iteration.iteration_node -> core.variables.segments
core.workflow.nodes.iteration.iteration_node -> core.variables.variables
@@ -280,12 +288,12 @@ ignore_imports =
core.workflow.nodes.agent.agent_node -> extensions.ext_database
core.workflow.nodes.datasource.datasource_node -> extensions.ext_database
core.workflow.nodes.knowledge_index.knowledge_index_node -> extensions.ext_database
core.workflow.nodes.knowledge_retrieval.knowledge_retrieval_node -> extensions.ext_database
core.workflow.nodes.knowledge_retrieval.knowledge_retrieval_node -> extensions.ext_redis
core.workflow.nodes.llm.file_saver -> extensions.ext_database
core.workflow.nodes.llm.llm_utils -> extensions.ext_database
core.workflow.nodes.llm.node -> extensions.ext_database
core.workflow.nodes.tool.tool_node -> extensions.ext_database
core.workflow.nodes.human_input.human_input_node -> extensions.ext_database
core.workflow.nodes.human_input.human_input_node -> core.repositories.human_input_repository
core.workflow.workflow_entry -> extensions.otel.runtime
core.workflow.nodes.agent.agent_node -> models
core.workflow.nodes.base.node -> models.enums

View File

@@ -122,7 +122,7 @@ These commands assume you start from the repository root.
```bash
cd api
uv run celery -A app.celery worker -P threads -c 2 --loglevel INFO -Q api_token,dataset,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention
uv run celery -A app.celery worker -P threads -c 2 --loglevel INFO -Q dataset,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention
```
1. Optional: start Celery Beat (scheduled tasks, in a new terminal).

View File

@@ -739,10 +739,8 @@ def upgrade_db():
click.echo(click.style("Database migration successful!", fg="green"))
except Exception as e:
except Exception:
logger.exception("Failed to execute database migration")
click.echo(click.style(f"Database migration failed: {e}", fg="red"))
raise SystemExit(1)
finally:
lock.release()
else:

View File

@@ -1,4 +1,3 @@
from datetime import timedelta
from enum import StrEnum
from typing import Literal
@@ -49,16 +48,6 @@ class SecurityConfig(BaseSettings):
default=5,
)
WEB_FORM_SUBMIT_RATE_LIMIT_MAX_ATTEMPTS: PositiveInt = Field(
description="Maximum number of web form submissions allowed per IP within the rate limit window",
default=30,
)
WEB_FORM_SUBMIT_RATE_LIMIT_WINDOW_SECONDS: PositiveInt = Field(
description="Time window in seconds for web form submission rate limiting",
default=60,
)
LOGIN_DISABLED: bool = Field(
description="Whether to disable login checks",
default=False,
@@ -93,12 +82,6 @@ class AppExecutionConfig(BaseSettings):
default=0,
)
HUMAN_INPUT_GLOBAL_TIMEOUT_SECONDS: PositiveInt = Field(
description="Maximum seconds a workflow run can stay paused waiting for human input before global timeout.",
default=int(timedelta(days=7).total_seconds()),
ge=1,
)
class CodeExecutionSandboxConfig(BaseSettings):
"""
@@ -1151,14 +1134,6 @@ class CeleryScheduleTasksConfig(BaseSettings):
description="Enable queue monitor task",
default=False,
)
ENABLE_HUMAN_INPUT_TIMEOUT_TASK: bool = Field(
description="Enable human input timeout check task",
default=True,
)
HUMAN_INPUT_TIMEOUT_TASK_INTERVAL: PositiveInt = Field(
description="Human input timeout check interval in minutes",
default=1,
)
ENABLE_CHECK_UPGRADABLE_PLUGIN_TASK: bool = Field(
description="Enable check upgradable plugin task",
default=True,
@@ -1180,16 +1155,6 @@ class CeleryScheduleTasksConfig(BaseSettings):
default=0,
)
# API token last_used_at batch update
ENABLE_API_TOKEN_LAST_USED_UPDATE_TASK: bool = Field(
description="Enable periodic batch update of API token last_used_at timestamps",
default=True,
)
API_TOKEN_LAST_USED_UPDATE_INTERVAL: int = Field(
description="Interval in minutes for batch updating API token last_used_at (default 30)",
default=30,
)
# Trigger provider refresh (simple version)
ENABLE_TRIGGER_PROVIDER_REFRESH_TASK: bool = Field(
description="Enable trigger provider refresh poller",

View File

@@ -6,7 +6,6 @@ from pydantic import Field, NonNegativeFloat, NonNegativeInt, PositiveFloat, Pos
from pydantic_settings import BaseSettings
from .cache.redis_config import RedisConfig
from .cache.redis_pubsub_config import RedisPubSubConfig
from .storage.aliyun_oss_storage_config import AliyunOSSStorageConfig
from .storage.amazon_s3_storage_config import S3StorageConfig
from .storage.azure_blob_storage_config import AzureBlobStorageConfig
@@ -318,7 +317,6 @@ class MiddlewareConfig(
CeleryConfig, # Note: CeleryConfig already inherits from DatabaseConfig
KeywordStoreConfig,
RedisConfig,
RedisPubSubConfig,
# configs of storage and storage providers
StorageConfig,
AliyunOSSStorageConfig,

View File

@@ -1,96 +0,0 @@
from typing import Literal, Protocol
from urllib.parse import quote_plus, urlunparse
from pydantic import Field
from pydantic_settings import BaseSettings
class RedisConfigDefaults(Protocol):
REDIS_HOST: str
REDIS_PORT: int
REDIS_USERNAME: str | None
REDIS_PASSWORD: str | None
REDIS_DB: int
REDIS_USE_SSL: bool
REDIS_USE_SENTINEL: bool | None
REDIS_USE_CLUSTERS: bool
class RedisConfigDefaultsMixin:
def _redis_defaults(self: RedisConfigDefaults) -> RedisConfigDefaults:
return self
class RedisPubSubConfig(BaseSettings, RedisConfigDefaultsMixin):
"""
Configuration settings for Redis pub/sub streaming.
"""
PUBSUB_REDIS_URL: str | None = Field(
alias="PUBSUB_REDIS_URL",
description=(
"Redis connection URL for pub/sub streaming events between API "
"and celery worker, defaults to url constructed from "
"`REDIS_*` configurations"
),
default=None,
)
PUBSUB_REDIS_USE_CLUSTERS: bool = Field(
description=(
"Enable Redis Cluster mode for pub/sub streaming. It's highly "
"recommended to enable this for large deployments."
),
default=False,
)
PUBSUB_REDIS_CHANNEL_TYPE: Literal["pubsub", "sharded"] = Field(
description=(
"Pub/sub channel type for streaming events. "
"Valid options are:\n"
"\n"
" - pubsub: for normal Pub/Sub\n"
" - sharded: for sharded Pub/Sub\n"
"\n"
"It's highly recommended to use sharded Pub/Sub AND redis cluster "
"for large deployments."
),
default="pubsub",
)
def _build_default_pubsub_url(self) -> str:
defaults = self._redis_defaults()
if not defaults.REDIS_HOST or not defaults.REDIS_PORT:
raise ValueError("PUBSUB_REDIS_URL must be set when default Redis URL cannot be constructed")
scheme = "rediss" if defaults.REDIS_USE_SSL else "redis"
username = defaults.REDIS_USERNAME or None
password = defaults.REDIS_PASSWORD or None
userinfo = ""
if username:
userinfo = quote_plus(username)
if password:
password_part = quote_plus(password)
userinfo = f"{userinfo}:{password_part}" if userinfo else f":{password_part}"
if userinfo:
userinfo = f"{userinfo}@"
host = defaults.REDIS_HOST
port = defaults.REDIS_PORT
db = defaults.REDIS_DB
netloc = f"{userinfo}{host}:{port}"
return urlunparse((scheme, netloc, f"/{db}", "", "", ""))
@property
def normalized_pubsub_redis_url(self) -> str:
pubsub_redis_url = self.PUBSUB_REDIS_URL
if pubsub_redis_url:
cleaned = pubsub_redis_url.strip()
pubsub_redis_url = cleaned or None
if pubsub_redis_url:
return pubsub_redis_url
return self._build_default_pubsub_url()

View File

@@ -5,6 +5,8 @@ from enum import StrEnum
from flask_restx import Namespace
from pydantic import BaseModel, TypeAdapter
from controllers.console import console_ns
DEFAULT_REF_TEMPLATE_SWAGGER_2_0 = "#/definitions/{model}"
@@ -22,9 +24,6 @@ def register_schema_models(namespace: Namespace, *models: type[BaseModel]) -> No
def get_or_create_model(model_name: str, field_def):
# Import lazily to avoid circular imports between console controllers and schema helpers.
from controllers.console import console_ns
existing = console_ns.models.get(model_name)
if existing is None:
existing = console_ns.model(model_name, field_def)

View File

@@ -37,7 +37,6 @@ from . import (
apikey,
extension,
feature,
human_input_form,
init_validate,
ping,
setup,
@@ -172,7 +171,6 @@ __all__ = [
"forgot_password",
"generator",
"hit_testing",
"human_input_form",
"init_validate",
"installed_app",
"load_balancing_config",

View File

@@ -10,7 +10,6 @@ from libs.helper import TimestampField
from libs.login import current_account_with_tenant, login_required
from models.dataset import Dataset
from models.model import ApiToken, App
from services.api_token_service import ApiTokenCache
from . import console_ns
from .wraps import account_initialization_required, edit_permission_required, setup_required
@@ -132,11 +131,6 @@ class BaseApiKeyResource(Resource):
if key is None:
flask_restx.abort(HTTPStatus.NOT_FOUND, message="API key not found")
# Invalidate cache before deleting from database
# Type assertion: key is guaranteed to be non-None here because abort() raises
assert key is not None # nosec - for type checker only
ApiTokenCache.delete(key.token, key.type)
db.session.query(ApiToken).where(ApiToken.id == api_key_id).delete()
db.session.commit()

View File

@@ -1,4 +1,3 @@
import logging
import uuid
from datetime import datetime
from typing import Any, Literal, TypeAlias
@@ -55,8 +54,6 @@ ALLOW_CREATE_APP_MODES = ["chat", "agent-chat", "advanced-chat", "workflow", "co
register_enum_models(console_ns, IconType)
_logger = logging.getLogger(__name__)
class AppListQuery(BaseModel):
page: int = Field(default=1, ge=1, le=99999, description="Page number (1-99999)")
@@ -502,7 +499,6 @@ class AppListApi(Resource):
select(Workflow).where(
Workflow.version == Workflow.VERSION_DRAFT,
Workflow.app_id.in_(workflow_capable_app_ids),
Workflow.tenant_id == current_tenant_id,
)
)
.scalars()
@@ -514,14 +510,12 @@ class AppListApi(Resource):
NodeType.TRIGGER_PLUGIN,
}
for workflow in draft_workflows:
node_id = None
try:
for node_id, node_data in workflow.walk_nodes():
for _, node_data in workflow.walk_nodes():
if node_data.get("type") in trigger_node_types:
draft_trigger_app_ids.add(str(workflow.app_id))
break
except Exception:
_logger.exception("error while walking nodes, workflow_id=%s, node_id=%s", workflow.id, node_id)
continue
for app in app_pagination.items:

View File

@@ -89,7 +89,6 @@ status_count_model = console_ns.model(
"success": fields.Integer,
"failed": fields.Integer,
"partial_success": fields.Integer,
"paused": fields.Integer,
},
)

View File

@@ -33,7 +33,7 @@ from libs.login import current_account_with_tenant, login_required
from models.model import AppMode, Conversation, Message, MessageAnnotation, MessageFeedback
from services.errors.conversation import ConversationNotExistsError
from services.errors.message import MessageNotExistsError, SuggestedQuestionsAfterAnswerDisabledError
from services.message_service import MessageService, attach_message_extra_contents
from services.message_service import MessageService
logger = logging.getLogger(__name__)
@@ -207,7 +207,6 @@ message_detail_model = console_ns.model(
"created_at": TimestampField,
"agent_thoughts": fields.List(fields.Nested(agent_thought_model)),
"message_files": fields.List(fields.Nested(message_file_model)),
"extra_contents": fields.List(fields.Raw),
"metadata": fields.Raw(attribute="message_metadata_dict"),
"status": fields.String,
"error": fields.String,
@@ -300,7 +299,6 @@ class ChatMessageListApi(Resource):
has_more = False
history_messages = list(reversed(history_messages))
attach_message_extra_contents(history_messages)
return InfiniteScrollPagination(data=history_messages, limit=args.limit, has_more=has_more)
@@ -483,5 +481,4 @@ class MessageApi(Resource):
if not message:
raise NotFound("Message Not Exists.")
attach_message_extra_contents([message])
return message

View File

@@ -507,179 +507,6 @@ class WorkflowDraftRunLoopNodeApi(Resource):
raise InternalServerError()
class HumanInputFormPreviewPayload(BaseModel):
inputs: dict[str, Any] = Field(
default_factory=dict,
description="Values used to fill missing upstream variables referenced in form_content",
)
class HumanInputFormSubmitPayload(BaseModel):
form_inputs: dict[str, Any] = Field(..., description="Values the user provides for the form's own fields")
inputs: dict[str, Any] = Field(
...,
description="Values used to fill missing upstream variables referenced in form_content",
)
action: str = Field(..., description="Selected action ID")
class HumanInputDeliveryTestPayload(BaseModel):
delivery_method_id: str = Field(..., description="Delivery method ID")
inputs: dict[str, Any] = Field(
default_factory=dict,
description="Values used to fill missing upstream variables referenced in form_content",
)
reg(HumanInputFormPreviewPayload)
reg(HumanInputFormSubmitPayload)
reg(HumanInputDeliveryTestPayload)
@console_ns.route("/apps/<uuid:app_id>/advanced-chat/workflows/draft/human-input/nodes/<string:node_id>/form/preview")
class AdvancedChatDraftHumanInputFormPreviewApi(Resource):
@console_ns.doc("get_advanced_chat_draft_human_input_form")
@console_ns.doc(description="Get human input form preview for advanced chat workflow")
@console_ns.doc(params={"app_id": "Application ID", "node_id": "Node ID"})
@console_ns.expect(console_ns.models[HumanInputFormPreviewPayload.__name__])
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT])
@edit_permission_required
def post(self, app_model: App, node_id: str):
"""
Preview human input form content and placeholders
"""
current_user, _ = current_account_with_tenant()
args = HumanInputFormPreviewPayload.model_validate(console_ns.payload or {})
inputs = args.inputs
workflow_service = WorkflowService()
preview = workflow_service.get_human_input_form_preview(
app_model=app_model,
account=current_user,
node_id=node_id,
inputs=inputs,
)
return jsonable_encoder(preview)
@console_ns.route("/apps/<uuid:app_id>/advanced-chat/workflows/draft/human-input/nodes/<string:node_id>/form/run")
class AdvancedChatDraftHumanInputFormRunApi(Resource):
@console_ns.doc("submit_advanced_chat_draft_human_input_form")
@console_ns.doc(description="Submit human input form preview for advanced chat workflow")
@console_ns.doc(params={"app_id": "Application ID", "node_id": "Node ID"})
@console_ns.expect(console_ns.models[HumanInputFormSubmitPayload.__name__])
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT])
@edit_permission_required
def post(self, app_model: App, node_id: str):
"""
Submit human input form preview
"""
current_user, _ = current_account_with_tenant()
args = HumanInputFormSubmitPayload.model_validate(console_ns.payload or {})
workflow_service = WorkflowService()
result = workflow_service.submit_human_input_form_preview(
app_model=app_model,
account=current_user,
node_id=node_id,
form_inputs=args.form_inputs,
inputs=args.inputs,
action=args.action,
)
return jsonable_encoder(result)
@console_ns.route("/apps/<uuid:app_id>/workflows/draft/human-input/nodes/<string:node_id>/form/preview")
class WorkflowDraftHumanInputFormPreviewApi(Resource):
@console_ns.doc("get_workflow_draft_human_input_form")
@console_ns.doc(description="Get human input form preview for workflow")
@console_ns.doc(params={"app_id": "Application ID", "node_id": "Node ID"})
@console_ns.expect(console_ns.models[HumanInputFormPreviewPayload.__name__])
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.WORKFLOW])
@edit_permission_required
def post(self, app_model: App, node_id: str):
"""
Preview human input form content and placeholders
"""
current_user, _ = current_account_with_tenant()
args = HumanInputFormPreviewPayload.model_validate(console_ns.payload or {})
inputs = args.inputs
workflow_service = WorkflowService()
preview = workflow_service.get_human_input_form_preview(
app_model=app_model,
account=current_user,
node_id=node_id,
inputs=inputs,
)
return jsonable_encoder(preview)
@console_ns.route("/apps/<uuid:app_id>/workflows/draft/human-input/nodes/<string:node_id>/form/run")
class WorkflowDraftHumanInputFormRunApi(Resource):
@console_ns.doc("submit_workflow_draft_human_input_form")
@console_ns.doc(description="Submit human input form preview for workflow")
@console_ns.doc(params={"app_id": "Application ID", "node_id": "Node ID"})
@console_ns.expect(console_ns.models[HumanInputFormSubmitPayload.__name__])
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.WORKFLOW])
@edit_permission_required
def post(self, app_model: App, node_id: str):
"""
Submit human input form preview
"""
current_user, _ = current_account_with_tenant()
workflow_service = WorkflowService()
args = HumanInputFormSubmitPayload.model_validate(console_ns.payload or {})
result = workflow_service.submit_human_input_form_preview(
app_model=app_model,
account=current_user,
node_id=node_id,
form_inputs=args.form_inputs,
inputs=args.inputs,
action=args.action,
)
return jsonable_encoder(result)
@console_ns.route("/apps/<uuid:app_id>/workflows/draft/human-input/nodes/<string:node_id>/delivery-test")
class WorkflowDraftHumanInputDeliveryTestApi(Resource):
@console_ns.doc("test_workflow_draft_human_input_delivery")
@console_ns.doc(description="Test human input delivery for workflow")
@console_ns.doc(params={"app_id": "Application ID", "node_id": "Node ID"})
@console_ns.expect(console_ns.models[HumanInputDeliveryTestPayload.__name__])
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.WORKFLOW, AppMode.ADVANCED_CHAT])
@edit_permission_required
def post(self, app_model: App, node_id: str):
"""
Test human input delivery
"""
current_user, _ = current_account_with_tenant()
workflow_service = WorkflowService()
args = HumanInputDeliveryTestPayload.model_validate(console_ns.payload or {})
workflow_service.test_human_input_delivery(
app_model=app_model,
account=current_user,
node_id=node_id,
delivery_method_id=args.delivery_method_id,
inputs=args.inputs,
)
return jsonable_encoder({})
@console_ns.route("/apps/<uuid:app_id>/workflows/draft/run")
class DraftWorkflowRunApi(Resource):
@console_ns.doc("run_draft_workflow")

View File

@@ -5,15 +5,10 @@ from flask import request
from flask_restx import Resource, fields, marshal_with
from pydantic import BaseModel, Field, field_validator
from sqlalchemy import select
from sqlalchemy.orm import sessionmaker
from configs import dify_config
from controllers.console import console_ns
from controllers.console.app.wraps import get_app_model
from controllers.console.wraps import account_initialization_required, setup_required
from controllers.web.error import NotFoundError
from core.workflow.entities.pause_reason import HumanInputRequired
from core.workflow.enums import WorkflowExecutionStatus
from extensions.ext_database import db
from fields.end_user_fields import simple_end_user_fields
from fields.member_fields import simple_account_fields
@@ -32,21 +27,9 @@ from libs.custom_inputs import time_duration
from libs.helper import uuid_value
from libs.login import current_user, login_required
from models import Account, App, AppMode, EndUser, WorkflowArchiveLog, WorkflowRunTriggeredFrom
from models.workflow import WorkflowRun
from repositories.factory import DifyAPIRepositoryFactory
from services.retention.workflow_run.constants import ARCHIVE_BUNDLE_NAME
from services.workflow_run_service import WorkflowRunService
def _build_backstage_input_url(form_token: str | None) -> str | None:
if not form_token:
return None
base_url = dify_config.APP_WEB_URL
if not base_url:
return None
return f"{base_url.rstrip('/')}/form/{form_token}"
# Workflow run status choices for filtering
WORKFLOW_RUN_STATUS_CHOICES = ["running", "succeeded", "failed", "stopped", "partial-succeeded"]
EXPORT_SIGNED_URL_EXPIRE_SECONDS = 3600
@@ -457,68 +440,3 @@ class WorkflowRunNodeExecutionListApi(Resource):
)
return {"data": node_executions}
@console_ns.route("/workflow/<string:workflow_run_id>/pause-details")
class ConsoleWorkflowPauseDetailsApi(Resource):
"""Console API for getting workflow pause details."""
@setup_required
@login_required
@account_initialization_required
def get(self, workflow_run_id: str):
"""
Get workflow pause details.
GET /console/api/workflow/<workflow_run_id>/pause-details
Returns information about why and where the workflow is paused.
"""
# Query WorkflowRun to determine if workflow is suspended
session_maker = sessionmaker(bind=db.engine)
workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker=session_maker)
workflow_run = db.session.get(WorkflowRun, workflow_run_id)
if not workflow_run:
raise NotFoundError("Workflow run not found")
if workflow_run.tenant_id != current_user.current_tenant_id:
raise NotFoundError("Workflow run not found")
# Check if workflow is suspended
is_paused = workflow_run.status == WorkflowExecutionStatus.PAUSED
if not is_paused:
return {
"paused_at": None,
"paused_nodes": [],
}, 200
pause_entity = workflow_run_repo.get_workflow_pause(workflow_run_id)
pause_reasons = pause_entity.get_pause_reasons() if pause_entity else []
# Build response
paused_at = pause_entity.paused_at if pause_entity else None
paused_nodes = []
response = {
"paused_at": paused_at.isoformat() + "Z" if paused_at else None,
"paused_nodes": paused_nodes,
}
for reason in pause_reasons:
if isinstance(reason, HumanInputRequired):
paused_nodes.append(
{
"node_id": reason.node_id,
"node_title": reason.node_title,
"pause_type": {
"type": "human_input",
"form_id": reason.form_id,
"backstage_input_url": _build_backstage_input_url(reason.form_token),
},
}
)
else:
raise AssertionError("unimplemented.")
return response, 200

View File

@@ -55,7 +55,6 @@ from libs.login import current_account_with_tenant, login_required
from models import ApiToken, Dataset, Document, DocumentSegment, UploadFile
from models.dataset import DatasetPermissionEnum
from models.provider_ids import ModelProviderID
from services.api_token_service import ApiTokenCache
from services.dataset_service import DatasetPermissionService, DatasetService, DocumentService
# Register models for flask_restx to avoid dict type issues in Swagger
@@ -821,11 +820,6 @@ class DatasetApiDeleteApi(Resource):
if key is None:
console_ns.abort(404, message="API key not found")
# Invalidate cache before deleting from database
# Type assertion: key is guaranteed to be non-None here because abort() raises
assert key is not None # nosec - for type checker only
ApiTokenCache.delete(key.token, key.type)
db.session.query(ApiToken).where(ApiToken.id == api_key_id).delete()
db.session.commit()

View File

@@ -1,217 +0,0 @@
"""
Console/Studio Human Input Form APIs.
"""
import json
import logging
from collections.abc import Generator
from flask import Response, jsonify, request
from flask_restx import Resource, reqparse
from sqlalchemy import select
from sqlalchemy.orm import Session, sessionmaker
from controllers.console import console_ns
from controllers.console.wraps import account_initialization_required, setup_required
from controllers.web.error import InvalidArgumentError, NotFoundError
from core.app.apps.advanced_chat.app_generator import AdvancedChatAppGenerator
from core.app.apps.common.workflow_response_converter import WorkflowResponseConverter
from core.app.apps.message_generator import MessageGenerator
from core.app.apps.workflow.app_generator import WorkflowAppGenerator
from extensions.ext_database import db
from libs.login import current_account_with_tenant, login_required
from models import App
from models.enums import CreatorUserRole
from models.human_input import RecipientType
from models.model import AppMode
from models.workflow import WorkflowRun
from repositories.factory import DifyAPIRepositoryFactory
from services.human_input_service import Form, HumanInputService
from services.workflow_event_snapshot_service import build_workflow_event_stream
logger = logging.getLogger(__name__)
def _jsonify_form_definition(form: Form) -> Response:
payload = form.get_definition().model_dump()
payload["expiration_time"] = int(form.expiration_time.timestamp())
return Response(json.dumps(payload, ensure_ascii=False), mimetype="application/json")
@console_ns.route("/form/human_input/<string:form_token>")
class ConsoleHumanInputFormApi(Resource):
"""Console API for getting human input form definition."""
@staticmethod
def _ensure_console_access(form: Form):
_, current_tenant_id = current_account_with_tenant()
if form.tenant_id != current_tenant_id:
raise NotFoundError("App not found")
@setup_required
@login_required
@account_initialization_required
def get(self, form_token: str):
"""
Get human input form definition by form token.
GET /console/api/form/human_input/<form_token>
"""
service = HumanInputService(db.engine)
form = service.get_form_definition_by_token_for_console(form_token)
if form is None:
raise NotFoundError(f"form not found, token={form_token}")
self._ensure_console_access(form)
return _jsonify_form_definition(form)
@account_initialization_required
@login_required
def post(self, form_token: str):
"""
Submit human input form by form token.
POST /console/api/form/human_input/<form_token>
Request body:
{
"inputs": {
"content": "User input content"
},
"action": "Approve"
}
"""
parser = reqparse.RequestParser()
parser.add_argument("inputs", type=dict, required=True, location="json")
parser.add_argument("action", type=str, required=True, location="json")
args = parser.parse_args()
current_user, _ = current_account_with_tenant()
service = HumanInputService(db.engine)
form = service.get_form_by_token(form_token)
if form is None:
raise NotFoundError(f"form not found, token={form_token}")
self._ensure_console_access(form)
recipient_type = form.recipient_type
if recipient_type not in {RecipientType.CONSOLE, RecipientType.BACKSTAGE}:
raise NotFoundError(f"form not found, token={form_token}")
# The type checker is not smart enought to validate the following invariant.
# So we need to assert it manually.
assert recipient_type is not None, "recipient_type cannot be None here."
service.submit_form_by_token(
recipient_type=recipient_type,
form_token=form_token,
selected_action_id=args["action"],
form_data=args["inputs"],
submission_user_id=current_user.id,
)
return jsonify({})
@console_ns.route("/workflow/<string:workflow_run_id>/events")
class ConsoleWorkflowEventsApi(Resource):
"""Console API for getting workflow execution events after resume."""
@account_initialization_required
@login_required
def get(self, workflow_run_id: str):
"""
Get workflow execution events stream after resume.
GET /console/api/workflow/<workflow_run_id>/events
Returns Server-Sent Events stream.
"""
user, tenant_id = current_account_with_tenant()
session_maker = sessionmaker(db.engine)
repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker)
workflow_run = repo.get_workflow_run_by_id_and_tenant_id(
tenant_id=tenant_id,
run_id=workflow_run_id,
)
if workflow_run is None:
raise NotFoundError(f"WorkflowRun not found, id={workflow_run_id}")
if workflow_run.created_by_role != CreatorUserRole.ACCOUNT:
raise NotFoundError(f"WorkflowRun not created by account, id={workflow_run_id}")
if workflow_run.created_by != user.id:
raise NotFoundError(f"WorkflowRun not created by the current account, id={workflow_run_id}")
with Session(expire_on_commit=False, bind=db.engine) as session:
app = _retrieve_app_for_workflow_run(session, workflow_run)
if workflow_run.finished_at is not None:
# TODO(QuantumGhost): should we modify the handling for finished workflow run here?
response = WorkflowResponseConverter.workflow_run_result_to_finish_response(
task_id=workflow_run.id,
workflow_run=workflow_run,
creator_user=user,
)
payload = response.model_dump(mode="json")
payload["event"] = response.event.value
def _generate_finished_events() -> Generator[str, None, None]:
yield f"data: {json.dumps(payload)}\n\n"
event_generator = _generate_finished_events
else:
msg_generator = MessageGenerator()
if app.mode == AppMode.ADVANCED_CHAT:
generator = AdvancedChatAppGenerator()
elif app.mode == AppMode.WORKFLOW:
generator = WorkflowAppGenerator()
else:
raise InvalidArgumentError(f"cannot subscribe to workflow run, workflow_run_id={workflow_run.id}")
include_state_snapshot = request.args.get("include_state_snapshot", "false").lower() == "true"
def _generate_stream_events():
if include_state_snapshot:
return generator.convert_to_event_stream(
build_workflow_event_stream(
app_mode=AppMode(app.mode),
workflow_run=workflow_run,
tenant_id=workflow_run.tenant_id,
app_id=workflow_run.app_id,
session_maker=session_maker,
)
)
return generator.convert_to_event_stream(
msg_generator.retrieve_events(AppMode(app.mode), workflow_run.id),
)
event_generator = _generate_stream_events
return Response(
event_generator(),
mimetype="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
},
)
def _retrieve_app_for_workflow_run(session: Session, workflow_run: WorkflowRun):
query = select(App).where(
App.id == workflow_run.app_id,
App.tenant_id == workflow_run.tenant_id,
)
app = session.scalars(query).first()
if app is None:
raise AssertionError(
f"App not found for WorkflowRun, workflow_run_id={workflow_run.id}, "
f"app_id={workflow_run.app_id}, tenant_id={workflow_run.tenant_id}"
)
return app

View File

@@ -120,7 +120,7 @@ class TagUpdateDeleteApi(Resource):
TagService.delete_tag(tag_id)
return "", 204
return 204
@console_ns.route("/tag-bindings/create")

View File

@@ -34,8 +34,6 @@ from .dataset import (
metadata,
segment,
)
from .dataset.rag_pipeline import rag_pipeline_workflow
from .end_user import end_user
from .workspace import models
__all__ = [
@@ -46,7 +44,6 @@ __all__ = [
"conversation",
"dataset",
"document",
"end_user",
"file",
"file_preview",
"hit_testing",
@@ -54,7 +51,6 @@ __all__ = [
"message",
"metadata",
"models",
"rag_pipeline_workflow",
"segment",
"site",
"workflow",

View File

@@ -33,9 +33,8 @@ from core.workflow.graph_engine.manager import GraphEngineManager
from extensions.ext_database import db
from fields.workflow_app_log_fields import build_workflow_app_log_pagination_model
from libs import helper
from libs.helper import OptionalTimestampField, TimestampField
from libs.helper import TimestampField
from models.model import App, AppMode, EndUser
from models.workflow import WorkflowRun
from repositories.factory import DifyAPIRepositoryFactory
from services.app_generate_service import AppGenerateService
from services.errors.app import IsDraftWorkflowError, WorkflowIdFormatError, WorkflowNotFoundError
@@ -64,32 +63,17 @@ class WorkflowLogQuery(BaseModel):
register_schema_models(service_api_ns, WorkflowRunPayload, WorkflowLogQuery)
class WorkflowRunStatusField(fields.Raw):
def output(self, key, obj: WorkflowRun, **kwargs):
return obj.status.value
class WorkflowRunOutputsField(fields.Raw):
def output(self, key, obj: WorkflowRun, **kwargs):
if obj.status == WorkflowExecutionStatus.PAUSED:
return {}
outputs = obj.outputs_dict
return outputs or {}
workflow_run_fields = {
"id": fields.String,
"workflow_id": fields.String,
"status": WorkflowRunStatusField,
"status": fields.String,
"inputs": fields.Raw,
"outputs": WorkflowRunOutputsField,
"outputs": fields.Raw,
"error": fields.String,
"total_steps": fields.Integer,
"total_tokens": fields.Integer,
"created_at": TimestampField,
"finished_at": OptionalTimestampField,
"finished_at": TimestampField,
"elapsed_time": fields.Float,
}

View File

@@ -396,7 +396,7 @@ class DatasetApi(DatasetApiResource):
try:
if DatasetService.delete_dataset(dataset_id_str, current_user):
DatasetPermissionService.clear_partial_member_list(dataset_id_str)
return "", 204
return 204
else:
raise NotFound("Dataset not found.")
except services.errors.dataset.DatasetInUseError:
@@ -557,7 +557,7 @@ class DatasetTagsApi(DatasetApiResource):
payload = TagDeletePayload.model_validate(service_api_ns.payload or {})
TagService.delete_tag(payload.tag_id)
return "", 204
return 204
@service_api_ns.route("/datasets/tags/binding")
@@ -581,7 +581,7 @@ class DatasetTagBindingApi(DatasetApiResource):
payload = TagBindingPayload.model_validate(service_api_ns.payload or {})
TagService.save_tag_binding({"tag_ids": payload.tag_ids, "target_id": payload.target_id, "type": "knowledge"})
return "", 204
return 204
@service_api_ns.route("/datasets/tags/unbinding")
@@ -605,7 +605,7 @@ class DatasetTagUnbindingApi(DatasetApiResource):
payload = TagUnbindingPayload.model_validate(service_api_ns.payload or {})
TagService.delete_tag_binding({"tag_id": payload.tag_id, "target_id": payload.target_id, "type": "knowledge"})
return "", 204
return 204
@service_api_ns.route("/datasets/<uuid:dataset_id>/tags")

View File

@@ -746,4 +746,4 @@ class DocumentApi(DatasetApiResource):
except services.errors.document.DocumentIndexingError:
raise DocumentIndexingError("Cannot delete document during indexing.")
return "", 204
return 204

View File

@@ -128,7 +128,7 @@ class DatasetMetadataServiceApi(DatasetApiResource):
DatasetService.check_dataset_permission(dataset, current_user)
MetadataService.delete_metadata(dataset_id_str, metadata_id_str)
return "", 204
return 204
@service_api_ns.route("/datasets/<uuid:dataset_id>/metadata/built-in")

View File

@@ -1,3 +1,5 @@
import string
import uuid
from collections.abc import Generator
from typing import Any
@@ -10,7 +12,6 @@ from controllers.common.errors import FilenameNotExistsError, NoFileUploadedErro
from controllers.common.schema import register_schema_model
from controllers.service_api import service_api_ns
from controllers.service_api.dataset.error import PipelineRunError
from controllers.service_api.dataset.rag_pipeline.serializers import serialize_upload_file
from controllers.service_api.wraps import DatasetApiResource
from core.app.apps.pipeline.pipeline_generator import PipelineGenerator
from core.app.entities.app_invoke_entities import InvokeFrom
@@ -40,7 +41,7 @@ register_schema_model(service_api_ns, DatasourceNodeRunPayload)
register_schema_model(service_api_ns, PipelineRunApiEntity)
@service_api_ns.route("/datasets/<uuid:dataset_id>/pipeline/datasource-plugins")
@service_api_ns.route(f"/datasets/{uuid:dataset_id}/pipeline/datasource-plugins")
class DatasourcePluginsApi(DatasetApiResource):
"""Resource for datasource plugins."""
@@ -75,7 +76,7 @@ class DatasourcePluginsApi(DatasetApiResource):
return datasource_plugins, 200
@service_api_ns.route("/datasets/<uuid:dataset_id>/pipeline/datasource/nodes/<string:node_id>/run")
@service_api_ns.route(f"/datasets/{uuid:dataset_id}/pipeline/datasource/nodes/{string:node_id}/run")
class DatasourceNodeRunApi(DatasetApiResource):
"""Resource for datasource node run."""
@@ -130,7 +131,7 @@ class DatasourceNodeRunApi(DatasetApiResource):
)
@service_api_ns.route("/datasets/<uuid:dataset_id>/pipeline/run")
@service_api_ns.route(f"/datasets/{uuid:dataset_id}/pipeline/run")
class PipelineRunApi(DatasetApiResource):
"""Resource for datasource node run."""
@@ -231,4 +232,12 @@ class KnowledgebasePipelineFileUploadApi(DatasetApiResource):
except services.errors.file.UnsupportedFileTypeError:
raise UnsupportedFileTypeError()
return serialize_upload_file(upload_file), 201
return {
"id": upload_file.id,
"name": upload_file.name,
"size": upload_file.size,
"extension": upload_file.extension,
"mime_type": upload_file.mime_type,
"created_by": upload_file.created_by,
"created_at": upload_file.created_at,
}, 201

View File

@@ -1,22 +0,0 @@
"""
Serialization helpers for Service API knowledge pipeline endpoints.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from models.model import UploadFile
def serialize_upload_file(upload_file: UploadFile) -> dict[str, Any]:
return {
"id": upload_file.id,
"name": upload_file.name,
"size": upload_file.size,
"extension": upload_file.extension,
"mime_type": upload_file.mime_type,
"created_by": upload_file.created_by,
"created_at": upload_file.created_at.isoformat() if upload_file.created_at else None,
}

View File

@@ -233,7 +233,7 @@ class DatasetSegmentApi(DatasetApiResource):
if not segment:
raise NotFound("Segment not found.")
SegmentService.delete_segment(segment, document, dataset)
return "", 204
return 204
@service_api_ns.expect(service_api_ns.models[SegmentUpdatePayload.__name__])
@service_api_ns.doc("update_segment")
@@ -499,7 +499,7 @@ class DatasetChildChunkApi(DatasetApiResource):
except ChildChunkDeleteIndexServiceError as e:
raise ChildChunkDeleteIndexError(str(e))
return "", 204
return 204
@service_api_ns.expect(service_api_ns.models[ChildChunkUpdatePayload.__name__])
@service_api_ns.doc("update_child_chunk")

View File

@@ -1,3 +0,0 @@
from . import end_user
__all__ = ["end_user"]

View File

@@ -1,41 +0,0 @@
from uuid import UUID
from flask_restx import Resource
from controllers.service_api import service_api_ns
from controllers.service_api.end_user.error import EndUserNotFoundError
from controllers.service_api.wraps import validate_app_token
from fields.end_user_fields import EndUserDetail
from models.model import App
from services.end_user_service import EndUserService
@service_api_ns.route("/end-users/<uuid:end_user_id>")
class EndUserApi(Resource):
"""Resource for retrieving end user details by ID."""
@service_api_ns.doc("get_end_user")
@service_api_ns.doc(description="Get an end user by ID")
@service_api_ns.doc(
params={"end_user_id": "End user ID"},
responses={
200: "End user retrieved successfully",
401: "Unauthorized - invalid API token",
404: "End user not found",
},
)
@validate_app_token
def get(self, app_model: App, end_user_id: UUID):
"""Get end user detail.
This endpoint is scoped to the current app token's tenant/app to prevent
cross-tenant/app access when an end-user ID is known.
"""
end_user = EndUserService.get_end_user_by_id(
tenant_id=app_model.tenant_id, app_id=app_model.id, end_user_id=str(end_user_id)
)
if end_user is None:
raise EndUserNotFoundError()
return EndUserDetail.model_validate(end_user).model_dump(mode="json")

View File

@@ -1,7 +0,0 @@
from libs.exception import BaseHTTPException
class EndUserNotFoundError(BaseHTTPException):
error_code = "end_user_not_found"
description = "End user not found."
code = 404

View File

@@ -1,24 +1,27 @@
import logging
import time
from collections.abc import Callable
from datetime import timedelta
from enum import StrEnum, auto
from functools import wraps
from typing import Concatenate, ParamSpec, TypeVar, cast
from typing import Concatenate, ParamSpec, TypeVar
from flask import current_app, request
from flask_login import user_logged_in
from flask_restx import Resource
from pydantic import BaseModel
from sqlalchemy import select, update
from sqlalchemy.orm import Session
from werkzeug.exceptions import Forbidden, NotFound, Unauthorized
from enums.cloud_plan import CloudPlan
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from libs.datetime_utils import naive_utc_now
from libs.login import current_user
from models import Account, Tenant, TenantAccountJoin, TenantStatus
from models.dataset import Dataset, RateLimitLog
from models.model import ApiToken, App
from services.api_token_service import ApiTokenCache, fetch_token_with_single_flight, record_token_usage
from services.end_user_service import EndUserService
from services.feature_service import FeatureService
@@ -217,8 +220,6 @@ def validate_dataset_token(view: Callable[Concatenate[T, P], R] | None = None):
def decorator(view: Callable[Concatenate[T, P], R]):
@wraps(view)
def decorated(*args: P.args, **kwargs: P.kwargs):
api_token = validate_and_get_api_token("dataset")
# get url path dataset_id from positional args or kwargs
# Flask passes URL path parameters as positional arguments
dataset_id = None
@@ -255,18 +256,12 @@ def validate_dataset_token(view: Callable[Concatenate[T, P], R] | None = None):
# Validate dataset if dataset_id is provided
if dataset_id:
dataset_id = str(dataset_id)
dataset = (
db.session.query(Dataset)
.where(
Dataset.id == dataset_id,
Dataset.tenant_id == api_token.tenant_id,
)
.first()
)
dataset = db.session.query(Dataset).where(Dataset.id == dataset_id).first()
if not dataset:
raise NotFound("Dataset not found.")
if not dataset.enable_api:
raise Forbidden("Dataset api access is not enabled.")
api_token = validate_and_get_api_token("dataset")
tenant_account_join = (
db.session.query(Tenant, TenantAccountJoin)
.where(Tenant.id == api_token.tenant_id)
@@ -301,14 +296,7 @@ def validate_dataset_token(view: Callable[Concatenate[T, P], R] | None = None):
def validate_and_get_api_token(scope: str | None = None):
"""
Validate and get API token with Redis caching.
This function uses a two-tier approach:
1. First checks Redis cache for the token
2. If not cached, queries database and caches the result
The last_used_at field is updated asynchronously via Celery task
to avoid blocking the request.
Validate and get API token.
"""
auth_header = request.headers.get("Authorization")
if auth_header is None or " " not in auth_header:
@@ -320,18 +308,29 @@ def validate_and_get_api_token(scope: str | None = None):
if auth_scheme != "bearer":
raise Unauthorized("Authorization scheme must be 'Bearer'")
# Try to get token from cache first
# Returns a CachedApiToken (plain Python object), not a SQLAlchemy model
cached_token = ApiTokenCache.get(auth_token, scope)
if cached_token is not None:
logger.debug("Token validation served from cache for scope: %s", scope)
# Record usage in Redis for later batch update (no Celery task per request)
record_token_usage(auth_token, scope)
return cast(ApiToken, cached_token)
current_time = naive_utc_now()
cutoff_time = current_time - timedelta(minutes=1)
with Session(db.engine, expire_on_commit=False) as session:
update_stmt = (
update(ApiToken)
.where(
ApiToken.token == auth_token,
(ApiToken.last_used_at.is_(None) | (ApiToken.last_used_at < cutoff_time)),
ApiToken.type == scope,
)
.values(last_used_at=current_time)
)
stmt = select(ApiToken).where(ApiToken.token == auth_token, ApiToken.type == scope)
result = session.execute(update_stmt)
api_token = session.scalar(stmt)
# Cache miss - use Redis lock for single-flight mode
# This ensures only one request queries DB for the same token concurrently
return fetch_token_with_single_flight(auth_token, scope)
if hasattr(result, "rowcount") and result.rowcount > 0:
session.commit()
if not api_token:
raise Unauthorized("Access token is invalid")
return api_token
class DatasetApiResource(Resource):

View File

@@ -23,7 +23,6 @@ from . import (
feature,
files,
forgot_password,
human_input_form,
login,
message,
passport,
@@ -31,7 +30,6 @@ from . import (
saved_message,
site,
workflow,
workflow_events,
)
api.add_namespace(web_ns)
@@ -46,7 +44,6 @@ __all__ = [
"feature",
"files",
"forgot_password",
"human_input_form",
"login",
"message",
"passport",
@@ -55,5 +52,4 @@ __all__ = [
"site",
"web_ns",
"workflow",
"workflow_events",
]

View File

@@ -117,12 +117,6 @@ class InvokeRateLimitError(BaseHTTPException):
code = 429
class WebFormRateLimitExceededError(BaseHTTPException):
error_code = "web_form_rate_limit_exceeded"
description = "Too many form requests. Please try again later."
code = 429
class NotFoundError(BaseHTTPException):
error_code = "not_found"
code = 404

View File

@@ -1,161 +0,0 @@
"""
Web App Human Input Form APIs.
"""
import json
import logging
from datetime import datetime
from flask import Response, request
from flask_restx import Resource, reqparse
from werkzeug.exceptions import Forbidden
from configs import dify_config
from controllers.web import web_ns
from controllers.web.error import NotFoundError, WebFormRateLimitExceededError
from controllers.web.site import serialize_app_site_payload
from extensions.ext_database import db
from libs.helper import RateLimiter, extract_remote_ip
from models.account import TenantStatus
from models.model import App, Site
from services.human_input_service import Form, FormNotFoundError, HumanInputService
logger = logging.getLogger(__name__)
_FORM_SUBMIT_RATE_LIMITER = RateLimiter(
prefix="web_form_submit_rate_limit",
max_attempts=dify_config.WEB_FORM_SUBMIT_RATE_LIMIT_MAX_ATTEMPTS,
time_window=dify_config.WEB_FORM_SUBMIT_RATE_LIMIT_WINDOW_SECONDS,
)
_FORM_ACCESS_RATE_LIMITER = RateLimiter(
prefix="web_form_access_rate_limit",
max_attempts=dify_config.WEB_FORM_SUBMIT_RATE_LIMIT_MAX_ATTEMPTS,
time_window=dify_config.WEB_FORM_SUBMIT_RATE_LIMIT_WINDOW_SECONDS,
)
def _stringify_default_values(values: dict[str, object]) -> dict[str, str]:
result: dict[str, str] = {}
for key, value in values.items():
if value is None:
result[key] = ""
elif isinstance(value, (dict, list)):
result[key] = json.dumps(value, ensure_ascii=False)
else:
result[key] = str(value)
return result
def _to_timestamp(value: datetime) -> int:
return int(value.timestamp())
def _jsonify_form_definition(form: Form, site_payload: dict | None = None) -> Response:
"""Return the form payload (optionally with site) as a JSON response."""
definition_payload = form.get_definition().model_dump()
payload = {
"form_content": definition_payload["rendered_content"],
"inputs": definition_payload["inputs"],
"resolved_default_values": _stringify_default_values(definition_payload["default_values"]),
"user_actions": definition_payload["user_actions"],
"expiration_time": _to_timestamp(form.expiration_time),
}
if site_payload is not None:
payload["site"] = site_payload
return Response(json.dumps(payload, ensure_ascii=False), mimetype="application/json")
@web_ns.route("/form/human_input/<string:form_token>")
class HumanInputFormApi(Resource):
"""API for getting and submitting human input forms via the web app."""
# NOTE(QuantumGhost): this endpoint is unauthenticated on purpose for now.
# def get(self, _app_model: App, _end_user: EndUser, form_token: str):
def get(self, form_token: str):
"""
Get human input form definition by token.
GET /api/form/human_input/<form_token>
"""
ip_address = extract_remote_ip(request)
if _FORM_ACCESS_RATE_LIMITER.is_rate_limited(ip_address):
raise WebFormRateLimitExceededError()
_FORM_ACCESS_RATE_LIMITER.increment_rate_limit(ip_address)
service = HumanInputService(db.engine)
# TODO(QuantumGhost): forbid submision for form tokens
# that are only for console.
form = service.get_form_by_token(form_token)
if form is None:
raise NotFoundError("Form not found")
service.ensure_form_active(form)
app_model, site = _get_app_site_from_form(form)
return _jsonify_form_definition(form, site_payload=serialize_app_site_payload(app_model, site, None))
# def post(self, _app_model: App, _end_user: EndUser, form_token: str):
def post(self, form_token: str):
"""
Submit human input form by token.
POST /api/form/human_input/<form_token>
Request body:
{
"inputs": {
"content": "User input content"
},
"action": "Approve"
}
"""
parser = reqparse.RequestParser()
parser.add_argument("inputs", type=dict, required=True, location="json")
parser.add_argument("action", type=str, required=True, location="json")
args = parser.parse_args()
ip_address = extract_remote_ip(request)
if _FORM_SUBMIT_RATE_LIMITER.is_rate_limited(ip_address):
raise WebFormRateLimitExceededError()
_FORM_SUBMIT_RATE_LIMITER.increment_rate_limit(ip_address)
service = HumanInputService(db.engine)
form = service.get_form_by_token(form_token)
if form is None:
raise NotFoundError("Form not found")
if (recipient_type := form.recipient_type) is None:
logger.warning("Recipient type is None for form, form_id=%", form.id)
raise AssertionError("Recipient type is None")
try:
service.submit_form_by_token(
recipient_type=recipient_type,
form_token=form_token,
selected_action_id=args["action"],
form_data=args["inputs"],
submission_end_user_id=None,
# submission_end_user_id=_end_user.id,
)
except FormNotFoundError:
raise NotFoundError("Form not found")
return {}, 200
def _get_app_site_from_form(form: Form) -> tuple[App, Site]:
"""Resolve App/Site for the form's app and validate tenant status."""
app_model = db.session.query(App).where(App.id == form.app_id).first()
if app_model is None or app_model.tenant_id != form.tenant_id:
raise NotFoundError("Form not found")
site = db.session.query(Site).where(Site.app_id == app_model.id).first()
if site is None:
raise Forbidden()
if app_model.tenant and app_model.tenant.status == TenantStatus.ARCHIVE:
raise Forbidden()
return app_model, site

View File

@@ -1,6 +1,4 @@
from typing import cast
from flask_restx import fields, marshal, marshal_with
from flask_restx import fields, marshal_with
from werkzeug.exceptions import Forbidden
from configs import dify_config
@@ -9,7 +7,7 @@ from controllers.web.wraps import WebApiResource
from extensions.ext_database import db
from libs.helper import AppIconUrlField
from models.account import TenantStatus
from models.model import App, Site
from models.model import Site
from services.feature_service import FeatureService
@@ -110,14 +108,3 @@ class AppSiteInfo:
"remove_webapp_brand": remove_webapp_brand,
"replace_webapp_logo": replace_webapp_logo,
}
def serialize_site(site: Site) -> dict:
"""Serialize Site model using the same schema as AppSiteApi."""
return cast(dict, marshal(site, AppSiteApi.site_fields))
def serialize_app_site_payload(app_model: App, site: Site, end_user_id: str | None) -> dict:
can_replace_logo = FeatureService.get_features(app_model.tenant_id).can_replace_logo
app_site_info = AppSiteInfo(app_model.tenant, app_model, site, end_user_id, can_replace_logo)
return cast(dict, marshal(app_site_info, AppSiteApi.app_fields))

View File

@@ -1,112 +0,0 @@
"""
Web App Workflow Resume APIs.
"""
import json
from collections.abc import Generator
from flask import Response, request
from sqlalchemy.orm import sessionmaker
from controllers.web import api
from controllers.web.error import InvalidArgumentError, NotFoundError
from controllers.web.wraps import WebApiResource
from core.app.apps.advanced_chat.app_generator import AdvancedChatAppGenerator
from core.app.apps.base_app_generator import BaseAppGenerator
from core.app.apps.common.workflow_response_converter import WorkflowResponseConverter
from core.app.apps.message_generator import MessageGenerator
from core.app.apps.workflow.app_generator import WorkflowAppGenerator
from extensions.ext_database import db
from models.enums import CreatorUserRole
from models.model import App, AppMode, EndUser
from repositories.factory import DifyAPIRepositoryFactory
from services.workflow_event_snapshot_service import build_workflow_event_stream
class WorkflowEventsApi(WebApiResource):
"""API for getting workflow execution events after resume."""
def get(self, app_model: App, end_user: EndUser, task_id: str):
"""
Get workflow execution events stream after resume.
GET /api/workflow/<task_id>/events
Returns Server-Sent Events stream.
"""
workflow_run_id = task_id
session_maker = sessionmaker(db.engine)
repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker)
workflow_run = repo.get_workflow_run_by_id_and_tenant_id(
tenant_id=app_model.tenant_id,
run_id=workflow_run_id,
)
if workflow_run is None:
raise NotFoundError(f"WorkflowRun not found, id={workflow_run_id}")
if workflow_run.app_id != app_model.id:
raise NotFoundError(f"WorkflowRun not found, id={workflow_run_id}")
if workflow_run.created_by_role != CreatorUserRole.END_USER:
raise NotFoundError(f"WorkflowRun not created by end user, id={workflow_run_id}")
if workflow_run.created_by != end_user.id:
raise NotFoundError(f"WorkflowRun not created by the current end user, id={workflow_run_id}")
if workflow_run.finished_at is not None:
response = WorkflowResponseConverter.workflow_run_result_to_finish_response(
task_id=workflow_run.id,
workflow_run=workflow_run,
creator_user=end_user,
)
payload = response.model_dump(mode="json")
payload["event"] = response.event.value
def _generate_finished_events() -> Generator[str, None, None]:
yield f"data: {json.dumps(payload)}\n\n"
event_generator = _generate_finished_events
else:
app_mode = AppMode.value_of(app_model.mode)
msg_generator = MessageGenerator()
generator: BaseAppGenerator
if app_mode == AppMode.ADVANCED_CHAT:
generator = AdvancedChatAppGenerator()
elif app_mode == AppMode.WORKFLOW:
generator = WorkflowAppGenerator()
else:
raise InvalidArgumentError(f"cannot subscribe to workflow run, workflow_run_id={workflow_run.id}")
include_state_snapshot = request.args.get("include_state_snapshot", "false").lower() == "true"
def _generate_stream_events():
if include_state_snapshot:
return generator.convert_to_event_stream(
build_workflow_event_stream(
app_mode=app_mode,
workflow_run=workflow_run,
tenant_id=app_model.tenant_id,
app_id=app_model.id,
session_maker=session_maker,
)
)
return generator.convert_to_event_stream(
msg_generator.retrieve_events(app_mode, workflow_run.id),
)
event_generator = _generate_stream_events
return Response(
event_generator(),
mimetype="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
},
)
# Register the APIs
api.add_resource(WorkflowEventsApi, "/workflow/<string:task_id>/events")

View File

@@ -4,8 +4,8 @@ import contextvars
import logging
import threading
import uuid
from collections.abc import Generator, Mapping, Sequence
from typing import TYPE_CHECKING, Any, Literal, TypeVar, Union, overload
from collections.abc import Generator, Mapping
from typing import TYPE_CHECKING, Any, Literal, Union, overload
from flask import Flask, current_app
from pydantic import ValidationError
@@ -29,25 +29,21 @@ from core.app.apps.message_based_app_generator import MessageBasedAppGenerator
from core.app.apps.message_based_app_queue_manager import MessageBasedAppQueueManager
from core.app.entities.app_invoke_entities import AdvancedChatAppGenerateEntity, InvokeFrom
from core.app.entities.task_entities import ChatbotAppBlockingResponse, ChatbotAppStreamResponse
from core.app.layers.pause_state_persist_layer import PauseStateLayerConfig, PauseStatePersistenceLayer
from core.helper.trace_id_helper import extract_external_trace_id_from_args
from core.model_runtime.errors.invoke import InvokeAuthorizationError
from core.ops.ops_trace_manager import TraceQueueManager
from core.prompt.utils.get_thread_messages_length import get_thread_messages_length
from core.repositories import DifyCoreRepositoryFactory
from core.workflow.graph_engine.layers.base import GraphEngineLayer
from core.workflow.repositories.draft_variable_repository import (
DraftVariableSaverFactory,
)
from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository
from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository
from core.workflow.runtime import GraphRuntimeState
from core.workflow.variable_loader import DUMMY_VARIABLE_LOADER, VariableLoader
from extensions.ext_database import db
from factories import file_factory
from libs.flask_utils import preserve_flask_contexts
from models import Account, App, Conversation, EndUser, Message, Workflow, WorkflowNodeExecutionTriggeredFrom
from models.base import Base
from models.enums import WorkflowRunTriggeredFrom
from services.conversation_service import ConversationService
from services.workflow_draft_variable_service import (
@@ -69,9 +65,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
user: Union[Account, EndUser],
args: Mapping[str, Any],
invoke_from: InvokeFrom,
workflow_run_id: str,
streaming: Literal[False],
pause_state_config: PauseStateLayerConfig | None = None,
) -> Mapping[str, Any]: ...
@overload
@@ -80,11 +74,9 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
app_model: App,
workflow: Workflow,
user: Union[Account, EndUser],
args: Mapping[str, Any],
args: Mapping,
invoke_from: InvokeFrom,
workflow_run_id: str,
streaming: Literal[True],
pause_state_config: PauseStateLayerConfig | None = None,
) -> Generator[Mapping | str, None, None]: ...
@overload
@@ -93,11 +85,9 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
app_model: App,
workflow: Workflow,
user: Union[Account, EndUser],
args: Mapping[str, Any],
args: Mapping,
invoke_from: InvokeFrom,
workflow_run_id: str,
streaming: bool,
pause_state_config: PauseStateLayerConfig | None = None,
) -> Mapping[str, Any] | Generator[str | Mapping, None, None]: ...
def generate(
@@ -105,11 +95,9 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
app_model: App,
workflow: Workflow,
user: Union[Account, EndUser],
args: Mapping[str, Any],
args: Mapping,
invoke_from: InvokeFrom,
workflow_run_id: str,
streaming: bool = True,
pause_state_config: PauseStateLayerConfig | None = None,
) -> Mapping[str, Any] | Generator[str | Mapping, None, None]:
"""
Generate App response.
@@ -173,6 +161,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
# always enable retriever resource in debugger mode
app_config.additional_features.show_retrieve_source = True # type: ignore
workflow_run_id = str(uuid.uuid4())
# init application generate entity
application_generate_entity = AdvancedChatAppGenerateEntity(
task_id=str(uuid.uuid4()),
@@ -190,7 +179,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
invoke_from=invoke_from,
extras=extras,
trace_manager=trace_manager,
workflow_run_id=str(workflow_run_id),
workflow_run_id=workflow_run_id,
)
contexts.plugin_tool_providers.set({})
contexts.plugin_tool_providers_lock.set(threading.Lock())
@@ -227,38 +216,6 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
workflow_node_execution_repository=workflow_node_execution_repository,
conversation=conversation,
stream=streaming,
pause_state_config=pause_state_config,
)
def resume(
self,
*,
app_model: App,
workflow: Workflow,
user: Union[Account, EndUser],
conversation: Conversation,
message: Message,
application_generate_entity: AdvancedChatAppGenerateEntity,
workflow_execution_repository: WorkflowExecutionRepository,
workflow_node_execution_repository: WorkflowNodeExecutionRepository,
graph_runtime_state: GraphRuntimeState,
pause_state_config: PauseStateLayerConfig | None = None,
):
"""
Resume a paused advanced chat execution.
"""
return self._generate(
workflow=workflow,
user=user,
invoke_from=application_generate_entity.invoke_from,
application_generate_entity=application_generate_entity,
workflow_execution_repository=workflow_execution_repository,
workflow_node_execution_repository=workflow_node_execution_repository,
conversation=conversation,
message=message,
stream=application_generate_entity.stream,
pause_state_config=pause_state_config,
graph_runtime_state=graph_runtime_state,
)
def single_iteration_generate(
@@ -439,12 +396,8 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
workflow_execution_repository: WorkflowExecutionRepository,
workflow_node_execution_repository: WorkflowNodeExecutionRepository,
conversation: Conversation | None = None,
message: Message | None = None,
stream: bool = True,
variable_loader: VariableLoader = DUMMY_VARIABLE_LOADER,
pause_state_config: PauseStateLayerConfig | None = None,
graph_runtime_state: GraphRuntimeState | None = None,
graph_engine_layers: Sequence[GraphEngineLayer] = (),
) -> Mapping[str, Any] | Generator[str | Mapping[str, Any], Any, None]:
"""
Generate App response.
@@ -458,12 +411,12 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
:param conversation: conversation
:param stream: is stream
"""
is_first_conversation = conversation is None
is_first_conversation = False
if not conversation:
is_first_conversation = True
if conversation is not None and message is not None:
pass
else:
conversation, message = self._init_generate_records(application_generate_entity, conversation)
# init generate records
(conversation, message) = self._init_generate_records(application_generate_entity, conversation)
if is_first_conversation:
# update conversation features
@@ -486,16 +439,6 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
message_id=message.id,
)
graph_layers: list[GraphEngineLayer] = list(graph_engine_layers)
if pause_state_config is not None:
graph_layers.append(
PauseStatePersistenceLayer(
session_factory=pause_state_config.session_factory,
generate_entity=application_generate_entity,
state_owner_user_id=pause_state_config.state_owner_user_id,
)
)
# new thread with request context and contextvars
context = contextvars.copy_context()
@@ -511,25 +454,14 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
"variable_loader": variable_loader,
"workflow_execution_repository": workflow_execution_repository,
"workflow_node_execution_repository": workflow_node_execution_repository,
"graph_engine_layers": tuple(graph_layers),
"graph_runtime_state": graph_runtime_state,
},
)
worker_thread.start()
# release database connection, because the following new thread operations may take a long time
with Session(bind=db.engine, expire_on_commit=False) as session:
workflow = _refresh_model(session, workflow)
message = _refresh_model(session, message)
# workflow_ = session.get(Workflow, workflow.id)
# assert workflow_ is not None
# workflow = workflow_
# message_ = session.get(Message, message.id)
# assert message_ is not None
# message = message_
# db.session.refresh(workflow)
# db.session.refresh(message)
db.session.refresh(workflow)
db.session.refresh(message)
# db.session.refresh(user)
db.session.close()
@@ -558,8 +490,6 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
variable_loader: VariableLoader,
workflow_execution_repository: WorkflowExecutionRepository,
workflow_node_execution_repository: WorkflowNodeExecutionRepository,
graph_engine_layers: Sequence[GraphEngineLayer] = (),
graph_runtime_state: GraphRuntimeState | None = None,
):
"""
Generate worker in a new thread.
@@ -617,8 +547,6 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
app=app,
workflow_execution_repository=workflow_execution_repository,
workflow_node_execution_repository=workflow_node_execution_repository,
graph_engine_layers=graph_engine_layers,
graph_runtime_state=graph_runtime_state,
)
try:
@@ -686,13 +614,3 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
else:
logger.exception("Failed to process generate task pipeline, conversation_id: %s", conversation.id)
raise e
_T = TypeVar("_T", bound=Base)
def _refresh_model(session, model: _T) -> _T:
with Session(bind=db.engine, expire_on_commit=False) as session:
detach_model = session.get(type(model), model.id)
assert detach_model is not None
return detach_model

View File

@@ -66,7 +66,6 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
workflow_execution_repository: WorkflowExecutionRepository,
workflow_node_execution_repository: WorkflowNodeExecutionRepository,
graph_engine_layers: Sequence[GraphEngineLayer] = (),
graph_runtime_state: GraphRuntimeState | None = None,
):
super().__init__(
queue_manager=queue_manager,
@@ -83,7 +82,6 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
self._app = app
self._workflow_execution_repository = workflow_execution_repository
self._workflow_node_execution_repository = workflow_node_execution_repository
self._resume_graph_runtime_state = graph_runtime_state
@trace_span(WorkflowAppRunnerHandler)
def run(self):
@@ -112,21 +110,7 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
invoke_from = InvokeFrom.DEBUGGER
user_from = self._resolve_user_from(invoke_from)
resume_state = self._resume_graph_runtime_state
if resume_state is not None:
graph_runtime_state = resume_state
variable_pool = graph_runtime_state.variable_pool
graph = self._init_graph(
graph_config=self._workflow.graph_dict,
graph_runtime_state=graph_runtime_state,
workflow_id=self._workflow.id,
tenant_id=self._workflow.tenant_id,
user_id=self.application_generate_entity.user_id,
invoke_from=invoke_from,
user_from=user_from,
)
elif self.application_generate_entity.single_iteration_run or self.application_generate_entity.single_loop_run:
if self.application_generate_entity.single_iteration_run or self.application_generate_entity.single_loop_run:
# Handle single iteration or single loop run
graph, variable_pool, graph_runtime_state = self._prepare_single_node_execution(
workflow=self._workflow,

View File

@@ -24,8 +24,6 @@ from core.app.entities.queue_entities import (
QueueAgentLogEvent,
QueueAnnotationReplyEvent,
QueueErrorEvent,
QueueHumanInputFormFilledEvent,
QueueHumanInputFormTimeoutEvent,
QueueIterationCompletedEvent,
QueueIterationNextEvent,
QueueIterationStartEvent,
@@ -44,7 +42,6 @@ from core.app.entities.queue_entities import (
QueueTextChunkEvent,
QueueWorkflowFailedEvent,
QueueWorkflowPartialSuccessEvent,
QueueWorkflowPausedEvent,
QueueWorkflowStartedEvent,
QueueWorkflowSucceededEvent,
WorkflowQueueMessage,
@@ -66,8 +63,6 @@ from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk
from core.model_runtime.entities.llm_entities import LLMUsage
from core.model_runtime.utils.encoders import jsonable_encoder
from core.ops.ops_trace_manager import TraceQueueManager
from core.repositories.human_input_repository import HumanInputFormRepositoryImpl
from core.workflow.entities.pause_reason import HumanInputRequired
from core.workflow.enums import WorkflowExecutionStatus
from core.workflow.nodes import NodeType
from core.workflow.repositories.draft_variable_repository import DraftVariableSaverFactory
@@ -76,8 +71,7 @@ from core.workflow.system_variable import SystemVariable
from extensions.ext_database import db
from libs.datetime_utils import naive_utc_now
from models import Account, Conversation, EndUser, Message, MessageFile
from models.enums import CreatorUserRole, MessageStatus
from models.execution_extra_content import HumanInputContent
from models.enums import CreatorUserRole
from models.workflow import Workflow
logger = logging.getLogger(__name__)
@@ -134,7 +128,6 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
)
self._task_state = WorkflowTaskState()
self._seed_task_state_from_message(message)
self._message_cycle_manager = MessageCycleManager(
application_generate_entity=application_generate_entity, task_state=self._task_state
)
@@ -142,7 +135,6 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
self._application_generate_entity = application_generate_entity
self._workflow_id = workflow.id
self._workflow_features_dict = workflow.features_dict
self._workflow_tenant_id = workflow.tenant_id
self._conversation_id = conversation.id
self._conversation_mode = conversation.mode
self._message_id = message.id
@@ -152,13 +144,8 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
self._workflow_run_id: str = ""
self._draft_var_saver_factory = draft_var_saver_factory
self._graph_runtime_state: GraphRuntimeState | None = None
self._message_saved_on_pause = False
self._seed_graph_runtime_state_from_queue_manager()
def _seed_task_state_from_message(self, message: Message) -> None:
if message.status == MessageStatus.PAUSED and message.answer:
self._task_state.answer = message.answer
def process(self) -> Union[ChatbotAppBlockingResponse, Generator[ChatbotAppStreamResponse, None, None]]:
"""
Process generate task pipeline.
@@ -321,7 +308,6 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
task_id=self._application_generate_entity.task_id,
workflow_run_id=run_id,
workflow_id=self._workflow_id,
reason=event.reason,
)
yield workflow_start_resp
@@ -539,35 +525,6 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
)
yield workflow_finish_resp
def _handle_workflow_paused_event(
self,
event: QueueWorkflowPausedEvent,
**kwargs,
) -> Generator[StreamResponse, None, None]:
"""Handle workflow paused events."""
validated_state = self._ensure_graph_runtime_initialized()
responses = self._workflow_response_converter.workflow_pause_to_stream_response(
event=event,
task_id=self._application_generate_entity.task_id,
graph_runtime_state=validated_state,
)
for reason in event.reasons:
if isinstance(reason, HumanInputRequired):
self._persist_human_input_extra_content(form_id=reason.form_id, node_id=reason.node_id)
yield from responses
resolved_state: GraphRuntimeState | None = None
try:
resolved_state = self._ensure_graph_runtime_initialized()
except ValueError:
resolved_state = None
with self._database_session() as session:
self._save_message(session=session, graph_runtime_state=resolved_state)
message = self._get_message(session=session)
if message is not None:
message.status = MessageStatus.PAUSED
self._message_saved_on_pause = True
self._base_task_pipeline.queue_manager.publish(QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE)
def _handle_workflow_failed_event(
@@ -657,10 +614,9 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
reason=QueueMessageReplaceEvent.MessageReplaceReason.OUTPUT_MODERATION,
)
# Save message unless it has already been persisted on pause.
if not self._message_saved_on_pause:
with self._database_session() as session:
self._save_message(session=session, graph_runtime_state=resolved_state)
# Save message
with self._database_session() as session:
self._save_message(session=session, graph_runtime_state=resolved_state)
yield self._message_end_to_stream_response()
@@ -686,65 +642,6 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
"""Handle message replace events."""
yield self._message_cycle_manager.message_replace_to_stream_response(answer=event.text, reason=event.reason)
def _handle_human_input_form_filled_event(
self, event: QueueHumanInputFormFilledEvent, **kwargs
) -> Generator[StreamResponse, None, None]:
"""Handle human input form filled events."""
self._persist_human_input_extra_content(node_id=event.node_id)
yield self._workflow_response_converter.human_input_form_filled_to_stream_response(
event=event, task_id=self._application_generate_entity.task_id
)
def _handle_human_input_form_timeout_event(
self, event: QueueHumanInputFormTimeoutEvent, **kwargs
) -> Generator[StreamResponse, None, None]:
"""Handle human input form timeout events."""
yield self._workflow_response_converter.human_input_form_timeout_to_stream_response(
event=event, task_id=self._application_generate_entity.task_id
)
def _persist_human_input_extra_content(self, *, node_id: str | None = None, form_id: str | None = None) -> None:
if not self._workflow_run_id or not self._message_id:
return
if form_id is None:
if node_id is None:
return
form_id = self._load_human_input_form_id(node_id=node_id)
if form_id is None:
logger.warning(
"HumanInput form not found for workflow run %s node %s",
self._workflow_run_id,
node_id,
)
return
with self._database_session() as session:
exists_stmt = select(HumanInputContent).where(
HumanInputContent.workflow_run_id == self._workflow_run_id,
HumanInputContent.message_id == self._message_id,
HumanInputContent.form_id == form_id,
)
if session.scalar(exists_stmt) is not None:
return
content = HumanInputContent(
workflow_run_id=self._workflow_run_id,
message_id=self._message_id,
form_id=form_id,
)
session.add(content)
def _load_human_input_form_id(self, *, node_id: str) -> str | None:
form_repository = HumanInputFormRepositoryImpl(
session_factory=db.engine,
tenant_id=self._workflow_tenant_id,
)
form = form_repository.get_form(self._workflow_run_id, node_id)
if form is None:
return None
return form.id
def _handle_agent_log_event(self, event: QueueAgentLogEvent, **kwargs) -> Generator[StreamResponse, None, None]:
"""Handle agent log events."""
yield self._workflow_response_converter.handle_agent_log(
@@ -762,7 +659,6 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
QueueWorkflowStartedEvent: self._handle_workflow_started_event,
QueueWorkflowSucceededEvent: self._handle_workflow_succeeded_event,
QueueWorkflowPartialSuccessEvent: self._handle_workflow_partial_success_event,
QueueWorkflowPausedEvent: self._handle_workflow_paused_event,
QueueWorkflowFailedEvent: self._handle_workflow_failed_event,
# Node events
QueueNodeRetryEvent: self._handle_node_retry_event,
@@ -784,8 +680,6 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
QueueMessageReplaceEvent: self._handle_message_replace_event,
QueueAdvancedChatMessageEndEvent: self._handle_advanced_chat_message_end_event,
QueueAgentLogEvent: self._handle_agent_log_event,
QueueHumanInputFormFilledEvent: self._handle_human_input_form_filled_event,
QueueHumanInputFormTimeoutEvent: self._handle_human_input_form_timeout_event,
}
def _dispatch_event(
@@ -853,9 +747,6 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
case QueueWorkflowFailedEvent():
yield from self._handle_workflow_failed_event(event, trace_manager=trace_manager)
break
case QueueWorkflowPausedEvent():
yield from self._handle_workflow_paused_event(event)
break
case QueueStopEvent():
yield from self._handle_stop_event(event, graph_runtime_state=None, trace_manager=trace_manager)
@@ -881,11 +772,6 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
def _save_message(self, *, session: Session, graph_runtime_state: GraphRuntimeState | None = None):
message = self._get_message(session=session)
if message is None:
return
if message.status == MessageStatus.PAUSED:
message.status = MessageStatus.NORMAL
# If there are assistant files, remove markdown image links from answer
answer_text = self._task_state.answer

View File

@@ -5,14 +5,9 @@ from dataclasses import dataclass
from datetime import datetime
from typing import Any, NewType, Union
from sqlalchemy import select
from sqlalchemy.orm import Session
from core.app.entities.app_invoke_entities import AdvancedChatAppGenerateEntity, InvokeFrom, WorkflowAppGenerateEntity
from core.app.entities.queue_entities import (
QueueAgentLogEvent,
QueueHumanInputFormFilledEvent,
QueueHumanInputFormTimeoutEvent,
QueueIterationCompletedEvent,
QueueIterationNextEvent,
QueueIterationStartEvent,
@@ -24,13 +19,9 @@ from core.app.entities.queue_entities import (
QueueNodeRetryEvent,
QueueNodeStartedEvent,
QueueNodeSucceededEvent,
QueueWorkflowPausedEvent,
)
from core.app.entities.task_entities import (
AgentLogStreamResponse,
HumanInputFormFilledResponse,
HumanInputFormTimeoutResponse,
HumanInputRequiredResponse,
IterationNodeCompletedStreamResponse,
IterationNodeNextStreamResponse,
IterationNodeStartStreamResponse,
@@ -40,9 +31,7 @@ from core.app.entities.task_entities import (
NodeFinishStreamResponse,
NodeRetryStreamResponse,
NodeStartStreamResponse,
StreamResponse,
WorkflowFinishStreamResponse,
WorkflowPauseStreamResponse,
WorkflowStartStreamResponse,
)
from core.file import FILE_MODEL_IDENTITY, File
@@ -51,8 +40,6 @@ from core.tools.entities.tool_entities import ToolProviderType
from core.tools.tool_manager import ToolManager
from core.trigger.trigger_manager import TriggerManager
from core.variables.segments import ArrayFileSegment, FileSegment, Segment
from core.workflow.entities.pause_reason import HumanInputRequired
from core.workflow.entities.workflow_start_reason import WorkflowStartReason
from core.workflow.enums import (
NodeType,
SystemVariableKey,
@@ -64,11 +51,8 @@ from core.workflow.runtime import GraphRuntimeState
from core.workflow.system_variable import SystemVariable
from core.workflow.workflow_entry import WorkflowEntry
from core.workflow.workflow_type_encoder import WorkflowRuntimeTypeConverter
from extensions.ext_database import db
from libs.datetime_utils import naive_utc_now
from models import Account, EndUser
from models.human_input import HumanInputForm
from models.workflow import WorkflowRun
from services.variable_truncator import BaseTruncator, DummyVariableTruncator, VariableTruncator
NodeExecutionId = NewType("NodeExecutionId", str)
@@ -207,7 +191,6 @@ class WorkflowResponseConverter:
task_id: str,
workflow_run_id: str,
workflow_id: str,
reason: WorkflowStartReason,
) -> WorkflowStartStreamResponse:
run_id = self._ensure_workflow_run_id(workflow_run_id)
started_at = naive_utc_now()
@@ -221,7 +204,6 @@ class WorkflowResponseConverter:
workflow_id=workflow_id,
inputs=self._workflow_inputs,
created_at=int(started_at.timestamp()),
reason=reason,
),
)
@@ -282,160 +264,6 @@ class WorkflowResponseConverter:
),
)
def workflow_pause_to_stream_response(
self,
*,
event: QueueWorkflowPausedEvent,
task_id: str,
graph_runtime_state: GraphRuntimeState,
) -> list[StreamResponse]:
run_id = self._ensure_workflow_run_id()
started_at = self._workflow_started_at
if started_at is None:
raise ValueError(
"workflow_pause_to_stream_response called before workflow_start_to_stream_response",
)
paused_at = naive_utc_now()
elapsed_time = (paused_at - started_at).total_seconds()
encoded_outputs = self._encode_outputs(event.outputs) or {}
if self._application_generate_entity.invoke_from == InvokeFrom.SERVICE_API:
encoded_outputs = {}
pause_reasons = [reason.model_dump(mode="json") for reason in event.reasons]
human_input_form_ids = [reason.form_id for reason in event.reasons if isinstance(reason, HumanInputRequired)]
expiration_times_by_form_id: dict[str, datetime] = {}
if human_input_form_ids:
stmt = select(HumanInputForm.id, HumanInputForm.expiration_time).where(
HumanInputForm.id.in_(human_input_form_ids)
)
with Session(bind=db.engine) as session:
for form_id, expiration_time in session.execute(stmt):
expiration_times_by_form_id[str(form_id)] = expiration_time
responses: list[StreamResponse] = []
for reason in event.reasons:
if isinstance(reason, HumanInputRequired):
expiration_time = expiration_times_by_form_id.get(reason.form_id)
if expiration_time is None:
raise ValueError(f"HumanInputForm not found for pause reason, form_id={reason.form_id}")
responses.append(
HumanInputRequiredResponse(
task_id=task_id,
workflow_run_id=run_id,
data=HumanInputRequiredResponse.Data(
form_id=reason.form_id,
node_id=reason.node_id,
node_title=reason.node_title,
form_content=reason.form_content,
inputs=reason.inputs,
actions=reason.actions,
display_in_ui=reason.display_in_ui,
form_token=reason.form_token,
resolved_default_values=reason.resolved_default_values,
expiration_time=int(expiration_time.timestamp()),
),
)
)
responses.append(
WorkflowPauseStreamResponse(
task_id=task_id,
workflow_run_id=run_id,
data=WorkflowPauseStreamResponse.Data(
workflow_run_id=run_id,
paused_nodes=list(event.paused_nodes),
outputs=encoded_outputs,
reasons=pause_reasons,
status=WorkflowExecutionStatus.PAUSED,
created_at=int(started_at.timestamp()),
elapsed_time=elapsed_time,
total_tokens=graph_runtime_state.total_tokens,
total_steps=graph_runtime_state.node_run_steps,
),
)
)
return responses
def human_input_form_filled_to_stream_response(
self, *, event: QueueHumanInputFormFilledEvent, task_id: str
) -> HumanInputFormFilledResponse:
run_id = self._ensure_workflow_run_id()
return HumanInputFormFilledResponse(
task_id=task_id,
workflow_run_id=run_id,
data=HumanInputFormFilledResponse.Data(
node_id=event.node_id,
node_title=event.node_title,
rendered_content=event.rendered_content,
action_id=event.action_id,
action_text=event.action_text,
),
)
def human_input_form_timeout_to_stream_response(
self, *, event: QueueHumanInputFormTimeoutEvent, task_id: str
) -> HumanInputFormTimeoutResponse:
run_id = self._ensure_workflow_run_id()
return HumanInputFormTimeoutResponse(
task_id=task_id,
workflow_run_id=run_id,
data=HumanInputFormTimeoutResponse.Data(
node_id=event.node_id,
node_title=event.node_title,
expiration_time=int(event.expiration_time.timestamp()),
),
)
@classmethod
def workflow_run_result_to_finish_response(
cls,
*,
task_id: str,
workflow_run: WorkflowRun,
creator_user: Account | EndUser,
) -> WorkflowFinishStreamResponse:
run_id = workflow_run.id
elapsed_time = workflow_run.elapsed_time
encoded_outputs = workflow_run.outputs_dict
finished_at = workflow_run.finished_at
assert finished_at is not None
created_by: Mapping[str, object]
user = creator_user
if isinstance(user, Account):
created_by = {
"id": user.id,
"name": user.name,
"email": user.email,
}
else:
created_by = {
"id": user.id,
"user": user.session_id,
}
return WorkflowFinishStreamResponse(
task_id=task_id,
workflow_run_id=run_id,
data=WorkflowFinishStreamResponse.Data(
id=run_id,
workflow_id=workflow_run.workflow_id,
status=workflow_run.status,
outputs=encoded_outputs,
error=workflow_run.error,
elapsed_time=elapsed_time,
total_tokens=workflow_run.total_tokens,
total_steps=workflow_run.total_steps,
created_by=created_by,
created_at=int(workflow_run.created_at.timestamp()),
finished_at=int(finished_at.timestamp()),
files=cls.fetch_files_from_node_outputs(encoded_outputs),
exceptions_count=workflow_run.exceptions_count,
),
)
def workflow_node_start_to_stream_response(
self,
*,
@@ -764,8 +592,7 @@ class WorkflowResponseConverter:
),
)
@classmethod
def fetch_files_from_node_outputs(cls, outputs_dict: Mapping[str, Any] | None) -> Sequence[Mapping[str, Any]]:
def fetch_files_from_node_outputs(self, outputs_dict: Mapping[str, Any] | None) -> Sequence[Mapping[str, Any]]:
"""
Fetch files from node outputs
:param outputs_dict: node outputs dict
@@ -774,7 +601,7 @@ class WorkflowResponseConverter:
if not outputs_dict:
return []
files = [cls._fetch_files_from_variable_value(output_value) for output_value in outputs_dict.values()]
files = [self._fetch_files_from_variable_value(output_value) for output_value in outputs_dict.values()]
# Remove None
files = [file for file in files if file]
# Flatten list

View File

@@ -1,6 +1,6 @@
import json
import logging
from collections.abc import Callable, Generator, Mapping
from collections.abc import Generator
from typing import Union, cast
from sqlalchemy import select
@@ -10,14 +10,12 @@ from core.app.app_config.entities import EasyUIBasedAppConfig, EasyUIBasedAppMod
from core.app.apps.base_app_generator import BaseAppGenerator
from core.app.apps.base_app_queue_manager import AppQueueManager
from core.app.apps.exc import GenerateTaskStoppedError
from core.app.apps.streaming_utils import stream_topic_events
from core.app.entities.app_invoke_entities import (
AdvancedChatAppGenerateEntity,
AgentChatAppGenerateEntity,
AppGenerateEntity,
ChatAppGenerateEntity,
CompletionAppGenerateEntity,
ConversationAppGenerateEntity,
InvokeFrom,
)
from core.app.entities.task_entities import (
@@ -29,8 +27,6 @@ from core.app.entities.task_entities import (
from core.app.task_pipeline.easy_ui_based_generate_task_pipeline import EasyUIBasedGenerateTaskPipeline
from core.prompt.utils.prompt_template_parser import PromptTemplateParser
from extensions.ext_database import db
from extensions.ext_redis import get_pubsub_broadcast_channel
from libs.broadcast_channel.channel import Topic
from libs.datetime_utils import naive_utc_now
from models import Account
from models.enums import CreatorUserRole
@@ -160,7 +156,6 @@ class MessageBasedAppGenerator(BaseAppGenerator):
query = application_generate_entity.query or "New conversation"
conversation_name = (query[:20] + "") if len(query) > 20 else query
created_new_conversation = conversation is None
try:
if not conversation:
conversation = Conversation(
@@ -237,10 +232,6 @@ class MessageBasedAppGenerator(BaseAppGenerator):
db.session.add_all(message_files)
db.session.commit()
if isinstance(application_generate_entity, ConversationAppGenerateEntity):
application_generate_entity.conversation_id = conversation.id
application_generate_entity.is_new_conversation = created_new_conversation
return conversation, message
except Exception:
db.session.rollback()
@@ -293,29 +284,3 @@ class MessageBasedAppGenerator(BaseAppGenerator):
raise MessageNotExistsError("Message not exists")
return message
@staticmethod
def _make_channel_key(app_mode: AppMode, workflow_run_id: str):
return f"channel:{app_mode}:{workflow_run_id}"
@classmethod
def get_response_topic(cls, app_mode: AppMode, workflow_run_id: str) -> Topic:
key = cls._make_channel_key(app_mode, workflow_run_id)
channel = get_pubsub_broadcast_channel()
topic = channel.topic(key)
return topic
@classmethod
def retrieve_events(
cls,
app_mode: AppMode,
workflow_run_id: str,
idle_timeout=300,
on_subscribe: Callable[[], None] | None = None,
) -> Generator[Mapping | str, None, None]:
topic = cls.get_response_topic(app_mode, workflow_run_id)
return stream_topic_events(
topic=topic,
idle_timeout=idle_timeout,
on_subscribe=on_subscribe,
)

View File

@@ -1,36 +0,0 @@
from collections.abc import Callable, Generator, Mapping
from core.app.apps.streaming_utils import stream_topic_events
from extensions.ext_redis import get_pubsub_broadcast_channel
from libs.broadcast_channel.channel import Topic
from models.model import AppMode
class MessageGenerator:
@staticmethod
def _make_channel_key(app_mode: AppMode, workflow_run_id: str):
return f"channel:{app_mode}:{str(workflow_run_id)}"
@classmethod
def get_response_topic(cls, app_mode: AppMode, workflow_run_id: str) -> Topic:
key = cls._make_channel_key(app_mode, workflow_run_id)
channel = get_pubsub_broadcast_channel()
topic = channel.topic(key)
return topic
@classmethod
def retrieve_events(
cls,
app_mode: AppMode,
workflow_run_id: str,
idle_timeout=300,
ping_interval: float = 10.0,
on_subscribe: Callable[[], None] | None = None,
) -> Generator[Mapping | str, None, None]:
topic = cls.get_response_topic(app_mode, workflow_run_id)
return stream_topic_events(
topic=topic,
idle_timeout=idle_timeout,
ping_interval=ping_interval,
on_subscribe=on_subscribe,
)

View File

@@ -1,70 +0,0 @@
from __future__ import annotations
import json
import time
from collections.abc import Callable, Generator, Iterable, Mapping
from typing import Any
from core.app.entities.task_entities import StreamEvent
from libs.broadcast_channel.channel import Topic
from libs.broadcast_channel.exc import SubscriptionClosedError
def stream_topic_events(
*,
topic: Topic,
idle_timeout: float,
ping_interval: float | None = None,
on_subscribe: Callable[[], None] | None = None,
terminal_events: Iterable[str | StreamEvent] | None = None,
) -> Generator[Mapping[str, Any] | str, None, None]:
# send a PING event immediately to prevent the connection staying in pending state for a long time.
#
# This simplify the debugging process as the DevTools in Chrome does not
# provide complete curl command for pending connections.
yield StreamEvent.PING.value
terminal_values = _normalize_terminal_events(terminal_events)
last_msg_time = time.time()
last_ping_time = last_msg_time
with topic.subscribe() as sub:
# on_subscribe fires only after the Redis subscription is active.
# This is used to gate task start and reduce pub/sub race for the first event.
if on_subscribe is not None:
on_subscribe()
while True:
try:
msg = sub.receive(timeout=0.1)
except SubscriptionClosedError:
return
if msg is None:
current_time = time.time()
if current_time - last_msg_time > idle_timeout:
return
if ping_interval is not None and current_time - last_ping_time >= ping_interval:
yield StreamEvent.PING.value
last_ping_time = current_time
continue
last_msg_time = time.time()
last_ping_time = last_msg_time
event = json.loads(msg)
yield event
if not isinstance(event, dict):
continue
event_type = event.get("event")
if event_type in terminal_values:
return
def _normalize_terminal_events(terminal_events: Iterable[str | StreamEvent] | None) -> set[str]:
if not terminal_events:
return {StreamEvent.WORKFLOW_FINISHED.value, StreamEvent.WORKFLOW_PAUSED.value}
values: set[str] = set()
for item in terminal_events:
if isinstance(item, StreamEvent):
values.add(item.value)
else:
values.add(str(item))
return values

View File

@@ -25,7 +25,6 @@ from core.app.apps.workflow.generate_response_converter import WorkflowAppGenera
from core.app.apps.workflow.generate_task_pipeline import WorkflowAppGenerateTaskPipeline
from core.app.entities.app_invoke_entities import InvokeFrom, WorkflowAppGenerateEntity
from core.app.entities.task_entities import WorkflowAppBlockingResponse, WorkflowAppStreamResponse
from core.app.layers.pause_state_persist_layer import PauseStateLayerConfig, PauseStatePersistenceLayer
from core.db.session_factory import session_factory
from core.helper.trace_id_helper import extract_external_trace_id_from_args
from core.model_runtime.errors.invoke import InvokeAuthorizationError
@@ -35,15 +34,12 @@ from core.workflow.graph_engine.layers.base import GraphEngineLayer
from core.workflow.repositories.draft_variable_repository import DraftVariableSaverFactory
from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository
from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository
from core.workflow.runtime import GraphRuntimeState
from core.workflow.variable_loader import DUMMY_VARIABLE_LOADER, VariableLoader
from extensions.ext_database import db
from factories import file_factory
from libs.flask_utils import preserve_flask_contexts
from models.account import Account
from models import Account, App, EndUser, Workflow, WorkflowNodeExecutionTriggeredFrom
from models.enums import WorkflowRunTriggeredFrom
from models.model import App, EndUser
from models.workflow import Workflow, WorkflowNodeExecutionTriggeredFrom
from services.workflow_draft_variable_service import DraftVarLoader, WorkflowDraftVariableService
if TYPE_CHECKING:
@@ -70,11 +66,9 @@ class WorkflowAppGenerator(BaseAppGenerator):
invoke_from: InvokeFrom,
streaming: Literal[True],
call_depth: int,
workflow_run_id: str | uuid.UUID | None = None,
triggered_from: WorkflowRunTriggeredFrom | None = None,
root_node_id: str | None = None,
graph_engine_layers: Sequence[GraphEngineLayer] = (),
pause_state_config: PauseStateLayerConfig | None = None,
) -> Generator[Mapping[str, Any] | str, None, None]: ...
@overload
@@ -88,11 +82,9 @@ class WorkflowAppGenerator(BaseAppGenerator):
invoke_from: InvokeFrom,
streaming: Literal[False],
call_depth: int,
workflow_run_id: str | uuid.UUID | None = None,
triggered_from: WorkflowRunTriggeredFrom | None = None,
root_node_id: str | None = None,
graph_engine_layers: Sequence[GraphEngineLayer] = (),
pause_state_config: PauseStateLayerConfig | None = None,
) -> Mapping[str, Any]: ...
@overload
@@ -106,11 +98,9 @@ class WorkflowAppGenerator(BaseAppGenerator):
invoke_from: InvokeFrom,
streaming: bool,
call_depth: int,
workflow_run_id: str | uuid.UUID | None = None,
triggered_from: WorkflowRunTriggeredFrom | None = None,
root_node_id: str | None = None,
graph_engine_layers: Sequence[GraphEngineLayer] = (),
pause_state_config: PauseStateLayerConfig | None = None,
) -> Union[Mapping[str, Any], Generator[Mapping[str, Any] | str, None, None]]: ...
def generate(
@@ -123,11 +113,9 @@ class WorkflowAppGenerator(BaseAppGenerator):
invoke_from: InvokeFrom,
streaming: bool = True,
call_depth: int = 0,
workflow_run_id: str | uuid.UUID | None = None,
triggered_from: WorkflowRunTriggeredFrom | None = None,
root_node_id: str | None = None,
graph_engine_layers: Sequence[GraphEngineLayer] = (),
pause_state_config: PauseStateLayerConfig | None = None,
) -> Union[Mapping[str, Any], Generator[Mapping[str, Any] | str, None, None]]:
files: Sequence[Mapping[str, Any]] = args.get("files") or []
@@ -162,7 +150,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
extras = {
**extract_external_trace_id_from_args(args),
}
workflow_run_id = str(workflow_run_id or uuid.uuid4())
workflow_run_id = str(uuid.uuid4())
# FIXME (Yeuoly): we need to remove the SKIP_PREPARE_USER_INPUTS_KEY from the args
# trigger shouldn't prepare user inputs
if self._should_prepare_user_inputs(args):
@@ -228,40 +216,13 @@ class WorkflowAppGenerator(BaseAppGenerator):
streaming=streaming,
root_node_id=root_node_id,
graph_engine_layers=graph_engine_layers,
pause_state_config=pause_state_config,
)
def resume(
self,
*,
app_model: App,
workflow: Workflow,
user: Union[Account, EndUser],
application_generate_entity: WorkflowAppGenerateEntity,
graph_runtime_state: GraphRuntimeState,
workflow_execution_repository: WorkflowExecutionRepository,
workflow_node_execution_repository: WorkflowNodeExecutionRepository,
graph_engine_layers: Sequence[GraphEngineLayer] = (),
pause_state_config: PauseStateLayerConfig | None = None,
variable_loader: VariableLoader = DUMMY_VARIABLE_LOADER,
) -> Union[Mapping[str, Any], Generator[str | Mapping[str, Any], None, None]]:
def resume(self, *, workflow_run_id: str) -> None:
"""
Resume a paused workflow execution using the persisted runtime state.
@TBD
"""
return self._generate(
app_model=app_model,
workflow=workflow,
user=user,
application_generate_entity=application_generate_entity,
invoke_from=application_generate_entity.invoke_from,
workflow_execution_repository=workflow_execution_repository,
workflow_node_execution_repository=workflow_node_execution_repository,
streaming=application_generate_entity.stream,
variable_loader=variable_loader,
graph_engine_layers=graph_engine_layers,
graph_runtime_state=graph_runtime_state,
pause_state_config=pause_state_config,
)
pass
def _generate(
self,
@@ -277,8 +238,6 @@ class WorkflowAppGenerator(BaseAppGenerator):
variable_loader: VariableLoader = DUMMY_VARIABLE_LOADER,
root_node_id: str | None = None,
graph_engine_layers: Sequence[GraphEngineLayer] = (),
graph_runtime_state: GraphRuntimeState | None = None,
pause_state_config: PauseStateLayerConfig | None = None,
) -> Union[Mapping[str, Any], Generator[str | Mapping[str, Any], None, None]]:
"""
Generate App response.
@@ -292,8 +251,6 @@ class WorkflowAppGenerator(BaseAppGenerator):
:param workflow_node_execution_repository: repository for workflow node execution
:param streaming: is stream
"""
graph_layers: list[GraphEngineLayer] = list(graph_engine_layers)
# init queue manager
queue_manager = WorkflowAppQueueManager(
task_id=application_generate_entity.task_id,
@@ -302,15 +259,6 @@ class WorkflowAppGenerator(BaseAppGenerator):
app_mode=app_model.mode,
)
if pause_state_config is not None:
graph_layers.append(
PauseStatePersistenceLayer(
session_factory=pause_state_config.session_factory,
generate_entity=application_generate_entity,
state_owner_user_id=pause_state_config.state_owner_user_id,
)
)
# new thread with request context and contextvars
context = contextvars.copy_context()
@@ -328,8 +276,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
"root_node_id": root_node_id,
"workflow_execution_repository": workflow_execution_repository,
"workflow_node_execution_repository": workflow_node_execution_repository,
"graph_engine_layers": tuple(graph_layers),
"graph_runtime_state": graph_runtime_state,
"graph_engine_layers": graph_engine_layers,
},
)
@@ -431,7 +378,6 @@ class WorkflowAppGenerator(BaseAppGenerator):
workflow_node_execution_repository=workflow_node_execution_repository,
streaming=streaming,
variable_loader=var_loader,
pause_state_config=None,
)
def single_loop_generate(
@@ -513,7 +459,6 @@ class WorkflowAppGenerator(BaseAppGenerator):
workflow_node_execution_repository=workflow_node_execution_repository,
streaming=streaming,
variable_loader=var_loader,
pause_state_config=None,
)
def _generate_worker(
@@ -527,7 +472,6 @@ class WorkflowAppGenerator(BaseAppGenerator):
workflow_node_execution_repository: WorkflowNodeExecutionRepository,
root_node_id: str | None = None,
graph_engine_layers: Sequence[GraphEngineLayer] = (),
graph_runtime_state: GraphRuntimeState | None = None,
) -> None:
"""
Generate worker in a new thread.
@@ -573,7 +517,6 @@ class WorkflowAppGenerator(BaseAppGenerator):
workflow_node_execution_repository=workflow_node_execution_repository,
root_node_id=root_node_id,
graph_engine_layers=graph_engine_layers,
graph_runtime_state=graph_runtime_state,
)
try:

View File

@@ -42,7 +42,6 @@ class WorkflowAppRunner(WorkflowBasedAppRunner):
workflow_execution_repository: WorkflowExecutionRepository,
workflow_node_execution_repository: WorkflowNodeExecutionRepository,
graph_engine_layers: Sequence[GraphEngineLayer] = (),
graph_runtime_state: GraphRuntimeState | None = None,
):
super().__init__(
queue_manager=queue_manager,
@@ -56,7 +55,6 @@ class WorkflowAppRunner(WorkflowBasedAppRunner):
self._root_node_id = root_node_id
self._workflow_execution_repository = workflow_execution_repository
self._workflow_node_execution_repository = workflow_node_execution_repository
self._resume_graph_runtime_state = graph_runtime_state
@trace_span(WorkflowAppRunnerHandler)
def run(self):
@@ -65,28 +63,23 @@ class WorkflowAppRunner(WorkflowBasedAppRunner):
"""
app_config = self.application_generate_entity.app_config
app_config = cast(WorkflowAppConfig, app_config)
system_inputs = SystemVariable(
files=self.application_generate_entity.files,
user_id=self._sys_user_id,
app_id=app_config.app_id,
timestamp=int(naive_utc_now().timestamp()),
workflow_id=app_config.workflow_id,
workflow_execution_id=self.application_generate_entity.workflow_execution_id,
)
invoke_from = self.application_generate_entity.invoke_from
# if only single iteration or single loop run is requested
if self.application_generate_entity.single_iteration_run or self.application_generate_entity.single_loop_run:
invoke_from = InvokeFrom.DEBUGGER
user_from = self._resolve_user_from(invoke_from)
resume_state = self._resume_graph_runtime_state
if resume_state is not None:
graph_runtime_state = resume_state
variable_pool = graph_runtime_state.variable_pool
graph = self._init_graph(
graph_config=self._workflow.graph_dict,
graph_runtime_state=graph_runtime_state,
workflow_id=self._workflow.id,
tenant_id=self._workflow.tenant_id,
user_id=self.application_generate_entity.user_id,
user_from=user_from,
invoke_from=invoke_from,
root_node_id=self._root_node_id,
)
elif self.application_generate_entity.single_iteration_run or self.application_generate_entity.single_loop_run:
if self.application_generate_entity.single_iteration_run or self.application_generate_entity.single_loop_run:
graph, variable_pool, graph_runtime_state = self._prepare_single_node_execution(
workflow=self._workflow,
single_iteration_run=self.application_generate_entity.single_iteration_run,
@@ -96,14 +89,7 @@ class WorkflowAppRunner(WorkflowBasedAppRunner):
inputs = self.application_generate_entity.inputs
# Create a variable pool.
system_inputs = SystemVariable(
files=self.application_generate_entity.files,
user_id=self._sys_user_id,
app_id=app_config.app_id,
timestamp=int(naive_utc_now().timestamp()),
workflow_id=app_config.workflow_id,
workflow_execution_id=self.application_generate_entity.workflow_execution_id,
)
variable_pool = VariablePool(
system_variables=system_inputs,
user_inputs=inputs,
@@ -112,6 +98,8 @@ class WorkflowAppRunner(WorkflowBasedAppRunner):
)
graph_runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter())
# init graph
graph = self._init_graph(
graph_config=self._workflow.graph_dict,
graph_runtime_state=graph_runtime_state,

View File

@@ -1,7 +0,0 @@
from libs.exception import BaseHTTPException
class WorkflowPausedInBlockingModeError(BaseHTTPException):
error_code = "workflow_paused_in_blocking_mode"
description = "Workflow execution paused for human input; blocking response mode is not supported."
code = 400

View File

@@ -16,8 +16,6 @@ from core.app.entities.queue_entities import (
MessageQueueMessage,
QueueAgentLogEvent,
QueueErrorEvent,
QueueHumanInputFormFilledEvent,
QueueHumanInputFormTimeoutEvent,
QueueIterationCompletedEvent,
QueueIterationNextEvent,
QueueIterationStartEvent,
@@ -34,7 +32,6 @@ from core.app.entities.queue_entities import (
QueueTextChunkEvent,
QueueWorkflowFailedEvent,
QueueWorkflowPartialSuccessEvent,
QueueWorkflowPausedEvent,
QueueWorkflowStartedEvent,
QueueWorkflowSucceededEvent,
WorkflowQueueMessage,
@@ -49,13 +46,11 @@ from core.app.entities.task_entities import (
WorkflowAppBlockingResponse,
WorkflowAppStreamResponse,
WorkflowFinishStreamResponse,
WorkflowPauseStreamResponse,
WorkflowStartStreamResponse,
)
from core.app.task_pipeline.based_generate_task_pipeline import BasedGenerateTaskPipeline
from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk
from core.ops.ops_trace_manager import TraceQueueManager
from core.workflow.entities.workflow_start_reason import WorkflowStartReason
from core.workflow.enums import WorkflowExecutionStatus
from core.workflow.repositories.draft_variable_repository import DraftVariableSaverFactory
from core.workflow.runtime import GraphRuntimeState
@@ -137,25 +132,6 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport):
for stream_response in generator:
if isinstance(stream_response, ErrorStreamResponse):
raise stream_response.err
elif isinstance(stream_response, WorkflowPauseStreamResponse):
response = WorkflowAppBlockingResponse(
task_id=self._application_generate_entity.task_id,
workflow_run_id=stream_response.data.workflow_run_id,
data=WorkflowAppBlockingResponse.Data(
id=stream_response.data.workflow_run_id,
workflow_id=self._workflow.id,
status=stream_response.data.status,
outputs=stream_response.data.outputs or {},
error=None,
elapsed_time=stream_response.data.elapsed_time,
total_tokens=stream_response.data.total_tokens,
total_steps=stream_response.data.total_steps,
created_at=stream_response.data.created_at,
finished_at=None,
),
)
return response
elif isinstance(stream_response, WorkflowFinishStreamResponse):
response = WorkflowAppBlockingResponse(
task_id=self._application_generate_entity.task_id,
@@ -170,7 +146,7 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport):
total_tokens=stream_response.data.total_tokens,
total_steps=stream_response.data.total_steps,
created_at=int(stream_response.data.created_at),
finished_at=int(stream_response.data.finished_at) if stream_response.data.finished_at else None,
finished_at=int(stream_response.data.finished_at),
),
)
@@ -283,15 +259,13 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport):
run_id = self._extract_workflow_run_id(runtime_state)
self._workflow_execution_id = run_id
if event.reason == WorkflowStartReason.INITIAL:
with self._database_session() as session:
self._save_workflow_app_log(session=session, workflow_run_id=self._workflow_execution_id)
with self._database_session() as session:
self._save_workflow_app_log(session=session, workflow_run_id=self._workflow_execution_id)
start_resp = self._workflow_response_converter.workflow_start_to_stream_response(
task_id=self._application_generate_entity.task_id,
workflow_run_id=run_id,
workflow_id=self._workflow.id,
reason=event.reason,
)
yield start_resp
@@ -466,21 +440,6 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport):
)
yield workflow_finish_resp
def _handle_workflow_paused_event(
self,
event: QueueWorkflowPausedEvent,
**kwargs,
) -> Generator[StreamResponse, None, None]:
"""Handle workflow paused events."""
self._ensure_workflow_initialized()
validated_state = self._ensure_graph_runtime_initialized()
responses = self._workflow_response_converter.workflow_pause_to_stream_response(
event=event,
task_id=self._application_generate_entity.task_id,
graph_runtime_state=validated_state,
)
yield from responses
def _handle_workflow_failed_and_stop_events(
self,
event: Union[QueueWorkflowFailedEvent, QueueStopEvent],
@@ -536,22 +495,6 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport):
task_id=self._application_generate_entity.task_id, event=event
)
def _handle_human_input_form_filled_event(
self, event: QueueHumanInputFormFilledEvent, **kwargs
) -> Generator[StreamResponse, None, None]:
"""Handle human input form filled events."""
yield self._workflow_response_converter.human_input_form_filled_to_stream_response(
event=event, task_id=self._application_generate_entity.task_id
)
def _handle_human_input_form_timeout_event(
self, event: QueueHumanInputFormTimeoutEvent, **kwargs
) -> Generator[StreamResponse, None, None]:
"""Handle human input form timeout events."""
yield self._workflow_response_converter.human_input_form_timeout_to_stream_response(
event=event, task_id=self._application_generate_entity.task_id
)
def _get_event_handlers(self) -> dict[type, Callable]:
"""Get mapping of event types to their handlers using fluent pattern."""
return {
@@ -563,7 +506,6 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport):
QueueWorkflowStartedEvent: self._handle_workflow_started_event,
QueueWorkflowSucceededEvent: self._handle_workflow_succeeded_event,
QueueWorkflowPartialSuccessEvent: self._handle_workflow_partial_success_event,
QueueWorkflowPausedEvent: self._handle_workflow_paused_event,
# Node events
QueueNodeRetryEvent: self._handle_node_retry_event,
QueueNodeStartedEvent: self._handle_node_started_event,
@@ -578,8 +520,6 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport):
QueueLoopCompletedEvent: self._handle_loop_completed_event,
# Agent events
QueueAgentLogEvent: self._handle_agent_log_event,
QueueHumanInputFormFilledEvent: self._handle_human_input_form_filled_event,
QueueHumanInputFormTimeoutEvent: self._handle_human_input_form_timeout_event,
}
def _dispatch_event(
@@ -662,9 +602,6 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport):
case QueueWorkflowFailedEvent():
yield from self._handle_workflow_failed_and_stop_events(event)
break
case QueueWorkflowPausedEvent():
yield from self._handle_workflow_paused_event(event)
break
case QueueStopEvent():
yield from self._handle_workflow_failed_and_stop_events(event)

View File

@@ -1,4 +1,3 @@
import logging
import time
from collections.abc import Mapping, Sequence
from typing import Any, cast
@@ -8,8 +7,6 @@ from core.app.entities.app_invoke_entities import InvokeFrom
from core.app.entities.queue_entities import (
AppQueueEvent,
QueueAgentLogEvent,
QueueHumanInputFormFilledEvent,
QueueHumanInputFormTimeoutEvent,
QueueIterationCompletedEvent,
QueueIterationNextEvent,
QueueIterationStartEvent,
@@ -25,27 +22,22 @@ from core.app.entities.queue_entities import (
QueueTextChunkEvent,
QueueWorkflowFailedEvent,
QueueWorkflowPartialSuccessEvent,
QueueWorkflowPausedEvent,
QueueWorkflowStartedEvent,
QueueWorkflowSucceededEvent,
)
from core.app.workflow.node_factory import DifyNodeFactory
from core.workflow.entities import GraphInitParams
from core.workflow.entities.pause_reason import HumanInputRequired
from core.workflow.graph import Graph
from core.workflow.graph_engine.layers.base import GraphEngineLayer
from core.workflow.graph_events import (
GraphEngineEvent,
GraphRunFailedEvent,
GraphRunPartialSucceededEvent,
GraphRunPausedEvent,
GraphRunStartedEvent,
GraphRunSucceededEvent,
NodeRunAgentLogEvent,
NodeRunExceptionEvent,
NodeRunFailedEvent,
NodeRunHumanInputFormFilledEvent,
NodeRunHumanInputFormTimeoutEvent,
NodeRunIterationFailedEvent,
NodeRunIterationNextEvent,
NodeRunIterationStartedEvent,
@@ -69,9 +61,6 @@ from core.workflow.variable_loader import DUMMY_VARIABLE_LOADER, VariableLoader,
from core.workflow.workflow_entry import WorkflowEntry
from models.enums import UserFrom
from models.workflow import Workflow
from tasks.mail_human_input_delivery_task import dispatch_human_input_email_task
logger = logging.getLogger(__name__)
class WorkflowBasedAppRunner:
@@ -338,7 +327,7 @@ class WorkflowBasedAppRunner:
:param event: event
"""
if isinstance(event, GraphRunStartedEvent):
self._publish_event(QueueWorkflowStartedEvent(reason=event.reason))
self._publish_event(QueueWorkflowStartedEvent())
elif isinstance(event, GraphRunSucceededEvent):
self._publish_event(QueueWorkflowSucceededEvent(outputs=event.outputs))
elif isinstance(event, GraphRunPartialSucceededEvent):
@@ -349,38 +338,6 @@ class WorkflowBasedAppRunner:
self._publish_event(QueueWorkflowFailedEvent(error=event.error, exceptions_count=event.exceptions_count))
elif isinstance(event, GraphRunAbortedEvent):
self._publish_event(QueueWorkflowFailedEvent(error=event.reason or "Unknown error", exceptions_count=0))
elif isinstance(event, GraphRunPausedEvent):
runtime_state = workflow_entry.graph_engine.graph_runtime_state
paused_nodes = runtime_state.get_paused_nodes()
self._enqueue_human_input_notifications(event.reasons)
self._publish_event(
QueueWorkflowPausedEvent(
reasons=event.reasons,
outputs=event.outputs,
paused_nodes=paused_nodes,
)
)
elif isinstance(event, NodeRunHumanInputFormFilledEvent):
self._publish_event(
QueueHumanInputFormFilledEvent(
node_execution_id=event.id,
node_id=event.node_id,
node_type=event.node_type,
node_title=event.node_title,
rendered_content=event.rendered_content,
action_id=event.action_id,
action_text=event.action_text,
)
)
elif isinstance(event, NodeRunHumanInputFormTimeoutEvent):
self._publish_event(
QueueHumanInputFormTimeoutEvent(
node_id=event.node_id,
node_type=event.node_type,
node_title=event.node_title,
expiration_time=event.expiration_time,
)
)
elif isinstance(event, NodeRunRetryEvent):
node_run_result = event.node_run_result
inputs = node_run_result.inputs
@@ -587,19 +544,5 @@ class WorkflowBasedAppRunner:
)
)
def _enqueue_human_input_notifications(self, reasons: Sequence[object]) -> None:
for reason in reasons:
if not isinstance(reason, HumanInputRequired):
continue
if not reason.form_id:
continue
try:
dispatch_human_input_email_task.apply_async(
kwargs={"form_id": reason.form_id, "node_title": reason.node_title},
queue="mail",
)
except Exception: # pragma: no cover - defensive logging
logger.exception("Failed to enqueue human input email task for form %s", reason.form_id)
def _publish_event(self, event: AppQueueEvent):
self._queue_manager.publish(event, PublishFrom.APPLICATION_MANAGER)

View File

@@ -132,7 +132,7 @@ class AppGenerateEntity(BaseModel):
extras: dict[str, Any] = Field(default_factory=dict)
# tracing instance
trace_manager: Optional["TraceQueueManager"] = Field(default=None, exclude=True, repr=False)
trace_manager: Optional["TraceQueueManager"] = None
class EasyUIBasedAppGenerateEntity(AppGenerateEntity):
@@ -156,7 +156,6 @@ class ConversationAppGenerateEntity(AppGenerateEntity):
"""
conversation_id: str | None = None
is_new_conversation: bool = False
parent_message_id: str | None = Field(
default=None,
description=(

View File

@@ -8,8 +8,6 @@ from pydantic import BaseModel, ConfigDict, Field
from core.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk
from core.rag.entities.citation_metadata import RetrievalSourceMetadata
from core.workflow.entities import AgentNodeStrategyInit
from core.workflow.entities.pause_reason import PauseReason
from core.workflow.entities.workflow_start_reason import WorkflowStartReason
from core.workflow.enums import WorkflowNodeExecutionMetadataKey
from core.workflow.nodes import NodeType
@@ -48,9 +46,6 @@ class QueueEvent(StrEnum):
PING = "ping"
STOP = "stop"
RETRY = "retry"
PAUSE = "pause"
HUMAN_INPUT_FORM_FILLED = "human_input_form_filled"
HUMAN_INPUT_FORM_TIMEOUT = "human_input_form_timeout"
class AppQueueEvent(BaseModel):
@@ -266,8 +261,6 @@ class QueueWorkflowStartedEvent(AppQueueEvent):
"""QueueWorkflowStartedEvent entity."""
event: QueueEvent = QueueEvent.WORKFLOW_STARTED
# Always present; mirrors GraphRunStartedEvent.reason for downstream consumers.
reason: WorkflowStartReason = WorkflowStartReason.INITIAL
class QueueWorkflowSucceededEvent(AppQueueEvent):
@@ -491,35 +484,6 @@ class QueueStopEvent(AppQueueEvent):
return reason_mapping.get(self.stopped_by, "Stopped by unknown reason.")
class QueueHumanInputFormFilledEvent(AppQueueEvent):
"""
QueueHumanInputFormFilledEvent entity
"""
event: QueueEvent = QueueEvent.HUMAN_INPUT_FORM_FILLED
node_execution_id: str
node_id: str
node_type: NodeType
node_title: str
rendered_content: str
action_id: str
action_text: str
class QueueHumanInputFormTimeoutEvent(AppQueueEvent):
"""
QueueHumanInputFormTimeoutEvent entity
"""
event: QueueEvent = QueueEvent.HUMAN_INPUT_FORM_TIMEOUT
node_id: str
node_type: NodeType
node_title: str
expiration_time: datetime
class QueueMessage(BaseModel):
"""
QueueMessage abstract entity
@@ -545,14 +509,3 @@ class WorkflowQueueMessage(QueueMessage):
"""
pass
class QueueWorkflowPausedEvent(AppQueueEvent):
"""
QueueWorkflowPausedEvent entity
"""
event: QueueEvent = QueueEvent.PAUSE
reasons: Sequence[PauseReason] = Field(default_factory=list)
outputs: Mapping[str, object] = Field(default_factory=dict)
paused_nodes: Sequence[str] = Field(default_factory=list)

View File

@@ -7,9 +7,7 @@ from pydantic import BaseModel, ConfigDict, Field
from core.model_runtime.entities.llm_entities import LLMResult, LLMUsage
from core.rag.entities.citation_metadata import RetrievalSourceMetadata
from core.workflow.entities import AgentNodeStrategyInit
from core.workflow.entities.workflow_start_reason import WorkflowStartReason
from core.workflow.enums import WorkflowExecutionStatus, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from core.workflow.nodes.human_input.entities import FormInput, UserAction
class AnnotationReplyAccount(BaseModel):
@@ -71,7 +69,6 @@ class StreamEvent(StrEnum):
AGENT_THOUGHT = "agent_thought"
AGENT_MESSAGE = "agent_message"
WORKFLOW_STARTED = "workflow_started"
WORKFLOW_PAUSED = "workflow_paused"
WORKFLOW_FINISHED = "workflow_finished"
NODE_STARTED = "node_started"
NODE_FINISHED = "node_finished"
@@ -85,9 +82,6 @@ class StreamEvent(StrEnum):
TEXT_CHUNK = "text_chunk"
TEXT_REPLACE = "text_replace"
AGENT_LOG = "agent_log"
HUMAN_INPUT_REQUIRED = "human_input_required"
HUMAN_INPUT_FORM_FILLED = "human_input_form_filled"
HUMAN_INPUT_FORM_TIMEOUT = "human_input_form_timeout"
class StreamResponse(BaseModel):
@@ -211,8 +205,6 @@ class WorkflowStartStreamResponse(StreamResponse):
workflow_id: str
inputs: Mapping[str, Any]
created_at: int
# Always present; mirrors QueueWorkflowStartedEvent.reason for SSE clients.
reason: WorkflowStartReason = WorkflowStartReason.INITIAL
event: StreamEvent = StreamEvent.WORKFLOW_STARTED
workflow_run_id: str
@@ -239,7 +231,7 @@ class WorkflowFinishStreamResponse(StreamResponse):
total_steps: int
created_by: Mapping[str, object] = Field(default_factory=dict)
created_at: int
finished_at: int | None
finished_at: int
exceptions_count: int | None = 0
files: Sequence[Mapping[str, Any]] | None = []
@@ -248,85 +240,6 @@ class WorkflowFinishStreamResponse(StreamResponse):
data: Data
class WorkflowPauseStreamResponse(StreamResponse):
"""
WorkflowPauseStreamResponse entity
"""
class Data(BaseModel):
"""
Data entity
"""
workflow_run_id: str
paused_nodes: Sequence[str] = Field(default_factory=list)
outputs: Mapping[str, Any] = Field(default_factory=dict)
reasons: Sequence[Mapping[str, Any]] = Field(default_factory=list)
status: WorkflowExecutionStatus
created_at: int
elapsed_time: float
total_tokens: int
total_steps: int
event: StreamEvent = StreamEvent.WORKFLOW_PAUSED
workflow_run_id: str
data: Data
class HumanInputRequiredResponse(StreamResponse):
class Data(BaseModel):
"""
Data entity
"""
form_id: str
node_id: str
node_title: str
form_content: str
inputs: Sequence[FormInput] = Field(default_factory=list)
actions: Sequence[UserAction] = Field(default_factory=list)
display_in_ui: bool = False
form_token: str | None = None
resolved_default_values: Mapping[str, Any] = Field(default_factory=dict)
expiration_time: int = Field(..., description="Unix timestamp in seconds")
event: StreamEvent = StreamEvent.HUMAN_INPUT_REQUIRED
workflow_run_id: str
data: Data
class HumanInputFormFilledResponse(StreamResponse):
class Data(BaseModel):
"""
Data entity
"""
node_id: str
node_title: str
rendered_content: str
action_id: str
action_text: str
event: StreamEvent = StreamEvent.HUMAN_INPUT_FORM_FILLED
workflow_run_id: str
data: Data
class HumanInputFormTimeoutResponse(StreamResponse):
class Data(BaseModel):
"""
Data entity
"""
node_id: str
node_title: str
expiration_time: int
event: StreamEvent = StreamEvent.HUMAN_INPUT_FORM_TIMEOUT
workflow_run_id: str
data: Data
class NodeStartStreamResponse(StreamResponse):
"""
NodeStartStreamResponse entity
@@ -813,7 +726,7 @@ class WorkflowAppBlockingResponse(AppBlockingResponse):
total_tokens: int
total_steps: int
created_at: int
finished_at: int | None
finished_at: int
workflow_run_id: str
data: Data

View File

@@ -1,4 +1,3 @@
import contextlib
import logging
import time
import uuid
@@ -104,14 +103,6 @@ class RateLimit:
)
@contextlib.contextmanager
def rate_limit_context(rate_limit: RateLimit, request_id: str | None):
request_id = rate_limit.enter(request_id)
yield
if request_id is not None:
rate_limit.exit(request_id)
class RateLimitGenerator:
def __init__(self, rate_limit: RateLimit, generator: Generator[str, None, None], request_id: str):
self.rate_limit = rate_limit

View File

@@ -1,4 +1,3 @@
from dataclasses import dataclass
from typing import Annotated, Literal, Self, TypeAlias
from pydantic import BaseModel, Field
@@ -53,14 +52,6 @@ class WorkflowResumptionContext(BaseModel):
return self.generate_entity.entity
@dataclass(frozen=True)
class PauseStateLayerConfig:
"""Configuration container for instantiating pause persistence layers."""
session_factory: Engine | sessionmaker[Session]
state_owner_user_id: str
class PauseStatePersistenceLayer(GraphEngineLayer):
def __init__(
self,

View File

@@ -82,11 +82,10 @@ class MessageCycleManager:
if isinstance(self._application_generate_entity, CompletionAppGenerateEntity):
return None
is_first_message = self._application_generate_entity.is_new_conversation
is_first_message = self._application_generate_entity.conversation_id is None
extras = self._application_generate_entity.extras
auto_generate_conversation_name = extras.get("auto_generate_conversation_name", True)
thread: Thread | None = None
if auto_generate_conversation_name and is_first_message:
# start generate thread
# time.sleep not block other logic
@@ -102,10 +101,9 @@ class MessageCycleManager:
thread.daemon = True
thread.start()
if is_first_message:
self._application_generate_entity.is_new_conversation = False
return thread
return thread
return None
def _generate_conversation_name_worker(self, flask_app: Flask, conversation_id: str, query: str):
with flask_app.app_context():

View File

@@ -8,7 +8,6 @@ from core.file.file_manager import file_manager
from core.helper.code_executor.code_executor import CodeExecutor
from core.helper.code_executor.code_node_provider import CodeNodeProvider
from core.helper.ssrf_proxy import ssrf_proxy
from core.rag.retrieval.dataset_retrieval import DatasetRetrieval
from core.tools.tool_file_manager import ToolFileManager
from core.workflow.entities.graph_config import NodeConfigDict
from core.workflow.enums import NodeType
@@ -17,7 +16,6 @@ from core.workflow.nodes.base.node import Node
from core.workflow.nodes.code.code_node import CodeNode
from core.workflow.nodes.code.limits import CodeNodeLimits
from core.workflow.nodes.http_request.node import HttpRequestNode
from core.workflow.nodes.knowledge_retrieval.knowledge_retrieval_node import KnowledgeRetrievalNode
from core.workflow.nodes.node_mapping import LATEST_VERSION, NODE_TYPE_CLASSES_MAPPING
from core.workflow.nodes.protocols import FileManagerProtocol, HttpClientProtocol
from core.workflow.nodes.template_transform.template_renderer import (
@@ -49,7 +47,6 @@ class DifyNodeFactory(NodeFactory):
code_providers: Sequence[type[CodeNodeProvider]] | None = None,
code_limits: CodeNodeLimits | None = None,
template_renderer: Jinja2TemplateRenderer | None = None,
template_transform_max_output_length: int | None = None,
http_request_http_client: HttpClientProtocol | None = None,
http_request_tool_file_manager_factory: Callable[[], ToolFileManager] = ToolFileManager,
http_request_file_manager: FileManagerProtocol | None = None,
@@ -71,13 +68,9 @@ class DifyNodeFactory(NodeFactory):
max_object_array_length=dify_config.CODE_MAX_OBJECT_ARRAY_LENGTH,
)
self._template_renderer = template_renderer or CodeExecutorJinja2TemplateRenderer()
self._template_transform_max_output_length = (
template_transform_max_output_length or dify_config.TEMPLATE_TRANSFORM_MAX_LENGTH
)
self._http_request_http_client = http_request_http_client or ssrf_proxy
self._http_request_tool_file_manager_factory = http_request_tool_file_manager_factory
self._http_request_file_manager = http_request_file_manager or file_manager
self._rag_retrieval = DatasetRetrieval()
@override
def create_node(self, node_config: NodeConfigDict) -> Node:
@@ -129,7 +122,6 @@ class DifyNodeFactory(NodeFactory):
graph_init_params=self.graph_init_params,
graph_runtime_state=self.graph_runtime_state,
template_renderer=self._template_renderer,
max_output_length=self._template_transform_max_output_length,
)
if node_type == NodeType.HTTP_REQUEST:
@@ -143,15 +135,6 @@ class DifyNodeFactory(NodeFactory):
file_manager=self._http_request_file_manager,
)
if node_type == NodeType.KNOWLEDGE_RETRIEVAL:
return KnowledgeRetrievalNode(
id=node_id,
config=node_config,
graph_init_params=self.graph_init_params,
graph_runtime_state=self.graph_runtime_state,
rag_retrieval=self._rag_retrieval,
)
return node_class(
id=node_id,
config=node_config,

View File

@@ -1,54 +0,0 @@
from __future__ import annotations
from collections.abc import Mapping, Sequence
from typing import Any, TypeAlias
from pydantic import BaseModel, ConfigDict, Field
from core.workflow.nodes.human_input.entities import FormInput, UserAction
from models.execution_extra_content import ExecutionContentType
class HumanInputFormDefinition(BaseModel):
model_config = ConfigDict(frozen=True)
form_id: str
node_id: str
node_title: str
form_content: str
inputs: Sequence[FormInput] = Field(default_factory=list)
actions: Sequence[UserAction] = Field(default_factory=list)
display_in_ui: bool = False
form_token: str | None = None
resolved_default_values: Mapping[str, Any] = Field(default_factory=dict)
expiration_time: int
class HumanInputFormSubmissionData(BaseModel):
model_config = ConfigDict(frozen=True)
node_id: str
node_title: str
rendered_content: str
action_id: str
action_text: str
class HumanInputContent(BaseModel):
model_config = ConfigDict(frozen=True)
workflow_run_id: str
submitted: bool
form_definition: HumanInputFormDefinition | None = None
form_submission_data: HumanInputFormSubmissionData | None = None
type: ExecutionContentType = Field(default=ExecutionContentType.HUMAN_INPUT)
ExecutionExtraContentDomainModel: TypeAlias = HumanInputContent
__all__ = [
"ExecutionExtraContentDomainModel",
"HumanInputContent",
"HumanInputFormDefinition",
"HumanInputFormSubmissionData",
]

View File

@@ -28,8 +28,8 @@ from core.model_runtime.entities.provider_entities import (
)
from core.model_runtime.model_providers.__base.ai_model import AIModel
from core.model_runtime.model_providers.model_provider_factory import ModelProviderFactory
from extensions.ext_database import db
from libs.datetime_utils import naive_utc_now
from models.engine import db
from models.provider import (
LoadBalancingModelConfig,
Provider,

View File

@@ -6,8 +6,7 @@ from yarl import URL
from configs import dify_config
from core.helper.download import download_with_size_limit
from core.plugin.entities.marketplace import MarketplacePluginDeclaration, MarketplacePluginSnapshot
from extensions.ext_redis import redis_client
from core.plugin.entities.marketplace import MarketplacePluginDeclaration
marketplace_api_url = URL(str(dify_config.MARKETPLACE_API_URL))
logger = logging.getLogger(__name__)
@@ -44,37 +43,28 @@ def batch_fetch_plugin_by_ids(plugin_ids: list[str]) -> list[dict]:
return data.get("data", {}).get("plugins", [])
def batch_fetch_plugin_manifests_ignore_deserialization_error(
plugin_ids: list[str],
) -> Sequence[MarketplacePluginDeclaration]:
if len(plugin_ids) == 0:
return []
url = str(marketplace_api_url / "api/v1/plugins/batch")
response = httpx.post(url, json={"plugin_ids": plugin_ids}, headers={"X-Dify-Version": dify_config.project.version})
response.raise_for_status()
result: list[MarketplacePluginDeclaration] = []
for plugin in response.json()["data"]["plugins"]:
try:
result.append(MarketplacePluginDeclaration.model_validate(plugin))
except Exception:
logger.exception(
"Failed to deserialize marketplace plugin manifest for %s", plugin.get("plugin_id", "unknown")
)
return result
def record_install_plugin_event(plugin_unique_identifier: str):
url = str(marketplace_api_url / "api/v1/stats/plugins/install_count")
response = httpx.post(url, json={"unique_identifier": plugin_unique_identifier})
response.raise_for_status()
def fetch_global_plugin_manifest(cache_key_prefix: str, cache_ttl: int) -> None:
"""
Fetch all plugin manifests from marketplace and cache them in Redis.
This should be called once per check cycle to populate the instance-level cache.
Args:
cache_key_prefix: Redis key prefix for caching plugin manifests
cache_ttl: Cache TTL in seconds
Raises:
httpx.HTTPError: If the HTTP request fails
Exception: If any other error occurs during fetching or caching
"""
url = str(marketplace_api_url / "api/v1/dist/plugins/manifest.json")
response = httpx.get(url, headers={"X-Dify-Version": dify_config.project.version}, timeout=30)
response.raise_for_status()
raw_json = response.json()
plugins_data = raw_json.get("plugins", [])
# Parse and cache all plugin snapshots
for plugin_data in plugins_data:
plugin_snapshot = MarketplacePluginSnapshot.model_validate(plugin_data)
redis_client.setex(
name=f"{cache_key_prefix}{plugin_snapshot.plugin_id}",
time=cache_ttl,
value=plugin_snapshot.model_dump_json(),
)

View File

@@ -15,7 +15,10 @@ from sqlalchemy import select
from sqlalchemy.orm import Session, sessionmaker
from core.helper.encrypter import batch_decrypt_token, encrypt_token, obfuscated_token
from core.ops.entities.config_entity import OPS_FILE_PATH, TracingProviderEnum
from core.ops.entities.config_entity import (
OPS_FILE_PATH,
TracingProviderEnum,
)
from core.ops.entities.trace_entity import (
DatasetRetrievalTraceInfo,
GenerateNameTraceInfo,
@@ -28,8 +31,8 @@ from core.ops.entities.trace_entity import (
WorkflowTraceInfo,
)
from core.ops.utils import get_message_data
from extensions.ext_database import db
from extensions.ext_storage import storage
from models.engine import db
from models.model import App, AppModelConfig, Conversation, Message, MessageFile, TraceAppConfig
from models.workflow import WorkflowAppLog
from tasks.ops_trace_task import process_trace_tasks
@@ -466,8 +469,6 @@ class TraceTask:
@classmethod
def _get_workflow_run_repo(cls):
from repositories.factory import DifyAPIRepositoryFactory
if cls._workflow_run_repo is None:
with cls._repo_lock:
if cls._workflow_run_repo is None:

View File

@@ -5,7 +5,7 @@ from urllib.parse import urlparse
from sqlalchemy import select
from models.engine import db
from extensions.ext_database import db
from models.model import Message

View File

@@ -1,4 +1,3 @@
import uuid
from collections.abc import Generator, Mapping
from typing import Union
@@ -12,7 +11,6 @@ from core.app.apps.chat.app_generator import ChatAppGenerator
from core.app.apps.completion.app_generator import CompletionAppGenerator
from core.app.apps.workflow.app_generator import WorkflowAppGenerator
from core.app.entities.app_invoke_entities import InvokeFrom
from core.app.layers.pause_state_persist_layer import PauseStateLayerConfig
from core.plugin.backwards_invocation.base import BaseBackwardsInvocation
from extensions.ext_database import db
from models import Account
@@ -103,11 +101,6 @@ class PluginAppBackwardsInvocation(BaseBackwardsInvocation):
if not workflow:
raise ValueError("unexpected app type")
pause_config = PauseStateLayerConfig(
session_factory=db.engine,
state_owner_user_id=workflow.created_by,
)
return AdvancedChatAppGenerator().generate(
app_model=app,
workflow=workflow,
@@ -119,9 +112,7 @@ class PluginAppBackwardsInvocation(BaseBackwardsInvocation):
"conversation_id": conversation_id,
},
invoke_from=InvokeFrom.SERVICE_API,
workflow_run_id=str(uuid.uuid4()),
streaming=stream,
pause_state_config=pause_config,
)
elif app.mode == AppMode.AGENT_CHAT:
return AgentChatAppGenerator().generate(
@@ -168,11 +159,6 @@ class PluginAppBackwardsInvocation(BaseBackwardsInvocation):
if not workflow:
raise ValueError("unexpected app type")
pause_config = PauseStateLayerConfig(
session_factory=db.engine,
state_owner_user_id=workflow.created_by,
)
return WorkflowAppGenerator().generate(
app_model=app,
workflow=workflow,
@@ -181,7 +167,6 @@ class PluginAppBackwardsInvocation(BaseBackwardsInvocation):
invoke_from=InvokeFrom.SERVICE_API,
streaming=stream,
call_depth=1,
pause_state_config=pause_config,
)
@classmethod

View File

@@ -1,4 +1,4 @@
from pydantic import BaseModel, Field, computed_field, model_validator
from pydantic import BaseModel, Field, model_validator
from core.model_runtime.entities.provider_entities import ProviderEntity
from core.plugin.entities.endpoint import EndpointProviderDeclaration
@@ -48,15 +48,3 @@ class MarketplacePluginDeclaration(BaseModel):
if "tool" in data and not data["tool"]:
del data["tool"]
return data
class MarketplacePluginSnapshot(BaseModel):
org: str
name: str
latest_version: str
latest_package_identifier: str
latest_package_url: str
@computed_field
def plugin_id(self) -> str:
return f"{self.org}/{self.name}"

View File

@@ -1,15 +1,13 @@
import json
import logging
import math
import re
import threading
import time
from collections import Counter, defaultdict
from collections.abc import Generator, Mapping
from typing import Any, Union, cast
from flask import Flask, current_app
from sqlalchemy import and_, func, literal, or_, select
from sqlalchemy import and_, literal, or_, select
from sqlalchemy.orm import Session
from core.app.app_config.entities import (
@@ -20,7 +18,6 @@ from core.app.app_config.entities import (
)
from core.app.entities.app_invoke_entities import InvokeFrom, ModelConfigWithCredentialsEntity
from core.callback_handler.index_tool_callback_handler import DatasetIndexToolCallbackHandler
from core.db.session_factory import session_factory
from core.entities.agent_entities import PlanningStrategy
from core.entities.model_entities import ModelStatus
from core.file import File, FileTransferMethod, FileType
@@ -61,30 +58,12 @@ from core.rag.retrieval.template_prompts import (
)
from core.tools.signature import sign_upload_file
from core.tools.utils.dataset_retriever.dataset_retriever_base_tool import DatasetRetrieverBaseTool
from core.workflow.nodes.knowledge_retrieval import exc
from core.workflow.repositories.rag_retrieval_protocol import (
KnowledgeRetrievalRequest,
Source,
SourceChildChunk,
SourceMetadata,
)
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from libs.json_in_md_parser import parse_and_check_json_markdown
from models import UploadFile
from models.dataset import (
ChildChunk,
Dataset,
DatasetMetadata,
DatasetQuery,
DocumentSegment,
RateLimitLog,
SegmentAttachmentBinding,
)
from models.dataset import ChildChunk, Dataset, DatasetMetadata, DatasetQuery, DocumentSegment, SegmentAttachmentBinding
from models.dataset import Document as DatasetDocument
from models.dataset import Document as DocumentModel
from services.external_knowledge_service import ExternalDatasetService
from services.feature_service import FeatureService
default_retrieval_model: dict[str, Any] = {
"search_method": RetrievalMethod.SEMANTIC_SEARCH,
@@ -94,8 +73,6 @@ default_retrieval_model: dict[str, Any] = {
"score_threshold_enabled": False,
}
logger = logging.getLogger(__name__)
class DatasetRetrieval:
def __init__(self, application_generate_entity=None):
@@ -114,233 +91,6 @@ class DatasetRetrieval:
else:
self._llm_usage = self._llm_usage.plus(usage)
def knowledge_retrieval(self, request: KnowledgeRetrievalRequest) -> list[Source]:
self._check_knowledge_rate_limit(request.tenant_id)
available_datasets = self._get_available_datasets(request.tenant_id, request.dataset_ids)
available_datasets_ids = [i.id for i in available_datasets]
if not available_datasets_ids:
return []
if not request.query:
return []
metadata_filter_document_ids, metadata_condition = None, None
if request.metadata_filtering_mode != "disabled":
# Convert workflow layer types to app_config layer types
if not request.metadata_model_config:
raise ValueError("metadata_model_config is required for this method")
app_metadata_model_config = ModelConfig.model_validate(request.metadata_model_config.model_dump())
app_metadata_filtering_conditions = None
if request.metadata_filtering_conditions is not None:
app_metadata_filtering_conditions = MetadataFilteringCondition.model_validate(
request.metadata_filtering_conditions.model_dump()
)
query = request.query if request.query is not None else ""
metadata_filter_document_ids, metadata_condition = self.get_metadata_filter_condition(
dataset_ids=available_datasets_ids,
query=query,
tenant_id=request.tenant_id,
user_id=request.user_id,
metadata_filtering_mode=request.metadata_filtering_mode,
metadata_model_config=app_metadata_model_config,
metadata_filtering_conditions=app_metadata_filtering_conditions,
inputs={},
)
if request.retrieval_mode == DatasetRetrieveConfigEntity.RetrieveStrategy.SINGLE:
planning_strategy = PlanningStrategy.REACT_ROUTER
# Ensure required fields are not None for single retrieval mode
if request.model_provider is None or request.model_name is None or request.query is None:
raise ValueError("model_provider, model_name, and query are required for single retrieval mode")
model_manager = ModelManager()
model_instance = model_manager.get_model_instance(
tenant_id=request.tenant_id,
model_type=ModelType.LLM,
provider=request.model_provider,
model=request.model_name,
)
provider_model_bundle = model_instance.provider_model_bundle
model_type_instance = model_instance.model_type_instance
model_type_instance = cast(LargeLanguageModel, model_type_instance)
model_credentials = model_instance.credentials
# check model
provider_model = provider_model_bundle.configuration.get_provider_model(
model=request.model_name, model_type=ModelType.LLM
)
if provider_model is None:
raise exc.ModelNotExistError(f"Model {request.model_name} not exist.")
if provider_model.status == ModelStatus.NO_CONFIGURE:
raise exc.ModelCredentialsNotInitializedError(
f"Model {request.model_name} credentials is not initialized."
)
elif provider_model.status == ModelStatus.NO_PERMISSION:
raise exc.ModelNotSupportedError(f"Dify Hosted OpenAI {request.model_name} currently not support.")
elif provider_model.status == ModelStatus.QUOTA_EXCEEDED:
raise exc.ModelQuotaExceededError(f"Model provider {request.model_provider} quota exceeded.")
stop = []
completion_params = (request.completion_params or {}).copy()
if "stop" in completion_params:
stop = completion_params["stop"]
del completion_params["stop"]
model_schema = model_type_instance.get_model_schema(request.model_name, model_credentials)
if not model_schema:
raise exc.ModelNotExistError(f"Model {request.model_name} not exist.")
model_config = ModelConfigWithCredentialsEntity(
provider=request.model_provider,
model=request.model_name,
model_schema=model_schema,
mode=request.model_mode or "chat",
provider_model_bundle=provider_model_bundle,
credentials=model_credentials,
parameters=completion_params,
stop=stop,
)
all_documents = self.single_retrieve(
request.app_id,
request.tenant_id,
request.user_id,
request.user_from,
request.query,
available_datasets,
model_instance,
model_config,
planning_strategy,
None, # message_id
metadata_filter_document_ids,
metadata_condition,
)
else:
all_documents = self.multiple_retrieve(
app_id=request.app_id,
tenant_id=request.tenant_id,
user_id=request.user_id,
user_from=request.user_from,
available_datasets=available_datasets,
query=request.query,
top_k=request.top_k,
score_threshold=request.score_threshold,
reranking_mode=request.reranking_mode,
reranking_model=request.reranking_model,
weights=request.weights,
reranking_enable=request.reranking_enable,
metadata_filter_document_ids=metadata_filter_document_ids,
metadata_condition=metadata_condition,
attachment_ids=request.attachment_ids,
)
dify_documents = [item for item in all_documents if item.provider == "dify"]
external_documents = [item for item in all_documents if item.provider == "external"]
retrieval_resource_list = []
# deal with external documents
for item in external_documents:
source = Source(
metadata=SourceMetadata(
source="knowledge",
dataset_id=item.metadata.get("dataset_id"),
dataset_name=item.metadata.get("dataset_name"),
document_id=item.metadata.get("document_id"),
document_name=item.metadata.get("title"),
data_source_type="external",
retriever_from="workflow",
score=item.metadata.get("score"),
doc_metadata=item.metadata,
),
title=item.metadata.get("title"),
content=item.page_content,
)
retrieval_resource_list.append(source)
# deal with dify documents
if dify_documents:
records = RetrievalService.format_retrieval_documents(dify_documents)
dataset_ids = [i.segment.dataset_id for i in records]
document_ids = [i.segment.document_id for i in records]
with session_factory.create_session() as session:
datasets = session.query(Dataset).where(Dataset.id.in_(dataset_ids)).all()
documents = session.query(DatasetDocument).where(DatasetDocument.id.in_(document_ids)).all()
dataset_map = {i.id: i for i in datasets}
document_map = {i.id: i for i in documents}
if records:
for record in records:
segment = record.segment
dataset = dataset_map.get(segment.dataset_id)
document = document_map.get(segment.document_id)
if dataset and document:
source = Source(
metadata=SourceMetadata(
source="knowledge",
dataset_id=dataset.id,
dataset_name=dataset.name,
document_id=document.id,
document_name=document.name,
data_source_type=document.data_source_type,
segment_id=segment.id,
retriever_from="workflow",
score=record.score or 0.0,
segment_hit_count=segment.hit_count,
segment_word_count=segment.word_count,
segment_position=segment.position,
segment_index_node_hash=segment.index_node_hash,
doc_metadata=document.doc_metadata,
child_chunks=[
SourceChildChunk(
id=str(getattr(chunk, "id", "")),
content=str(getattr(chunk, "content", "")),
position=int(getattr(chunk, "position", 0)),
score=float(getattr(chunk, "score", 0.0)),
)
for chunk in (record.child_chunks or [])
],
position=None,
),
title=document.name,
files=list(record.files) if record.files else None,
content=segment.get_sign_content(),
)
if segment.answer:
source.content = f"question:{segment.get_sign_content()} \nanswer:{segment.answer}"
if record.summary:
source.summary = record.summary
retrieval_resource_list.append(source)
if retrieval_resource_list:
def _score(item: Source) -> float:
meta = item.metadata
score = meta.score
if isinstance(score, (int, float)):
return float(score)
return 0.0
retrieval_resource_list = sorted(
retrieval_resource_list,
key=_score, # type: ignore[arg-type, return-value]
reverse=True,
)
for position, item in enumerate(retrieval_resource_list, start=1):
item.metadata.position = position # type: ignore[index]
return retrieval_resource_list
def retrieve(
self,
app_id: str,
@@ -400,7 +150,14 @@ class DatasetRetrieval:
if features:
if ModelFeature.TOOL_CALL in features or ModelFeature.MULTI_TOOL_CALL in features:
planning_strategy = PlanningStrategy.ROUTER
available_datasets = self._get_available_datasets(tenant_id, dataset_ids)
available_datasets = []
dataset_stmt = select(Dataset).where(Dataset.tenant_id == tenant_id, Dataset.id.in_(dataset_ids))
datasets: list[Dataset] = db.session.execute(dataset_stmt).scalars().all() # type: ignore
for dataset in datasets:
if dataset.available_document_count == 0 and dataset.provider != "external":
continue
available_datasets.append(dataset)
if inputs:
inputs = {key: str(value) for key, value in inputs.items()}
@@ -1404,6 +1161,7 @@ class DatasetRetrieval:
query=query or "",
)
result_text = ""
try:
# handle invoke result
invoke_result = cast(
@@ -1434,8 +1192,7 @@ class DatasetRetrieval:
"condition": item.get("comparison_operator"),
}
)
except Exception as e:
logger.warning(e, exc_info=True)
except Exception:
return None
return automatic_metadata_filters
@@ -1649,12 +1406,7 @@ class DatasetRetrieval:
usage = None
for result in invoke_result:
text = result.delta.message.content
if isinstance(text, str):
full_text += text
elif isinstance(text, list):
for i in text:
if i.data:
full_text += i.data
full_text += text
if not model:
model = result.model
@@ -1772,53 +1524,3 @@ class DatasetRetrieval:
cancel_event.set()
if thread_exceptions is not None:
thread_exceptions.append(e)
def _get_available_datasets(self, tenant_id: str, dataset_ids: list[str]) -> list[Dataset]:
with session_factory.create_session() as session:
subquery = (
session.query(DocumentModel.dataset_id, func.count(DocumentModel.id).label("available_document_count"))
.where(
DocumentModel.indexing_status == "completed",
DocumentModel.enabled == True,
DocumentModel.archived == False,
DocumentModel.dataset_id.in_(dataset_ids),
)
.group_by(DocumentModel.dataset_id)
.having(func.count(DocumentModel.id) > 0)
.subquery()
)
results = (
session.query(Dataset)
.outerjoin(subquery, Dataset.id == subquery.c.dataset_id)
.where(Dataset.tenant_id == tenant_id, Dataset.id.in_(dataset_ids))
.where((subquery.c.available_document_count > 0) | (Dataset.provider == "external"))
.all()
)
available_datasets = []
for dataset in results:
if not dataset:
continue
available_datasets.append(dataset)
return available_datasets
def _check_knowledge_rate_limit(self, tenant_id: str):
knowledge_rate_limit = FeatureService.get_knowledge_rate_limit(tenant_id)
if knowledge_rate_limit.enabled:
current_time = int(time.time() * 1000)
key = f"rate_limit_{tenant_id}"
redis_client.zadd(key, {current_time: current_time})
redis_client.zremrangebyscore(key, 0, current_time - 60000)
request_count = redis_client.zcard(key)
if request_count > knowledge_rate_limit.limit:
with session_factory.create_session() as session:
rate_limit_log = RateLimitLog(
tenant_id=tenant_id,
subscription_plan=knowledge_rate_limit.subscription_plan,
operation="knowledge",
)
session.add(rate_limit_log)
raise exc.RateLimitExceededError(
"you have reached the knowledge base request rate limit of your subscription."
)

View File

@@ -1,18 +1,19 @@
"""Repository implementations for data access."""
"""
Repository implementations for data access.
from __future__ import annotations
This package contains concrete implementations of the repository interfaces
defined in the core.workflow.repository package.
"""
from .celery_workflow_execution_repository import CeleryWorkflowExecutionRepository
from .celery_workflow_node_execution_repository import CeleryWorkflowNodeExecutionRepository
from .factory import DifyCoreRepositoryFactory, RepositoryImportError
from .sqlalchemy_workflow_execution_repository import SQLAlchemyWorkflowExecutionRepository
from .sqlalchemy_workflow_node_execution_repository import SQLAlchemyWorkflowNodeExecutionRepository
from core.repositories.celery_workflow_execution_repository import CeleryWorkflowExecutionRepository
from core.repositories.celery_workflow_node_execution_repository import CeleryWorkflowNodeExecutionRepository
from core.repositories.factory import DifyCoreRepositoryFactory, RepositoryImportError
from core.repositories.sqlalchemy_workflow_node_execution_repository import SQLAlchemyWorkflowNodeExecutionRepository
__all__ = [
"CeleryWorkflowExecutionRepository",
"CeleryWorkflowNodeExecutionRepository",
"DifyCoreRepositoryFactory",
"RepositoryImportError",
"SQLAlchemyWorkflowExecutionRepository",
"SQLAlchemyWorkflowNodeExecutionRepository",
]

View File

@@ -1,553 +0,0 @@
import dataclasses
import json
from collections.abc import Mapping, Sequence
from datetime import datetime
from typing import Any
from sqlalchemy import Engine, select
from sqlalchemy.orm import Session, selectinload, sessionmaker
from core.workflow.nodes.human_input.entities import (
DeliveryChannelConfig,
EmailDeliveryMethod,
EmailRecipients,
ExternalRecipient,
FormDefinition,
HumanInputNodeData,
MemberRecipient,
WebAppDeliveryMethod,
)
from core.workflow.nodes.human_input.enums import (
DeliveryMethodType,
HumanInputFormKind,
HumanInputFormStatus,
)
from core.workflow.repositories.human_input_form_repository import (
FormCreateParams,
FormNotFoundError,
HumanInputFormEntity,
HumanInputFormRecipientEntity,
)
from libs.datetime_utils import naive_utc_now
from libs.uuid_utils import uuidv7
from models.account import Account, TenantAccountJoin
from models.human_input import (
BackstageRecipientPayload,
ConsoleDeliveryPayload,
ConsoleRecipientPayload,
EmailExternalRecipientPayload,
EmailMemberRecipientPayload,
HumanInputDelivery,
HumanInputForm,
HumanInputFormRecipient,
RecipientType,
StandaloneWebAppRecipientPayload,
)
@dataclasses.dataclass(frozen=True)
class _DeliveryAndRecipients:
delivery: HumanInputDelivery
recipients: Sequence[HumanInputFormRecipient]
@dataclasses.dataclass(frozen=True)
class _WorkspaceMemberInfo:
user_id: str
email: str
class _HumanInputFormRecipientEntityImpl(HumanInputFormRecipientEntity):
def __init__(self, recipient_model: HumanInputFormRecipient):
self._recipient_model = recipient_model
@property
def id(self) -> str:
return self._recipient_model.id
@property
def token(self) -> str:
if self._recipient_model.access_token is None:
raise AssertionError(f"access_token should not be None for recipient {self._recipient_model.id}")
return self._recipient_model.access_token
class _HumanInputFormEntityImpl(HumanInputFormEntity):
def __init__(self, form_model: HumanInputForm, recipient_models: Sequence[HumanInputFormRecipient]):
self._form_model = form_model
self._recipients = [_HumanInputFormRecipientEntityImpl(recipient) for recipient in recipient_models]
self._web_app_recipient = next(
(
recipient
for recipient in recipient_models
if recipient.recipient_type == RecipientType.STANDALONE_WEB_APP
),
None,
)
self._console_recipient = next(
(recipient for recipient in recipient_models if recipient.recipient_type == RecipientType.CONSOLE),
None,
)
self._submitted_data: Mapping[str, Any] | None = (
json.loads(form_model.submitted_data) if form_model.submitted_data is not None else None
)
@property
def id(self) -> str:
return self._form_model.id
@property
def web_app_token(self):
if self._console_recipient is not None:
return self._console_recipient.access_token
if self._web_app_recipient is None:
return None
return self._web_app_recipient.access_token
@property
def recipients(self) -> list[HumanInputFormRecipientEntity]:
return list(self._recipients)
@property
def rendered_content(self) -> str:
return self._form_model.rendered_content
@property
def selected_action_id(self) -> str | None:
return self._form_model.selected_action_id
@property
def submitted_data(self) -> Mapping[str, Any] | None:
return self._submitted_data
@property
def submitted(self) -> bool:
return self._form_model.submitted_at is not None
@property
def status(self) -> HumanInputFormStatus:
return self._form_model.status
@property
def expiration_time(self) -> datetime:
return self._form_model.expiration_time
@dataclasses.dataclass(frozen=True)
class HumanInputFormRecord:
form_id: str
workflow_run_id: str | None
node_id: str
tenant_id: str
app_id: str
form_kind: HumanInputFormKind
definition: FormDefinition
rendered_content: str
created_at: datetime
expiration_time: datetime
status: HumanInputFormStatus
selected_action_id: str | None
submitted_data: Mapping[str, Any] | None
submitted_at: datetime | None
submission_user_id: str | None
submission_end_user_id: str | None
completed_by_recipient_id: str | None
recipient_id: str | None
recipient_type: RecipientType | None
access_token: str | None
@property
def submitted(self) -> bool:
return self.submitted_at is not None
@classmethod
def from_models(
cls, form_model: HumanInputForm, recipient_model: HumanInputFormRecipient | None
) -> "HumanInputFormRecord":
definition_payload = json.loads(form_model.form_definition)
if "expiration_time" not in definition_payload:
definition_payload["expiration_time"] = form_model.expiration_time
return cls(
form_id=form_model.id,
workflow_run_id=form_model.workflow_run_id,
node_id=form_model.node_id,
tenant_id=form_model.tenant_id,
app_id=form_model.app_id,
form_kind=form_model.form_kind,
definition=FormDefinition.model_validate(definition_payload),
rendered_content=form_model.rendered_content,
created_at=form_model.created_at,
expiration_time=form_model.expiration_time,
status=form_model.status,
selected_action_id=form_model.selected_action_id,
submitted_data=json.loads(form_model.submitted_data) if form_model.submitted_data else None,
submitted_at=form_model.submitted_at,
submission_user_id=form_model.submission_user_id,
submission_end_user_id=form_model.submission_end_user_id,
completed_by_recipient_id=form_model.completed_by_recipient_id,
recipient_id=recipient_model.id if recipient_model else None,
recipient_type=recipient_model.recipient_type if recipient_model else None,
access_token=recipient_model.access_token if recipient_model else None,
)
class _InvalidTimeoutStatusError(ValueError):
pass
class HumanInputFormRepositoryImpl:
def __init__(
self,
session_factory: sessionmaker | Engine,
tenant_id: str,
):
if isinstance(session_factory, Engine):
session_factory = sessionmaker(bind=session_factory)
self._session_factory = session_factory
self._tenant_id = tenant_id
def _delivery_method_to_model(
self,
session: Session,
form_id: str,
delivery_method: DeliveryChannelConfig,
) -> _DeliveryAndRecipients:
delivery_id = str(uuidv7())
delivery_model = HumanInputDelivery(
id=delivery_id,
form_id=form_id,
delivery_method_type=delivery_method.type,
delivery_config_id=delivery_method.id,
channel_payload=delivery_method.model_dump_json(),
)
recipients: list[HumanInputFormRecipient] = []
if isinstance(delivery_method, WebAppDeliveryMethod):
recipient_model = HumanInputFormRecipient(
form_id=form_id,
delivery_id=delivery_id,
recipient_type=RecipientType.STANDALONE_WEB_APP,
recipient_payload=StandaloneWebAppRecipientPayload().model_dump_json(),
)
recipients.append(recipient_model)
elif isinstance(delivery_method, EmailDeliveryMethod):
email_recipients_config = delivery_method.config.recipients
recipients.extend(
self._build_email_recipients(
session=session,
form_id=form_id,
delivery_id=delivery_id,
recipients_config=email_recipients_config,
)
)
return _DeliveryAndRecipients(delivery=delivery_model, recipients=recipients)
def _build_email_recipients(
self,
session: Session,
form_id: str,
delivery_id: str,
recipients_config: EmailRecipients,
) -> list[HumanInputFormRecipient]:
member_user_ids = [
recipient.user_id for recipient in recipients_config.items if isinstance(recipient, MemberRecipient)
]
external_emails = [
recipient.email for recipient in recipients_config.items if isinstance(recipient, ExternalRecipient)
]
if recipients_config.whole_workspace:
members = self._query_all_workspace_members(session=session)
else:
members = self._query_workspace_members_by_ids(session=session, restrict_to_user_ids=member_user_ids)
return self._create_email_recipients_from_resolved(
form_id=form_id,
delivery_id=delivery_id,
members=members,
external_emails=external_emails,
)
@staticmethod
def _create_email_recipients_from_resolved(
*,
form_id: str,
delivery_id: str,
members: Sequence[_WorkspaceMemberInfo],
external_emails: Sequence[str],
) -> list[HumanInputFormRecipient]:
recipient_models: list[HumanInputFormRecipient] = []
seen_emails: set[str] = set()
for member in members:
if not member.email:
continue
if member.email in seen_emails:
continue
seen_emails.add(member.email)
payload = EmailMemberRecipientPayload(user_id=member.user_id, email=member.email)
recipient_models.append(
HumanInputFormRecipient.new(
form_id=form_id,
delivery_id=delivery_id,
payload=payload,
)
)
for email in external_emails:
if not email:
continue
if email in seen_emails:
continue
seen_emails.add(email)
recipient_models.append(
HumanInputFormRecipient.new(
form_id=form_id,
delivery_id=delivery_id,
payload=EmailExternalRecipientPayload(email=email),
)
)
return recipient_models
def _query_all_workspace_members(
self,
session: Session,
) -> list[_WorkspaceMemberInfo]:
stmt = (
select(Account.id, Account.email)
.join(TenantAccountJoin, TenantAccountJoin.account_id == Account.id)
.where(TenantAccountJoin.tenant_id == self._tenant_id)
)
rows = session.execute(stmt).all()
return [_WorkspaceMemberInfo(user_id=account_id, email=email) for account_id, email in rows]
def _query_workspace_members_by_ids(
self,
session: Session,
restrict_to_user_ids: Sequence[str],
) -> list[_WorkspaceMemberInfo]:
unique_ids = {user_id for user_id in restrict_to_user_ids if user_id}
if not unique_ids:
return []
stmt = (
select(Account.id, Account.email)
.join(TenantAccountJoin, TenantAccountJoin.account_id == Account.id)
.where(TenantAccountJoin.tenant_id == self._tenant_id)
)
stmt = stmt.where(Account.id.in_(unique_ids))
rows = session.execute(stmt).all()
return [_WorkspaceMemberInfo(user_id=account_id, email=email) for account_id, email in rows]
def create_form(self, params: FormCreateParams) -> HumanInputFormEntity:
form_config: HumanInputNodeData = params.form_config
with self._session_factory(expire_on_commit=False) as session, session.begin():
# Generate unique form ID
form_id = str(uuidv7())
start_time = naive_utc_now()
node_expiration = form_config.expiration_time(start_time)
form_definition = FormDefinition(
form_content=form_config.form_content,
inputs=form_config.inputs,
user_actions=form_config.user_actions,
rendered_content=params.rendered_content,
expiration_time=node_expiration,
default_values=dict(params.resolved_default_values),
display_in_ui=params.display_in_ui,
node_title=form_config.title,
)
form_model = HumanInputForm(
id=form_id,
tenant_id=self._tenant_id,
app_id=params.app_id,
workflow_run_id=params.workflow_execution_id,
form_kind=params.form_kind,
node_id=params.node_id,
form_definition=form_definition.model_dump_json(),
rendered_content=params.rendered_content,
expiration_time=node_expiration,
created_at=start_time,
)
session.add(form_model)
recipient_models: list[HumanInputFormRecipient] = []
for delivery in params.delivery_methods:
delivery_and_recipients = self._delivery_method_to_model(
session=session,
form_id=form_id,
delivery_method=delivery,
)
session.add(delivery_and_recipients.delivery)
session.add_all(delivery_and_recipients.recipients)
recipient_models.extend(delivery_and_recipients.recipients)
if params.console_recipient_required and not any(
recipient.recipient_type == RecipientType.CONSOLE for recipient in recipient_models
):
console_delivery_id = str(uuidv7())
console_delivery = HumanInputDelivery(
id=console_delivery_id,
form_id=form_id,
delivery_method_type=DeliveryMethodType.WEBAPP,
delivery_config_id=None,
channel_payload=ConsoleDeliveryPayload().model_dump_json(),
)
console_recipient = HumanInputFormRecipient(
form_id=form_id,
delivery_id=console_delivery_id,
recipient_type=RecipientType.CONSOLE,
recipient_payload=ConsoleRecipientPayload(
account_id=params.console_creator_account_id,
).model_dump_json(),
)
session.add(console_delivery)
session.add(console_recipient)
recipient_models.append(console_recipient)
if params.backstage_recipient_required and not any(
recipient.recipient_type == RecipientType.BACKSTAGE for recipient in recipient_models
):
backstage_delivery_id = str(uuidv7())
backstage_delivery = HumanInputDelivery(
id=backstage_delivery_id,
form_id=form_id,
delivery_method_type=DeliveryMethodType.WEBAPP,
delivery_config_id=None,
channel_payload=ConsoleDeliveryPayload().model_dump_json(),
)
backstage_recipient = HumanInputFormRecipient(
form_id=form_id,
delivery_id=backstage_delivery_id,
recipient_type=RecipientType.BACKSTAGE,
recipient_payload=BackstageRecipientPayload(
account_id=params.console_creator_account_id,
).model_dump_json(),
)
session.add(backstage_delivery)
session.add(backstage_recipient)
recipient_models.append(backstage_recipient)
session.flush()
return _HumanInputFormEntityImpl(form_model=form_model, recipient_models=recipient_models)
def get_form(self, workflow_execution_id: str, node_id: str) -> HumanInputFormEntity | None:
form_query = select(HumanInputForm).where(
HumanInputForm.workflow_run_id == workflow_execution_id,
HumanInputForm.node_id == node_id,
HumanInputForm.tenant_id == self._tenant_id,
)
with self._session_factory(expire_on_commit=False) as session:
form_model: HumanInputForm | None = session.scalars(form_query).first()
if form_model is None:
return None
recipient_query = select(HumanInputFormRecipient).where(HumanInputFormRecipient.form_id == form_model.id)
recipient_models = session.scalars(recipient_query).all()
return _HumanInputFormEntityImpl(form_model=form_model, recipient_models=recipient_models)
class HumanInputFormSubmissionRepository:
"""Repository for fetching and submitting human input forms."""
def __init__(self, session_factory: sessionmaker | Engine):
if isinstance(session_factory, Engine):
session_factory = sessionmaker(bind=session_factory)
self._session_factory = session_factory
def get_by_token(self, form_token: str) -> HumanInputFormRecord | None:
query = (
select(HumanInputFormRecipient)
.options(selectinload(HumanInputFormRecipient.form))
.where(HumanInputFormRecipient.access_token == form_token)
)
with self._session_factory(expire_on_commit=False) as session:
recipient_model = session.scalars(query).first()
if recipient_model is None or recipient_model.form is None:
return None
return HumanInputFormRecord.from_models(recipient_model.form, recipient_model)
def get_by_form_id_and_recipient_type(
self,
form_id: str,
recipient_type: RecipientType,
) -> HumanInputFormRecord | None:
query = (
select(HumanInputFormRecipient)
.options(selectinload(HumanInputFormRecipient.form))
.where(
HumanInputFormRecipient.form_id == form_id,
HumanInputFormRecipient.recipient_type == recipient_type,
)
)
with self._session_factory(expire_on_commit=False) as session:
recipient_model = session.scalars(query).first()
if recipient_model is None or recipient_model.form is None:
return None
return HumanInputFormRecord.from_models(recipient_model.form, recipient_model)
def mark_submitted(
self,
*,
form_id: str,
recipient_id: str | None,
selected_action_id: str,
form_data: Mapping[str, Any],
submission_user_id: str | None,
submission_end_user_id: str | None,
) -> HumanInputFormRecord:
with self._session_factory(expire_on_commit=False) as session, session.begin():
form_model = session.get(HumanInputForm, form_id)
if form_model is None:
raise FormNotFoundError(f"form not found, id={form_id}")
recipient_model = session.get(HumanInputFormRecipient, recipient_id) if recipient_id else None
form_model.selected_action_id = selected_action_id
form_model.submitted_data = json.dumps(form_data)
form_model.submitted_at = naive_utc_now()
form_model.status = HumanInputFormStatus.SUBMITTED
form_model.submission_user_id = submission_user_id
form_model.submission_end_user_id = submission_end_user_id
form_model.completed_by_recipient_id = recipient_id
session.add(form_model)
session.flush()
session.refresh(form_model)
if recipient_model is not None:
session.refresh(recipient_model)
return HumanInputFormRecord.from_models(form_model, recipient_model)
def mark_timeout(
self,
*,
form_id: str,
timeout_status: HumanInputFormStatus,
reason: str | None = None,
) -> HumanInputFormRecord:
with self._session_factory(expire_on_commit=False) as session, session.begin():
form_model = session.get(HumanInputForm, form_id)
if form_model is None:
raise FormNotFoundError(f"form not found, id={form_id}")
if timeout_status not in {HumanInputFormStatus.TIMEOUT, HumanInputFormStatus.EXPIRED}:
raise _InvalidTimeoutStatusError(f"invalid timeout status: {timeout_status}")
# already handled or submitted
if form_model.status in {HumanInputFormStatus.TIMEOUT, HumanInputFormStatus.EXPIRED}:
return HumanInputFormRecord.from_models(form_model, None)
if form_model.submitted_at is not None or form_model.status == HumanInputFormStatus.SUBMITTED:
raise FormNotFoundError(f"form already submitted, id={form_id}")
form_model.status = timeout_status
form_model.selected_action_id = None
form_model.submitted_data = None
form_model.submission_user_id = None
form_model.submission_end_user_id = None
form_model.completed_by_recipient_id = None
# Reason is recorded in status/error downstream; not stored on form.
session.add(form_model)
session.flush()
session.refresh(form_model)
return HumanInputFormRecord.from_models(form_model, None)

View File

@@ -488,7 +488,6 @@ class SQLAlchemyWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository)
WorkflowNodeExecutionModel.workflow_run_id == workflow_run_id,
WorkflowNodeExecutionModel.tenant_id == self._tenant_id,
WorkflowNodeExecutionModel.triggered_from == triggered_from,
WorkflowNodeExecutionModel.status != WorkflowNodeExecutionStatus.PAUSED,
)
if self._app_id:

View File

@@ -1,5 +1,4 @@
from core.tools.entities.tool_entities import ToolInvokeMeta
from libs.exception import BaseHTTPException
class ToolProviderNotFoundError(ValueError):
@@ -38,12 +37,6 @@ class ToolCredentialPolicyViolationError(ValueError):
pass
class WorkflowToolHumanInputNotSupportedError(BaseHTTPException):
error_code = "workflow_tool_human_input_not_supported"
description = "Workflow with Human Input nodes cannot be published as a workflow tool."
code = 400
class ToolEngineInvokeError(Exception):
meta: ToolInvokeMeta

View File

@@ -3,8 +3,8 @@ from __future__ import annotations
import base64
import json
import logging
from collections.abc import Generator, Mapping
from typing import Any, cast
from collections.abc import Generator
from typing import Any
from core.mcp.auth_client import MCPClientWithAuthRetry
from core.mcp.error import MCPConnectionError
@@ -17,7 +17,6 @@ from core.mcp.types import (
TextContent,
TextResourceContents,
)
from core.model_runtime.entities.llm_entities import LLMUsage, LLMUsageMetadata
from core.tools.__base.tool import Tool
from core.tools.__base.tool_runtime import ToolRuntime
from core.tools.entities.tool_entities import ToolEntity, ToolInvokeMessage, ToolProviderType
@@ -47,7 +46,6 @@ class MCPTool(Tool):
self.headers = headers or {}
self.timeout = timeout
self.sse_read_timeout = sse_read_timeout
self._latest_usage = LLMUsage.empty_usage()
def tool_provider_type(self) -> ToolProviderType:
return ToolProviderType.MCP
@@ -61,10 +59,6 @@ class MCPTool(Tool):
message_id: str | None = None,
) -> Generator[ToolInvokeMessage, None, None]:
result = self.invoke_remote_mcp_tool(tool_parameters)
# Extract usage metadata from MCP protocol's _meta field
self._latest_usage = self._derive_usage_from_result(result)
# handle dify tool output
for content in result.content:
if isinstance(content, TextContent):
@@ -126,99 +120,6 @@ class MCPTool(Tool):
for item in json_list:
yield self.create_json_message(item)
@property
def latest_usage(self) -> LLMUsage:
return self._latest_usage
@classmethod
def _derive_usage_from_result(cls, result: CallToolResult) -> LLMUsage:
"""
Extract usage metadata from MCP tool result's _meta field.
The MCP protocol's _meta field (aliased as 'meta' in Python) can contain
usage information such as token counts, costs, and other metadata.
Args:
result: The CallToolResult from MCP tool invocation
Returns:
LLMUsage instance with values from meta or empty_usage if not found
"""
# Extract usage from the meta field if present
if result.meta:
usage_dict = cls._extract_usage_dict(result.meta)
if usage_dict is not None:
return LLMUsage.from_metadata(cast(LLMUsageMetadata, cast(object, dict(usage_dict))))
return LLMUsage.empty_usage()
@classmethod
def _extract_usage_dict(cls, payload: Mapping[str, Any]) -> Mapping[str, Any] | None:
"""
Recursively search for usage dictionary in the payload.
The MCP protocol's _meta field can contain usage data in various formats:
- Direct usage field: {"usage": {...}}
- Nested in metadata: {"metadata": {"usage": {...}}}
- Or nested within other fields
Args:
payload: The payload to search for usage data
Returns:
The usage dictionary if found, None otherwise
"""
# Check for direct usage field
usage_candidate = payload.get("usage")
if isinstance(usage_candidate, Mapping):
return usage_candidate
# Check for metadata nested usage
metadata_candidate = payload.get("metadata")
if isinstance(metadata_candidate, Mapping):
usage_candidate = metadata_candidate.get("usage")
if isinstance(usage_candidate, Mapping):
return usage_candidate
# Check for common token counting fields directly in payload
# Some MCP servers may include token counts directly
if "total_tokens" in payload or "prompt_tokens" in payload or "completion_tokens" in payload:
usage_dict: dict[str, Any] = {}
for key in (
"prompt_tokens",
"completion_tokens",
"total_tokens",
"prompt_unit_price",
"completion_unit_price",
"total_price",
"currency",
"prompt_price_unit",
"completion_price_unit",
"prompt_price",
"completion_price",
"latency",
"time_to_first_token",
"time_to_generate",
):
if key in payload:
usage_dict[key] = payload[key]
if usage_dict:
return usage_dict
# Recursively search through nested structures
for value in payload.values():
if isinstance(value, Mapping):
found = cls._extract_usage_dict(value)
if found is not None:
return found
elif isinstance(value, list) and not isinstance(value, (str, bytes, bytearray)):
for item in value:
if isinstance(item, Mapping):
found = cls._extract_usage_dict(item)
if found is not None:
return found
return None
def fork_tool_runtime(self, runtime: ToolRuntime) -> MCPTool:
return MCPTool(
entity=self.entity,

View File

@@ -3,8 +3,6 @@ from typing import Any
from core.app.app_config.entities import VariableEntity
from core.tools.entities.tool_entities import WorkflowToolParameterConfiguration
from core.tools.errors import WorkflowToolHumanInputNotSupportedError
from core.workflow.enums import NodeType
from core.workflow.nodes.base.entities import OutputVariableEntity
@@ -47,13 +45,6 @@ class WorkflowToolConfigurationUtils:
return [outputs_by_variable[variable] for variable in variable_order]
@classmethod
def ensure_no_human_input_nodes(cls, graph: Mapping[str, Any]) -> None:
nodes = graph.get("nodes", [])
for node in nodes:
if node.get("data", {}).get("type") == NodeType.HUMAN_INPUT:
raise WorkflowToolHumanInputNotSupportedError()
@classmethod
def check_is_synced(
cls, variables: list[VariableEntity], tool_configurations: list[WorkflowToolParameterConfiguration]

View File

@@ -98,10 +98,6 @@ class WorkflowTool(Tool):
invoke_from=self.runtime.invoke_from,
streaming=False,
call_depth=self.workflow_call_depth + 1,
# NOTE(QuantumGhost): We explicitly set `pause_state_config` to `None`
# because workflow pausing mechanisms (such as HumanInput) are not
# supported within WorkflowTool execution context.
pause_state_config=None,
)
assert isinstance(result, dict)
data = result.get("data", {})

View File

@@ -112,7 +112,7 @@ class ArrayBooleanVariable(ArrayBooleanSegment, ArrayVariable):
class RAGPipelineVariable(BaseModel):
belong_to_node_id: str = Field(description="belong to which node id, shared means public")
type: str = Field(description="variable type, text-input, paragraph, select, number, file, file-list")
type: str = Field(description="variable type, text-input, paragraph, select, number, file, file-list")
label: str = Field(description="label")
description: str | None = Field(description="description", default="")
variable: str = Field(description="variable key", default="")

View File

@@ -2,12 +2,10 @@ from .agent import AgentNodeStrategyInit
from .graph_init_params import GraphInitParams
from .workflow_execution import WorkflowExecution
from .workflow_node_execution import WorkflowNodeExecution
from .workflow_start_reason import WorkflowStartReason
__all__ = [
"AgentNodeStrategyInit",
"GraphInitParams",
"WorkflowExecution",
"WorkflowNodeExecution",
"WorkflowStartReason",
]

View File

@@ -5,16 +5,6 @@ from pydantic import BaseModel, Field
class GraphInitParams(BaseModel):
"""GraphInitParams encapsulates the configurations and contextual information
that remain constant throughout a single execution of the graph engine.
A single execution is defined as follows: as long as the execution has not reached
its conclusion, it is considered one execution. For instance, if a workflow is suspended
and later resumed, it is still regarded as a single execution, not two.
For the state diagram of workflow execution, refer to `WorkflowExecutionStatus`.
"""
# init params
tenant_id: str = Field(..., description="tenant / workspace id")
app_id: str = Field(..., description="app id")

View File

@@ -1,11 +1,8 @@
from collections.abc import Mapping
from enum import StrEnum, auto
from typing import Annotated, Any, Literal, TypeAlias
from typing import Annotated, Literal, TypeAlias
from pydantic import BaseModel, Field
from core.workflow.nodes.human_input.entities import FormInput, UserAction
class PauseReasonType(StrEnum):
HUMAN_INPUT_REQUIRED = auto()
@@ -14,31 +11,10 @@ class PauseReasonType(StrEnum):
class HumanInputRequired(BaseModel):
TYPE: Literal[PauseReasonType.HUMAN_INPUT_REQUIRED] = PauseReasonType.HUMAN_INPUT_REQUIRED
form_id: str
form_content: str
inputs: list[FormInput] = Field(default_factory=list)
actions: list[UserAction] = Field(default_factory=list)
display_in_ui: bool = False
# The identifier of the human input node causing the pause.
node_id: str
node_title: str
# The `resolved_default_values` stores the resolved values of variable defaults. It's a mapping from
# `output_variable_name` to their resolved values.
#
# For example, The form contains a input with output variable name `name` and placeholder type `VARIABLE`, its
# selector is ["start", "name"]. While the HumanInputNode is executed, the correspond value of variable
# `start.name` in variable pool is `John`. Thus, the resolved value of the output variable `name` is `John`. The
# `resolved_default_values` is `{"name": "John"}`.
#
# Only form inputs with default value type `VARIABLE` will be resolved and stored in `resolved_default_values`.
resolved_default_values: Mapping[str, Any] = Field(default_factory=dict)
# The `form_token` is the token used to submit the form via UI surfaces. It corresponds to
# `HumanInputFormRecipient.access_token`.
#
# This field is `None` if webapp delivery is not set and not
# in orchestrating mode.
form_token: str | None = None
class SchedulingPause(BaseModel):

View File

@@ -1,8 +0,0 @@
from enum import StrEnum
class WorkflowStartReason(StrEnum):
"""Reason for workflow start events across graph/queue/SSE layers."""
INITIAL = "initial" # First start of a workflow run.
RESUMPTION = "resumption" # Start triggered after resuming a paused run.

View File

@@ -1,15 +0,0 @@
import time
def get_timestamp() -> float:
"""Retrieve a timestamp as a float point numer representing the number of seconds
since the Unix epoch.
This function is primarily used to measure the execution time of the workflow engine.
Since workflow execution may be paused and resumed on a different machine,
`time.perf_counter` cannot be used as it is inconsistent across machines.
To address this, the function uses the wall clock as the time source.
However, it assumes that the clocks of all servers are properly synchronized.
"""
return round(time.time())

View File

@@ -2,14 +2,12 @@
GraphEngine configuration models.
"""
from pydantic import BaseModel, ConfigDict
from pydantic import BaseModel
class GraphEngineConfig(BaseModel):
"""Configuration for GraphEngine worker pool scaling."""
model_config = ConfigDict(frozen=True)
min_workers: int = 1
max_workers: int = 5
scale_up_threshold: int = 3

View File

@@ -10,7 +10,6 @@ from pydantic import BaseModel, Field
from core.workflow.entities.pause_reason import PauseReason
from core.workflow.enums import NodeState
from core.workflow.runtime.graph_runtime_state import GraphExecutionProtocol
from .node_execution import NodeExecution
@@ -237,6 +236,3 @@ class GraphExecution:
def record_node_failure(self) -> None:
"""Increment the count of node failures encountered during execution."""
self.exceptions_count += 1
_: GraphExecutionProtocol = GraphExecution(workflow_id="")

View File

@@ -192,13 +192,9 @@ class EventHandler:
self._event_collector.collect(edge_event)
# Enqueue ready nodes
if self._graph_execution.is_paused:
for node_id in ready_nodes:
self._graph_runtime_state.register_deferred_node(node_id)
else:
for node_id in ready_nodes:
self._state_manager.enqueue_node(node_id)
self._state_manager.start_execution(node_id)
for node_id in ready_nodes:
self._state_manager.enqueue_node(node_id)
self._state_manager.start_execution(node_id)
# Update execution tracking
self._state_manager.finish_execution(event.node_id)

View File

@@ -14,7 +14,6 @@ from collections.abc import Generator
from typing import TYPE_CHECKING, cast, final
from core.workflow.context import capture_current_context
from core.workflow.entities.workflow_start_reason import WorkflowStartReason
from core.workflow.enums import NodeExecutionType
from core.workflow.graph import Graph
from core.workflow.graph_events import (
@@ -56,9 +55,6 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
_DEFAULT_CONFIG = GraphEngineConfig()
@final
class GraphEngine:
"""
@@ -74,7 +70,7 @@ class GraphEngine:
graph: Graph,
graph_runtime_state: GraphRuntimeState,
command_channel: CommandChannel,
config: GraphEngineConfig = _DEFAULT_CONFIG,
config: GraphEngineConfig,
) -> None:
"""Initialize the graph engine with all subsystems and dependencies."""
# stop event
@@ -238,9 +234,7 @@ class GraphEngine:
self._graph_execution.paused = False
self._graph_execution.pause_reasons = []
start_event = GraphRunStartedEvent(
reason=WorkflowStartReason.RESUMPTION if is_resume else WorkflowStartReason.INITIAL,
)
start_event = GraphRunStartedEvent()
self._event_manager.notify_layers(start_event)
yield start_event
@@ -309,17 +303,15 @@ class GraphEngine:
for layer in self._layers:
try:
layer.on_graph_start()
except Exception:
logger.exception("Layer %s failed on_graph_start", layer.__class__.__name__)
except Exception as e:
logger.warning("Layer %s failed on_graph_start: %s", layer.__class__.__name__, e)
def _start_execution(self, *, resume: bool = False) -> None:
"""Start execution subsystems."""
self._stop_event.clear()
paused_nodes: list[str] = []
deferred_nodes: list[str] = []
if resume:
paused_nodes = self._graph_runtime_state.consume_paused_nodes()
deferred_nodes = self._graph_runtime_state.consume_deferred_nodes()
# Start worker pool (it calculates initial workers internally)
self._worker_pool.start()
@@ -335,11 +327,7 @@ class GraphEngine:
self._state_manager.enqueue_node(root_node.id)
self._state_manager.start_execution(root_node.id)
else:
seen_nodes: set[str] = set()
for node_id in paused_nodes + deferred_nodes:
if node_id in seen_nodes:
continue
seen_nodes.add(node_id)
for node_id in paused_nodes:
self._state_manager.enqueue_node(node_id)
self._state_manager.start_execution(node_id)
@@ -357,8 +345,8 @@ class GraphEngine:
for layer in self._layers:
try:
layer.on_graph_end(self._graph_execution.error)
except Exception:
logger.exception("Layer %s failed on_graph_end", layer.__class__.__name__)
except Exception as e:
logger.warning("Layer %s failed on_graph_end: %s", layer.__class__.__name__, e)
# Public property accessors for attributes that need external access
@property

View File

@@ -224,8 +224,6 @@ class GraphStateManager:
Returns:
Number of executing nodes
"""
# This count is a best-effort snapshot and can change concurrently.
# Only use it for pause-drain checks where scheduling is already frozen.
with self._lock:
return len(self._executing_nodes)

View File

@@ -83,12 +83,12 @@ class Dispatcher:
"""Main dispatcher loop."""
try:
self._process_commands()
paused = False
while not self._stop_event.is_set():
if self._execution_coordinator.aborted or self._execution_coordinator.execution_complete:
break
if self._execution_coordinator.paused:
paused = True
if (
self._execution_coordinator.aborted
or self._execution_coordinator.paused
or self._execution_coordinator.execution_complete
):
break
self._execution_coordinator.check_scaling()
@@ -101,10 +101,13 @@ class Dispatcher:
time.sleep(0.1)
self._process_commands()
if paused:
self._drain_events_until_idle()
else:
self._drain_event_queue()
while True:
try:
event = self._event_queue.get(block=False)
self._event_handler.dispatch(event)
self._event_queue.task_done()
except queue.Empty:
break
except Exception as e:
logger.exception("Dispatcher error")
@@ -119,24 +122,3 @@ class Dispatcher:
def _process_commands(self, event: GraphNodeEventBase | None = None):
if event is None or isinstance(event, self._COMMAND_TRIGGER_EVENTS):
self._execution_coordinator.process_commands()
def _drain_event_queue(self) -> None:
while True:
try:
event = self._event_queue.get(block=False)
self._event_handler.dispatch(event)
self._event_queue.task_done()
except queue.Empty:
break
def _drain_events_until_idle(self) -> None:
while not self._stop_event.is_set():
try:
event = self._event_queue.get(timeout=0.1)
self._event_handler.dispatch(event)
self._event_queue.task_done()
self._process_commands(event)
except queue.Empty:
if not self._execution_coordinator.has_executing_nodes():
break
self._drain_event_queue()

View File

@@ -94,11 +94,3 @@ class ExecutionCoordinator:
self._worker_pool.stop()
self._state_manager.clear_executing()
def has_executing_nodes(self) -> bool:
"""Return True if any nodes are currently marked as executing."""
# This check is only safe once execution has already paused.
# Before pause, executing state can change concurrently, which makes the result unreliable.
if not self._graph_execution.is_paused:
raise AssertionError("has_executing_nodes should only be called after execution is paused")
return self._state_manager.get_executing_count() > 0

View File

@@ -38,8 +38,6 @@ from .loop import (
from .node import (
NodeRunExceptionEvent,
NodeRunFailedEvent,
NodeRunHumanInputFormFilledEvent,
NodeRunHumanInputFormTimeoutEvent,
NodeRunPauseRequestedEvent,
NodeRunRetrieverResourceEvent,
NodeRunRetryEvent,
@@ -62,8 +60,6 @@ __all__ = [
"NodeRunAgentLogEvent",
"NodeRunExceptionEvent",
"NodeRunFailedEvent",
"NodeRunHumanInputFormFilledEvent",
"NodeRunHumanInputFormTimeoutEvent",
"NodeRunIterationFailedEvent",
"NodeRunIterationNextEvent",
"NodeRunIterationStartedEvent",

View File

@@ -1,16 +1,11 @@
from pydantic import Field
from core.workflow.entities.pause_reason import PauseReason
from core.workflow.entities.workflow_start_reason import WorkflowStartReason
from core.workflow.graph_events import BaseGraphEvent
class GraphRunStartedEvent(BaseGraphEvent):
# Reason is emitted for workflow start events and is always set.
reason: WorkflowStartReason = Field(
default=WorkflowStartReason.INITIAL,
description="reason for workflow start",
)
pass
class GraphRunSucceededEvent(BaseGraphEvent):

View File

@@ -54,22 +54,6 @@ class NodeRunRetryEvent(NodeRunStartedEvent):
retry_index: int = Field(..., description="which retry attempt is about to be performed")
class NodeRunHumanInputFormFilledEvent(GraphNodeEventBase):
"""Emitted when a HumanInput form is submitted and before the node finishes."""
node_title: str = Field(..., description="HumanInput node title")
rendered_content: str = Field(..., description="Markdown content rendered with user inputs.")
action_id: str = Field(..., description="User action identifier chosen in the form.")
action_text: str = Field(..., description="Display text of the chosen action button.")
class NodeRunHumanInputFormTimeoutEvent(GraphNodeEventBase):
"""Emitted when a HumanInput form times out."""
node_title: str = Field(..., description="HumanInput node title")
expiration_time: datetime = Field(..., description="Form expiration time")
class NodeRunPauseRequestedEvent(GraphNodeEventBase):
reason: PauseReason = Field(..., description="pause reason")

View File

@@ -13,8 +13,6 @@ from .loop import (
LoopSucceededEvent,
)
from .node import (
HumanInputFormFilledEvent,
HumanInputFormTimeoutEvent,
ModelInvokeCompletedEvent,
PauseRequestedEvent,
RunRetrieverResourceEvent,
@@ -25,8 +23,6 @@ from .node import (
__all__ = [
"AgentLogEvent",
"HumanInputFormFilledEvent",
"HumanInputFormTimeoutEvent",
"IterationFailedEvent",
"IterationNextEvent",
"IterationStartedEvent",

View File

@@ -47,19 +47,3 @@ class StreamCompletedEvent(NodeEventBase):
class PauseRequestedEvent(NodeEventBase):
reason: PauseReason = Field(..., description="pause reason")
class HumanInputFormFilledEvent(NodeEventBase):
"""Event emitted when a human input form is submitted."""
node_title: str
rendered_content: str
action_id: str
action_text: str
class HumanInputFormTimeoutEvent(NodeEventBase):
"""Event emitted when a human input form times out."""
node_title: str
expiration_time: datetime

View File

@@ -18,8 +18,6 @@ from core.workflow.graph_events import (
GraphNodeEventBase,
NodeRunAgentLogEvent,
NodeRunFailedEvent,
NodeRunHumanInputFormFilledEvent,
NodeRunHumanInputFormTimeoutEvent,
NodeRunIterationFailedEvent,
NodeRunIterationNextEvent,
NodeRunIterationStartedEvent,
@@ -36,8 +34,6 @@ from core.workflow.graph_events import (
)
from core.workflow.node_events import (
AgentLogEvent,
HumanInputFormFilledEvent,
HumanInputFormTimeoutEvent,
IterationFailedEvent,
IterationNextEvent,
IterationStartedEvent,
@@ -65,15 +61,6 @@ logger = logging.getLogger(__name__)
class Node(Generic[NodeDataT]):
"""BaseNode serves as the foundational class for all node implementations.
Nodes are allowed to maintain transient states (e.g., `LLMNode` uses the `_file_output`
attribute to track files generated by the LLM). However, these states are not persisted
when the workflow is suspended or resumed. If a node needs its state to be preserved
across workflow suspension and resumption, it should include the relevant state data
in its output.
"""
node_type: ClassVar[NodeType]
execution_type: NodeExecutionType = NodeExecutionType.EXECUTABLE
_node_data_type: ClassVar[type[BaseNodeData]] = BaseNodeData
@@ -264,33 +251,10 @@ class Node(Generic[NodeDataT]):
return self._node_execution_id
def ensure_execution_id(self) -> str:
if self._node_execution_id:
return self._node_execution_id
resumed_execution_id = self._restore_execution_id_from_runtime_state()
if resumed_execution_id:
self._node_execution_id = resumed_execution_id
return self._node_execution_id
self._node_execution_id = str(uuid4())
if not self._node_execution_id:
self._node_execution_id = str(uuid4())
return self._node_execution_id
def _restore_execution_id_from_runtime_state(self) -> str | None:
graph_execution = self.graph_runtime_state.graph_execution
try:
node_executions = graph_execution.node_executions
except AttributeError:
return None
if not isinstance(node_executions, dict):
return None
node_execution = node_executions.get(self._node_id)
if node_execution is None:
return None
execution_id = node_execution.execution_id
if not execution_id:
return None
return str(execution_id)
def _hydrate_node_data(self, data: Mapping[str, Any]) -> NodeDataT:
return cast(NodeDataT, self._node_data_type.model_validate(data))
@@ -656,28 +620,6 @@ class Node(Generic[NodeDataT]):
metadata=event.metadata,
)
@_dispatch.register
def _(self, event: HumanInputFormFilledEvent):
return NodeRunHumanInputFormFilledEvent(
id=self.execution_id,
node_id=self._node_id,
node_type=self.node_type,
node_title=event.node_title,
rendered_content=event.rendered_content,
action_id=event.action_id,
action_text=event.action_text,
)
@_dispatch.register
def _(self, event: HumanInputFormTimeoutEvent):
return NodeRunHumanInputFormTimeoutEvent(
id=self.execution_id,
node_id=self._node_id,
node_type=self.node_type,
node_title=event.node_title,
expiration_time=event.expiration_time,
)
@_dispatch.register
def _(self, event: LoopStartedEvent) -> NodeRunLoopStartedEvent:
return NodeRunLoopStartedEvent(

View File

@@ -1,3 +1,3 @@
"""
Human Input node implementation.
"""
from .human_input_node import HumanInputNode
__all__ = ["HumanInputNode"]

View File

@@ -1,350 +1,10 @@
"""
Human Input node entities.
"""
from pydantic import Field
import re
import uuid
from collections.abc import Mapping, Sequence
from datetime import datetime, timedelta
from typing import Annotated, Any, ClassVar, Literal, Self
from pydantic import BaseModel, Field, field_validator, model_validator
from core.variables.consts import SELECTORS_LENGTH
from core.workflow.nodes.base import BaseNodeData
from core.workflow.nodes.base.variable_template_parser import VariableTemplateParser
from core.workflow.runtime import VariablePool
from .enums import ButtonStyle, DeliveryMethodType, EmailRecipientType, FormInputType, PlaceholderType, TimeoutUnit
_OUTPUT_VARIABLE_PATTERN = re.compile(r"\{\{#\$output\.(?P<field_name>[a-zA-Z_][a-zA-Z0-9_]{0,29})#\}\}")
class _WebAppDeliveryConfig(BaseModel):
"""Configuration for webapp delivery method."""
pass # Empty for webapp delivery
class MemberRecipient(BaseModel):
"""Member recipient for email delivery."""
type: Literal[EmailRecipientType.MEMBER] = EmailRecipientType.MEMBER
user_id: str
class ExternalRecipient(BaseModel):
"""External recipient for email delivery."""
type: Literal[EmailRecipientType.EXTERNAL] = EmailRecipientType.EXTERNAL
email: str
EmailRecipient = Annotated[MemberRecipient | ExternalRecipient, Field(discriminator="type")]
class EmailRecipients(BaseModel):
"""Email recipients configuration."""
# When true, recipients are the union of all workspace members and external items.
# Member items are ignored because they are already covered by the workspace scope.
# De-duplication is applied by email, with member recipients taking precedence.
whole_workspace: bool = False
items: list[EmailRecipient] = Field(default_factory=list)
class EmailDeliveryConfig(BaseModel):
"""Configuration for email delivery method."""
URL_PLACEHOLDER: ClassVar[str] = "{{#url#}}"
recipients: EmailRecipients
# the subject of email
subject: str
# Body is the content of email.It may contain the speical placeholder `{{#url#}}`, which
# represent the url to submit the form.
#
# It may also reference the output variable of the previous node with the syntax
# `{{#<node_id>.<field_name>#}}`.
body: str
debug_mode: bool = False
def with_debug_recipient(self, user_id: str) -> "EmailDeliveryConfig":
if not user_id:
debug_recipients = EmailRecipients(whole_workspace=False, items=[])
return self.model_copy(update={"recipients": debug_recipients})
debug_recipients = EmailRecipients(whole_workspace=False, items=[MemberRecipient(user_id=user_id)])
return self.model_copy(update={"recipients": debug_recipients})
@classmethod
def replace_url_placeholder(cls, body: str, url: str | None) -> str:
"""Replace the url placeholder with provided value."""
return body.replace(cls.URL_PLACEHOLDER, url or "")
@classmethod
def render_body_template(
cls,
*,
body: str,
url: str | None,
variable_pool: VariablePool | None = None,
) -> str:
"""Render email body by replacing placeholders with runtime values."""
templated_body = cls.replace_url_placeholder(body, url)
if variable_pool is None:
return templated_body
return variable_pool.convert_template(templated_body).text
class _DeliveryMethodBase(BaseModel):
"""Base delivery method configuration."""
enabled: bool = True
id: uuid.UUID = Field(default_factory=uuid.uuid4)
def extract_variable_selectors(self) -> Sequence[Sequence[str]]:
return ()
class WebAppDeliveryMethod(_DeliveryMethodBase):
"""Webapp delivery method configuration."""
type: Literal[DeliveryMethodType.WEBAPP] = DeliveryMethodType.WEBAPP
# The config field is not used currently.
config: _WebAppDeliveryConfig = Field(default_factory=_WebAppDeliveryConfig)
class EmailDeliveryMethod(_DeliveryMethodBase):
"""Email delivery method configuration."""
type: Literal[DeliveryMethodType.EMAIL] = DeliveryMethodType.EMAIL
config: EmailDeliveryConfig
def extract_variable_selectors(self) -> Sequence[Sequence[str]]:
variable_template_parser = VariableTemplateParser(template=self.config.body)
selectors: list[Sequence[str]] = []
for variable_selector in variable_template_parser.extract_variable_selectors():
value_selector = list(variable_selector.value_selector)
if len(value_selector) < SELECTORS_LENGTH:
continue
selectors.append(value_selector[:SELECTORS_LENGTH])
return selectors
DeliveryChannelConfig = Annotated[WebAppDeliveryMethod | EmailDeliveryMethod, Field(discriminator="type")]
def apply_debug_email_recipient(
method: DeliveryChannelConfig,
*,
enabled: bool,
user_id: str,
) -> DeliveryChannelConfig:
if not enabled:
return method
if not isinstance(method, EmailDeliveryMethod):
return method
if not method.config.debug_mode:
return method
debug_config = method.config.with_debug_recipient(user_id or "")
return method.model_copy(update={"config": debug_config})
class FormInputDefault(BaseModel):
"""Default configuration for form inputs."""
# NOTE: Ideally, a discriminated union would be used to model
# FormInputDefault. However, the UI requires preserving the previous
# value when switching between `VARIABLE` and `CONSTANT` types. This
# necessitates retaining all fields, making a discriminated union unsuitable.
type: PlaceholderType
# The selector of default variable, used when `type` is `VARIABLE`.
selector: Sequence[str] = Field(default_factory=tuple) #
# The value of the default, used when `type` is `CONSTANT`.
# TODO: How should we express JSON values?
value: str = ""
@model_validator(mode="after")
def _validate_selector(self) -> Self:
if self.type == PlaceholderType.CONSTANT:
return self
if len(self.selector) < SELECTORS_LENGTH:
raise ValueError(f"the length of selector should be at least {SELECTORS_LENGTH}, selector={self.selector}")
return self
class FormInput(BaseModel):
"""Form input definition."""
type: FormInputType
output_variable_name: str
default: FormInputDefault | None = None
_IDENTIFIER_PATTERN = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
class UserAction(BaseModel):
"""User action configuration."""
# id is the identifier for this action.
# It also serves as the identifiers of output handle.
#
# The id must be a valid identifier (satisfy the _IDENTIFIER_PATTERN above.)
id: str = Field(max_length=20)
title: str = Field(max_length=20)
button_style: ButtonStyle = ButtonStyle.DEFAULT
@field_validator("id")
@classmethod
def _validate_id(cls, value: str) -> str:
if not _IDENTIFIER_PATTERN.match(value):
raise ValueError(
f"'{value}' is not a valid identifier. It must start with a letter or underscore, "
f"and contain only letters, numbers, or underscores."
)
return value
class HumanInputNodeData(BaseNodeData):
"""Human Input node data."""
"""Configuration schema for the HumanInput node."""
delivery_methods: list[DeliveryChannelConfig] = Field(default_factory=list)
form_content: str = ""
inputs: list[FormInput] = Field(default_factory=list)
user_actions: list[UserAction] = Field(default_factory=list)
timeout: int = 36
timeout_unit: TimeoutUnit = TimeoutUnit.HOUR
@field_validator("inputs")
@classmethod
def _validate_inputs(cls, inputs: list[FormInput]) -> list[FormInput]:
seen_names: set[str] = set()
for form_input in inputs:
name = form_input.output_variable_name
if name in seen_names:
raise ValueError(f"duplicated output_variable_name '{name}' in inputs")
seen_names.add(name)
return inputs
@field_validator("user_actions")
@classmethod
def _validate_user_actions(cls, user_actions: list[UserAction]) -> list[UserAction]:
seen_ids: set[str] = set()
for action in user_actions:
action_id = action.id
if action_id in seen_ids:
raise ValueError(f"duplicated user action id '{action_id}'")
seen_ids.add(action_id)
return user_actions
def is_webapp_enabled(self) -> bool:
for dm in self.delivery_methods:
if not dm.enabled:
continue
if dm.type == DeliveryMethodType.WEBAPP:
return True
return False
def expiration_time(self, start_time: datetime) -> datetime:
if self.timeout_unit == TimeoutUnit.HOUR:
return start_time + timedelta(hours=self.timeout)
elif self.timeout_unit == TimeoutUnit.DAY:
return start_time + timedelta(days=self.timeout)
else:
raise AssertionError("unknown timeout unit.")
def outputs_field_names(self) -> Sequence[str]:
field_names = []
for match in _OUTPUT_VARIABLE_PATTERN.finditer(self.form_content):
field_names.append(match.group("field_name"))
return field_names
def extract_variable_selector_to_variable_mapping(self, node_id: str) -> Mapping[str, Sequence[str]]:
variable_mappings: dict[str, Sequence[str]] = {}
def _add_variable_selectors(selectors: Sequence[Sequence[str]]) -> None:
for selector in selectors:
if len(selector) < SELECTORS_LENGTH:
continue
qualified_variable_mapping_key = f"{node_id}.#{'.'.join(selector[:SELECTORS_LENGTH])}#"
variable_mappings[qualified_variable_mapping_key] = list(selector[:SELECTORS_LENGTH])
form_template_parser = VariableTemplateParser(template=self.form_content)
_add_variable_selectors(
[selector.value_selector for selector in form_template_parser.extract_variable_selectors()]
)
for delivery_method in self.delivery_methods:
if not delivery_method.enabled:
continue
_add_variable_selectors(delivery_method.extract_variable_selectors())
for input in self.inputs:
default_value = input.default
if default_value is None:
continue
if default_value.type == PlaceholderType.CONSTANT:
continue
default_value_key = ".".join(default_value.selector)
qualified_variable_mapping_key = f"{node_id}.#{default_value_key}#"
variable_mappings[qualified_variable_mapping_key] = default_value.selector
return variable_mappings
def find_action_text(self, action_id: str) -> str:
"""
Resolve action display text by id.
"""
for action in self.user_actions:
if action.id == action_id:
return action.title
return action_id
class FormDefinition(BaseModel):
form_content: str
inputs: list[FormInput] = Field(default_factory=list)
user_actions: list[UserAction] = Field(default_factory=list)
rendered_content: str
expiration_time: datetime
# this is used to store the resolved default values
default_values: dict[str, Any] = Field(default_factory=dict)
# node_title records the title of the HumanInput node.
node_title: str | None = None
# display_in_ui controls whether the form should be displayed in UI surfaces.
display_in_ui: bool | None = None
class HumanInputSubmissionValidationError(ValueError):
pass
def validate_human_input_submission(
*,
inputs: Sequence[FormInput],
user_actions: Sequence[UserAction],
selected_action_id: str,
form_data: Mapping[str, Any],
) -> None:
available_actions = {action.id for action in user_actions}
if selected_action_id not in available_actions:
raise HumanInputSubmissionValidationError(f"Invalid action: {selected_action_id}")
provided_inputs = set(form_data.keys())
missing_inputs = [
form_input.output_variable_name
for form_input in inputs
if form_input.output_variable_name not in provided_inputs
]
if missing_inputs:
missing_list = ", ".join(missing_inputs)
raise HumanInputSubmissionValidationError(f"Missing required inputs: {missing_list}")
required_variables: list[str] = Field(default_factory=list)
pause_reason: str | None = Field(default=None)

Some files were not shown because too many files have changed in this diff Show More