Compare commits


5 Commits

Author SHA1 Message Date
zxhlyh
031565579a Merge branch 'main' into fix/workflow-sync-draft 2026-02-03 13:58:30 +08:00
zxhlyh
7815d33871 Merge branch 'main' into fix/workflow-sync-draft 2026-01-30 10:52:14 +08:00
zxhlyh
aeb41d3b2c Merge branch 'main' into fix/workflow-sync-draft 2026-01-30 09:39:18 +08:00
zxhlyh
4275aa729f Merge branch 'main' into fix/workflow-sync-draft 2026-01-27 10:47:51 +08:00
zxhlyh
0ed0a31ed6 fix: workflow sync draft 2026-01-26 14:41:12 +08:00
56 changed files with 2294 additions and 1494 deletions

View File

@@ -1,4 +1,3 @@
import logging
import uuid
from datetime import datetime
from typing import Any, Literal, TypeAlias
@@ -55,8 +54,6 @@ ALLOW_CREATE_APP_MODES = ["chat", "agent-chat", "advanced-chat", "workflow", "co
register_enum_models(console_ns, IconType)
_logger = logging.getLogger(__name__)
class AppListQuery(BaseModel):
page: int = Field(default=1, ge=1, le=99999, description="Page number (1-99999)")
@@ -502,7 +499,6 @@ class AppListApi(Resource):
select(Workflow).where(
Workflow.version == Workflow.VERSION_DRAFT,
Workflow.app_id.in_(workflow_capable_app_ids),
Workflow.tenant_id == current_tenant_id,
)
)
.scalars()
@@ -514,14 +510,12 @@ class AppListApi(Resource):
NodeType.TRIGGER_PLUGIN,
}
for workflow in draft_workflows:
node_id = None
try:
for node_id, node_data in workflow.walk_nodes():
for _, node_data in workflow.walk_nodes():
if node_data.get("type") in trigger_node_types:
draft_trigger_app_ids.add(str(workflow.app_id))
break
except Exception:
_logger.exception("error while walking nodes, workflow_id=%s, node_id=%s", workflow.id, node_id)
continue
for app in app_pagination.items:
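The hunk above makes the trigger scan resilient: one workflow with malformed graph data no longer aborts the whole listing. A minimal self-contained sketch of that defensive loop, assuming walk_nodes() yields (node_id, node_data) pairs; the type strings in TRIGGER_NODE_TYPES are placeholder assumptions:

import logging

logger = logging.getLogger(__name__)

TRIGGER_NODE_TYPES = {"trigger-plugin", "schedule-trigger"}  # assumption: placeholder type strings

def collect_trigger_app_ids(draft_workflows) -> set[str]:
    """Scan each draft workflow's nodes; one bad workflow must not abort the scan."""
    trigger_app_ids: set[str] = set()
    for workflow in draft_workflows:
        try:
            for _, node_data in workflow.walk_nodes():
                if node_data.get("type") in TRIGGER_NODE_TYPES:
                    trigger_app_ids.add(str(workflow.app_id))
                    break
        except Exception:
            logger.exception("error while walking nodes, workflow_id=%s", workflow.id)
            continue
    return trigger_app_ids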

View File

@@ -1,27 +1,14 @@
from typing import Literal
from uuid import UUID
from flask import request
from flask_restx import Namespace, Resource, fields, marshal_with
from pydantic import BaseModel, Field
from werkzeug.exceptions import Forbidden
from controllers.common.schema import register_schema_models
from controllers.console import console_ns
from controllers.console.wraps import account_initialization_required, edit_permission_required, setup_required
from controllers.fastopenapi import console_router
from libs.login import current_account_with_tenant, login_required
from services.tag_service import TagService
dataset_tag_fields = {
"id": fields.String,
"name": fields.String,
"type": fields.String,
"binding_count": fields.String,
}
def build_dataset_tag_fields(api_or_ns: Namespace):
return api_or_ns.model("DataSetTag", dataset_tag_fields)
class TagBasePayload(BaseModel):
name: str = Field(description="Tag name", min_length=1, max_length=50)
@@ -45,115 +32,129 @@ class TagListQueryParam(BaseModel):
keyword: str | None = Field(None, description="Search keyword")
register_schema_models(
console_ns,
TagBasePayload,
TagBindingPayload,
TagBindingRemovePayload,
TagListQueryParam,
class TagResponse(BaseModel):
id: str = Field(description="Tag ID")
name: str = Field(description="Tag name")
type: str = Field(description="Tag type")
binding_count: int = Field(description="Number of bindings")
class TagBindingResult(BaseModel):
result: Literal["success"] = Field(description="Operation result", examples=["success"])
@console_router.get(
"/tags",
response_model=list[TagResponse],
tags=["console"],
)
@setup_required
@login_required
@account_initialization_required
def list_tags(query: TagListQueryParam) -> list[TagResponse]:
_, current_tenant_id = current_account_with_tenant()
tags = TagService.get_tags(query.type, current_tenant_id, query.keyword)
return [
TagResponse(
id=tag.id,
name=tag.name,
type=tag.type,
binding_count=int(tag.binding_count),
)
for tag in tags
]
@console_ns.route("/tags")
class TagListApi(Resource):
@setup_required
@login_required
@account_initialization_required
@console_ns.doc(
params={"type": 'Tag type filter. Can be "knowledge" or "app".', "keyword": "Search keyword for tag name."}
)
@marshal_with(dataset_tag_fields)
def get(self):
_, current_tenant_id = current_account_with_tenant()
raw_args = request.args.to_dict()
param = TagListQueryParam.model_validate(raw_args)
tags = TagService.get_tags(param.type, current_tenant_id, param.keyword)
@console_router.post(
"/tags",
response_model=TagResponse,
tags=["console"],
)
@setup_required
@login_required
@account_initialization_required
def create_tag(payload: TagBasePayload) -> TagResponse:
current_user, _ = current_account_with_tenant()
# The role of the current user in the tag table must be admin, owner, or editor
if not (current_user.has_edit_permission or current_user.is_dataset_editor):
raise Forbidden()
return tags, 200
tag = TagService.save_tags(payload.model_dump())
@console_ns.expect(console_ns.models[TagBasePayload.__name__])
@setup_required
@login_required
@account_initialization_required
def post(self):
current_user, _ = current_account_with_tenant()
# The role of the current user in the tag table must be admin, owner, or editor
if not (current_user.has_edit_permission or current_user.is_dataset_editor):
raise Forbidden()
payload = TagBasePayload.model_validate(console_ns.payload or {})
tag = TagService.save_tags(payload.model_dump())
response = {"id": tag.id, "name": tag.name, "type": tag.type, "binding_count": 0}
return response, 200
return TagResponse(id=tag.id, name=tag.name, type=tag.type, binding_count=0)
@console_ns.route("/tags/<uuid:tag_id>")
class TagUpdateDeleteApi(Resource):
@console_ns.expect(console_ns.models[TagBasePayload.__name__])
@setup_required
@login_required
@account_initialization_required
def patch(self, tag_id):
current_user, _ = current_account_with_tenant()
tag_id = str(tag_id)
# The role of the current user in the tag table must be admin, owner, or editor
if not (current_user.has_edit_permission or current_user.is_dataset_editor):
raise Forbidden()
@console_router.patch(
"/tags/<uuid:tag_id>",
response_model=TagResponse,
tags=["console"],
)
@setup_required
@login_required
@account_initialization_required
def update_tag(tag_id: UUID, payload: TagBasePayload) -> TagResponse:
current_user, _ = current_account_with_tenant()
tag_id_str = str(tag_id)
# The role of the current user in the tag table must be admin, owner, or editor
if not (current_user.has_edit_permission or current_user.is_dataset_editor):
raise Forbidden()
payload = TagBasePayload.model_validate(console_ns.payload or {})
tag = TagService.update_tags(payload.model_dump(), tag_id)
tag = TagService.update_tags(payload.model_dump(), tag_id_str)
binding_count = TagService.get_tag_binding_count(tag_id)
binding_count = TagService.get_tag_binding_count(tag_id_str)
response = {"id": tag.id, "name": tag.name, "type": tag.type, "binding_count": binding_count}
return response, 200
@setup_required
@login_required
@account_initialization_required
@edit_permission_required
def delete(self, tag_id):
tag_id = str(tag_id)
TagService.delete_tag(tag_id)
return 204
return TagResponse(id=tag.id, name=tag.name, type=tag.type, binding_count=binding_count)
@console_ns.route("/tag-bindings/create")
class TagBindingCreateApi(Resource):
@console_ns.expect(console_ns.models[TagBindingPayload.__name__])
@setup_required
@login_required
@account_initialization_required
def post(self):
current_user, _ = current_account_with_tenant()
# The role of the current user in the tag table must be admin, owner, editor, or dataset_operator
if not (current_user.has_edit_permission or current_user.is_dataset_editor):
raise Forbidden()
@console_router.delete(
"/tags/<uuid:tag_id>",
tags=["console"],
status_code=204,
)
@setup_required
@login_required
@account_initialization_required
@edit_permission_required
def delete_tag(tag_id: UUID) -> None:
tag_id_str = str(tag_id)
payload = TagBindingPayload.model_validate(console_ns.payload or {})
TagService.save_tag_binding(payload.model_dump())
return {"result": "success"}, 200
TagService.delete_tag(tag_id_str)
@console_ns.route("/tag-bindings/remove")
class TagBindingDeleteApi(Resource):
@console_ns.expect(console_ns.models[TagBindingRemovePayload.__name__])
@setup_required
@login_required
@account_initialization_required
def post(self):
current_user, _ = current_account_with_tenant()
# The role of the current user in the tag table must be admin, owner, editor, or dataset_operator
if not (current_user.has_edit_permission or current_user.is_dataset_editor):
raise Forbidden()
@console_router.post(
"/tag-bindings/create",
response_model=TagBindingResult,
tags=["console"],
)
@setup_required
@login_required
@account_initialization_required
def create_tag_binding(payload: TagBindingPayload) -> TagBindingResult:
current_user, _ = current_account_with_tenant()
# The role of the current user in the tag table must be admin, owner, editor, or dataset_operator
if not (current_user.has_edit_permission or current_user.is_dataset_editor):
raise Forbidden()
payload = TagBindingRemovePayload.model_validate(console_ns.payload or {})
TagService.delete_tag_binding(payload.model_dump())
TagService.save_tag_binding(payload.model_dump())
return {"result": "success"}, 200
return TagBindingResult(result="success")
@console_router.post(
"/tag-bindings/remove",
response_model=TagBindingResult,
tags=["console"],
)
@setup_required
@login_required
@account_initialization_required
def delete_tag_binding(payload: TagBindingRemovePayload) -> TagBindingResult:
current_user, _ = current_account_with_tenant()
# The role of the current user in the tag table must be admin, owner, editor, or dataset_operator
if not (current_user.has_edit_permission or current_user.is_dataset_editor):
raise Forbidden()
TagService.delete_tag_binding(payload.model_dump())
return TagBindingResult(result="success")
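The pattern across this file: each flask_restx Resource method becomes a plain function registered on console_router, with Pydantic models replacing marshal_with field dicts. A minimal sketch of one such endpoint in the same style, reusing the imports shown in this file; the /tags/ping path and its handler are hypothetical illustrations, not part of the change:

from pydantic import BaseModel, Field

from controllers.console.wraps import account_initialization_required, setup_required
from controllers.fastopenapi import console_router
from libs.login import login_required

class PingResponse(BaseModel):
    result: str = Field(description="Always 'pong'")

@console_router.get("/tags/ping", response_model=PingResponse, tags=["console"])
@setup_required
@login_required
@account_initialization_required
def ping_tags() -> PingResponse:
    # Hypothetical endpoint: the router validates and serializes via response_model.
    return PingResponse(result="pong")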

View File

@@ -1,6 +1,6 @@
[project]
name = "dify-api"
version = "1.12.1"
version = "1.11.4"
requires-python = ">=3.11,<3.13"
dependencies = [

View File

@@ -24,7 +24,7 @@ class TagService:
escaped_keyword = escape_like_pattern(keyword)
query = query.where(sa.and_(Tag.name.ilike(f"%{escaped_keyword}%", escape="\\")))
query = query.group_by(Tag.id, Tag.type, Tag.name, Tag.created_at)
results: list = query.order_by(Tag.created_at.desc()).all()
results = query.order_by(Tag.created_at.desc()).all()
return results
@staticmethod
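The hunk above only drops a redundant list annotation; the LIKE-escaping it preserves is the interesting part. A hedged sketch of that escaping, where escape_like_pattern is a plausible reading of the helper referenced above, not its confirmed implementation:

def escape_like_pattern(keyword: str) -> str:
    # Assumption: escape LIKE metacharacters so user input matches literally.
    return keyword.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")

def name_filter(column, keyword: str):
    escaped = escape_like_pattern(keyword)
    # escape="\\" tells the database which character introduces escape sequences.
    return column.ilike(f"%{escaped}%", escape="\\")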

View File

@@ -6,6 +6,7 @@ from celery import shared_task
from core.rag.datasource.vdb.vector_factory import Vector
from core.rag.models.document import Document
from extensions.ext_database import db
from models.dataset import Dataset
from services.dataset_service import DatasetCollectionBindingService
@@ -57,3 +58,5 @@ def add_annotation_to_index_task(
)
except Exception:
logger.exception("Build index for annotation failed")
finally:
db.session.close()

View File

@@ -5,6 +5,7 @@ import click
from celery import shared_task
from core.rag.datasource.vdb.vector_factory import Vector
from extensions.ext_database import db
from models.dataset import Dataset
from services.dataset_service import DatasetCollectionBindingService
@@ -39,3 +40,5 @@ def delete_annotation_index_task(annotation_id: str, app_id: str, tenant_id: str
logger.info(click.style(f"App annotations index deleted : {app_id} latency: {end_at - start_at}", fg="green"))
except Exception:
logger.exception("Annotation deleted index failed")
finally:
db.session.close()

View File

@@ -6,6 +6,7 @@ from celery import shared_task
from core.rag.datasource.vdb.vector_factory import Vector
from core.rag.models.document import Document
from extensions.ext_database import db
from models.dataset import Dataset
from services.dataset_service import DatasetCollectionBindingService
@@ -58,3 +59,5 @@ def update_annotation_to_index_task(
)
except Exception:
logger.exception("Build index for annotation failed")
finally:
db.session.close()
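The same two-line change lands in all three annotation index tasks: a finally block that closes the scoped session so the worker returns its connection to the pool even when indexing fails. A minimal sketch of the shape these tasks now share:

import logging

from celery import shared_task
from extensions.ext_database import db

logger = logging.getLogger(__name__)

@shared_task(queue="dataset")
def example_annotation_task(annotation_id: str):
    try:
        ...  # build or delete the vector index here
    except Exception:
        logger.exception("Build index for annotation failed")
    finally:
        # Always release the scoped session, success or not.
        db.session.close()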

View File

@@ -14,9 +14,6 @@ from models.model import UploadFile
logger = logging.getLogger(__name__)
# Batch size for database operations to keep transactions short
BATCH_SIZE = 1000
@shared_task(queue="dataset")
def batch_clean_document_task(document_ids: list[str], dataset_id: str, doc_form: str | None, file_ids: list[str]):
@@ -34,179 +31,63 @@ def batch_clean_document_task(document_ids: list[str], dataset_id: str, doc_form
if not doc_form:
raise ValueError("doc_form is required")
storage_keys_to_delete: list[str] = []
index_node_ids: list[str] = []
segment_ids: list[str] = []
total_image_upload_file_ids: list[str] = []
with session_factory.create_session() as session:
try:
dataset = session.query(Dataset).where(Dataset.id == dataset_id).first()
if not dataset:
raise Exception("Document has no dataset")
session.query(DatasetMetadataBinding).where(
DatasetMetadataBinding.dataset_id == dataset_id,
DatasetMetadataBinding.document_id.in_(document_ids),
).delete(synchronize_session=False)
try:
# ============ Step 1: Query segment and file data (short read-only transaction) ============
with session_factory.create_session() as session:
# Get segments info
segments = session.scalars(
select(DocumentSegment).where(DocumentSegment.document_id.in_(document_ids))
).all()
# check segment is exist
if segments:
index_node_ids = [segment.index_node_id for segment in segments]
segment_ids = [segment.id for segment in segments]
index_processor = IndexProcessorFactory(doc_form).init_index_processor()
index_processor.clean(
dataset, index_node_ids, with_keywords=True, delete_child_chunks=True, delete_summaries=True
)
# Collect image file IDs from segment content
for segment in segments:
image_upload_file_ids = get_image_upload_file_ids(segment.content)
total_image_upload_file_ids.extend(image_upload_file_ids)
# Query storage keys for image files
if total_image_upload_file_ids:
image_files = session.scalars(
select(UploadFile).where(UploadFile.id.in_(total_image_upload_file_ids))
).all()
storage_keys_to_delete.extend([f.key for f in image_files if f and f.key])
# Query storage keys for document files
image_files = session.query(UploadFile).where(UploadFile.id.in_(image_upload_file_ids)).all()
for image_file in image_files:
try:
if image_file and image_file.key:
storage.delete(image_file.key)
except Exception:
logger.exception(
"Delete image_files failed when storage deleted, \
image_upload_file_id: %s",
image_file.id,
)
stmt = delete(UploadFile).where(UploadFile.id.in_(image_upload_file_ids))
session.execute(stmt)
session.delete(segment)
if file_ids:
files = session.scalars(select(UploadFile).where(UploadFile.id.in_(file_ids))).all()
storage_keys_to_delete.extend([f.key for f in files if f and f.key])
for file in files:
try:
storage.delete(file.key)
except Exception:
logger.exception("Delete file failed when document deleted, file_id: %s", file.id)
stmt = delete(UploadFile).where(UploadFile.id.in_(file_ids))
session.execute(stmt)
# ============ Step 2: Clean vector index (external service, fresh session for dataset) ============
if index_node_ids:
try:
# Fetch dataset in a fresh session to avoid DetachedInstanceError
with session_factory.create_session() as session:
dataset = session.query(Dataset).where(Dataset.id == dataset_id).first()
if not dataset:
logger.warning("Dataset not found for vector index cleanup, dataset_id: %s", dataset_id)
else:
index_processor = IndexProcessorFactory(doc_form).init_index_processor()
index_processor.clean(
dataset, index_node_ids, with_keywords=True, delete_child_chunks=True, delete_summaries=True
)
except Exception:
logger.exception(
"Failed to clean vector index for dataset_id: %s, document_ids: %s, index_node_ids count: %d",
dataset_id,
document_ids,
len(index_node_ids),
)
session.commit()
# ============ Step 3: Delete metadata binding (separate short transaction) ============
try:
with session_factory.create_session() as session:
deleted_count = (
session.query(DatasetMetadataBinding)
.where(
DatasetMetadataBinding.dataset_id == dataset_id,
DatasetMetadataBinding.document_id.in_(document_ids),
)
.delete(synchronize_session=False)
end_at = time.perf_counter()
logger.info(
click.style(
f"Cleaned documents when documents deleted latency: {end_at - start_at}",
fg="green",
)
session.commit()
logger.debug("Deleted %d metadata bindings for dataset_id: %s", deleted_count, dataset_id)
)
except Exception:
logger.exception(
"Failed to delete metadata bindings for dataset_id: %s, document_ids: %s",
dataset_id,
document_ids,
)
# ============ Step 4: Batch delete UploadFile records (multiple short transactions) ============
if total_image_upload_file_ids:
failed_batches = 0
total_batches = (len(total_image_upload_file_ids) + BATCH_SIZE - 1) // BATCH_SIZE
for i in range(0, len(total_image_upload_file_ids), BATCH_SIZE):
batch = total_image_upload_file_ids[i : i + BATCH_SIZE]
try:
with session_factory.create_session() as session:
stmt = delete(UploadFile).where(UploadFile.id.in_(batch))
session.execute(stmt)
session.commit()
except Exception:
failed_batches += 1
logger.exception(
"Failed to delete image UploadFile batch %d-%d for dataset_id: %s",
i,
i + len(batch),
dataset_id,
)
if failed_batches > 0:
logger.warning(
"Image UploadFile deletion: %d/%d batches failed for dataset_id: %s",
failed_batches,
total_batches,
dataset_id,
)
# ============ Step 5: Batch delete DocumentSegment records (multiple short transactions) ============
if segment_ids:
failed_batches = 0
total_batches = (len(segment_ids) + BATCH_SIZE - 1) // BATCH_SIZE
for i in range(0, len(segment_ids), BATCH_SIZE):
batch = segment_ids[i : i + BATCH_SIZE]
try:
with session_factory.create_session() as session:
segment_delete_stmt = delete(DocumentSegment).where(DocumentSegment.id.in_(batch))
session.execute(segment_delete_stmt)
session.commit()
except Exception:
failed_batches += 1
logger.exception(
"Failed to delete DocumentSegment batch %d-%d for dataset_id: %s, document_ids: %s",
i,
i + len(batch),
dataset_id,
document_ids,
)
if failed_batches > 0:
logger.warning(
"DocumentSegment deletion: %d/%d batches failed, document_ids: %s",
failed_batches,
total_batches,
document_ids,
)
# ============ Step 6: Delete document-associated files (separate short transaction) ============
if file_ids:
try:
with session_factory.create_session() as session:
stmt = delete(UploadFile).where(UploadFile.id.in_(file_ids))
session.execute(stmt)
session.commit()
except Exception:
logger.exception(
"Failed to delete document UploadFile records for dataset_id: %s, file_ids: %s",
dataset_id,
file_ids,
)
# ============ Step 7: Delete storage files (I/O operations, no DB transaction) ============
storage_delete_failures = 0
for storage_key in storage_keys_to_delete:
try:
storage.delete(storage_key)
except Exception:
storage_delete_failures += 1
logger.exception("Failed to delete file from storage, key: %s", storage_key)
if storage_delete_failures > 0:
logger.warning(
"Storage file deletion completed with %d failures out of %d total files for dataset_id: %s",
storage_delete_failures,
len(storage_keys_to_delete),
dataset_id,
)
end_at = time.perf_counter()
logger.info(
click.style(
f"Cleaned documents when documents deleted latency: {end_at - start_at:.2f}s, "
f"dataset_id: {dataset_id}, document_ids: {document_ids}, "
f"segments: {len(segment_ids)}, image_files: {len(total_image_upload_file_ids)}, "
f"storage_files: {len(storage_keys_to_delete)}",
fg="green",
)
)
except Exception:
logger.exception(
"Batch clean documents failed for dataset_id: %s, document_ids: %s",
dataset_id,
document_ids,
)
logger.exception("Cleaned documents when documents deleted failed")

View File

@@ -48,11 +48,6 @@ def batch_create_segment_to_index_task(
indexing_cache_key = f"segment_batch_import_{job_id}"
# Initialize variables with default values
upload_file_key: str | None = None
dataset_config: dict | None = None
document_config: dict | None = None
with session_factory.create_session() as session:
try:
dataset = session.get(Dataset, dataset_id)
@@ -74,115 +69,86 @@ def batch_create_segment_to_index_task(
if not upload_file:
raise ValueError("UploadFile not found.")
dataset_config = {
"id": dataset.id,
"indexing_technique": dataset.indexing_technique,
"tenant_id": dataset.tenant_id,
"embedding_model_provider": dataset.embedding_model_provider,
"embedding_model": dataset.embedding_model,
}
with tempfile.TemporaryDirectory() as temp_dir:
suffix = Path(upload_file.key).suffix
file_path = f"{temp_dir}/{next(tempfile._get_candidate_names())}{suffix}" # type: ignore
storage.download(upload_file.key, file_path)
document_config = {
"id": dataset_document.id,
"doc_form": dataset_document.doc_form,
"word_count": dataset_document.word_count or 0,
}
df = pd.read_csv(file_path)
content = []
for _, row in df.iterrows():
if dataset_document.doc_form == "qa_model":
data = {"content": row.iloc[0], "answer": row.iloc[1]}
else:
data = {"content": row.iloc[0]}
content.append(data)
if len(content) == 0:
raise ValueError("The CSV file is empty.")
upload_file_key = upload_file.key
document_segments = []
embedding_model = None
if dataset.indexing_technique == "high_quality":
model_manager = ModelManager()
embedding_model = model_manager.get_model_instance(
tenant_id=dataset.tenant_id,
provider=dataset.embedding_model_provider,
model_type=ModelType.TEXT_EMBEDDING,
model=dataset.embedding_model,
)
except Exception:
logger.exception("Segments batch created index failed")
redis_client.setex(indexing_cache_key, 600, "error")
return
# Ensure required variables are set before proceeding
if upload_file_key is None or dataset_config is None or document_config is None:
logger.error("Required configuration not set due to session error")
redis_client.setex(indexing_cache_key, 600, "error")
return
with tempfile.TemporaryDirectory() as temp_dir:
suffix = Path(upload_file_key).suffix
file_path = f"{temp_dir}/{next(tempfile._get_candidate_names())}{suffix}" # type: ignore
storage.download(upload_file_key, file_path)
df = pd.read_csv(file_path)
content = []
for _, row in df.iterrows():
if document_config["doc_form"] == "qa_model":
data = {"content": row.iloc[0], "answer": row.iloc[1]}
word_count_change = 0
if embedding_model:
tokens_list = embedding_model.get_text_embedding_num_tokens(
texts=[segment["content"] for segment in content]
)
else:
data = {"content": row.iloc[0]}
content.append(data)
if len(content) == 0:
raise ValueError("The CSV file is empty.")
tokens_list = [0] * len(content)
document_segments = []
embedding_model = None
if dataset_config["indexing_technique"] == "high_quality":
model_manager = ModelManager()
embedding_model = model_manager.get_model_instance(
tenant_id=dataset_config["tenant_id"],
provider=dataset_config["embedding_model_provider"],
model_type=ModelType.TEXT_EMBEDDING,
model=dataset_config["embedding_model"],
)
for segment, tokens in zip(content, tokens_list):
content = segment["content"]
doc_id = str(uuid.uuid4())
segment_hash = helper.generate_text_hash(content)
max_position = (
session.query(func.max(DocumentSegment.position))
.where(DocumentSegment.document_id == dataset_document.id)
.scalar()
)
segment_document = DocumentSegment(
tenant_id=tenant_id,
dataset_id=dataset_id,
document_id=document_id,
index_node_id=doc_id,
index_node_hash=segment_hash,
position=max_position + 1 if max_position else 1,
content=content,
word_count=len(content),
tokens=tokens,
created_by=user_id,
indexing_at=naive_utc_now(),
status="completed",
completed_at=naive_utc_now(),
)
if dataset_document.doc_form == "qa_model":
segment_document.answer = segment["answer"]
segment_document.word_count += len(segment["answer"])
word_count_change += segment_document.word_count
session.add(segment_document)
document_segments.append(segment_document)
word_count_change = 0
if embedding_model:
tokens_list = embedding_model.get_text_embedding_num_tokens(texts=[segment["content"] for segment in content])
else:
tokens_list = [0] * len(content)
with session_factory.create_session() as session, session.begin():
for segment, tokens in zip(content, tokens_list):
content = segment["content"]
doc_id = str(uuid.uuid4())
segment_hash = helper.generate_text_hash(content)
max_position = (
session.query(func.max(DocumentSegment.position))
.where(DocumentSegment.document_id == document_config["id"])
.scalar()
)
segment_document = DocumentSegment(
tenant_id=tenant_id,
dataset_id=dataset_id,
document_id=document_id,
index_node_id=doc_id,
index_node_hash=segment_hash,
position=max_position + 1 if max_position else 1,
content=content,
word_count=len(content),
tokens=tokens,
created_by=user_id,
indexing_at=naive_utc_now(),
status="completed",
completed_at=naive_utc_now(),
)
if document_config["doc_form"] == "qa_model":
segment_document.answer = segment["answer"]
segment_document.word_count += len(segment["answer"])
word_count_change += segment_document.word_count
session.add(segment_document)
document_segments.append(segment_document)
with session_factory.create_session() as session, session.begin():
dataset_document = session.get(Document, document_id)
if dataset_document:
assert dataset_document.word_count is not None
dataset_document.word_count += word_count_change
session.add(dataset_document)
with session_factory.create_session() as session:
dataset = session.get(Dataset, dataset_id)
if dataset:
VectorService.create_segments_vector(None, document_segments, dataset, document_config["doc_form"])
redis_client.setex(indexing_cache_key, 600, "completed")
end_at = time.perf_counter()
logger.info(
click.style(
f"Segment batch created job: {job_id} latency: {end_at - start_at}",
fg="green",
)
)
VectorService.create_segments_vector(None, document_segments, dataset, dataset_document.doc_form)
session.commit()
redis_client.setex(indexing_cache_key, 600, "completed")
end_at = time.perf_counter()
logger.info(
click.style(
f"Segment batch created job: {job_id} latency: {end_at - start_at}",
fg="green",
)
)
except Exception:
logger.exception("Segments batch created index failed")
redis_client.setex(indexing_cache_key, 600, "error")
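Throughout this task the Redis key segment_batch_import_{job_id} acts as the job's status channel: "error" on any failure, "completed" on success, each with a 600-second TTL. A small sketch of that contract as a helper (the helper itself is hypothetical):

from extensions.ext_redis import redis_client

def set_batch_import_status(job_id: str, ok: bool) -> None:
    # 600s TTL: pollers that arrive after expiry simply see the key gone.
    redis_client.setex(f"segment_batch_import_{job_id}", 600, "completed" if ok else "error")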

View File

@@ -28,7 +28,6 @@ def clean_document_task(document_id: str, dataset_id: str, doc_form: str, file_i
"""
logger.info(click.style(f"Start clean document when document deleted: {document_id}", fg="green"))
start_at = time.perf_counter()
total_attachment_files = []
with session_factory.create_session() as session:
try:
@@ -48,91 +47,78 @@ def clean_document_task(document_id: str, dataset_id: str, doc_form: str, file_i
SegmentAttachmentBinding.document_id == document_id,
)
).all()
attachment_ids = [attachment_file.id for _, attachment_file in attachments_with_bindings]
binding_ids = [binding.id for binding, _ in attachments_with_bindings]
total_attachment_files.extend([attachment_file.key for _, attachment_file in attachments_with_bindings])
index_node_ids = [segment.index_node_id for segment in segments]
segment_contents = [segment.content for segment in segments]
except Exception:
logger.exception("Cleaned document when document deleted failed")
return
# check segment is exist
if index_node_ids:
index_processor = IndexProcessorFactory(doc_form).init_index_processor()
with session_factory.create_session() as session:
dataset = session.query(Dataset).where(Dataset.id == dataset_id).first()
if dataset:
# check segment is exist
if segments:
index_node_ids = [segment.index_node_id for segment in segments]
index_processor = IndexProcessorFactory(doc_form).init_index_processor()
index_processor.clean(
dataset, index_node_ids, with_keywords=True, delete_child_chunks=True, delete_summaries=True
)
total_image_files = []
with session_factory.create_session() as session, session.begin():
for segment_content in segment_contents:
image_upload_file_ids = get_image_upload_file_ids(segment_content)
image_files = session.scalars(select(UploadFile).where(UploadFile.id.in_(image_upload_file_ids))).all()
total_image_files.extend([image_file.key for image_file in image_files])
image_file_delete_stmt = delete(UploadFile).where(UploadFile.id.in_(image_upload_file_ids))
session.execute(image_file_delete_stmt)
for segment in segments:
image_upload_file_ids = get_image_upload_file_ids(segment.content)
image_files = session.scalars(
select(UploadFile).where(UploadFile.id.in_(image_upload_file_ids))
).all()
for image_file in image_files:
if image_file is None:
continue
try:
storage.delete(image_file.key)
except Exception:
logger.exception(
"Delete image_files failed when storage deleted, \
image_upload_file_id: %s",
image_file.id,
)
with session_factory.create_session() as session, session.begin():
segment_delete_stmt = delete(DocumentSegment).where(DocumentSegment.document_id == document_id)
session.execute(segment_delete_stmt)
image_file_delete_stmt = delete(UploadFile).where(UploadFile.id.in_(image_upload_file_ids))
session.execute(image_file_delete_stmt)
session.delete(segment)
for image_file_key in total_image_files:
try:
storage.delete(image_file_key)
except Exception:
logger.exception(
"Delete image_files failed when storage deleted, \
image_upload_file_id: %s",
image_file_key,
session.commit()
if file_id:
file = session.query(UploadFile).where(UploadFile.id == file_id).first()
if file:
try:
storage.delete(file.key)
except Exception:
logger.exception("Delete file failed when document deleted, file_id: %s", file_id)
session.delete(file)
# delete segment attachments
if attachments_with_bindings:
attachment_ids = [attachment_file.id for _, attachment_file in attachments_with_bindings]
binding_ids = [binding.id for binding, _ in attachments_with_bindings]
for binding, attachment_file in attachments_with_bindings:
try:
storage.delete(attachment_file.key)
except Exception:
logger.exception(
"Delete attachment_file failed when storage deleted, \
attachment_file_id: %s",
binding.attachment_id,
)
attachment_file_delete_stmt = delete(UploadFile).where(UploadFile.id.in_(attachment_ids))
session.execute(attachment_file_delete_stmt)
binding_delete_stmt = delete(SegmentAttachmentBinding).where(
SegmentAttachmentBinding.id.in_(binding_ids)
)
session.execute(binding_delete_stmt)
# delete dataset metadata binding
session.query(DatasetMetadataBinding).where(
DatasetMetadataBinding.dataset_id == dataset_id,
DatasetMetadataBinding.document_id == document_id,
).delete()
session.commit()
end_at = time.perf_counter()
logger.info(
click.style(
f"Cleaned document when document deleted: {document_id} latency: {end_at - start_at}",
fg="green",
)
)
with session_factory.create_session() as session, session.begin():
if file_id:
file = session.query(UploadFile).where(UploadFile.id == file_id).first()
if file:
try:
storage.delete(file.key)
except Exception:
logger.exception("Delete file failed when document deleted, file_id: %s", file_id)
session.delete(file)
with session_factory.create_session() as session, session.begin():
# delete segment attachments
if attachment_ids:
attachment_file_delete_stmt = delete(UploadFile).where(UploadFile.id.in_(attachment_ids))
session.execute(attachment_file_delete_stmt)
if binding_ids:
binding_delete_stmt = delete(SegmentAttachmentBinding).where(SegmentAttachmentBinding.id.in_(binding_ids))
session.execute(binding_delete_stmt)
for attachment_file_key in total_attachment_files:
try:
storage.delete(attachment_file_key)
except Exception:
logger.exception(
"Delete attachment_file failed when storage deleted, \
attachment_file_id: %s",
attachment_file_key,
)
with session_factory.create_session() as session, session.begin():
# delete dataset metadata binding
session.query(DatasetMetadataBinding).where(
DatasetMetadataBinding.dataset_id == dataset_id,
DatasetMetadataBinding.document_id == document_id,
).delete()
end_at = time.perf_counter()
logger.info(
click.style(
f"Cleaned document when document deleted: {document_id} latency: {end_at - start_at}",
fg="green",
)
)
logger.exception("Cleaned document when document deleted failed")

View File

@@ -3,7 +3,6 @@ import time
import click
from celery import shared_task
from sqlalchemy import delete
from core.db.session_factory import session_factory
from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
@@ -68,14 +67,8 @@ def delete_segment_from_index_task(
if segment_attachment_bindings:
attachment_ids = [binding.attachment_id for binding in segment_attachment_bindings]
index_processor.clean(dataset=dataset, node_ids=attachment_ids, with_keywords=False)
segment_attachment_bind_ids = [i.id for i in segment_attachment_bindings]
for i in range(0, len(segment_attachment_bind_ids), 1000):
segment_attachment_bind_delete_stmt = delete(SegmentAttachmentBinding).where(
SegmentAttachmentBinding.id.in_(segment_attachment_bind_ids[i : i + 1000])
)
session.execute(segment_attachment_bind_delete_stmt)
for binding in segment_attachment_bindings:
session.delete(binding)
# delete upload file
session.query(UploadFile).where(UploadFile.id.in_(attachment_ids)).delete(synchronize_session=False)
session.commit()

View File

@@ -28,7 +28,7 @@ def document_indexing_sync_task(dataset_id: str, document_id: str):
logger.info(click.style(f"Start sync document: {document_id}", fg="green"))
start_at = time.perf_counter()
with session_factory.create_session() as session, session.begin():
with session_factory.create_session() as session:
document = session.query(Document).where(Document.id == document_id, Document.dataset_id == dataset_id).first()
if not document:
@@ -68,6 +68,7 @@ def document_indexing_sync_task(dataset_id: str, document_id: str):
document.indexing_status = "error"
document.error = "Datasource credential not found. Please reconnect your Notion workspace."
document.stopped_at = naive_utc_now()
session.commit()
return
loader = NotionExtractor(
@@ -84,6 +85,7 @@ def document_indexing_sync_task(dataset_id: str, document_id: str):
if last_edited_time != page_edited_time:
document.indexing_status = "parsing"
document.processing_started_at = naive_utc_now()
session.commit()
# delete all document segment and index
try:
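The narrow change in this file: the session is no longer opened with session.begin(), so each status write must be followed by an explicit session.commit() before an early return. A compressed sketch of the pattern, assuming naive_utc_now from libs.datetime_utils:

from libs.datetime_utils import naive_utc_now

def mark_document_error(session, document, message: str) -> None:
    document.indexing_status = "error"
    document.error = message
    document.stopped_at = naive_utc_now()
    # Without session.begin(), nothing persists unless we commit explicitly.
    session.commit()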

View File

@@ -81,35 +81,26 @@ def _document_indexing(dataset_id: str, document_ids: Sequence[str]):
session.commit()
return
# Phase 1: Update status to parsing (short transaction)
with session_factory.create_session() as session, session.begin():
documents = (
session.query(Document).where(Document.id.in_(document_ids), Document.dataset_id == dataset_id).all()
)
for document_id in document_ids:
logger.info(click.style(f"Start process document: {document_id}", fg="green"))
document = (
session.query(Document).where(Document.id == document_id, Document.dataset_id == dataset_id).first()
)
for document in documents:
if document:
document.indexing_status = "parsing"
document.processing_started_at = naive_utc_now()
documents.append(document)
session.add(document)
# Transaction committed and closed
session.commit()
# Phase 2: Execute indexing (no transaction - IndexingRunner creates its own sessions)
has_error = False
try:
indexing_runner = IndexingRunner()
indexing_runner.run(documents)
end_at = time.perf_counter()
logger.info(click.style(f"Processed dataset: {dataset_id} latency: {end_at - start_at}", fg="green"))
except DocumentIsPausedError as ex:
logger.info(click.style(str(ex), fg="yellow"))
has_error = True
except Exception:
logger.exception("Document indexing task failed, dataset_id: %s", dataset_id)
has_error = True
try:
indexing_runner = IndexingRunner()
indexing_runner.run(documents)
end_at = time.perf_counter()
logger.info(click.style(f"Processed dataset: {dataset_id} latency: {end_at - start_at}", fg="green"))
if not has_error:
with session_factory.create_session() as session:
# Trigger summary index generation for completed documents if enabled
# Only generate for high_quality indexing technique and when summary_index_setting is enabled
# Re-query dataset to get latest summary_index_setting (in case it was updated)
@@ -124,18 +115,17 @@ def _document_indexing(dataset_id: str, document_ids: Sequence[str]):
# expire all session to get latest document's indexing status
session.expire_all()
# Check each document's indexing status and trigger summary generation if completed
documents = (
session.query(Document)
.where(Document.id.in_(document_ids), Document.dataset_id == dataset_id)
.all()
)
for document in documents:
for document_id in document_ids:
# Re-query document to get latest status (IndexingRunner may have updated it)
document = (
session.query(Document)
.where(Document.id == document_id, Document.dataset_id == dataset_id)
.first()
)
if document:
logger.info(
"Checking document %s for summary generation: status=%s, doc_form=%s, need_summary=%s",
document.id,
document_id,
document.indexing_status,
document.doc_form,
document.need_summary,
@@ -146,36 +136,46 @@ def _document_indexing(dataset_id: str, document_ids: Sequence[str]):
and document.need_summary is True
):
try:
generate_summary_index_task.delay(dataset.id, document.id, None)
generate_summary_index_task.delay(dataset.id, document_id, None)
logger.info(
"Queued summary index generation task for document %s in dataset %s "
"after indexing completed",
document.id,
document_id,
dataset.id,
)
except Exception:
logger.exception(
"Failed to queue summary index generation task for document %s",
document.id,
document_id,
)
# Don't fail the entire indexing process if summary task queuing fails
else:
logger.info(
"Skipping summary generation for document %s: "
"status=%s, doc_form=%s, need_summary=%s",
document.id,
document_id,
document.indexing_status,
document.doc_form,
document.need_summary,
)
else:
logger.warning("Document %s not found after indexing", document.id)
logger.warning("Document %s not found after indexing", document_id)
else:
logger.info(
"Summary index generation skipped for dataset %s: summary_index_setting.enable=%s",
dataset.id,
summary_index_setting.get("enable") if summary_index_setting else None,
)
else:
logger.info(
"Summary index generation skipped for dataset %s: indexing_technique=%s (not 'high_quality')",
dataset.id,
dataset.indexing_technique,
)
except DocumentIsPausedError as ex:
logger.info(click.style(str(ex), fg="yellow"))
except Exception:
logger.exception("Document indexing task failed, dataset_id: %s", dataset_id)
def _document_indexing_with_tenant_queue(
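The restructured flow above splits indexing into phases: phase 1 marks all documents as parsing in one short transaction, phase 2 runs the IndexingRunner with no transaction held, and summary generation is queued afterwards from a fresh session. A minimal sketch of phase 1 in the same idiom, with the model class passed in to keep the sketch self-contained:

from libs.datetime_utils import naive_utc_now

def mark_documents_parsing(session_factory, document_model, dataset_id: str, document_ids):
    # Phase 1: one short transaction; commit before long-running indexing starts.
    with session_factory.create_session() as session, session.begin():
        documents = (
            session.query(document_model)
            .where(document_model.id.in_(document_ids), document_model.dataset_id == dataset_id)
            .all()
        )
        for document in documents:
            document.indexing_status = "parsing"
            document.processing_started_at = naive_utc_now()
    # Instances are detached here but still usable by the runner, as above.
    return documents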

View File

@@ -8,6 +8,7 @@ from sqlalchemy import delete, select
from core.db.session_factory import session_factory
from core.indexing_runner import DocumentIsPausedError, IndexingRunner
from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
from extensions.ext_database import db
from libs.datetime_utils import naive_utc_now
from models.dataset import Dataset, Document, DocumentSegment
@@ -26,7 +27,7 @@ def document_indexing_update_task(dataset_id: str, document_id: str):
logger.info(click.style(f"Start update document: {document_id}", fg="green"))
start_at = time.perf_counter()
with session_factory.create_session() as session, session.begin():
with session_factory.create_session() as session:
document = session.query(Document).where(Document.id == document_id, Document.dataset_id == dataset_id).first()
if not document:
@@ -35,6 +36,7 @@ def document_indexing_update_task(dataset_id: str, document_id: str):
document.indexing_status = "parsing"
document.processing_started_at = naive_utc_now()
session.commit()
# delete all document segment and index
try:
@@ -54,7 +56,7 @@ def document_indexing_update_task(dataset_id: str, document_id: str):
segment_ids = [segment.id for segment in segments]
segment_delete_stmt = delete(DocumentSegment).where(DocumentSegment.id.in_(segment_ids))
session.execute(segment_delete_stmt)
db.session.commit()
end_at = time.perf_counter()
logger.info(
click.style(

View File

@@ -259,8 +259,8 @@ def _delete_app_workflow_app_logs(tenant_id: str, app_id: str):
def _delete_app_workflow_archive_logs(tenant_id: str, app_id: str):
def del_workflow_archive_log(session, workflow_archive_log_id: str):
session.query(WorkflowArchiveLog).where(WorkflowArchiveLog.id == workflow_archive_log_id).delete(
def del_workflow_archive_log(workflow_archive_log_id: str):
db.session.query(WorkflowArchiveLog).where(WorkflowArchiveLog.id == workflow_archive_log_id).delete(
synchronize_session=False
)
@@ -420,7 +420,7 @@ def delete_draft_variables_batch(app_id: str, batch_size: int = 1000) -> int:
total_files_deleted = 0
while True:
with session_factory.create_session() as session, session.begin():
with session_factory.create_session() as session:
# Get a batch of draft variable IDs along with their file_ids
query_sql = """
SELECT id, file_id FROM workflow_draft_variables

View File

@@ -6,8 +6,9 @@ improving performance by offloading storage operations to background workers.
"""
from celery import shared_task # type: ignore[import-untyped]
from sqlalchemy.orm import Session
from core.db.session_factory import session_factory
from extensions.ext_database import db
from services.workflow_draft_variable_service import DraftVarFileDeletion, WorkflowDraftVariableService
@@ -16,6 +17,6 @@ def save_workflow_execution_task(
self,
deletions: list[DraftVarFileDeletion],
):
with session_factory.create_session() as session, session.begin():
with Session(bind=db.engine) as session, session.begin():
srv = WorkflowDraftVariableService(session=session)
srv.delete_workflow_draft_variable_file(deletions=deletions)
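This hunk trades one session-construction idiom for the other; both yield a context-managed session whose transaction commits on clean exit. A side-by-side sketch under that assumption:

from sqlalchemy.orm import Session

from core.db.session_factory import session_factory
from extensions.ext_database import db

def run_in_transaction(work) -> None:
    # Shared factory: configured once for the application.
    with session_factory.create_session() as session, session.begin():
        work(session)

def run_in_transaction_raw(work) -> None:
    # Equivalent shape bound directly to the engine, as in this hunk.
    with Session(bind=db.engine) as session, session.begin():
        work(session)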

View File

@@ -10,10 +10,7 @@ from models import Tenant
from models.enums import CreatorUserRole
from models.model import App, UploadFile
from models.workflow import WorkflowDraftVariable, WorkflowDraftVariableFile
from tasks.remove_app_and_related_data_task import (
_delete_draft_variables,
delete_draft_variables_batch,
)
from tasks.remove_app_and_related_data_task import _delete_draft_variables, delete_draft_variables_batch
@pytest.fixture
@@ -300,18 +297,12 @@ class TestDeleteDraftVariablesWithOffloadIntegration:
def test_delete_draft_variables_with_offload_data(self, mock_storage, setup_offload_test_data):
data = setup_offload_test_data
app_id = data["app"].id
upload_file_ids = [uf.id for uf in data["upload_files"]]
variable_file_ids = [vf.id for vf in data["variable_files"]]
mock_storage.delete.return_value = None
with session_factory.create_session() as session:
draft_vars_before = session.query(WorkflowDraftVariable).filter_by(app_id=app_id).count()
var_files_before = (
session.query(WorkflowDraftVariableFile)
.where(WorkflowDraftVariableFile.id.in_(variable_file_ids))
.count()
)
upload_files_before = session.query(UploadFile).where(UploadFile.id.in_(upload_file_ids)).count()
var_files_before = session.query(WorkflowDraftVariableFile).count()
upload_files_before = session.query(UploadFile).count()
assert draft_vars_before == 3
assert var_files_before == 2
assert upload_files_before == 2
@@ -324,12 +315,8 @@ class TestDeleteDraftVariablesWithOffloadIntegration:
assert draft_vars_after == 0
with session_factory.create_session() as session:
var_files_after = (
session.query(WorkflowDraftVariableFile)
.where(WorkflowDraftVariableFile.id.in_(variable_file_ids))
.count()
)
upload_files_after = session.query(UploadFile).where(UploadFile.id.in_(upload_file_ids)).count()
var_files_after = session.query(WorkflowDraftVariableFile).count()
upload_files_after = session.query(UploadFile).count()
assert var_files_after == 0
assert upload_files_after == 0
@@ -342,8 +329,6 @@ class TestDeleteDraftVariablesWithOffloadIntegration:
def test_delete_draft_variables_storage_failure_continues_cleanup(self, mock_storage, setup_offload_test_data):
data = setup_offload_test_data
app_id = data["app"].id
upload_file_ids = [uf.id for uf in data["upload_files"]]
variable_file_ids = [vf.id for vf in data["variable_files"]]
mock_storage.delete.side_effect = [Exception("Storage error"), None]
deleted_count = delete_draft_variables_batch(app_id, batch_size=10)
@@ -354,12 +339,8 @@ class TestDeleteDraftVariablesWithOffloadIntegration:
assert draft_vars_after == 0
with session_factory.create_session() as session:
var_files_after = (
session.query(WorkflowDraftVariableFile)
.where(WorkflowDraftVariableFile.id.in_(variable_file_ids))
.count()
)
upload_files_after = session.query(UploadFile).where(UploadFile.id.in_(upload_file_ids)).count()
var_files_after = session.query(WorkflowDraftVariableFile).count()
upload_files_after = session.query(UploadFile).count()
assert var_files_after == 0
assert upload_files_after == 0
@@ -414,275 +395,3 @@ class TestDeleteDraftVariablesWithOffloadIntegration:
if app2_obj:
session.delete(app2_obj)
session.commit()
class TestDeleteDraftVariablesSessionCommit:
"""Test suite to verify session commit behavior in delete_draft_variables_batch."""
@pytest.fixture
def setup_offload_test_data(self, app_and_tenant):
"""Create test data with offload files for session commit tests."""
from core.variables.types import SegmentType
from libs.datetime_utils import naive_utc_now
tenant, app = app_and_tenant
with session_factory.create_session() as session:
upload_file1 = UploadFile(
tenant_id=tenant.id,
storage_type="local",
key="test/file1.json",
name="file1.json",
size=1024,
extension="json",
mime_type="application/json",
created_by_role=CreatorUserRole.ACCOUNT,
created_by=str(uuid.uuid4()),
created_at=naive_utc_now(),
used=False,
)
upload_file2 = UploadFile(
tenant_id=tenant.id,
storage_type="local",
key="test/file2.json",
name="file2.json",
size=2048,
extension="json",
mime_type="application/json",
created_by_role=CreatorUserRole.ACCOUNT,
created_by=str(uuid.uuid4()),
created_at=naive_utc_now(),
used=False,
)
session.add(upload_file1)
session.add(upload_file2)
session.flush()
var_file1 = WorkflowDraftVariableFile(
tenant_id=tenant.id,
app_id=app.id,
user_id=str(uuid.uuid4()),
upload_file_id=upload_file1.id,
size=1024,
length=10,
value_type=SegmentType.STRING,
)
var_file2 = WorkflowDraftVariableFile(
tenant_id=tenant.id,
app_id=app.id,
user_id=str(uuid.uuid4()),
upload_file_id=upload_file2.id,
size=2048,
length=20,
value_type=SegmentType.OBJECT,
)
session.add(var_file1)
session.add(var_file2)
session.flush()
draft_var1 = WorkflowDraftVariable.new_node_variable(
app_id=app.id,
node_id="node_1",
name="large_var_1",
value=StringSegment(value="truncated..."),
node_execution_id=str(uuid.uuid4()),
file_id=var_file1.id,
)
draft_var2 = WorkflowDraftVariable.new_node_variable(
app_id=app.id,
node_id="node_2",
name="large_var_2",
value=StringSegment(value="truncated..."),
node_execution_id=str(uuid.uuid4()),
file_id=var_file2.id,
)
draft_var3 = WorkflowDraftVariable.new_node_variable(
app_id=app.id,
node_id="node_3",
name="regular_var",
value=StringSegment(value="regular_value"),
node_execution_id=str(uuid.uuid4()),
)
session.add(draft_var1)
session.add(draft_var2)
session.add(draft_var3)
session.commit()
data = {
"app": app,
"tenant": tenant,
"upload_files": [upload_file1, upload_file2],
"variable_files": [var_file1, var_file2],
"draft_variables": [draft_var1, draft_var2, draft_var3],
}
yield data
with session_factory.create_session() as session:
for table, ids in [
(WorkflowDraftVariable, [v.id for v in data["draft_variables"]]),
(WorkflowDraftVariableFile, [vf.id for vf in data["variable_files"]]),
(UploadFile, [uf.id for uf in data["upload_files"]]),
]:
cleanup_query = delete(table).where(table.id.in_(ids)).execution_options(synchronize_session=False)
session.execute(cleanup_query)
session.commit()
@pytest.fixture
def setup_commit_test_data(self, app_and_tenant):
"""Create test data for session commit tests."""
tenant, app = app_and_tenant
variable_ids: list[str] = []
with session_factory.create_session() as session:
variables = []
for i in range(10):
var = WorkflowDraftVariable.new_node_variable(
app_id=app.id,
node_id=f"node_{i}",
name=f"var_{i}",
value=StringSegment(value="test_value"),
node_execution_id=str(uuid.uuid4()),
)
session.add(var)
variables.append(var)
session.commit()
variable_ids = [v.id for v in variables]
yield {
"app": app,
"tenant": tenant,
"variable_ids": variable_ids,
}
with session_factory.create_session() as session:
cleanup_query = (
delete(WorkflowDraftVariable)
.where(WorkflowDraftVariable.id.in_(variable_ids))
.execution_options(synchronize_session=False)
)
session.execute(cleanup_query)
session.commit()
def test_session_commit_is_called_after_each_batch(self, setup_commit_test_data):
"""Test that session.begin() is used for automatic transaction management."""
data = setup_commit_test_data
app_id = data["app"].id
# Since session.begin() is used, the transaction is automatically committed
# when the with block exits successfully. We verify this by checking that
# data is actually persisted.
deleted_count = delete_draft_variables_batch(app_id, batch_size=3)
# Verify all data was deleted (proves transaction was committed)
with session_factory.create_session() as session:
remaining_count = session.query(WorkflowDraftVariable).filter_by(app_id=app_id).count()
assert deleted_count == 10
assert remaining_count == 0
def test_data_persisted_after_batch_deletion(self, setup_commit_test_data):
"""Test that data is actually persisted to database after batch deletion with commits."""
data = setup_commit_test_data
app_id = data["app"].id
variable_ids = data["variable_ids"]
# Verify initial state
with session_factory.create_session() as session:
initial_count = session.query(WorkflowDraftVariable).filter_by(app_id=app_id).count()
assert initial_count == 10
# Perform deletion with small batch size to force multiple commits
deleted_count = delete_draft_variables_batch(app_id, batch_size=3)
assert deleted_count == 10
# Verify all data is deleted in a new session (proves commits worked)
with session_factory.create_session() as session:
final_count = session.query(WorkflowDraftVariable).filter_by(app_id=app_id).count()
assert final_count == 0
# Verify specific IDs are deleted
with session_factory.create_session() as session:
remaining_vars = (
session.query(WorkflowDraftVariable).where(WorkflowDraftVariable.id.in_(variable_ids)).count()
)
assert remaining_vars == 0
def test_session_commit_with_empty_dataset(self, setup_commit_test_data):
"""Test session behavior when deleting from an empty dataset."""
nonexistent_app_id = str(uuid.uuid4())
# Should not raise any errors and should return 0
deleted_count = delete_draft_variables_batch(nonexistent_app_id, batch_size=10)
assert deleted_count == 0
def test_session_commit_with_single_batch(self, setup_commit_test_data):
"""Test that commit happens correctly when all data fits in a single batch."""
data = setup_commit_test_data
app_id = data["app"].id
with session_factory.create_session() as session:
initial_count = session.query(WorkflowDraftVariable).filter_by(app_id=app_id).count()
assert initial_count == 10
# Delete all in a single batch
deleted_count = delete_draft_variables_batch(app_id, batch_size=100)
assert deleted_count == 10
# Verify data is persisted
with session_factory.create_session() as session:
final_count = session.query(WorkflowDraftVariable).filter_by(app_id=app_id).count()
assert final_count == 0
def test_invalid_batch_size_raises_error(self, setup_commit_test_data):
"""Test that invalid batch size raises ValueError."""
data = setup_commit_test_data
app_id = data["app"].id
with pytest.raises(ValueError, match="batch_size must be positive"):
delete_draft_variables_batch(app_id, batch_size=0)
with pytest.raises(ValueError, match="batch_size must be positive"):
delete_draft_variables_batch(app_id, batch_size=-1)
@patch("extensions.ext_storage.storage")
def test_session_commit_with_offload_data_cleanup(self, mock_storage, setup_offload_test_data):
"""Test that session commits correctly when cleaning up offload data."""
data = setup_offload_test_data
app_id = data["app"].id
upload_file_ids = [uf.id for uf in data["upload_files"]]
mock_storage.delete.return_value = None
# Verify initial state
with session_factory.create_session() as session:
draft_vars_before = session.query(WorkflowDraftVariable).filter_by(app_id=app_id).count()
var_files_before = (
session.query(WorkflowDraftVariableFile)
.where(WorkflowDraftVariableFile.id.in_([vf.id for vf in data["variable_files"]]))
.count()
)
upload_files_before = session.query(UploadFile).where(UploadFile.id.in_(upload_file_ids)).count()
assert draft_vars_before == 3
assert var_files_before == 2
assert upload_files_before == 2
# Delete variables with offload data
deleted_count = delete_draft_variables_batch(app_id, batch_size=10)
assert deleted_count == 3
# Verify all data is persisted (deleted) in new session
with session_factory.create_session() as session:
draft_vars_after = session.query(WorkflowDraftVariable).filter_by(app_id=app_id).count()
var_files_after = (
session.query(WorkflowDraftVariableFile)
.where(WorkflowDraftVariableFile.id.in_([vf.id for vf in data["variable_files"]]))
.count()
)
upload_files_after = session.query(UploadFile).where(UploadFile.id.in_(upload_file_ids)).count()
assert draft_vars_after == 0
assert var_files_after == 0
assert upload_files_after == 0
# Verify storage cleanup was called
assert mock_storage.delete.call_count == 2

View File

@@ -605,20 +605,26 @@ class TestBatchCreateSegmentToIndexTask:
mock_storage.download.side_effect = mock_download
# Execute the task - should raise ValueError for empty CSV
# Execute the task
job_id = str(uuid.uuid4())
with pytest.raises(ValueError, match="The CSV file is empty"):
batch_create_segment_to_index_task(
job_id=job_id,
upload_file_id=upload_file.id,
dataset_id=dataset.id,
document_id=document.id,
tenant_id=tenant.id,
user_id=account.id,
)
batch_create_segment_to_index_task(
job_id=job_id,
upload_file_id=upload_file.id,
dataset_id=dataset.id,
document_id=document.id,
tenant_id=tenant.id,
user_id=account.id,
)
# Verify error handling
# Since exception was raised, no segments should be created
# Check Redis cache was set to error status
from extensions.ext_redis import redis_client
cache_key = f"segment_batch_import_{job_id}"
cache_value = redis_client.get(cache_key)
assert cache_value == b"error"
# Verify no segments were created
from extensions.ext_database import db
segments = db.session.query(DocumentSegment).all()

View File

@@ -1,182 +0,0 @@
from unittest.mock import MagicMock, patch
import pytest
from faker import Faker
from models import Account, Tenant, TenantAccountJoin, TenantAccountRole
from models.dataset import Dataset, Document, DocumentSegment
from tasks.document_indexing_update_task import document_indexing_update_task
class TestDocumentIndexingUpdateTask:
@pytest.fixture
def mock_external_dependencies(self):
"""Patch external collaborators used by the update task.
- IndexProcessorFactory.init_index_processor().clean(...)
- IndexingRunner.run([...])
"""
with (
patch("tasks.document_indexing_update_task.IndexProcessorFactory") as mock_factory,
patch("tasks.document_indexing_update_task.IndexingRunner") as mock_runner,
):
processor_instance = MagicMock()
mock_factory.return_value.init_index_processor.return_value = processor_instance
runner_instance = MagicMock()
mock_runner.return_value = runner_instance
yield {
"factory": mock_factory,
"processor": processor_instance,
"runner": mock_runner,
"runner_instance": runner_instance,
}
def _create_dataset_document_with_segments(self, db_session_with_containers, *, segment_count: int = 2):
fake = Faker()
# Account and tenant
account = Account(
email=fake.email(),
name=fake.name(),
interface_language="en-US",
status="active",
)
db_session_with_containers.add(account)
db_session_with_containers.commit()
tenant = Tenant(name=fake.company(), status="normal")
db_session_with_containers.add(tenant)
db_session_with_containers.commit()
join = TenantAccountJoin(
tenant_id=tenant.id,
account_id=account.id,
role=TenantAccountRole.OWNER,
current=True,
)
db_session_with_containers.add(join)
db_session_with_containers.commit()
# Dataset and document
dataset = Dataset(
tenant_id=tenant.id,
name=fake.company(),
description=fake.text(max_nb_chars=64),
data_source_type="upload_file",
indexing_technique="high_quality",
created_by=account.id,
)
db_session_with_containers.add(dataset)
db_session_with_containers.commit()
document = Document(
tenant_id=tenant.id,
dataset_id=dataset.id,
position=0,
data_source_type="upload_file",
batch="test_batch",
name=fake.file_name(),
created_from="upload_file",
created_by=account.id,
indexing_status="waiting",
enabled=True,
doc_form="text_model",
)
db_session_with_containers.add(document)
db_session_with_containers.commit()
# Segments
node_ids = []
for i in range(segment_count):
node_id = f"node-{i + 1}"
seg = DocumentSegment(
tenant_id=tenant.id,
dataset_id=dataset.id,
document_id=document.id,
position=i,
content=fake.text(max_nb_chars=32),
answer=None,
word_count=10,
tokens=5,
index_node_id=node_id,
status="completed",
created_by=account.id,
)
db_session_with_containers.add(seg)
node_ids.append(node_id)
db_session_with_containers.commit()
# Refresh to ensure ORM state
db_session_with_containers.refresh(dataset)
db_session_with_containers.refresh(document)
return dataset, document, node_ids
def test_cleans_segments_and_reindexes(self, db_session_with_containers, mock_external_dependencies):
dataset, document, node_ids = self._create_dataset_document_with_segments(db_session_with_containers)
# Act
document_indexing_update_task(dataset.id, document.id)
# Ensure we see committed changes from another session
db_session_with_containers.expire_all()
# Assert document status updated before reindex
updated = db_session_with_containers.query(Document).where(Document.id == document.id).first()
assert updated.indexing_status == "parsing"
assert updated.processing_started_at is not None
# Segments should be deleted
remaining = (
db_session_with_containers.query(DocumentSegment).where(DocumentSegment.document_id == document.id).count()
)
assert remaining == 0
# Assert index processor clean was called with expected args
clean_call = mock_external_dependencies["processor"].clean.call_args
assert clean_call is not None
args, kwargs = clean_call
# args[0] is a Dataset instance (from another session) — validate by id
assert getattr(args[0], "id", None) == dataset.id
# args[1] should contain our node_ids
assert set(args[1]) == set(node_ids)
assert kwargs.get("with_keywords") is True
assert kwargs.get("delete_child_chunks") is True
# Assert indexing runner invoked with the updated document
run_call = mock_external_dependencies["runner_instance"].run.call_args
assert run_call is not None
run_docs = run_call[0][0]
assert len(run_docs) == 1
first = run_docs[0]
assert getattr(first, "id", None) == document.id
def test_clean_error_is_logged_and_indexing_continues(self, db_session_with_containers, mock_external_dependencies):
dataset, document, node_ids = self._create_dataset_document_with_segments(db_session_with_containers)
# Force clean to raise; task should continue to indexing
mock_external_dependencies["processor"].clean.side_effect = Exception("boom")
document_indexing_update_task(dataset.id, document.id)
# Ensure we see committed changes from another session
db_session_with_containers.expire_all()
# Indexing should still be triggered
mock_external_dependencies["runner_instance"].run.assert_called_once()
# Segments should remain (since clean failed before DB delete)
remaining = (
db_session_with_containers.query(DocumentSegment).where(DocumentSegment.document_id == document.id).count()
)
assert remaining > 0
def test_document_not_found_noop(self, db_session_with_containers, mock_external_dependencies):
fake = Faker()
# Act with non-existent document id
document_indexing_update_task(dataset_id=fake.uuid4(), document_id=fake.uuid4())
# Neither processor nor runner should be called
mock_external_dependencies["processor"].clean.assert_not_called()
mock_external_dependencies["runner_instance"].run.assert_not_called()

View File

@@ -0,0 +1,222 @@
import builtins
import contextlib
import importlib
import sys
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
import pytest
from flask import Flask
from flask.views import MethodView
from extensions import ext_fastopenapi
from extensions.ext_database import db
@pytest.fixture
def app():
app = Flask(__name__)
app.config["TESTING"] = True
app.config["SECRET_KEY"] = "test-secret"
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///:memory:"
db.init_app(app)
return app
@pytest.fixture(autouse=True)
def fix_method_view_issue(monkeypatch):
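    # Expose MethodView on builtins when missing; the module reloads below
    # appear to rely on resolving it from there (assumption based on this
    # fixture's name).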
if not hasattr(builtins, "MethodView"):
monkeypatch.setattr(builtins, "MethodView", MethodView, raising=False)
def _create_isolated_router():
import controllers.fastopenapi
router_class = type(controllers.fastopenapi.console_router)
return router_class()
@contextlib.contextmanager
def _patch_auth_and_router(temp_router):
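    # Swap in the isolated router and no-op every auth decorator so the
    # endpoints under test can be exercised without a real login session.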
def noop(func):
return func
default_user = MagicMock(has_edit_permission=True, is_dataset_editor=False)
with (
patch("controllers.fastopenapi.console_router", temp_router),
patch("extensions.ext_fastopenapi.console_router", temp_router),
patch("controllers.console.wraps.setup_required", side_effect=noop),
patch("libs.login.login_required", side_effect=noop),
patch("controllers.console.wraps.account_initialization_required", side_effect=noop),
patch("controllers.console.wraps.edit_permission_required", side_effect=noop),
patch("libs.login.current_account_with_tenant", return_value=(default_user, "tenant-id")),
patch("configs.dify_config.EDITION", "CLOUD"),
):
import extensions.ext_fastopenapi
importlib.reload(extensions.ext_fastopenapi)
yield
def _force_reload_module(target_module: str, alias_module: str):
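    # Purge cached copies so the import below re-executes the module (and its
    # route decorators), then alias it under the second module path.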
if target_module in sys.modules:
del sys.modules[target_module]
if alias_module in sys.modules:
del sys.modules[alias_module]
module = importlib.import_module(target_module)
sys.modules[alias_module] = sys.modules[target_module]
return module
def _dedupe_routes(router):
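    # Repeated imports/reloads can register the same route more than once;
    # walk the routes in reverse so only the most recent registration survives.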
seen = set()
unique_routes = []
for path, method, endpoint in reversed(router.get_routes()):
key = (path, method, endpoint.__name__)
if key in seen:
continue
seen.add(key)
unique_routes.append((path, method, endpoint))
router._routes = list(reversed(unique_routes))
def _cleanup_modules(target_module: str, alias_module: str):
if target_module in sys.modules:
del sys.modules[target_module]
if alias_module in sys.modules:
del sys.modules[alias_module]
@pytest.fixture
def mock_tags_module_env():
target_module = "controllers.console.tag.tags"
alias_module = "api.controllers.console.tag.tags"
temp_router = _create_isolated_router()
try:
with _patch_auth_and_router(temp_router):
tags_module = _force_reload_module(target_module, alias_module)
_dedupe_routes(temp_router)
yield tags_module
finally:
_cleanup_modules(target_module, alias_module)
def test_list_tags_success(app: Flask, mock_tags_module_env):
# Arrange
tag = SimpleNamespace(id="tag-1", name="Alpha", type="app", binding_count=2)
with patch("controllers.console.tag.tags.TagService.get_tags", return_value=[tag]):
ext_fastopenapi.init_app(app)
client = app.test_client()
# Act
response = client.get("/console/api/tags?type=app&keyword=Alpha")
# Assert
assert response.status_code == 200
assert response.get_json() == [
{"id": "tag-1", "name": "Alpha", "type": "app", "binding_count": 2},
]
def test_create_tag_success(app: Flask, mock_tags_module_env):
# Arrange
tag = SimpleNamespace(id="tag-2", name="Beta", type="app")
with patch("controllers.console.tag.tags.TagService.save_tags", return_value=tag) as mock_save:
ext_fastopenapi.init_app(app)
client = app.test_client()
# Act
response = client.post("/console/api/tags", json={"name": "Beta", "type": "app"})
# Assert
assert response.status_code == 200
assert response.get_json() == {
"id": "tag-2",
"name": "Beta",
"type": "app",
"binding_count": 0,
}
mock_save.assert_called_once_with({"name": "Beta", "type": "app"})
def test_update_tag_success(app: Flask, mock_tags_module_env):
# Arrange
tag = SimpleNamespace(id="tag-3", name="Gamma", type="app")
with (
patch("controllers.console.tag.tags.TagService.update_tags", return_value=tag) as mock_update,
patch("controllers.console.tag.tags.TagService.get_tag_binding_count", return_value=4),
):
ext_fastopenapi.init_app(app)
client = app.test_client()
# Act
response = client.patch(
"/console/api/tags/11111111-1111-1111-1111-111111111111",
json={"name": "Gamma", "type": "app"},
)
# Assert
assert response.status_code == 200
assert response.get_json() == {
"id": "tag-3",
"name": "Gamma",
"type": "app",
"binding_count": 4,
}
mock_update.assert_called_once_with(
{"name": "Gamma", "type": "app"},
"11111111-1111-1111-1111-111111111111",
)
def test_delete_tag_success(app: Flask, mock_tags_module_env):
# Arrange
with patch("controllers.console.tag.tags.TagService.delete_tag") as mock_delete:
ext_fastopenapi.init_app(app)
client = app.test_client()
# Act
response = client.delete("/console/api/tags/11111111-1111-1111-1111-111111111111")
# Assert
assert response.status_code == 204
mock_delete.assert_called_once_with("11111111-1111-1111-1111-111111111111")
def test_create_tag_binding_success(app: Flask, mock_tags_module_env):
# Arrange
payload = {"tag_ids": ["tag-1", "tag-2"], "target_id": "target-1", "type": "app"}
with patch("controllers.console.tag.tags.TagService.save_tag_binding") as mock_bind:
ext_fastopenapi.init_app(app)
client = app.test_client()
# Act
response = client.post("/console/api/tag-bindings/create", json=payload)
# Assert
assert response.status_code == 200
assert response.get_json() == {"result": "success"}
mock_bind.assert_called_once_with(payload)
def test_delete_tag_binding_success(app: Flask, mock_tags_module_env):
# Arrange
payload = {"tag_id": "tag-1", "target_id": "target-1", "type": "app"}
with patch("controllers.console.tag.tags.TagService.delete_tag_binding") as mock_unbind:
ext_fastopenapi.init_app(app)
client = app.test_client()
# Act
response = client.post("/console/api/tag-bindings/remove", json=payload)
# Assert
assert response.status_code == 200
assert response.get_json() == {"result": "success"}
mock_unbind.assert_called_once_with(payload)

View File

@@ -83,127 +83,23 @@ def mock_documents(document_ids, dataset_id):
def mock_db_session():
"""Mock database session via session_factory.create_session()."""
with patch("tasks.document_indexing_task.session_factory") as mock_sf:
sessions = [] # Track all created sessions
# Shared mock data that all sessions will access
shared_mock_data = {"dataset": None, "documents": None, "doc_iter": None}
session = MagicMock()
# Ensure tests that expect session.close() to be called can observe it via the context manager
session.close = MagicMock()
cm = MagicMock()
cm.__enter__.return_value = session
# Link __exit__ to session.close so "close" expectations reflect context manager teardown
def create_session_side_effect():
session = MagicMock()
session.close = MagicMock()
def _exit_side_effect(*args, **kwargs):
session.close()
# Track commit calls
commit_mock = MagicMock()
session.commit = commit_mock
cm = MagicMock()
cm.__enter__.return_value = session
cm.__exit__.side_effect = _exit_side_effect
mock_sf.create_session.return_value = cm
def _exit_side_effect(*args, **kwargs):
session.close()
cm.__exit__.side_effect = _exit_side_effect
# Support session.begin() for transactions
begin_cm = MagicMock()
begin_cm.__enter__.return_value = session
def begin_exit_side_effect(*args, **kwargs):
# Auto-commit on transaction exit (like SQLAlchemy)
session.commit()
# Also mark wrapper's commit as called
if sessions:
sessions[0].commit()
begin_cm.__exit__ = MagicMock(side_effect=begin_exit_side_effect)
session.begin = MagicMock(return_value=begin_cm)
sessions.append(session)
# Setup query with side_effect to handle both Dataset and Document queries
def query_side_effect(*args):
query = MagicMock()
if args and args[0] == Dataset and shared_mock_data["dataset"] is not None:
where_result = MagicMock()
where_result.first.return_value = shared_mock_data["dataset"]
query.where = MagicMock(return_value=where_result)
elif args and args[0] == Document and shared_mock_data["documents"] is not None:
# Support both .first() and .all() calls with chaining
where_result = MagicMock()
where_result.where = MagicMock(return_value=where_result)
# Create an iterator for .first() calls if not exists
if shared_mock_data["doc_iter"] is None:
docs = shared_mock_data["documents"] or [None]
shared_mock_data["doc_iter"] = iter(docs)
where_result.first = lambda: next(shared_mock_data["doc_iter"], None)
docs_or_empty = shared_mock_data["documents"] or []
where_result.all = MagicMock(return_value=docs_or_empty)
query.where = MagicMock(return_value=where_result)
else:
query.where = MagicMock(return_value=query)
return query
session.query = MagicMock(side_effect=query_side_effect)
return cm
mock_sf.create_session.side_effect = create_session_side_effect
# Create a wrapper that behaves like the first session but has access to all sessions
class SessionWrapper:
def __init__(self):
self._sessions = sessions
self._shared_data = shared_mock_data
# Create a default session for setup phase
self._default_session = MagicMock()
self._default_session.close = MagicMock()
self._default_session.commit = MagicMock()
# Support session.begin() for default session too
begin_cm = MagicMock()
begin_cm.__enter__.return_value = self._default_session
def default_begin_exit_side_effect(*args, **kwargs):
self._default_session.commit()
begin_cm.__exit__ = MagicMock(side_effect=default_begin_exit_side_effect)
self._default_session.begin = MagicMock(return_value=begin_cm)
def default_query_side_effect(*args):
query = MagicMock()
if args and args[0] == Dataset and shared_mock_data["dataset"] is not None:
where_result = MagicMock()
where_result.first.return_value = shared_mock_data["dataset"]
query.where = MagicMock(return_value=where_result)
elif args and args[0] == Document and shared_mock_data["documents"] is not None:
where_result = MagicMock()
where_result.where = MagicMock(return_value=where_result)
if shared_mock_data["doc_iter"] is None:
docs = shared_mock_data["documents"] or [None]
shared_mock_data["doc_iter"] = iter(docs)
where_result.first = lambda: next(shared_mock_data["doc_iter"], None)
docs_or_empty = shared_mock_data["documents"] or []
where_result.all = MagicMock(return_value=docs_or_empty)
query.where = MagicMock(return_value=where_result)
else:
query.where = MagicMock(return_value=query)
return query
self._default_session.query = MagicMock(side_effect=default_query_side_effect)
def __getattr__(self, name):
# Forward all attribute access to the first session, or default if none created yet
target_session = self._sessions[0] if self._sessions else self._default_session
return getattr(target_session, name)
@property
def all_sessions(self):
"""Access all created sessions for testing."""
return self._sessions
wrapper = SessionWrapper()
yield wrapper
query = MagicMock()
session.query.return_value = query
query.where.return_value = query
yield session
@pytest.fixture
@@ -356,9 +252,18 @@ class TestTaskEnqueuing:
use the deprecated function.
"""
# Arrange
# Set shared mock data so all sessions can access it
mock_db_session._shared_data["dataset"] = mock_dataset
mock_db_session._shared_data["documents"] = mock_documents
mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset
def mock_query_side_effect(*args):
mock_query = MagicMock()
if args[0] == Dataset:
mock_query.where.return_value.first.return_value = mock_dataset
elif args[0] == Document:
# Return documents one by one for each call
mock_query.where.return_value.first.side_effect = mock_documents
return mock_query
mock_db_session.query.side_effect = mock_query_side_effect
with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features:
mock_features.return_value.billing.enabled = False
@@ -399,9 +304,21 @@ class TestBatchProcessing:
doc.processing_started_at = None
mock_documents.append(doc)
# Set shared mock data so all sessions can access it
mock_db_session._shared_data["dataset"] = mock_dataset
mock_db_session._shared_data["documents"] = mock_documents
mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset
# Create an iterator for documents
doc_iter = iter(mock_documents)
def mock_query_side_effect(*args):
mock_query = MagicMock()
if args[0] == Dataset:
mock_query.where.return_value.first.return_value = mock_dataset
elif args[0] == Document:
# Return documents one by one for each call
mock_query.where.return_value.first = lambda: next(doc_iter, None)
return mock_query
mock_db_session.query.side_effect = mock_query_side_effect
with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features:
mock_features.return_value.billing.enabled = False
@@ -440,9 +357,19 @@ class TestBatchProcessing:
doc.stopped_at = None
mock_documents.append(doc)
# Set shared mock data so all sessions can access it
mock_db_session._shared_data["dataset"] = mock_dataset
mock_db_session._shared_data["documents"] = mock_documents
mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset
doc_iter = iter(mock_documents)
def mock_query_side_effect(*args):
mock_query = MagicMock()
if args[0] == Dataset:
mock_query.where.return_value.first.return_value = mock_dataset
elif args[0] == Document:
mock_query.where.return_value.first = lambda: next(doc_iter, None)
return mock_query
mock_db_session.query.side_effect = mock_query_side_effect
mock_feature_service.get_features.return_value.billing.enabled = True
mock_feature_service.get_features.return_value.billing.subscription.plan = CloudPlan.PROFESSIONAL
@@ -480,9 +407,19 @@ class TestBatchProcessing:
doc.stopped_at = None
mock_documents.append(doc)
# Set shared mock data so all sessions can access it
mock_db_session._shared_data["dataset"] = mock_dataset
mock_db_session._shared_data["documents"] = mock_documents
mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset
doc_iter = iter(mock_documents)
def mock_query_side_effect(*args):
mock_query = MagicMock()
if args[0] == Dataset:
mock_query.where.return_value.first.return_value = mock_dataset
elif args[0] == Document:
mock_query.where.return_value.first = lambda: next(doc_iter, None)
return mock_query
mock_db_session.query.side_effect = mock_query_side_effect
mock_feature_service.get_features.return_value.billing.enabled = True
mock_feature_service.get_features.return_value.billing.subscription.plan = CloudPlan.SANDBOX
@@ -507,10 +444,7 @@ class TestBatchProcessing:
"""
# Arrange
document_ids = []
# Set shared mock data with empty documents list
mock_db_session._shared_data["dataset"] = mock_dataset
mock_db_session._shared_data["documents"] = []
mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset
with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features:
mock_features.return_value.billing.enabled = False
@@ -548,9 +482,19 @@ class TestProgressTracking:
doc.processing_started_at = None
mock_documents.append(doc)
# Set shared mock data so all sessions can access it
mock_db_session._shared_data["dataset"] = mock_dataset
mock_db_session._shared_data["documents"] = mock_documents
mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset
doc_iter = iter(mock_documents)
def mock_query_side_effect(*args):
mock_query = MagicMock()
if args[0] == Dataset:
mock_query.where.return_value.first.return_value = mock_dataset
elif args[0] == Document:
mock_query.where.return_value.first = lambda: next(doc_iter, None)
return mock_query
mock_db_session.query.side_effect = mock_query_side_effect
with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features:
mock_features.return_value.billing.enabled = False
@@ -584,9 +528,19 @@ class TestProgressTracking:
doc.processing_started_at = None
mock_documents.append(doc)
# Set shared mock data so all sessions can access it
mock_db_session._shared_data["dataset"] = mock_dataset
mock_db_session._shared_data["documents"] = mock_documents
mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset
doc_iter = iter(mock_documents)
def mock_query_side_effect(*args):
mock_query = MagicMock()
if args[0] == Dataset:
mock_query.where.return_value.first.return_value = mock_dataset
elif args[0] == Document:
mock_query.where.return_value.first = lambda: next(doc_iter, None)
return mock_query
mock_db_session.query.side_effect = mock_query_side_effect
with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features:
mock_features.return_value.billing.enabled = False
@@ -681,9 +635,19 @@ class TestErrorHandling:
doc.stopped_at = None
mock_documents.append(doc)
# Set shared mock data so all sessions can access it
mock_db_session._shared_data["dataset"] = mock_dataset
mock_db_session._shared_data["documents"] = mock_documents
mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset
doc_iter = iter(mock_documents)
def mock_query_side_effect(*args):
mock_query = MagicMock()
if args[0] == Dataset:
mock_query.where.return_value.first.return_value = mock_dataset
elif args[0] == Document:
mock_query.where.return_value.first = lambda: next(doc_iter, None)
return mock_query
mock_db_session.query.side_effect = mock_query_side_effect
# Set up to trigger vector space limit error
mock_feature_service.get_features.return_value.billing.enabled = True
@@ -710,9 +674,17 @@ class TestErrorHandling:
Errors during indexing should be caught and logged, but not crash the task.
"""
# Arrange
# Set shared mock data so all sessions can access it
mock_db_session._shared_data["dataset"] = mock_dataset
mock_db_session._shared_data["documents"] = mock_documents
mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset
def mock_query_side_effect(*args):
mock_query = MagicMock()
if args[0] == Dataset:
mock_query.where.return_value.first.return_value = mock_dataset
elif args[0] == Document:
mock_query.where.return_value.first.side_effect = mock_documents
return mock_query
mock_db_session.query.side_effect = mock_query_side_effect
# Make IndexingRunner raise an exception
mock_indexing_runner.run.side_effect = Exception("Indexing failed")
@@ -736,9 +708,17 @@ class TestErrorHandling:
but not treated as a failure.
"""
# Arrange
# Set shared mock data so all sessions can access it
mock_db_session._shared_data["dataset"] = mock_dataset
mock_db_session._shared_data["documents"] = mock_documents
mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset
def mock_query_side_effect(*args):
mock_query = MagicMock()
if args[0] == Dataset:
mock_query.where.return_value.first.return_value = mock_dataset
elif args[0] == Document:
mock_query.where.return_value.first.side_effect = mock_documents
return mock_query
mock_db_session.query.side_effect = mock_query_side_effect
# Make IndexingRunner raise DocumentIsPausedError
mock_indexing_runner.run.side_effect = DocumentIsPausedError("Document is paused")
@@ -873,9 +853,17 @@ class TestTaskCancellation:
Session cleanup should happen in finally block.
"""
# Arrange
# Set shared mock data so all sessions can access it
mock_db_session._shared_data["dataset"] = mock_dataset
mock_db_session._shared_data["documents"] = mock_documents
mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset
def mock_query_side_effect(*args):
mock_query = MagicMock()
if args[0] == Dataset:
mock_query.where.return_value.first.return_value = mock_dataset
elif args[0] == Document:
mock_query.where.return_value.first.side_effect = mock_documents
return mock_query
mock_db_session.query.side_effect = mock_query_side_effect
with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features:
mock_features.return_value.billing.enabled = False
@@ -895,9 +883,17 @@ class TestTaskCancellation:
Session cleanup should happen even when errors occur.
"""
# Arrange
# Set shared mock data so all sessions can access it
mock_db_session._shared_data["dataset"] = mock_dataset
mock_db_session._shared_data["documents"] = mock_documents
mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset
def mock_query_side_effect(*args):
mock_query = MagicMock()
if args[0] == Dataset:
mock_query.where.return_value.first.return_value = mock_dataset
elif args[0] == Document:
mock_query.where.return_value.first.side_effect = mock_documents
return mock_query
mock_db_session.query.side_effect = mock_query_side_effect
# Make IndexingRunner raise an exception
mock_indexing_runner.run.side_effect = Exception("Test error")
@@ -966,7 +962,6 @@ class TestAdvancedScenarios:
document_ids = [str(uuid.uuid4()) for _ in range(3)]
# Create only 2 documents (simulate one missing)
# The new code uses .all() which will only return existing documents
mock_documents = []
for i, doc_id in enumerate([document_ids[0], document_ids[2]]): # Skip middle one
doc = MagicMock(spec=Document)
@@ -976,9 +971,21 @@ class TestAdvancedScenarios:
doc.processing_started_at = None
mock_documents.append(doc)
# Set shared mock data - .all() will only return existing documents
mock_db_session._shared_data["dataset"] = mock_dataset
mock_db_session._shared_data["documents"] = mock_documents
mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset
# Create iterator that returns None for missing document
doc_responses = [mock_documents[0], None, mock_documents[1]]
doc_iter = iter(doc_responses)
def mock_query_side_effect(*args):
mock_query = MagicMock()
if args[0] == Dataset:
mock_query.where.return_value.first.return_value = mock_dataset
elif args[0] == Document:
mock_query.where.return_value.first = lambda: next(doc_iter, None)
return mock_query
mock_db_session.query.side_effect = mock_query_side_effect
with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features:
mock_features.return_value.billing.enabled = False
@@ -1068,9 +1075,19 @@ class TestAdvancedScenarios:
doc.stopped_at = None
mock_documents.append(doc)
# Set shared mock data so all sessions can access it
mock_db_session._shared_data["dataset"] = mock_dataset
mock_db_session._shared_data["documents"] = mock_documents
mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset
doc_iter = iter(mock_documents)
def mock_query_side_effect(*args):
mock_query = MagicMock()
if args[0] == Dataset:
mock_query.where.return_value.first.return_value = mock_dataset
elif args[0] == Document:
mock_query.where.return_value.first = lambda: next(doc_iter, None)
return mock_query
mock_db_session.query.side_effect = mock_query_side_effect
# Set vector space exactly at limit
mock_feature_service.get_features.return_value.billing.enabled = True
@@ -1202,9 +1219,19 @@ class TestAdvancedScenarios:
doc.processing_started_at = None
mock_documents.append(doc)
# Set shared mock data so all sessions can access it
mock_db_session._shared_data["dataset"] = mock_dataset
mock_db_session._shared_data["documents"] = mock_documents
mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset
doc_iter = iter(mock_documents)
def mock_query_side_effect(*args):
mock_query = MagicMock()
if args[0] == Dataset:
mock_query.where.return_value.first.return_value = mock_dataset
elif args[0] == Document:
mock_query.where.return_value.first = lambda: next(doc_iter, None)
return mock_query
mock_db_session.query.side_effect = mock_query_side_effect
# Billing disabled - limits should not be checked
mock_feature_service.get_features.return_value.billing.enabled = False
@@ -1246,9 +1273,19 @@ class TestIntegration:
# Set up rpop to return None for concurrency check (no more tasks)
mock_redis.rpop.side_effect = [None]
# Set shared mock data so all sessions can access it
mock_db_session._shared_data["dataset"] = mock_dataset
mock_db_session._shared_data["documents"] = mock_documents
mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset
doc_iter = iter(mock_documents)
def mock_query_side_effect(*args):
mock_query = MagicMock()
if args[0] == Dataset:
mock_query.where.return_value.first.return_value = mock_dataset
elif args[0] == Document:
mock_query.where.return_value.first = lambda: next(doc_iter, None)
return mock_query
mock_db_session.query.side_effect = mock_query_side_effect
with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features:
mock_features.return_value.billing.enabled = False
@@ -1284,9 +1321,19 @@ class TestIntegration:
# Set up rpop to return None for concurrency check (no more tasks)
mock_redis.rpop.side_effect = [None]
# Set shared mock data so all sessions can access it
mock_db_session._shared_data["dataset"] = mock_dataset
mock_db_session._shared_data["documents"] = mock_documents
mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset
doc_iter = iter(mock_documents)
def mock_query_side_effect(*args):
mock_query = MagicMock()
if args[0] == Dataset:
mock_query.where.return_value.first.return_value = mock_dataset
elif args[0] == Document:
mock_query.where.return_value.first = lambda: next(doc_iter, None)
return mock_query
mock_db_session.query.side_effect = mock_query_side_effect
with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features:
mock_features.return_value.billing.enabled = False
@@ -1368,9 +1415,17 @@ class TestEdgeCases:
mock_document.indexing_status = "waiting"
mock_document.processing_started_at = None
# Set shared mock data so all sessions can access it
mock_db_session._shared_data["dataset"] = mock_dataset
mock_db_session._shared_data["documents"] = [mock_document]
mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset
def mock_query_side_effect(*args):
mock_query = MagicMock()
if args[0] == Dataset:
mock_query.where.return_value.first.return_value = mock_dataset
elif args[0] == Document:
mock_query.where.return_value.first = lambda: mock_document
return mock_query
mock_db_session.query.side_effect = mock_query_side_effect
with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features:
mock_features.return_value.billing.enabled = False
@@ -1410,9 +1465,17 @@ class TestEdgeCases:
mock_document.indexing_status = "waiting"
mock_document.processing_started_at = None
# Set shared mock data so all sessions can access it
mock_db_session._shared_data["dataset"] = mock_dataset
mock_db_session._shared_data["documents"] = [mock_document]
mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset
def mock_query_side_effect(*args):
mock_query = MagicMock()
if args[0] == Dataset:
mock_query.where.return_value.first.return_value = mock_dataset
elif args[0] == Document:
mock_query.where.return_value.first = lambda: mock_document
return mock_query
mock_db_session.query.side_effect = mock_query_side_effect
with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features:
mock_features.return_value.billing.enabled = False
@@ -1492,9 +1555,19 @@ class TestEdgeCases:
doc.processing_started_at = None
mock_documents.append(doc)
# Set shared mock data so all sessions can access it
mock_db_session._shared_data["dataset"] = mock_dataset
mock_db_session._shared_data["documents"] = mock_documents
mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset
doc_iter = iter(mock_documents)
def mock_query_side_effect(*args):
mock_query = MagicMock()
if args[0] == Dataset:
mock_query.where.return_value.first.return_value = mock_dataset
elif args[0] == Document:
mock_query.where.return_value.first = lambda: next(doc_iter, None)
return mock_query
mock_db_session.query.side_effect = mock_query_side_effect
# Set vector space limit to 0 (unlimited)
mock_feature_service.get_features.return_value.billing.enabled = True
@@ -1539,9 +1612,19 @@ class TestEdgeCases:
doc.processing_started_at = None
mock_documents.append(doc)
# Set shared mock data so all sessions can access it
mock_db_session._shared_data["dataset"] = mock_dataset
mock_db_session._shared_data["documents"] = mock_documents
mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset
doc_iter = iter(mock_documents)
def mock_query_side_effect(*args):
mock_query = MagicMock()
if args[0] == Dataset:
mock_query.where.return_value.first.return_value = mock_dataset
elif args[0] == Document:
mock_query.where.return_value.first = lambda: next(doc_iter, None)
return mock_query
mock_db_session.query.side_effect = mock_query_side_effect
# Set negative vector space limit
mock_feature_service.get_features.return_value.billing.enabled = True
@@ -1592,9 +1675,19 @@ class TestPerformanceScenarios:
doc.processing_started_at = None
mock_documents.append(doc)
# Set shared mock data so all sessions can access it
mock_db_session._shared_data["dataset"] = mock_dataset
mock_db_session._shared_data["documents"] = mock_documents
mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset
doc_iter = iter(mock_documents)
def mock_query_side_effect(*args):
mock_query = MagicMock()
if args[0] == Dataset:
mock_query.where.return_value.first.return_value = mock_dataset
elif args[0] == Document:
mock_query.where.return_value.first = lambda: next(doc_iter, None)
return mock_query
mock_db_session.query.side_effect = mock_query_side_effect
# Configure billing with sufficient limits
mock_feature_service.get_features.return_value.billing.enabled = True
@@ -1733,9 +1826,19 @@ class TestRobustness:
doc.processing_started_at = None
mock_documents.append(doc)
# Set shared mock data so all sessions can access it
mock_db_session._shared_data["dataset"] = mock_dataset
mock_db_session._shared_data["documents"] = mock_documents
mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset
doc_iter = iter(mock_documents)
def mock_query_side_effect(*args):
mock_query = MagicMock()
if args[0] == Dataset:
mock_query.where.return_value.first.return_value = mock_dataset
elif args[0] == Document:
mock_query.where.return_value.first = lambda: next(doc_iter, None)
return mock_query
mock_db_session.query.side_effect = mock_query_side_effect
# Make IndexingRunner raise an exception
mock_indexing_runner.run.side_effect = RuntimeError("Unexpected indexing error")
@@ -1763,7 +1866,7 @@ class TestRobustness:
- No exceptions occur
Expected behavior:
- All database sessions are closed
- Database session is closed
- No connection leaks
"""
# Arrange
@@ -1776,9 +1879,19 @@ class TestRobustness:
doc.processing_started_at = None
mock_documents.append(doc)
# Set shared mock data so all sessions can access it
mock_db_session._shared_data["dataset"] = mock_dataset
mock_db_session._shared_data["documents"] = mock_documents
mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset
doc_iter = iter(mock_documents)
def mock_query_side_effect(*args):
mock_query = MagicMock()
if args[0] == Dataset:
mock_query.where.return_value.first.return_value = mock_dataset
elif args[0] == Document:
mock_query.where.return_value.first = lambda: next(doc_iter, None)
return mock_query
mock_db_session.query.side_effect = mock_query_side_effect
with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features:
mock_features.return_value.billing.enabled = False
@@ -1786,11 +1899,10 @@ class TestRobustness:
# Act
_document_indexing(dataset_id, document_ids)
# Assert - All created sessions should be closed
# The code creates multiple sessions: validation, Phase 1 (parsing), Phase 3 (summary)
assert len(mock_db_session.all_sessions) >= 1
for session in mock_db_session.all_sessions:
assert session.close.called, "All sessions should be closed"
# Assert
assert mock_db_session.close.called
# Verify close is called exactly once
assert mock_db_session.close.call_count == 1
def test_task_proxy_handles_feature_service_failure(self, tenant_id, dataset_id, document_ids, mock_redis):
"""

View File

@@ -114,21 +114,6 @@ def mock_db_session():
session = MagicMock()
# Ensure tests can observe session.close() via context manager teardown
session.close = MagicMock()
session.commit = MagicMock()
# Mock session.begin() context manager to auto-commit on exit
begin_cm = MagicMock()
begin_cm.__enter__.return_value = session
def _begin_exit_side_effect(*args, **kwargs):
# session.begin().__exit__() should commit if no exception
if args[0] is None: # No exception
session.commit()
begin_cm.__exit__.side_effect = _begin_exit_side_effect
session.begin.return_value = begin_cm
# Mock create_session() context manager
cm = MagicMock()
cm.__enter__.return_value = session

View File

@@ -350,7 +350,7 @@ class TestDeleteWorkflowArchiveLogs:
mock_query.where.return_value = mock_delete_query
mock_db.session.query.return_value = mock_query
delete_func(mock_db.session, "log-1")
delete_func("log-1")
mock_db.session.query.assert_called_once_with(WorkflowArchiveLog)
mock_query.where.assert_called_once()

api/uv.lock generated
View File

@@ -1368,7 +1368,7 @@ wheels = [
[[package]]
name = "dify-api"
version = "1.12.1"
version = "1.11.4"
source = { virtual = "." }
dependencies = [
{ name = "aliyun-log-python-sdk" },

View File

@@ -1,5 +1,5 @@
#!/bin/bash
set -euxo pipefail
set -x
SCRIPT_DIR="$(dirname "$(realpath "$0")")"
cd "$SCRIPT_DIR/../.."

View File

@@ -21,7 +21,7 @@ services:
# API service
api:
image: langgenius/dify-api:1.12.1
image: langgenius/dify-api:1.11.4
restart: always
environment:
# Use the shared environment variables.
@@ -63,7 +63,7 @@ services:
# worker service
# The Celery worker for processing all queues (dataset, workflow, mail, etc.)
worker:
image: langgenius/dify-api:1.12.1
image: langgenius/dify-api:1.11.4
restart: always
environment:
# Use the shared environment variables.
@@ -102,7 +102,7 @@ services:
# worker_beat service
# Celery beat for scheduling periodic tasks.
worker_beat:
image: langgenius/dify-api:1.12.1
image: langgenius/dify-api:1.11.4
restart: always
environment:
# Use the shared environment variables.
@@ -132,7 +132,7 @@ services:
# Frontend web application.
web:
image: langgenius/dify-web:1.12.1
image: langgenius/dify-web:1.11.4
restart: always
environment:
CONSOLE_API_URL: ${CONSOLE_API_URL:-}
@@ -662,14 +662,13 @@ services:
- "${IRIS_SUPER_SERVER_PORT:-1972}:1972"
- "${IRIS_WEB_SERVER_PORT:-52773}:52773"
volumes:
- ./volumes/iris:/durable
- ./volumes/iris:/opt/iris
- ./iris/iris-init.script:/iris-init.script
- ./iris/docker-entrypoint.sh:/custom-entrypoint.sh
entrypoint: ["/custom-entrypoint.sh"]
tty: true
environment:
TZ: ${IRIS_TIMEZONE:-UTC}
ISC_DATA_DIRECTORY: /durable/iris
# Oracle vector database
oracle:

View File

@@ -707,7 +707,7 @@ services:
# API service
api:
image: langgenius/dify-api:1.12.1
image: langgenius/dify-api:1.11.4
restart: always
environment:
# Use the shared environment variables.
@@ -749,7 +749,7 @@ services:
# worker service
# The Celery worker for processing all queues (dataset, workflow, mail, etc.)
worker:
image: langgenius/dify-api:1.12.1
image: langgenius/dify-api:1.11.4
restart: always
environment:
# Use the shared environment variables.
@@ -788,7 +788,7 @@ services:
# worker_beat service
# Celery beat for scheduling periodic tasks.
worker_beat:
image: langgenius/dify-api:1.12.1
image: langgenius/dify-api:1.11.4
restart: always
environment:
# Use the shared environment variables.
@@ -818,7 +818,7 @@ services:
# Frontend web application.
web:
image: langgenius/dify-web:1.12.1
image: langgenius/dify-web:1.11.4
restart: always
environment:
CONSOLE_API_URL: ${CONSOLE_API_URL:-}
@@ -1348,14 +1348,13 @@ services:
- "${IRIS_SUPER_SERVER_PORT:-1972}:1972"
- "${IRIS_WEB_SERVER_PORT:-52773}:52773"
volumes:
- ./volumes/iris:/durable
- ./volumes/iris:/opt/iris
- ./iris/iris-init.script:/iris-init.script
- ./iris/docker-entrypoint.sh:/custom-entrypoint.sh
entrypoint: ["/custom-entrypoint.sh"]
tty: true
environment:
TZ: ${IRIS_TIMEZONE:-UTC}
ISC_DATA_DIRECTORY: /durable/iris
# Oracle vector database
oracle:

View File

@@ -1,33 +1,15 @@
#!/bin/bash
set -e
# IRIS configuration flag file (stored in durable directory to persist with data)
IRIS_CONFIG_DONE="/durable/.iris-configured"
# Function to wait for IRIS to be ready
wait_for_iris() {
echo "Waiting for IRIS to be ready..."
local max_attempts=30
local attempt=1
while [ "$attempt" -le "$max_attempts" ]; do
if iris qlist IRIS 2>/dev/null | grep -q "running"; then
echo "IRIS is ready."
return 0
fi
echo "Attempt $attempt/$max_attempts: IRIS not ready yet, waiting..."
sleep 2
attempt=$((attempt + 1))
done
echo "ERROR: IRIS failed to start within expected time." >&2
return 1
}
# IRIS configuration flag file
IRIS_CONFIG_DONE="/opt/iris/.iris-configured"
# Function to configure IRIS
configure_iris() {
echo "Configuring IRIS for first-time setup..."
# Wait for IRIS to be fully started
wait_for_iris
sleep 5
# Execute the initialization script
iris session IRIS < /iris-init.script

View File

@@ -3,7 +3,7 @@
import type { ReactNode } from 'react'
import Cookies from 'js-cookie'
import { usePathname, useRouter, useSearchParams } from 'next/navigation'
import { parseAsBoolean, useQueryState } from 'nuqs'
import { parseAsString, useQueryState } from 'nuqs'
import { useCallback, useEffect, useState } from 'react'
import {
EDUCATION_VERIFY_URL_SEARCHPARAMS_ACTION,
@@ -28,7 +28,7 @@ export const AppInitializer = ({
const [init, setInit] = useState(false)
const [oauthNewUser, setOauthNewUser] = useQueryState(
'oauth_new_user',
parseAsBoolean.withOptions({ history: 'replace' }),
parseAsString.withOptions({ history: 'replace' }),
)
const isSetupFinished = useCallback(async () => {
@@ -46,7 +46,7 @@ export const AppInitializer = ({
(async () => {
const action = searchParams.get('action')
if (oauthNewUser) {
if (oauthNewUser === 'true') {
let utmInfo = null
const utmInfoStr = Cookies.get('utm_info')
if (utmInfoStr) {

View File

@@ -109,7 +109,6 @@ const AgentTools: FC = () => {
tool_parameters: paramsWithDefaultValue,
notAuthor: !tool.is_team_authorization,
enabled: true,
type: tool.provider_type as CollectionType,
}
}
const handleSelectTool = (tool: ToolDefaultValue) => {

View File

@@ -62,19 +62,19 @@ const AppCard = ({
{app.description}
</div>
</div>
{(canCreate || isTrialApp) && (
{canCreate && (
<div className={cn('absolute bottom-0 left-0 right-0 hidden bg-gradient-to-t from-components-panel-gradient-2 from-[60.27%] to-transparent p-4 pt-8 group-hover:flex')}>
<div className={cn('grid h-8 w-full grid-cols-1 items-center space-x-2', canCreate && 'grid-cols-2')}>
{canCreate && (
<Button variant="primary" onClick={() => onCreate()}>
<PlusIcon className="mr-1 h-4 w-4" />
<span className="text-xs">{t('newApp.useTemplate', { ns: 'app' })}</span>
<div className={cn('grid h-8 w-full grid-cols-1 items-center space-x-2', isTrialApp && 'grid-cols-2')}>
<Button variant="primary" onClick={() => onCreate()}>
<PlusIcon className="mr-1 h-4 w-4" />
<span className="text-xs">{t('newApp.useTemplate', { ns: 'app' })}</span>
</Button>
{isTrialApp && (
<Button onClick={showTryAPPPanel(app.app_id)}>
<RiInformation2Line className="mr-1 size-4" />
<span>{t('appCard.try', { ns: 'explore' })}</span>
</Button>
)}
<Button onClick={showTryAPPPanel(app.app_id)}>
<RiInformation2Line className="mr-1 size-4" />
<span>{t('appCard.try', { ns: 'explore' })}</span>
</Button>
</div>
</div>
)}

View File

@@ -154,7 +154,7 @@ export const GeneralChunkingOptions: FC<GeneralChunkingOptionsProps> = ({
</div>
))}
{
showSummaryIndexSetting && IS_CE_EDITION && (
showSummaryIndexSetting && (
<div className="mt-3">
<SummaryIndexSetting
entry="create-document"

View File

@@ -12,7 +12,6 @@ import Divider from '@/app/components/base/divider'
import { ParentChildChunk } from '@/app/components/base/icons/src/vender/knowledge'
import RadioCard from '@/app/components/base/radio-card'
import SummaryIndexSetting from '@/app/components/datasets/settings/summary-index-setting'
import { IS_CE_EDITION } from '@/config'
import { ChunkingMode } from '@/models/datasets'
import FileList from '../../assets/file-list-3-fill.svg'
import Note from '../../assets/note-mod.svg'
@@ -192,7 +191,7 @@ export const ParentChildOptions: FC<ParentChildOptionsProps> = ({
</div>
))}
{
showSummaryIndexSetting && IS_CE_EDITION && (
showSummaryIndexSetting && (
<div className="mt-3">
<SummaryIndexSetting
entry="create-document"

View File

@@ -26,7 +26,6 @@ import CustomPopover from '@/app/components/base/popover'
import Switch from '@/app/components/base/switch'
import { ToastContext } from '@/app/components/base/toast'
import Tooltip from '@/app/components/base/tooltip'
import { IS_CE_EDITION } from '@/config'
import { DataSourceType, DocumentActionType } from '@/models/datasets'
import {
useDocumentArchive,
@@ -264,14 +263,10 @@ const Operations = ({
<span className={s.actionName}>{t('list.action.sync', { ns: 'datasetDocuments' })}</span>
</div>
)}
{
IS_CE_EDITION && (
<div className={s.actionItem} onClick={() => onOperate('summary')}>
<SearchLinesSparkle className="h-4 w-4 text-text-tertiary" />
<span className={s.actionName}>{t('list.action.summary', { ns: 'datasetDocuments' })}</span>
</div>
)
}
<div className={s.actionItem} onClick={() => onOperate('summary')}>
<SearchLinesSparkle className="h-4 w-4 text-text-tertiary" />
<span className={s.actionName}>{t('list.action.summary', { ns: 'datasetDocuments' })}</span>
</div>
<Divider className="my-1" />
</>
)}

View File

@@ -7,7 +7,6 @@ import Button from '@/app/components/base/button'
import Confirm from '@/app/components/base/confirm'
import Divider from '@/app/components/base/divider'
import { SearchLinesSparkle } from '@/app/components/base/icons/src/vender/knowledge'
import { IS_CE_EDITION } from '@/config'
import { cn } from '@/utils/classnames'
const i18nPrefix = 'batchAction'
@@ -88,7 +87,7 @@ const BatchAction: FC<IBatchActionProps> = ({
<span className="px-0.5">{t('metadata.metadata', { ns: 'dataset' })}</span>
</Button>
)}
{onBatchSummary && IS_CE_EDITION && (
{onBatchSummary && (
<Button
variant="ghost"
className="gap-x-0.5 px-3"

View File

@@ -3,6 +3,8 @@ import type { FC, ReactNode } from 'react'
import type { SliceProps } from './type'
import { autoUpdate, flip, FloatingFocusManager, offset, shift, useDismiss, useFloating, useHover, useInteractions, useRole } from '@floating-ui/react'
import { RiDeleteBinLine } from '@remixicon/react'
// @ts-expect-error no types available
import lineClamp from 'line-clamp'
import { useState } from 'react'
import ActionButton, { ActionButtonState } from '@/app/components/base/action-button'
import { cn } from '@/utils/classnames'
@@ -56,8 +58,12 @@ export const EditSlice: FC<EditSliceProps> = (props) => {
<>
<SliceContainer
{...rest}
className={cn('mr-0 line-clamp-4 block', className)}
ref={refs.setReference}
className={cn('mr-0 block', className)}
ref={(ref) => {
refs.setReference(ref)
if (ref)
lineClamp(ref, 4)
}}
{...getReferenceProps()}
>
<SliceLabel

View File

@@ -21,7 +21,6 @@ import RetrievalMethodConfig from '@/app/components/datasets/common/retrieval-me
import { ModelTypeEnum } from '@/app/components/header/account-setting/model-provider-page/declarations'
import { useModelList } from '@/app/components/header/account-setting/model-provider-page/hooks'
import ModelSelector from '@/app/components/header/account-setting/model-provider-page/model-selector'
import { IS_CE_EDITION } from '@/config'
import { useSelector as useAppContextWithSelector } from '@/context/app-context'
import { useDatasetDetailContextWithSelector } from '@/context/dataset-detail'
import { useDocLink } from '@/context/i18n'
@@ -360,7 +359,7 @@ const Form = () => {
{
indexMethod === IndexingType.QUALIFIED
&& [ChunkingMode.text, ChunkingMode.parentChild].includes(currentDataset?.doc_form as ChunkingMode)
&& IS_CE_EDITION && (
&& (
<>
<Divider
type="horizontal"

View File

@@ -74,15 +74,11 @@ const AppCard = ({
</div>
{isExplore && (canCreate || isTrialApp) && (
<div className={cn('absolute bottom-0 left-0 right-0 hidden bg-gradient-to-t from-components-panel-gradient-2 from-[60.27%] to-transparent p-4 pt-8 group-hover:flex')}>
<div className={cn('grid h-8 w-full grid-cols-1 space-x-2', canCreate && 'grid-cols-2')}>
{
canCreate && (
<Button variant="primary" className="h-7" onClick={() => onCreate()}>
<PlusIcon className="mr-1 h-4 w-4" />
<span className="text-xs">{t('appCard.addToWorkspace', { ns: 'explore' })}</span>
</Button>
)
}
<div className={cn('grid h-8 w-full grid-cols-2 space-x-2')}>
<Button variant="primary" className="h-7" onClick={() => onCreate()}>
<PlusIcon className="mr-1 h-4 w-4" />
<span className="text-xs">{t('appCard.addToWorkspace', { ns: 'explore' })}</span>
</Button>
<Button className="h-7" onClick={showTryAPPPanel(app.app_id)}>
<RiInformation2Line className="mr-1 size-4" />
<span>{t('appCard.try', { ns: 'explore' })}</span>

View File

@@ -16,14 +16,6 @@ vi.mock('react-i18next', () => ({
}),
}))
vi.mock('@/config', async (importOriginal) => {
const actual = await importOriginal() as object
return {
...actual,
IS_CLOUD_EDITION: true,
}
})
const mockUseGetTryAppInfo = vi.fn()
vi.mock('@/service/use-try-app', () => ({

View File

@@ -14,14 +14,6 @@ vi.mock('react-i18next', () => ({
}),
}))
vi.mock('@/config', async (importOriginal) => {
const actual = await importOriginal() as object
return {
...actual,
IS_CLOUD_EDITION: true,
}
})
describe('Tab', () => {
afterEach(() => {
cleanup()

View File

@@ -1,5 +1,5 @@
import type { TriggerSubscriptionBuilder } from '@/app/components/workflow/block-selector/types'
import { act, fireEvent, render, screen, waitFor } from '@testing-library/react'
import { fireEvent, render, screen, waitFor } from '@testing-library/react'
import * as React from 'react'
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
// Import after mocks
@@ -821,9 +821,6 @@ describe('CommonCreateModal', () => {
expect(mockCreateBuilder).toHaveBeenCalled()
})
// Flush pending state updates from createBuilder promise resolution
await act(async () => {})
const input = screen.getByTestId('form-field-webhook_url')
fireEvent.change(input, { target: { value: 'https://example.com/webhook' } })

View File

@@ -129,7 +129,6 @@ export const useToolSelectorState = ({
extra: {
description: tool.tool_description,
},
type: tool.provider_type,
}
}, [])

View File

@@ -1,5 +1,5 @@
import type { PropsWithChildren } from 'react'
import { act, cleanup, fireEvent, render, screen, waitFor } from '@testing-library/react'
import { cleanup, fireEvent, render, screen, waitFor } from '@testing-library/react'
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
import { DSLImportStatus } from '@/models/app'
import UpdateDSLModal from './update-dsl-modal'
@@ -140,13 +140,13 @@ class MockFileReader {
onload: ((e: { target: { result: string | null } }) => void) | null = null
readAsText(_file: File) {
// Simulate async file reading using queueMicrotask for more reliable async behavior
queueMicrotask(() => {
// Simulate async file reading
setTimeout(() => {
this.result = 'test file content'
if (this.onload) {
this.onload({ target: { result: this.result } })
}
})
}, 0)
}
}
@@ -174,7 +174,6 @@ describe('UpdateDSLModal', () => {
status: DSLImportStatus.COMPLETED,
pipeline_id: 'test-pipeline-id',
})
mockHandleCheckPluginDependencies.mockResolvedValue(undefined)
// Mock FileReader
originalFileReader = globalThis.FileReader
@@ -473,14 +472,14 @@ describe('UpdateDSLModal', () => {
await waitFor(() => {
const importButton = screen.getByText('common.overwriteAndImport')
expect(importButton).not.toBeDisabled()
}, { timeout: 1000 })
})
const importButton = screen.getByText('common.overwriteAndImport')
fireEvent.click(importButton)
await waitFor(() => {
expect(mockOnImport).toHaveBeenCalled()
}, { timeout: 1000 })
})
})
it('should show warning notification on import with warnings', async () => {
@@ -648,8 +647,6 @@ describe('UpdateDSLModal', () => {
})
it('should show error modal when import status is PENDING', async () => {
vi.useFakeTimers({ shouldAdvanceTime: true })
mockImportDSL.mockResolvedValue({
id: 'import-id',
status: DSLImportStatus.PENDING,
@@ -662,29 +659,20 @@ describe('UpdateDSLModal', () => {
const fileInput = screen.getByTestId('file-input')
const file = new File(['test content'], 'test.pipeline', { type: 'text/yaml' })
fireEvent.change(fileInput, { target: { files: [file] } })
await act(async () => {
fireEvent.change(fileInput, { target: { files: [file] } })
// Flush microtasks scheduled by the FileReader mock (which uses queueMicrotask)
await new Promise<void>(resolve => queueMicrotask(resolve))
await waitFor(() => {
const importButton = screen.getByText('common.overwriteAndImport')
expect(importButton).not.toBeDisabled()
})
const importButton = screen.getByText('common.overwriteAndImport')
expect(importButton).not.toBeDisabled()
await act(async () => {
fireEvent.click(importButton)
// Flush the promise resolution from mockImportDSL
await Promise.resolve()
// Advance past the 300ms setTimeout in the component
await vi.advanceTimersByTimeAsync(350)
})
fireEvent.click(importButton)
// Wait for the error modal to be shown after setTimeout
await waitFor(() => {
expect(screen.getByText('newApp.appCreateDSLErrorTitle')).toBeInTheDocument()
})
vi.useRealTimers()
}, { timeout: 500 })
})
it('should show version info in error modal', async () => {

View File

@@ -61,12 +61,6 @@ vi.mock('@/service/use-pipeline', () => ({
}),
}))
// Mock download utility
const mockDownloadBlob = vi.fn()
vi.mock('@/utils/download', () => ({
downloadBlob: (...args: unknown[]) => mockDownloadBlob(...args),
}))
// Mock workflow service
const mockFetchWorkflowDraft = vi.fn()
vi.mock('@/service/workflow', () => ({
@@ -83,9 +77,33 @@ vi.mock('@/app/components/workflow/constants', () => ({
// ============================================================================
describe('useDSL', () => {
let mockLink: { href: string, download: string, click: ReturnType<typeof vi.fn> }
let originalCreateElement: typeof document.createElement
let mockCreateObjectURL: ReturnType<typeof vi.spyOn>
let mockRevokeObjectURL: ReturnType<typeof vi.spyOn>
beforeEach(() => {
vi.clearAllMocks()
// Create a proper mock link element
mockLink = {
href: '',
download: '',
click: vi.fn(),
}
// Save original and mock selectively - only intercept 'a' elements
originalCreateElement = document.createElement.bind(document)
document.createElement = vi.fn((tagName: string) => {
if (tagName === 'a') {
return mockLink as unknown as HTMLElement
}
return originalCreateElement(tagName)
}) as typeof document.createElement
mockCreateObjectURL = vi.spyOn(URL, 'createObjectURL').mockReturnValue('blob:test-url')
mockRevokeObjectURL = vi.spyOn(URL, 'revokeObjectURL').mockImplementation(() => {})
// Default store state
mockWorkflowStoreGetState.mockReturnValue({
pipelineId: 'test-pipeline-id',
@@ -100,6 +118,9 @@ describe('useDSL', () => {
})
afterEach(() => {
document.createElement = originalCreateElement
mockCreateObjectURL.mockRestore()
mockRevokeObjectURL.mockRestore()
vi.clearAllMocks()
})
@@ -166,7 +187,9 @@ describe('useDSL', () => {
await result.current.handleExportDSL()
})
expect(mockDownloadBlob).toHaveBeenCalled()
expect(document.createElement).toHaveBeenCalledWith('a')
expect(mockCreateObjectURL).toHaveBeenCalled()
expect(mockRevokeObjectURL).toHaveBeenCalledWith('blob:test-url')
})
it('should use correct file extension for download', async () => {
@@ -176,25 +199,17 @@ describe('useDSL', () => {
await result.current.handleExportDSL()
})
expect(mockDownloadBlob).toHaveBeenCalledWith(
expect.objectContaining({
fileName: 'Test Knowledge Base.pipeline',
}),
)
expect(mockLink.download).toBe('Test Knowledge Base.pipeline')
})
it('should pass blob data to downloadBlob', async () => {
it('should trigger download click', async () => {
const { result } = renderHook(() => useDSL())
await act(async () => {
await result.current.handleExportDSL()
})
expect(mockDownloadBlob).toHaveBeenCalledWith(
expect.objectContaining({
data: expect.any(Blob),
}),
)
expect(mockLink.click).toHaveBeenCalled()
})
it('should show error notification on export failure', async () => {

View File

@@ -172,9 +172,6 @@ describe('EditCustomCollectionModal', () => {
expect(parseParamsSchemaMock).toHaveBeenCalledWith('{}')
})
// Flush pending state updates from parseParamsSchema promise resolution
await act(async () => {})
await act(async () => {
fireEvent.click(screen.getByText('common.operation.save'))
})
@@ -187,10 +184,6 @@ describe('EditCustomCollectionModal', () => {
credentials: {
auth_type: 'none',
},
icon: {
content: '🕵️',
background: '#FEF7C3',
},
labels: [],
}))
expect(toastNotifySpy).not.toHaveBeenCalled()

View File

@@ -0,0 +1,705 @@
/**
* Test Suite for useNodesSyncDraft Hook
*
* PURPOSE:
* This hook handles syncing workflow draft to the server. The key fix being tested
* is the error handling behavior when `draft_workflow_not_sync` error occurs.
*
* MULTI-TAB PROBLEM SCENARIO:
* 1. User opens the same workflow in Tab A and Tab B (both have hash: v1)
* 2. Tab A saves successfully, server returns new hash: v2
* 3. Tab B tries to save with old hash: v1, server returns 400 error with code
* 'draft_workflow_not_sync'
 * 4. BEFORE FIX: handleRefreshWorkflowDraft() was called without args, which fetched
 *    the draft AND overwrote the canvas, so the user lost unsaved changes in Tab B
 * 5. AFTER FIX: handleRefreshWorkflowDraft(true) is called, which fetches the draft
 *    but only updates the hash (notUpdateCanvas=true), preserving the user's canvas changes
*
* TESTING STRATEGY:
* We don't simulate actual tab switching UI behavior. Instead, we mock the API to
* return `draft_workflow_not_sync` error and verify:
* - The hook calls handleRefreshWorkflowDraft(true) - not handleRefreshWorkflowDraft()
* - This ensures canvas data is preserved while hash is updated for retry
*
* This is behavior-driven testing - we verify "what the code does when receiving
* specific API errors" rather than simulating complete user interaction flows.
* True multi-tab integration testing would require E2E frameworks like Playwright.
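 *
 * A minimal sketch of the flow under test (assumed shape; the hook's actual
 * implementation may differ):
 *
 *   try {
 *     const res = await syncWorkflowDraft(url, payload)  // server checks the hash
 *     setSyncWorkflowDraftHash(res.hash)                 // adopt the new hash
 *     setDraftUpdatedAt(res.updated_at)
 *   }
 *   catch (error) {
 *     const body = await error.json()
 *     if (body.code === 'draft_workflow_not_sync')
 *       handleRefreshWorkflowDraft(true)  // refresh hash only; keep the canvas
 *   }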
*/
import { act, renderHook, waitFor } from '@testing-library/react'
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
import { useNodesSyncDraft } from './use-nodes-sync-draft'
// Mock reactflow store
const mockGetNodes = vi.fn()
type MockEdge = {
id: string
source: string
target: string
data: Record<string, unknown>
}
const mockStoreState: {
getNodes: ReturnType<typeof vi.fn>
edges: MockEdge[]
transform: number[]
} = {
getNodes: mockGetNodes,
edges: [],
transform: [0, 0, 1],
}
vi.mock('reactflow', () => ({
useStoreApi: () => ({
getState: () => mockStoreState,
}),
}))
// Mock features store
const mockFeaturesState = {
features: {
opening: { enabled: false, opening_statement: '', suggested_questions: [] },
suggested: {},
text2speech: {},
speech2text: {},
citation: {},
moderation: {},
file: {},
},
}
vi.mock('@/app/components/base/features/hooks', () => ({
useFeaturesStore: () => ({
getState: () => mockFeaturesState,
}),
}))
// Mock workflow service
const mockSyncWorkflowDraft = vi.fn()
vi.mock('@/service/workflow', () => ({
syncWorkflowDraft: (...args: unknown[]) => mockSyncWorkflowDraft(...args),
}))
// Mock useNodesReadOnly
const mockGetNodesReadOnly = vi.fn()
vi.mock('@/app/components/workflow/hooks/use-workflow', () => ({
useNodesReadOnly: () => ({
getNodesReadOnly: mockGetNodesReadOnly,
}),
}))
// Mock useSerialAsyncCallback - pass through the callback
vi.mock('@/app/components/workflow/hooks/use-serial-async-callback', () => ({
useSerialAsyncCallback: (callback: (...args: unknown[]) => unknown) => callback,
}))
// Mock workflow store
const mockSetSyncWorkflowDraftHash = vi.fn()
const mockSetDraftUpdatedAt = vi.fn()
const createMockWorkflowStoreState = (overrides = {}) => ({
appId: 'test-app-id',
conversationVariables: [],
environmentVariables: [],
syncWorkflowDraftHash: 'current-hash-123',
isWorkflowDataLoaded: true,
setSyncWorkflowDraftHash: mockSetSyncWorkflowDraftHash,
setDraftUpdatedAt: mockSetDraftUpdatedAt,
...overrides,
})
const mockWorkflowStoreGetState = vi.fn()
vi.mock('@/app/components/workflow/store', () => ({
useWorkflowStore: () => ({
getState: mockWorkflowStoreGetState,
}),
}))
// Mock useWorkflowRefreshDraft (THE KEY DEPENDENCY FOR THIS TEST)
const mockHandleRefreshWorkflowDraft = vi.fn()
vi.mock('.', () => ({
useWorkflowRefreshDraft: () => ({
handleRefreshWorkflowDraft: mockHandleRefreshWorkflowDraft,
}),
}))
// Mock API_PREFIX
vi.mock('@/config', () => ({
API_PREFIX: '/api',
}))
// Create a mock error response that mimics the actual API error
const createMockErrorResponse = (code: string) => {
const errorBody = { code, message: 'Draft not in sync' }
let bodyUsed = false
return {
json: vi.fn().mockImplementation(() => {
bodyUsed = true
return Promise.resolve(errorBody)
}),
get bodyUsed() {
return bodyUsed
},
}
}
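// Usage sketch: the mock mimics a fetch Response whose body is single-read.
//   const err = createMockErrorResponse('draft_workflow_not_sync')
//   err.bodyUsed        // false until json() is called
//   await err.json()    // -> { code: 'draft_workflow_not_sync', message: 'Draft not in sync' }
//   err.bodyUsed        // true; a correct consumer must not call json() again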
describe('useNodesSyncDraft', () => {
beforeEach(() => {
vi.clearAllMocks()
mockGetNodesReadOnly.mockReturnValue(false)
mockGetNodes.mockReturnValue([
{ id: 'node-1', type: 'start', data: { type: 'start' } },
{ id: 'node-2', type: 'llm', data: { type: 'llm' } },
])
mockStoreState.edges = [
{ id: 'edge-1', source: 'node-1', target: 'node-2', data: {} },
]
mockWorkflowStoreGetState.mockReturnValue(createMockWorkflowStoreState())
mockSyncWorkflowDraft.mockResolvedValue({
hash: 'new-hash-456',
updated_at: Date.now(),
})
})
afterEach(() => {
vi.resetAllMocks()
})
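// Signature assumed throughout this suite (inferred from the calls below):
//   doSyncWorkflowDraft(
//     notRefreshWhenSyncError?: boolean,
//     callback?: { onSuccess?: () => void; onError?: () => void; onSettled?: () => void },
//   )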
describe('doSyncWorkflowDraft function', () => {
it('should return doSyncWorkflowDraft function', () => {
const { result } = renderHook(() => useNodesSyncDraft())
expect(result.current.doSyncWorkflowDraft).toBeDefined()
expect(typeof result.current.doSyncWorkflowDraft).toBe('function')
})
it('should return syncWorkflowDraftWhenPageClose function', () => {
const { result } = renderHook(() => useNodesSyncDraft())
expect(result.current.syncWorkflowDraftWhenPageClose).toBeDefined()
expect(typeof result.current.syncWorkflowDraftWhenPageClose).toBe('function')
})
})
describe('successful sync', () => {
it('should call syncWorkflowDraft service on successful sync', async () => {
const { result } = renderHook(() => useNodesSyncDraft())
await act(async () => {
await result.current.doSyncWorkflowDraft()
})
expect(mockSyncWorkflowDraft).toHaveBeenCalledWith({
url: '/apps/test-app-id/workflows/draft',
params: expect.objectContaining({
hash: 'current-hash-123',
graph: expect.objectContaining({
nodes: expect.any(Array),
edges: expect.any(Array),
viewport: expect.any(Object),
}),
}),
})
})
it('should update syncWorkflowDraftHash on success', async () => {
mockSyncWorkflowDraft.mockResolvedValue({
hash: 'new-hash-789',
updated_at: 1234567890,
})
const { result } = renderHook(() => useNodesSyncDraft())
await act(async () => {
await result.current.doSyncWorkflowDraft()
})
expect(mockSetSyncWorkflowDraftHash).toHaveBeenCalledWith('new-hash-789')
})
it('should update draftUpdatedAt on success', async () => {
const updatedAt = 1234567890
mockSyncWorkflowDraft.mockResolvedValue({
hash: 'new-hash',
updated_at: updatedAt,
})
const { result } = renderHook(() => useNodesSyncDraft())
await act(async () => {
await result.current.doSyncWorkflowDraft()
})
expect(mockSetDraftUpdatedAt).toHaveBeenCalledWith(updatedAt)
})
it('should call onSuccess callback on success', async () => {
const onSuccess = vi.fn()
const { result } = renderHook(() => useNodesSyncDraft())
await act(async () => {
await result.current.doSyncWorkflowDraft(false, { onSuccess })
})
expect(onSuccess).toHaveBeenCalled()
})
it('should call onSettled callback after success', async () => {
const onSettled = vi.fn()
const { result } = renderHook(() => useNodesSyncDraft())
await act(async () => {
await result.current.doSyncWorkflowDraft(false, { onSettled })
})
expect(onSettled).toHaveBeenCalled()
})
})
describe('sync error handling - draft_workflow_not_sync (THE KEY FIX)', () => {
/**
* This is THE KEY TEST for the bug fix.
*
* SCENARIO: Multi-tab editing
* 1. User opens workflow in Tab A and Tab B
* 2. Tab A saves draft successfully, gets new hash
* 3. Tab B tries to save with old hash
* 4. Server returns 400 with code 'draft_workflow_not_sync'
*
* BEFORE FIX:
* - handleRefreshWorkflowDraft() was called without arguments
* - This would fetch the draft AND overwrite the canvas
* - The user loses their unsaved changes in Tab B
*
* AFTER FIX:
* - handleRefreshWorkflowDraft(true) is called
* - This fetches the draft but DOES NOT overwrite the canvas
* - Only the hash is updated for the next sync attempt
* - The user's unsaved changes are preserved
*/
it('should call handleRefreshWorkflowDraft with notUpdateCanvas=true when draft_workflow_not_sync error occurs', async () => {
const mockError = createMockErrorResponse('draft_workflow_not_sync')
mockSyncWorkflowDraft.mockRejectedValue(mockError)
const { result } = renderHook(() => useNodesSyncDraft())
await act(async () => {
await result.current.doSyncWorkflowDraft()
})
// THE KEY ASSERTION: handleRefreshWorkflowDraft must be called with true
await waitFor(() => {
expect(mockHandleRefreshWorkflowDraft).toHaveBeenCalledWith(true)
})
})
it('should NOT call handleRefreshWorkflowDraft when notRefreshWhenSyncError is true', async () => {
const mockError = createMockErrorResponse('draft_workflow_not_sync')
mockSyncWorkflowDraft.mockRejectedValue(mockError)
const { result } = renderHook(() => useNodesSyncDraft())
await act(async () => {
// First parameter is notRefreshWhenSyncError
await result.current.doSyncWorkflowDraft(true)
})
// Wait a bit for async operations
await new Promise(resolve => setTimeout(resolve, 100))
expect(mockHandleRefreshWorkflowDraft).not.toHaveBeenCalled()
})
it('should call onError callback when draft_workflow_not_sync error occurs', async () => {
const mockError = createMockErrorResponse('draft_workflow_not_sync')
mockSyncWorkflowDraft.mockRejectedValue(mockError)
const onError = vi.fn()
const { result } = renderHook(() => useNodesSyncDraft())
await act(async () => {
await result.current.doSyncWorkflowDraft(false, { onError })
})
expect(onError).toHaveBeenCalled()
})
it('should call onSettled callback after error', async () => {
const mockError = createMockErrorResponse('draft_workflow_not_sync')
mockSyncWorkflowDraft.mockRejectedValue(mockError)
const onSettled = vi.fn()
const { result } = renderHook(() => useNodesSyncDraft())
await act(async () => {
await result.current.doSyncWorkflowDraft(false, { onSettled })
})
expect(onSettled).toHaveBeenCalled()
})
})
describe('other error handling', () => {
it('should NOT call handleRefreshWorkflowDraft for non-draft_workflow_not_sync errors', async () => {
const mockError = createMockErrorResponse('some_other_error')
mockSyncWorkflowDraft.mockRejectedValue(mockError)
const { result } = renderHook(() => useNodesSyncDraft())
await act(async () => {
await result.current.doSyncWorkflowDraft()
})
// Wait a bit for async operations
await new Promise(resolve => setTimeout(resolve, 100))
expect(mockHandleRefreshWorkflowDraft).not.toHaveBeenCalled()
})
it('should handle error without json method', async () => {
const mockError = new Error('Network error')
mockSyncWorkflowDraft.mockRejectedValue(mockError)
const { result } = renderHook(() => useNodesSyncDraft())
const onError = vi.fn()
await act(async () => {
await result.current.doSyncWorkflowDraft(false, { onError })
})
expect(onError).toHaveBeenCalled()
expect(mockHandleRefreshWorkflowDraft).not.toHaveBeenCalled()
})
it('should handle error with bodyUsed already true', async () => {
const mockError = {
json: vi.fn(),
bodyUsed: true,
}
mockSyncWorkflowDraft.mockRejectedValue(mockError)
const { result } = renderHook(() => useNodesSyncDraft())
await act(async () => {
await result.current.doSyncWorkflowDraft()
})
// Should not call json() when bodyUsed is true
expect(mockError.json).not.toHaveBeenCalled()
expect(mockHandleRefreshWorkflowDraft).not.toHaveBeenCalled()
})
})
describe('read-only mode', () => {
it('should not sync when nodes are read-only', async () => {
mockGetNodesReadOnly.mockReturnValue(true)
const { result } = renderHook(() => useNodesSyncDraft())
await act(async () => {
await result.current.doSyncWorkflowDraft()
})
expect(mockSyncWorkflowDraft).not.toHaveBeenCalled()
})
it('should not sync on page close when nodes are read-only', () => {
mockGetNodesReadOnly.mockReturnValue(true)
// Mock sendBeacon
const mockSendBeacon = vi.fn()
Object.defineProperty(navigator, 'sendBeacon', {
value: mockSendBeacon,
writable: true,
})
const { result } = renderHook(() => useNodesSyncDraft())
act(() => {
result.current.syncWorkflowDraftWhenPageClose()
})
expect(mockSendBeacon).not.toHaveBeenCalled()
})
})
describe('workflow data not loaded', () => {
it('should not sync when workflow data is not loaded', async () => {
mockWorkflowStoreGetState.mockReturnValue(
createMockWorkflowStoreState({ isWorkflowDataLoaded: false }),
)
const { result } = renderHook(() => useNodesSyncDraft())
await act(async () => {
await result.current.doSyncWorkflowDraft()
})
expect(mockSyncWorkflowDraft).not.toHaveBeenCalled()
})
})
describe('no appId', () => {
it('should not sync when appId is not set', async () => {
mockWorkflowStoreGetState.mockReturnValue(
createMockWorkflowStoreState({ appId: null }),
)
const { result } = renderHook(() => useNodesSyncDraft())
await act(async () => {
await result.current.doSyncWorkflowDraft()
})
expect(mockSyncWorkflowDraft).not.toHaveBeenCalled()
})
})
describe('node filtering', () => {
it('should filter out temp nodes', async () => {
mockGetNodes.mockReturnValue([
{ id: 'node-1', type: 'start', data: { type: 'start' } },
{ id: 'node-temp', type: 'custom', data: { type: 'custom', _isTempNode: true } },
{ id: 'node-2', type: 'llm', data: { type: 'llm' } },
])
const { result } = renderHook(() => useNodesSyncDraft())
await act(async () => {
await result.current.doSyncWorkflowDraft()
})
expect(mockSyncWorkflowDraft).toHaveBeenCalledWith(
expect.objectContaining({
params: expect.objectContaining({
graph: expect.objectContaining({
nodes: expect.not.arrayContaining([
expect.objectContaining({ id: 'node-temp' }),
]),
}),
}),
}),
)
})
it('should remove internal underscore properties from nodes', async () => {
mockGetNodes.mockReturnValue([
{
id: 'node-1',
type: 'start',
data: {
type: 'start',
_internalProp: 'should be removed',
_anotherInternal: true,
publicProp: 'should remain',
},
},
])
const { result } = renderHook(() => useNodesSyncDraft())
await act(async () => {
await result.current.doSyncWorkflowDraft()
})
const callArgs = mockSyncWorkflowDraft.mock.calls[0][0]
const sentNode = callArgs.params.graph.nodes[0]
expect(sentNode.data).not.toHaveProperty('_internalProp')
expect(sentNode.data).not.toHaveProperty('_anotherInternal')
expect(sentNode.data).toHaveProperty('publicProp', 'should remain')
})
})
describe('edge filtering', () => {
it('should filter out temp edges', async () => {
mockStoreState.edges = [
{ id: 'edge-1', source: 'node-1', target: 'node-2', data: {} },
{ id: 'edge-temp', source: 'node-1', target: 'node-3', data: { _isTemp: true } },
]
const { result } = renderHook(() => useNodesSyncDraft())
await act(async () => {
await result.current.doSyncWorkflowDraft()
})
const callArgs = mockSyncWorkflowDraft.mock.calls[0][0]
const sentEdges = callArgs.params.graph.edges
expect(sentEdges).toHaveLength(1)
expect(sentEdges[0].id).toBe('edge-1')
})
it('should remove internal underscore properties from edges', async () => {
mockStoreState.edges = [
{
id: 'edge-1',
source: 'node-1',
target: 'node-2',
data: {
_internalEdgeProp: 'should be removed',
publicEdgeProp: 'should remain',
},
},
]
const { result } = renderHook(() => useNodesSyncDraft())
await act(async () => {
await result.current.doSyncWorkflowDraft()
})
const callArgs = mockSyncWorkflowDraft.mock.calls[0][0]
const sentEdge = callArgs.params.graph.edges[0]
expect(sentEdge.data).not.toHaveProperty('_internalEdgeProp')
expect(sentEdge.data).toHaveProperty('publicEdgeProp', 'should remain')
})
})
describe('viewport handling', () => {
it('should send current viewport from transform', async () => {
mockStoreState.transform = [100, 200, 1.5]
const { result } = renderHook(() => useNodesSyncDraft())
await act(async () => {
await result.current.doSyncWorkflowDraft()
})
expect(mockSyncWorkflowDraft).toHaveBeenCalledWith(
expect.objectContaining({
params: expect.objectContaining({
graph: expect.objectContaining({
viewport: { x: 100, y: 200, zoom: 1.5 },
}),
}),
}),
)
})
})
describe('multi-tab concurrent editing scenario (END-TO-END TEST)', () => {
/**
* Simulates the complete multi-tab scenario to verify the fix works correctly.
*
* Scenario:
* 1. Tab A and Tab B both have the workflow open with hash 'hash-v1'
* 2. Tab A saves successfully, server returns 'hash-v2'
* 3. Tab B tries to save with 'hash-v1' and gets a 'draft_workflow_not_sync' error
* 4. Tab B should only update its hash to 'hash-v2', not overwrite the canvas
* 5. Tab B can now retry the save with the correct hash
*/
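// The assertions below map onto the scenario: the stale 'hash-v1' is sent
// (step 3), the recovery refresh is hash-only (step 4), and exactly one
// refresh call with the single argument `true` is made.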
it('should preserve canvas data during hash conflict resolution', async () => {
// Initial state: both tabs have hash-v1
mockWorkflowStoreGetState.mockReturnValue(
createMockWorkflowStoreState({ syncWorkflowDraftHash: 'hash-v1' }),
)
// Tab B tries to save with old hash, server returns error
const syncError = createMockErrorResponse('draft_workflow_not_sync')
mockSyncWorkflowDraft.mockRejectedValue(syncError)
const { result } = renderHook(() => useNodesSyncDraft())
// Tab B attempts to sync
await act(async () => {
await result.current.doSyncWorkflowDraft()
})
// Verify the sync was attempted with old hash
expect(mockSyncWorkflowDraft).toHaveBeenCalledWith(
expect.objectContaining({
params: expect.objectContaining({
hash: 'hash-v1',
}),
}),
)
// Verify handleRefreshWorkflowDraft was called with true (not overwrite canvas)
await waitFor(() => {
expect(mockHandleRefreshWorkflowDraft).toHaveBeenCalledWith(true)
})
// The key assertion: exactly one refresh call, with the single argument `true`
expect(mockHandleRefreshWorkflowDraft).toHaveBeenCalledTimes(1)
expect(mockHandleRefreshWorkflowDraft.mock.calls[0]).toEqual([true])
})
it('should handle multiple consecutive sync failures gracefully', async () => {
// Create a fresh error for each call to avoid the single-read bodyUsed issue
mockSyncWorkflowDraft
.mockRejectedValueOnce(createMockErrorResponse('draft_workflow_not_sync'))
.mockRejectedValueOnce(createMockErrorResponse('draft_workflow_not_sync'))
const { result } = renderHook(() => useNodesSyncDraft())
// First sync attempt
await act(async () => {
await result.current.doSyncWorkflowDraft()
})
// Wait for first refresh call
await waitFor(() => {
expect(mockHandleRefreshWorkflowDraft).toHaveBeenCalledTimes(1)
})
// Second sync attempt
await act(async () => {
await result.current.doSyncWorkflowDraft()
})
// Both should call handleRefreshWorkflowDraft with true
await waitFor(() => {
expect(mockHandleRefreshWorkflowDraft).toHaveBeenCalledTimes(2)
})
mockHandleRefreshWorkflowDraft.mock.calls.forEach((call) => {
expect(call).toEqual([true])
})
})
})
describe('callbacks behavior', () => {
it('should not call onSuccess when sync fails', async () => {
const syncError = createMockErrorResponse('draft_workflow_not_sync')
mockSyncWorkflowDraft.mockRejectedValue(syncError)
const onSuccess = vi.fn()
const onError = vi.fn()
const { result } = renderHook(() => useNodesSyncDraft())
await act(async () => {
await result.current.doSyncWorkflowDraft(false, { onSuccess, onError })
})
expect(onSuccess).not.toHaveBeenCalled()
expect(onError).toHaveBeenCalled()
})
it('should always call onSettled regardless of success or failure', async () => {
const onSettled = vi.fn()
const { result } = renderHook(() => useNodesSyncDraft())
// Test success case
await act(async () => {
await result.current.doSyncWorkflowDraft(false, { onSettled })
})
expect(onSettled).toHaveBeenCalledTimes(1)
// Reset
onSettled.mockClear()
// Test failure case
const syncError = createMockErrorResponse('draft_workflow_not_sync')
mockSyncWorkflowDraft.mockRejectedValue(syncError)
await act(async () => {
await result.current.doSyncWorkflowDraft(false, { onSettled })
})
expect(onSettled).toHaveBeenCalledTimes(1)
})
})
})

View File

@@ -115,7 +115,7 @@ export const useNodesSyncDraft = () => {
if (error && error.json && !error.bodyUsed) {
error.json().then((err: any) => {
if (err.code === 'draft_workflow_not_sync' && !notRefreshWhenSyncError)
handleRefreshWorkflowDraft()
handleRefreshWorkflowDraft(true)
})
}
callback?.onError?.()

View File

@@ -0,0 +1,556 @@
/**
* Test Suite for useWorkflowRefreshDraft Hook
*
* PURPOSE:
* This hook is responsible for refreshing the workflow draft data from the server.
* The key fix being tested is the behavior of the `notUpdateCanvas` parameter.
*
* MULTI-TAB PROBLEM SCENARIO:
* 1. User opens the same workflow in Tab A and Tab B (both have hash: v1)
* 2. Tab A saves successfully, and the server returns a new hash: v2
* 3. Tab B tries to save with the old hash: v1, and the server returns a 400 error
*    (draft_workflow_not_sync)
* 4. BEFORE FIX: handleRefreshWorkflowDraft() was called without args, which fetched
*    the draft AND overwrote the canvas - the user lost unsaved changes in Tab B
* 5. AFTER FIX: handleRefreshWorkflowDraft(true) is called, which fetches the draft but
*    only updates the hash, preserving the user's canvas changes
*
* TESTING STRATEGY:
* We don't simulate actual tab switching UI behavior. Instead, we test the hook's
* response to specific inputs:
* - When notUpdateCanvas=true: should NOT call handleUpdateWorkflowCanvas
* - When notUpdateCanvas=false/undefined: should call handleUpdateWorkflowCanvas
*
* This is behavior-driven testing - we verify "what the code does when given specific
* inputs" rather than simulating complete user interaction flows.
*/
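/**
* For reference, the assumed shape of the branch under test (it mirrors the
* use-workflow-refresh-draft.ts diff later in this compare view):
*
*   fetchWorkflowDraft(`/apps/${appId}/workflows/draft`).then((response) => {
*     if (!notUpdateCanvas) {
*       handleUpdateWorkflowCanvas({
*         nodes: response.graph?.nodes || [],
*         edges: response.graph?.edges || [],
*         viewport: response.graph?.viewport || { x: 0, y: 0, zoom: 1 },
*       })
*     }
*     setSyncWorkflowDraftHash(response.hash)
*     // env secrets and conversation variables are refreshed in both cases
*   })
*/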
import { act, renderHook, waitFor } from '@testing-library/react'
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
import { useWorkflowRefreshDraft } from './use-workflow-refresh-draft'
// Mock the workflow service
const mockFetchWorkflowDraft = vi.fn()
vi.mock('@/service/workflow', () => ({
fetchWorkflowDraft: (...args: unknown[]) => mockFetchWorkflowDraft(...args),
}))
// Mock the workflow update hook
const mockHandleUpdateWorkflowCanvas = vi.fn()
vi.mock('@/app/components/workflow/hooks', () => ({
useWorkflowUpdate: () => ({
handleUpdateWorkflowCanvas: mockHandleUpdateWorkflowCanvas,
}),
}))
// Mock store state
const mockSetSyncWorkflowDraftHash = vi.fn()
const mockSetIsSyncingWorkflowDraft = vi.fn()
const mockSetEnvironmentVariables = vi.fn()
const mockSetEnvSecrets = vi.fn()
const mockSetConversationVariables = vi.fn()
const mockSetIsWorkflowDataLoaded = vi.fn()
const mockCancelDebouncedSync = vi.fn()
const createMockStoreState = (overrides = {}) => ({
appId: 'test-app-id',
setSyncWorkflowDraftHash: mockSetSyncWorkflowDraftHash,
setIsSyncingWorkflowDraft: mockSetIsSyncingWorkflowDraft,
setEnvironmentVariables: mockSetEnvironmentVariables,
setEnvSecrets: mockSetEnvSecrets,
setConversationVariables: mockSetConversationVariables,
setIsWorkflowDataLoaded: mockSetIsWorkflowDataLoaded,
isWorkflowDataLoaded: true,
debouncedSyncWorkflowDraft: {
cancel: mockCancelDebouncedSync,
},
...overrides,
})
const mockWorkflowStoreGetState = vi.fn()
vi.mock('@/app/components/workflow/store', () => ({
useWorkflowStore: () => ({
getState: mockWorkflowStoreGetState,
}),
}))
// Default mock response from fetchWorkflowDraft
const createMockDraftResponse = (overrides = {}) => ({
hash: 'new-hash-12345',
graph: {
nodes: [{ id: 'node-1', type: 'start', data: {} }],
edges: [{ id: 'edge-1', source: 'node-1', target: 'node-2' }],
viewport: { x: 100, y: 200, zoom: 1.5 },
},
environment_variables: [
{ id: 'env-1', name: 'API_KEY', value: 'secret-key', value_type: 'secret' },
{ id: 'env-2', name: 'BASE_URL', value: 'https://api.example.com', value_type: 'string' },
],
conversation_variables: [
{ id: 'conv-1', name: 'user_input', value: 'test' },
],
...overrides,
})
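// Note: the hook under test is expected to split this fixture two ways - raw
// secret values go into envSecrets keyed by id, while the visible environment
// variables get secret values masked as '[__HIDDEN__]' (see assertions below).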
describe('useWorkflowRefreshDraft', () => {
beforeEach(() => {
vi.clearAllMocks()
mockWorkflowStoreGetState.mockReturnValue(createMockStoreState())
mockFetchWorkflowDraft.mockResolvedValue(createMockDraftResponse())
})
afterEach(() => {
vi.resetAllMocks()
})
describe('handleRefreshWorkflowDraft function', () => {
it('should return handleRefreshWorkflowDraft function', () => {
const { result } = renderHook(() => useWorkflowRefreshDraft())
expect(result.current.handleRefreshWorkflowDraft).toBeDefined()
expect(typeof result.current.handleRefreshWorkflowDraft).toBe('function')
})
})
describe('notUpdateCanvas parameter behavior (THE KEY FIX)', () => {
it('should NOT call handleUpdateWorkflowCanvas when notUpdateCanvas is true', async () => {
const { result } = renderHook(() => useWorkflowRefreshDraft())
act(() => {
result.current.handleRefreshWorkflowDraft(true)
})
await waitFor(() => {
expect(mockFetchWorkflowDraft).toHaveBeenCalledWith('/apps/test-app-id/workflows/draft')
})
await waitFor(() => {
expect(mockSetSyncWorkflowDraftHash).toHaveBeenCalledWith('new-hash-12345')
})
// THE KEY ASSERTION: Canvas should NOT be updated when notUpdateCanvas is true
expect(mockHandleUpdateWorkflowCanvas).not.toHaveBeenCalled()
})
it('should call handleUpdateWorkflowCanvas when notUpdateCanvas is false', async () => {
const { result } = renderHook(() => useWorkflowRefreshDraft())
act(() => {
result.current.handleRefreshWorkflowDraft(false)
})
await waitFor(() => {
expect(mockFetchWorkflowDraft).toHaveBeenCalledWith('/apps/test-app-id/workflows/draft')
})
await waitFor(() => {
// Canvas SHOULD be updated when notUpdateCanvas is false
expect(mockHandleUpdateWorkflowCanvas).toHaveBeenCalledWith({
nodes: [{ id: 'node-1', type: 'start', data: {} }],
edges: [{ id: 'edge-1', source: 'node-1', target: 'node-2' }],
viewport: { x: 100, y: 200, zoom: 1.5 },
})
})
await waitFor(() => {
expect(mockSetSyncWorkflowDraftHash).toHaveBeenCalledWith('new-hash-12345')
})
})
it('should call handleUpdateWorkflowCanvas when notUpdateCanvas is undefined (default)', async () => {
const { result } = renderHook(() => useWorkflowRefreshDraft())
act(() => {
result.current.handleRefreshWorkflowDraft()
})
await waitFor(() => {
expect(mockFetchWorkflowDraft).toHaveBeenCalled()
})
await waitFor(() => {
// Canvas SHOULD be updated when notUpdateCanvas is undefined
expect(mockHandleUpdateWorkflowCanvas).toHaveBeenCalled()
})
})
it('should still update hash even when notUpdateCanvas is true', async () => {
const { result } = renderHook(() => useWorkflowRefreshDraft())
act(() => {
result.current.handleRefreshWorkflowDraft(true)
})
await waitFor(() => {
expect(mockSetSyncWorkflowDraftHash).toHaveBeenCalledWith('new-hash-12345')
})
// Verify canvas was NOT updated
expect(mockHandleUpdateWorkflowCanvas).not.toHaveBeenCalled()
})
it('should still update environment variables when notUpdateCanvas is true', async () => {
const { result } = renderHook(() => useWorkflowRefreshDraft())
act(() => {
result.current.handleRefreshWorkflowDraft(true)
})
await waitFor(() => {
expect(mockSetEnvironmentVariables).toHaveBeenCalledWith([
{ id: 'env-1', name: 'API_KEY', value: '[__HIDDEN__]', value_type: 'secret' },
{ id: 'env-2', name: 'BASE_URL', value: 'https://api.example.com', value_type: 'string' },
])
})
expect(mockHandleUpdateWorkflowCanvas).not.toHaveBeenCalled()
})
it('should still update env secrets when notUpdateCanvas is true', async () => {
const { result } = renderHook(() => useWorkflowRefreshDraft())
act(() => {
result.current.handleRefreshWorkflowDraft(true)
})
await waitFor(() => {
expect(mockSetEnvSecrets).toHaveBeenCalledWith({
'env-1': 'secret-key',
})
})
expect(mockHandleUpdateWorkflowCanvas).not.toHaveBeenCalled()
})
it('should still update conversation variables when notUpdateCanvas is true', async () => {
const { result } = renderHook(() => useWorkflowRefreshDraft())
act(() => {
result.current.handleRefreshWorkflowDraft(true)
})
await waitFor(() => {
expect(mockSetConversationVariables).toHaveBeenCalledWith([
{ id: 'conv-1', name: 'user_input', value: 'test' },
])
})
expect(mockHandleUpdateWorkflowCanvas).not.toHaveBeenCalled()
})
})
describe('syncing state management', () => {
it('should set isSyncingWorkflowDraft to true before fetch', () => {
const { result } = renderHook(() => useWorkflowRefreshDraft())
act(() => {
result.current.handleRefreshWorkflowDraft()
})
expect(mockSetIsSyncingWorkflowDraft).toHaveBeenCalledWith(true)
})
it('should set isSyncingWorkflowDraft to false after fetch completes', async () => {
const { result } = renderHook(() => useWorkflowRefreshDraft())
act(() => {
result.current.handleRefreshWorkflowDraft()
})
await waitFor(() => {
expect(mockSetIsSyncingWorkflowDraft).toHaveBeenCalledWith(false)
})
})
it('should set isSyncingWorkflowDraft to false even when fetch fails', async () => {
mockFetchWorkflowDraft.mockRejectedValue(new Error('Network error'))
const { result } = renderHook(() => useWorkflowRefreshDraft())
act(() => {
result.current.handleRefreshWorkflowDraft()
})
await waitFor(() => {
expect(mockSetIsSyncingWorkflowDraft).toHaveBeenCalledWith(false)
})
})
})
describe('isWorkflowDataLoaded flag management', () => {
it('should set isWorkflowDataLoaded to false before fetch when it was true', () => {
mockWorkflowStoreGetState.mockReturnValue(
createMockStoreState({ isWorkflowDataLoaded: true }),
)
const { result } = renderHook(() => useWorkflowRefreshDraft())
act(() => {
result.current.handleRefreshWorkflowDraft()
})
expect(mockSetIsWorkflowDataLoaded).toHaveBeenCalledWith(false)
})
it('should set isWorkflowDataLoaded to true after fetch succeeds', async () => {
const { result } = renderHook(() => useWorkflowRefreshDraft())
act(() => {
result.current.handleRefreshWorkflowDraft()
})
await waitFor(() => {
expect(mockSetIsWorkflowDataLoaded).toHaveBeenCalledWith(true)
})
})
it('should restore isWorkflowDataLoaded when fetch fails and it was previously loaded', async () => {
mockWorkflowStoreGetState.mockReturnValue(
createMockStoreState({ isWorkflowDataLoaded: true }),
)
mockFetchWorkflowDraft.mockRejectedValue(new Error('Network error'))
const { result } = renderHook(() => useWorkflowRefreshDraft())
act(() => {
result.current.handleRefreshWorkflowDraft()
})
await waitFor(() => {
// Should restore to true because wasLoaded was true
expect(mockSetIsWorkflowDataLoaded).toHaveBeenLastCalledWith(true)
})
})
})
describe('debounced sync cancellation', () => {
it('should cancel debounced sync before fetching draft', () => {
const { result } = renderHook(() => useWorkflowRefreshDraft())
act(() => {
result.current.handleRefreshWorkflowDraft()
})
expect(mockCancelDebouncedSync).toHaveBeenCalled()
})
it('should handle case when debouncedSyncWorkflowDraft has no cancel method', () => {
mockWorkflowStoreGetState.mockReturnValue(
createMockStoreState({ debouncedSyncWorkflowDraft: {} }),
)
const { result } = renderHook(() => useWorkflowRefreshDraft())
// Should not throw
expect(() => {
act(() => {
result.current.handleRefreshWorkflowDraft()
})
}).not.toThrow()
})
})
describe('edge cases', () => {
it('should handle empty graph in response', async () => {
mockFetchWorkflowDraft.mockResolvedValue({
hash: 'hash-empty',
graph: null,
environment_variables: [],
conversation_variables: [],
})
const { result } = renderHook(() => useWorkflowRefreshDraft())
act(() => {
result.current.handleRefreshWorkflowDraft(false)
})
await waitFor(() => {
expect(mockHandleUpdateWorkflowCanvas).toHaveBeenCalledWith({
nodes: [],
edges: [],
viewport: { x: 0, y: 0, zoom: 1 },
})
})
})
it('should handle missing viewport in response', async () => {
mockFetchWorkflowDraft.mockResolvedValue({
hash: 'hash-no-viewport',
graph: {
nodes: [{ id: 'node-1' }],
edges: [],
viewport: null,
},
environment_variables: [],
conversation_variables: [],
})
const { result } = renderHook(() => useWorkflowRefreshDraft())
act(() => {
result.current.handleRefreshWorkflowDraft(false)
})
await waitFor(() => {
expect(mockHandleUpdateWorkflowCanvas).toHaveBeenCalledWith({
nodes: [{ id: 'node-1' }],
edges: [],
viewport: { x: 0, y: 0, zoom: 1 },
})
})
})
it('should handle missing environment_variables in response', async () => {
mockFetchWorkflowDraft.mockResolvedValue({
hash: 'hash-no-env',
graph: { nodes: [], edges: [], viewport: { x: 0, y: 0, zoom: 1 } },
environment_variables: undefined,
conversation_variables: [],
})
const { result } = renderHook(() => useWorkflowRefreshDraft())
act(() => {
result.current.handleRefreshWorkflowDraft(true)
})
await waitFor(() => {
expect(mockSetEnvironmentVariables).toHaveBeenCalledWith([])
expect(mockSetEnvSecrets).toHaveBeenCalledWith({})
})
})
it('should handle missing conversation_variables in response', async () => {
mockFetchWorkflowDraft.mockResolvedValue({
hash: 'hash-no-conv',
graph: { nodes: [], edges: [], viewport: { x: 0, y: 0, zoom: 1 } },
environment_variables: [],
conversation_variables: undefined,
})
const { result } = renderHook(() => useWorkflowRefreshDraft())
act(() => {
result.current.handleRefreshWorkflowDraft(true)
})
await waitFor(() => {
expect(mockSetConversationVariables).toHaveBeenCalledWith([])
})
})
it('should filter only secret type for envSecrets', async () => {
mockFetchWorkflowDraft.mockResolvedValue({
hash: 'hash-mixed-env',
graph: { nodes: [], edges: [], viewport: { x: 0, y: 0, zoom: 1 } },
environment_variables: [
{ id: 'env-1', name: 'SECRET_KEY', value: 'secret-value', value_type: 'secret' },
{ id: 'env-2', name: 'PUBLIC_URL', value: 'https://example.com', value_type: 'string' },
{ id: 'env-3', name: 'ANOTHER_SECRET', value: 'another-secret', value_type: 'secret' },
],
conversation_variables: [],
})
const { result } = renderHook(() => useWorkflowRefreshDraft())
act(() => {
result.current.handleRefreshWorkflowDraft(true)
})
await waitFor(() => {
expect(mockSetEnvSecrets).toHaveBeenCalledWith({
'env-1': 'secret-value',
'env-3': 'another-secret',
})
})
})
it('should hide secret values in environment variables', async () => {
mockFetchWorkflowDraft.mockResolvedValue({
hash: 'hash-secrets',
graph: { nodes: [], edges: [], viewport: { x: 0, y: 0, zoom: 1 } },
environment_variables: [
{ id: 'env-1', name: 'SECRET_KEY', value: 'super-secret', value_type: 'secret' },
{ id: 'env-2', name: 'PUBLIC_URL', value: 'https://example.com', value_type: 'string' },
],
conversation_variables: [],
})
const { result } = renderHook(() => useWorkflowRefreshDraft())
act(() => {
result.current.handleRefreshWorkflowDraft(true)
})
await waitFor(() => {
expect(mockSetEnvironmentVariables).toHaveBeenCalledWith([
{ id: 'env-1', name: 'SECRET_KEY', value: '[__HIDDEN__]', value_type: 'secret' },
{ id: 'env-2', name: 'PUBLIC_URL', value: 'https://example.com', value_type: 'string' },
])
})
})
})
describe('multi-tab scenario simulation (THE BUG FIX VERIFICATION)', () => {
/**
* This test verifies the fix for the multi-tab scenario:
* 1. User opens workflow in Tab A and Tab B
* 2. Tab A saves draft successfully
* 3. Tab B tries to save but gets a 'draft_workflow_not_sync' error (hash mismatch)
* 4. BEFORE FIX: Tab B would fetch the draft and overwrite the canvas with stale data
* 5. AFTER FIX: Tab B only updates the hash, preserving the user's canvas changes
*/
it('should only update hash when called with notUpdateCanvas=true (simulating sync error recovery)', async () => {
const mockResponse = createMockDraftResponse()
mockFetchWorkflowDraft.mockResolvedValue(mockResponse)
const { result } = renderHook(() => useWorkflowRefreshDraft())
// Simulate the sync error recovery scenario where notUpdateCanvas is true
act(() => {
result.current.handleRefreshWorkflowDraft(true)
})
await waitFor(() => {
expect(mockFetchWorkflowDraft).toHaveBeenCalled()
})
await waitFor(() => {
// Hash should be updated for next sync attempt
expect(mockSetSyncWorkflowDraftHash).toHaveBeenCalledWith('new-hash-12345')
})
// Canvas should NOT be updated - user's changes are preserved
expect(mockHandleUpdateWorkflowCanvas).not.toHaveBeenCalled()
// Other states should still be updated
expect(mockSetEnvironmentVariables).toHaveBeenCalled()
expect(mockSetConversationVariables).toHaveBeenCalled()
})
it('should update canvas when called with notUpdateCanvas=false (normal refresh)', async () => {
const mockResponse = createMockDraftResponse()
mockFetchWorkflowDraft.mockResolvedValue(mockResponse)
const { result } = renderHook(() => useWorkflowRefreshDraft())
// Simulate normal refresh scenario
act(() => {
result.current.handleRefreshWorkflowDraft(false)
})
await waitFor(() => {
expect(mockFetchWorkflowDraft).toHaveBeenCalled()
})
await waitFor(() => {
expect(mockSetSyncWorkflowDraftHash).toHaveBeenCalledWith('new-hash-12345')
})
// Canvas SHOULD be updated in normal refresh
await waitFor(() => {
expect(mockHandleUpdateWorkflowCanvas).toHaveBeenCalled()
})
})
})
})

View File

@@ -8,7 +8,7 @@ export const useWorkflowRefreshDraft = () => {
const workflowStore = useWorkflowStore()
const { handleUpdateWorkflowCanvas } = useWorkflowUpdate()
const handleRefreshWorkflowDraft = useCallback(() => {
const handleRefreshWorkflowDraft = useCallback((notUpdateCanvas?: boolean) => {
const {
appId,
setSyncWorkflowDraftHash,
@@ -31,12 +31,14 @@ export const useWorkflowRefreshDraft = () => {
fetchWorkflowDraft(`/apps/${appId}/workflows/draft`)
.then((response) => {
// Ensure we have a valid workflow structure with viewport
const workflowData: WorkflowDataUpdater = {
nodes: response.graph?.nodes || [],
edges: response.graph?.edges || [],
viewport: response.graph?.viewport || { x: 0, y: 0, zoom: 1 },
if (!notUpdateCanvas) {
const workflowData: WorkflowDataUpdater = {
nodes: response.graph?.nodes || [],
edges: response.graph?.edges || [],
viewport: response.graph?.viewport || { x: 0, y: 0, zoom: 1 },
}
handleUpdateWorkflowCanvas(workflowData)
}
handleUpdateWorkflowCanvas(workflowData)
setSyncWorkflowDraftHash(response.hash)
setEnvSecrets((response.environment_variables || []).filter(env => env.value_type === 'secret').reduce((acc, env) => {
acc[env.id] = env.value

View File

@@ -87,7 +87,6 @@ export type ToolValue = {
enabled?: boolean
extra?: { description?: string } & Record<string, unknown>
credential_id?: string
type?: string
}
export type DataSourceItem = {

View File

@@ -18,7 +18,6 @@ import {
Group,
} from '@/app/components/workflow/nodes/_base/components/layout'
import VarReferencePicker from '@/app/components/workflow/nodes/_base/components/variable/var-reference-picker'
import { IS_CE_EDITION } from '@/config'
import Split from '../_base/components/split'
import ChunkStructure from './components/chunk-structure'
import EmbeddingModel from './components/embedding-model'
@@ -173,7 +172,7 @@ const Panel: FC<NodePanelProps<KnowledgeBaseNodeType>> = ({
{
data.indexing_technique === IndexMethodEnum.QUALIFIED
&& [ChunkStructureEnum.general, ChunkStructureEnum.parent_child].includes(data.chunk_structure)
&& IS_CE_EDITION && (
&& (
<>
<SummaryIndexSetting
summaryIndexSetting={data.summary_index_setting}

View File

@@ -1,7 +1,7 @@
{
"name": "dify-web",
"type": "module",
"version": "1.12.1",
"version": "1.11.4",
"private": true,
"packageManager": "pnpm@10.27.0+sha512.72d699da16b1179c14ba9e64dc71c9a40988cbdc65c264cb0e489db7de917f20dcf4d64d8723625f2969ba52d4b7e2a1170682d9ac2a5dcaeaab732b7e16f04a",
"imports": {
@@ -117,6 +117,7 @@
"ky": "1.12.0",
"lamejs": "1.2.1",
"lexical": "0.38.2",
"line-clamp": "1.0.0",
"mermaid": "11.11.0",
"mime": "4.1.0",
"mitt": "3.0.1",

web/pnpm-lock.yaml generated
View File

@@ -233,6 +233,9 @@ importers:
lexical:
specifier: 0.38.2
version: 0.38.2
line-clamp:
specifier: 1.0.0
version: 1.0.0
mermaid:
specifier: 11.11.0
version: 11.11.0
@@ -5400,6 +5403,9 @@ packages:
resolution: {integrity: sha512-/vlFKAoH5Cgt3Ie+JLhRbwOsCQePABiU3tJ1egGvyQ+33R/vcwM2Zl2QR/LzjsBeItPt3oSVXapn+m4nQDvpzw==}
engines: {node: '>=14'}
line-clamp@1.0.0:
resolution: {integrity: sha512-dCDlvMj572RIRBQ3x9aIX0DTdt2St1bMdpi64jVTAi5vqBck7wf+J97//+J7+pS80rFJaYa8HiyXCTp0flpnBA==}
lines-and-columns@1.2.4:
resolution: {integrity: sha512-7ylylesZQ/PV29jhEDl3Ufjo6ZX7gCqJr5F7PKrqc93v7fzSymt1BpwEU8nAUXs8qzzvqhbjhK5QZg6Mt/HkBg==}
@@ -12907,6 +12913,8 @@ snapshots:
lilconfig@3.1.3: {}
line-clamp@1.0.0: {}
lines-and-columns@1.2.4: {}
lint-staged@15.5.2:

View File

@@ -1,80 +0,0 @@
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
const loadGetBaseURL = async (isClientValue: boolean) => {
vi.resetModules()
vi.doMock('@/utils/client', () => ({ isClient: isClientValue }))
const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {})
// eslint-disable-next-line next/no-assign-module-variable
const module = await import('./client')
warnSpy.mockClear()
return { getBaseURL: module.getBaseURL, warnSpy }
}
// Scenario: base URL selection and warnings.
describe('getBaseURL', () => {
beforeEach(() => {
vi.clearAllMocks()
})
afterEach(() => {
vi.restoreAllMocks()
})
// Scenario: client environment uses window origin.
it('should use window origin when running on the client', async () => {
// Arrange
const { origin } = window.location
const { getBaseURL, warnSpy } = await loadGetBaseURL(true)
// Act
const url = getBaseURL('/api')
// Assert
expect(url.href).toBe(`${origin}/api`)
expect(warnSpy).not.toHaveBeenCalled()
})
// Scenario: server environment falls back to localhost with warning.
it('should fall back to localhost and warn on the server', async () => {
// Arrange
const { getBaseURL, warnSpy } = await loadGetBaseURL(false)
// Act
const url = getBaseURL('/api')
// Assert
expect(url.href).toBe('http://localhost/api')
expect(warnSpy).toHaveBeenCalledTimes(1)
expect(warnSpy).toHaveBeenCalledWith('Using localhost as base URL in server environment, please configure accordingly.')
})
// Scenario: non-http protocols surface warnings.
it('should warn when protocol is not http or https', async () => {
// Arrange
const { getBaseURL, warnSpy } = await loadGetBaseURL(true)
// Act
const url = getBaseURL('localhost:5001/console/api')
// Assert
expect(url.protocol).toBe('localhost:')
expect(url.href).toBe('localhost:5001/console/api')
expect(warnSpy).toHaveBeenCalledTimes(1)
expect(warnSpy).toHaveBeenCalledWith(
'Unexpected protocol for API requests, expected http or https. Current protocol: localhost:. Please configure accordingly.',
)
})
// Scenario: absolute http URLs are preserved.
it('should keep absolute http URLs intact', async () => {
// Arrange
const { getBaseURL, warnSpy } = await loadGetBaseURL(true)
// Act
const url = getBaseURL('https://api.example.com/console/api')
// Assert
expect(url.href).toBe('https://api.example.com/console/api')
expect(warnSpy).not.toHaveBeenCalled()
})
})

View File

@@ -13,38 +13,12 @@ import {
consoleRouterContract,
marketplaceRouterContract,
} from '@/contract/router'
import { isClient } from '@/utils/client'
import { request } from './base'
const getMarketplaceHeaders = () => new Headers({
'X-Dify-Version': !IS_MARKETPLACE ? APP_VERSION : '999.0.0',
})
function isURL(path: string) {
try {
// eslint-disable-next-line no-new
new URL(path)
return true
}
catch {
return false
}
}
export function getBaseURL(path: string) {
const url = new URL(path, isURL(path) ? undefined : isClient ? window.location.origin : 'http://localhost')
if (!isClient && !isURL(path)) {
console.warn('Using localhost as base URL in server environment, please configure accordingly.')
}
if (url.protocol !== 'http:' && url.protocol !== 'https:') {
console.warn(`Unexpected protocol for API requests, expected http or https. Current protocol: ${url.protocol}. Please configure accordingly.`)
}
return url
}
const marketplaceLink = new OpenAPILink(marketplaceRouterContract, {
url: MARKETPLACE_API_PREFIX,
headers: () => (getMarketplaceHeaders()),
@@ -65,7 +39,7 @@ export const marketplaceClient: JsonifiedClient<ContractRouterClient<typeof mark
export const marketplaceQuery = createTanstackQueryUtils(marketplaceClient, { path: ['marketplace'] })
const consoleLink = new OpenAPILink(consoleRouterContract, {
url: getBaseURL(API_PREFIX),
url: API_PREFIX,
fetch: (input, init) => {
return request(
input.url,