# Test the time it takes to make concurrent embedding requests to OpenAI
# (the number of concurrent calls is set further down via concurrent_calls).
import sys, os
import traceback
from dotenv import load_dotenv

load_dotenv()
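# load_dotenv() pulls OPENAI_API_KEY (and any other settings) from a local .env file
# into the process environment, so os.environ can read them below.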
import os, io

sys.path.insert(
    0, os.path.abspath("../..")
)  # Adds the parent directory to the system path
import pytest

import litellm

litellm.set_verbose = False

question = "embed this very long text" * 100
# Make X concurrent embedding calls via client.embeddings.create(model="text-embedding-ada-002", ...),
# using a fixed question here (a random-question variant is sketched below).
# Allow X (the number of concurrent calls) to be tuned. Log the question, output/exception, and response time.
# Show a summary of the requests made: successful calls and failed calls, with the exceptions for the failures.
import concurrent.futures
import random
import time
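
# A minimal sketch (not used below; the `questions` list is hypothetical) of picking a
# random question, as the note above mentions. This script uses a single fixed question.
# questions = [question, "embed this short text"]
# question = random.choice(questions)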


# Function to make a single embedding request to the OpenAI-compatible endpoint
def make_openai_embedding(question):
    try:
        start_time = time.time()
        import openai

        client = openai.OpenAI(
            api_key=os.environ["OPENAI_API_KEY"], base_url="http://0.0.0.0:8000"
        )
        response = client.embeddings.create(
            model="text-embedding-ada-002",
            input=[question],
        )
        print(response)
        end_time = time.time()

        # Log the request details
        # with open("request_log.txt", "a") as log_file:
        #     log_file.write(
        #         f"Question: {question[:100]}\nEmbedding dims: {len(response.data[0].embedding)}\nTime: {end_time - start_time:.2f} seconds\n\n"
        #     )

        return response
    except Exception as e:
        # Log exceptions for failed calls
        # with open("error_log.txt", "a") as error_log_file:
        #     error_log_file.write(
        #         f"\nException: {str(e)}\n\n"
        #     )
        return None
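
# Example usage (assumption: the proxy at http://0.0.0.0:8000 is running and
# OPENAI_API_KEY is set). text-embedding-ada-002 returns 1536-dimensional vectors.
# single_response = make_openai_embedding(question)
# if single_response is not None:
#     print(len(single_response.data[0].embedding))  # expected: 1536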


start_time = time.time()

# Number of concurrent calls (adjust this to tune the load)
concurrent_calls = 500

# List to store the futures of the concurrent calls
futures = []

# Make the concurrent calls, one worker thread per request
with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_calls) as executor:
    for _ in range(concurrent_calls):
        futures.append(executor.submit(make_openai_embedding, question))

    # Wait for all futures to complete
    concurrent.futures.wait(futures)

# Summarize the results
successful_calls = 0
failed_calls = 0

for future in futures:
    if future.result() is not None:
        successful_calls += 1
    else:
        failed_calls += 1
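
# Hedged sketch (assumption, not wired in): to also surface the exceptions for failed
# calls, the worker could `return e` instead of `return None` in its except block, and
# this loop could print any result that is an Exception instance.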

end_time = time.time()

# Calculate the total duration of the load test
duration = end_time - start_time

print("Load Test Summary:")
print(f"Total Requests: {concurrent_calls}")
print(f"Successful Calls: {successful_calls}")
print(f"Failed Calls: {failed_calls}")
print(f"Total Time: {duration:.2f} seconds")

# # Display the contents of the logs
# with open("request_log.txt", "r") as log_file:
#     print("\nRequest Log:\n", log_file.read())
# with open("error_log.txt", "r") as error_log_file:
#     print("\nError Log:\n", error_log_file.read())