Fix benchmark scripts (#1517)

- Align benchmark default config:  
1. Update default helm charts version. 
2. Add `# mandatory` comment. 
3. Update default model ID for LLM. 
- Fix deploy issue:  
1. Support different `replicaCount` for w/ w/o rerank test. 
2. Add `max_num_seqs` for vllm. 
3. Add resource setting for tune mode. 

- Fix Benchmark issue: 
1. Update `user_queries` and `concurrency` setting. 
2. Remove invalid parameters. 
3. Fix `dataset` and `prompt` setting. And dataset ingest into db. 
5. Fix the benchmark hang issue with large user queries. Update `"processes": 16` will fix this issue. 
6. Update the eval_path setting logical. 
- Optimize benchmark readme. 
- Optimize the log path to make the logs more readable. 

Signed-off-by: chensuyue <suyue.chen@intel.com>
Signed-off-by: Cathy Zhang <cathy.zhang@intel.com>
Signed-off-by: letonghan <letong.han@intel.com>
This commit is contained in:
chen, suyue
2025-02-28 10:30:54 +08:00
committed by GitHub
parent 78f8ae524d
commit 3d8009aa91
6 changed files with 641 additions and 207 deletions

View File

@@ -2,9 +2,9 @@
# SPDX-License-Identifier: Apache-2.0
import os
import sys
from datetime import datetime
import requests
import yaml
from evals.benchmark.stresscli.commands.load_test import locust_runtests
from kubernetes import client, config
@@ -25,17 +25,15 @@ def construct_benchmark_config(test_suite_config):
"""Extract relevant data from the YAML based on the specified test cases."""
return {
"concurrency": test_suite_config.get("concurrency", []),
"totoal_query_num": test_suite_config.get("user_queries", []),
"duration:": test_suite_config.get("duration:", []),
"query_num_per_concurrency": test_suite_config.get("query_num_per_concurrency", []),
"possion": test_suite_config.get("possion", False),
"possion_arrival_rate": test_suite_config.get("possion_arrival_rate", 1.0),
"user_queries": test_suite_config.get("user_queries", [1]),
"concurrency": test_suite_config.get("concurrency", [1]),
"load_shape_type": test_suite_config.get("load_shape_type", "constant"),
"poisson_arrival_rate": test_suite_config.get("poisson_arrival_rate", 1.0),
"warmup_iterations": test_suite_config.get("warmup_iterations", 10),
"seed": test_suite_config.get("seed", None),
"test_cases": test_suite_config.get("test_cases", ["chatqnafixed"]),
"user_queries": test_suite_config.get("user_queries", [1]),
"query_token_size": test_suite_config.get("query_token_size", 128),
"bench_target": test_suite_config.get("bench_target", ["chatqnafixed"]),
"dataset": test_suite_config.get("dataset", ""),
"prompt": test_suite_config.get("prompt", [10]),
"llm_max_token_size": test_suite_config.get("llm", {}).get("max_token_size", [128]),
}
@@ -97,17 +95,11 @@ def _get_service_ip(service_name, deployment_type="k8s", service_ip=None, servic
return svc_ip, port
def _create_yaml_content(service, base_url, bench_target, test_phase, num_queries, test_params):
def _create_yaml_content(service, base_url, bench_target, test_phase, num_queries, test_params, concurrency=1):
"""Create content for the run.yaml file."""
# If a load shape includes the parameter concurrent_level,
# the parameter will be passed to Locust to launch fixed
# number of simulated users.
concurrency = 1
if num_queries >= 0:
concurrency = max(1, num_queries // test_params["concurrent_level"])
else:
concurrency = test_params["concurrent_level"]
# calculate the number of concurrent users
concurrent_level = int(num_queries // concurrency)
import importlib.util
@@ -116,16 +108,21 @@ def _create_yaml_content(service, base_url, bench_target, test_phase, num_querie
print(spec)
# get folder path of opea-eval
eval_path = None
import pkg_resources
for dist in pkg_resources.working_set:
if "opea-eval" in dist.project_name:
eval_path = dist.location
eval_path = os.getenv("EVAL_PATH", "")
if not eval_path:
print("Fail to load opea-eval package. Please install it first.")
import pkg_resources
for dist in pkg_resources.working_set:
if "opea-eval" in dist.project_name:
eval_path = dist.location
break
if not eval_path:
print("Fail to find the opea-eval package. Please set/install it first.")
exit(1)
load_shape = test_params["load_shape"]
load_shape["params"]["constant"] = {"concurrent_level": concurrent_level}
yaml_content = {
"profile": {
"storage": {"hostpath": test_params["test_output_dir"]},
@@ -133,8 +130,9 @@ def _create_yaml_content(service, base_url, bench_target, test_phase, num_querie
"tool": "locust",
"locustfile": os.path.join(eval_path, "evals/benchmark/stresscli/locust/aistress.py"),
"host": base_url,
"run-time": test_params["run_time"],
"stop-timeout": test_params["query_timeout"],
"processes": 2,
"processes": 16, # set to 2 by default
"namespace": test_params["namespace"],
"bench-target": bench_target,
"service-metric-collect": test_params["collect_service_metric"],
@@ -145,42 +143,38 @@ def _create_yaml_content(service, base_url, bench_target, test_phase, num_querie
"seed": test_params.get("seed", None),
"llm-model": test_params["llm_model"],
"deployment-type": test_params["deployment_type"],
"load-shape": test_params["load_shape"],
"load-shape": load_shape,
},
"runs": [{"name": test_phase, "users": concurrency, "max-request": num_queries}],
}
}
# For the following scenarios, test will stop after the specified run-time
if test_params["run_time"] is not None and test_phase != "warmup":
yaml_content["profile"]["global-settings"]["run-time"] = test_params["run_time"]
return yaml_content
def _create_stresscli_confs(case_params, test_params, test_phase, num_queries, base_url, ts) -> str:
def _create_stresscli_confs(case_params, test_params, test_phase, num_queries, base_url, ts, concurrency=1) -> str:
"""Create a stresscli configuration file and persist it on disk."""
stresscli_confs = []
# Get the workload
test_cases = test_params["test_cases"]
for test_case in test_cases:
bench_target = test_params["bench_target"]
for i, b_target in enumerate(bench_target):
stresscli_conf = {}
print(test_case)
if isinstance(test_case, str):
bench_target = test_case
elif isinstance(test_case, dict):
bench_target = list(test_case.keys())[0]
dataset_conf = test_case[bench_target]
if bench_target == "chatqna_qlist_pubmed":
max_lines = dataset_conf["dataset"].split("pub_med")[-1]
stresscli_conf["envs"] = {"DATASET": f"pubmed_{max_lines}.txt", "MAX_LINES": max_lines}
print(f"[OPEA BENCHMARK] 🚀 Running test for {b_target} in phase {test_phase} for {num_queries} queries")
if len(test_params["dataset"]) > i:
stresscli_conf["envs"] = {"DATASET": test_params["dataset"][i], "MAX_LINES": str(test_params["prompt"][i])}
else:
stresscli_conf["envs"] = {"MAX_LINES": str(test_params["prompt"][i])}
# Generate the content of stresscli configuration file
stresscli_yaml = _create_yaml_content(case_params, base_url, bench_target, test_phase, num_queries, test_params)
stresscli_yaml = _create_yaml_content(
case_params, base_url, b_target, test_phase, num_queries, test_params, concurrency
)
# Dump the stresscli configuration file
service_name = case_params.get("service_name")
max_output = case_params.get("max_output")
run_yaml_path = os.path.join(
test_params["test_output_dir"], f"run_{service_name}_{ts}_{test_phase}_{num_queries}_{bench_target}.yaml"
test_params["test_output_dir"],
f"run_{test_phase}_{service_name}_{num_queries}_{b_target}_{max_output}_{ts}.yaml",
)
with open(run_yaml_path, "w") as yaml_file:
yaml.dump(stresscli_yaml, yaml_file)
@@ -207,15 +201,79 @@ def create_stresscli_confs(service, base_url, test_suite_config, index):
stresscli_confs.extend(_create_stresscli_confs(service, test_suite_config, "benchmark", -1, base_url, index))
else:
# Test stop is controlled by request count
for user_queries in user_queries_lst:
for i, user_query in enumerate(user_queries_lst):
concurrency_list = test_suite_config["concurrency"]
user_query *= test_suite_config["node_num"]
stresscli_confs.extend(
_create_stresscli_confs(service, test_suite_config, "benchmark", user_queries, base_url, index)
_create_stresscli_confs(
service,
test_suite_config,
"benchmark",
user_query,
base_url,
index,
concurrency=concurrency_list[i],
)
)
return stresscli_confs
def _run_service_test(example, service, test_suite_config):
def ingest_data_to_db(service, dataset, namespace):
"""Ingest data into the database."""
for service_name in service.get("service_list"):
if "data" in service_name:
# Ingest data into the database
print(f"[OPEA BENCHMARK] 🚀 Ingesting data into the database for {service_name}...")
try:
svc_ip, port = _get_service_ip(service_name, "k8s", None, None, namespace)
url = f"http://{svc_ip}:{port}/v1/dataprep/ingest"
files = {"files": open(dataset, "rb")}
response = requests.post(url, files=files)
if response.status_code != 200:
print(f"Error ingesting data: {response.text}. Status code: {response.status_code}")
return False
if "Data preparation succeeded" not in response.text:
print(f"Error ingesting data: {response.text}. Response: {response}")
return False
except Exception as e:
print(f"Error ingesting data: {e}")
return False
print(f"[OPEA BENCHMARK] 🚀 Data ingestion completed for {service_name}.")
break
return True
def clear_db(service, namespace):
"""Delete all files from the database."""
for service_name in service.get("service_list"):
if "data" in service_name:
# Delete data from the database
try:
svc_ip, port = _get_service_ip(service_name, "k8s", None, None, namespace)
url = f"http://{svc_ip}:{port}/v1/dataprep/delete"
data = {"file_path": "all"}
print(f"[OPEA BENCHMARK] 🚀 Deleting data from the database for {service_name} with {url}")
response = requests.post(url, json=data, headers={"Content-Type": "application/json"})
if response.status_code != 200:
print(f"Error deleting data: {response.text}. Status code: {response.status_code}")
return False
if "true" not in response.text:
print(f"Error deleting data: {response.text}. Response: {response}")
return False
except Exception as e:
print(f"Error deleting data: {e}")
return False
print(f"[OPEA BENCHMARK] 🚀 Data deletion completed for {service_name}.")
break
return True
def _run_service_test(example, service, test_suite_config, namespace):
"""Run the test for a specific service and example."""
print(f"[OPEA BENCHMARK] 🚀 Example: [ {example} ] Service: [ {service.get('service_name')} ], Running test...")
@@ -251,44 +309,94 @@ def _run_service_test(example, service, test_suite_config):
run_yaml_path = stresscli_conf["run_yaml_path"]
print(f"[OPEA BENCHMARK] 🚀 The {index} time test is running, run yaml: {run_yaml_path}...")
os.environ["MAX_TOKENS"] = str(service.get("max_output"))
dataset = None
if stresscli_conf.get("envs") is not None:
for key, value in stresscli_conf.get("envs").items():
os.environ[key] = value
if key == "DATASET":
dataset = value
output_folders.append(locust_runtests(None, run_yaml_path))
if dataset:
# Ingest data into the database for single run of benchmark
result = ingest_data_to_db(service, dataset, namespace)
if not result:
print(f"[OPEA BENCHMARK] 🚀 Data ingestion failed for {service_name}.")
exit(1)
else:
print(f"[OPEA BENCHMARK] 🚀 Dataset is not specified for {service_name}. Check the benchmark.yaml again.")
# Run the benchmark test and append the output folder to the list
print("[OPEA BENCHMARK] 🚀 Start locust_runtests at", datetime.now().strftime("%Y%m%d_%H%M%S"))
locust_output = locust_runtests(None, run_yaml_path)
print(f"[OPEA BENCHMARK] 🚀 locust_output origin name is {locust_output}")
# Rename the output folder to include the index
new_output_path = os.path.join(
os.path.dirname(run_yaml_path), f"{os.path.splitext(os.path.basename(run_yaml_path))[0]}_output"
)
os.rename(locust_output, new_output_path)
print(f"[OPEA BENCHMARK] 🚀 locust new_output_path is {new_output_path}")
output_folders.append(new_output_path)
print("[OPEA BENCHMARK] 🚀 End locust_runtests at", datetime.now().strftime("%Y%m%d_%H%M%S"))
# Delete all files from the database after the test
result = clear_db(service, namespace)
print("[OPEA BENCHMARK] 🚀 End of clean up db", datetime.now().strftime("%Y%m%d_%H%M%S"))
if not result:
print(f"[OPEA BENCHMARK] 🚀 Data deletion failed for {service_name}.")
exit(1)
print(f"[OPEA BENCHMARK] 🚀 Test completed for {service_name} at {url}")
return output_folders
def run_benchmark(benchmark_config, chart_name, namespace, llm_model=None, report=False):
def run_benchmark(benchmark_config, chart_name, namespace, node_num=1, llm_model=None, report=False, output_dir=None):
"""Run the benchmark test for the specified helm chart and configuration.
Args:
benchmark_config (dict): The benchmark configuration.
chart_name (str): The name of the helm chart.
namespace (str): The namespace to deploy the chart.
node_num (int): The number of nodes of current deployment.
llm_model (str): The LLM model to use for the test.
report (bool): Whether to generate a report after the test.
output_dir (str): Directory to store the test output. If None, uses default directory.
"""
# If llm_model is None or an empty string, set to default value
if not llm_model:
llm_model = "Qwen/Qwen2.5-Coder-7B-Instruct"
llm_model = "meta-llama/Meta-Llama-3-8B-Instruct"
# Extract data
parsed_data = construct_benchmark_config(benchmark_config)
test_suite_config = {
"user_queries": parsed_data["user_queries"], # num of user queries
"random_prompt": False, # whether to use random prompt, set to False by default
"run_time": "60m", # The max total run time for the test suite, set to 60m by default
"run_time": "30m", # The max total run time for the test suite, set to 60m by default
"collect_service_metric": False, # whether to collect service metrics, set to False by default
"llm_model": llm_model, # The LLM model used for the test
"deployment_type": "k8s", # Default is "k8s", can also be "docker"
"service_ip": None, # Leave as None for k8s, specify for Docker
"service_port": None, # Leave as None for k8s, specify for Docker
"test_output_dir": os.getcwd() + "/benchmark_output", # The directory to store the test output
"test_output_dir": (
output_dir if output_dir else os.getcwd() + "/benchmark_output"
), # Use output_dir if provided
"node_num": node_num,
"load_shape": {
"name": "constant",
"params": {"constant": {"concurrent_level": 4}, "poisson": {"arrival_rate": 1.0}},
"name": parsed_data["load_shape_type"],
"params": {
"poisson": {"arrival_rate": parsed_data["poisson_arrival_rate"]},
},
},
"concurrent_level": 4,
"arrival_rate": 1.0,
"concurrency": parsed_data["concurrency"],
"arrival_rate": parsed_data["poisson_arrival_rate"],
"query_timeout": 120,
"warm_ups": parsed_data["warmup_iterations"],
"seed": parsed_data["seed"],
"namespace": namespace,
"test_cases": parsed_data["test_cases"],
"bench_target": parsed_data["bench_target"],
"dataset": parsed_data["dataset"],
"prompt": parsed_data["prompt"],
"llm_max_token_size": parsed_data["llm_max_token_size"],
}
@@ -313,15 +421,14 @@ def run_benchmark(benchmark_config, chart_name, namespace, llm_model=None, repor
"chatqna-retriever-usvc",
"chatqna-tei",
"chatqna-teirerank",
"chatqna-tgi",
"chatqna-vllm",
],
"test_cases": parsed_data["test_cases"],
# Activate if random_prompt=true: leave blank = default dataset(WebQuestions) or sharegpt
"prompts": query_data,
"max_output": llm_max_token, # max number of output tokens
"k": 1, # number of retrieved documents
}
output_folder = _run_service_test(chart_name, case_data, test_suite_config)
output_folder = _run_service_test(chart_name, case_data, test_suite_config, namespace)
print(f"[OPEA BENCHMARK] 🚀 Test Finished. Output saved in {output_folder}.")
@@ -339,5 +446,5 @@ def run_benchmark(benchmark_config, chart_name, namespace, llm_model=None, repor
if __name__ == "__main__":
benchmark_config = load_yaml("./benchmark.yaml")
run_benchmark(benchmark_config=benchmark_config, chart_name="chatqna", namespace="deploy-benchmark")
benchmark_config = load_yaml("./ChatQnA/benchmark_chatqna.yaml")
run_benchmark(benchmark_config=benchmark_config, chart_name="chatqna", namespace="benchmark")