diff --git a/ChatQnA/chatqna.yaml b/ChatQnA/chatqna.yaml
index 3020d44cd..187f38c83 100644
--- a/ChatQnA/chatqna.yaml
+++ b/ChatQnA/chatqna.yaml
@@ -87,4 +87,4 @@ benchmark:
   rerank:
     top_n: 2
   llm:
-    max_token_size: 128 # specify the output token size
\ No newline at end of file
+    max_token_size: 128 # specify the output token size
diff --git a/deploy_and_benchmark.py b/deploy_and_benchmark.py
index d626d6e2e..c987d0950 100644
--- a/deploy_and_benchmark.py
+++ b/deploy_and_benchmark.py
@@ -1,24 +1,24 @@
 # Copyright (C) 2024 Intel Corporation
 # SPDX-License-Identifier: Apache-2.0
-
-import os
-import re
-import sys
-import yaml
+
+import argparse
 import copy
 import json
-import time
+import os
+import re
 import shutil
-import argparse
 import subprocess
+import sys
+import time
 from datetime import datetime
 
-eval_path = '/home/sdp/letong/GenAIEval/'
+import yaml
+
+eval_path = "/home/sdp/letong/GenAIEval/"
 sys.path.append(eval_path)
 from evals.benchmark.stresscli.commands.load_test import locust_runtests
 from kubernetes import client, config
 
-
 ############################################
 #                load yaml                 #
 ############################################
@@ -45,7 +45,7 @@ def construct_benchmark_config(content):
     """Extract relevant data from the YAML based on the specified test cases."""
     # Extract test suite configuration
     test_suite_config = content.get("benchmark", {})
-
+
     return {
         # no examples param
         "example_name": test_suite_config.get("example_name", "chatqna"),
@@ -64,7 +64,7 @@ def construct_benchmark_config(content):
         # new params
         "dataprep_chunk_size": test_suite_config.get("data_prep", {}).get("chunk_size", [1024]),
         "dataprep_chunk_overlap": test_suite_config.get("data_prep", {}).get("chunk_overlap", [1000]),
-        "retriever_algo": test_suite_config.get("retriever", {}).get("algo", 'IVF'),
+        "retriever_algo": test_suite_config.get("retriever", {}).get("algo", "IVF"),
         "retriever_fetch_k": test_suite_config.get("retriever", {}).get("fetch_k", 2),
         "rerank_top_n": test_suite_config.get("teirerank", {}).get("top_n", 2),
         "llm_max_token_size": test_suite_config.get("llm", {}).get("max_token_size", 1024),
@@ -73,6 +73,7 @@ def construct_benchmark_config(content):
 
 def construct_deploy_config(deploy_config, target_node, max_batch_size=None):
     """Construct a new deploy config based on the target node number and optional max_batch_size.
+
     Args:
         deploy_config: Original deploy config dictionary
         target_node: Target node number to match in the node array
@@ -121,7 +122,7 @@
 ############################################
 def _create_yaml_content(service, base_url, bench_target, test_phase, num_queries, test_params):
     """Create content for the run.yaml file."""
-
+
     # If a load shape includes the parameter concurrent_level,
     # the parameter will be passed to Locust to launch fixed
     # number of simulated users.
@@ -130,7 +131,7 @@ def _create_yaml_content(service, base_url, bench_target, test_phase, num_querie
         concurrency = max(1, num_queries // test_params["concurrent_level"])
     else:
         concurrency = test_params["concurrent_level"]
-
+
     yaml_content = {
         "profile": {
             "storage": {"hostpath": test_params["test_output_dir"]},
@@ -155,7 +156,7 @@ def _create_yaml_content(service, base_url, bench_target, test_phase, num_querie
             "runs": [{"name": test_phase, "users": concurrency, "max-request": num_queries}],
         }
     }
-
+
     # For the following scenarios, test will stop after the specified run-time
     # 1) run_time is not specified in benchmark.yaml
     # 2) Not a warm-up run
@@ -163,15 +164,13 @@ def _create_yaml_content(service, base_url, bench_target, test_phase, num_querie
     # however the default is 48 hours.
     if test_params["run_time"] is not None and test_phase != "warmup":
         yaml_content["profile"]["global-settings"]["run-time"] = test_params["run_time"]
-
+
     return yaml_content
-
-
-def _create_stresscli_yaml(
-    example, case_type, case_params, test_params, test_phase, num_queries, base_url, ts
-) -> str:
+
+
+def _create_stresscli_yaml(example, case_type, case_params, test_params, test_phase, num_queries, base_url, ts) -> str:
     """Create a stresscli configuration file and persist it on disk.
-
+
     Parameters
     ----------
     example : str
@@ -190,7 +189,7 @@ def _create_stresscli_yaml(
         The parameters of the test
     ts : str
         Timestamp
-
+
     Returns
     -------
     run_yaml_path : str
@@ -201,15 +200,15 @@ def _create_stresscli_yaml(
     if "pub_med" in dataset:
         bench_target = "chatqna_qlist_pubmed"
         max_lines = dataset.split("pub_med")[-1]
-        os.environ['DATASET'] = f"pubmed_{max_lines}.txt"
-        os.environ['MAX_LINES'] = max_lines
+        os.environ["DATASET"] = f"pubmed_{max_lines}.txt"
+        os.environ["MAX_LINES"] = max_lines
         print("================ max_lines ==================")
         print(max_lines)
         print("-----------------------------------------------")
 
     # Generate the content of stresscli configuration file
     stresscli_yaml = _create_yaml_content(case_params, base_url, bench_target, test_phase, num_queries, test_params)
-
+
     # Dump the stresscli configuration file
     service_name = case_params.get("service_name")
     run_yaml_path = os.path.join(
@@ -217,16 +216,16 @@ def _create_stresscli_yaml(
     )
     with open(run_yaml_path, "w") as yaml_file:
         yaml.dump(stresscli_yaml, yaml_file)
-
+
     return run_yaml_path
-
-
+
+
 def create_benchmark_yaml(example, deployment_type, service_type, service, base_url, test_suite_config, index):
     """Create and save the run.yaml file for the service being tested."""
     os.makedirs(test_suite_config["test_output_dir"], exist_ok=True)
 
     run_yaml_paths = []
-
+
     # Add YAML configuration of stresscli for warm-ups
     warm_ups = test_suite_config["warm_ups"]
     if warm_ups is not None and warm_ups > 0:
@@ -241,9 +240,7 @@ def create_benchmark_yaml(example, deployment_type, service_type, service, base_
     if user_queries_lst is None or len(user_queries_lst) == 0:
         # Test stop is controlled by run time
         run_yaml_paths.append(
-            _create_stresscli_yaml(
-                example, service_type, service, test_suite_config, "benchmark", -1, base_url, index
-            )
+            _create_stresscli_yaml(example, service_type, service, test_suite_config, "benchmark", -1, base_url, index)
         )
     else:
         # Test stop is controlled by request count
@@ -253,7 +250,7 @@ def create_benchmark_yaml(example, deployment_type, service_type, service, base_
                     example, service_type, service, test_suite_config, "benchmark", user_queries, base_url, index
                 )
             )
-
+
     return run_yaml_paths
 
 
@@ -263,23 +260,23 @@ def create_benchmark_yaml(example, deployment_type, service_type, service, base_
 def _get_cluster_ip(service_name, namespace="default"):
     # Load the Kubernetes configuration
     config.load_kube_config()  # or use config.load_incluster_config() if running inside a Kubernetes pod
-
+
     # Create an API client for the core API (which handles services)
     v1 = client.CoreV1Api()
-
+
     try:
         # Get the service object
         service = v1.read_namespaced_service(name=service_name, namespace=namespace)
-
+
         # Extract the Cluster IP
        cluster_ip = service.spec.cluster_ip
-
+
         # Extract the port number (assuming the first port, modify if necessary)
         if service.spec.ports:
             port_number = service.spec.ports[0].port  # Get the first port number
         else:
             port_number = None
-
+
         return cluster_ip, port_number
     except client.exceptions.ApiException as e:
         print(f"Error fetching service: {e}")
@@ -288,13 +285,13 @@ def _get_cluster_ip(service_name, namespace="default"):
 
 def _get_service_ip(service_name, deployment_type="k8s", service_ip=None, service_port=None, namespace="default"):
     """Get the service IP and port based on the deployment type.
-
+
     Args:
         service_name (str): The name of the service.
         deployment_type (str): The type of deployment ("k8s" or "docker").
         service_ip (str): The IP address of the service (required for Docker deployment).
         service_port (int): The port of the service (required for Docker deployment).
-
+
     Returns:
         (str, int): The service IP and port.
     """
@@ -311,19 +308,19 @@ def _get_service_ip(service_name, deployment_type="k8s", service_ip=None, servic
         port = service_port
     else:
         raise ValueError("Unsupported deployment type. Use 'k8s' or 'docker'.")
-
+
     return svc_ip, port
 
 
 def _run_service_test(example, service_type, service, test_suite_config):
     print(f"[OPEA BENCHMARK] 🚀 Example: [ {example} ] Service: [ {service.get('service_name')} ], Running test...")
-
+
     # Get the service name
     service_name = service.get("service_name")
-
+
     # Get the deployment type from the test suite configuration
     deployment_type = test_suite_config.get("deployment_type", "k8s")
-
+
     # Get the service IP and port based on deployment type
     svc_ip, port = _get_service_ip(
         service_name,
@@ -338,15 +335,15 @@ def _run_service_test(example, service_type, service, test_suite_config):
     print(svc_ip)
     print(port)
     print("-----------------------------------------------")
-
+
     base_url = f"http://{svc_ip}:{port}"
     endpoint = service_endpoints[example]
     url = f"{base_url}{endpoint}"
     print(f"[OPEA BENCHMARK] 🚀 Running test for {service_name} at {url}")
-
+
     # Generate a unique index based on the current time
     timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
-
+
     # Create the run.yaml for the service
     run_yaml_paths = create_benchmark_yaml(
         example, deployment_type, service_type, service, base_url, test_suite_config, timestamp
@@ -361,30 +358,33 @@ def _run_service_test(example, service_type, service, test_suite_config):
     for index, run_yaml_path in enumerate(run_yaml_paths, start=1):
         print(f"[OPEA BENCHMARK] 🚀 The {index} time test is running, run yaml: {run_yaml_path}...")
         output_folders.append(locust_runtests(None, run_yaml_path))
-
+
     print(f"[OPEA BENCHMARK] 🚀 Test completed for {service_name} at {url}")
-
+
     return output_folders
 
 
 def run_benchmark(benchmark_config, report=False):
     # Extract data
     parsed_data = construct_benchmark_config(benchmark_config)
-    os.environ['MAX_TOKENS'] = str(parsed_data['llm_max_token_size'])
+    os.environ["MAX_TOKENS"] = str(parsed_data["llm_max_token_size"])
     print("================ parsed data ==================")
     print(parsed_data)
print("-----------------------------------------------") test_suite_config = { - "user_queries": [1,2,4], # num of user queries set to 1 by default - "random_prompt": False, # whether to use random prompt, set to False by default - "run_time": "60m", # The max total run time for the test suite, set to 60m by default - "collect_service_metric": False, # whether to collect service metrics, set to False by default - "llm_model": "Qwen/Qwen2.5-Coder-7B-Instruct", # The LLM model used for the test - "deployment_type": "k8s", # Default is "k8s", can also be "docker" - "service_ip": None, # Leave as None for k8s, specify for Docker - "service_port": None, # Leave as None for k8s, specify for Docker - "test_output_dir": "/home/sdp/letong/GenAIExamples/benchmark_output", # The directory to store the test output - "load_shape": {"name":"constant", "params":{"constant":{"concurrent_level": 4},"poisson":{"arrival_rate":1.0}}}, + "user_queries": [1, 2, 4], # num of user queries set to 1 by default + "random_prompt": False, # whether to use random prompt, set to False by default + "run_time": "60m", # The max total run time for the test suite, set to 60m by default + "collect_service_metric": False, # whether to collect service metrics, set to False by default + "llm_model": "Qwen/Qwen2.5-Coder-7B-Instruct", # The LLM model used for the test + "deployment_type": "k8s", # Default is "k8s", can also be "docker" + "service_ip": None, # Leave as None for k8s, specify for Docker + "service_port": None, # Leave as None for k8s, specify for Docker + "test_output_dir": "/home/sdp/letong/GenAIExamples/benchmark_output", # The directory to store the test output + "load_shape": { + "name": "constant", + "params": {"constant": {"concurrent_level": 4}, "poisson": {"arrival_rate": 1.0}}, + }, "concurrent_level": 4, "arrival_rate": 1.0, "query_timeout": 120, @@ -392,13 +392,13 @@ def run_benchmark(benchmark_config, report=False): "seed": None, "namespace": "default", "dataset": parsed_data["dataset"], - "data_ratio": parsed_data["data_ratio"] + "data_ratio": parsed_data["data_ratio"], } print("============= test_suite_config ===============") print(test_suite_config) print("-----------------------------------------------") - service_type="e2e" + service_type = "e2e" dataset = None query_data = None case_data = { @@ -411,12 +411,12 @@ def run_benchmark(benchmark_config, report=False): "llm-dependency-svc", "llm-svc", "retriever-svc", - "vector-db" + "vector-db", ], - "dataset": dataset, # Activate if random_prompt=true: leave blank = default dataset(WebQuestions) or sharegpt + "dataset": dataset, # Activate if random_prompt=true: leave blank = default dataset(WebQuestions) or sharegpt "prompts": query_data, "max_output": 128, # max number of output tokens - "k": 1 # number of retrieved documents + "k": 1, # number of retrieved documents } print("================= case_data ===================") print(case_data) @@ -433,7 +433,7 @@ def run_benchmark(benchmark_config, report=False): results = get_report_results(folder) all_results[folder] = results print(f"results = {results}\n") - + return all_results @@ -532,6 +532,7 @@ def deploy_delete_labels(label, node_names=None): def _read_deploy_config(config_path): """Read and parse the deploy config file. 
+
     Args:
         config_path: Path to the deploy config file
     Returns:
@@ -784,12 +785,23 @@ def _generate_helm_values(example_type, deploy_config, chart_dir, action_type, n
 
 
 def _update_service(release_name, chart_name, namespace, hw_values_file, deploy_values_file, update_values_file):
-    """Update the deployment using helm upgrade with new values.
-    """
+    """Update the deployment using helm upgrade with new values."""
     # Construct helm upgrade command
-    command = ["helm", "upgrade", release_name, chart_name, "--namespace", namespace,
-               "-f", hw_values_file, "-f", deploy_values_file, "-f", update_values_file]
+    command = [
+        "helm",
+        "upgrade",
+        release_name,
+        chart_name,
+        "--namespace",
+        namespace,
+        "-f",
+        hw_values_file,
+        "-f",
+        deploy_values_file,
+        "-f",
+        update_values_file,
+    ]
 
     # Execute helm upgrade
     print(f"Running command: {' '.join(command)}")
     _run_kubectl_command(command)
@@ -798,6 +810,7 @@ def _update_service(release_name, chart_name, namespace, hw_values_file, deploy_
 
 def _start_service(release_name, chart_name, namespace, hw_values_file, deploy_values_file):
     """Deploy a Helm release with a specified name and chart.
+
     Parameters:
     - release_name: The name of the Helm release.
     - chart_name: The Helm chart name or path.
@@ -856,7 +869,7 @@ def deploy_start_services(config_file, chart_name, namespace, label, chart_dir,
     else:
         print(f"Failed to generate values.yaml: {result['message']}")
         return False
-
+
     print("start to read the generated values file")
     # Read back the generated YAML file for verification
     with open(values_file_path, "r") as file:
@@ -864,16 +877,14 @@ def deploy_start_services(config_file, chart_name, namespace, label, chart_dir,
         print(file.read())
 
     breakpoint()
-
+
     # Handle service update if specified
     if update_service:
         if not user_values:
             print("user_values is required for update reference")
 
         try:
-            _update_service(
-                chart_name, chart_name, namespace, hw_values_file, user_values, values_file_path
-            )
+            _update_service(chart_name, chart_name, namespace, hw_values_file, user_values, values_file_path)
             return
         except Exception as e:
             print(f"Failed to update deployment: {str(e)}")
@@ -881,9 +892,7 @@ def deploy_start_services(config_file, chart_name, namespace, label, chart_dir,
     # Deploy service if not update
     else:
         try:
-            _start_service(
-                chart_name, chart_name, namespace, hw_values_file, values_file_path
-            )
+            _start_service(chart_name, chart_name, namespace, hw_values_file, values_file_path)
             return
         except Exception as e:
             print(f"Failed to start deployment: {str(e)}")
@@ -894,6 +903,7 @@ def deploy_start_services(config_file, chart_name, namespace, label, chart_dir,
 
 def deploy_check_readiness(chart_name, namespace, timeout=300, interval=5, logfile="deployment.log"):
     """Wait until all pods in the deployment are running and ready.
+
     Args:
         namespace: The Kubernetes namespace
        timeout: The maximum time to wait in seconds (default 120 seconds)
@@ -906,14 +916,16 @@ def deploy_check_readiness(chart_name, namespace, timeout=300, interval=5, logfi
     # Get the list of deployments in the namespace
     cmd = ["kubectl", "-n", namespace, "get", "deployments", "-o", "jsonpath='{.items[*].metadata.name}'"]
     deployments = subprocess.check_output(cmd, text=True).strip("'\n").split()
-
+
     with open(logfile, "a") as log:
         log.write(f"Found deployments: {', '.join(deployments)}\n")
 
 
     # Loop through each deployment to check its readiness
     for deployment in deployments:
-        if not ("-" in deployment and chart_name in deployment and all(x not in deployment for x in ["ui", "nginx"])):
+        if not (
+            "-" in deployment and chart_name in deployment and all(x not in deployment for x in ["ui", "nginx"])
+        ):
             continue
 
         instance, app = deployment.split("-", 1)
@@ -926,19 +938,30 @@ def deploy_check_readiness(chart_name, namespace, timeout=300, interval=5, logfi
         timer = 0
         while True:
             cmd = [
-                "kubectl", "-n", namespace, "get", "pods",
-                "-l", f"app.kubernetes.io/instance={instance}",
-                "-l", f"app.kubernetes.io/name={app}",
+                "kubectl",
+                "-n",
+                namespace,
+                "get",
+                "pods",
+                "-l",
+                f"app.kubernetes.io/instance={instance}",
+                "-l",
+                f"app.kubernetes.io/name={app}",
                 "--field-selector=status.phase=Running",
-                "-o", "json"
+                "-o",
+                "json",
             ]
             pods = json.loads(subprocess.check_output(cmd, text=True))
 
-            ready_pods = sum(all(c.get("ready") for c in p.get("status", {}).get("containerStatuses", [])) for p in pods["items"])
+            ready_pods = sum(
+                all(c.get("ready") for c in p.get("status", {}).get("containerStatuses", [])) for p in pods["items"]
+            )
             terminating_pods = sum(1 for p in pods["items"] if p.get("metadata", {}).get("deletionTimestamp"))
 
             with open(logfile, "a") as log:
-                log.write(f"Ready pods: {ready_pods}, Desired replicas: {desired_replicas}, Terminating pods: {terminating_pods}\n")
+                log.write(
+                    f"Ready pods: {ready_pods}, Desired replicas: {desired_replicas}, Terminating pods: {terminating_pods}\n"
+                )
 
             if ready_pods == desired_replicas and terminating_pods == 0:
                 with open(logfile, "a") as log:
@@ -983,7 +1006,9 @@ def deploy_stop_services(chart_name, namespace):
         print(f"Error occurred while uninstalling Helm release or deleting namespace: {e}")
 
 
-def deploy_and_benchmark(max_batch_sizes, deploy_config, node, chart_name, namespace, chart_dir, benchmark_config, label):
+def deploy_and_benchmark(
+    max_batch_sizes, deploy_config, node, chart_name, namespace, chart_dir, benchmark_config, label
+):
     values_file_path = None
     for i, max_batch_size in enumerate(max_batch_sizes):
         print(f"[OPEA DEPLOYMENT] 🚀 Processing max_batch_size: {max_batch_size}")
@@ -999,7 +1024,14 @@ def deploy_and_benchmark(max_batch_sizes, deploy_config, node, chart_name, names
 
         if i == 0:
             # First iteration: full deployment
-            res = deploy_start_services(config_file=temp_config_file, chart_name=chart_name, namespace=namespace, label=label, chart_dir=chart_dir, user_values=values_file_path)
+            res = deploy_start_services(
+                config_file=temp_config_file,
+                chart_name=chart_name,
+                namespace=namespace,
+                label=label,
+                chart_dir=chart_dir,
+                user_values=values_file_path,
+            )
             if res["status"] == "success":
                 values_file_path = res["filepath"]
                 print(f"Captured values_file_path: {values_file_path}")
@@ -1008,11 +1040,16 @@ def deploy_and_benchmark(max_batch_sizes, deploy_config, node, chart_name, names
 
         else:
             # Subsequent iterations: update services with config change
-            res = deploy_start_services(config_file=temp_config_file, chart_name=chart_name, namespace=namespace, chart_dir=chart_dir, user_values=values_file_path, update_service=True)
+            res = deploy_start_services(
+                config_file=temp_config_file,
+                chart_name=chart_name,
+                namespace=namespace,
+                chart_dir=chart_dir,
+                user_values=values_file_path,
+                update_service=True,
+            )
             if res.returncode != 0:
-                print(
-                    f"Update failed for {node} nodes configuration with max_batch_size {max_batch_size}"
-                )
+                print(f"Update failed for {node} nodes configuration with max_batch_size {max_batch_size}")
                 break  # Skip remaining max_batch_sizes for this node
 
         # Wait for deployment to be ready
@@ -1056,7 +1093,7 @@ def main(yaml_file="./ChatQnA/chatqna.yaml", target_node=None):
     nodes_to_process = [target_node] if target_node is not None else nodes
     node_names = deploy_config.get("node_name", [])
     namespace = deploy_config.get("namespace", "default")
-    label = "example="+chart_name
+    label = "example=" + chart_name
 
     # Pull the Helm chart
     chart_pull_url = f"oci://ghcr.io/opea-project/charts/{chart_name}"
@@ -1064,7 +1101,7 @@ def main(yaml_file="./ChatQnA/chatqna.yaml", target_node=None):
     chart_dir = deploy_pull_helm_chart(chart_pull_url, version, chart_name)
     if not chart_dir:
         return
-
+
     # Run deploy and benchmark in for-loop with different nodes
     for node in nodes_to_process:
         print(f"[OPEA DEPLOYMENT] 🚀 Processing configuration for {node} nodes...")
@@ -1087,7 +1124,9 @@ def main(yaml_file="./ChatQnA/chatqna.yaml", target_node=None):
                 max_batch_sizes = [max_batch_sizes]
 
             breakpoint()
-            deploy_and_benchmark(max_batch_sizes, deploy_config, node, chart_name, namespace, chart_dir, benchmark_config, label)
+            deploy_and_benchmark(
+                max_batch_sizes, deploy_config, node, chart_name, namespace, chart_dir, benchmark_config, label
+            )
 
         finally:
             # Uninstall the deployment
@@ -1108,14 +1147,17 @@ def main(yaml_file="./ChatQnA/chatqna.yaml", target_node=None):
 
         shutil.rmtree(chart_dir)
         print("Temporary directory removed successfully.")
-
+
 
 if __name__ == "__main__":
     parser = argparse.ArgumentParser(description="Deploy and benchmark with specific node configuration.")
-    parser.add_argument("yaml_file", nargs="?", type=str, default="./ChatQnA/chatqna.yaml",
-                        help="Path to the YAML configuration file. Defaults to './ChatQnA/chatqna.yaml'")
-    parser.add_argument("--target-node", type=int, default=None,
-                        help="Optional: Target number of nodes to deploy.")
+    parser.add_argument(
+        "yaml_file",
+        nargs="?",
+        type=str,
+        default="./ChatQnA/chatqna.yaml",
+        help="Path to the YAML configuration file. Defaults to './ChatQnA/chatqna.yaml'",
+    )
+    parser.add_argument("--target-node", type=int, default=None, help="Optional: Target number of nodes to deploy.")
     args = parser.parse_args()
     main(yaml_file=args.yaml_file, target_node=args.target_node)
-
\ No newline at end of file