Compare commits

...

7 Commits

Author      SHA1        Message  Date
jyong       a480053d93  add celery prefetch setting (CI checks cancelled)  2025-10-22 15:57:20 +08:00
jyong       b4985e9e46  add celery prefetch setting (CI checks pending)  2025-10-22 11:10:20 +08:00
crazywoola  3ee1a5729d  Revert "fix: sync FileUploader context with props to fix inconsistent file parameter state in “View cached variables”." (#26548)  2025-10-02 19:10:47 +08:00
crazywoola  06440c4988  fix: style with self-start (#26492)  2025-10-02 19:10:47 +08:00
quicksand   79ded7d6e3  fix: document is not bound to a session (#26480)  2025-10-02 19:10:47 +08:00
zlyszx      4a67f661d3  fix: duplicate data in datasets pagination list (#25783)  2025-10-02 19:10:46 +08:00
goofy       2d11560d88  fix single-step runs support user input as structured_output variable values (#26430)  2025-10-02 19:10:46 +08:00
6 changed files with 32 additions and 50 deletions

View File

@@ -136,6 +136,11 @@ class KnowledgeIndexNode(Node):
         document = db.session.query(Document).filter_by(id=document_id.value).first()
         if not document:
             raise KnowledgeIndexNodeError(f"Document {document_id.value} not found.")
+        doc_id_value = document.id
+        ds_id_value = dataset.id
+        dataset_name_value = dataset.name
+        document_name_value = document.name
+        created_at_value = document.created_at
         # chunk nodes by chunk size
         indexing_start_at = time.perf_counter()
         index_processor = IndexProcessorFactory(dataset.chunk_structure).init_index_processor()
@@ -161,16 +166,16 @@ class KnowledgeIndexNode(Node):
         document.word_count = (
             db.session.query(func.sum(DocumentSegment.word_count))
             .where(
-                DocumentSegment.document_id == document.id,
-                DocumentSegment.dataset_id == dataset.id,
+                DocumentSegment.document_id == doc_id_value,
+                DocumentSegment.dataset_id == ds_id_value,
             )
             .scalar()
         )
         db.session.add(document)
         # update document segment status
         db.session.query(DocumentSegment).where(
-            DocumentSegment.document_id == document.id,
-            DocumentSegment.dataset_id == dataset.id,
+            DocumentSegment.document_id == doc_id_value,
+            DocumentSegment.dataset_id == ds_id_value,
         ).update(
             {
                 DocumentSegment.status: "completed",
@@ -182,13 +187,13 @@ class KnowledgeIndexNode(Node):
         db.session.commit()
         return {
-            "dataset_id": dataset.id,
-            "dataset_name": dataset.name,
+            "dataset_id": ds_id_value,
+            "dataset_name": dataset_name_value,
             "batch": batch.value,
-            "document_id": document.id,
-            "document_name": document.name,
-            "created_at": document.created_at.timestamp(),
-            "display_status": document.indexing_status,
+            "document_id": doc_id_value,
+            "document_name": document_name_value,
+            "created_at": created_at_value.timestamp(),
+            "display_status": "completed",
         }
 
     def _get_preview_output(self, chunk_structure: str, chunks: Any) -> Mapping[str, Any]:

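Note: the hunk above copies ORM attributes into plain locals before committing because, with SQLAlchemy's default expire_on_commit=True, attribute access after a commit triggers a refresh that fails once the instance is detached; this appears to be the "document is not bound to a session" error that commit 79ded7d6e3 addresses. A self-contained sketch of the hazard (the User model and in-memory engine are hypothetical, not Dify code):

# Sketch: reading an ORM attribute after commit + close raises
# DetachedInstanceError under expire_on_commit=True (the default).
from sqlalchemy import create_engine, select
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str]

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

session = Session(engine)
session.add(User(id=1, name="alice"))
session.commit()

user = session.scalars(select(User)).first()
name_value = user.name  # copy while the session can still refresh the row
session.commit()        # expires every loaded attribute
session.close()         # detaches the instance

print(name_value)       # safe: a plain Python string
# print(user.name)      # DetachedInstanceError: no session to refresh from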
View File

@@ -416,4 +416,8 @@ class WorkflowEntry:
             # append variable and value to variable pool
             if variable_node_id != ENVIRONMENT_VARIABLE_NODE_ID:
+                # In single run, the input_value is set as the LLM's structured output value within the variable_pool.
+                if len(variable_key_list) == 2 and variable_key_list[0] == "structured_output":
+                    input_value = {variable_key_list[1]: input_value}
+                    variable_key_list = variable_key_list[0:1]
                 variable_pool.add([variable_node_id] + variable_key_list, input_value)

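Note: the added branch rewrites a single-run input addressed as <node>.structured_output.<key> into a one-key dict stored under the two-part selector, so the value lands where downstream nodes expect the LLM's structured output. A standalone sketch of the transformation (the node id, key, and value are made-up examples):

# Sketch of the re-nesting performed above; ids and values are hypothetical.
variable_node_id = "llm"
variable_key_list = ["structured_output", "answer"]
input_value = "42"

if len(variable_key_list) == 2 and variable_key_list[0] == "structured_output":
    # Wrap the scalar and shorten the selector: the pool then stores
    # {"answer": "42"} under ["llm", "structured_output"].
    input_value = {variable_key_list[1]: input_value}
    variable_key_list = variable_key_list[0:1]

assert variable_key_list == ["structured_output"]
assert input_value == {"answer": "42"}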
View File

@@ -32,7 +32,8 @@ if [[ "${MODE}" == "worker" ]]; then
   exec celery -A celery_entrypoint.celery worker -P ${CELERY_WORKER_CLASS:-gevent} $CONCURRENCY_OPTION \
     --max-tasks-per-child ${MAX_TASKS_PER_CHILD:-50} --loglevel ${LOG_LEVEL:-INFO} \
-    -Q ${CELERY_QUEUES:-dataset,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation}
+    -Q ${CELERY_QUEUES:-dataset,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation} \
+    --prefetch-multiplier=1 --without-mingle --without-gossip
 elif [[ "${MODE}" == "beat" ]]; then
   exec celery -A app.celery beat --loglevel ${LOG_LEVEL:-INFO}

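Note: --prefetch-multiplier=1 stops each worker from reserving a batch of queued messages ahead of time (Celery's default multiplier is 4), which helps when queues mix quick tasks with long-running indexing jobs: a busy worker no longer sits on prefetched tasks that idle workers could run. --without-mingle and --without-gossip skip worker-to-worker synchronization at startup and the periodic gossip events, cutting broker chatter. The prefetch flag also has a config-file equivalent; a sketch (the app name is illustrative):

# Sketch: the same prefetch behavior set via Celery's Python configuration
# instead of the CLI flag; worker_prefetch_multiplier is the documented
# setting name, and "dify" is just an illustrative app name.
from celery import Celery

app = Celery("dify")
# Reserve one message per worker process/greenlet at a time so a long
# dataset task cannot hold a backlog of prefetched messages hostage.
app.conf.worker_prefetch_multiplier = 1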
View File

@@ -93,7 +93,7 @@ logger = logging.getLogger(__name__)
 class DatasetService:
     @staticmethod
     def get_datasets(page, per_page, tenant_id=None, user=None, search=None, tag_ids=None, include_all=False):
-        query = select(Dataset).where(Dataset.tenant_id == tenant_id).order_by(Dataset.created_at.desc())
+        query = select(Dataset).where(Dataset.tenant_id == tenant_id).order_by(Dataset.created_at.desc(), Dataset.id)
         if user:
             # get permitted dataset ids

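Note: the appended Dataset.id is a tiebreaker. created_at is not unique, and SQL promises no stable order among rows with equal sort keys, so OFFSET/LIMIT pages built from the old query could repeat or skip datasets between requests, the duplicate-data symptom in #25783. A self-contained sketch of the principle (plain dicts stand in for rows):

# Sketch: a non-unique sort key lets two runs of the "same" query disagree
# on row order, so page boundaries drift; a unique id tiebreaker makes the
# ordering total and pagination reproducible. Rows here are hypothetical.
from operator import itemgetter

run_1 = [
    {"id": 2, "created_at": "2025-10-02"},
    {"id": 1, "created_at": "2025-10-02"},
    {"id": 3, "created_at": "2025-10-02"},
]
run_2 = list(reversed(run_1))  # same rows, different arrival order

by_date = itemgetter("created_at")
by_date_then_id = itemgetter("created_at", "id")

# Python's stable sort keeps the arbitrary input order for equal keys,
# mirroring a database's freedom to return ties in any order:
assert sorted(run_1, key=by_date) != sorted(run_2, key=by_date)
# With the unique tiebreaker, both runs agree:
assert sorted(run_1, key=by_date_then_id) == sorted(run_2, key=by_date_then_id)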
View File

@@ -1,7 +1,6 @@
 import {
   createContext,
   useContext,
-  useEffect,
   useRef,
 } from 'react'
 import {
@@ -19,11 +18,13 @@ type Shape = {
 export const createFileStore = (
   value: FileEntity[] = [],
+  onChange?: (files: FileEntity[]) => void,
 ) => {
   return create<Shape>(set => ({
     files: value ? [...value] : [],
     setFiles: (files) => {
       set({ files })
+      onChange?.(files)
     },
   }))
 }
@@ -54,35 +55,9 @@ export const FileContextProvider = ({
   onChange,
 }: FileProviderProps) => {
   const storeRef = useRef<FileStore | undefined>(undefined)
-  const onChangeRef = useRef<FileProviderProps['onChange']>(onChange)
-  const isSyncingRef = useRef(false)
 
   if (!storeRef.current)
-    storeRef.current = createFileStore(value)
-
-  // keep latest onChange
-  useEffect(() => {
-    onChangeRef.current = onChange
-  }, [onChange])
-
-  // subscribe to store changes and call latest onChange
-  useEffect(() => {
-    const store = storeRef.current!
-    const unsubscribe = store.subscribe((state: Shape) => {
-      if (isSyncingRef.current) return
-      onChangeRef.current?.(state.files)
-    })
-    return unsubscribe
-  }, [])
-
-  // sync external value into internal store when value changes
-  useEffect(() => {
-    const store = storeRef.current!
-    const nextFiles = value ? [...value] : []
-    isSyncingRef.current = true
-    store.setState({ files: nextFiles })
-    isSyncingRef.current = false
-  }, [value])
+    storeRef.current = createFileStore(value, onChange)
 
   return (
     <FileContext.Provider value={storeRef.current}>

View File

@@ -152,23 +152,20 @@ const Apps = ({
       <div className={cn(
         'mt-6 flex items-center justify-between px-12',
       )}>
-        <>
-          <Category
-            list={categories}
-            value={currCategory}
-            onChange={setCurrCategory}
-            allCategoriesEn={allCategoriesEn}
-          />
-        </>
+        <Category
+          list={categories}
+          value={currCategory}
+          onChange={setCurrCategory}
+          allCategoriesEn={allCategoriesEn}
+        />
         <Input
           showLeftIcon
           showClearIcon
-          wrapperClassName='w-[200px]'
+          wrapperClassName='w-[200px] self-start'
           value={keywords}
           onChange={e => handleKeywordsChange(e.target.value)}
           onClear={() => handleKeywordsChange('')}
         />
       </div>
       <div className={cn(