@@ -10,13 +10,13 @@ from typing import List, Optional, Union

 # from pyspark import SparkConf, SparkContext
 import redis
-from config import EMBED_MODEL, INDEX_NAME, KEY_INDEX_NAME, REDIS_URL
+from config import EMBED_MODEL, INDEX_NAME, KEY_INDEX_NAME, REDIS_URL, SEARCH_BATCH_SIZE
 from fastapi import Body, File, Form, HTTPException, UploadFile
 from langchain.text_splitter import RecursiveCharacterTextSplitter
-from langchain_community.embeddings import HuggingFaceBgeEmbeddings, HuggingFaceHubEmbeddings
+from langchain_community.embeddings import HuggingFaceBgeEmbeddings
 from langchain_community.vectorstores import Redis
+from langchain_huggingface import HuggingFaceEndpointEmbeddings
 from langchain_text_splitters import HTMLHeaderTextSplitter
 from langsmith import traceable
 from redis.commands.search.field import TextField
 from redis.commands.search.indexDefinition import IndexDefinition, IndexType
@@ -25,7 +25,7 @@ from comps.dataprep.utils import (
     create_upload_folder,
     document_loader,
     encode_filename,
-    get_file_structure,
+    format_search_results,
     get_separators,
     get_tables_result,
     parse_html,
@@ -76,7 +76,7 @@ def search_by_id(client, doc_id):
     print(f"[ search by id ] searching docs of {doc_id}")
     try:
         results = client.load_document(doc_id)
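+        # the returned Document exposes the stored fields as attributes; callers
+        # below read .key_ids and .content off it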
-        print(f"[ search by id ] search success of {doc_id}")
+        print(f"[ search by id ] search success of {doc_id}: {results}")
         return results
     except Exception as e:
         print(f"[ search by id ] fail to search docs of {doc_id}: {e}")
@@ -109,7 +109,7 @@ def ingest_chunks_to_redis(file_name: str, chunks: List):
     # Create vectorstore
     if tei_embedding_endpoint:
         # create embeddings using TEI endpoint service
-        embedder = HuggingFaceHubEmbeddings(model=tei_embedding_endpoint)
+        embedder = HuggingFaceEndpointEmbeddings(model=tei_embedding_endpoint)
     else:
         # create embeddings using local embedding model
         embedder = HuggingFaceBgeEmbeddings(model_name=EMBED_MODEL)
@@ -139,8 +139,12 @@ def ingest_chunks_to_redis(file_name: str, chunks: List):
     client = r.ft(KEY_INDEX_NAME)
     if not check_index_existance(client):
         assert create_index(client)
-    assert store_by_id(client, key=file_name, value="#".join(file_ids))
+
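+    # one record per file in KEY_INDEX_NAME: file name -> "#"-joined keys of its
+    # chunks, so the chunks can be looked up (and deleted) later by file name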
+    try:
+        assert store_by_id(client, key=file_name, value="#".join(file_ids))
+    except Exception as e:
+        print(f"[ ingest chunks ] {e}. Fail to store chunks of file {file_name}.")
+        raise HTTPException(status_code=500, detail=f"Fail to store chunks of file {file_name}.")
     return True
@@ -177,7 +181,6 @@ def ingest_data_to_redis(doc_path: DocPath):


 @register_microservice(name="opea_service@prepare_doc_redis", endpoint="/v1/dataprep", host="0.0.0.0", port=6007)
 @traceable(run_type="tool")
 async def ingest_documents(
     files: Optional[Union[UploadFile, List[UploadFile]]] = File(None),
     link_list: Optional[str] = Form(None),
@@ -189,12 +192,30 @@ async def ingest_documents(
     print(f"files:{files}")
     print(f"link_list:{link_list}")

+    r = redis.Redis(connection_pool=redis_pool)
+    client = r.ft(KEY_INDEX_NAME)
+
     if files:
         if not isinstance(files, list):
             files = [files]
         uploaded_files = []

         for file in files:
             encode_file = encode_filename(file.filename)
+            doc_id = "file:" + encode_file
+
+            # check whether the file already exists
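+            # search_by_id raises when the record is missing, so reaching the
+            # except branch simply means the file has not been ingested before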
+            key_ids = None
+            try:
+                key_ids = search_by_id(client, doc_id).key_ids
+                print(f"[ upload file ] File {file.filename} already exists.")
+            except Exception as e:
+                print(f"[ upload file ] File {file.filename} does not exist.")
+            if key_ids:
+                raise HTTPException(
+                    status_code=400, detail=f"Uploaded file {file.filename} already exists. Please change the file name."
+                )
+
             save_path = upload_folder + encode_file
             await save_content_to_local_disk(save_path, file)
             ingest_data_to_redis(
@@ -234,28 +255,39 @@ async def ingest_documents(
         return {"status": 200, "message": "Data preparation succeeded"}

     if link_list:
-        try:
-            link_list = json.loads(link_list)  # Parse JSON string to list
-            if not isinstance(link_list, list):
-                raise HTTPException(status_code=400, detail="link_list should be a list.")
-            for link in link_list:
-                encoded_link = encode_filename(link)
-                save_path = upload_folder + encoded_link + ".txt"
-                content = parse_html([link])[0][0]
-                await save_content_to_local_disk(save_path, content)
-                ingest_data_to_redis(
-                    DocPath(
-                        path=save_path,
-                        chunk_size=chunk_size,
-                        chunk_overlap=chunk_overlap,
-                        process_table=process_table,
-                        table_strategy=table_strategy,
-                    )
+        link_list = json.loads(link_list)  # Parse JSON string to list
+        if not isinstance(link_list, list):
+            raise HTTPException(status_code=400, detail=f"Link_list {link_list} should be a list.")
+        for link in link_list:
+            encoded_link = encode_filename(link)
+            doc_id = "file:" + encoded_link + ".txt"
+
+            # check whether the link file already exists
+            key_ids = None
+            try:
+                key_ids = search_by_id(client, doc_id).key_ids
+                print(f"[ upload file ] Link {link} already exists.")
+            except Exception as e:
+                print(f"[ upload file ] Link {link} does not exist. Keep storing.")
+            if key_ids:
+                raise HTTPException(
+                    status_code=400, detail=f"Uploaded link {link} already exists. Please use another link."
+                )
-                )
-            print(f"Successfully saved link list {link_list}")
-            return {"status": 200, "message": "Data preparation succeeded"}
-        except json.JSONDecodeError:
-            raise HTTPException(status_code=400, detail="Invalid JSON format for link_list.")
+
+            save_path = upload_folder + encoded_link + ".txt"
+            content = parse_html([link])[0][0]
+            await save_content_to_local_disk(save_path, content)
+            ingest_data_to_redis(
+                DocPath(
+                    path=save_path,
+                    chunk_size=chunk_size,
+                    chunk_overlap=chunk_overlap,
+                    process_table=process_table,
+                    table_strategy=table_strategy,
+                )
+            )
+        print(f"Successfully saved link list {link_list}")
+        return {"status": 200, "message": "Data preparation succeeded"}

     raise HTTPException(status_code=400, detail="Must provide either a file or a string list.")
@@ -263,36 +295,73 @@ async def ingest_documents(
 @register_microservice(
     name="opea_service@prepare_doc_redis", endpoint="/v1/dataprep/get_file", host="0.0.0.0", port=6007
 )
 @traceable(run_type="tool")
 async def rag_get_file_structure():
     print("[ dataprep - get file ] start to get file structure")

-    if not Path(upload_folder).exists():
-        print("No file uploaded, return empty list.")
-        return []
-
-    file_content = get_file_structure(upload_folder)
-    return file_content
+    # define redis client
+    r = redis.Redis(connection_pool=redis_pool)
+    offset = 0
+    file_list = []
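+    # page through all records in KEY_INDEX_NAME, SEARCH_BATCH_SIZE docs at a time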
+    while True:
+        response = r.execute_command("FT.SEARCH", KEY_INDEX_NAME, "*", "LIMIT", offset, SEARCH_BATCH_SIZE)
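+        # FT.SEARCH replies with [total, id1, fields1, id2, fields2, ...], so this
+        # batch contains (len(response) - 1) // 2 docs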
+        # no doc retrieved
+        if len(response) < 2:
+            break
+        file_list = format_search_results(response, file_list)
+        offset += SEARCH_BATCH_SIZE
+        # last batch
+        if (len(response) - 1) // 2 < SEARCH_BATCH_SIZE:
+            break
+    return file_list


 @register_microservice(
     name="opea_service@prepare_doc_redis", endpoint="/v1/dataprep/delete_file", host="0.0.0.0", port=6007
 )
 @traceable(run_type="tool")
 async def delete_single_file(file_path: str = Body(..., embed=True)):
     """Delete file according to `file_path`.

     `file_path`:
         - specific file path (e.g. /path/to/file.txt)
         - folder path (e.g. /path/to/folder)
         - "all": delete all files uploaded
     """

+    # define redis client
+    r = redis.Redis(connection_pool=redis_pool)
+    client = r.ft(KEY_INDEX_NAME)
+    client2 = r.ft(INDEX_NAME)
+
     # delete all uploaded files
     if file_path == "all":
         print("[dataprep - del] delete all files")
-        remove_folder_with_ignore(upload_folder)
-        assert drop_index(index_name=INDEX_NAME)
-        assert drop_index(index_name=KEY_INDEX_NAME)
+
+        # drop index KEY_INDEX_NAME
+        if check_index_existance(client):
+            try:
+                assert drop_index(index_name=KEY_INDEX_NAME)
+            except Exception as e:
+                print(f"[dataprep - del] {e}. Fail to drop index {KEY_INDEX_NAME}.")
+                raise HTTPException(status_code=500, detail=f"Fail to drop index {KEY_INDEX_NAME}.")
+        else:
+            print(f"[dataprep - del] Index {KEY_INDEX_NAME} does not exist.")
+
+        # drop index INDEX_NAME
+        if check_index_existance(client2):
+            try:
+                assert drop_index(index_name=INDEX_NAME)
+            except Exception as e:
+                print(f"[dataprep - del] {e}. Fail to drop index {INDEX_NAME}.")
+                raise HTTPException(status_code=500, detail=f"Fail to drop index {INDEX_NAME}.")
+        else:
+            print(f"[dataprep - del] Index {INDEX_NAME} does not exist.")
+
+        # delete files on local disk
+        try:
+            remove_folder_with_ignore(upload_folder)
+        except Exception as e:
+            print(f"[dataprep - del] {e}. Fail to delete {upload_folder}.")
+            raise HTTPException(status_code=500, detail=f"Fail to delete {upload_folder}.")
+
+        print("[dataprep - del] successfully delete all files.")
+        create_upload_folder(upload_folder)
         return {"status": True}
|
|
|
|
|
delete_path = Path(upload_folder + "/" + encode_filename(file_path))
|
|
|
|
|
print(f"[dataprep - del] delete_path: {delete_path}")
|
|
|
|
|
|
|
|
|
|
# partially delete files/folders
|
|
|
|
|
# partially delete files
|
|
|
|
|
if delete_path.exists():
|
|
|
|
|
r = redis.Redis(connection_pool=redis_pool)
|
|
|
|
|
client = r.ft(KEY_INDEX_NAME)
|
|
|
|
|
client2 = r.ft(INDEX_NAME)
|
|
|
|
|
doc_id = "file:" + encode_filename(file_path)
|
|
|
|
|
objs = search_by_id(client, doc_id).key_ids
|
|
|
|
|
file_ids = objs.split("#")
|
|
|
|
|
|
|
|
|
|
# determine whether this file exists in db KEY_INDEX_NAME
|
|
|
|
|
try:
|
|
|
|
|
key_ids = search_by_id(client, doc_id).key_ids
|
|
|
|
|
except Exception as e:
|
|
|
|
|
print(f"[dataprep - del] {e}, File {file_path} does not exists.")
|
|
|
|
|
raise HTTPException(
|
|
|
|
|
status_code=404, detail=f"File not found in db {KEY_INDEX_NAME}. Please check file_path."
|
|
|
|
|
)
|
|
|
|
|
file_ids = key_ids.split("#")
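+        # deletion order below: the file record in KEY_INDEX_NAME first, then each
+        # chunk in INDEX_NAME, then the copy on local disk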

         # delete file
         if delete_path.is_file():
+            # delete file keys id in db KEY_INDEX_NAME
             try:
-                for file_id in file_ids:
-                    assert delete_by_id(client2, file_id)
                 assert delete_by_id(client, doc_id)
-                delete_path.unlink()
             except Exception as e:
-                print(f"[dataprep - del] fail to delete file {delete_path}: {e}")
-                return {"status": False}
+                print(f"[dataprep - del] {e}. File {file_path} delete failed for db {KEY_INDEX_NAME}.")
+                raise HTTPException(status_code=500, detail=f"File {file_path} delete failed.")
+
+            # delete file content in db INDEX_NAME
+            for file_id in file_ids:
+                # determine whether this file exists in db INDEX_NAME
+                try:
+                    content = search_by_id(client2, file_id).content
+                except Exception as e:
+                    print(f"[dataprep - del] {e}. File {file_path} does not exist.")
+                    raise HTTPException(
+                        status_code=404, detail=f"File not found in db {INDEX_NAME}. Please check file_path."
+                    )
+
+                # delete file content
+                try:
+                    assert delete_by_id(client2, file_id)
+                except Exception as e:
+                    print(f"[dataprep - del] {e}. File {file_path} delete failed for db {INDEX_NAME}.")
+                    raise HTTPException(status_code=500, detail=f"File {file_path} delete failed.")
+
+            # delete file on local disk
+            delete_path.unlink()
+
+            return {"status": True}

         # delete folder
         else:
-            try:
-                shutil.rmtree(delete_path)
-            except Exception as e:
-                print(f"[dataprep - del] fail to delete folder {delete_path}: {e}")
-                return {"status": False}
-            return {"status": True}
+            print(f"[dataprep - del] Delete folder {file_path} is not supported for now.")
+            raise HTTPException(status_code=404, detail=f"Delete folder {file_path} is not supported for now.")
     else:
-        raise HTTPException(status_code=404, detail="File/folder not found. Please check del_path.")
+        raise HTTPException(status_code=404, detail=f"File {file_path} not found. Please check file_path.")


 if __name__ == "__main__":