from __future__ import annotations

import argparse
import asyncio
import json
import random
import time
from dataclasses import asdict, dataclass, field
from typing import AsyncGenerator, Literal

import aiohttp
import numpy as np
import pynvml
import requests
from evalplus.data import get_human_eval_plus, get_mbpp_plus, write_jsonl
from tqdm.asyncio import tqdm
from zeus.monitor import ZeusMonitor
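
# Client-side request timeout and stop sequences. The long timeout keeps slow requests
# from being dropped during long benchmark runs, and the stop sequences cut code
# completions off at the next top-level construct, a common convention for
# HumanEval/MBPP-style code completion benchmarks.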
DEFAULT_TIMEOUT = aiohttp.ClientTimeout(total=3 * 3600)
STOP_SEQUENCES = ["\nclass", "\ndef", "\n#", "\n@", "\nprint", "\nif", "\n```"]


@dataclass
class Results:
    """Aggregated benchmark results, serialized to JSON at the end of the run."""

    model: str
    backend: str
    gpu_model: str
    num_gpus: int
    max_num_seqs: int
    power_limit: int
    request_rate: float
    num_requests: int
    num_failures: int = 0
    total_runtime: float = 0.0
    requests_per_second: float = 0.0
    total_prompt_tokens: int = 0
    total_completion_tokens: int = 0
    latency_per_request: float = 0.0
    latency_per_output_token: float = 0.0
    server_side_total_energy: float = 0.0
    server_side_energy_per_request: float = 0.0
    server_side_energy_per_output_token: float = 0.0
    server_side_average_power: float = 0.0
    client_side_total_energy: float = 0.0
    client_side_energy_per_request: float = 0.0
    client_side_energy_per_output_token: float = 0.0
    client_side_average_power: float = 0.0
    results: list[Result] = field(default_factory=list)


@dataclass
class ResultIntermediate:
    """Raw per-request data collected while requests are in flight."""

    task_id: str = ""
    success: bool = True
    latency: float = 0.0
    prompt: str = ""
    response_bytes: list[bytes] = field(default_factory=list)


@dataclass
class Result:
    """Parsed per-request result, including token counts and server-side energy."""

    task_id: str = ""
    success: bool = True
    latency: float = 0.0
    prompt: str = ""
    response: str = ""
    num_prompt_tokens: int = 0
    num_completion_tokens: int = 0
    energy: float = 0.0


def strip_stop_sequence(text: str, stop_sequences: list[str]) -> str:
    """Remove a trailing stop sequence, since some backends include it in the response text."""
    for stop in stop_sequences:
        if text.endswith(stop):
            return text[: -len(stop)]
    return text


def load_evalplus(dataset: Literal["humaneval", "mbpp"], data_dup_factor: int) -> list[tuple[str, str]]:
    """Load the EvalPlus dataset as a list of (task_id, prompt) tuples.

    The dataset is repeated `data_dup_factor` times to generate more requests.
    """
    if dataset == "humaneval":
        gen_fn = get_human_eval_plus
    elif dataset == "mbpp":
        gen_fn = get_mbpp_plus
    else:
        raise ValueError(f"Unknown EvalPlus dataset: {dataset}")
    return [(task_id, problem["prompt"].strip()) for task_id, problem in gen_fn().items()] * data_dup_factor


async def get_request(
    input_requests: list[tuple[str, str]],
    result_intermediates: list[ResultIntermediate],
    request_rate: float,
) -> AsyncGenerator[tuple[ResultIntermediate, str], None]:
    """Yield (ResultIntermediate, prompt) pairs with Poisson-process arrival times.

    Inter-arrival times are drawn i.i.d. from Exp(request_rate), so request
    arrivals form a Poisson process with the given rate.
    """
    if request_rate == float("inf"):
        # If the request rate is infinity, send all requests immediately.
        for ri, (task_id, prompt) in zip(result_intermediates, input_requests, strict=True):
            ri.task_id = task_id
            yield (ri, prompt)
        return

    for ri, (task_id, prompt) in zip(result_intermediates, input_requests, strict=True):
        ri.task_id = task_id
        yield (ri, prompt)
        # Sample the request interval from the exponential distribution.
        interval = np.random.exponential(1.0 / request_rate)
        # The next request will be sent after the interval.
        await asyncio.sleep(interval)


async def send_request(
    result_intermediate: ResultIntermediate,
    backend: str,
    model: str,
    api_url: str,
    prompt: str,
) -> None:
    headers = {"Content-Type": "application/json"}
    # We do greedy decoding following https://evalplus.github.io/leaderboard.html
    pload = {
        "model": model,
        "prompt": prompt,
        "max_tokens": 512,
        "temperature": 0.0,
        "stop": STOP_SEQUENCES,
    }

    async with aiohttp.ClientSession(timeout=DEFAULT_TIMEOUT) as session:
        request_start_time = time.perf_counter()
        async with session.post(api_url, headers=headers, json=pload) as response:
            # Request failed
            if response.status >= 300:
                print(f"Request failed: {await response.text()}")
                result_intermediate.prompt = prompt
                result_intermediate.success = False
                return
            chunks = []
            async for chunk, _ in response.content.iter_chunks():
                chunks.append(chunk)
        request_end_time = time.perf_counter()

    result_intermediate.latency = request_end_time - request_start_time
    result_intermediate.prompt = prompt
    result_intermediate.response_bytes = chunks
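

# NOTE (assumption): responses are parsed below as non-streaming OpenAI-style
# /v1/completions JSON. The `usage.energy` field is not part of the standard
# OpenAI API; it assumes the serving backend has been modified to report
# per-request energy consumption in its usage statistics.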
async def benchmark(
    results: Results,
    backend: str,
    model: str,
    api_url: str,
    input_requests: list[tuple[str, str]],
    request_rate: float,
) -> None:
    tasks: list[asyncio.Task] = []
    result_intermediates = [ResultIntermediate() for _ in input_requests]
    pbar = tqdm(total=len(input_requests))
    async for ri, prompt in get_request(input_requests, result_intermediates, request_rate):
        pbar.update(1)
        # Each ResultIntermediate is paired with its request up front, so results
        # keep the same ordering as the input dataset.
        task = asyncio.create_task(send_request(ri, backend, model, api_url, prompt))
        tasks.append(task)
    await asyncio.gather(*tasks)
    pbar.close()

    for result, intermediate in zip(results.results, result_intermediates, strict=True):
        result.task_id = intermediate.task_id
        result.success = intermediate.success
        result.latency = intermediate.latency
        result.prompt = intermediate.prompt
        if result.success:
            output = json.loads(b"".join(intermediate.response_bytes).decode("utf-8"))
            result.response = strip_stop_sequence(output["choices"][0]["text"], STOP_SEQUENCES)
            result.num_prompt_tokens = output["usage"]["prompt_tokens"]
            result.num_completion_tokens = output["usage"]["completion_tokens"]
            result.energy = output["usage"]["energy"]
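

# NOTE: client-side energy is measured with ZeusMonitor, which reads the energy of
# the GPUs visible on the machine running this script, while server-side energy is
# the sum of the per-request `usage.energy` values returned by the server.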
def run_benchmark(
    args: argparse.Namespace,
    api_url: str,
    input_requests: list[tuple[str, str]],
    results_filename: str,
    evalplus_filename: str,
):
    zeus_monitor = ZeusMonitor()

    pynvml.nvmlInit()
    handle = pynvml.nvmlDeviceGetHandleByIndex(0)
    gpu_model = pynvml.nvmlDeviceGetName(handle)
    pynvml.nvmlShutdown()

    results = Results(
        model=args.model,
        backend=args.backend,
        gpu_model=gpu_model,
        num_gpus=len(zeus_monitor.gpu_indices),
        max_num_seqs=args.max_num_seqs,
        power_limit=args.power_limit,
        request_rate=args.request_rate,
        num_requests=len(input_requests),
        results=[Result() for _ in input_requests],
    )

    zeus_monitor.begin_window(results_filename, sync_execution=False)
    asyncio.run(benchmark(results, args.backend, args.model, api_url, input_requests, args.request_rate))
    measurements = zeus_monitor.end_window(results_filename, sync_execution=False)
    client_side_total_energy = measurements.total_energy

    # Store aggregated results
    total_prompt_tokens = 0
    total_completion_tokens = 0
    total_latency = 0.0
    total_latency_per_output_token = 0.0
    server_side_total_energy = 0.0
    for result in results.results:
        if not result.success:
            results.num_failures += 1
            continue
        total_prompt_tokens += result.num_prompt_tokens
        total_completion_tokens += result.num_completion_tokens
        total_latency += result.latency
        total_latency_per_output_token += result.latency / result.num_completion_tokens
        server_side_total_energy += result.energy

    num_results = len(results.results) - results.num_failures
    if num_results == 0:
        raise RuntimeError("All requests failed!")

    results.total_runtime = measurements.time
    results.requests_per_second = num_results / results.total_runtime
    results.total_prompt_tokens = total_prompt_tokens
    results.total_completion_tokens = total_completion_tokens
    results.latency_per_request = total_latency / num_results
    results.latency_per_output_token = total_latency_per_output_token / num_results
    results.server_side_total_energy = server_side_total_energy
    results.server_side_energy_per_request = results.server_side_total_energy / num_results
    results.server_side_energy_per_output_token = results.server_side_total_energy / results.total_completion_tokens
    results.server_side_average_power = server_side_total_energy / results.total_runtime
    results.client_side_total_energy = client_side_total_energy
    results.client_side_energy_per_request = client_side_total_energy / num_results
    results.client_side_energy_per_output_token = client_side_total_energy / results.total_completion_tokens
    results.client_side_average_power = client_side_total_energy / results.total_runtime

    with open(results_filename, "w") as f:
        f.write(json.dumps(asdict(results), indent=2))
    print("Benchmark results written to", results_filename)

    evalplus_results = [dict(task_id=result.task_id, completion=result.response) for result in results.results]
    write_jsonl(evalplus_filename, evalplus_results)

    print("Benchmark results:")
    print(f"Model: {results.model}")
    print(f"Backend: {results.backend}")
    print(f"Request rate: {results.request_rate} requests/s")
    print()
    print(f"Total benchmark runtime: {results.total_runtime:.2f} s")
    print(f"Requests per second: {results.requests_per_second:.2f} requests/s")
    print(f"Average latency per request: {results.latency_per_request:.2f} s")
    print(f"Average latency per output token: {results.latency_per_output_token:.2f} s")
    print(f"(Client-side) Total energy: {results.client_side_total_energy:.2f} J")
    print(f"(Client-side) Energy per request: {results.client_side_energy_per_request:.2f} J")
    print(f"(Client-side) Energy per token: {results.client_side_energy_per_output_token:.2f} J")
    print(f"(Client-side) Average power: {results.client_side_average_power:.2f} W")
    print(f"(Server-side) Total energy: {results.server_side_total_energy:.2f} J")
    print(f"(Server-side) Energy per request: {results.server_side_energy_per_request:.2f} J")
    print(f"(Server-side) Energy per token: {results.server_side_energy_per_output_token:.2f} J")
    print(f"(Server-side) Average power: {results.server_side_average_power:.2f} W")


def wait_server_ready(health_url: str) -> None:
    """Poll the server's health endpoint until it responds successfully."""
    while True:
        try:
            response = requests.get(health_url)
            response.raise_for_status()
            break
        except requests.exceptions.RequestException:
            print("Waiting for the server to be ready...")
            time.sleep(1)
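

# Output files are derived from --benchmark-name:
#   {benchmark_name}+args.json              -- the parsed command-line arguments
#   {benchmark_name}+results.json           -- aggregated benchmark results
#   {benchmark_name}+results+evalplus.jsonl -- completions for EvalPlus evaluation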
def main(args: argparse.Namespace):
    if args.backend not in ["tgi", "vllm"]:
        raise ValueError(f"Unknown backend: {args.backend}")

    arg_out_filename = f"{args.benchmark_name}+args.json"
    with open(arg_out_filename, "w") as f:
        f.write(json.dumps(vars(args), indent=2))
    print(args)
    print("Benchmark args written to", arg_out_filename)

    random.seed(args.seed)
    np.random.seed(args.seed)

    api_url = f"http://localhost:{args.port}/v1/completions"
    input_requests = load_evalplus(args.dataset, args.data_dup_factor)

    wait_server_ready(f"http://localhost:{args.port}/health")
    run_benchmark(
        args,
        api_url,
        input_requests,
        f"{args.benchmark_name}+results.json",
        f"{args.benchmark_name}+results+evalplus.jsonl",
    )


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--backend", required=True, choices=["vllm", "tgi"], help="Server to benchmark.")
    parser.add_argument("--port", type=int, required=True, help="Port of the server to benchmark.")
    parser.add_argument("--model", required=True, help="Model to benchmark, e.g., codellama/CodeLlama-7b-hf.")
    parser.add_argument("--dataset", required=True, choices=["humaneval", "mbpp"], help="EvalPlus dataset to use.")
    parser.add_argument(
        "--request-rate",
        type=float,
        required=True,
        help="Poisson process rate for request arrival times. If this is inf, all requests are sent at time 0.",
    )
    parser.add_argument(
        "--benchmark-name",
        required=True,
        help="Name of the benchmark. Result files will be written to paths derived from this.",
    )
    parser.add_argument("--seed", type=int, default=0)
    parser.add_argument(
        "--power-limit",
        type=int,
        required=True,
        help="Not used by the benchmark itself; recorded in the results file.",
    )
    parser.add_argument(
        "--max-num-seqs",
        type=int,
        required=True,
        help="Not used by the benchmark itself; recorded in the results file.",
    )
    parser.add_argument(
        "--data-dup-factor",
        type=int,
        default=1,
        help="How many times to repeat the EvalPlus dataset to generate more requests.",
    )

    args = parser.parse_args()
    main(args)
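
# Example invocation (a sketch; the script name and all values below are
# illustrative and should be adjusted to your own setup):
#
#   python benchmark.py \
#       --backend vllm \
#       --port 8000 \
#       --model codellama/CodeLlama-7b-hf \
#       --dataset humaneval \
#       --request-rate 2.0 \
#       --benchmark-name results/codellama-7b+humaneval \
#       --power-limit 300 \
#       --max-num-seqs 256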