Refine Component Interface (#1106)
* Refine component interface Signed-off-by: lvliang-intel <liang1.lv@intel.com> * update env Signed-off-by: lvliang-intel <liang1.lv@intel.com> * add health check Signed-off-by: lvliang-intel <liang1.lv@intel.com> * update mulimodal embedding Signed-off-by: lvliang-intel <liang1.lv@intel.com> * update import Signed-off-by: lvliang-intel <liang1.lv@intel.com> * refine other components Signed-off-by: lvliang-intel <liang1.lv@intel.com> * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * fix dataprepissue Signed-off-by: lvliang-intel <liang1.lv@intel.com> * fix tts issue Signed-off-by: lvliang-intel <liang1.lv@intel.com> * fix ci issues Signed-off-by: lvliang-intel <liang1.lv@intel.com> * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * fix tts response issue Signed-off-by: lvliang-intel <liang1.lv@intel.com> * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * fix comments Signed-off-by: lvliang-intel <liang1.lv@intel.com> * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --------- Signed-off-by: lvliang-intel <liang1.lv@intel.com> Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: chen, suyue <suyue.chen@intel.com>
This commit is contained in:
@@ -53,7 +53,7 @@ from comps.cores.mega.micro_service import MicroService, register_microservice,
|
||||
from comps.cores.telemetry.opea_telemetry import opea_telemetry
|
||||
|
||||
# Common
|
||||
from comps.cores.common.component import OpeaComponent, OpeaComponentController
|
||||
from comps.cores.common.component import OpeaComponent, OpeaComponentRegistry, OpeaComponentLoader
|
||||
|
||||
# Statistics
|
||||
from comps.cores.mega.base_statistics import statistics_dict, register_statistics
|
||||
|
||||
@@ -5,18 +5,22 @@ import os
|
||||
|
||||
import requests
|
||||
|
||||
from comps import CustomLogger, OpeaComponent, ServiceType
|
||||
from comps import CustomLogger, OpeaComponent, OpeaComponentRegistry, ServiceType
|
||||
|
||||
logger = CustomLogger("opea_animation")
|
||||
logflag = os.getenv("LOGFLAG", False)
|
||||
|
||||
|
||||
@OpeaComponentRegistry.register("OPEA_ANIMATION")
|
||||
class OpeaAnimation(OpeaComponent):
|
||||
"""A specialized animation component derived from OpeaComponent."""
|
||||
|
||||
def __init__(self, name: str, description: str, config: dict = None):
|
||||
super().__init__(name, ServiceType.ANIMATION.name.lower(), description, config)
|
||||
self.base_url = os.getenv("WAV2LIP_ENDPOINT", "http://localhost:7860")
|
||||
health_status = self.check_health()
|
||||
if not health_status:
|
||||
logger.error("OpeaAnimation health check failed.")
|
||||
|
||||
def invoke(self, input: str):
|
||||
"""Invokes the animation service to generate embeddings for the animation input.
|
||||
|
||||
@@ -9,7 +9,7 @@ import os
|
||||
import time
|
||||
|
||||
# GenAIComps
|
||||
from comps import CustomLogger, OpeaComponentController
|
||||
from comps import CustomLogger, OpeaComponentLoader
|
||||
from comps.animation.src.integration.opea import OpeaAnimation
|
||||
|
||||
logger = CustomLogger("opea_animation")
|
||||
@@ -24,22 +24,12 @@ from comps import (
|
||||
statistics_dict,
|
||||
)
|
||||
|
||||
# Initialize OpeaComponentController
|
||||
controller = OpeaComponentController()
|
||||
|
||||
# Register components
|
||||
try:
|
||||
# Instantiate Animation component and register it to controller
|
||||
opea_animation = OpeaAnimation(
|
||||
name="OpeaAnimation",
|
||||
description="OPEA Animation Service",
|
||||
)
|
||||
controller.register(opea_animation)
|
||||
|
||||
# Discover and activate a healthy component
|
||||
controller.discover_and_activate()
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to initialize components: {e}")
|
||||
animation_component_name = os.getenv("ANIMATION_COMPONENT_NAME", "OPEA_ANIMATION")
|
||||
# Initialize OpeaComponentLoader
|
||||
loader = OpeaComponentLoader(
|
||||
animation_component_name,
|
||||
description=f"OPEA ANIMATION Component: {animation_component_name}",
|
||||
)
|
||||
|
||||
|
||||
# Register the microservice
|
||||
@@ -56,7 +46,7 @@ except Exception as e:
|
||||
def animate(audio: Base64ByteStrDoc):
|
||||
start = time.time()
|
||||
|
||||
outfile = opea_animation.invoke(audio.byte_str)
|
||||
outfile = loader.invoke(audio.byte_str)
|
||||
if logflag:
|
||||
logger.info(f"Video generated successfully, check {outfile} for the result.")
|
||||
|
||||
|
||||
@@ -8,13 +8,14 @@ from typing import List
|
||||
import requests
|
||||
from fastapi import File, Form, UploadFile
|
||||
|
||||
from comps import CustomLogger, OpeaComponent, ServiceType
|
||||
from comps import CustomLogger, OpeaComponent, OpeaComponentRegistry, ServiceType
|
||||
from comps.cores.proto.api_protocol import AudioTranscriptionResponse
|
||||
|
||||
logger = CustomLogger("opea_whisper")
|
||||
logflag = os.getenv("LOGFLAG", False)
|
||||
|
||||
|
||||
@OpeaComponentRegistry.register("OPEA_WHISPER_ASR")
|
||||
class OpeaWhisperAsr(OpeaComponent):
|
||||
"""A specialized ASR (Automatic Speech Recognition) component derived from OpeaComponent for Whisper ASR services.
|
||||
|
||||
@@ -25,6 +26,9 @@ class OpeaWhisperAsr(OpeaComponent):
|
||||
def __init__(self, name: str, description: str, config: dict = None):
|
||||
super().__init__(name, ServiceType.ASR.name.lower(), description, config)
|
||||
self.base_url = os.getenv("ASR_ENDPOINT", "http://localhost:7066")
|
||||
health_status = self.check_health()
|
||||
if not health_status:
|
||||
logger.error("OpeaWhisperAsr health check failed.")
|
||||
|
||||
async def invoke(
|
||||
self,
|
||||
|
||||
@@ -12,7 +12,7 @@ from comps import (
|
||||
Base64ByteStrDoc,
|
||||
CustomLogger,
|
||||
LLMParamsDoc,
|
||||
OpeaComponentController,
|
||||
OpeaComponentLoader,
|
||||
ServiceType,
|
||||
opea_microservices,
|
||||
register_microservice,
|
||||
@@ -24,24 +24,9 @@ from comps.cores.proto.api_protocol import AudioTranscriptionResponse
|
||||
logger = CustomLogger("opea_asr_microservice")
|
||||
logflag = os.getenv("LOGFLAG", False)
|
||||
|
||||
# Initialize OpeaComponentController
|
||||
controller = OpeaComponentController()
|
||||
|
||||
# Register components
|
||||
try:
|
||||
# Instantiate ASR components
|
||||
opea_whisper = OpeaWhisperAsr(
|
||||
name="OpeaWhisperAsr",
|
||||
description="OPEA Whisper ASR Service",
|
||||
)
|
||||
|
||||
# Register components with the controller
|
||||
controller.register(opea_whisper)
|
||||
|
||||
# Discover and activate a healthy component
|
||||
controller.discover_and_activate()
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to initialize components: {e}")
|
||||
asr_component_name = os.getenv("ASR_COMPONENT_NAME", "OPEA_WHISPER_ASR")
|
||||
# Initialize OpeaComponentLoader
|
||||
loader = OpeaComponentLoader(asr_component_name, description=f"OPEA ASR Component: {asr_component_name}")
|
||||
|
||||
|
||||
@register_microservice(
|
||||
@@ -69,8 +54,8 @@ async def audio_to_text(
|
||||
logger.info("ASR file uploaded.")
|
||||
|
||||
try:
|
||||
# Use the controller to invoke the active component
|
||||
asr_response = await controller.invoke(
|
||||
# Use the loader to invoke the component
|
||||
asr_response = await loader.invoke(
|
||||
file=file,
|
||||
model=model,
|
||||
language=language,
|
||||
|
||||
@@ -86,75 +86,78 @@ class OpeaComponent(ABC):
|
||||
return f"OpeaComponent(name={self.name}, type={self.type}, description={self.description})"
|
||||
|
||||
|
||||
class OpeaComponentController(ABC):
|
||||
"""The OpeaComponentController class serves as the base class for managing and orchestrating multiple
|
||||
instances of components of the same type. It provides a unified interface for routing tasks,
|
||||
registering components, and dynamically discovering available components.
|
||||
class OpeaComponentRegistry:
|
||||
"""Registry class to manage component instances.
|
||||
|
||||
Attributes:
|
||||
components (dict): A dictionary to store registered components by their unique identifiers.
|
||||
This registry allows storing, retrieving, and managing component instances by their names.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
"""Initializes the OpeaComponentController instance with an empty component registry."""
|
||||
self.components = {}
|
||||
self.active_component = None
|
||||
_registry = {}
|
||||
|
||||
def register(self, component):
|
||||
"""Registers an OpeaComponent instance to the controller.
|
||||
@classmethod
|
||||
def register(cls, name):
|
||||
"""Decorator to register a component class with a specified name.
|
||||
|
||||
Args:
|
||||
component (OpeaComponent): An instance of a subclass of OpeaComponent to be managed.
|
||||
|
||||
Raises:
|
||||
ValueError: If the component is already registered.
|
||||
:param name: The name to associate with the component class
|
||||
:return: Decorator function
|
||||
"""
|
||||
if component.name in self.components:
|
||||
raise ValueError(f"Component '{component.name}' is already registered.")
|
||||
logger.info(f"Registered component: {component.name}")
|
||||
self.components[component.name] = component
|
||||
|
||||
def discover_and_activate(self):
|
||||
"""Discovers healthy components and activates one.
|
||||
def decorator(component_class):
|
||||
if name in cls._registry:
|
||||
raise ValueError(f"A component with the name '{name}' is already registered.")
|
||||
cls._registry[name] = component_class
|
||||
return component_class
|
||||
|
||||
If multiple components are healthy, it prioritizes the first registered component.
|
||||
return decorator
|
||||
|
||||
@classmethod
|
||||
def get(cls, name):
|
||||
"""Retrieve a component class by its name.
|
||||
|
||||
:param name: The name of the component class to retrieve
|
||||
:return: The component class
|
||||
"""
|
||||
for component in self.components.values():
|
||||
if component.check_health():
|
||||
self.active_component = component
|
||||
logger.info(f"Activated component: {component.name}")
|
||||
return
|
||||
raise RuntimeError("No healthy components available.")
|
||||
if name not in cls._registry:
|
||||
raise KeyError(f"No component found with the name '{name}'.")
|
||||
return cls._registry[name]
|
||||
|
||||
@classmethod
|
||||
def unregister(cls, name):
|
||||
"""Remove a component class from the registry by its name.
|
||||
|
||||
:param name: The name of the component class to remove
|
||||
"""
|
||||
if name in cls._registry:
|
||||
del cls._registry[name]
|
||||
|
||||
|
||||
class OpeaComponentLoader:
|
||||
"""Loader class to dynamically load and invoke components.
|
||||
|
||||
This loader retrieves components from the registry and invokes their functionality.
|
||||
"""
|
||||
|
||||
def __init__(self, component_name, **kwargs):
|
||||
"""Initialize the loader with a component retrieved from the registry and instantiate it.
|
||||
|
||||
:param component_name: The name of the component to load
|
||||
:param kwargs: Additional parameters for the component's initialization
|
||||
"""
|
||||
kwargs["name"] = component_name
|
||||
|
||||
# Retrieve the component class from the registry
|
||||
component_class = OpeaComponentRegistry.get(component_name)
|
||||
|
||||
# Instantiate the component with the given arguments
|
||||
self.component = component_class(**kwargs)
|
||||
|
||||
async def invoke(self, *args, **kwargs):
|
||||
"""Invokes service accessing using the active component.
|
||||
"""Invoke the loaded component's execute method.
|
||||
|
||||
Args:
|
||||
*args: Positional arguments.
|
||||
**kwargs: Keyword arguments.
|
||||
|
||||
Returns:
|
||||
Any: The result of the service accessing.
|
||||
|
||||
Raises:
|
||||
RuntimeError: If no active component is set.
|
||||
:param args: Positional arguments for the invoke method
|
||||
:param kwargs: Keyword arguments for the invoke method
|
||||
:return: The result of the component's invoke method
|
||||
"""
|
||||
if not self.active_component:
|
||||
raise RuntimeError("No active component. Call 'discover_and_activate' first.")
|
||||
return await self.active_component.invoke(*args, **kwargs)
|
||||
|
||||
def list_components(self):
|
||||
"""Lists all registered components.
|
||||
|
||||
Returns:
|
||||
list: A list of component names that are currently registered.
|
||||
"""
|
||||
return self.components.keys()
|
||||
|
||||
def __repr__(self):
|
||||
"""Provides a string representation of the controller and its registered components.
|
||||
|
||||
Returns:
|
||||
str: A string representation of the OpeaComponentController instance.
|
||||
"""
|
||||
return f"OpeaComponentController(registered_components={self.list_components()})"
|
||||
if not hasattr(self.component, "invoke"):
|
||||
raise AttributeError(f"The component '{self.component}' does not have an 'invoke' method.")
|
||||
return await self.component.invoke(*args, **kwargs)
|
||||
|
||||
@@ -15,7 +15,7 @@ from langchain_core.documents import Document
|
||||
from langchain_milvus.vectorstores import Milvus
|
||||
from langchain_text_splitters import HTMLHeaderTextSplitter
|
||||
|
||||
from comps import CustomLogger, DocPath, OpeaComponent, ServiceType
|
||||
from comps import CustomLogger, DocPath, OpeaComponent, OpeaComponentRegistry, ServiceType
|
||||
from comps.dataprep.src.utils import (
|
||||
create_upload_folder,
|
||||
document_loader,
|
||||
@@ -157,6 +157,7 @@ def delete_by_partition_field(my_milvus, partition_field):
|
||||
logger.info(f"[ delete partition ] delete success: {res}")
|
||||
|
||||
|
||||
@OpeaComponentRegistry.register("OPEA_DATAPREP_MILVUS")
|
||||
class OpeaMilvusDataprep(OpeaComponent):
|
||||
"""A specialized dataprep component derived from OpeaComponent for milvus dataprep services.
|
||||
|
||||
@@ -167,6 +168,9 @@ class OpeaMilvusDataprep(OpeaComponent):
|
||||
def __init__(self, name: str, description: str, config: dict = None):
|
||||
super().__init__(name, ServiceType.DATAPREP.name.lower(), description, config)
|
||||
self.embedder = self._initialize_embedder()
|
||||
health_status = self.check_health()
|
||||
if not health_status:
|
||||
logger.error("OpeaMilvusDataprep health check failed.")
|
||||
|
||||
def _initialize_embedder(self):
|
||||
if logflag:
|
||||
|
||||
@@ -18,7 +18,7 @@ from langchain_text_splitters import HTMLHeaderTextSplitter
|
||||
from redis.commands.search.field import TextField
|
||||
from redis.commands.search.indexDefinition import IndexDefinition, IndexType
|
||||
|
||||
from comps import CustomLogger, DocPath, OpeaComponent, ServiceType
|
||||
from comps import CustomLogger, DocPath, OpeaComponent, OpeaComponentRegistry, ServiceType
|
||||
from comps.dataprep.src.utils import (
|
||||
create_upload_folder,
|
||||
document_loader,
|
||||
@@ -215,6 +215,7 @@ def ingest_data_to_redis(doc_path: DocPath):
|
||||
return ingest_chunks_to_redis(file_name, chunks)
|
||||
|
||||
|
||||
@OpeaComponentRegistry.register("OPEA_DATAPREP_REDIS")
|
||||
class OpeaRedisDataprep(OpeaComponent):
|
||||
"""A specialized dataprep component derived from OpeaComponent for redis dataprep services.
|
||||
|
||||
@@ -227,6 +228,9 @@ class OpeaRedisDataprep(OpeaComponent):
|
||||
self.client = self._initialize_client()
|
||||
self.data_index_client = self.client.ft(INDEX_NAME)
|
||||
self.key_index_client = self.client.ft(KEY_INDEX_NAME)
|
||||
health_status = self.check_health()
|
||||
if not health_status:
|
||||
logger.error("OpeaRedisDataprep health check failed.")
|
||||
|
||||
def _initialize_client(self) -> redis.Redis:
|
||||
if logflag:
|
||||
|
||||
@@ -1,33 +0,0 @@
|
||||
# Copyright (C) 2024 Intel Corporation
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
|
||||
import os
|
||||
|
||||
from comps import CustomLogger, OpeaComponentController
|
||||
|
||||
logger = CustomLogger("opea_dataprep_controller")
|
||||
logflag = os.getenv("LOGFLAG", False)
|
||||
|
||||
|
||||
class OpeaDataprepController(OpeaComponentController):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
def invoke(self, *args, **kwargs):
|
||||
pass
|
||||
|
||||
async def ingest_files(self, *args, **kwargs):
|
||||
if logflag:
|
||||
logger.info("[ dataprep controller ] ingest files")
|
||||
return await self.active_component.ingest_files(*args, **kwargs)
|
||||
|
||||
async def get_files(self, *args, **kwargs):
|
||||
if logflag:
|
||||
logger.info("[ dataprep controller ] get files")
|
||||
return await self.active_component.get_files(*args, **kwargs)
|
||||
|
||||
async def delete_files(self, *args, **kwargs):
|
||||
if logflag:
|
||||
logger.info("[ dataprep controller ] delete files")
|
||||
return await self.active_component.delete_files(*args, **kwargs)
|
||||
33
comps/dataprep/src/opea_dataprep_loader.py
Normal file
33
comps/dataprep/src/opea_dataprep_loader.py
Normal file
@@ -0,0 +1,33 @@
|
||||
# Copyright (C) 2024 Intel Corporation
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
|
||||
import os
|
||||
|
||||
from comps import CustomLogger, OpeaComponentLoader
|
||||
|
||||
logger = CustomLogger("opea_dataprep_loader")
|
||||
logflag = os.getenv("LOGFLAG", False)
|
||||
|
||||
|
||||
class OpeaDataprepLoader(OpeaComponentLoader):
|
||||
def __init__(self, component_name, **kwargs):
|
||||
super().__init__(component_name=component_name, **kwargs)
|
||||
|
||||
def invoke(self, *args, **kwargs):
|
||||
pass
|
||||
|
||||
async def ingest_files(self, *args, **kwargs):
|
||||
if logflag:
|
||||
logger.info("[ dataprep loader ] ingest files")
|
||||
return await self.component.ingest_files(*args, **kwargs)
|
||||
|
||||
async def get_files(self, *args, **kwargs):
|
||||
if logflag:
|
||||
logger.info("[ dataprep loader ] get files")
|
||||
return await self.component.get_files(*args, **kwargs)
|
||||
|
||||
async def delete_files(self, *args, **kwargs):
|
||||
if logflag:
|
||||
logger.info("[ dataprep loader ] delete files")
|
||||
return await self.component.delete_files(*args, **kwargs)
|
||||
@@ -9,7 +9,7 @@ from typing import List, Optional, Union
|
||||
from fastapi import Body, File, Form, UploadFile
|
||||
from integrations.milvus import OpeaMilvusDataprep
|
||||
from integrations.redis import OpeaRedisDataprep
|
||||
from opea_dataprep_controller import OpeaDataprepController
|
||||
from opea_dataprep_loader import OpeaDataprepLoader
|
||||
|
||||
from comps import (
|
||||
CustomLogger,
|
||||
@@ -23,32 +23,14 @@ from comps.dataprep.src.utils import create_upload_folder
|
||||
|
||||
logger = CustomLogger("opea_dataprep_microservice")
|
||||
logflag = os.getenv("LOGFLAG", False)
|
||||
dataprep_type = os.getenv("DATAPREP_TYPE", False)
|
||||
upload_folder = "./uploaded_files/"
|
||||
# Initialize Controller
|
||||
controller = OpeaDataprepController()
|
||||
|
||||
|
||||
# Register components
|
||||
try:
|
||||
# Instantiate Dataprep components and register it to controller
|
||||
if dataprep_type == "redis":
|
||||
redis_dataprep = OpeaRedisDataprep(
|
||||
name="OpeaRedisDataprep",
|
||||
description="OPEA Redis Dataprep Service",
|
||||
)
|
||||
controller.register(redis_dataprep)
|
||||
elif dataprep_type == "milvus":
|
||||
milvus_dataprep = OpeaMilvusDataprep(
|
||||
name="OpeaMilvusDataprep",
|
||||
description="OPEA Milvus Dataprep Service",
|
||||
)
|
||||
controller.register(milvus_dataprep)
|
||||
|
||||
# Discover and activate a healthy component
|
||||
controller.discover_and_activate()
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to initialize components: {e}")
|
||||
dataprep_component_name = os.getenv("DATAPREP_COMPONENT_NAME", "OPEA_DATAPREP_REDIS")
|
||||
# Initialize OpeaComponentLoader
|
||||
loader = OpeaDataprepLoader(
|
||||
dataprep_component_name,
|
||||
description=f"OPEA DATAPREP Component: {dataprep_component_name}",
|
||||
)
|
||||
|
||||
|
||||
@register_microservice(
|
||||
@@ -74,10 +56,8 @@ async def ingest_files(
|
||||
logger.info(f"[ ingest ] link_list:{link_list}")
|
||||
|
||||
try:
|
||||
# Use the controller to invoke the active component
|
||||
response = await controller.ingest_files(
|
||||
files, link_list, chunk_size, chunk_overlap, process_table, table_strategy
|
||||
)
|
||||
# Use the loader to invoke the component
|
||||
response = await loader.ingest_files(files, link_list, chunk_size, chunk_overlap, process_table, table_strategy)
|
||||
# Log the result if logging is enabled
|
||||
if logflag:
|
||||
logger.info(f"[ ingest ] Output generated: {response}")
|
||||
@@ -104,8 +84,8 @@ async def get_files():
|
||||
logger.info("[ get ] start to get ingested files")
|
||||
|
||||
try:
|
||||
# Use the controller to invoke the active component
|
||||
response = await controller.get_files()
|
||||
# Use the loader to invoke the component
|
||||
response = await loader.get_files()
|
||||
# Log the result if logging is enabled
|
||||
if logflag:
|
||||
logger.info(f"[ get ] ingested files: {response}")
|
||||
@@ -132,8 +112,8 @@ async def delete_files(file_path: str = Body(..., embed=True)):
|
||||
logger.info("[ delete ] start to delete ingested files")
|
||||
|
||||
try:
|
||||
# Use the controller to invoke the active component
|
||||
response = await controller.delete_files(file_path)
|
||||
# Use the loader to invoke the component
|
||||
response = await loader.delete_files(file_path)
|
||||
# Log the result if logging is enabled
|
||||
if logflag:
|
||||
logger.info(f"[ delete ] deleted result: {response}")
|
||||
|
||||
@@ -31,6 +31,7 @@ services:
|
||||
http_proxy: ${http_proxy}
|
||||
https_proxy: ${https_proxy}
|
||||
MULTIMODAL_EMBEDDING: true
|
||||
EMBEDDING_COMPONENT_NAME: "OPEA_MULTIMODAL_EMBEDDING_BRIDGETOWER"
|
||||
MMEI_EMBEDDING_ENDPOINT: ${MMEI_EMBEDDING_ENDPOINT}
|
||||
MM_EMBEDDING_PORT_MICROSERVICE: ${MM_EMBEDDING_PORT_MICROSERVICE}
|
||||
restart: unless-stopped
|
||||
|
||||
@@ -35,6 +35,7 @@ services:
|
||||
http_proxy: ${http_proxy}
|
||||
https_proxy: ${https_proxy}
|
||||
MULTIMODAL_EMBEDDING: true
|
||||
EMBEDDING_COMPONENT_NAME: "OPEA_MULTIMODAL_EMBEDDING_BRIDGETOWER"
|
||||
MMEI_EMBEDDING_ENDPOINT: ${MMEI_EMBEDDING_ENDPOINT}
|
||||
MM_EMBEDDING_PORT_MICROSERVICE: ${MM_EMBEDDING_PORT_MICROSERVICE}
|
||||
restart: unless-stopped
|
||||
|
||||
@@ -14,6 +14,7 @@ services:
|
||||
https_proxy: ${https_proxy}
|
||||
PG_EMBEDDING_MODEL_NAME: ${PG_EMBEDDING_MODEL_NAME}
|
||||
PREDICTIONGUARD_API_KEY: ${PREDICTIONGUARD_API_KEY}
|
||||
EMBEDDING_COMPONENT_NAME: "OPEA_PREDICTIONGUARD_EMBEDDING"
|
||||
restart: unless-stopped
|
||||
|
||||
networks:
|
||||
|
||||
@@ -33,6 +33,7 @@ services:
|
||||
http_proxy: ${http_proxy}
|
||||
https_proxy: ${https_proxy}
|
||||
TEI_EMBEDDING_ENDPOINT: ${TEI_EMBEDDING_ENDPOINT}
|
||||
EMBEDDING_COMPONENT_NAME: "OPEA_TEI_EMBEDDING"
|
||||
depends_on:
|
||||
tei-embedding-service:
|
||||
condition: service_healthy
|
||||
|
||||
@@ -7,12 +7,22 @@ import os
|
||||
|
||||
import requests
|
||||
|
||||
from comps import CustomLogger, EmbedMultimodalDoc, MultimodalDoc, OpeaComponent, ServiceType, TextDoc, TextImageDoc
|
||||
from comps import (
|
||||
CustomLogger,
|
||||
EmbedMultimodalDoc,
|
||||
MultimodalDoc,
|
||||
OpeaComponent,
|
||||
OpeaComponentRegistry,
|
||||
ServiceType,
|
||||
TextDoc,
|
||||
TextImageDoc,
|
||||
)
|
||||
|
||||
logger = CustomLogger("opea_multimodal_embedding_bridgetower")
|
||||
logflag = os.getenv("LOGFLAG", False)
|
||||
|
||||
|
||||
@OpeaComponentRegistry.register("OPEA_MULTIMODAL_EMBEDDING_BRIDGETOWER")
|
||||
class OpeaMultimodalEmbeddingBrigeTower(OpeaComponent):
|
||||
"""A specialized embedding component derived from OpeaComponent for local deployed BrigeTower multimodal embedding services.
|
||||
|
||||
@@ -23,6 +33,9 @@ class OpeaMultimodalEmbeddingBrigeTower(OpeaComponent):
|
||||
def __init__(self, name: str, description: str, config: dict = None):
|
||||
super().__init__(name, ServiceType.EMBEDDING.name.lower(), description, config)
|
||||
self.base_url = os.getenv("MMEI_EMBEDDING_ENDPOINT", "http://localhost:8080")
|
||||
health_status = self.check_health()
|
||||
if not health_status:
|
||||
logger.error("OpeaMultimodalEmbeddingBrigeTower health check failed.")
|
||||
|
||||
async def invoke(self, input: MultimodalDoc) -> EmbedMultimodalDoc:
|
||||
"""Invokes the embedding service to generate embeddings for the provided input.
|
||||
|
||||
@@ -8,7 +8,7 @@ from typing import List, Union
|
||||
import requests
|
||||
from huggingface_hub import AsyncInferenceClient
|
||||
|
||||
from comps import CustomLogger, OpeaComponent, ServiceType
|
||||
from comps import CustomLogger, OpeaComponent, OpeaComponentRegistry, ServiceType
|
||||
from comps.cores.mega.utils import get_access_token
|
||||
from comps.cores.proto.api_protocol import EmbeddingRequest, EmbeddingResponse
|
||||
|
||||
@@ -19,6 +19,7 @@ CLIENTID = os.getenv("CLIENTID")
|
||||
CLIENT_SECRET = os.getenv("CLIENT_SECRET")
|
||||
|
||||
|
||||
@OpeaComponentRegistry.register("OPEA_TEI_EMBEDDING")
|
||||
class OpeaTEIEmbedding(OpeaComponent):
|
||||
"""A specialized embedding component derived from OpeaComponent for TEI embedding services.
|
||||
|
||||
@@ -32,6 +33,10 @@ class OpeaTEIEmbedding(OpeaComponent):
|
||||
self.base_url = os.getenv("TEI_EMBEDDING_ENDPOINT", "http://localhost:8080")
|
||||
self.client = self._initialize_client()
|
||||
|
||||
health_status = self.check_health()
|
||||
if not health_status:
|
||||
logger.error("OpeaTEIEmbedding health check failed.")
|
||||
|
||||
def _initialize_client(self) -> AsyncInferenceClient:
|
||||
"""Initializes the AsyncInferenceClient."""
|
||||
access_token = (
|
||||
|
||||
@@ -6,13 +6,14 @@ import os
|
||||
|
||||
from predictionguard import PredictionGuard
|
||||
|
||||
from comps import CustomLogger, OpeaComponent, ServiceType
|
||||
from comps import CustomLogger, OpeaComponent, OpeaComponentRegistry, ServiceType
|
||||
from comps.cores.proto.api_protocol import EmbeddingRequest, EmbeddingResponse, EmbeddingResponseData
|
||||
|
||||
logger = CustomLogger("predictionguard_embedding")
|
||||
logflag = os.getenv("LOGFLAG", False)
|
||||
|
||||
|
||||
@OpeaComponentRegistry.register("OPEA_PREDICTIONGUARD_EMBEDDING")
|
||||
class PredictionguardEmbedding(OpeaComponent):
|
||||
"""A specialized embedding component derived from OpeaComponent for interacting with Prediction Guard services.
|
||||
|
||||
@@ -30,6 +31,9 @@ class PredictionguardEmbedding(OpeaComponent):
|
||||
else:
|
||||
logger.info("No PredictionGuard API KEY provided, client not instantiated")
|
||||
self.model_name = os.getenv("PG_EMBEDDING_MODEL_NAME", "bridgetower-large-itm-mlm-itc")
|
||||
health_status = self.check_health()
|
||||
if not health_status:
|
||||
logger.error("PredictionguardEmbedding health check failed.")
|
||||
|
||||
def check_health(self) -> bool:
|
||||
"""Checks the health of the Prediction Guard embedding service.
|
||||
|
||||
@@ -9,7 +9,7 @@ from integrations.predictionguard_embedding import PredictionguardEmbedding
|
||||
|
||||
from comps import (
|
||||
CustomLogger,
|
||||
OpeaComponentController,
|
||||
OpeaComponentLoader,
|
||||
ServiceType,
|
||||
opea_microservices,
|
||||
register_microservice,
|
||||
@@ -21,29 +21,12 @@ from comps.cores.proto.api_protocol import EmbeddingRequest, EmbeddingResponse
|
||||
logger = CustomLogger("opea_embedding_microservice")
|
||||
logflag = os.getenv("LOGFLAG", False)
|
||||
|
||||
# Initialize OpeaComponentController
|
||||
controller = OpeaComponentController()
|
||||
|
||||
# Register components
|
||||
try:
|
||||
# Instantiate Embedding components and register it to controller
|
||||
if os.getenv("TEI_EMBEDDING_ENDPOINT"):
|
||||
opea_tei_embedding = OpeaTEIEmbedding(
|
||||
name="OpeaTEIEmbedding",
|
||||
description="OPEA TEI Embedding Service",
|
||||
)
|
||||
controller.register(opea_tei_embedding)
|
||||
if os.getenv("PREDICTIONGUARD_API_KEY"):
|
||||
predictionguard_embedding = PredictionguardEmbedding(
|
||||
name="PredictionGuardEmbedding",
|
||||
description="Prediction Guard Embedding Service",
|
||||
)
|
||||
controller.register(predictionguard_embedding)
|
||||
|
||||
# Discover and activate a healthy component
|
||||
controller.discover_and_activate()
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to initialize components: {e}")
|
||||
embedding_component_name = os.getenv("EMBEDDING_COMPONENT_NAME", "OPEA_TEI_EMBEDDING")
|
||||
# Initialize OpeaComponentLoader
|
||||
loader = OpeaComponentLoader(
|
||||
embedding_component_name,
|
||||
description=f"OPEA Embedding Component: {embedding_component_name}",
|
||||
)
|
||||
|
||||
|
||||
@register_microservice(
|
||||
@@ -62,8 +45,8 @@ async def embedding(input: EmbeddingRequest) -> EmbeddingResponse:
|
||||
logger.info(f"Input received: {input}")
|
||||
|
||||
try:
|
||||
# Use the controller to invoke the active component
|
||||
embedding_response = await controller.invoke(input)
|
||||
# Use the loader to invoke the component
|
||||
embedding_response = await loader.invoke(input)
|
||||
|
||||
# Log the result if logging is enabled
|
||||
if logflag:
|
||||
|
||||
@@ -10,7 +10,7 @@ from comps import (
|
||||
CustomLogger,
|
||||
EmbedMultimodalDoc,
|
||||
MultimodalDoc,
|
||||
OpeaComponentController,
|
||||
OpeaComponentLoader,
|
||||
ServiceType,
|
||||
opea_microservices,
|
||||
register_microservice,
|
||||
@@ -21,23 +21,12 @@ from comps import (
|
||||
logger = CustomLogger("opea_multimodal_embedding_microservice")
|
||||
logflag = os.getenv("LOGFLAG", False)
|
||||
|
||||
# Initialize OpeaComponentController
|
||||
controller = OpeaComponentController()
|
||||
|
||||
# Register components
|
||||
try:
|
||||
# Instantiate Embedding components and register it to controller
|
||||
if os.getenv("MMEI_EMBEDDING_ENDPOINT"):
|
||||
opea_mm_embedding_bt = OpeaMultimodalEmbeddingBrigeTower(
|
||||
name="OpeaMultimodalEmbeddingBrigeTower",
|
||||
description="OPEA Multimodal Embedding Service using BridgeTower",
|
||||
)
|
||||
controller.register(opea_mm_embedding_bt)
|
||||
|
||||
# Discover and activate a healthy component
|
||||
controller.discover_and_activate()
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to initialize components: {e}")
|
||||
embedding_component_name = os.getenv("EMBEDDING_COMPONENT_NAME", "OPEA_MULTIMODAL_EMBEDDING_BRIDGETOWER")
|
||||
# Initialize OpeaComponentLoader
|
||||
loader = OpeaComponentLoader(
|
||||
embedding_component_name,
|
||||
description=f"OPEA Embedding Component: {embedding_component_name}",
|
||||
)
|
||||
|
||||
port = int(os.getenv("MM_EMBEDDING_PORT_MICROSERVICE", 6000))
|
||||
|
||||
@@ -60,8 +49,8 @@ async def embedding(input: MultimodalDoc) -> EmbedMultimodalDoc:
|
||||
logger.info(f"Input received: {input}")
|
||||
|
||||
try:
|
||||
# Use the controller to invoke the active component
|
||||
embedding_response = await controller.invoke(input)
|
||||
# Use the loader to invoke the component
|
||||
embedding_response = await loader.invoke(input)
|
||||
|
||||
# Log the result if logging is enabled
|
||||
if logflag:
|
||||
|
||||
@@ -4,7 +4,7 @@ import base64
|
||||
import os
|
||||
import threading
|
||||
|
||||
from comps import CustomLogger, OpeaComponent, SDImg2ImgInputs, ServiceType
|
||||
from comps import CustomLogger, OpeaComponent, OpeaComponentRegistry, SDImg2ImgInputs, ServiceType
|
||||
|
||||
logger = CustomLogger("opea_imagetoimage")
|
||||
logflag = os.getenv("LOGFLAG", False)
|
||||
@@ -65,6 +65,7 @@ def initialize(
|
||||
initialized = True
|
||||
|
||||
|
||||
@OpeaComponentRegistry.register("OPEA_IMAGE2IMAGE")
|
||||
class OpeaImageToImage(OpeaComponent):
|
||||
"""A specialized ImageToImage component derived from OpeaComponent for Stable Diffusion model .
|
||||
|
||||
@@ -94,6 +95,9 @@ class OpeaImageToImage(OpeaComponent):
|
||||
)
|
||||
self.pipe = pipe
|
||||
self.seed = seed
|
||||
health_status = self.check_health()
|
||||
if not health_status:
|
||||
logger.error("OpeaImageToImage health check failed.")
|
||||
|
||||
def invoke(self, input: SDImg2ImgInputs):
|
||||
"""Invokes the ImageToImage service to generate Images for the provided input.
|
||||
|
||||
@@ -8,7 +8,7 @@ import time
|
||||
|
||||
from comps import (
|
||||
CustomLogger,
|
||||
OpeaComponentController,
|
||||
OpeaComponentLoader,
|
||||
SDImg2ImgInputs,
|
||||
SDOutputs,
|
||||
ServiceType,
|
||||
@@ -23,15 +23,7 @@ args = None
|
||||
|
||||
logger = CustomLogger("image2image")
|
||||
|
||||
|
||||
# Initialize OpeaComponentController
|
||||
controller = OpeaComponentController()
|
||||
|
||||
# Register components
|
||||
# try:
|
||||
|
||||
# except Exception as e:
|
||||
# logger.error(f"Failed to initialize components: {e}")
|
||||
component_loader = None
|
||||
|
||||
|
||||
@register_microservice(
|
||||
@@ -46,7 +38,7 @@ controller = OpeaComponentController()
|
||||
@register_statistics(names=["opea_service@image2image"])
|
||||
def image2image(input: SDImg2ImgInputs):
|
||||
start = time.time()
|
||||
results = controller.invoke(input)
|
||||
results = component_loader.invoke(input)
|
||||
statistics_dict["opea_service@image2image"].append_latency(time.time() - start, None)
|
||||
return SDOutputs(images=results)
|
||||
|
||||
@@ -61,21 +53,18 @@ if __name__ == "__main__":
|
||||
parser.add_argument("--bf16", action="store_true")
|
||||
|
||||
args = parser.parse_args()
|
||||
# Instantiate Animation component and register it to controller
|
||||
opea_imagetoimage = OpeaImageToImage(
|
||||
name="OpeaImageToImage",
|
||||
description="OPEA Image To Image Service",
|
||||
seed=args.seed,
|
||||
model_name_or_path=args.model_name_or_path,
|
||||
device=args.device,
|
||||
token=args.token,
|
||||
bf16=args.bf16,
|
||||
use_hpu_graphs=args.use_hpu_graphs,
|
||||
)
|
||||
image2image_component_name = os.getenv("IMAGE2IMAGE_COMPONENT_NAME", "OPEA_IMAGE2IMAGE")
|
||||
# Register components
|
||||
try:
|
||||
# Initialize OpeaComponentLoader
|
||||
component_loader = OpeaComponentLoader(
|
||||
image2image_component_name,
|
||||
description=f"OPEA IMAGE2IMAGE Component: {image2image_component_name}",
|
||||
config=args.__dict__,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to initialize components: {e}")
|
||||
exit(1)
|
||||
|
||||
controller.register(opea_imagetoimage)
|
||||
|
||||
# Discover and activate a healthy component
|
||||
controller.discover_and_activate()
|
||||
logger.info("Image2image server started.")
|
||||
opea_microservices["opea_service@image2image"].start()
|
||||
|
||||
@@ -7,11 +7,12 @@ import torch
|
||||
from diffusers import StableVideoDiffusionPipeline
|
||||
from diffusers.utils import export_to_video, load_image
|
||||
|
||||
from comps import CustomLogger, ImagesPath, OpeaComponent, ServiceType, VideoPath
|
||||
from comps import CustomLogger, ImagesPath, OpeaComponent, OpeaComponentRegistry, ServiceType, VideoPath
|
||||
|
||||
logger = CustomLogger("opea")
|
||||
|
||||
|
||||
@OpeaComponentRegistry.register("OPEA_IMAGE2VIDEO")
|
||||
class OpeaImage2video(OpeaComponent):
|
||||
"""A specialized image2video component derived from OpeaComponent for image2video services."""
|
||||
|
||||
@@ -42,6 +43,9 @@ class OpeaImage2video(OpeaComponent):
|
||||
else:
|
||||
raise NotImplementedError(f"Only support cpu and hpu device now, device {self.device} not supported.")
|
||||
logger.info("Stable Video Diffusion model initialized.")
|
||||
health_status = self.check_health()
|
||||
if not health_status:
|
||||
logger.error("OpeaImage2video health check failed.")
|
||||
|
||||
async def invoke(self, input: ImagesPath) -> VideoPath:
|
||||
"""Invokes the image2video service to generate video(s) for the provided input.
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import time
|
||||
|
||||
from integrations.opea import OpeaImage2video
|
||||
@@ -9,7 +10,7 @@ from integrations.opea import OpeaImage2video
|
||||
from comps import (
|
||||
CustomLogger,
|
||||
ImagesPath,
|
||||
OpeaComponentController,
|
||||
OpeaComponentLoader,
|
||||
ServiceType,
|
||||
VideoPath,
|
||||
opea_microservices,
|
||||
@@ -20,8 +21,7 @@ from comps import (
|
||||
|
||||
logger = CustomLogger("opea_image2video_microservice")
|
||||
|
||||
# Initialize OpeaComponentController
|
||||
controller = OpeaComponentController()
|
||||
component_loader = None
|
||||
|
||||
|
||||
@register_microservice(
|
||||
@@ -37,8 +37,8 @@ controller = OpeaComponentController()
|
||||
async def image2video(input: ImagesPath):
|
||||
start = time.time()
|
||||
try:
|
||||
# Use the controller to invoke the active component
|
||||
results = await controller.invoke(input)
|
||||
# Use the loader to invoke the component
|
||||
results = await component_loader.invoke(input)
|
||||
statistics_dict["opea_service@image2video"].append_latency(time.time() - start, None)
|
||||
return results
|
||||
except Exception as e:
|
||||
@@ -56,21 +56,18 @@ if __name__ == "__main__":
|
||||
parser.add_argument("--seed", type=int, default=42)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
image2video_component_name = os.getenv("IMAGE2VIDEO_COMPONENT_NAME", "OPEA_IMAGE2VIDEO")
|
||||
# Register components
|
||||
try:
|
||||
# Instantiate Image2video components
|
||||
opea_image2video = OpeaImage2video(
|
||||
name="OpeaImage2video", description="OPEA Image2video Service", config=args.__dict__
|
||||
# Initialize OpeaComponentLoader
|
||||
component_loader = OpeaComponentLoader(
|
||||
image2video_component_name,
|
||||
description=f"OPEA IMAGE2VIDEO Component: {image2video_component_name}",
|
||||
config=args.__dict__,
|
||||
)
|
||||
|
||||
# Register components with the controller
|
||||
controller.register(opea_image2video)
|
||||
|
||||
# Discover and activate a healthy component
|
||||
controller.discover_and_activate()
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to initialize components: {e}")
|
||||
exit(1)
|
||||
|
||||
logger.info("Image2video server started.")
|
||||
opea_microservices["opea_service@image2video"].start()
|
||||
|
||||
@@ -41,6 +41,7 @@ services:
|
||||
LLM_ENDPOINT: ${LLM_ENDPOINT}
|
||||
HF_TOKEN: ${HF_TOKEN}
|
||||
LLM_MODEL_ID: ${LLM_MODEL_ID}
|
||||
LLM_COMPONENT_NAME: "OPEA_LLM"
|
||||
restart: unless-stopped
|
||||
|
||||
networks:
|
||||
|
||||
@@ -39,6 +39,7 @@ services:
|
||||
vLLM_ENDPOINT: ${vLLM_ENDPOINT}
|
||||
HF_TOKEN: ${HF_TOKEN}
|
||||
LLM_MODEL: ${LLM_MODEL}
|
||||
LLM_COMPONENT_NAME: "OPEA_LLM"
|
||||
restart: unless-stopped
|
||||
|
||||
networks:
|
||||
|
||||
@@ -9,7 +9,7 @@ from fastapi.responses import StreamingResponse
|
||||
from langchain_core.prompts import PromptTemplate
|
||||
from openai import AsyncOpenAI
|
||||
|
||||
from comps import CustomLogger, LLMParamsDoc, OpeaComponent, SearchedDoc, ServiceType
|
||||
from comps import CustomLogger, LLMParamsDoc, OpeaComponent, OpeaComponentRegistry, SearchedDoc, ServiceType
|
||||
from comps.cores.mega.utils import ConfigError, get_access_token, load_model_configs
|
||||
from comps.cores.proto.api_protocol import ChatCompletionRequest
|
||||
|
||||
@@ -47,6 +47,7 @@ def get_llm_endpoint():
|
||||
raise ConfigError(f"Input model {MODEL_NAME} not present in model_configs")
|
||||
|
||||
|
||||
@OpeaComponentRegistry.register("OPEA_LLM")
|
||||
class OPEALLM(OpeaComponent):
|
||||
"""A specialized OPEA LLM component derived from OpeaComponent for interacting with TGI/vLLM services based on OpenAI API.
|
||||
|
||||
@@ -57,6 +58,9 @@ class OPEALLM(OpeaComponent):
|
||||
def __init__(self, name: str, description: str, config: dict = None):
|
||||
super().__init__(name, ServiceType.LLM.name.lower(), description, config)
|
||||
self.client = self._initialize_client()
|
||||
health_status = self.check_health()
|
||||
if not health_status:
|
||||
logger.error("OPEALLM health check failed.")
|
||||
|
||||
def _initialize_client(self) -> AsyncOpenAI:
|
||||
"""Initializes the AsyncOpenAI."""
|
||||
|
||||
@@ -10,7 +10,7 @@ from integrations.opea import OPEALLM
|
||||
from comps import (
|
||||
CustomLogger,
|
||||
LLMParamsDoc,
|
||||
OpeaComponentController,
|
||||
OpeaComponentLoader,
|
||||
SearchedDoc,
|
||||
ServiceType,
|
||||
opea_microservices,
|
||||
@@ -23,23 +23,10 @@ from comps.cores.proto.api_protocol import ChatCompletionRequest
|
||||
logger = CustomLogger("llm")
|
||||
logflag = os.getenv("LOGFLAG", False)
|
||||
|
||||
# Initialize OpeaComponentController
|
||||
controller = OpeaComponentController()
|
||||
|
||||
# Register components
|
||||
try:
|
||||
opea_llm = OPEALLM(
|
||||
name="OPEALLM",
|
||||
description="OPEA LLM Service, compatible with OpenAI API",
|
||||
)
|
||||
|
||||
# Register components with the controller
|
||||
controller.register(opea_llm)
|
||||
|
||||
# Discover and activate a healthy component
|
||||
controller.discover_and_activate()
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to initialize components: {e}")
|
||||
llm_component_name = os.getenv("LLM_COMPONENT_NAME", "OPEA_LLM")
|
||||
# Initialize OpeaComponentLoader
|
||||
loader = OpeaComponentLoader(llm_component_name, description=f"OPEA LLM Component: {llm_component_name}")
|
||||
|
||||
|
||||
@register_microservice(
|
||||
@@ -58,8 +45,8 @@ async def llm_generate(input: Union[LLMParamsDoc, ChatCompletionRequest, Searche
|
||||
logger.info(input)
|
||||
|
||||
try:
|
||||
# Use the controller to invoke the active component
|
||||
response = await controller.invoke(input)
|
||||
# Use the loader to invoke the component
|
||||
response = await loader.invoke(input)
|
||||
# Record statistics
|
||||
statistics_dict["opea_service@llm"].append_latency(time.time() - start, None)
|
||||
return response
|
||||
|
||||
@@ -8,7 +8,7 @@ from typing import Union
|
||||
import requests
|
||||
from huggingface_hub import AsyncInferenceClient
|
||||
|
||||
from comps import CustomLogger, LLMParamsDoc, SearchedDoc, ServiceType
|
||||
from comps import CustomLogger, LLMParamsDoc, OpeaComponentRegistry, SearchedDoc, ServiceType
|
||||
from comps.cores.common.component import OpeaComponent
|
||||
from comps.cores.mega.utils import get_access_token
|
||||
from comps.cores.proto.api_protocol import (
|
||||
@@ -27,6 +27,7 @@ CLIENTID = os.getenv("CLIENTID")
|
||||
CLIENT_SECRET = os.getenv("CLIENT_SECRET")
|
||||
|
||||
|
||||
@OpeaComponentRegistry.register("OPEA_RERANK_TEI")
|
||||
class OPEATEIReranking(OpeaComponent):
|
||||
"""A specialized reranking component derived from OpeaComponent for TEI reranking services.
|
||||
|
||||
@@ -38,6 +39,9 @@ class OPEATEIReranking(OpeaComponent):
|
||||
super().__init__(name, ServiceType.RERANK.name.lower(), description, config)
|
||||
self.base_url = os.getenv("TEI_RERANKING_ENDPOINT", "http://localhost:8808")
|
||||
self.client = self._initialize_client()
|
||||
health_status = self.check_health()
|
||||
if not health_status:
|
||||
logger.error("OPEATEIReranking health check failed.")
|
||||
|
||||
def _initialize_client(self) -> AsyncInferenceClient:
|
||||
"""Initializes the AsyncInferenceClient."""
|
||||
|
||||
@@ -9,7 +9,7 @@ from integrations.opea_tei import OPEATEIReranking
|
||||
|
||||
from comps import (
|
||||
CustomLogger,
|
||||
OpeaComponentController,
|
||||
OpeaComponentLoader,
|
||||
ServiceType,
|
||||
opea_microservices,
|
||||
register_microservice,
|
||||
@@ -21,24 +21,10 @@ from comps.cores.proto.docarray import LLMParamsDoc, LVMVideoDoc, RerankedDoc, S
|
||||
|
||||
logger = CustomLogger("opea_reranking_microservice")
|
||||
logflag = os.getenv("LOGFLAG", False)
|
||||
rerank_type = os.getenv("RERANK_TYPE", False)
|
||||
controller = OpeaComponentController()
|
||||
|
||||
# Register components
|
||||
try:
|
||||
# Instantiate reranking components
|
||||
if rerank_type == "tei":
|
||||
opea_tei_reranking = OPEATEIReranking(
|
||||
name="OPEATEIReranking",
|
||||
description="OPEA TEI Reranking Service",
|
||||
)
|
||||
# Register components with the controller
|
||||
controller.register(opea_tei_reranking)
|
||||
|
||||
# Discover and activate a healthy component
|
||||
controller.discover_and_activate()
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to initialize components: {e}")
|
||||
rerank_component_name = os.getenv("RERANK_COMPONENT_NAME", "OPEA_RERANK_TEI")
|
||||
# Initialize OpeaComponentLoader
|
||||
loader = OpeaComponentLoader(rerank_component_name, description=f"OPEA RERANK Component: {rerank_component_name}")
|
||||
|
||||
|
||||
@register_microservice(
|
||||
@@ -59,8 +45,8 @@ async def reranking(
|
||||
logger.info(f"Input received: {input}")
|
||||
|
||||
try:
|
||||
# Use the controller to invoke the active component
|
||||
reranking_response = await controller.invoke(input)
|
||||
# Use the loader to invoke the component
|
||||
reranking_response = await loader.invoke(input)
|
||||
|
||||
# Log the result if logging is enabled
|
||||
if logflag:
|
||||
|
||||
@@ -8,7 +8,7 @@ from typing import List, Optional
|
||||
from langchain_community.embeddings import HuggingFaceBgeEmbeddings, HuggingFaceHubEmbeddings, OpenAIEmbeddings
|
||||
from langchain_milvus.vectorstores import Milvus
|
||||
|
||||
from comps import CustomLogger, EmbedDoc, OpeaComponent, SearchedDoc, ServiceType, TextDoc
|
||||
from comps import CustomLogger, EmbedDoc, OpeaComponent, OpeaComponentRegistry, SearchedDoc, ServiceType
|
||||
|
||||
from .config import COLLECTION_NAME, INDEX_PARAMS, LOCAL_EMBEDDING_MODEL, MILVUS_URI, TEI_EMBEDDING_ENDPOINT
|
||||
|
||||
@@ -16,6 +16,7 @@ logger = CustomLogger("milvus_retrievers")
|
||||
logflag = os.getenv("LOGFLAG", False)
|
||||
|
||||
|
||||
@OpeaComponentRegistry.register("OPEA_RETRIEVER_MILVUS")
|
||||
class OpeaMilvusRetriever(OpeaComponent):
|
||||
"""A specialized retriever component derived from OpeaComponent for milvus retriever services.
|
||||
|
||||
@@ -28,6 +29,9 @@ class OpeaMilvusRetriever(OpeaComponent):
|
||||
|
||||
self.embedder = self._initialize_embedder()
|
||||
self.client = self._initialize_client()
|
||||
health_status = self.check_health()
|
||||
if not health_status:
|
||||
logger.error("OpeaMilvusRetriever health check failed.")
|
||||
|
||||
def _initialize_embedder(self):
|
||||
if TEI_EMBEDDING_ENDPOINT:
|
||||
|
||||
@@ -7,7 +7,15 @@ from typing import Union
|
||||
|
||||
from langchain_community.vectorstores import Redis
|
||||
|
||||
from comps import CustomLogger, EmbedDoc, EmbedMultimodalDoc, OpeaComponent, SearchedDoc, ServiceType
|
||||
from comps import (
|
||||
CustomLogger,
|
||||
EmbedDoc,
|
||||
EmbedMultimodalDoc,
|
||||
OpeaComponent,
|
||||
OpeaComponentRegistry,
|
||||
SearchedDoc,
|
||||
ServiceType,
|
||||
)
|
||||
from comps.cores.proto.api_protocol import ChatCompletionRequest, EmbeddingResponse, RetrievalRequest, RetrievalResponse
|
||||
|
||||
from .config import BRIDGE_TOWER_EMBEDDING, EMBED_MODEL, INDEX_NAME, REDIS_URL, TEI_EMBEDDING_ENDPOINT
|
||||
@@ -16,6 +24,7 @@ logger = CustomLogger("redis_retrievers")
|
||||
logflag = os.getenv("LOGFLAG", False)
|
||||
|
||||
|
||||
@OpeaComponentRegistry.register("OPEA_RETRIEVER_REDIS")
|
||||
class OpeaRedisRetriever(OpeaComponent):
|
||||
"""A specialized retriever component derived from OpeaComponent for redis retriever services.
|
||||
|
||||
@@ -43,6 +52,9 @@ class OpeaRedisRetriever(OpeaComponent):
|
||||
|
||||
self.embeddings = HuggingFaceBgeEmbeddings(model_name=EMBED_MODEL)
|
||||
self.client = self._initialize_client()
|
||||
health_status = self.check_health()
|
||||
if not health_status:
|
||||
logger.error("OpeaRedisRetriever health check failed.")
|
||||
|
||||
def _initialize_client(self) -> Redis:
|
||||
"""Initializes the redis client."""
|
||||
|
||||
@@ -13,7 +13,7 @@ from comps import (
|
||||
CustomLogger,
|
||||
EmbedDoc,
|
||||
EmbedMultimodalDoc,
|
||||
OpeaComponentController,
|
||||
OpeaComponentLoader,
|
||||
SearchedDoc,
|
||||
SearchedMultimodalDoc,
|
||||
ServiceType,
|
||||
@@ -32,31 +32,13 @@ from comps.cores.proto.api_protocol import (
|
||||
|
||||
logger = CustomLogger("opea_retrievers_microservice")
|
||||
logflag = os.getenv("LOGFLAG", False)
|
||||
retriever_type = os.getenv("RETRIEVER_TYPE", False)
|
||||
# Initialize Controller
|
||||
controller = OpeaComponentController()
|
||||
|
||||
|
||||
# Register components
|
||||
try:
|
||||
# Instantiate Retrievers components and register it to controller
|
||||
if retriever_type == "redis":
|
||||
redis_retriever = OpeaRedisRetriever(
|
||||
name="OpeaRedisRetriever",
|
||||
description="OPEA Redis Retriever Service",
|
||||
)
|
||||
controller.register(redis_retriever)
|
||||
elif retriever_type == "milvus":
|
||||
milvus_retriever = OpeaMilvusRetriever(
|
||||
name="OpeaMilvusRetriever",
|
||||
description="OPEA Milvus Retriever Service",
|
||||
)
|
||||
controller.register(milvus_retriever)
|
||||
|
||||
# Discover and activate a healthy component
|
||||
controller.discover_and_activate()
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to initialize components: {e}")
|
||||
retriever_component_name = os.getenv("RETRIEVER_COMPONENT_NAME", "OPEA_RETRIEVER_REDIS")
|
||||
# Initialize OpeaComponentLoader
|
||||
loader = OpeaComponentLoader(
|
||||
retriever_component_name,
|
||||
description=f"OPEA RETRIEVER Component: {retriever_component_name}",
|
||||
)
|
||||
|
||||
|
||||
@register_microservice(
|
||||
@@ -76,8 +58,8 @@ async def ingest_files(
|
||||
logger.info(f"[ retrieval ] input:{input}")
|
||||
|
||||
try:
|
||||
# Use the controller to invoke the active component
|
||||
response = await controller.invoke(input)
|
||||
# Use the loader to invoke the component
|
||||
response = await loader.invoke(input)
|
||||
|
||||
# return different response format
|
||||
retrieved_docs = []
|
||||
|
||||
@@ -13,7 +13,7 @@ from pydantic import BaseModel, Field
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.exc import SQLAlchemyError
|
||||
|
||||
from comps import CustomLogger, OpeaComponent, ServiceType
|
||||
from comps import CustomLogger, OpeaComponent, OpeaComponentRegistry, ServiceType
|
||||
from comps.text2sql.src.integrations.sql_agent import CustomSQLDatabaseToolkit, custom_create_sql_agent
|
||||
|
||||
logger = CustomLogger("comps-text2sql")
|
||||
@@ -69,6 +69,7 @@ class Input(BaseModel):
|
||||
conn_str: Optional[PostgresConnection] = None
|
||||
|
||||
|
||||
@OpeaComponentRegistry.register("OPEA_TEXT2SQL")
|
||||
class OpeaText2SQL(OpeaComponent):
|
||||
"""A specialized text to sql component derived from OpeaComponent for interacting with TGI services and Database.
|
||||
|
||||
@@ -78,6 +79,9 @@ class OpeaText2SQL(OpeaComponent):
|
||||
|
||||
def __init__(self, name: str, description: str, config: dict = None):
|
||||
super().__init__(name, ServiceType.TEXT2SQL.name.lower(), description, config)
|
||||
health_status = self.check_health()
|
||||
if not health_status:
|
||||
logger.error("OpeaText2SQL health check failed.")
|
||||
|
||||
async def check_health(self) -> bool:
|
||||
"""Checks the health of the TGI service.
|
||||
|
||||
@@ -7,7 +7,7 @@ import sys
|
||||
|
||||
from fastapi.exceptions import HTTPException
|
||||
|
||||
from comps import CustomLogger, OpeaComponentController, opea_microservices, register_microservice
|
||||
from comps import CustomLogger, OpeaComponentLoader, opea_microservices, register_microservice
|
||||
from comps.text2sql.src.integrations.opea import Input, OpeaText2SQL
|
||||
|
||||
cur_path = pathlib.Path(__file__).parent.resolve()
|
||||
@@ -17,23 +17,12 @@ sys.path.append(comps_path)
|
||||
logger = CustomLogger("text2sql")
|
||||
logflag = os.getenv("LOGFLAG", False)
|
||||
|
||||
try:
|
||||
# Initialize OpeaComponentController
|
||||
controller = OpeaComponentController()
|
||||
|
||||
# Register components
|
||||
text2sql_agent = OpeaText2SQL(
|
||||
name="Text2SQL",
|
||||
description="Text2SQL Service",
|
||||
)
|
||||
|
||||
# Register components with the controller
|
||||
controller.register(text2sql_agent)
|
||||
|
||||
# Discover and activate a healthy component
|
||||
controller.discover_and_activate()
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to initialize components: {e}")
|
||||
text2sql_component_name = os.getenv("TEXT2SQL_COMPONENT_NAME", "OPEA_TEXT2SQL")
|
||||
# Initialize OpeaComponentLoader
|
||||
loader = OpeaComponentLoader(
|
||||
text2sql_component_name,
|
||||
description=f"OPEA RERANK Component: {text2sql_component_name}",
|
||||
)
|
||||
|
||||
|
||||
@register_microservice(
|
||||
@@ -55,7 +44,7 @@ async def execute_agent(input: Input):
|
||||
dict: A dictionary with a 'result' key containing the output of the executed SQL query.
|
||||
"""
|
||||
if input.conn_str.test_connection():
|
||||
response = await controller.invoke(input)
|
||||
response = await loader.invoke(input)
|
||||
# response = "a"
|
||||
return {"result": response}
|
||||
else:
|
||||
|
||||
@@ -1,19 +1,21 @@
|
||||
# Copyright (C) 2024 Intel Corporation
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import time
|
||||
|
||||
import requests
|
||||
from fastapi.responses import StreamingResponse
|
||||
|
||||
from comps import CustomLogger, OpeaComponent, ServiceType
|
||||
from comps import CustomLogger, OpeaComponent, OpeaComponentRegistry, ServiceType
|
||||
from comps.cores.proto.api_protocol import AudioSpeechRequest
|
||||
|
||||
logger = CustomLogger("opea_gptsovits")
|
||||
logflag = os.getenv("LOGFLAG", False)
|
||||
|
||||
|
||||
@OpeaComponentRegistry.register("OPEA_GPTSOVITS_TTS")
|
||||
class OpeaGptsovitsTts(OpeaComponent):
|
||||
"""A specialized TTS (Text To Speech) component derived from OpeaComponent for GPTSoVITS TTS services.
|
||||
|
||||
@@ -24,6 +26,9 @@ class OpeaGptsovitsTts(OpeaComponent):
|
||||
def __init__(self, name: str, description: str, config: dict = None):
|
||||
super().__init__(name, ServiceType.TTS.name.lower(), description, config)
|
||||
self.base_url = os.getenv("TTS_ENDPOINT", "http://localhost:9880")
|
||||
health_status = self.check_health()
|
||||
if not health_status:
|
||||
logger.error("OpeaGptsovitsTts health check failed.")
|
||||
|
||||
async def invoke(
|
||||
self,
|
||||
@@ -34,7 +39,11 @@ class OpeaGptsovitsTts(OpeaComponent):
|
||||
# make sure you change the refer_wav_path locally
|
||||
request.voice = None
|
||||
|
||||
response = requests.post(f"{self.base_url}/v1/audio/speech", data=request.json())
|
||||
response = await asyncio.to_thread(
|
||||
requests.post,
|
||||
f"{self.base_url}/v1/audio/speech",
|
||||
json=request.dict(),
|
||||
)
|
||||
return response
|
||||
|
||||
def check_health(self) -> bool:
|
||||
|
||||
@@ -1,19 +1,21 @@
|
||||
# Copyright (C) 2024 Intel Corporation
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import time
|
||||
|
||||
import requests
|
||||
from fastapi.responses import StreamingResponse
|
||||
|
||||
from comps import CustomLogger, OpeaComponent, ServiceType
|
||||
from comps import CustomLogger, OpeaComponent, OpeaComponentRegistry, ServiceType
|
||||
from comps.cores.proto.api_protocol import AudioSpeechRequest
|
||||
|
||||
logger = CustomLogger("opea_speecht5")
|
||||
logflag = os.getenv("LOGFLAG", False)
|
||||
|
||||
|
||||
@OpeaComponentRegistry.register("OPEA_SPEECHT5_TTS")
|
||||
class OpeaSpeecht5Tts(OpeaComponent):
|
||||
"""A specialized TTS (Text To Speech) component derived from OpeaComponent for SpeechT5 TTS services.
|
||||
|
||||
@@ -24,8 +26,11 @@ class OpeaSpeecht5Tts(OpeaComponent):
|
||||
def __init__(self, name: str, description: str, config: dict = None):
|
||||
super().__init__(name, ServiceType.TTS.name.lower(), description, config)
|
||||
self.base_url = os.getenv("TTS_ENDPOINT", "http://localhost:7055")
|
||||
health_status = self.check_health()
|
||||
if not health_status:
|
||||
logger.error("OpeaSpeecht5Tts health check failed.")
|
||||
|
||||
def invoke(
|
||||
async def invoke(
|
||||
self,
|
||||
request: AudioSpeechRequest,
|
||||
) -> requests.models.Response:
|
||||
@@ -36,7 +41,11 @@ class OpeaSpeecht5Tts(OpeaComponent):
|
||||
if request.voice not in ["default", "male"] or request.speed != 1.0:
|
||||
logger.warning("Currently parameter 'speed' can only be 1.0 and 'voice' can only be default or male!")
|
||||
|
||||
response = requests.post(f"{self.base_url}/v1/audio/speech", data=request.json())
|
||||
response = await asyncio.to_thread(
|
||||
requests.post,
|
||||
f"{self.base_url}/v1/audio/speech",
|
||||
json=request.dict(),
|
||||
)
|
||||
return response
|
||||
|
||||
def check_health(self) -> bool:
|
||||
|
||||
@@ -10,7 +10,7 @@ from integrations.opea_speecht5 import OpeaSpeecht5Tts
|
||||
|
||||
from comps import (
|
||||
CustomLogger,
|
||||
OpeaComponentController,
|
||||
OpeaComponentLoader,
|
||||
ServiceType,
|
||||
opea_microservices,
|
||||
register_microservice,
|
||||
@@ -22,30 +22,9 @@ from comps.cores.proto.api_protocol import AudioSpeechRequest
|
||||
logger = CustomLogger("opea_tts_microservice")
|
||||
logflag = os.getenv("LOGFLAG", False)
|
||||
|
||||
# Initialize OpeaComponentController
|
||||
controller = OpeaComponentController()
|
||||
|
||||
# Register components
|
||||
try:
|
||||
# Instantiate TTS components
|
||||
opea_speecht5 = OpeaSpeecht5Tts(
|
||||
name="OpeaSpeecht5Tts",
|
||||
description="OPEA SpeechT5 TTS Service",
|
||||
)
|
||||
|
||||
opea_gptsovits = OpeaGptsovitsTts(
|
||||
name="OpeaGptsovitsTts",
|
||||
description="OPEA GPTSoVITS TTS Service",
|
||||
)
|
||||
|
||||
# Register components with the controller
|
||||
controller.register(opea_speecht5)
|
||||
controller.register(opea_gptsovits)
|
||||
|
||||
# Discover and activate a healthy component
|
||||
controller.discover_and_activate()
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to initialize components: {e}")
|
||||
tts_component_name = os.getenv("TTS_COMPONENT_NAME", "OPEA_SPEECHT5_TTS")
|
||||
# Initialize OpeaComponentLoader
|
||||
loader = OpeaComponentLoader(tts_component_name, description=f"OPEA TTS Component: {tts_component_name}")
|
||||
|
||||
|
||||
async def stream_forwarder(response):
|
||||
@@ -71,8 +50,8 @@ async def text_to_speech(request: AudioSpeechRequest) -> StreamingResponse:
|
||||
logger.info(f"Input received: {request}")
|
||||
|
||||
try:
|
||||
# Use the controller to invoke the active component
|
||||
tts_response = controller.invoke(request)
|
||||
# Use the loader to invoke the component
|
||||
tts_response = await loader.invoke(request)
|
||||
if logflag:
|
||||
logger.info(tts_response)
|
||||
statistics_dict["opea_service@tts"].append_latency(time.time() - start, None)
|
||||
|
||||
Reference in New Issue
Block a user