Feature/elasticsearch vector store integration - Infosys (#972)

* Feature/elastic

Elasticsearch vectorstore, dataprep and retriever

---------

Co-authored-by: Adarsh <reachaadi@gmail.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Liang Lv <liang1.lv@intel.com>
Authored by kkrishTa on 2024-12-10 07:10:44 +05:30, committed by GitHub.
parent fbf3017afb · commit 5ed041bded
21 changed files with 1229 additions and 0 deletions


@@ -0,0 +1,38 @@
# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
FROM python:3.11-slim
ENV LANG=C.UTF-8
ARG ARCH="cpu"
RUN apt-get update -y && apt-get install -y --no-install-recommends --fix-missing \
build-essential \
default-jre \
libgl1-mesa-glx \
libjemalloc-dev
RUN useradd -m -s /bin/bash user && \
mkdir -p /home/user && \
chown -R user /home/user/
USER user
COPY comps /home/user/comps
RUN pip install --no-cache-dir --upgrade pip setuptools && \
if [ ${ARCH} = "cpu" ]; then pip install --no-cache-dir torch torchvision --index-url https://download.pytorch.org/whl/cpu; fi && \
pip install --no-cache-dir -r /home/user/comps/dataprep/elasticsearch/langchain/requirements.txt
ENV PYTHONPATH=$PYTHONPATH:/home/user
USER root
RUN mkdir -p /home/user/comps/dataprep/elasticsearch/langchain/uploaded_files && chown -R user /home/user/comps/dataprep/elasticsearch/langchain/uploaded_files
USER user
WORKDIR /home/user/comps/dataprep/elasticsearch/langchain
ENTRYPOINT ["python", "prepare_doc_elasticsearch.py"]


@@ -0,0 +1,130 @@
# Dataprep Microservice with Elasticsearch
## 🚀1. Start Microservice with Python (Option 1)
### 1.1 Install Requirements
```bash
pip install -r requirements.txt
```
### 1.2 Setup Environment Variables
```bash
export ES_CONNECTION_STRING=http://localhost:9200
export INDEX_NAME=${your_index_name}
```
### 1.3 Start Elasticsearch
Please refer to this [readme](../../../vectorstores/elasticsearch/README.md).
### 1.4 Start Document Preparation Microservice for Elasticsearch with Python Script
Start the document preparation microservice for Elasticsearch with the command below.
```bash
python prepare_doc_elasticsearch.py
```
## 🚀2. Start Microservice with Docker (Option 2)
### 2.1 Start Elasticsearch
Please refer to this [readme](../../../vectorstores/elasticsearch/README.md).
### 2.2 Setup Environment Variables
```bash
export ES_CONNECTION_STRING=http://localhost:9200
export INDEX_NAME=${your_index_name}
```
### 2.3 Build Docker Image
```bash
cd GenAIComps
docker build -t opea/dataprep-elasticsearch:latest --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_proxy -f comps/dataprep/elasticsearch/langchain/Dockerfile .
```
### 2.4 Run Docker with CLI (Option A)
```bash
docker run --name="dataprep-elasticsearch" -p 6011:6011 --ipc=host -e http_proxy=$http_proxy -e https_proxy=$https_proxy -e ES_CONNECTION_STRING=$ES_CONNECTION_STRING -e INDEX_NAME=$INDEX_NAME -e TEI_ENDPOINT=$TEI_ENDPOINT opea/dataprep-elasticsearch:latest
```
### 2.5 Run with Docker Compose (Option B)
```bash
cd comps/dataprep/elasticsearch/langchain
docker compose -f docker-compose-dataprep-elastic.yaml up -d
```
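Before consuming the service, you can optionally verify that the container is up. A minimal sketch, assuming the microservice exposes the same `/v1/health_check` endpoint convention as the other OPEA components:
```bash
curl http://localhost:6011/v1/health_check \
    -X GET \
    -H 'Content-Type: application/json'
```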
## 🚀3. Consume Microservice
### 3.1 Consume Upload API
Once the document preparation microservice for Elasticsearch is started, you can use the command below to invoke the
microservice, which converts a document to embeddings and saves them to the database.
Note that the endpoint accepts `multipart/form-data` uploads (the `files` form field defined in `prepare_doc_elasticsearch.py`) rather than a JSON body.
```bash
curl -X POST \
    -H "Content-Type: multipart/form-data" \
    -F "files=@/path/to/document" \
    http://localhost:6011/v1/dataprep
```
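The same endpoint also ingests web links and exposes chunking controls as form fields (`link_list`, `chunk_size`, `chunk_overlap`, `process_table`, and `table_strategy`, all defined in `prepare_doc_elasticsearch.py`). A minimal sketch:
```bash
# Ingest a list of web links; link_list is a JSON-encoded list of URLs
curl -X POST \
    -F 'link_list=["https://www.ces.tech/"]' \
    http://localhost:6011/v1/dataprep

# Upload a file with custom chunking parameters
curl -X POST \
    -F "files=@/path/to/document" \
    -F "chunk_size=1000" \
    -F "chunk_overlap=50" \
    http://localhost:6011/v1/dataprep
```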
### 3.2 Consume get_file API
To get uploaded file structures, use the following command:
```bash
curl -X POST \
-H "Content-Type: application/json" \
http://localhost:6011/v1/dataprep/get_file
```
You will then get a JSON response like this:
```json
[
{
"name": "uploaded_file_1.txt",
"id": "uploaded_file_1.txt",
"type": "File",
"parent": ""
},
{
"name": "uploaded_file_2.txt",
"id": "uploaded_file_2.txt",
"type": "File",
"parent": ""
}
]
```
### 3.3 Consume delete_file API
To delete an uploaded file or link, use the following command.
The `file_path` here should be the `id` returned by the `/v1/dataprep/get_file` API.
```bash
# delete link
curl -X POST \
-H "Content-Type: application/json" \
-d '{"file_path": "https://www.ces.tech/.txt"}' \
http://localhost:6011/v1/dataprep/delete_file
# delete file
curl -X POST \
-H "Content-Type: application/json" \
-d '{"file_path": "uploaded_file_1.txt"}' \
http://localhost:6011/v1/dataprep/delete_file
# delete all files and links
curl -X POST \
-H "Content-Type: application/json" \
-d '{"file_path": "all"}' \
http://localhost:6011/v1/dataprep/delete_file
```


@@ -0,0 +1,2 @@
# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0


@@ -0,0 +1,23 @@
# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
import os
ES_CONNECTION_STRING = os.getenv("ES_CONNECTION_STRING", "http://localhost:9200")
UPLOADED_FILES_PATH = os.getenv("UPLOADED_FILES_PATH", "./uploaded_files/")
# Embedding model
EMBED_MODEL = os.getenv("EMBED_MODEL", "BAAI/bge-base-en-v1.5")
# TEI Embedding endpoints
TEI_ENDPOINT = os.getenv("TEI_ENDPOINT", "")
# Vector Index Configuration
INDEX_NAME = os.getenv("INDEX_NAME", "rag-elastic")
# Chunk parameters (cast to int: environment variables arrive as strings)
CHUNK_SIZE = int(os.getenv("CHUNK_SIZE", 1500))
CHUNK_OVERLAP = int(os.getenv("CHUNK_OVERLAP", 100))
# Logging enabled (treat only "true"/"1" as enabled)
LOG_FLAG = os.getenv("LOGFLAG", "").lower() in ("true", "1")


@@ -0,0 +1,41 @@
# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
version: "3"
services:
elasticsearch-vector-db:
hostname: db
container_name: elasticsearch-vector-db
image: docker.elastic.co/elasticsearch/elasticsearch:8.16.0
ports:
- "9200:9200"
- "9300:9300"
restart: always
ipc: host
environment:
- ES_JAVA_OPTS=-Xms1g -Xmx1g
- discovery.type=single-node
- xpack.security.enabled=false
- bootstrap.memory_lock=false
- no_proxy= ${no_proxy}
- http_proxy= ${http_proxy}
- https_proxy= ${https_proxy}
dataprep-elasticsearch:
image: opea/dataprep-elasticsearch:latest
container_name: dataprep-elasticsearch
ports:
- "6011:6011"
ipc: host
environment:
http_proxy: ${http_proxy}
https_proxy: ${https_proxy}
ES_CONNECTION_STRING: ${ES_CONNECTION_STRING}
INDEX_NAME: ${INDEX_NAME}
TEI_ENDPOINT: ${TEI_ENDPOINT}
HUGGINGFACEHUB_API_TOKEN: ${HUGGINGFACEHUB_API_TOKEN}
restart: unless-stopped
networks:
default:
driver: bridge


@@ -0,0 +1,373 @@
# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
import json
import os
from pathlib import Path
from typing import List, Optional, Union

from config import (
    CHUNK_OVERLAP,
    CHUNK_SIZE,
    EMBED_MODEL,
    ES_CONNECTION_STRING,
    INDEX_NAME,
    LOG_FLAG,
    TEI_ENDPOINT,
    UPLOADED_FILES_PATH,
)
from elasticsearch import Elasticsearch
from fastapi import Body, File, Form, HTTPException, UploadFile
from langchain.text_splitter import HTMLHeaderTextSplitter, RecursiveCharacterTextSplitter
from langchain_community.embeddings import HuggingFaceBgeEmbeddings
from langchain_core.documents import Document
from langchain_elasticsearch import ElasticsearchStore
from langchain_huggingface.embeddings import HuggingFaceEndpointEmbeddings

from comps import CustomLogger, DocPath, opea_microservices, register_microservice
from comps.dataprep.utils import (
    create_upload_folder,
    document_loader,
    encode_filename,
    get_file_structure,
    get_separators,
    get_tables_result,
    parse_html,
    remove_folder_with_ignore,
    save_content_to_local_disk,
)

logger = CustomLogger(__name__)


def create_index() -> None:
    if not es_client.indices.exists(index=INDEX_NAME):
        es_client.indices.create(index=INDEX_NAME)


def get_embedder() -> Union[HuggingFaceEndpointEmbeddings, HuggingFaceBgeEmbeddings]:
    """Obtain required Embedder."""
    if TEI_ENDPOINT:
        return HuggingFaceEndpointEmbeddings(model=TEI_ENDPOINT)
    else:
        return HuggingFaceBgeEmbeddings(model_name=EMBED_MODEL)


def get_elastic_store(embedder: Union[HuggingFaceEndpointEmbeddings, HuggingFaceBgeEmbeddings]) -> ElasticsearchStore:
    """Get Elasticsearch vector store."""
    return ElasticsearchStore(index_name=INDEX_NAME, embedding=embedder, es_connection=es_client)


def delete_embeddings(doc_name: str) -> bool:
    """Delete documents from Elasticsearch."""
    try:
        if doc_name == "all":
            if LOG_FLAG:
                logger.info("Deleting all documents from vectorstore")
            query = {"query": {"match_all": {}}}
        else:
            if LOG_FLAG:
                logger.info(f"Deleting {doc_name} from vectorstore")
            query = {"query": {"match": {"metadata.doc_name": {"query": doc_name, "operator": "AND"}}}}

        es_client.delete_by_query(index=INDEX_NAME, body=query)
        return True
    except Exception as e:
        if LOG_FLAG:
            logger.info(f"An unexpected error occurred: {e}")
        return False


def search_by_filename(file_name: str) -> bool:
    """Search Elasticsearch by file name."""
    query = {"query": {"match": {"metadata.doc_name": {"query": file_name, "operator": "AND"}}}}
    results = es_client.search(index=INDEX_NAME, body=query)
    if LOG_FLAG:
        logger.info(f"[ search by file ] searched by {file_name}")
        logger.info(f"[ search by file ] {len(results['hits'])} results: {results}")
    return results["hits"]["total"]["value"] > 0


def ingest_doc_to_elastic(doc_path: DocPath) -> None:
    """Ingest documents to Elasticsearch."""
    path = doc_path.path
    file_name = path.split("/")[-1]
    if LOG_FLAG:
        logger.info(f"Parsing document {path}, file name: {file_name}.")

    if path.endswith(".html"):
        headers_to_split_on = [
            ("h1", "Header 1"),
            ("h2", "Header 2"),
            ("h3", "Header 3"),
        ]
        text_splitter = HTMLHeaderTextSplitter(headers_to_split_on=headers_to_split_on)
    else:
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=doc_path.chunk_size,
            chunk_overlap=doc_path.chunk_overlap,
            add_start_index=True,
            separators=get_separators(),
        )

    content = document_loader(path)

    # Structured file types are ingested as-is; everything else gets chunked
    structured_types = [".xlsx", ".csv", ".json", "jsonl"]
    _, ext = os.path.splitext(path)

    if ext in structured_types:
        chunks = content
    else:
        chunks = text_splitter.split_text(content)

    if doc_path.process_table and path.endswith(".pdf"):
        table_chunks = get_tables_result(path, doc_path.table_strategy)
        chunks = chunks + table_chunks
    if LOG_FLAG:
        logger.info(f"Done preprocessing. Created {len(chunks)} chunks of the original file.")

    batch_size = 32
    num_chunks = len(chunks)
    metadata = {"doc_name": str(file_name)}

    for i in range(0, num_chunks, batch_size):
        batch_texts = chunks[i : i + batch_size]
        documents = [Document(page_content=text, metadata=metadata) for text in batch_texts]
        _ = es_store.add_documents(documents=documents)
        if LOG_FLAG:
            logger.info(f"Processed batch {i // batch_size + 1}/{(num_chunks - 1) // batch_size + 1}")


async def ingest_link_to_elastic(link_list: List[str]) -> None:
    """Ingest data scraped from website links into Elasticsearch."""
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP,
        add_start_index=True,
        separators=get_separators(),
    )

    batch_size = 32

    for link in link_list:
        content = parse_html([link])[0][0]
        if LOG_FLAG:
            logger.info(f"[ ingest link ] link: {link} content: {content}")

        encoded_link = encode_filename(link)
        save_path = UPLOADED_FILES_PATH + encoded_link + ".txt"
        doc_path = UPLOADED_FILES_PATH + link + ".txt"
        if LOG_FLAG:
            logger.info(f"[ ingest link ] save_path: {save_path}")

        await save_content_to_local_disk(save_path, content)

        chunks = text_splitter.split_text(content)
        num_chunks = len(chunks)
        # metadata must be a dict for langchain Document
        metadata = {"doc_name": str(doc_path)}

        for i in range(0, num_chunks, batch_size):
            batch_texts = chunks[i : i + batch_size]
            documents = [Document(page_content=text, metadata=metadata) for text in batch_texts]
            _ = es_store.add_documents(documents=documents)
            if LOG_FLAG:
                logger.info(f"Processed batch {i // batch_size + 1}/{(num_chunks - 1) // batch_size + 1}")


@register_microservice(name="opea_service@prepare_doc_elastic", endpoint="/v1/dataprep", host="0.0.0.0", port=6011)
async def ingest_documents(
    files: Optional[Union[UploadFile, List[UploadFile]]] = File(None),
    link_list: Optional[str] = Form(None),
    chunk_size: int = Form(1500),
    chunk_overlap: int = Form(100),
    process_table: bool = Form(False),
    table_strategy: str = Form("fast"),
):
    """Ingest documents for RAG."""
    if LOG_FLAG:
        logger.info(f"files:{files}")
        logger.info(f"link_list:{link_list}")

    if files and link_list:
        raise HTTPException(status_code=400, detail="Provide either a file or a string list, not both.")

    if files:
        if not isinstance(files, list):
            files = [files]

        if not os.path.exists(UPLOADED_FILES_PATH):
            Path(UPLOADED_FILES_PATH).mkdir(parents=True, exist_ok=True)

        for file in files:
            encode_file = encode_filename(file.filename)
            save_path = UPLOADED_FILES_PATH + encode_file
            filename = save_path.split("/")[-1]

            try:
                exists = search_by_filename(filename)
            except Exception as e:
                raise HTTPException(
                    status_code=500,
                    detail=f"Failed when searching in Elasticsearch for file {file.filename}: {e}",
                )

            if exists:
                if LOG_FLAG:
                    logger.info(f"[ upload ] File {file.filename} already exists.")
                raise HTTPException(
                    status_code=400,
                    detail=f"Uploaded file {file.filename} already exists. Please change the file name.",
                )

            await save_content_to_local_disk(save_path, file)

            ingest_doc_to_elastic(
                DocPath(
                    path=save_path,
                    chunk_size=chunk_size,
                    chunk_overlap=chunk_overlap,
                    process_table=process_table,
                    table_strategy=table_strategy,
                )
            )
            if LOG_FLAG:
                logger.info(f"Successfully saved file {save_path}")

        result = {"status": 200, "message": "Data preparation succeeded"}
        if LOG_FLAG:
            logger.info(result)
        return result

    if link_list:
        try:
            link_list = json.loads(link_list)  # Parse JSON string to list
            if not isinstance(link_list, list):
                raise HTTPException(status_code=400, detail="link_list should be a list.")

            await ingest_link_to_elastic(link_list)

            if LOG_FLAG:
                logger.info(f"Successfully saved link list {link_list}")

            result = {"status": 200, "message": "Data preparation succeeded"}
            if LOG_FLAG:
                logger.info(result)
            return result
        except json.JSONDecodeError:
            raise HTTPException(status_code=400, detail="Invalid JSON format for link_list.")

    raise HTTPException(status_code=400, detail="Must provide either a file or a string list.")


@register_microservice(
    name="opea_service@prepare_doc_elastic",
    endpoint="/v1/dataprep/get_file",
    host="0.0.0.0",
    port=6011,
)
async def rag_get_file_structure():
    """Obtain uploaded file list."""
    if LOG_FLAG:
        logger.info("[ dataprep - get file ] start to get file structure")

    if not Path(UPLOADED_FILES_PATH).exists():
        if LOG_FLAG:
            logger.info("No file uploaded, return empty list.")
        return []

    file_content = get_file_structure(UPLOADED_FILES_PATH)
    if LOG_FLAG:
        logger.info(file_content)
    return file_content


@register_microservice(
    name="opea_service@prepare_doc_elastic",
    endpoint="/v1/dataprep/delete_file",
    host="0.0.0.0",
    port=6011,
)
async def delete_single_file(file_path: str = Body(..., embed=True)):
    """Delete file according to `file_path`.

    `file_path`:
        - specific file path (e.g. /path/to/file.txt)
        - folder path (e.g. /path/to/folder)
        - "all": delete all files uploaded
    """
    if file_path == "all":
        if LOG_FLAG:
            logger.info("[dataprep - del] delete all files")
        remove_folder_with_ignore(UPLOADED_FILES_PATH)
        assert delete_embeddings(file_path)
        if LOG_FLAG:
            logger.info("[dataprep - del] successfully delete all files.")
        create_upload_folder(UPLOADED_FILES_PATH)
        if LOG_FLAG:
            logger.info({"status": True})
        return {"status": True}

    delete_path = Path(UPLOADED_FILES_PATH + "/" + encode_filename(file_path))
    if LOG_FLAG:
        logger.info(f"[dataprep - del] delete_path: {delete_path}")

    if delete_path.exists():
        # delete file
        if delete_path.is_file():
            try:
                assert delete_embeddings(file_path)
                delete_path.unlink()
            except Exception as e:
                if LOG_FLAG:
                    logger.info(f"[dataprep - del] fail to delete file {delete_path}: {e}")
                    logger.info({"status": False})
                return {"status": False}
        # delete folder
        else:
            if LOG_FLAG:
                logger.info("[dataprep - del] delete folder is not supported for now.")
                logger.info({"status": False})
            return {"status": False}

        if LOG_FLAG:
            logger.info({"status": True})
        return {"status": True}
    else:
        raise HTTPException(status_code=404, detail="File/folder not found. Please check del_path.")


if __name__ == "__main__":
    es_client = Elasticsearch(hosts=ES_CONNECTION_STRING)
    es_store = get_elastic_store(get_embedder())
    create_upload_folder(UPLOADED_FILES_PATH)
    create_index()
    opea_microservices["opea_service@prepare_doc_elastic"].start()


@@ -0,0 +1,30 @@
beautifulsoup4
cairosvg
docarray[full]
docx2txt
easyocr
elasticsearch
fastapi
huggingface_hub
langchain
langchain-community
langchain-elasticsearch
langchain-huggingface
langchain-text-splitters
markdown
numpy
opentelemetry-api
opentelemetry-exporter-otlp
opentelemetry-sdk
pandas
Pillow
prometheus-fastapi-instrumentator
pymupdf
pytesseract
python-bidi
python-docx
python-pptx
sentence_transformers
shortuuid
unstructured[all-docs]
uvicorn


@@ -0,0 +1,28 @@
# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
FROM python:3.11-slim
ARG ARCH="cpu"
RUN apt-get update -y && apt-get install -y --no-install-recommends --fix-missing \
libgl1-mesa-glx \
libjemalloc-dev
RUN useradd -m -s /bin/bash user && \
mkdir -p /home/user && \
chown -R user /home/user/
COPY comps /home/user/comps
USER user
RUN pip install --no-cache-dir --upgrade pip setuptools && \
if [ ${ARCH} = "cpu" ]; then pip install --no-cache-dir torch torchvision --index-url https://download.pytorch.org/whl/cpu; fi && \
pip install --no-cache-dir -r /home/user/comps/retrievers/elasticsearch/langchain/requirements.txt
ENV PYTHONPATH=$PYTHONPATH:/home/user
WORKDIR /home/user/comps/retrievers/elasticsearch/langchain
ENTRYPOINT ["python", "retriever_elasticsearch.py"]


@@ -0,0 +1,122 @@
# Retriever Microservice
This retriever microservice is a highly efficient search service designed for handling and retrieving embedding vectors.
It operates by receiving an embedding vector as input and conducting a similarity search against vectors stored in a
VectorDB database. Users must specify the VectorDB's URL and the index name, and the service searches within that index
to find documents with the highest similarity to the input vector.
The service primarily utilizes similarity measures in vector space to rapidly retrieve semantically similar documents.
The vector-based retrieval approach is particularly suited for handling large datasets, offering fast and accurate
search results that significantly enhance the efficiency and quality of information retrieval.
Overall, this microservice provides robust backend support for applications requiring efficient similarity searches,
playing a vital role in scenarios such as recommendation systems, information retrieval, or any other context where
precise measurement of document similarity is crucial.
## 🚀1. Start Microservice with Python (Option 1)
To start the retriever microservice, you must first install the required Python packages.
### 1.1 Install Requirements
```bash
pip install -r requirements.txt
```
### 1.2 Start TEI Service
```bash
model=BAAI/bge-base-en-v1.5
volume=$PWD/data
docker run -d -p 6060:80 -v $volume:/data -e http_proxy=$http_proxy -e https_proxy=$https_proxy --pull always ghcr.io/huggingface/text-embeddings-inference:cpu-1.5 --model-id $model
```
### 1.3 Verify the TEI Service
Health check the embedding service with:
```bash
curl 127.0.0.1:6060/embed \
-X POST \
-d '{"inputs":"What is Deep Learning?"}' \
-H 'Content-Type: application/json'
```
### 1.4 Setup VectorDB Service
You need to set up your own VectorDB service (Elasticsearch in this example) and ingest your knowledge documents into
the vector database.
As for Elasticsearch, you can start a Docker container with the following command.
Remember to ingest data into it manually.
```bash
docker run -d --name vectorstore-elasticsearch -e ES_JAVA_OPTS="-Xms1g -Xmx1g" -e "discovery.type=single-node" -e "xpack.security.enabled=false" -p 9200:9200 -p 9300:9300 docker.elastic.co/elasticsearch/elasticsearch:8.16.0
```
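One convenient way to ingest documents is the Elasticsearch dataprep microservice added alongside this retriever. A minimal sketch, assuming that service is running on its default port 6011 (see `comps/dataprep/elasticsearch/langchain`):
```bash
# Upload a document; the dataprep service chunks and embeds it into the index
curl -X POST \
    -F "files=@/path/to/document" \
    http://localhost:6011/v1/dataprep
```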
### 1.5 Start Retriever Service
```bash
export TEI_ENDPOINT="http://${your_ip}:6060"
python retriever_elasticsearch.py
```
## 🚀2. Start Microservice with Docker (Option 2)
### 2.1 Setup Environment Variables
```bash
export EMBED_MODEL="BAAI/bge-base-en-v1.5"
export ES_CONNECTION_STRING="http://localhost:9200"
export INDEX_NAME=${your_index_name}
export TEI_ENDPOINT="http://${your_ip}:6060"
```
### 2.2 Build Docker Image
```bash
cd GenAIComps
docker build -t opea/retriever-elasticsearch:latest --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_proxy -f comps/retrievers/elasticsearch/langchain/Dockerfile .
```
To start a docker container, you have two options:
- A. Run Docker with CLI
- B. Run Docker with Docker Compose
You can choose one as needed.
### 2.3 Run Docker with CLI (Option A)
```bash
docker run -d --name="retriever-elasticsearch" -p 7000:7000 --ipc=host -e http_proxy=$http_proxy -e https_proxy=$https_proxy -e ES_CONNECTION_STRING=$ES_CONNECTION_STRING -e INDEX_NAME=$INDEX_NAME -e TEI_ENDPOINT=$TEI_ENDPOINT opea/retriever-elasticsearch:latest
```
### 2.4 Run Docker with Docker Compose (Option B)
```bash
cd comps/retrievers/elasticsearch/langchain
docker compose -f docker_compose_retriever.yaml up -d
```
## 🚀3. Consume Retriever Service
### 3.1 Check Service Status
```bash
curl http://localhost:7000/v1/health_check \
-X GET \
-H 'Content-Type: application/json'
```
### 3.2 Consume Retriever Service
To consume the Retriever Microservice, you can generate a mock embedding vector of length 768 with Python.
```bash
export your_embedding=$(python -c "import random; embedding = [random.uniform(-1, 1) for _ in range(768)]; print(embedding)")
curl http://${your_ip}:7000/v1/retrieval \
-X POST \
-d "{\"text\":\"What is the revenue of Nike in 2023?\",\"embedding\":${your_embedding}}" \
-H 'Content-Type: application/json'
```
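The retriever also honors the optional search parameters defined on `EmbedDoc` and handled in `retriever_elasticsearch.py` (`search_type`, `k`, `distance_threshold`, `score_threshold`, `fetch_k`, `lambda_mult`). A minimal sketch of an MMR query, reusing the mock embedding from above:
```bash
curl http://${your_ip}:7000/v1/retrieval \
    -X POST \
    -d "{\"text\":\"What is the revenue of Nike in 2023?\",\"embedding\":${your_embedding},\"search_type\":\"mmr\",\"k\":4,\"fetch_k\":20,\"lambda_mult\":0.5}" \
    -H 'Content-Type: application/json'
```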


@@ -0,0 +1,18 @@
# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
import os
# Embedding model
EMBED_MODEL = os.getenv("EMBED_MODEL", "BAAI/bge-base-en-v1.5")
ES_CONNECTION_STRING = os.getenv("ES_CONNECTION_STRING", "http://localhost:9200")
# TEI Embedding endpoints
TEI_ENDPOINT = os.getenv("TEI_ENDPOINT", "")
# Vector Index Configuration
INDEX_NAME = os.getenv("INDEX_NAME", "rag-elastic")
# Logging enabled (treat only "true"/"1" as enabled)
LOG_FLAG = os.getenv("LOGFLAG", "").lower() in ("true", "1")


@@ -0,0 +1,33 @@
# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
version: "3.8"
services:
tei_xeon_service:
image: ghcr.io/huggingface/text-embeddings-inference:cpu-1.2
container_name: tei-xeon-server
ports:
- "6060:80"
volumes:
- "./data:/data"
shm_size: 1g
command: --model-id ${RETRIEVE_MODEL_ID}
retriever:
image: opea/retriever-elasticsearch:latest
container_name: retriever-elasticsearch
ports:
- "7000:7000"
ipc: host
environment:
http_proxy: ${http_proxy}
https_proxy: ${https_proxy}
INDEX_NAME: ${INDEX_NAME}
TEI_ENDPOINT: ${TEI_ENDPOINT}
ES_CONNECTION_STRING: ${ES_CONNECTION_STRING}
HUGGINGFACEHUB_API_TOKEN: ${HUGGINGFACEHUB_API_TOKEN}
restart: unless-stopped
networks:
default:
driver: bridge


@@ -0,0 +1,14 @@
docarray[full]
easyocr
fastapi
langchain-community
langchain-elasticsearch
langchain-huggingface
opentelemetry-api
opentelemetry-exporter-otlp
opentelemetry-sdk
prometheus-fastapi-instrumentator==7.0.0
pymupdf
sentence_transformers
shortuuid
uvicorn


@@ -0,0 +1,105 @@
# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
import time
from typing import Union

from config import EMBED_MODEL, ES_CONNECTION_STRING, INDEX_NAME, LOG_FLAG, TEI_ENDPOINT
from elasticsearch import Elasticsearch
from langchain_community.embeddings import HuggingFaceBgeEmbeddings
from langchain_elasticsearch import ElasticsearchStore
from langchain_huggingface.embeddings import HuggingFaceEndpointEmbeddings

from comps import (
    CustomLogger,
    EmbedDoc,
    SearchedDoc,
    ServiceType,
    TextDoc,
    opea_microservices,
    register_microservice,
    register_statistics,
    statistics_dict,
)

logger = CustomLogger(__name__)


def create_index() -> None:
    if not es_client.indices.exists(index=INDEX_NAME):
        es_client.indices.create(index=INDEX_NAME)


def get_embedder() -> Union[HuggingFaceEndpointEmbeddings, HuggingFaceBgeEmbeddings]:
    """Obtain required Embedder."""
    if TEI_ENDPOINT:
        return HuggingFaceEndpointEmbeddings(model=TEI_ENDPOINT)
    else:
        return HuggingFaceBgeEmbeddings(model_name=EMBED_MODEL)


def get_elastic_store(embedder: Union[HuggingFaceEndpointEmbeddings, HuggingFaceBgeEmbeddings]) -> ElasticsearchStore:
    """Get Elasticsearch vector store."""
    return ElasticsearchStore(index_name=INDEX_NAME, embedding=embedder, es_connection=es_client)


@register_microservice(
    name="opea_service@retriever_elasticsearch",
    service_type=ServiceType.RETRIEVER,
    endpoint="/v1/retrieval",
    host="0.0.0.0",
    port=7000,
)
@register_statistics(names=["opea_service@retriever_elasticsearch"])
async def retrieve(input: EmbedDoc) -> SearchedDoc:
    """Retrieve documents based on similarity search type."""
    if LOG_FLAG:
        logger.info(input)
    start = time.time()

    if input.search_type == "similarity":
        docs_and_similarities = vector_db.similarity_search_by_vector_with_relevance_scores(
            embedding=input.embedding, k=input.k
        )
        search_res = [doc for doc, _ in docs_and_similarities]
    elif input.search_type == "similarity_distance_threshold":
        if input.distance_threshold is None:
            raise ValueError("distance_threshold must be provided for similarity_distance_threshold retriever")
        docs_and_similarities = vector_db.similarity_search_by_vector_with_relevance_scores(
            embedding=input.embedding, k=input.k
        )
        search_res = [doc for doc, similarity in docs_and_similarities if similarity > input.distance_threshold]
    elif input.search_type == "similarity_score_threshold":
        # search by the query embedding, consistent with the other by-vector branches
        docs_and_similarities = vector_db.similarity_search_by_vector_with_relevance_scores(
            embedding=input.embedding, k=input.k
        )
        search_res = [doc for doc, similarity in docs_and_similarities if similarity > input.score_threshold]
    elif input.search_type == "mmr":
        search_res = vector_db.max_marginal_relevance_search(
            query=input.text, k=input.k, fetch_k=input.fetch_k, lambda_mult=input.lambda_mult
        )
    else:
        raise ValueError(f"{input.search_type} not valid")

    searched_docs = []
    for r in search_res:
        searched_docs.append(TextDoc(text=r.page_content))

    result = SearchedDoc(retrieved_docs=searched_docs, initial_query=input.text)

    statistics_dict["opea_service@retriever_elasticsearch"].append_latency(time.time() - start, None)

    if LOG_FLAG:
        logger.info(result)

    return result


if __name__ == "__main__":
    es_client = Elasticsearch(hosts=ES_CONNECTION_STRING)
    vector_db = get_elastic_store(get_embedder())
    create_index()
    opea_microservices["opea_service@retriever_elasticsearch"].start()


@@ -0,0 +1,13 @@
# Start Elasticsearch server
## 1. Download Elasticsearch image
```bash
docker pull docker.elastic.co/elasticsearch/elasticsearch:8.16.0
```
## 2. Run Elasticsearch service
```bash
docker run -p 9200:9200 -p 9300:9300 -e ES_JAVA_OPTS="-Xms1g -Xmx1g" \
  -e "discovery.type=single-node" -e "xpack.security.enabled=false" \
  docker.elastic.co/elasticsearch/elasticsearch:8.16.0
```
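To verify the server is up, query its root endpoint; with `xpack.security.enabled=false` no credentials are required and Elasticsearch returns basic cluster information:
```bash
curl http://localhost:9200
```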


@@ -0,0 +1,2 @@
# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0


@@ -0,0 +1,16 @@
# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
version: "3"
services:
elasticsearch-vector-db:
image: docker.elastic.co/elasticsearch/elasticsearch:8.16.0
container_name: elasticsearch-vector-db
ports:
- "9200:9200"
- "9300:9300"
environment:
- ES_JAVA_OPTS=-Xms1g -Xmx1g
- discovery.type=single-node
- xpack.security.enabled=false
- bootstrap.memory_lock=false