Jae-Won Chung
New leaderboard prototype
b10121d
raw
history blame
13 kB
from __future__ import annotations
import argparse
import asyncio
import requests
import json
import random
import time
from typing import AsyncGenerator, Literal
from dataclasses import asdict, dataclass, field
import pynvml
import aiohttp
import numpy as np
from tqdm.asyncio import tqdm
from evalplus.data import get_human_eval_plus, get_mbpp_plus, write_jsonl
from zeus.monitor import ZeusMonitor
DEFAULT_TIMEOUT = aiohttp.ClientTimeout(total=3 * 3600)
STOP_SEQUENCES = ["\nclass", "\ndef", "\n#", "\n@", "\nprint", "\nif", "\n```"]
@dataclass
class Results:
model: str
backend: str
gpu_model: str
num_gpus: int
max_num_seqs: int
power_limit: int
request_rate: float
num_requests: int
num_failures: int = 0
total_runtime: float = 0.0
requests_per_second: float = 0.0
total_prompt_tokens: int = 0
total_completion_tokens: int = 0
latency_per_request: float = 0.0
latency_per_output_token: float = 0.0
server_side_total_energy: float = 0.0
server_side_energy_per_request: float = 0.0
server_side_energy_per_output_token: float = 0.0
server_side_average_power: float = 0.0
client_side_total_energy: float = 0.0
client_side_energy_per_request: float = 0.0
client_side_energy_per_output_token: float = 0.0
client_side_average_power: float = 0.0
results: list[Result] = field(default_factory=list)
@dataclass
class ResultIntermediate:
task_id: str = ""
success: bool = True
latency: float = 0.0
prompt: str = ""
response_bytes: list[bytes] = field(default_factory=list)
@dataclass
class Result:
task_id: str = ""
success: bool = True
latency: float = 0.0
prompt: str = ""
response: str = ""
num_prompt_tokens: int = 0
num_completion_tokens: int = 0
energy: float = 0.0
def strip_stop_sequence(text: str, stop_sequences: list[str]) -> str:
for stop in stop_sequences:
if text.endswith(stop):
return text[:-len(stop)]
return text
def load_evalplus(dataset: Literal["humaneval", "mbpp"], data_dup_factor: int) -> list[tuple[str, str]]:
"""Load the evalplus dataset.
Tuple is (task_id, prompt).
"""
if dataset == "humaneval":
gen_fn = get_human_eval_plus
elif dataset == "mbpp":
gen_fn = get_mbpp_plus
else:
raise ValueError(f"Unknown EvalPlus dataset: {dataset}")
return [(task_id, problem["prompt"].strip()) for task_id, problem in gen_fn().items()] * data_dup_factor
async def get_request(
input_requests: list[tuple[str, str]],
result_intermediates: list[ResultIntermediate],
request_rate: float,
) -> AsyncGenerator[tuple[ResultIntermediate, str], None]:
if request_rate == float("inf"):
# If the request rate is infinity, then we don't need to wait.
for ri, (task_id, prompt) in zip(result_intermediates, input_requests, strict=True):
ri.task_id = task_id
yield (ri, prompt)
return
for ri, (task_id, prompt) in zip(result_intermediates, input_requests, strict=True):
ri.task_id = task_id
yield (ri, prompt)
# Sample the request interval from the exponential distribution.
interval = np.random.exponential(1.0 / request_rate)
# The next request will be sent after the interval.
await asyncio.sleep(interval)
async def send_request(
result_intermediate: ResultIntermediate,
backend: str,
model: str,
api_url: str,
prompt: str,
) -> None:
headers = {"Content-Type": "application/json"}
# We do greedy decoding following https://evalplus.github.io/leaderboard.html
pload = {
"model": model,
"prompt": prompt,
"max_tokens": 512,
"temperature": 0.0,
"stop": STOP_SEQUENCES,
}
async with aiohttp.ClientSession(timeout=DEFAULT_TIMEOUT) as session:
request_start_time = time.perf_counter()
async with session.post(api_url, headers=headers, json=pload) as response:
# Request failed
if response.status >= 300:
print(f"Request failed: {await response.text()}")
result_intermediate.prompt = prompt
result_intermediate.success = False
return
chunks = []
async for chunk, _ in response.content.iter_chunks():
chunks.append(chunk)
request_end_time = time.perf_counter()
result_intermediate.latency = request_end_time - request_start_time
result_intermediate.prompt = prompt
result_intermediate.response_bytes = chunks
async def benchmark(
results: Results,
backend: str,
model: str,
api_url: str,
input_requests: list[tuple[str, str]],
request_rate: float,
) -> None:
tasks: list[asyncio.Task] = []
result_intermediates = [ResultIntermediate() for _ in input_requests]
pbar = tqdm(total=len(input_requests))
async for ri, prompt in get_request(input_requests, result_intermediates, request_rate):
pbar.update(1)
task = asyncio.create_task(
# Ensures results has same ordering as the input dataset
send_request(ri, backend, model, api_url, prompt)
)
tasks.append(task)
await asyncio.gather(*tasks)
for result, intermediate in zip(results.results, result_intermediates, strict=True):
result.task_id = intermediate.task_id
result.success = intermediate.success
result.latency = intermediate.latency
result.prompt = intermediate.prompt
if result.success:
output = json.loads(b"".join(intermediate.response_bytes).decode("utf-8"))
result.response = strip_stop_sequence(output["choices"][0]["text"], STOP_SEQUENCES)
result.num_prompt_tokens = output["usage"]["prompt_tokens"]
result.num_completion_tokens = output["usage"]["completion_tokens"]
result.energy = output["usage"]["energy"]
def run_benchmark(
args: argparse.Namespace,
api_url: str,
input_requests: list[tuple[str, str]],
results_filename: str,
evalplus_filename: str,
):
zeus_monitor = ZeusMonitor()
pynvml.nvmlInit()
handle = pynvml.nvmlDeviceGetHandleByIndex(0)
gpu_model = pynvml.nvmlDeviceGetName(handle)
pynvml.nvmlShutdown()
results = Results(
model=args.model,
backend=args.backend,
gpu_model=gpu_model,
num_gpus=len(zeus_monitor.gpu_indices),
max_num_seqs=args.max_num_seqs,
power_limit=args.power_limit,
request_rate=args.request_rate,
num_requests=len(input_requests),
results=[Result() for _ in input_requests],
)
zeus_monitor.begin_window(results_filename, sync_execution=False)
asyncio.run(benchmark(results, args.backend, args.model, api_url, input_requests, args.request_rate))
measurements = zeus_monitor.end_window(results_filename, sync_execution=False)
client_side_total_energy = measurements.total_energy
# Store aggregated results
total_prompt_tokens = 0
total_completion_tokens = 0
total_latency = 0
total_latency_per_output_token = 0
server_side_total_energy = 0
for result in results.results:
if not result.success:
results.num_failures += 1
continue
total_prompt_tokens += result.num_prompt_tokens
total_completion_tokens += result.num_completion_tokens
total_latency += result.latency
total_latency_per_output_token += result.latency / result.num_completion_tokens
server_side_total_energy += result.energy
num_results = len(results.results) - results.num_failures
if num_results == 0:
raise RuntimeError("All requests failed!")
results.total_runtime = measurements.time
results.requests_per_second = num_results / results.total_runtime
results.total_prompt_tokens = total_prompt_tokens
results.total_completion_tokens = total_completion_tokens
results.latency_per_request = total_latency / num_results
results.latency_per_output_token = total_latency_per_output_token / num_results
results.server_side_total_energy = server_side_total_energy
results.server_side_energy_per_request = results.server_side_total_energy / num_results
results.server_side_energy_per_output_token = results.server_side_total_energy / results.total_completion_tokens
results.server_side_average_power = server_side_total_energy / results.total_runtime
results.client_side_total_energy = client_side_total_energy
results.client_side_energy_per_request = client_side_total_energy / num_results
results.client_side_energy_per_output_token = client_side_total_energy / results.total_completion_tokens
results.client_side_average_power = client_side_total_energy / results.total_runtime
with open(results_filename, "w") as f:
f.write(json.dumps(asdict(results), indent=2))
print("Benchmark results written to", results_filename)
evalplus_results = [dict(task_id=result.task_id, completion=result.response) for result in results.results]
write_jsonl(evalplus_filename, evalplus_results)
print("Benchmark results:")
print(f"Model: {results.model}")
print(f"Backend: {results.backend}")
print(f"Request rate: {results.request_rate} requests/s")
print()
print(f"Total benchmark runtime: {results.total_runtime:.2f} s")
print(f"Requests per second: {results.requests_per_second:.2f} requests/s")
print(f"Average latency per request: {results.latency_per_request:.2f} s")
print(f"Average latency per output token: {results.latency_per_output_token:.2f} s")
print(f"(Client-side) Total energy: {results.client_side_total_energy:.2f} J")
print(f"(Client-side) Energy per request: {results.client_side_energy_per_request:.2f} J")
print(f"(Client-side) Energy per token: {results.client_side_energy_per_output_token:.2f} J")
print(f"(Client-side) Average power: {results.client_side_average_power:.2f} W")
print(f"(Server-side) Total energy: {results.server_side_total_energy:.2f} J")
print(f"(Server-side) Energy per request: {results.server_side_energy_per_request:.2f} J")
print(f"(Server-side) Energy per token: {results.server_side_energy_per_output_token:.2f} J")
print(f"(Server-side) Average power: {results.server_side_average_power:.2f} W")
def wait_server_ready(list_models_url: str) -> None:
while True:
try:
response = requests.get(list_models_url)
response.raise_for_status()
break
except requests.exceptions.RequestException:
print("Waiting for the server to be ready...")
time.sleep(1)
def main(args: argparse.Namespace):
if args.backend not in ["tgi", "vllm"]:
raise ValueError(f"Unknown backend: {args.backend}")
arg_out_filename = f"{args.benchmark_name}+args.json"
with open(arg_out_filename, "w") as f:
f.write(json.dumps(vars(args), indent=2))
print(args)
print("Benchmark args written to", arg_out_filename)
random.seed(args.seed)
np.random.seed(args.seed)
api_url = f"http://localhost:{args.port}/v1/completions"
input_requests = load_evalplus(args.dataset, args.data_dup_factor)
wait_server_ready(f"http://localhost:{args.port}/health")
run_benchmark(
args,
api_url,
input_requests,
f"{args.benchmark_name}+results.json",
f"{args.benchmark_name}+results+evalplus.jsonl",
)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--backend", required=True, choices=["vllm", "tgi"], help="Server to benchmark.")
parser.add_argument("--port", type=int, required=True, help="Port of the server to benchmark.")
parser.add_argument("--model", required=True, help="Model to benchmark, e.g., codellama/CodeLlama-7b-hf.")
parser.add_argument("--dataset", required=True, choices=["humaneval", "mbpp"], help="EvalPlus dataset to use.")
parser.add_argument(
"--request-rate",
type=float,
required=True,
help="Poisson process rate for request arrival times. If this is inf, all requests are sent at time 0.",
)
parser.add_argument(
"--benchmark-name",
required=True,
help="Name of the benchmark. Result files will be written to paths derived from this.",
)
parser.add_argument("--seed", type=int, default=0)
parser.add_argument("--power-limit", type=int, required=True, help="Not used but passed in in order to save to results file.")
parser.add_argument("--max-num-seqs", type=int, required=True, help="Not used but passed in in order to save to results file.")
parser.add_argument("--data-dup-factor", type=int, default=1, help="How many times to repeat the ShareGPT dataset to generate more requests.")
args = parser.parse_args()
main(args)