Refine Dataprep Milvus MS (#570)

Signed-off-by: letonghan <letong.han@intel.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
This commit is contained in:
Letong Han
2024-08-29 15:29:41 +08:00
committed by GitHub
parent 2360e5ad07
commit 7686cfa601
5 changed files with 1412 additions and 242 deletions

View File

@@ -1,8 +1,8 @@
# Dataprep Microservice with Milvus
## 🚀Start Microservice with Python
## 🚀1. Start Microservice with Python (Option 1)
### Install Requirements
### 1.1 Requirements
```bash
pip install -r requirements.txt
@@ -11,11 +11,11 @@ apt-get install libtesseract-dev -y
apt-get install poppler-utils -y
```
### Start Milvus Server
### 1.2 Start Milvus Server
Please refer to this [readme](../../../vectorstores/langchain/milvus/README.md).
### Setup Environment Variables
### 1.3 Setup Environment Variables
```bash
export no_proxy=${your_no_proxy}
@@ -27,7 +27,30 @@ export COLLECTION_NAME=${your_collection_name}
export MOSEC_EMBEDDING_ENDPOINT=${your_embedding_endpoint}
```
### Start Document Preparation Microservice for Milvus with Python Script
### 1.4 Start Mosec Embedding Service
First, you need to build a mosec embedding serving docker image.
```bash
cd ../../..
docker build --build-arg http_proxy=$http_proxy --build-arg https_proxy=$https_proxy -t opea/embedding-mosec-endpoint:latest -f comps/embeddings/langchain-mosec/mosec-docker/Dockerfile .
```
Then start the mosec embedding server.
```bash
your_port=6010
docker run -d --name="embedding-mosec-endpoint" -p $your_port:8000 opea/embedding-mosec-endpoint:latest
```
Setup environment variables:
```bash
export MOSEC_EMBEDDING_ENDPOINT="http://localhost:$your_port"
export MILVUS=${your_host_ip}
```
### 1.5 Start Document Preparation Microservice for Milvus with Python Script
Start document preparation microservice for Milvus with below command.
@@ -35,22 +58,45 @@ Start document preparation microservice for Milvus with below command.
python prepare_doc_milvus.py
```
## 🚀Start Microservice with Docker
## 🚀2. Start Microservice with Docker (Option 2)
### Build Docker Image
### 2.1 Start Milvus Server
Please refer to this [readme](../../../vectorstores/langchain/milvus/README.md).
### 2.2 Build Docker Image
```bash
cd ../../../../
cd ../../..
# build mosec embedding docker image
docker build --build-arg http_proxy=$http_proxy --build-arg https_proxy=$https_proxy -t opea/embedding-langchain-mosec-endpoint:latest -f comps/embeddings/langchain-mosec/mosec-docker/Dockerfile .
# build dataprep milvus docker image
docker build -t opea/dataprep-milvus:latest --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_proxy --build-arg no_proxy=$no_proxy -f comps/dataprep/milvus/docker/Dockerfile .
```
### Run Docker with CLI
### 2.3 Setup Environment Variables
```bash
docker run -d --name="dataprep-milvus-server" -p 6010:6010 --ipc=host -e http_proxy=$http_proxy -e https_proxy=$https_proxy -e no_proxy=$no_proxy -e MOSEC_EMBEDDING_ENDPOINT=${your_embedding_endpoint} -e MILVUS=${your_milvus_host_ip} opea/dataprep-milvus:latest
export MOSEC_EMBEDDING_ENDPOINT="http://localhost:$your_port"
export MILVUS=${your_host_ip}
```
## Invoke Microservice
### 2.3 Run Docker with CLI (Option A)
```bash
docker run -d --name="dataprep-milvus-server" -p 6010:6010 --ipc=host -e http_proxy=$http_proxy -e https_proxy=$https_proxy -e no_proxy=$no_proxy -e MOSEC_EMBEDDING_ENDPOINT=${MOSEC_EMBEDDING_ENDPOINT} -e MILVUS=${MILVUS} opea/dataprep-milvus:latest
```
### 2.4 Run with Docker Compose (Option B)
```bash
cd docker
docker compose -f docker-compose-dataprep-milvus.yaml up -d
```
## 🚀3. Consume Microservice
### 3.1 Consume Upload API
Once document preparation microservice for Milvus is started, user can use below command to invoke the microservice to convert the document to embedding and save to the database.
@@ -65,13 +111,13 @@ curl -X POST \
http://localhost:6010/v1/dataprep
```
You can specify chunk_size and chunk_size by the following commands.
You can specify chunk_size and chunk_size by the following commands. To avoid big chunks, pass a small chun_size like 500 as below (default 1500).
```bash
curl -X POST \
-H "Content-Type: multipart/form-data" \
-F "files=@./file.pdf" \
-F "chunk_size=1500" \
-F "chunk_size=500" \
-F "chunk_overlap=100" \
http://localhost:6010/v1/dataprep
```
@@ -132,3 +178,70 @@ Note: If you specify "table_strategy=llm", You should first start TGI Service, p
```bash
curl -X POST -H "Content-Type: application/json" -d '{"path":"/home/user/doc/your_document_name","process_table":true,"table_strategy":"hq"}' http://localhost:6010/v1/dataprep
```
### 3.2 Consume get_file API
To get uploaded file structures, use the following command:
```bash
curl -X POST \
-H "Content-Type: application/json" \
http://localhost:6010/v1/dataprep/get_file
```
Then you will get the response JSON like this:
```json
[
{
"name": "uploaded_file_1.txt",
"id": "uploaded_file_1.txt",
"type": "File",
"parent": ""
},
{
"name": "uploaded_file_2.txt",
"id": "uploaded_file_2.txt",
"type": "File",
"parent": ""
}
]
```
### 3.3 Consume delete_file API
To delete uploaded file/link, use the following command.
The `file_path` here should be the `id` get from `/v1/dataprep/get_file` API.
```bash
# delete link
curl -X POST \
-H "Content-Type: application/json" \
-d '{"file_path": "https://www.ces.tech/.txt"}' \
http://localhost:6010/v1/dataprep/delete_file
# delete file
curl -X POST \
-H "Content-Type: application/json" \
-d '{"file_path": "uploaded_file_1.txt"}' \
http://localhost:6010/v1/dataprep/delete_file
# delete all files and links, will drop the entire db collection
curl -X POST \
-H "Content-Type: application/json" \
-d '{"file_path": "all"}' \
http://localhost:6010/v1/dataprep/delete_file
```
## 🚀4. Troubleshooting
1. If you get errors from Mosec Embedding Endpoint like `cannot find this task, maybe it has expired` while uploading files, try to reduce the `chunk_size` in the curl command like below (the default chunk_size=1500).
```bash
curl -X POST \
-H "Content-Type: multipart/form-data" \
-F "files=@./file.pdf" \
-F "chunk_size=500" \
http://localhost:6010/v1/dataprep
```

View File

@@ -0,0 +1,93 @@
# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
version: "3"
services:
etcd:
container_name: milvus-etcd
image: quay.io/coreos/etcd:v3.5.5
environment:
- ETCD_AUTO_COMPACTION_MODE=revision
- ETCD_AUTO_COMPACTION_RETENTION=1000
- ETCD_QUOTA_BACKEND_BYTES=4294967296
- ETCD_SNAPSHOT_COUNT=50000
volumes:
- ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/etcd:/etcd
command: etcd -advertise-client-urls=http://127.0.0.1:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd
healthcheck:
test: ["CMD", "etcdctl", "endpoint", "health"]
interval: 30s
timeout: 20s
retries: 3
minio:
container_name: milvus-minio
image: minio/minio:RELEASE.2023-03-20T20-16-18Z
environment:
MINIO_ACCESS_KEY: minioadmin
MINIO_SECRET_KEY: minioadmin
ports:
- "9001:9001"
- "9000:9000"
volumes:
- ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/minio:/minio_data
command: minio server /minio_data --console-address ":9001"
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
interval: 30s
timeout: 20s
retries: 3
standalone:
container_name: milvus-standalone
image: milvusdb/milvus:v2.4.6
command: ["milvus", "run", "standalone"]
security_opt:
- seccomp:unconfined
environment:
ETCD_ENDPOINTS: etcd:2379
MINIO_ADDRESS: minio:9000
volumes:
- ${DOCKER_VOLUME_DIRECTORY:-.}/milvus.yaml:/milvus/configs/milvus.yaml
- ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/milvus:/var/lib/milvus
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9091/healthz"]
interval: 30s
start_period: 90s
timeout: 20s
retries: 3
ports:
- "19530:19530"
- "9091:9091"
depends_on:
- "etcd"
- "minio"
mosec-embedding:
image: opea/embedding-mosec-endpoint:latest
container_name: embedding-mosec-server
ports:
- "6009:8000"
ipc: host
environment:
http_proxy: ${http_proxy}
https_proxy: ${https_proxy}
restart: unless-stopped
dataprep-milvus:
image: opea/dataprep-milvus:latest
container_name: dataprep-milvus-server
ports:
- "6010:6010"
ipc: host
environment:
no_proxy: ${no_proxy}
http_proxy: ${http_proxy}
https_proxy: ${https_proxy}
MOSEC_EMBEDDING_ENDPOINT: ${MOSEC_EMBEDDING_ENDPOINT}
MILVUS: ${MILVUS}
restart: unless-stopped
networks:
default:
driver: bridge

View File

@@ -0,0 +1,811 @@
# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
# Licensed to the LF AI & Data foundation under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Related configuration of etcd, used to store Milvus metadata & service discovery.
etcd:
endpoints: localhost:2379
rootPath: by-dev # The root path where data is stored in etcd
metaSubPath: meta # metaRootPath = rootPath + '/' + metaSubPath
kvSubPath: kv # kvRootPath = rootPath + '/' + kvSubPath
log:
level: info # Only supports debug, info, warn, error, panic, or fatal. Default 'info'.
# path is one of:
# - "default" as os.Stderr,
# - "stderr" as os.Stderr,
# - "stdout" as os.Stdout,
# - file path to append server logs to.
# please adjust in embedded Milvus: /tmp/milvus/logs/etcd.log
path: stdout
ssl:
enabled: false # Whether to support ETCD secure connection mode
tlsCert: /path/to/etcd-client.pem # path to your cert file
tlsKey: /path/to/etcd-client-key.pem # path to your key file
tlsCACert: /path/to/ca.pem # path to your CACert file
# TLS min version
# Optional values: 1.0, 1.1, 1.2, 1.3。
# We recommend using version 1.2 and above.
tlsMinVersion: 1.3
requestTimeout: 10000 # Etcd operation timeout in milliseconds
use:
embed: false # Whether to enable embedded Etcd (an in-process EtcdServer).
data:
dir: default.etcd # Embedded Etcd only. please adjust in embedded Milvus: /tmp/milvus/etcdData/
auth:
enabled: false # Whether to enable authentication
userName: # username for etcd authentication
password: # password for etcd authentication
metastore:
type: etcd # Default value: etcd, Valid values: [etcd, tikv]
# Related configuration of tikv, used to store Milvus metadata.
# Notice that when TiKV is enabled for metastore, you still need to have etcd for service discovery.
# TiKV is a good option when the metadata size requires better horizontal scalability.
tikv:
endpoints: 127.0.0.1:2389 # Note that the default pd port of tikv is 2379, which conflicts with etcd.
rootPath: by-dev # The root path where data is stored in tikv
metaSubPath: meta # metaRootPath = rootPath + '/' + metaSubPath
kvSubPath: kv # kvRootPath = rootPath + '/' + kvSubPath
requestTimeout: 10000 # ms, tikv request timeout
snapshotScanSize: 256 # batch size of tikv snapshot scan
ssl:
enabled: false # Whether to support TiKV secure connection mode
tlsCert: # path to your cert file
tlsKey: # path to your key file
tlsCACert: # path to your CACert file
localStorage:
path: /var/lib/milvus/data/ # please adjust in embedded Milvus: /tmp/milvus/data/
# Related configuration of MinIO/S3/GCS or any other service supports S3 API, which is responsible for data persistence for Milvus.
# We refer to the storage service as MinIO/S3 in the following description for simplicity.
minio:
address: localhost # Address of MinIO/S3
port: 9000 # Port of MinIO/S3
accessKeyID: minioadmin # accessKeyID of MinIO/S3
secretAccessKey: minioadmin # MinIO/S3 encryption string
useSSL: false # Access to MinIO/S3 with SSL
ssl:
tlsCACert: /path/to/public.crt # path to your CACert file
bucketName: a-bucket # Bucket name in MinIO/S3
rootPath: files # The root path where the message is stored in MinIO/S3
# Whether to useIAM role to access S3/GCS instead of access/secret keys
# For more information, refer to
# aws: https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_use.html
# gcp: https://cloud.google.com/storage/docs/access-control/iam
# aliyun (ack): https://www.alibabacloud.com/help/en/container-service-for-kubernetes/latest/use-rrsa-to-enforce-access-control
# aliyun (ecs): https://www.alibabacloud.com/help/en/elastic-compute-service/latest/attach-an-instance-ram-role
useIAM: false
# Cloud Provider of S3. Supports: "aws", "gcp", "aliyun".
# You can use "aws" for other cloud provider supports S3 API with signature v4, e.g.: minio
# You can use "gcp" for other cloud provider supports S3 API with signature v2
# You can use "aliyun" for other cloud provider uses virtual host style bucket
# When useIAM enabled, only "aws", "gcp", "aliyun" is supported for now
cloudProvider: aws
# Custom endpoint for fetch IAM role credentials. when useIAM is true & cloudProvider is "aws".
# Leave it empty if you want to use AWS default endpoint
iamEndpoint:
logLevel: fatal # Log level for aws sdk log. Supported level: off, fatal, error, warn, info, debug, trace
region: # Specify minio storage system location region
useVirtualHost: false # Whether use virtual host mode for bucket
requestTimeoutMs: 10000 # minio timeout for request time in milliseconds
# The maximum number of objects requested per batch in minio ListObjects rpc,
# 0 means using oss client by default, decrease these configuration if ListObjects timeout
listObjectsMaxKeys: 0
# Milvus supports four MQ: rocksmq(based on RockDB), natsmq(embedded nats-server), Pulsar and Kafka.
# You can change your mq by setting mq.type field.
# If you don't set mq.type field as default, there is a note about enabling priority if we config multiple mq in this file.
# 1. standalone(local) mode: rocksmq(default) > natsmq > Pulsar > Kafka
# 2. cluster mode: Pulsar(default) > Kafka (rocksmq and natsmq is unsupported in cluster mode)
mq:
# Default value: "default"
# Valid values: [default, pulsar, kafka, rocksmq, natsmq]
type: default
enablePursuitMode: true # Default value: "true"
pursuitLag: 10 # time tick lag threshold to enter pursuit mode, in seconds
pursuitBufferSize: 8388608 # pursuit mode buffer size in bytes
mqBufSize: 16 # MQ client consumer buffer length
dispatcher:
mergeCheckInterval: 1 # the interval time(in seconds) for dispatcher to check whether to merge
targetBufSize: 16 # the length of channel buffer for targe
maxTolerantLag: 3 # Default value: "3", the timeout(in seconds) that target sends msgPack
# Related configuration of pulsar, used to manage Milvus logs of recent mutation operations, output streaming log, and provide log publish-subscribe services.
pulsar:
address: localhost # Address of pulsar
port: 6650 # Port of Pulsar
webport: 80 # Web port of pulsar, if you connect directly without proxy, should use 8080
maxMessageSize: 5242880 # 5 * 1024 * 1024 Bytes, Maximum size of each message in pulsar.
tenant: public
namespace: default
requestTimeout: 60 # pulsar client global request timeout in seconds
enableClientMetrics: false # Whether to register pulsar client metrics into milvus metrics path.
# If you want to enable kafka, needs to comment the pulsar configs
# kafka:
# brokerList:
# saslUsername:
# saslPassword:
# saslMechanisms:
# securityProtocol:
# ssl:
# enabled: false # whether to enable ssl mode
# tlsCert: # path to client's public key (PEM) used for authentication
# tlsKey: # path to client's private key (PEM) used for authentication
# tlsCaCert: # file or directory path to CA certificate(s) for verifying the broker's key
# tlsKeyPassword: # private key passphrase for use with ssl.key.location and set_ssl_cert(), if any
# readTimeout: 10
rocksmq:
# The path where the message is stored in rocksmq
# please adjust in embedded Milvus: /tmp/milvus/rdb_data
path: /var/lib/milvus/rdb_data
lrucacheratio: 0.06 # rocksdb cache memory ratio
rocksmqPageSize: 67108864 # 64 MB, 64 * 1024 * 1024 bytes, The size of each page of messages in rocksmq
retentionTimeInMinutes: 4320 # 3 days, 3 * 24 * 60 minutes, The retention time of the message in rocksmq.
retentionSizeInMB: 8192 # 8 GB, 8 * 1024 MB, The retention size of the message in rocksmq.
compactionInterval: 86400 # 1 day, trigger rocksdb compaction every day to remove deleted data
compressionTypes: 0,0,7,7,7 # compaction compression type, only support use 0,7. 0 means not compress, 7 will use zstd. Length of types means num of rocksdb level.
# natsmq configuration.
# more detail: https://docs.nats.io/running-a-nats-service/configuration
natsmq:
server:
port: 4222 # Port for nats server listening
storeDir: /var/lib/milvus/nats # Directory to use for JetStream storage of nats
maxFileStore: 17179869184 # Maximum size of the 'file' storage
maxPayload: 8388608 # Maximum number of bytes in a message payload
maxPending: 67108864 # Maximum number of bytes buffered for a connection Applies to client connections
initializeTimeout: 4000 # waiting for initialization of natsmq finished
monitor:
trace: false # If true enable protocol trace log messages
debug: false # If true enable debug log messages
logTime: true # If set to false, log without timestamps.
logFile: /tmp/milvus/logs/nats.log # Log file path relative to .. of milvus binary if use relative path
logSizeLimit: 536870912 # Size in bytes after the log file rolls over to a new one
retention:
maxAge: 4320 # Maximum age of any message in the P-channel
maxBytes: # How many bytes the single P-channel may contain. Removing oldest messages if the P-channel exceeds this size
maxMsgs: # How many message the single P-channel may contain. Removing oldest messages if the P-channel exceeds this limit
# Related configuration of rootCoord, used to handle data definition language (DDL) and data control language (DCL) requests
rootCoord:
dmlChannelNum: 16 # The number of dml channels created at system startup
maxPartitionNum: 1024 # Maximum number of partitions in a collection
minSegmentSizeToEnableIndex: 1024 # It's a threshold. When the segment size is less than this value, the segment will not be indexed
enableActiveStandby: false
maxDatabaseNum: 64 # Maximum number of database
maxGeneralCapacity: 65536 # upper limit for the sum of of product of partitionNumber and shardNumber
gracefulStopTimeout: 5 # seconds. force stop node without graceful stop
ip: # if not specified, use the first unicastable address
port: 53100
grpc:
serverMaxSendSize: 536870912
serverMaxRecvSize: 268435456
clientMaxSendSize: 268435456
clientMaxRecvSize: 536870912
# Related configuration of proxy, used to validate client requests and reduce the returned results.
proxy:
timeTickInterval: 200 # ms, the interval that proxy synchronize the time tick
healthCheckTimeout: 3000 # ms, the interval that to do component healthy check
msgStream:
timeTick:
bufSize: 512
maxNameLength: 255 # Maximum length of name for a collection or alias
# Maximum number of fields in a collection.
# As of today (2.2.0 and after) it is strongly DISCOURAGED to set maxFieldNum >= 64.
# So adjust at your risk!
maxFieldNum: 64
maxVectorFieldNum: 4 # Maximum number of vector fields in a collection.
maxShardNum: 16 # Maximum number of shards in a collection
maxDimension: 32768 # Maximum dimension of a vector
# Whether to produce gin logs.\n
# please adjust in embedded Milvus: false
ginLogging: true
ginLogSkipPaths: / # skip url path for gin log
maxTaskNum: 1024 # max task number of proxy task queue
mustUsePartitionKey: false # switch for whether proxy must use partition key for the collection
accessLog:
enable: false # if use access log
minioEnable: false # if upload sealed access log file to minio
localPath: /tmp/milvus_access
filename: # Log filename, leave empty to use stdout.
maxSize: 64 # Max size for a single file, in MB.
cacheSize: 10240 # Size of log of memory cache, in B
rotatedTime: 0 # Max time for single access log file in seconds
remotePath: access_log/ # File path in minIO
remoteMaxTime: 0 # Max time for log file in minIO, in hours
formatters:
base:
format: "[$time_now] [ACCESS] <$user_name: $user_addr> $method_name [status: $method_status] [code: $error_code] [sdk: $sdk_version] [msg: $error_msg] [traceID: $trace_id] [timeCost: $time_cost]"
query:
format: "[$time_now] [ACCESS] <$user_name: $user_addr> $method_name [status: $method_status] [code: $error_code] [sdk: $sdk_version] [msg: $error_msg] [traceID: $trace_id] [timeCost: $time_cost] [database: $database_name] [collection: $collection_name] [partitions: $partition_name] [expr: $method_expr]"
methods: "Query,Search,Delete"
connectionCheckIntervalSeconds: 120 # the interval time(in seconds) for connection manager to scan inactive client info
connectionClientInfoTTLSeconds: 86400 # inactive client info TTL duration, in seconds
maxConnectionNum: 10000 # the max client info numbers that proxy should manage, avoid too many client infos
gracefulStopTimeout: 30 # seconds. force stop node without graceful stop
slowQuerySpanInSeconds: 5 # query whose executed time exceeds the `slowQuerySpanInSeconds` can be considered slow, in seconds.
http:
enabled: true # Whether to enable the http server
debug_mode: false # Whether to enable http server debug mode
port: # high-level restful api
acceptTypeAllowInt64: true # high-level restful api, whether http client can deal with int64
enablePprof: true # Whether to enable pprof middleware on the metrics port
ip: # if not specified, use the first unicastable address
port: 19530
internalPort: 19529
grpc:
serverMaxSendSize: 268435456
serverMaxRecvSize: 67108864
clientMaxSendSize: 268435456
clientMaxRecvSize: 67108864
# Related configuration of queryCoord, used to manage topology and load balancing for the query nodes, and handoff from growing segments to sealed segments.
queryCoord:
taskMergeCap: 1
taskExecutionCap: 256
autoHandoff: true # Enable auto handoff
autoBalance: true # Enable auto balance
autoBalanceChannel: true # Enable auto balance channel
balancer: ScoreBasedBalancer # auto balancer used for segments on queryNodes
globalRowCountFactor: 0.1 # the weight used when balancing segments among queryNodes
scoreUnbalanceTolerationFactor: 0.05 # the least value for unbalanced extent between from and to nodes when doing balance
reverseUnBalanceTolerationFactor: 1.3 # the largest value for unbalanced extent between from and to nodes after doing balance
overloadedMemoryThresholdPercentage: 90 # The threshold percentage that memory overload
balanceIntervalSeconds: 60
memoryUsageMaxDifferencePercentage: 30
rowCountFactor: 0.4 # the row count weight used when balancing segments among queryNodes
segmentCountFactor: 0.4 # the segment count weight used when balancing segments among queryNodes
globalSegmentCountFactor: 0.1 # the segment count weight used when balancing segments among queryNodes
segmentCountMaxSteps: 50 # segment count based plan generator max steps
rowCountMaxSteps: 50 # segment count based plan generator max steps
randomMaxSteps: 10 # segment count based plan generator max steps
growingRowCountWeight: 4 # the memory weight of growing segment row count
balanceCostThreshold: 0.001 # the threshold of balance cost, if the difference of cluster's cost after executing the balance plan is less than this value, the plan will not be executed
checkSegmentInterval: 1000
checkChannelInterval: 1000
checkBalanceInterval: 10000
checkIndexInterval: 10000
channelTaskTimeout: 60000 # 1 minute
segmentTaskTimeout: 120000 # 2 minute
distPullInterval: 500
collectionObserverInterval: 200
checkExecutedFlagInterval: 100
heartbeatAvailableInterval: 10000 # 10s, Only QueryNodes which fetched heartbeats within the duration are available
loadTimeoutSeconds: 600
distRequestTimeout: 5000 # the request timeout for querycoord fetching data distribution from querynodes, in milliseconds
heatbeatWarningLag: 5000 # the lag value for querycoord report warning when last heartbeat is too old, in milliseconds
checkHandoffInterval: 5000
enableActiveStandby: false
checkInterval: 1000
checkHealthInterval: 3000 # 3s, the interval when query coord try to check health of query node
checkHealthRPCTimeout: 2000 # 100ms, the timeout of check health rpc to query node
brokerTimeout: 5000 # 5000ms, querycoord broker rpc timeout
collectionRecoverTimes: 3 # if collection recover times reach the limit during loading state, release it
observerTaskParallel: 16 # the parallel observer dispatcher task number
checkAutoBalanceConfigInterval: 10 # the interval of check auto balance config
checkNodeSessionInterval: 60 # the interval(in seconds) of check querynode cluster session
gracefulStopTimeout: 5 # seconds. force stop node without graceful stop
enableStoppingBalance: true # whether enable stopping balance
channelExclusiveNodeFactor: 4 # the least node number for enable channel's exclusive mode
cleanExcludeSegmentInterval: 60 # the time duration of clean pipeline exclude segment which used for filter invalid data, in seconds
ip: # if not specified, use the first unicastable address
port: 19531
grpc:
serverMaxSendSize: 536870912
serverMaxRecvSize: 268435456
clientMaxSendSize: 268435456
clientMaxRecvSize: 536870912
# Related configuration of queryNode, used to run hybrid search between vector and scalar data.
queryNode:
stats:
publishInterval: 1000 # Interval for querynode to report node information (milliseconds)
segcore:
knowhereThreadPoolNumRatio: 4 # The number of threads in knowhere's thread pool. If disk is enabled, the pool size will multiply with knowhereThreadPoolNumRatio([1, 32]).
chunkRows: 128 # The number of vectors in a chunk.
interimIndex:
enableIndex: true # Enable segment build with index to accelerate vector search when segment is in growing or binlog.
nlist: 128 # temp index nlist, recommend to set sqrt(chunkRows), must smaller than chunkRows/8
nprobe: 16 # nprobe to search small index, based on your accuracy requirement, must smaller than nlist
memExpansionRate: 1.15 # extra memory needed by building interim index
buildParallelRate: 0.5 # the ratio of building interim index parallel matched with cpu num
knowhereScoreConsistency: false # Enable knowhere strong consistency score computation logic
loadMemoryUsageFactor: 1 # The multiply factor of calculating the memory usage while loading segments
enableDisk: false # enable querynode load disk index, and search on disk index
maxDiskUsagePercentage: 95
cache:
enabled: true
memoryLimit: 2147483648 # 2 GB, 2 * 1024 *1024 *1024
readAheadPolicy: willneed # The read ahead policy of chunk cache, options: `normal, random, sequential, willneed, dontneed`
# options: async, sync, disable.
# Specifies the necessity for warming up the chunk cache.
# 1. If set to "sync" or "async" the original vector data will be synchronously/asynchronously loaded into the
# chunk cache during the load process. This approach has the potential to substantially reduce query/search latency
# for a specific duration post-load, albeit accompanied by a concurrent increase in disk usage;
# 2. If set to "disable" original vector data will only be loaded into the chunk cache during search/query.
warmup: disable
mmap:
mmapEnabled: false # Enable mmap for loading data
lazyload:
enabled: false # Enable lazyload for loading data
waitTimeout: 30000 # max wait timeout duration in milliseconds before start to do lazyload search and retrieve
requestResourceTimeout: 5000 # max timeout in milliseconds for waiting request resource for lazy load, 5s by default
requestResourceRetryInterval: 2000 # retry interval in milliseconds for waiting request resource for lazy load, 2s by default
maxRetryTimes: 1 # max retry times for lazy load, 1 by default
maxEvictPerRetry: 1 # max evict count for lazy load, 1 by default
grouping:
enabled: true
maxNQ: 1000
topKMergeRatio: 20
scheduler:
receiveChanSize: 10240
unsolvedQueueSize: 10240
# maxReadConcurrentRatio is the concurrency ratio of read task (search task and query task).
# Max read concurrency would be the value of hardware.GetCPUNum * maxReadConcurrentRatio.
# It defaults to 2.0, which means max read concurrency would be the value of hardware.GetCPUNum * 2.
# Max read concurrency must greater than or equal to 1, and less than or equal to hardware.GetCPUNum * 100.
# (0, 100]
maxReadConcurrentRatio: 1
cpuRatio: 10 # ratio used to estimate read task cpu usage.
maxTimestampLag: 86400
scheduleReadPolicy:
# fifo: A FIFO queue support the schedule.
# user-task-polling:
# The user's tasks will be polled one by one and scheduled.
# Scheduling is fair on task granularity.
# The policy is based on the username for authentication.
# And an empty username is considered the same user.
# When there are no multi-users, the policy decay into FIFO"
name: fifo
taskQueueExpire: 60 # Control how long (many seconds) that queue retains since queue is empty
enableCrossUserGrouping: false # Enable Cross user grouping when using user-task-polling policy. (Disable it if user's task can not merge each other)
maxPendingTaskPerUser: 1024 # Max pending task per user in scheduler
dataSync:
flowGraph:
maxQueueLength: 16 # Maximum length of task queue in flowgraph
maxParallelism: 1024 # Maximum number of tasks executed in parallel in the flowgraph
enableSegmentPrune: false # use partition prune function on shard delegator
ip: # if not specified, use the first unicastable address
port: 21123
grpc:
serverMaxSendSize: 536870912
serverMaxRecvSize: 268435456
clientMaxSendSize: 268435456
clientMaxRecvSize: 536870912
indexCoord:
bindIndexNodeMode:
enable: false
address: localhost:22930
withCred: false
nodeID: 0
segment:
minSegmentNumRowsToEnableIndex: 1024 # It's a threshold. When the segment num rows is less than this value, the segment will not be indexed
indexNode:
scheduler:
buildParallel: 1
enableDisk: true # enable index node build disk vector index
maxDiskUsagePercentage: 95
ip: # if not specified, use the first unicastable address
port: 21121
grpc:
serverMaxSendSize: 536870912
serverMaxRecvSize: 268435456
clientMaxSendSize: 268435456
clientMaxRecvSize: 536870912
dataCoord:
channel:
watchTimeoutInterval: 300 # Timeout on watching channels (in seconds). Datanode tickler update watch progress will reset timeout timer.
balanceWithRpc: true # Whether to enable balance with RPC, default to use etcd watch
legacyVersionWithoutRPCWatch: 2.4.1 # Datanodes <= this version are considered as legacy nodes, which doesn't have rpc based watch(). This is only used during rolling upgrade where legacy nodes won't get new channels
balanceSilentDuration: 300 # The duration after which the channel manager start background channel balancing
balanceInterval: 360 # The interval with which the channel manager check dml channel balance status
checkInterval: 1 # The interval in seconds with which the channel manager advances channel states
notifyChannelOperationTimeout: 5 # Timeout notifing channel operations (in seconds).
segment:
maxSize: 1024 # Maximum size of a segment in MB
diskSegmentMaxSize: 2048 # Maximum size of a segment in MB for collection which has Disk index
sealProportion: 0.12
assignmentExpiration: 2000 # The time of the assignment expiration in ms
allocLatestExpireAttempt: 200 # The time attempting to alloc latest lastExpire from rootCoord after restart
maxLife: 86400 # The max lifetime of segment in seconds, 24*60*60
# If a segment didn't accept dml records in maxIdleTime and the size of segment is greater than
# minSizeFromIdleToSealed, Milvus will automatically seal it.
# The max idle time of segment in seconds, 10*60.
maxIdleTime: 600
minSizeFromIdleToSealed: 16 # The min size in MB of segment which can be idle from sealed.
# The max number of binlog file for one segment, the segment will be sealed if
# the number of binlog file reaches to max value.
maxBinlogFileNumber: 32
smallProportion: 0.5 # The segment is considered as "small segment" when its # of rows is smaller than
# (smallProportion * segment max # of rows).
# A compaction will happen on small segments if the segment after compaction will have
compactableProportion: 0.85
# over (compactableProportion * segment max # of rows) rows.
# MUST BE GREATER THAN OR EQUAL TO <smallProportion>!!!
# During compaction, the size of segment # of rows is able to exceed segment max # of rows by (expansionRate-1) * 100%.
expansionRate: 1.25
autoUpgradeSegmentIndex: false # whether auto upgrade segment index to index engine's version
enableCompaction: true # Enable data segment compaction
compaction:
enableAutoCompaction: true
indexBasedCompaction: true
rpcTimeout: 10
maxParallelTaskNum: 10
workerMaxParallelTaskNum: 2
levelzero:
forceTrigger:
minSize: 8388608 # The minimum size in bytes to force trigger a LevelZero Compaction, default as 8MB
maxSize: 67108864 # The maxmum size in bytes to force trigger a LevelZero Compaction, default as 64MB
deltalogMinNum: 10 # The minimum number of deltalog files to force trigger a LevelZero Compaction
deltalogMaxNum: 30 # The maxmum number of deltalog files to force trigger a LevelZero Compaction, default as 30
enableGarbageCollection: true
gc:
interval: 3600 # gc interval in seconds
missingTolerance: 86400 # file meta missing tolerance duration in seconds, default to 24hr(1d)
dropTolerance: 10800 # file belongs to dropped entity tolerance duration in seconds. 3600
removeConcurrent: 32 # number of concurrent goroutines to remove dropped s3 objects
scanInterval: 168 # garbage collection scan residue interval in hours
enableActiveStandby: false
brokerTimeout: 5000 # 5000ms, dataCoord broker rpc timeout
autoBalance: true # Enable auto balance
checkAutoBalanceConfigInterval: 10 # the interval of check auto balance config
import:
filesPerPreImportTask: 2 # The maximum number of files allowed per pre-import task.
taskRetention: 10800 # The retention period in seconds for tasks in the Completed or Failed state.
maxSizeInMBPerImportTask: 6144 # To prevent generating of small segments, we will re-group imported files. This parameter represents the sum of file sizes in each group (each ImportTask).
scheduleInterval: 2 # The interval for scheduling import, measured in seconds.
checkIntervalHigh: 2 # The interval for checking import, measured in seconds, is set to a high frequency for the import checker.
checkIntervalLow: 120 # The interval for checking import, measured in seconds, is set to a low frequency for the import checker.
maxImportFileNumPerReq: 1024 # The maximum number of files allowed per single import request.
waitForIndex: true # Indicates whether the import operation waits for the completion of index building.
gracefulStopTimeout: 5 # seconds. force stop node without graceful stop
ip: # if not specified, use the first unicastable address
port: 13333
grpc:
serverMaxSendSize: 536870912
serverMaxRecvSize: 268435456
clientMaxSendSize: 268435456
clientMaxRecvSize: 536870912
dataNode:
dataSync:
flowGraph:
maxQueueLength: 16 # Maximum length of task queue in flowgraph
maxParallelism: 1024 # Maximum number of tasks executed in parallel in the flowgraph
maxParallelSyncMgrTasks: 256 # The max concurrent sync task number of datanode sync mgr globally
skipMode:
enable: true # Support skip some timetick message to reduce CPU usage
skipNum: 4 # Consume one for every n records skipped
coldTime: 60 # Turn on skip mode after there are only timetick msg for x seconds
segment:
insertBufSize: 16777216 # Max buffer size to flush for a single segment.
deleteBufBytes: 16777216 # Max buffer size in bytes to flush del for a single channel, default as 16MB
syncPeriod: 600 # The period to sync segments if buffer is not empty.
memory:
forceSyncEnable: true # Set true to force sync if memory usage is too high
forceSyncSegmentNum: 1 # number of segments to sync, segments with top largest buffer will be synced.
checkInterval: 3000 # the interval to check datanode memory usage, in milliseconds
forceSyncWatermark: 0.5 # memory watermark for standalone, upon reaching this watermark, segments will be synced.
timetick:
byRPC: true
interval: 500
channel:
# specify the size of global work pool of all channels
# if this parameter <= 0, will set it as the maximum number of CPUs that can be executing
# suggest to set it bigger on large collection numbers to avoid blocking
workPoolSize: -1
# specify the size of global work pool for channel checkpoint updating
# if this parameter <= 0, will set it as 10
updateChannelCheckpointMaxParallel: 10
updateChannelCheckpointInterval: 60 # the interval duration(in seconds) for datanode to update channel checkpoint of each channel
updateChannelCheckpointRPCTimeout: 20 # timeout in seconds for UpdateChannelCheckpoint RPC call
maxChannelCheckpointsPerPRC: 128 # The maximum number of channel checkpoints per UpdateChannelCheckpoint RPC.
channelCheckpointUpdateTickInSeconds: 10 # The frequency, in seconds, at which the channel checkpoint updater executes updates.
import:
maxConcurrentTaskNum: 16 # The maximum number of import/pre-import tasks allowed to run concurrently on a datanode.
maxImportFileSizeInGB: 16 # The maximum file size (in GB) for an import file, where an import file refers to either a Row-Based file or a set of Column-Based files.
readBufferSizeInMB: 16 # The data block size (in MB) read from chunk manager by the datanode during import.
compaction:
levelZeroBatchMemoryRatio: 0.05 # The minimal memory ratio of free memory for level zero compaction executing in batch mode
gracefulStopTimeout: 1800 # seconds. force stop node without graceful stop
ip: # if not specified, use the first unicastable address
port: 21124
grpc:
serverMaxSendSize: 536870912
serverMaxRecvSize: 268435456
clientMaxSendSize: 268435456
clientMaxRecvSize: 536870912
# Configures the system log output.
log:
level: info # Only supports debug, info, warn, error, panic, or fatal. Default 'info'.
file:
rootPath: # root dir path to put logs, default "" means no log file will print. please adjust in embedded Milvus: /tmp/milvus/logs
maxSize: 300 # MB
maxAge: 10 # Maximum time for log retention in day.
maxBackups: 20
format: text # text or json
stdout: true # Stdout enable or not
grpc:
log:
level: WARNING
gracefulStopTimeout: 10 # second, time to wait graceful stop finish
client:
compressionEnabled: false
dialTimeout: 200
keepAliveTime: 10000
keepAliveTimeout: 20000
maxMaxAttempts: 10
initialBackoff: 0.2
maxBackoff: 10
minResetInterval: 1000
maxCancelError: 32
minSessionCheckInterval: 200
# Configure the proxy tls enable.
tls:
serverPemPath: configs/cert/server.pem
serverKeyPath: configs/cert/server.key
caPemPath: configs/cert/ca.pem
common:
defaultPartitionName: _default # default partition name for a collection
defaultIndexName: _default_idx # default index name
entityExpiration: -1 # Entity expiration in seconds, CAUTION -1 means never expire
indexSliceSize: 16 # MB
threadCoreCoefficient:
highPriority: 10 # This parameter specify how many times the number of threads is the number of cores in high priority pool
middlePriority: 5 # This parameter specify how many times the number of threads is the number of cores in middle priority pool
lowPriority: 1 # This parameter specify how many times the number of threads is the number of cores in low priority pool
buildIndexThreadPoolRatio: 0.75
DiskIndex:
MaxDegree: 56
SearchListSize: 100
PQCodeBudgetGBRatio: 0.125
BuildNumThreadsRatio: 1
SearchCacheBudgetGBRatio: 0.1
LoadNumThreadRatio: 8
BeamWidthRatio: 4
gracefulTime: 5000 # milliseconds. it represents the interval (in ms) by which the request arrival time needs to be subtracted in the case of Bounded Consistency.
gracefulStopTimeout: 1800 # seconds. it will force quit the server if the graceful stop process is not completed during this time.
storageType: remote # please adjust in embedded Milvus: local, available values are [local, remote, opendal], value minio is deprecated, use remote instead
# Default value: auto
# Valid values: [auto, avx512, avx2, avx, sse4_2]
# This configuration is only used by querynode and indexnode, it selects CPU instruction set for Searching and Index-building.
simdType: auto
security:
authorizationEnabled: false
# The superusers will ignore some system check processes,
# like the old password verification when updating the credential
superUsers:
tlsMode: 0
session:
ttl: 30 # ttl value when session granting a lease to register service
retryTimes: 30 # retry times when session sending etcd requests
locks:
metrics:
enable: false # whether gather statistics for metrics locks
threshold:
info: 500 # minimum milliseconds for printing durations in info level
warn: 1000 # minimum milliseconds for printing durations in warn level
storage:
scheme: s3
enablev2: false
ttMsgEnabled: true # Whether the instance disable sending ts messages
traceLogMode: 0 # trace request info
bloomFilterSize: 100000 # bloom filter initial size
maxBloomFalsePositive: 0.001 # max false positive rate for bloom filter
# QuotaConfig, configurations of Milvus quota and limits.
# By default, we enable:
# 1. TT protection;
# 2. Memory protection.
# 3. Disk quota protection.
# You can enable:
# 1. DML throughput limitation;
# 2. DDL, DQL qps/rps limitation;
# 3. DQL Queue length/latency protection;
# 4. DQL result rate protection;
# If necessary, you can also manually force to deny RW requests.
quotaAndLimits:
enabled: true # `true` to enable quota and limits, `false` to disable.
# quotaCenterCollectInterval is the time interval that quotaCenter
# collects metrics from Proxies, Query cluster and Data cluster.
# seconds, (0 ~ 65536)
quotaCenterCollectInterval: 3
ddl:
enabled: false
collectionRate: -1 # qps, default no limit, rate for CreateCollection, DropCollection, LoadCollection, ReleaseCollection
partitionRate: -1 # qps, default no limit, rate for CreatePartition, DropPartition, LoadPartition, ReleasePartition
db:
collectionRate: -1 # qps of db level , default no limit, rate for CreateCollection, DropCollection, LoadCollection, ReleaseCollection
partitionRate: -1 # qps of db level, default no limit, rate for CreatePartition, DropPartition, LoadPartition, ReleasePartition
indexRate:
enabled: false
max: -1 # qps, default no limit, rate for CreateIndex, DropIndex
db:
max: -1 # qps of db level, default no limit, rate for CreateIndex, DropIndex
flushRate:
enabled: true
max: -1 # qps, default no limit, rate for flush
collection:
max: 0.1 # qps, default no limit, rate for flush at collection level.
db:
max: -1 # qps of db level, default no limit, rate for flush
compactionRate:
enabled: false
max: -1 # qps, default no limit, rate for manualCompaction
db:
max: -1 # qps of db level, default no limit, rate for manualCompaction
dml:
# dml limit rates, default no limit.
# The maximum rate will not be greater than max.
enabled: false
insertRate:
max: -1 # MB/s, default no limit
db:
max: -1 # MB/s, default no limit
collection:
max: -1 # MB/s, default no limit
partition:
max: -1 # MB/s, default no limit
upsertRate:
max: -1 # MB/s, default no limit
db:
max: -1 # MB/s, default no limit
collection:
max: -1 # MB/s, default no limit
partition:
max: -1 # MB/s, default no limit
deleteRate:
max: -1 # MB/s, default no limit
db:
max: -1 # MB/s, default no limit
collection:
max: -1 # MB/s, default no limit
partition:
max: -1 # MB/s, default no limit
bulkLoadRate:
max: -1 # MB/s, default no limit, not support yet. TODO: limit bulkLoad rate
db:
max: -1 # MB/s, default no limit, not support yet. TODO: limit db bulkLoad rate
collection:
max: -1 # MB/s, default no limit, not support yet. TODO: limit collection bulkLoad rate
partition:
max: -1 # MB/s, default no limit, not support yet. TODO: limit partition bulkLoad rate
dql:
# dql limit rates, default no limit.
# The maximum rate will not be greater than max.
enabled: false
searchRate:
max: -1 # vps (vectors per second), default no limit
db:
max: -1 # vps (vectors per second), default no limit
collection:
max: -1 # vps (vectors per second), default no limit
partition:
max: -1 # vps (vectors per second), default no limit
queryRate:
max: -1 # qps, default no limit
db:
max: -1 # qps, default no limit
collection:
max: -1 # qps, default no limit
partition:
max: -1 # qps, default no limit
limits:
maxCollectionNum: 65536
maxCollectionNumPerDB: 65536
maxInsertSize: -1 # maximum size of a single insert request, in bytes, -1 means no limit
maxResourceGroupNumOfQueryNode: 1024 # maximum number of resource groups of query nodes
limitWriting:
# forceDeny false means dml requests are allowed (except for some
# specific conditions, such as memory of nodes to water marker), true means always reject all dml requests.
forceDeny: false
ttProtection:
enabled: false
# maxTimeTickDelay indicates the backpressure for DML Operations.
# DML rates would be reduced according to the ratio of time tick delay to maxTimeTickDelay,
# if time tick delay is greater than maxTimeTickDelay, all DML requests would be rejected.
# seconds
maxTimeTickDelay: 300
memProtection:
# When memory usage > memoryHighWaterLevel, all dml requests would be rejected;
# When memoryLowWaterLevel < memory usage < memoryHighWaterLevel, reduce the dml rate;
# When memory usage < memoryLowWaterLevel, no action.
enabled: true
dataNodeMemoryLowWaterLevel: 0.85 # (0, 1], memoryLowWaterLevel in DataNodes
dataNodeMemoryHighWaterLevel: 0.95 # (0, 1], memoryHighWaterLevel in DataNodes
queryNodeMemoryLowWaterLevel: 0.85 # (0, 1], memoryLowWaterLevel in QueryNodes
queryNodeMemoryHighWaterLevel: 0.95 # (0, 1], memoryHighWaterLevel in QueryNodes
growingSegmentsSizeProtection:
# No action will be taken if the growing segments size is less than the low watermark.
# When the growing segments size exceeds the low watermark, the dml rate will be reduced,
# but the rate will not be lower than minRateRatio * dmlRate.
enabled: false
minRateRatio: 0.5
lowWaterLevel: 0.2
highWaterLevel: 0.4
diskProtection:
enabled: true # When the total file size of object storage is greater than `diskQuota`, all dml requests would be rejected;
diskQuota: -1 # MB, (0, +inf), default no limit
diskQuotaPerDB: -1 # MB, (0, +inf), default no limit
diskQuotaPerCollection: -1 # MB, (0, +inf), default no limit
diskQuotaPerPartition: -1 # MB, (0, +inf), default no limit
limitReading:
# forceDeny false means dql requests are allowed (except for some
# specific conditions, such as collection has been dropped), true means always reject all dql requests.
forceDeny: false
queueProtection:
enabled: false
# nqInQueueThreshold indicated that the system was under backpressure for Search/Query path.
# If NQ in any QueryNode's queue is greater than nqInQueueThreshold, search&query rates would gradually cool off
# until the NQ in queue no longer exceeds nqInQueueThreshold. We think of the NQ of query request as 1.
# int, default no limit
nqInQueueThreshold: -1
# queueLatencyThreshold indicated that the system was under backpressure for Search/Query path.
# If dql latency of queuing is greater than queueLatencyThreshold, search&query rates would gradually cool off
# until the latency of queuing no longer exceeds queueLatencyThreshold.
# The latency here refers to the averaged latency over a period of time.
# milliseconds, default no limit
queueLatencyThreshold: -1
resultProtection:
enabled: false
# maxReadResultRate indicated that the system was under backpressure for Search/Query path.
# If dql result rate is greater than maxReadResultRate, search&query rates would gradually cool off
# until the read result rate no longer exceeds maxReadResultRate.
# MB/s, default no limit
maxReadResultRate: -1
maxReadResultRatePerDB: -1
maxReadResultRatePerCollection: -1
# colOffSpeed is the speed of search&query rates cool off.
# (0, 1]
coolOffSpeed: 0.9
trace:
# trace exporter type, default is stdout,
# optional values: ['noop','stdout', 'jaeger', 'otlp']
exporter: noop
# fraction of traceID based sampler,
# optional values: [0, 1]
# Fractions >= 1 will always sample. Fractions < 0 are treated as zero.
sampleFraction: 0
jaeger:
url: # when exporter is jaeger should set the jaeger's URL
otlp:
endpoint: # example: "127.0.0.1:4318"
secure: true
#when using GPU indexing, Milvus will utilize a memory pool to avoid frequent memory allocation and deallocation.
#here, you can set the size of the memory occupied by the memory pool, with the unit being MB.
#note that there is a possibility of Milvus crashing when the actual memory demand exceeds the value set by maxMemSize.
#if initMemSize and MaxMemSize both set zero,
#milvus will automatically initialize half of the available GPU memory,
#maxMemSize will the whole available GPU memory.
gpu:
initMemSize: # Gpu Memory Pool init size
maxMemSize: # Gpu Memory Pool Max size

View File

@@ -27,6 +27,7 @@ from pyspark import SparkConf, SparkContext
from comps import CustomLogger, DocPath, opea_microservices, register_microservice
from comps.dataprep.utils import (
create_upload_folder,
decode_filename,
document_loader,
encode_filename,
get_file_structure,
@@ -72,7 +73,44 @@ class MosecEmbeddings(OpenAIEmbeddings):
return [e if e is not None else empty_embedding() for e in batched_embeddings]
def ingest_data_to_milvus(doc_path: DocPath):
def ingest_chunks_to_milvus(file_name: str, chunks: List, embedder):
if logflag:
logger.info(f"[ ingest chunks ] file name: {file_name}")
# insert documents to Milvus
insert_docs = []
for chunk in chunks:
insert_docs.append(Document(page_content=chunk, metadata={partition_field_name: file_name}))
# Batch size
batch_size = 32
num_chunks = len(chunks)
for i in range(0, num_chunks, batch_size):
if logflag:
logger.info(f"[ ingest chunks ] Current batch: {i}")
batch_docs = insert_docs[i : i + batch_size]
try:
_ = Milvus.from_documents(
batch_docs,
embedder,
collection_name=COLLECTION_NAME,
connection_args={"host": MILVUS_HOST, "port": MILVUS_PORT},
partition_key_field=partition_field_name,
)
except Exception as e:
if logflag:
logger.info(f"[ ingest chunks ] fail to ingest chunks into Milvus. error: {e}")
raise HTTPException(status_code=500, detail=f"Fail to store chunks of file {file_name}.")
if logflag:
logger.info(f"[ ingest chunks ] Docs ingested file {file_name} to Milvus collection {COLLECTION_NAME}.")
return True
def ingest_data_to_milvus(doc_path: DocPath, embedder):
"""Ingest document to Milvus."""
path = doc_path.path
file_name = path.split("/")[-1]
@@ -88,10 +126,15 @@ def ingest_data_to_milvus(doc_path: DocPath):
text_splitter = HTMLHeaderTextSplitter(headers_to_split_on=headers_to_split_on)
else:
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=doc_path.chunk_size, chunk_overlap=100, add_start_index=True, separators=get_separators()
chunk_size=doc_path.chunk_size,
chunk_overlap=doc_path.chunk_overlap,
add_start_index=True,
separators=get_separators(),
)
content = document_loader(path)
if logflag:
logger.info("[ ingest data ] file content loaded")
structured_types = [".xlsx", ".csv", ".json", "jsonl"]
_, ext = os.path.splitext(path)
@@ -105,210 +148,291 @@ def ingest_data_to_milvus(doc_path: DocPath):
table_chunks = get_tables_result(path, doc_path.table_strategy)
chunks = chunks + table_chunks
if logflag:
logger.info("[ ingest data ] Done preprocessing. Created ", len(chunks), " chunks of the original file.")
logger.info(f"[ ingest data ] Done preprocessing. Created {len(chunks)} chunks of the original file.")
# Create vectorstore
if MOSEC_EMBEDDING_ENDPOINT:
# create embeddings using MOSEC endpoint service
if logflag:
logger.info(
f"[ ingest data ] MOSEC_EMBEDDING_ENDPOINT:{MOSEC_EMBEDDING_ENDPOINT}, MOSEC_EMBEDDING_MODEL:{MOSEC_EMBEDDING_MODEL}"
)
embedder = MosecEmbeddings(model=MOSEC_EMBEDDING_MODEL)
elif TEI_EMBEDDING_ENDPOINT:
# create embeddings using TEI endpoint service
if logflag:
logger.info(f"[ ingest data ] TEI_EMBEDDING_ENDPOINT:{TEI_EMBEDDING_ENDPOINT}")
embedder = HuggingFaceHubEmbeddings(model=TEI_EMBEDDING_ENDPOINT)
else:
# create embeddings using local embedding model
if logflag:
logger.info(f"[ ingest data ] Local TEI_EMBEDDING_MODEL:{TEI_EMBEDDING_MODEL}")
embedder = HuggingFaceBgeEmbeddings(model_name=TEI_EMBEDDING_MODEL)
return ingest_chunks_to_milvus(file_name, chunks, embedder)
# insert documents to Milvus
insert_docs = []
for chunk in chunks:
insert_docs.append(Document(page_content=chunk, metadata={partition_field_name: file_name}))
try:
_ = Milvus.from_documents(
insert_docs,
embedder,
collection_name=COLLECTION_NAME,
connection_args={"host": MILVUS_HOST, "port": MILVUS_PORT},
partition_key_field=partition_field_name,
)
except Exception as e:
if logflag:
logger.info(f"[ ingest data ] fail to ingest data into Milvus. error: {e}")
return False
def search_by_file(collection, file_name):
query = f"{partition_field_name} == '{file_name}'"
results = collection.query(
expr=query,
output_fields=[partition_field_name, "pk"],
)
if logflag:
logger.info(f"[ ingest data ] Docs ingested from {path} to Milvus collection {COLLECTION_NAME}.")
return True
logger.info(f"[ search by file ] searched by {file_name}")
logger.info(f"[ search by file ] {len(results)} results: {results}")
return results
async def ingest_link_to_milvus(link_list: List[str]):
# Create vectorstore
if MOSEC_EMBEDDING_ENDPOINT:
# create embeddings using MOSEC endpoint service
if logflag:
logger.info(
f"MOSEC_EMBEDDING_ENDPOINT:{MOSEC_EMBEDDING_ENDPOINT},MOSEC_EMBEDDING_MODEL:{MOSEC_EMBEDDING_MODEL}"
)
embedder = MosecEmbeddings(model=MOSEC_EMBEDDING_MODEL)
elif TEI_EMBEDDING_ENDPOINT:
# create embeddings using TEI endpoint service
if logflag:
logger.info(f"TEI_EMBEDDING_ENDPOINT:{TEI_EMBEDDING_ENDPOINT}")
embedder = HuggingFaceHubEmbeddings(model=TEI_EMBEDDING_ENDPOINT)
else:
# create embeddings using local embedding model
if logflag:
logger.info(f"Local TEI_EMBEDDING_MODEL:{TEI_EMBEDDING_MODEL}")
embedder = HuggingFaceBgeEmbeddings(model_name=TEI_EMBEDDING_MODEL)
def search_all(collection):
results = collection.query(expr="pk >= 0", output_fields=[partition_field_name, "pk"])
if logflag:
logger.info(f"[ search all ] {len(results)} results: {results}")
return results
for link in link_list:
content = parse_html([link])[0][0]
if logflag:
logger.info(f"[ ingest link ] link: {link} content: {content}")
encoded_link = encode_filename(link)
save_path = upload_folder + encoded_link + ".txt"
if logflag:
logger.info(f"[ ingest link ] save_path: {save_path}")
await save_content_to_local_disk(save_path, content)
document = Document(page_content=content, metadata={partition_field_name: encoded_link + ".txt"})
_ = Milvus.from_documents(
document,
embedder,
collection_name=COLLECTION_NAME,
connection_args={"host": MILVUS_HOST, "port": MILVUS_PORT},
partition_key_field=partition_field_name,
)
def delete_all_data(my_milvus):
if logflag:
logger.info("[ delete all ] deleting all data in milvus")
if my_milvus.col:
my_milvus.col.drop()
if logflag:
logger.info("[ delete all ] delete success: all data")
def delete_by_partition_field(my_milvus, partition_field):
if logflag:
logger.info(f"[ delete partition ] deleting {partition_field_name} {partition_field}")
pks = my_milvus.get_pks(f'{partition_field_name} == "{partition_field}"')
if logflag:
logger.info(f"[ delete partition ] target pks: {pks}")
res = my_milvus.delete(pks)
my_milvus.col.flush()
if logflag:
logger.info(f"[ delete partition ] delete success: {res}")
@register_microservice(name="opea_service@prepare_doc_milvus", endpoint="/v1/dataprep", host="0.0.0.0", port=6010)
async def ingest_documents(
files: Optional[Union[UploadFile, List[UploadFile]]] = File(None),
link_list: Optional[str] = Form(None),
chunk_size: int = Form(1500),
chunk_size: int = Form(1000),
chunk_overlap: int = Form(100),
process_table: bool = Form(False),
table_strategy: str = Form("fast"),
):
if logflag:
logger.info(f"files:{files}")
logger.info(f"link_list:{link_list}")
logger.info(f"[ upload ] files:{files}")
logger.info(f"[ upload ] link_list:{link_list}")
if files and link_list:
raise HTTPException(status_code=400, detail="Provide either a file or a string list, not both.")
# Create vectorstore
if MOSEC_EMBEDDING_ENDPOINT:
# create embeddings using MOSEC endpoint service
if logflag:
logger.info(
f"[ upload ] MOSEC_EMBEDDING_ENDPOINT:{MOSEC_EMBEDDING_ENDPOINT}, MOSEC_EMBEDDING_MODEL:{MOSEC_EMBEDDING_MODEL}"
)
embedder = MosecEmbeddings(model=MOSEC_EMBEDDING_MODEL)
elif TEI_EMBEDDING_ENDPOINT:
# create embeddings using TEI endpoint service
if logflag:
logger.info(f"[ upload ] TEI_EMBEDDING_ENDPOINT:{TEI_EMBEDDING_ENDPOINT}")
embedder = HuggingFaceHubEmbeddings(model=TEI_EMBEDDING_ENDPOINT)
else:
# create embeddings using local embedding model
if logflag:
logger.info(f"[ upload ] Local TEI_EMBEDDING_MODEL:{TEI_EMBEDDING_MODEL}")
embedder = HuggingFaceBgeEmbeddings(model_name=TEI_EMBEDDING_MODEL)
# define Milvus obj
my_milvus = Milvus(
embedding_function=embedder,
collection_name=COLLECTION_NAME,
connection_args={"host": MILVUS_HOST, "port": MILVUS_PORT},
index_params=index_params,
auto_id=True,
)
if files:
if not isinstance(files, list):
files = [files]
uploaded_files = []
for file in files:
save_path = upload_folder + file.filename
encode_file = encode_filename(file.filename)
save_path = upload_folder + encode_file
if logflag:
logger.info(f"[ upload ] processing file {save_path}")
if my_milvus.col:
# check whether the file is already uploaded
try:
search_res = search_by_file(my_milvus.col, encode_file)
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed when searching in Milvus db for file {file.filename}."
)
if len(search_res) > 0:
if logflag:
logger.info(f"[ upload ] File {file.filename} already exists.")
raise HTTPException(
status_code=400,
detail=f"Uploaded file {file.filename} already exists. Please change file name.",
)
await save_content_to_local_disk(save_path, file)
ingest_data_to_milvus(
DocPath(
path=save_path,
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
process_table=process_table,
table_strategy=table_strategy,
),
embedder,
)
uploaded_files.append(save_path)
if logflag:
logger.info(f"Successfully saved file {save_path}")
logger.info(f"Saved file {save_path} into local disk.")
def process_files_wrapper(files):
if not isinstance(files, list):
files = [files]
for file in files:
assert ingest_data_to_milvus(
DocPath(
path=file,
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
process_table=process_table,
table_strategy=table_strategy,
)
)
# def process_files_wrapper(files):
# if not isinstance(files, list):
# files = [files]
# for file in files:
# encode_file = encode_filename(file.filename)
# save_path = upload_folder + encode_file
# ingest_data_to_milvus(
# DocPath(
# path=save_path,
# chunk_size=chunk_size,
# chunk_overlap=chunk_overlap,
# process_table=process_table,
# table_strategy=table_strategy,
# ),
# embedder
# )
try:
# Create a SparkContext
conf = SparkConf().setAppName("Parallel-dataprep").setMaster("local[*]")
sc = SparkContext(conf=conf)
# Create an RDD with parallel processing
parallel_num = min(len(uploaded_files), os.cpu_count())
rdd = sc.parallelize(uploaded_files, parallel_num)
# Perform a parallel operation
rdd_trans = rdd.map(process_files_wrapper)
rdd_trans.collect()
# Stop the SparkContext
sc.stop()
except:
# Stop the SparkContext
sc.stop()
# try:
# # Create a SparkContext
# conf = SparkConf().setAppName("Parallel-dataprep").setMaster("local[*]")
# sc = SparkContext(conf=conf)
# # Create an RDD with parallel processing
# parallel_num = min(len(uploaded_files), os.cpu_count())
# rdd = sc.parallelize(uploaded_files, parallel_num)
# print(uploaded_files)
# # Perform a parallel operation
# rdd_trans = rdd.map(process_files_wrapper)
# rdd_trans.collect()
# # Stop the SparkContext
# sc.stop()
# except:
# # Stop the SparkContext
# sc.stop()
results = {"status": 200, "message": "Data preparation succeeded"}
if logflag:
logger.info(results)
return results
if link_list:
try:
link_list = json.loads(link_list) # Parse JSON string to list
if not isinstance(link_list, list):
raise HTTPException(status_code=400, detail="link_list should be a list.")
await ingest_link_to_milvus(link_list)
link_list = json.loads(link_list) # Parse JSON string to list
if not isinstance(link_list, list):
raise HTTPException(status_code=400, detail="link_list should be a list.")
for link in link_list:
encoded_link = encode_filename(link)
if logflag:
logger.info(f"Successfully saved link list {link_list}")
results = {"status": 200, "message": "Data preparation succeeded"}
if logflag:
logger.info(results)
return results
except json.JSONDecodeError:
raise HTTPException(status_code=400, detail="Invalid JSON format for link_list.")
logger.info(f"[ upload ] processing link {encoded_link}")
# check whether the link file already exists
if my_milvus.col:
try:
search_res = search_by_file(my_milvus.col, encoded_link + ".txt")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed when searching in Milvus db for link {link}.")
if len(search_res) > 0:
if logflag:
logger.info(f"[ upload ] Link {link} already exists.")
raise HTTPException(
status_code=400, detail=f"Uploaded link {link} already exists. Please change link."
)
save_path = upload_folder + encoded_link + ".txt"
content = parse_html([link])[0][0]
await save_content_to_local_disk(save_path, content)
ingest_data_to_milvus(
DocPath(
path=save_path,
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
process_table=process_table,
table_strategy=table_strategy,
),
embedder,
)
if logflag:
logger.info(f"[ upload ] Successfully saved link list {link_list}")
return {"status": 200, "message": "Data preparation succeeded"}
raise HTTPException(status_code=400, detail="Must provide either a file or a string list.")
@register_microservice(
name="opea_service@prepare_doc_milvus_file", endpoint="/v1/dataprep/get_file", host="0.0.0.0", port=6011
name="opea_service@prepare_doc_milvus", endpoint="/v1/dataprep/get_file", host="0.0.0.0", port=6010
)
async def rag_get_file_structure():
if logflag:
logger.info("[ dataprep - get file ] start to get file structure")
logger.info("[ get ] start to get file structure")
if not Path(upload_folder).exists():
# Create vectorstore
if MOSEC_EMBEDDING_ENDPOINT:
# create embeddings using MOSEC endpoint service
if logflag:
logger.info("No file uploaded, return empty list.")
logger.info(
f"[ get ] MOSEC_EMBEDDING_ENDPOINT:{MOSEC_EMBEDDING_ENDPOINT}, MOSEC_EMBEDDING_MODEL:{MOSEC_EMBEDDING_MODEL}"
)
embedder = MosecEmbeddings(model=MOSEC_EMBEDDING_MODEL)
elif TEI_EMBEDDING_ENDPOINT:
# create embeddings using TEI endpoint service
if logflag:
logger.info(f"[ get ] TEI_EMBEDDING_ENDPOINT:{TEI_EMBEDDING_ENDPOINT}")
embedder = HuggingFaceHubEmbeddings(model=TEI_EMBEDDING_ENDPOINT)
else:
# create embeddings using local embedding model
if logflag:
logger.info(f"[ get ] Local TEI_EMBEDDING_MODEL:{TEI_EMBEDDING_MODEL}")
embedder = HuggingFaceBgeEmbeddings(model_name=TEI_EMBEDDING_MODEL)
# define Milvus obj
my_milvus = Milvus(
embedding_function=embedder,
collection_name=COLLECTION_NAME,
connection_args={"host": MILVUS_HOST, "port": MILVUS_PORT},
index_params=index_params,
auto_id=True,
)
# collection does not exist
if not my_milvus.col:
logger.info(f"[ get ] collection {COLLECTION_NAME} does not exist.")
return []
file_content = get_file_structure(upload_folder)
if logflag:
logger.info(file_content)
return file_content
# get all files from db
try:
all_data = search_all(my_milvus.col)
except Exception as e:
raise HTTPException(status_code=500, detail="Failed when searching in Milvus db for all files.")
# return [] if no data in db
if len(all_data) == 0:
return []
def delete_all_data(my_milvus):
res_file = [res["filename"] for res in all_data]
unique_list = list(set(res_file))
if logflag:
logger.info("[ delete ] deleting all data in milvus")
my_milvus.delete(expr="pk >= 0")
my_milvus.col.flush()
if logflag:
logger.info("[ delete ] delete success: all data")
logger.info(f"[ get ] unique list from db: {unique_list}")
# construct result file list in format
file_list = []
for file_name in unique_list:
file_dict = {
"name": decode_filename(file_name),
"id": decode_filename(file_name),
"type": "File",
"parent": "",
}
file_list.append(file_dict)
def delete_by_partition_field(my_milvus, partition_field):
if logflag:
logger.info(f"[ delete ] deleting {partition_field_name} {partition_field}")
pks = my_milvus.get_pks(f'{partition_field_name} == "{partition_field}"')
if logflag:
logger.info(f"[ delete ] target pks: {pks}")
res = my_milvus.delete(pks)
my_milvus.col.flush()
if logflag:
logger.info(f"[ delete ] delete success: {res}")
logger.info(f"[ get ] final file list: {file_list}")
return file_list
@register_microservice(
name="opea_service@prepare_doc_milvus_del", endpoint="/v1/dataprep/delete_file", host="0.0.0.0", port=6012
name="opea_service@prepare_doc_milvus", endpoint="/v1/dataprep/delete_file", host="0.0.0.0", port=6010
)
async def delete_single_file(file_path: str = Body(..., embed=True)):
"""Delete file according to `file_path`.
@@ -319,23 +443,24 @@ async def delete_single_file(file_path: str = Body(..., embed=True)):
"""
if logflag:
logger.info(file_path)
# create embedder obj
# Create vectorstore
if MOSEC_EMBEDDING_ENDPOINT:
# create embeddings using MOSEC endpoint service
if logflag:
logger.info(
f"[ dataprep - del ] MOSEC_EMBEDDING_ENDPOINT:{MOSEC_EMBEDDING_ENDPOINT},MOSEC_EMBEDDING_MODEL:{MOSEC_EMBEDDING_MODEL}"
f"[ delete ] MOSEC_EMBEDDING_ENDPOINT:{MOSEC_EMBEDDING_ENDPOINT}, MOSEC_EMBEDDING_MODEL:{MOSEC_EMBEDDING_MODEL}"
)
embedder = MosecEmbeddings(model=MOSEC_EMBEDDING_MODEL)
elif TEI_EMBEDDING_ENDPOINT:
# create embeddings using TEI endpoint service
if logflag:
logger.info(f"[ dataprep - del ] TEI_EMBEDDING_ENDPOINT:{TEI_EMBEDDING_ENDPOINT}")
logger.info(f"[ delete ] TEI_EMBEDDING_ENDPOINT:{TEI_EMBEDDING_ENDPOINT}")
embedder = HuggingFaceHubEmbeddings(model=TEI_EMBEDDING_ENDPOINT)
else:
# create embeddings using local embedding model
if logflag:
logger.info(f"[ dataprep - del ] Local TEI_EMBEDDING_MODEL:{TEI_EMBEDDING_MODEL}")
logger.info(f"[ delete ] Local TEI_EMBEDDING_MODEL:{TEI_EMBEDDING_MODEL}")
embedder = HuggingFaceBgeEmbeddings(model_name=TEI_EMBEDDING_MODEL)
# define Milvus obj
@@ -350,51 +475,61 @@ async def delete_single_file(file_path: str = Body(..., embed=True)):
# delete all uploaded files
if file_path == "all":
if logflag:
logger.info("[ dataprep - del ] deleting all files")
logger.info("[ delete ] deleting all files")
delete_all_data(my_milvus)
remove_folder_with_ignore(upload_folder)
# delete files on local disk
try:
remove_folder_with_ignore(upload_folder)
except Exception as e:
if logflag:
logger.info(f"[ delete ] {e}. Fail to delete {upload_folder}.")
raise HTTPException(status_code=500, detail=f"Fail to delete {upload_folder}.")
if logflag:
logger.info("[ dataprep - del ] successfully delete all files.")
logger.info("[ delete ] successfully delete all files.")
create_upload_folder(upload_folder)
if logflag:
logger.info({"status": True})
logger.info("[ delete ] new upload folder created.")
return {"status": True}
encode_file_name = encode_filename(file_path)
delete_path = Path(upload_folder + "/" + encode_file_name)
if logflag:
logger.info(f"[dataprep - del] delete_path: {delete_path}")
logger.info(f"[delete] delete_path: {delete_path}")
# partially delete files
if delete_path.exists():
# file
# TODO: check existence before delete
# delete file
if delete_path.is_file():
if logflag:
logger.info(f"[dataprep - del] deleting file {encode_file_name}")
logger.info(f"[delete] deleting file {encode_file_name}")
try:
delete_by_partition_field(my_milvus, encode_file_name)
delete_path.unlink()
if logflag:
logger.info(f"[dataprep - del] file {encode_file_name} deleted")
logger.info({"status": True})
return {"status": True}
except Exception as e:
if logflag:
logger.info(f"[dataprep - del] fail to delete file {delete_path}: {e}")
logger.info({"status": False})
logger.info(f"[delete] fail to delete file {delete_path}: {e}")
return {"status": False}
# folder
delete_path.unlink()
if logflag:
logger.info(f"[delete] file {file_path} deleted")
return {"status": True}
# delete folder
else:
if logflag:
logger.info("[dataprep - del] delete folder is not supported for now.")
logger.info({"status": False})
return {"status": False}
logger.info(f"[delete] delete folder {file_path} is not supported for now.")
raise HTTPException(status_code=404, detail=f"Delete folder {file_path} is not supported for now.")
else:
raise HTTPException(status_code=404, detail="File/folder not found. Please check del_path.")
if __name__ == "__main__":
create_upload_folder(upload_folder)
opea_microservices["opea_service@prepare_doc_milvus"].start()
opea_microservices["opea_service@prepare_doc_milvus_file"].start()
opea_microservices["opea_service@prepare_doc_milvus_del"].start()