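"""Benchmark client that measures the latency and energy of an LLM serving system.

Prompts from the ShareGPT dataset are sent to an OpenAI-compatible chat
completions endpoint (vLLM or TGI) following a Poisson arrival process.
Client-side GPU energy is measured with Zeus, and server-side energy is read
from the `usage.energy` field the server is expected to include in responses.
"""
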
from __future__ import annotations

import argparse
import asyncio
import json
import random
import time
from dataclasses import asdict, dataclass, field
from typing import AsyncGenerator

import aiohttp
import numpy as np
import pynvml
import requests
from tqdm.asyncio import tqdm
from zeus.monitor import ZeusMonitor

SYSTEM_PROMPT = "You are an artificial intelligence assistant that gives helpful answers to the user's questions or instructions."
DEFAULT_TIMEOUT = aiohttp.ClientTimeout(total=3 * 3600)


@dataclass
class Results:
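    """Aggregate benchmark configuration and metrics, written to the results JSON file."""
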
    model: str
    backend: str
    gpu_model: str
    num_gpus: int
    num_nodes: int
    max_num_seqs: int
    power_limit: int
    request_rate: float
    num_requests: int
    num_failures: int = 0
    system_prompt: str = SYSTEM_PROMPT
    total_runtime: float = 0.0
    requests_per_second: float = 0.0
    total_prompt_tokens: int = 0
    total_completion_tokens: int = 0
    latency_per_request: float = 0.0
    latency_per_output_token: float = 0.0
    server_side_total_energy: float = 0.0
    server_side_energy_per_request: float = 0.0
    server_side_energy_per_output_token: float = 0.0
    server_side_average_power: float = 0.0
    client_side_total_energy: float = 0.0
    client_side_energy_per_request: float = 0.0
    client_side_energy_per_output_token: float = 0.0
    client_side_average_power: float = 0.0
    results: list[Result] = field(default_factory=list)


@dataclass
class ResultIntermediate:
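    """Per-request data captured while the request is in flight."""
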
    success: bool = True
    latency: float = 0.0
    prompt: str = ""
    response_bytes: list[bytes] = field(default_factory=list)


@dataclass
class Result:
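    """Parsed outcome of a single request."""
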
    success: bool = True
    latency: float = 0.0
    prompt: str = ""
    response: str = ""
    num_prompt_tokens: int = 0
    num_completion_tokens: int = 0
    energy: float = 0.0


def load_sharegpt(path: str, data_dup_factor: int) -> list[str]:
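    """Return the first user turn of every ShareGPT conversation, repeated `data_dup_factor` times."""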
    # Load the dataset.
    with open(path) as f:
        dataset = json.load(f)

    # Only keep the first turn of each conversation.
    return [data["conversations"][0]["value"] for data in dataset] * data_dup_factor


async def get_request(
    input_requests: list[str],
    result_intermediates: list[ResultIntermediate],
    request_rate: float,
) -> AsyncGenerator[tuple[ResultIntermediate, str], None]:
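    """Yield (ResultIntermediate, prompt) pairs, pacing them as a Poisson process.

    Inter-arrival times are sampled from an exponential distribution with rate
    `request_rate`; an infinite rate dispatches all requests immediately.
    """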
    if request_rate == float("inf"):
        # If the request rate is infinity, then we don't need to wait.
        for item in zip(result_intermediates, input_requests, strict=True):
            yield item
        return

    for item in zip(result_intermediates, input_requests, strict=True):
        yield item
        # Sample the request interval from the exponential distribution.
        interval = np.random.exponential(1.0 / request_rate)
        # The next request will be sent after the interval.
        await asyncio.sleep(interval)


async def send_request(
    result_intermediate: ResultIntermediate,
    backend: str,
    model: str,
    api_url: str,
    prompt: str,
) -> None:
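    """Send a single chat completion request and record its latency and raw response bytes."""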
    headers = {"Content-Type": "application/json"}
    # OpenAI Chat Completions API request format
    # Assuming `add_generation_prompt` is either not needed or set to true
    pload = {
        "model": model,
        "messages": [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": prompt.strip()},
        ],
        "stream": False,
        "max_tokens": 1024,
        "temperature": 0.8,
        "top_p": 0.95,
        "stop": ["\nUser:", "<|endoftext|>", "</s>"],
    }

    async with aiohttp.ClientSession(timeout=DEFAULT_TIMEOUT) as session:
        request_start_time = time.perf_counter()
        async with session.post(api_url, headers=headers, json=pload) as response:
            # Request failed
            if response.status >= 300:
                print(f"Request failed: {await response.text()}")
                result_intermediate.prompt = prompt
                result_intermediate.success = False
                return
            chunks = []
            async for chunk, _ in response.content.iter_chunks():
                chunks.append(chunk)
        request_end_time = time.perf_counter()

    result_intermediate.latency = request_end_time - request_start_time
    result_intermediate.prompt = prompt
    result_intermediate.response_bytes = chunks


async def benchmark(
    results: Results,
    backend: str,
    model: str,
    api_url: str,
    input_requests: list[str],
    request_rate: float,
) -> None:
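    """Dispatch all requests at the given rate and parse responses into `results.results`.

    Each successful response is expected to carry token counts and per-request
    energy in its `usage` field.
    """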
    tasks: list[asyncio.Task] = []
    result_intermediates = [ResultIntermediate() for _ in input_requests]
    pbar = tqdm(total=len(input_requests))
    async for ri, prompt in get_request(input_requests, result_intermediates, request_rate):
        pbar.update(1)
        task = asyncio.create_task(
            # Ensures results has same ordering as the input dataset
            send_request(ri, backend, model, api_url, prompt)
        )
        tasks.append(task)
    await asyncio.gather(*tasks)
    pbar.close()

    for result, intermediate in zip(results.results, result_intermediates, strict=True):
        result.success = intermediate.success
        result.latency = intermediate.latency
        result.prompt = intermediate.prompt
        if result.success:
            output = json.loads(b"".join(intermediate.response_bytes).decode("utf-8"))
            result.response = output["choices"][0]["message"]["content"]
            result.num_prompt_tokens = output["usage"]["prompt_tokens"]
            result.num_completion_tokens = output["usage"]["completion_tokens"]
            result.energy = output["usage"]["energy"]


def run_benchmark(
    args: argparse.Namespace,
    api_url: str,
    input_requests: list[str],
    out_filename: str,
):
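    """Run the benchmark end to end: measure, aggregate metrics, write them to `out_filename`, and print a summary."""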
    zeus_monitor = ZeusMonitor()
    args.num_gpus = len(zeus_monitor.gpu_indices)

    pynvml.nvmlInit()
    handle = pynvml.nvmlDeviceGetHandleByIndex(0)
    gpu_model = pynvml.nvmlDeviceGetName(handle)
    pynvml.nvmlShutdown()

    results = Results(
        model=args.model,
        backend=args.backend,
        gpu_model=gpu_model,
        num_gpus=args.num_gpus,
        num_nodes=args.nnodes,
        max_num_seqs=args.max_num_seqs,
        power_limit=args.power_limit,
        request_rate=args.request_rate,
        num_requests=len(input_requests),
        results=[Result() for _ in input_requests],
    )

    zeus_monitor.begin_window(out_filename, sync_execution=False)
    asyncio.run(benchmark(results, args.backend, args.model, api_url, input_requests, args.request_rate))
    measurements = zeus_monitor.end_window(out_filename, sync_execution=False)
    client_side_total_energy = measurements.total_energy

    # Store aggregated results
    total_prompt_tokens = 0
    total_completion_tokens = 0
    total_latency = 0
    total_latency_per_output_token = 0
    server_side_total_energy = 0
    for result in results.results:
        if not result.success:
            results.num_failures += 1
            continue
        total_prompt_tokens += result.num_prompt_tokens
        total_completion_tokens += result.num_completion_tokens
        total_latency += result.latency
        total_latency_per_output_token += result.latency / result.num_completion_tokens
        server_side_total_energy += result.energy

    num_results = len(results.results) - results.num_failures
    if num_results == 0:
        raise RuntimeError("All requests failed!")

    results.total_runtime = measurements.time
    results.requests_per_second = num_results / results.total_runtime
    results.total_prompt_tokens = total_prompt_tokens
    results.total_completion_tokens = total_completion_tokens
    results.latency_per_request = total_latency / num_results
    results.latency_per_output_token = total_latency_per_output_token / num_results
    results.server_side_total_energy = server_side_total_energy
    results.server_side_energy_per_request = results.server_side_total_energy / num_results
    results.server_side_energy_per_output_token = results.server_side_total_energy / results.total_completion_tokens
    results.server_side_average_power = server_side_total_energy / results.total_runtime
    results.client_side_total_energy = client_side_total_energy
    results.client_side_energy_per_request = client_side_total_energy / num_results
    results.client_side_energy_per_output_token = client_side_total_energy / results.total_completion_tokens
    results.client_side_average_power = client_side_total_energy / results.total_runtime

    with open(out_filename, "w") as f:
        f.write(json.dumps(asdict(results), indent=2))
    print("Benchmark results written to", out_filename)

    print("Benchmark results:")
    print(f"Model: {results.model}")
    print(f"Backend: {results.backend}")
    print(f"Request rate: {results.request_rate} requests/s")
    print()
    print(f"Total benchmark runtime: {results.total_runtime:.2f} s")
    print(f"Total number of requests: {results.num_requests}")
    print(f"Number of failed requests: {results.num_failures}")
    print(f"Requests per second: {results.requests_per_second:.2f} requests/s")
    print(f"Average latency per request: {results.latency_per_request:.2f} s")
    print(f"Average latency per output token: {results.latency_per_output_token:.2f} s")
    print(f"(Client-side) Total energy: {results.client_side_total_energy:.2f} J")
    print(f"(Client-side) Energy per request: {results.client_side_energy_per_request:.2f} J")
    print(f"(Client-side) Energy per token: {results.client_side_energy_per_output_token:.2f} J")
    print(f"(Client-side) Average power: {results.client_side_average_power:.2f} W")
    print(f"(Server-side) Total energy: {results.server_side_total_energy:.2f} J")
    print(f"(Server-side) Energy per request: {results.server_side_energy_per_request:.2f} J")
    print(f"(Server-side) Energy per token: {results.server_side_energy_per_output_token:.2f} J")
    print(f"(Server-side) Average power: {results.server_side_average_power:.2f} W")
    print()
    print("Note that individual request measurements would not be correct for multi-node benchmarks.")


def wait_server_ready(list_models_url: str) -> None:
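    """Poll `list_models_url` until the server responds successfully."""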
    while True:
        try:
            response = requests.get(list_models_url)
            response.raise_for_status()
            break
        except requests.exceptions.RequestException:
            print("Waiting for the server to be ready...")
            time.sleep(1)


def main(args: argparse.Namespace):
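    """Validate arguments, dump them to a JSON file, wait for the server, and run the benchmark."""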
    if args.backend not in ["tgi", "vllm"]:
        raise ValueError(f"Unknown backend: {args.backend}")

    arg_out_filename = f"{args.benchmark_name}+args.json"
    with open(arg_out_filename, "w") as f:
        f.write(json.dumps(vars(args), indent=2))
    print(args)
    print("Benchmark args written to", arg_out_filename)

    random.seed(args.seed)
    np.random.seed(args.seed)

    api_url = f"http://localhost:{args.port}/v1/chat/completions"
    input_requests = load_sharegpt(args.sharegpt_path, args.data_dup_factor)

    if args.backend == "vllm":
        wait_server_ready(f"http://localhost:{args.port}/v1/models")
    elif args.backend == "tgi":
        wait_server_ready(f"http://localhost:{args.port}/health")
    else:
        raise ValueError(f"Unknown backend: {args.backend}")

    run_benchmark(args, api_url, input_requests, f"{args.benchmark_name}+results.json")


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--backend", required=True, choices=["vllm", "tgi"], help="Server to benchmark.")
    parser.add_argument("--port", type=int, required=True, help="Port of the server to benchmark.")
    parser.add_argument("--model", required=True, help="Model to benchmark, e.g., meta-llama/Llama-2-7b-chat-hf.")
parser.add_argument("--sharegpt-path", help="Path to the ShareGPT dataset to feed to the server.")
    parser.add_argument(
        "--request-rate",
        type=float,
        required=True,
        help="Poisson process rate for request arrival times. If this is inf, all requests are sent at time 0.",
    )
    parser.add_argument(
        "--benchmark-name",
        required=True,
        help="Name of the benchmark. Result files will be written to paths derived from this.",
    )
parser.add_argument("--seed", type=int, default=0)
parser.add_argument("--power-limit", type=int, required=True, help="Not used but passed in in order to save to results file.")
parser.add_argument("--nnodes", type=int, required=True, help="Not used but passed in in order to save to results file.")
parser.add_argument("--max-num-seqs", type=int, required=True, help="Not used but passed in in order to save to results file.")
parser.add_argument("--data-dup-factor", type=int, default=1, help="How many times to repeat the ShareGPT dataset to generate more requests.")
args = parser.parse_args()
main(args)