Mirror of https://github.com/langgenius/dify.git (synced 2026-01-07 23:04:12 +00:00)
Merge branch 'feat/support-multimodal-embedding' into deploy/dev
Some checks are pending
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Waiting to run
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Waiting to run
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Waiting to run
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Waiting to run
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Blocked by required conditions
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Blocked by required conditions
# Conflicts:
#	api/core/indexing_runner.py
@@ -551,7 +551,7 @@ class IndexingRunner:
     indexing_start_at = time.perf_counter()
     tokens = 0
     create_keyword_thread = None
     if (dataset_document.doc_form != IndexStructureType.PARENT_CHILD_INDEX
             and dataset.indexing_technique == "economy"):
         # create keyword index
         create_keyword_thread = threading.Thread(
@@ -636,11 +636,11 @@ class IndexingRunner:
     db.session.commit()

 def _process_chunk(
     self, flask_app: Flask,
     index_processor: BaseIndexProcessor,
     chunk_documents: list[Document],
     dataset: Dataset,
     dataset_document: DatasetDocument,
     embedding_model_instance: ModelInstance | None
 ):
     with flask_app.app_context():
@@ -652,15 +652,15 @@ class IndexingRunner:
     page_content_list = [document.page_content for document in chunk_documents]
     tokens += sum(embedding_model_instance.get_text_embedding_num_tokens(page_content_list))

-    multimodel_documents = []
+    multimodal_documents = []
     for document in chunk_documents:
-        if document.attachments:
-            multimodel_documents.extend(document.attachments)
+        if document.attachments and dataset.is_multimodal:
+            multimodal_documents.extend(document.attachments)

     # load index
-    index_processor.load(dataset,
-                         chunk_documents,
-                         multimodel_documents=multimodel_documents,
+    index_processor.load(dataset,
+                         chunk_documents,
+                         multimodal_documents=multimodal_documents,
                          with_keywords=False)

     document_ids = [document.metadata["doc_id"] for document in chunk_documents]
@@ -758,7 +758,7 @@ class IndexingRunner:
     )

     # add document segments
     doc_store.add_documents(docs=documents,
                             save_child=dataset_document.doc_form == IndexStructureType.PARENT_CHILD_INDEX)

     # update document status to indexing
@@ -208,7 +208,7 @@ class Vector:
     self._vector_processor.create(texts=batch, embeddings=batch_embeddings, **kwargs)
     logger.info("Embedding %s texts took %s s", len(texts), time.time() - start)

-def create_multimodel(self, file_documents: list | None = None, **kwargs):
+def create_multimodal(self, file_documents: list | None = None, **kwargs):
     if file_documents:
         start = time.time()
         logger.info("start embedding %s files %s", len(file_documents), start)
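For orientation, a minimal usage sketch of the renamed Vector method (illustrative only, not part of the commit; text_documents and attachment_docs stand in for lists built elsewhere in this change):

    # Illustrative sketch; Vector, dataset, text_documents and attachment_docs are assumed.
    vector = Vector(dataset)
    vector.create(text_documents)  # embed text chunks, unchanged behaviour
    if attachment_docs and dataset.is_multimodal:
        # renamed from create_multimodel; embeds file/image attachments
        vector.create_multimodal(attachment_docs)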
@@ -171,6 +171,7 @@ class CacheEmbedding(Embeddings):
     model_name=self._model_instance.model,
     hash=file_id,
     provider_name=self._model_instance.provider,
     embedding=pickle.dumps(n_embedding, protocol=pickle.HIGHEST_PROTOCOL),
 )
 embedding_cache.set_embedding(n_embedding)
 db.session.add(embedding_cache)
@@ -40,7 +40,7 @@ class BaseIndexProcessor(ABC):
     self,
     dataset: Dataset,
     documents: list[Document],
-    multimodel_documents: list[Document] | None = None,
+    multimodal_documents: list[AttachmentDocument] | None = None,
     with_keywords: bool = True,
     **kwargs,
 ):
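Call sites elsewhere in this commit pass the renamed parameter, typically as a keyword argument; a fragment-style sketch (illustrative, assuming index_processor, dataset, documents and attachment_documents already exist):

    # Illustrative fragment, mirroring the call sites changed in this commit.
    index_processor.load(
        dataset,
        documents,                                  # text / QA / parent-child chunks
        multimodal_documents=attachment_documents,  # AttachmentDocument list, may be empty
        with_keywords=False,
    )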
@@ -114,19 +114,23 @@ class BaseIndexProcessor(ABC):
     """
     multi_model_documents = []
     text = document.page_content

+    # Collect all upload_file_ids including duplicates to preserve occurrence count
+    upload_file_id_list = []

     # For data before v0.10.0
     pattern = r"/files/([a-f0-9\-]+)/image-preview(?:\?.*?)?"
     matches = re.finditer(pattern, text)
     upload_file_ids = []
     for match in matches:
         upload_file_id = match.group(1)
-        upload_file_ids.append(upload_file_id)
+        upload_file_id_list.append(upload_file_id)

     # For data after v0.10.0
     pattern = r"/files/([a-f0-9\-]+)/file-preview(?:\?.*?)?"
     matches = re.finditer(pattern, text)
     for match in matches:
         upload_file_id = match.group(1)
-        upload_file_ids.append(upload_file_id)
+        upload_file_id_list.append(upload_file_id)

     # For tools directory - direct file formats (e.g., .png, .jpg, etc.)
     # Match URL including any query parameters up to common URL boundaries (space, parenthesis, quotes)
@@ -134,11 +138,23 @@ class BaseIndexProcessor(ABC):
     matches = re.finditer(pattern, text)
     for match in matches:
         upload_file_id = match.group(1)
-        upload_file_ids.append(upload_file_id)
-    upload_files = db.session.query(UploadFile).filter(UploadFile.id.in_(upload_file_ids)).all()
-    if upload_files:
-        for upload_file in upload_files:
-            multi_model_documents.append(Document(
+        upload_file_id_list.append(upload_file_id)
+
+    if not upload_file_id_list:
+        return multi_model_documents
+
+    # Get unique IDs for database query
+    unique_upload_file_ids = list(set(upload_file_id_list))
+    upload_files = db.session.query(UploadFile).filter(UploadFile.id.in_(unique_upload_file_ids)).all()
+
+    # Create a mapping from ID to UploadFile for quick lookup
+    upload_file_map = {upload_file.id: upload_file for upload_file in upload_files}
+
+    # Create a Document for each occurrence (including duplicates)
+    for upload_file_id in upload_file_id_list:
+        upload_file = upload_file_map.get(upload_file_id)
+        if upload_file:
+            multi_model_documents.append(AttachmentDocument(
                 page_content=upload_file.name,
                 metadata={
                     "doc_id": upload_file.id,
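The hunks above stop de-duplicating upload file IDs: every occurrence is kept, the database is queried once for the unique IDs, and one attachment document is emitted per occurrence. A standalone sketch of that pattern in plain Python (illustrative names, no database involved):

    import re

    def collect_ids(text: str) -> list[str]:
        # Keep every match, including repeats, so each reference in the text
        # can later yield its own attachment document.
        pattern = r"/files/([a-f0-9\-]+)/file-preview(?:\?.*?)?"
        return [m.group(1) for m in re.finditer(pattern, text)]

    text = "/files/aaa-111/file-preview and again /files/aaa-111/file-preview"
    occurrences = collect_ids(text)                              # ['aaa-111', 'aaa-111']
    unique_ids = list(set(occurrences))                          # one lookup key set for the DB query
    records = {uid: f"record-for-{uid}" for uid in unique_ids}   # stand-in for the UploadFile rows
    docs = [records[i] for i in occurrences if i in records]     # one entry per occurrence
    print(docs)                                                  # ['record-for-aaa-111', 'record-for-aaa-111']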
@@ -85,15 +85,15 @@ class ParagraphIndexProcessor(BaseIndexProcessor):
     self,
     dataset: Dataset,
     documents: list[Document],
-    multimodel_documents: list[Document] | None = None,
+    multimodal_documents: list[AttachmentDocument] | None = None,
     with_keywords: bool = True,
     **kwargs,
 ):
     if dataset.indexing_technique == "high_quality":
         vector = Vector(dataset)
         vector.create(documents)
-        if multimodel_documents:
-            vector.create_multimodel(multimodel_documents)
+        if multimodal_documents and dataset.is_multimodal:
+            vector.create_multimodal(multimodal_documents)
         with_keywords = False
     if with_keywords:
         keywords_list = kwargs.get("keywords_list")
@@ -201,7 +201,7 @@ class ParagraphIndexProcessor(BaseIndexProcessor):
     vector = Vector(dataset)
     vector.create(documents)
     if all_multimodal_documents:
-        vector.create_multimodel(all_multimodal_documents)
+        vector.create_multimodal(all_multimodal_documents)
 elif dataset.indexing_technique == "economy":
     keyword = Keyword(dataset)
     keyword.add_texts(documents)
@@ -115,7 +115,7 @@ class ParentChildIndexProcessor(BaseIndexProcessor):
     self,
     dataset: Dataset,
     documents: list[Document],
-    multimodel_documents: list[Document] | None = None,
+    multimodal_documents: list[AttachmentDocument] | None = None,
     with_keywords: bool = True,
     **kwargs,
 ):
@@ -128,8 +128,8 @@ class ParentChildIndexProcessor(BaseIndexProcessor):
         Document.model_validate(child_document.model_dump()) for child_document in child_documents
     ]
     vector.create(formatted_child_documents)
-    if multimodel_documents:
-        vector.create_multimodel(multimodel_documents)
+    if multimodal_documents and dataset.is_multimodal:
+        vector.create_multimodal(multimodal_documents)

 def clean(self, dataset: Dataset, node_ids: list[str] | None, with_keywords: bool = True, **kwargs):
     # node_ids is segment's node_ids
@@ -308,7 +308,7 @@ class ParentChildIndexProcessor(BaseIndexProcessor):
     if all_child_documents:
         vector.create(all_child_documents)
     if all_multimodal_documents:
-        vector.create_multimodel(all_multimodal_documents)
+        vector.create_multimodal(all_multimodal_documents)

 def format_preview(self, chunks: Any) -> Mapping[str, Any]:
     parent_childs = ParentChildStructureChunk.model_validate(chunks)
@@ -20,7 +20,7 @@ from core.rag.extractor.entity.extract_setting import ExtractSetting
 from core.rag.extractor.extract_processor import ExtractProcessor
 from core.rag.index_processor.constant.index_type import IndexStructureType
 from core.rag.index_processor.index_processor_base import BaseIndexProcessor
-from core.rag.models.document import Document, QAStructureChunk
+from core.rag.models.document import Document, QAStructureChunk, AttachmentDocument
 from core.rag.retrieval.retrieval_methods import RetrievalMethod
 from core.tools.utils.text_processing_utils import remove_leading_symbols
 from libs import helper
@@ -132,15 +132,15 @@ class QAIndexProcessor(BaseIndexProcessor):
     self,
     dataset: Dataset,
     documents: list[Document],
-    multimodel_documents: list[Document] | None = None,
+    multimodal_documents: list[AttachmentDocument] | None = None,
     with_keywords: bool = True,
     **kwargs,
 ):
     if dataset.indexing_technique == "high_quality":
         vector = Vector(dataset)
         vector.create(documents)
-        if multimodel_documents:
-            vector.create_multimodel(multimodel_documents)
+        if multimodal_documents and dataset.is_multimodal:
+            vector.create_multimodal(multimodal_documents)

 def clean(self, dataset: Dataset, node_ids: list[str] | None, with_keywords: bool = True, **kwargs):
     vector = Vector(dataset)
@@ -13,5 +13,5 @@ def remove_leading_symbols(text: str) -> str:
     """
     # Match Unicode ranges for punctuation and symbols
     # FIXME this pattern is confused quick fix for #11868 maybe refactor it later
-    pattern = r"^[\u2000-\u206F\u2E00-\u2E7F\u3000-\u303F!\"#$%&'()*+,./:;<=>?@^_`~]+"
+    pattern = r"^[\u2000-\u206F\u2E00-\u2E7F\u3000-\u303F\"#$%&'()*+,./:;<=>?@^_`~]+"
     return re.sub(pattern, "", text)
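To see the effect of dropping "!" from the character class (a quick check, not part of the commit):

    import re

    new_pattern = r"^[\u2000-\u206F\u2E00-\u2E7F\u3000-\u303F\"#$%&'()*+,./:;<=>?@^_`~]+"
    print(re.sub(new_pattern, "", "!important note"))    # '!important note'  (leading exclamation mark now kept)
    print(re.sub(new_pattern, "", "...important note"))  # 'important note'   (other leading symbols still stripped)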
@@ -424,13 +424,13 @@ class DatasetService:
     if not dataset:
         raise ValueError("Dataset not found")
     # check if dataset name is exists
-
-    if DatasetService._has_dataset_same_name(
-        tenant_id=dataset.tenant_id,
-        dataset_id=dataset_id,
-        name=data.get("name", dataset.name),
-    ):
-        raise ValueError("Dataset name already exists")
+    if data.get("name") and data.get("name") != dataset.name:
+        if DatasetService._has_dataset_same_name(
+            tenant_id=dataset.tenant_id,
+            dataset_id=dataset_id,
+            name=data.get("name", dataset.name),
+        ):
+            raise ValueError("Dataset name already exists")

     # Verify user has permission to update this dataset
     DatasetService.check_dataset_permission(dataset, user)
@@ -866,6 +866,10 @@ class DatasetService:
     model_type=ModelType.TEXT_EMBEDDING,
     model=knowledge_configuration.embedding_model or "",
 )
+is_multimodal = DatasetService.check_is_multimodal_model(
+    current_user.current_tenant_id, knowledge_configuration.embedding_model_provider, knowledge_configuration.embedding_model
+)
+dataset.is_multimodal = is_multimodal
 dataset.embedding_model = embedding_model.model
 dataset.embedding_model_provider = embedding_model.provider
 dataset_collection_binding = DatasetCollectionBindingService.get_dataset_collection_binding(
@@ -902,6 +906,10 @@ class DatasetService:
 dataset_collection_binding = DatasetCollectionBindingService.get_dataset_collection_binding(
     embedding_model.provider, embedding_model.model
 )
+is_multimodal = DatasetService.check_is_multimodal_model(
+    current_user.current_tenant_id, knowledge_configuration.embedding_model_provider, knowledge_configuration.embedding_model
+)
+dataset.is_multimodal = is_multimodal
 dataset.collection_binding_id = dataset_collection_binding.id
 dataset.indexing_technique = knowledge_configuration.indexing_technique
 except LLMBadRequestError:
@@ -959,6 +967,10 @@ class DatasetService:
     )
 )
 dataset.collection_binding_id = dataset_collection_binding.id
+is_multimodal = DatasetService.check_is_multimodal_model(
+    current_user.current_tenant_id, knowledge_configuration.embedding_model_provider, knowledge_configuration.embedding_model
+)
+dataset.is_multimodal = is_multimodal
 except LLMBadRequestError:
     raise ValueError(
         "No Embedding Model available. Please configure a valid provider "
@@ -8,7 +8,7 @@ from core.rag.index_processor.constant.doc_type import DocType
 from core.rag.index_processor.constant.index_type import IndexStructureType
 from core.rag.index_processor.index_processor_base import BaseIndexProcessor
 from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
-from core.rag.models.document import Document
+from core.rag.models.document import AttachmentDocument, Document
 from extensions.ext_database import db
 from models import UploadFile
 from models.dataset import ChildChunk, Dataset, DatasetProcessRule, DocumentSegment, SegmentAttachmentBinding
@@ -24,7 +24,7 @@ class VectorService:
     cls, keywords_list: list[list[str]] | None, segments: list[DocumentSegment], dataset: Dataset, doc_form: str
 ):
     documents: list[Document] = []
-    multimodel_documents: list[Document] = []
+    multimodal_documents: list[AttachmentDocument] = []

     for segment in segments:
         if doc_form == IndexStructureType.PARENT_CHILD_INDEX:
@@ -80,7 +80,7 @@ class VectorService:
         documents.append(rag_document)
     if dataset.is_multimodal:
         for attachment in segment.attachments:
-            multimodel_document: Document = Document(
+            multimodal_document: AttachmentDocument = AttachmentDocument(
                 page_content=attachment["name"],
                 metadata={
                     "doc_id": attachment["id"],
@@ -90,15 +90,15 @@ class VectorService:
                     "doc_type": DocType.IMAGE,
                 },
             )
-            multimodel_documents.append(multimodel_document)
+            multimodal_documents.append(multimodal_document)
 index_processor: BaseIndexProcessor = IndexProcessorFactory(doc_form).init_index_processor()

 if len(documents) > 0:
     index_processor.load(
-        dataset, documents, multimodel_documents, with_keywords=True, keywords_list=keywords_list
+        dataset, documents, None, with_keywords=True, keywords_list=keywords_list
     )
-if len(multimodel_documents) > 0:
-    index_processor.load(dataset, multimodel_documents, with_keywords=False)
+if len(multimodal_documents) > 0:
+    index_processor.load(dataset, [], multimodal_documents, with_keywords=False)

 @classmethod
 def update_segment_vector(cls, keywords: list[str] | None, segment: DocumentSegment, dataset: Dataset):
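Note the split introduced by the last hunk: text chunks and attachment documents now go through separate load calls, so keyword indexing applies only to the text pass. A fragment-style sketch of the resulting flow (illustrative, not part of the commit):

    # Illustrative fragment; the surrounding names come from the hunk above.
    if documents:
        # pass 1: text chunks only, keyword index still built
        index_processor.load(dataset, documents, None, with_keywords=True, keywords_list=keywords_list)
    if multimodal_documents:
        # pass 2: attachments only, against an empty chunk list, no keyword index
        index_processor.load(dataset, [], multimodal_documents, with_keywords=False)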
@@ -4,9 +4,10 @@ import time
 import click
 from celery import shared_task

+from core.rag.index_processor.constant.doc_type import DocType
 from core.rag.index_processor.constant.index_type import IndexStructureType
 from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
-from core.rag.models.document import ChildDocument, Document
+from core.rag.models.document import AttachmentDocument, ChildDocument, Document
 from extensions.ext_database import db
 from extensions.ext_redis import redis_client
 from libs.datetime_utils import naive_utc_now
@@ -55,6 +56,7 @@ def add_document_to_index_task(dataset_document_id: str):
 )

 documents = []
+multimodal_documents = []
 for segment in segments:
     document = Document(
         page_content=segment.content,
@@ -81,11 +83,23 @@ def add_document_to_index_task(dataset_document_id: str):
             )
             child_documents.append(child_document)
         document.children = child_documents
+    if dataset.is_multimodal:
+        for attachment in segment.attachments:
+            multimodal_documents.append(AttachmentDocument(
+                page_content=attachment["name"],
+                metadata={
+                    "doc_id": attachment["id"],
+                    "doc_hash": "",
+                    "document_id": segment.document_id,
+                    "dataset_id": segment.dataset_id,
+                    "doc_type": DocType.IMAGE,
+                },
+            ))
     documents.append(document)

 index_type = dataset.doc_form
 index_processor = IndexProcessorFactory(index_type).init_index_processor()
-index_processor.load(dataset, documents)
+index_processor.load(dataset, documents, multimodal_documents=multimodal_documents)

 # delete auto disable log
 db.session.query(DatasetAutoDisableLog).where(DatasetAutoDisableLog.document_id == dataset_document.id).delete()
@@ -4,9 +4,10 @@ import time
 import click
 from celery import shared_task  # type: ignore

+from core.rag.index_processor.constant.doc_type import DocType
 from core.rag.index_processor.constant.index_type import IndexStructureType
 from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
-from core.rag.models.document import ChildDocument, Document
+from core.rag.models.document import AttachmentDocument, ChildDocument, Document
 from extensions.ext_database import db
 from models.dataset import Dataset, DocumentSegment
 from models.dataset import Document as DatasetDocument
@@ -28,7 +29,7 @@ def deal_dataset_index_update_task(dataset_id: str, action: str):

 if not dataset:
     raise Exception("Dataset not found")
-index_type = dataset.doc_form or IndexType.PARAGRAPH_INDEX
+index_type = dataset.doc_form or IndexStructureType.PARAGRAPH_INDEX
 index_processor = IndexProcessorFactory(index_type).init_index_processor()
 if action == "upgrade":
     dataset_documents = (
@@ -119,6 +120,7 @@ def deal_dataset_index_update_task(dataset_id: str, action: str):
 )
 if segments:
     documents = []
+    multimodal_documents = []
     for segment in segments:
         document = Document(
             page_content=segment.content,
@@ -145,9 +147,21 @@ def deal_dataset_index_update_task(dataset_id: str, action: str):
             )
             child_documents.append(child_document)
         document.children = child_documents
+        if dataset.is_multimodal:
+            for attachment in segment.attachments:
+                multimodal_documents.append(AttachmentDocument(
+                    page_content=attachment["name"],
+                    metadata={
+                        "doc_id": attachment["id"],
+                        "doc_hash": "",
+                        "document_id": segment.document_id,
+                        "dataset_id": segment.dataset_id,
+                        "doc_type": DocType.IMAGE,
+                    },
+                ))
         documents.append(document)
     # save vector index
-    index_processor.load(dataset, documents, with_keywords=False)
+    index_processor.load(dataset, documents, multimodal_documents=multimodal_documents, with_keywords=False)
     db.session.query(DatasetDocument).where(DatasetDocument.id == dataset_document.id).update(
         {"indexing_status": "completed"}, synchronize_session=False
     )
@@ -6,9 +6,10 @@ import click
 from celery import shared_task
 from sqlalchemy import select

+from core.rag.index_processor.constant.doc_type import DocType
 from core.rag.index_processor.constant.index_type import IndexStructureType
 from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
-from core.rag.models.document import ChildDocument, Document
+from core.rag.models.document import AttachmentDocument, ChildDocument, Document
 from extensions.ext_database import db
 from models.dataset import Dataset, DocumentSegment
 from models.dataset import Document as DatasetDocument
@@ -17,7 +18,7 @@ logger = logging.getLogger(__name__)


 @shared_task(queue="dataset")
-def deal_dataset_vector_index_task(dataset_id: str, action: Literal["remove", "add", "update"]):
+def deal_dataset_vector_index_task(dataset_id: str, action: str):
     """
     Async deal dataset from index
     :param dataset_id: dataset_id
@@ -32,7 +33,7 @@ def deal_dataset_vector_index_task(dataset_id: str, action: Literal["remove", "add", "update"]):

 if not dataset:
     raise Exception("Dataset not found")
-index_type = dataset.doc_form or IndexType.PARAGRAPH_INDEX
+index_type = dataset.doc_form or IndexStructureType.PARAGRAPH_INDEX
 index_processor = IndexProcessorFactory(index_type).init_index_processor()
 if action == "remove":
     index_processor.clean(dataset, None, with_keywords=False)
@@ -119,6 +120,7 @@ def deal_dataset_vector_index_task(dataset_id: str, action: Literal["remove", "add", "update"]):
 )
 if segments:
     documents = []
+    multimodal_documents = []
     for segment in segments:
         document = Document(
             page_content=segment.content,
@@ -145,9 +147,21 @@ def deal_dataset_vector_index_task(dataset_id: str, action: Literal["remove", "add", "update"]):
             )
             child_documents.append(child_document)
         document.children = child_documents
+        if dataset.is_multimodal:
+            for attachment in segment.attachments:
+                multimodal_documents.append(AttachmentDocument(
+                    page_content=attachment["name"],
+                    metadata={
+                        "doc_id": attachment["id"],
+                        "doc_hash": "",
+                        "document_id": segment.document_id,
+                        "dataset_id": segment.dataset_id,
+                        "doc_type": DocType.IMAGE,
+                    },
+                ))
         documents.append(document)
     # save vector index
-    index_processor.load(dataset, documents, with_keywords=False)
+    index_processor.load(dataset, documents, multimodal_documents=multimodal_documents, with_keywords=False)
     db.session.query(DatasetDocument).where(DatasetDocument.id == dataset_document.id).update(
         {"indexing_status": "completed"}, synchronize_session=False
     )
@@ -7,6 +7,7 @@ from celery import shared_task
 from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
 from extensions.ext_database import db
 from models.dataset import Dataset, Document, SegmentAttachmentBinding
+from models.model import UploadFile

 logger = logging.getLogger(__name__)

@@ -51,10 +52,18 @@ def delete_segment_from_index_task(
 )
 if dataset.is_multimodal:
     # delete segment attachment binding
-    db.session.query(SegmentAttachmentBinding).filter(
+    segment_attachment_bindings = db.session.query(SegmentAttachmentBinding).filter(
         SegmentAttachmentBinding.segment_id.in_(segment_ids)
-    ).delete()
-    db.session.commit()
+    ).all()
+    if segment_attachment_bindings:
+        attachment_ids = [binding.attachment_id for binding in segment_attachment_bindings]
+        index_processor.clean(
+            dataset=dataset, node_ids=attachment_ids, with_keywords=False)
+        for binding in segment_attachment_bindings:
+            db.session.delete(binding)
+        # delete upload file
+        db.session.query(UploadFile).filter(UploadFile.id.in_(attachment_ids)).delete(synchronize_session=False)
+        db.session.commit()

 end_at = time.perf_counter()
 logger.info(click.style(f"Segment deleted from index latency: {end_at - start_at}", fg="green"))
@@ -8,7 +8,7 @@ from sqlalchemy import select
 from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
 from extensions.ext_database import db
 from extensions.ext_redis import redis_client
-from models.dataset import Dataset, DocumentSegment
+from models.dataset import Dataset, DocumentSegment, SegmentAttachmentBinding
 from models.dataset import Document as DatasetDocument

 logger = logging.getLogger(__name__)
@@ -59,6 +59,14 @@ def disable_segments_from_index_task(segment_ids: list, dataset_id: str, document_id: str):

 try:
     index_node_ids = [segment.index_node_id for segment in segments]
+    if dataset.is_multimodal:
+        segment_ids = [segment.id for segment in segments]
+        segment_attachment_bindings = db.session.query(SegmentAttachmentBinding).filter(
+            SegmentAttachmentBinding.segment_id.in_(segment_ids)
+        ).all()
+        if segment_attachment_bindings:
+            attachment_ids = [binding.attachment_id for binding in segment_attachment_bindings]
+            index_node_ids.extend(attachment_ids)
     index_processor.clean(dataset, index_node_ids, with_keywords=True, delete_child_chunks=False)

 end_at = time.perf_counter()
@@ -4,9 +4,10 @@ import time
 import click
 from celery import shared_task

+from core.rag.index_processor.constant.doc_type import DocType
 from core.rag.index_processor.constant.index_type import IndexStructureType
 from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
-from core.rag.models.document import ChildDocument, Document
+from core.rag.models.document import AttachmentDocument, ChildDocument, Document
 from extensions.ext_database import db
 from extensions.ext_redis import redis_client
 from libs.datetime_utils import naive_utc_now
@@ -83,8 +84,22 @@ def enable_segment_to_index_task(segment_id: str):
         )
         child_documents.append(child_document)
     document.children = child_documents
+multimodel_documents = []
+if dataset.is_multimodal:
+    for attachment in segment.attachments:
+        multimodel_documents.append(AttachmentDocument(
+            page_content=attachment["name"],
+            metadata={
+                "doc_id": attachment["id"],
+                "doc_hash": "",
+                "document_id": segment.document_id,
+                "dataset_id": segment.dataset_id,
+                "doc_type": DocType.IMAGE,
+            },
+        ))
+
 # save vector index
-index_processor.load(dataset, [document])
+index_processor.load(dataset, [document], multimodal_documents=multimodel_documents)

 end_at = time.perf_counter()
 logger.info(click.style(f"Segment enabled to index: {segment.id} latency: {end_at - start_at}", fg="green"))
|
||||
from celery import shared_task
|
||||
from sqlalchemy import select
|
||||
|
||||
from core.rag.index_processor.constant.doc_type import DocType
|
||||
from core.rag.index_processor.constant.index_type import IndexStructureType
|
||||
from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
|
||||
from core.rag.models.document import ChildDocument, Document
|
||||
from core.rag.models.document import AttachmentDocument, ChildDocument, Document
|
||||
from extensions.ext_database import db
|
||||
from extensions.ext_redis import redis_client
|
||||
from libs.datetime_utils import naive_utc_now
|
||||
@@ -60,6 +61,7 @@ def enable_segments_to_index_task(segment_ids: list, dataset_id: str, document_i
|
||||
|
||||
try:
|
||||
documents = []
|
||||
multimodal_documents = []
|
||||
for segment in segments:
|
||||
document = Document(
|
||||
page_content=segment.content,
|
||||
@@ -87,9 +89,22 @@ def enable_segments_to_index_task(segment_ids: list, dataset_id: str, document_i
|
||||
)
|
||||
child_documents.append(child_document)
|
||||
document.children = child_documents
|
||||
|
||||
if dataset.is_multimodal:
|
||||
for attachment in segment.attachments:
|
||||
multimodal_documents.append(AttachmentDocument(
|
||||
page_content=attachment["name"],
|
||||
metadata={
|
||||
"doc_id": attachment["id"],
|
||||
"doc_hash": "",
|
||||
"document_id": segment.document_id,
|
||||
"dataset_id": segment.dataset_id,
|
||||
"doc_type": DocType.IMAGE,
|
||||
},
|
||||
))
|
||||
documents.append(document)
|
||||
# save vector index
|
||||
index_processor.load(dataset, documents)
|
||||
index_processor.load(dataset, documents, multimodal_documents=multimodal_documents)
|
||||
|
||||
end_at = time.perf_counter()
|
||||
logger.info(click.style(f"Segments enabled to index latency: {end_at - start_at}", fg="green"))
|
||||
|
||||