Provide unified scalable deployment and benchmarking support for exam… (#1315)
Signed-off-by: Cathy Zhang <cathy.zhang@intel.com> Signed-off-by: letonghan <letong.han@intel.com> Co-authored-by: letonghan <letong.han@intel.com> Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
This commit is contained in:
292
deploy_and_benchmark.py
Normal file
292
deploy_and_benchmark.py
Normal file
@@ -0,0 +1,292 @@
|
||||
# Copyright (C) 2025 Intel Corporation
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
import argparse
|
||||
import copy
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
import yaml
|
||||
|
||||
from benchmark import run_benchmark
|
||||
|
||||
|
||||
def read_yaml(file_path):
|
||||
try:
|
||||
with open(file_path, "r") as file:
|
||||
return yaml.safe_load(file)
|
||||
except Exception as e:
|
||||
print(f"Error reading YAML file: {e}")
|
||||
return None
|
||||
|
||||
|
||||
def construct_deploy_config(deploy_config, target_node, max_batch_size=None):
|
||||
"""Construct a new deploy config based on the target node number and optional max_batch_size.
|
||||
|
||||
Args:
|
||||
deploy_config: Original deploy config dictionary
|
||||
target_node: Target node number to match in the node array
|
||||
max_batch_size: Optional specific max_batch_size value to use
|
||||
|
||||
Returns:
|
||||
A new deploy config with single values for node and instance_num
|
||||
"""
|
||||
# Deep copy the original config to avoid modifying it
|
||||
new_config = copy.deepcopy(deploy_config)
|
||||
|
||||
# Get the node array and validate
|
||||
nodes = deploy_config.get("node")
|
||||
if not isinstance(nodes, list):
|
||||
raise ValueError("deploy_config['node'] must be an array")
|
||||
|
||||
# Find the index of the target node
|
||||
try:
|
||||
node_index = nodes.index(target_node)
|
||||
except ValueError:
|
||||
raise ValueError(f"Target node {target_node} not found in node array {nodes}")
|
||||
|
||||
# Set the single node value
|
||||
new_config["node"] = target_node
|
||||
|
||||
# Update instance_num for each service based on the same index
|
||||
for service_name, service_config in new_config.get("services", {}).items():
|
||||
if "replicaCount" in service_config:
|
||||
instance_nums = service_config["replicaCount"]
|
||||
if isinstance(instance_nums, list):
|
||||
if len(instance_nums) != len(nodes):
|
||||
raise ValueError(
|
||||
f"instance_num array length ({len(instance_nums)}) for service {service_name} "
|
||||
f"doesn't match node array length ({len(nodes)})"
|
||||
)
|
||||
service_config["replicaCount"] = instance_nums[node_index]
|
||||
|
||||
# Update max_batch_size if specified
|
||||
if max_batch_size is not None and "llm" in new_config["services"]:
|
||||
new_config["services"]["llm"]["max_batch_size"] = max_batch_size
|
||||
|
||||
return new_config
|
||||
|
||||
|
||||
def pull_helm_chart(chart_pull_url, version, chart_name):
|
||||
# Pull and untar the chart
|
||||
subprocess.run(["helm", "pull", chart_pull_url, "--version", version, "--untar"], check=True)
|
||||
|
||||
current_dir = os.getcwd()
|
||||
untar_dir = os.path.join(current_dir, chart_name)
|
||||
|
||||
if not os.path.isdir(untar_dir):
|
||||
print(f"Error: Could not find untarred directory for {chart_name}")
|
||||
return None
|
||||
|
||||
return untar_dir
|
||||
|
||||
|
||||
def main(yaml_file, target_node=None):
|
||||
"""Main function to process deployment configuration.
|
||||
|
||||
Args:
|
||||
yaml_file: Path to the YAML configuration file
|
||||
target_node: Optional target number of nodes to deploy. If not specified, will process all nodes.
|
||||
"""
|
||||
config = read_yaml(yaml_file)
|
||||
if config is None:
|
||||
print("Failed to read YAML file.")
|
||||
return None
|
||||
|
||||
deploy_config = config["deploy"]
|
||||
benchmark_config = config["benchmark"]
|
||||
|
||||
# Extract chart name from the YAML file name
|
||||
chart_name = os.path.splitext(os.path.basename(yaml_file))[0].split("_")[-1]
|
||||
print(f"chart_name: {chart_name}")
|
||||
python_cmd = sys.executable
|
||||
|
||||
# Process nodes
|
||||
nodes = deploy_config.get("node", [])
|
||||
if not isinstance(nodes, list):
|
||||
print("Error: deploy_config['node'] must be an array")
|
||||
return None
|
||||
|
||||
nodes_to_process = [target_node] if target_node is not None else nodes
|
||||
node_names = deploy_config.get("node_name", [])
|
||||
namespace = deploy_config.get("namespace", "default")
|
||||
|
||||
# Pull the Helm chart
|
||||
chart_pull_url = f"oci://ghcr.io/opea-project/charts/{chart_name}"
|
||||
version = deploy_config.get("version", "1.1.0")
|
||||
chart_dir = pull_helm_chart(chart_pull_url, version, chart_name)
|
||||
if not chart_dir:
|
||||
return
|
||||
|
||||
for node in nodes_to_process:
|
||||
try:
|
||||
print(f"\nProcessing configuration for {node} nodes...")
|
||||
|
||||
# Get corresponding node names for this node count
|
||||
current_node_names = node_names[:node] if node_names else []
|
||||
|
||||
# Add labels for current node configuration
|
||||
print(f"Adding labels for {node} nodes...")
|
||||
cmd = [python_cmd, "deploy.py", "--chart-name", chart_name, "--num-nodes", str(node), "--add-label"]
|
||||
if current_node_names:
|
||||
cmd.extend(["--node-names"] + current_node_names)
|
||||
|
||||
result = subprocess.run(cmd, check=True)
|
||||
if result.returncode != 0:
|
||||
print(f"Failed to add labels for {node} nodes")
|
||||
continue
|
||||
|
||||
try:
|
||||
# Process max_batch_sizes
|
||||
max_batch_sizes = deploy_config.get("services", {}).get("llm", {}).get("max_batch_size", [])
|
||||
if not isinstance(max_batch_sizes, list):
|
||||
max_batch_sizes = [max_batch_sizes]
|
||||
|
||||
values_file_path = None
|
||||
for i, max_batch_size in enumerate(max_batch_sizes):
|
||||
print(f"\nProcessing max_batch_size: {max_batch_size}")
|
||||
|
||||
# Construct new deploy config
|
||||
new_deploy_config = construct_deploy_config(deploy_config, node, max_batch_size)
|
||||
|
||||
# Write the new deploy config to a temporary file
|
||||
temp_config_file = f"temp_deploy_config_{node}_{max_batch_size}.yaml"
|
||||
try:
|
||||
with open(temp_config_file, "w") as f:
|
||||
yaml.dump(new_deploy_config, f)
|
||||
|
||||
if i == 0:
|
||||
# First iteration: full deployment
|
||||
cmd = [
|
||||
python_cmd,
|
||||
"deploy.py",
|
||||
"--deploy-config",
|
||||
temp_config_file,
|
||||
"--chart-name",
|
||||
chart_name,
|
||||
"--namespace",
|
||||
namespace,
|
||||
"--chart-dir",
|
||||
chart_dir,
|
||||
]
|
||||
result = subprocess.run(cmd, check=True, capture_output=True, text=True)
|
||||
|
||||
match = re.search(r"values_file_path: (\S+)", result.stdout)
|
||||
if match:
|
||||
values_file_path = match.group(1)
|
||||
print(f"Captured values_file_path: {values_file_path}")
|
||||
else:
|
||||
print("values_file_path not found in the output")
|
||||
|
||||
else:
|
||||
# Subsequent iterations: update services with config change
|
||||
cmd = [
|
||||
python_cmd,
|
||||
"deploy.py",
|
||||
"--deploy-config",
|
||||
temp_config_file,
|
||||
"--chart-name",
|
||||
chart_name,
|
||||
"--namespace",
|
||||
namespace,
|
||||
"--chart-dir",
|
||||
chart_dir,
|
||||
"--user-values",
|
||||
values_file_path,
|
||||
"--update-service",
|
||||
]
|
||||
result = subprocess.run(cmd, check=True)
|
||||
if result.returncode != 0:
|
||||
print(
|
||||
f"Update failed for {node} nodes configuration with max_batch_size {max_batch_size}"
|
||||
)
|
||||
break # Skip remaining max_batch_sizes for this node
|
||||
|
||||
# Wait for deployment to be ready
|
||||
print("\nWaiting for deployment to be ready...")
|
||||
cmd = [
|
||||
python_cmd,
|
||||
"deploy.py",
|
||||
"--chart-name",
|
||||
chart_name,
|
||||
"--namespace",
|
||||
namespace,
|
||||
"--check-ready",
|
||||
]
|
||||
try:
|
||||
result = subprocess.run(cmd, check=True)
|
||||
print("Deployments are ready!")
|
||||
except subprocess.CalledProcessError as e:
|
||||
print(f"Deployments status failed with returncode: {e.returncode}")
|
||||
|
||||
# Run benchmark
|
||||
run_benchmark(
|
||||
benchmark_config=benchmark_config,
|
||||
chart_name=chart_name,
|
||||
namespace=namespace,
|
||||
llm_model=deploy_config.get("services", {}).get("llm", {}).get("model_id", ""),
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
print(
|
||||
f"Error during {'deployment' if i == 0 else 'update'} for {node} nodes with max_batch_size {max_batch_size}: {str(e)}"
|
||||
)
|
||||
break # Skip remaining max_batch_sizes for this node
|
||||
finally:
|
||||
# Clean up the temporary file
|
||||
if os.path.exists(temp_config_file):
|
||||
os.remove(temp_config_file)
|
||||
|
||||
finally:
|
||||
# Uninstall the deployment
|
||||
print(f"\nUninstalling deployment for {node} nodes...")
|
||||
cmd = [
|
||||
python_cmd,
|
||||
"deploy.py",
|
||||
"--chart-name",
|
||||
chart_name,
|
||||
"--namespace",
|
||||
namespace,
|
||||
"--uninstall",
|
||||
]
|
||||
try:
|
||||
result = subprocess.run(cmd, check=True)
|
||||
if result.returncode != 0:
|
||||
print(f"Failed to uninstall deployment for {node} nodes")
|
||||
except Exception as e:
|
||||
print(f"Error while uninstalling deployment for {node} nodes: {str(e)}")
|
||||
|
||||
# Delete labels for current node configuration
|
||||
print(f"Deleting labels for {node} nodes...")
|
||||
cmd = [python_cmd, "deploy.py", "--chart-name", chart_name, "--num-nodes", str(node), "--delete-label"]
|
||||
if current_node_names:
|
||||
cmd.extend(["--node-names"] + current_node_names)
|
||||
|
||||
try:
|
||||
result = subprocess.run(cmd, check=True)
|
||||
if result.returncode != 0:
|
||||
print(f"Failed to delete labels for {node} nodes")
|
||||
except Exception as e:
|
||||
print(f"Error while deleting labels for {node} nodes: {str(e)}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error processing configuration for {node} nodes: {str(e)}")
|
||||
continue
|
||||
|
||||
# Cleanup: Remove the untarred directory
|
||||
if chart_dir and os.path.isdir(chart_dir):
|
||||
print(f"Removing temporary directory: {chart_dir}")
|
||||
shutil.rmtree(chart_dir)
|
||||
print("Temporary directory removed successfully.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="Deploy and benchmark with specific node configuration.")
|
||||
parser.add_argument("yaml_file", help="Path to the YAML configuration file")
|
||||
parser.add_argument("--target-node", type=int, help="Optional: Target number of nodes to deploy.", default=None)
|
||||
|
||||
args = parser.parse_args()
|
||||
main(args.yaml_file, args.target_node)
|
||||
Reference in New Issue
Block a user