import os, openai, sys, json, inspect, uuid, datetime, threading |
|
from typing import Any |
|
from functools import partial |
|
import dotenv, traceback, random, asyncio, time, contextvars |
|
from copy import deepcopy |
|
import httpx |
|
import litellm |
|
from litellm import ( |
|
client, |
|
exception_type, |
|
get_optional_params, |
|
get_litellm_params, |
|
Logging, |
|
) |
|
from litellm.utils import ( |
|
get_secret, |
|
CustomStreamWrapper, |
|
read_config_args, |
|
completion_with_fallbacks, |
|
get_llm_provider, |
|
get_api_key, |
|
mock_completion_streaming_obj, |
|
convert_to_model_response_object, |
|
token_counter, |
|
Usage |
|
) |
|
from .llms import ( |
|
anthropic, |
|
together_ai, |
|
ai21, |
|
sagemaker, |
|
bedrock, |
|
huggingface_restapi, |
|
replicate, |
|
aleph_alpha, |
|
nlp_cloud, |
|
baseten, |
|
vllm, |
|
ollama, |
|
cohere, |
|
petals, |
|
oobabooga, |
|
palm, |
|
vertex_ai, |
|
maritalk) |
|
from .llms.openai import OpenAIChatCompletion, OpenAITextCompletion |
|
from .llms.azure import AzureChatCompletion |
|
from .llms.huggingface_restapi import Huggingface |
|
from .llms.prompt_templates.factory import prompt_factory, custom_prompt, function_call_prompt |
|
import tiktoken |
|
from concurrent.futures import ThreadPoolExecutor |
|
from typing import Callable, List, Optional, Dict, Union, Mapping |
|
|
|
encoding = tiktoken.get_encoding("cl100k_base") |
|
from litellm.utils import (

    TextCompletionStreamWrapper,

    ModelResponse,

    TextCompletionResponse,

    TextChoices,

    EmbeddingResponse,

    Choices,

    Message

)
|
|
|
|
|
dotenv.load_dotenv() |
|
openai_chat_completions = OpenAIChatCompletion() |
|
openai_text_completions = OpenAITextCompletion() |
|
azure_chat_completions = AzureChatCompletion() |
|
huggingface = Huggingface() |
|
|
|
|
|
class LiteLLM: |
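    """
    A lightweight, OpenAI-client-style wrapper around litellm.completion().

    Illustrative usage (a sketch; assumes OPENAI_API_KEY is set in the environment):

        client = LiteLLM()
        response = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": "Hello"}],
        )
    """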
|
|
|
def __init__(self, *, |
|
api_key=None, |
|
organization: Optional[str] = None, |
|
base_url: Optional[str]= None, |
|
timeout: Optional[float] = 600, |
|
max_retries: Optional[int] = litellm.num_retries, |
|
default_headers: Optional[Mapping[str, str]] = None,): |
|
self.params = locals() |
|
self.chat = Chat(self.params) |
|
|
|
class Chat(): |
|
|
|
def __init__(self, params): |
|
self.params = params |
|
self.completions = Completions(self.params) |
|
|
|
class Completions(): |
|
|
|
def __init__(self, params): |
|
self.params = params |
|
|
|
def create(self, model, messages, **kwargs): |
|
for k, v in kwargs.items(): |
|
self.params[k] = v |
|
response = completion(model=model, messages=messages, **self.params) |
|
return response |
|
|
|
@client |
|
async def acompletion(*args, **kwargs): |
|
""" |
|
    Asynchronously executes a litellm.completion() call for any of the LLMs supported by litellm (e.g. gpt-4, gpt-3.5-turbo, claude-2, command-nightly).
|
|
|
Parameters: |
|
model (str): The name of the language model to use for text completion. see all supported LLMs: https://docs.litellm.ai/docs/providers/ |
|
messages (List): A list of message objects representing the conversation context (default is an empty list). |
|
|
|
OPTIONAL PARAMS |
|
functions (List, optional): A list of functions to apply to the conversation messages (default is an empty list). |
|
function_call (str, optional): The name of the function to call within the conversation (default is an empty string). |
|
temperature (float, optional): The temperature parameter for controlling the randomness of the output (default is 1.0). |
|
top_p (float, optional): The top-p parameter for nucleus sampling (default is 1.0). |
|
n (int, optional): The number of completions to generate (default is 1). |
|
stream (bool, optional): If True, return a streaming response (default is False). |
|
        stop (str/list, optional): Up to 4 sequences where the LLM API will stop generating further tokens.

        max_tokens (int, optional): The maximum number of tokens in the generated completion (default is infinity).

        presence_penalty (float, optional): Penalizes new tokens based on whether they already appear in the text so far.

        frequency_penalty (float, optional): Penalizes new tokens based on their frequency in the text so far.
|
logit_bias (dict, optional): Used to modify the probability of specific tokens appearing in the completion. |
|
user (str, optional): A unique identifier representing your end-user. This can help the LLM provider to monitor and detect abuse. |
|
metadata (dict, optional): Pass in additional metadata to tag your completion calls - eg. prompt version, details, etc. |
|
api_base (str, optional): Base URL for the API (default is None). |
|
api_version (str, optional): API version (default is None). |
|
api_key (str, optional): API key (default is None). |
|
model_list (list, optional): List of api base, version, keys |
|
|
|
LITELLM Specific Params |
|
mock_response (str, optional): If provided, return a mock completion response for testing or debugging purposes (default is None). |
|
force_timeout (int, optional): The maximum execution time in seconds for the completion request (default is 600). |
|
        custom_llm_provider (str, optional): Used for non-OpenAI LLMs. For example, for Bedrock set model="amazon.titan-tg1-large" and custom_llm_provider="bedrock".
|
Returns: |
|
ModelResponse: A response object containing the generated completion and associated metadata. |
|
|
|
Notes: |
|
- This function is an asynchronous version of the `completion` function. |
|
- The `completion` function is called using `run_in_executor` to execute synchronously in the event loop. |
|
- If `stream` is True, the function returns an async generator that yields completion lines. |
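
    Illustrative usage (a sketch; assumes OPENAI_API_KEY is set in the environment):

        import asyncio
        import litellm

        async def main():
            # assumes an OpenAI key is configured; any supported provider/model works
            response = await litellm.acompletion(
                model="gpt-3.5-turbo",
                messages=[{"role": "user", "content": "Hey, how's it going?"}],
            )
            print(response)

        asyncio.run(main())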
|
""" |
|
loop = asyncio.get_event_loop() |
|
model = args[0] if len(args) > 0 else kwargs["model"] |
|
|
|
kwargs["acompletion"] = True |
|
custom_llm_provider = None |
|
try: |
|
|
|
func = partial(completion, *args, **kwargs) |
|
|
|
|
|
ctx = contextvars.copy_context() |
|
func_with_context = partial(ctx.run, func) |
|
|
|
_, custom_llm_provider, _, _ = get_llm_provider(model=model, api_base=kwargs.get("api_base", None)) |
|
|
|
if (custom_llm_provider == "openai" |
|
or custom_llm_provider == "azure" |
|
or custom_llm_provider == "custom_openai" |
|
or custom_llm_provider == "anyscale" |
|
or custom_llm_provider == "openrouter" |
|
or custom_llm_provider == "deepinfra" |
|
or custom_llm_provider == "perplexity" |
|
or custom_llm_provider == "text-completion-openai" |
|
or custom_llm_provider == "huggingface"): |
|
if kwargs.get("stream", False): |
|
response = completion(*args, **kwargs) |
|
else: |
|
|
|
init_response = await loop.run_in_executor(None, func_with_context) |
|
if isinstance(init_response, dict) or isinstance(init_response, ModelResponse): |
|
response = init_response |
|
elif asyncio.iscoroutine(init_response): |
|
response = await init_response |
|
else: |
|
|
|
response = await loop.run_in_executor(None, func_with_context) |
|
if kwargs.get("stream", False): |
|
return _async_streaming(response=response, model=model, custom_llm_provider=custom_llm_provider, args=args) |
|
else: |
|
return response |
|
except Exception as e: |
|
custom_llm_provider = custom_llm_provider or "openai" |
|
raise exception_type( |
|
model=model, custom_llm_provider=custom_llm_provider, original_exception=e, completion_kwargs=args, |
|
) |
|
|
|
async def _async_streaming(response, model, custom_llm_provider, args): |
|
try: |
|
async for line in response: |
|
yield line |
|
except Exception as e: |
|
raise exception_type( |
|
model=model, custom_llm_provider=custom_llm_provider, original_exception=e, completion_kwargs=args, |
|
) |
|
|
|
def mock_completion(model: str, messages: List, stream: Optional[bool] = False, mock_response: str = "This is a mock request", **kwargs): |
|
""" |
|
Generate a mock completion response for testing or debugging purposes. |
|
|
|
This is a helper function that simulates the response structure of the OpenAI completion API. |
|
|
|
Parameters: |
|
model (str): The name of the language model for which the mock response is generated. |
|
messages (List): A list of message objects representing the conversation context. |
|
stream (bool, optional): If True, returns a mock streaming response (default is False). |
|
mock_response (str, optional): The content of the mock response (default is "This is a mock request"). |
|
**kwargs: Additional keyword arguments that can be used but are not required. |
|
|
|
Returns: |
|
litellm.ModelResponse: A ModelResponse simulating a completion response with the specified model, messages, and mock response. |
|
|
|
Raises: |
|
Exception: If an error occurs during the generation of the mock completion response. |
|
|
|
Note: |
|
- This function is intended for testing or debugging purposes to generate mock completion responses. |
|
- If 'stream' is True, it returns a response that mimics the behavior of a streaming completion. |
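
    Illustrative usage (runs entirely locally; no API key or network call is made):

        response = mock_completion(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": "Hi"}],
            mock_response="No LLM was called to produce this text",
        )
        print(response["choices"][0]["message"]["content"])  # -> "No LLM was called to produce this text"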
|
""" |
|
try: |
|
model_response = ModelResponse(stream=stream) |
|
if stream is True: |
|
|
|
response = mock_completion_streaming_obj(model_response, mock_response=mock_response, model=model) |
|
return response |
|
|
|
model_response["choices"][0]["message"]["content"] = mock_response |
|
model_response["created"] = int(time.time()) |
|
model_response["model"] = model |
|
return model_response |
|
|
|
    except Exception:
|
traceback.print_exc() |
|
raise Exception("Mock completion response failed") |
|
|
|
@client |
|
def completion( |
|
model: str, |
|
|
|
messages: List = [], |
|
functions: List = [], |
|
function_call: str = "", |
|
timeout: Optional[Union[float, int]] = None, |
|
temperature: Optional[float] = None, |
|
top_p: Optional[float] = None, |
|
n: Optional[int] = None, |
|
stream: Optional[bool] = None, |
|
stop=None, |
|
max_tokens: Optional[float] = None, |
|
presence_penalty: Optional[float] = None, |
|
frequency_penalty: Optional[float]=None, |
|
logit_bias: Optional[dict] = None, |
|
user: Optional[str] = None, |
|
|
|
response_format: Optional[dict] = None, |
|
seed: Optional[int] = None, |
|
tools: Optional[List] = None, |
|
tool_choice: Optional[str] = None, |
|
deployment_id = None, |
|
|
|
base_url: Optional[str] = None, |
|
api_version: Optional[str] = None, |
|
api_key: Optional[str] = None, |
|
model_list: Optional[list] = None, |
|
|
|
|
|
**kwargs, |
|
) -> ModelResponse: |
|
""" |
|
    Perform a completion() using any of the LLMs supported by litellm (e.g. gpt-4, gpt-3.5-turbo, claude-2, command-nightly).
|
Parameters: |
|
model (str): The name of the language model to use for text completion. see all supported LLMs: https://docs.litellm.ai/docs/providers/ |
|
messages (List): A list of message objects representing the conversation context (default is an empty list). |
|
|
|
OPTIONAL PARAMS |
|
functions (List, optional): A list of functions to apply to the conversation messages (default is an empty list). |
|
function_call (str, optional): The name of the function to call within the conversation (default is an empty string). |
|
temperature (float, optional): The temperature parameter for controlling the randomness of the output (default is 1.0). |
|
top_p (float, optional): The top-p parameter for nucleus sampling (default is 1.0). |
|
n (int, optional): The number of completions to generate (default is 1). |
|
stream (bool, optional): If True, return a streaming response (default is False). |
|
        stop (str/list, optional): Up to 4 sequences where the LLM API will stop generating further tokens.

        max_tokens (int, optional): The maximum number of tokens in the generated completion (default is infinity).

        presence_penalty (float, optional): Penalizes new tokens based on whether they already appear in the text so far.

        frequency_penalty (float, optional): Penalizes new tokens based on their frequency in the text so far.
|
logit_bias (dict, optional): Used to modify the probability of specific tokens appearing in the completion. |
|
user (str, optional): A unique identifier representing your end-user. This can help the LLM provider to monitor and detect abuse. |
|
metadata (dict, optional): Pass in additional metadata to tag your completion calls - eg. prompt version, details, etc. |
|
api_base (str, optional): Base URL for the API (default is None). |
|
api_version (str, optional): API version (default is None). |
|
api_key (str, optional): API key (default is None). |
|
model_list (list, optional): List of api base, version, keys |
|
|
|
LITELLM Specific Params |
|
mock_response (str, optional): If provided, return a mock completion response for testing or debugging purposes (default is None). |
|
        custom_llm_provider (str, optional): Used for non-OpenAI LLMs. For example, for Bedrock set model="amazon.titan-tg1-large" and custom_llm_provider="bedrock".
|
max_retries (int, optional): The number of retries to attempt (default is 0). |
|
Returns: |
|
ModelResponse: A response object containing the generated completion and associated metadata. |
|
|
|
Note: |
|
- This function is used to perform completions() using the specified language model. |
|
- It supports various optional parameters for customizing the completion behavior. |
|
- If 'mock_response' is provided, a mock completion response is returned for testing or debugging. |
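
    Illustrative usage (a sketch; assumes OPENAI_API_KEY is set in the environment, any supported provider/model can be substituted):

        import litellm

        response = litellm.completion(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": "Hey, how's it going?"}],
        )
        print(response["choices"][0]["message"]["content"])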
|
""" |
|
|
|
args = locals() |
|
api_base = kwargs.get('api_base', None) |
|
return_async = kwargs.get('return_async', False) |
|
mock_response = kwargs.get('mock_response', None) |
|
force_timeout= kwargs.get('force_timeout', 600) |
|
logger_fn = kwargs.get('logger_fn', None) |
|
verbose = kwargs.get('verbose', False) |
|
custom_llm_provider = kwargs.get('custom_llm_provider', None) |
|
litellm_logging_obj = kwargs.get('litellm_logging_obj', None) |
|
id = kwargs.get('id', None) |
|
metadata = kwargs.get('metadata', None) |
|
fallbacks = kwargs.get('fallbacks', None) |
|
headers = kwargs.get("headers", None) |
|
num_retries = kwargs.get("num_retries", None) |
|
max_retries = kwargs.get("max_retries", None) |
|
context_window_fallback_dict = kwargs.get("context_window_fallback_dict", None) |
|
|
|
input_cost_per_token = kwargs.get("input_cost_per_token", None) |
|
output_cost_per_token = kwargs.get("output_cost_per_token", None) |
|
|
|
initial_prompt_value = kwargs.get("initial_prompt_value", None) |
|
roles = kwargs.get("roles", None) |
|
final_prompt_value = kwargs.get("final_prompt_value", None) |
|
bos_token = kwargs.get("bos_token", None) |
|
eos_token = kwargs.get("eos_token", None) |
|
acompletion = kwargs.get("acompletion", False) |
|
client = kwargs.get("client", None) |
|
|
|
    openai_params = ["functions", "function_call", "temperature", "top_p", "n", "stream", "stop", "max_tokens", "presence_penalty", "frequency_penalty", "logit_bias", "user", "request_timeout", "api_base", "api_version", "api_key", "deployment_id", "organization", "base_url", "default_headers", "timeout", "response_format", "seed", "tools", "tool_choice", "max_retries"]
|
litellm_params = ["metadata", "acompletion", "caching", "return_async", "mock_response", "api_key", "api_version", "api_base", "force_timeout", "logger_fn", "verbose", "custom_llm_provider", "litellm_logging_obj", "litellm_call_id", "use_client", "id", "fallbacks", "azure", "headers", "model_list", "num_retries", "context_window_fallback_dict", "roles", "final_prompt_value", "bos_token", "eos_token", "request_timeout", "complete_response", "self", "client", "rpm", "tpm", "input_cost_per_token", "output_cost_per_token"] |
|
default_params = openai_params + litellm_params |
|
non_default_params = {k: v for k,v in kwargs.items() if k not in default_params} |
|
if mock_response: |
|
return mock_completion(model, messages, stream=stream, mock_response=mock_response) |
|
if timeout is None: |
|
timeout = kwargs.get("request_timeout", None) or 600 |
|
timeout = float(timeout) |
|
try: |
|
if base_url is not None: |
|
api_base = base_url |
|
if max_retries is not None: |
|
num_retries = max_retries |
|
logging = litellm_logging_obj |
|
fallbacks = ( |
|
fallbacks |
|
or litellm.model_fallbacks |
|
) |
|
if fallbacks is not None: |
|
return completion_with_fallbacks(**args) |
|
if model_list is not None: |
|
deployments = [m["litellm_params"] for m in model_list if m["model_name"] == model] |
|
return batch_completion_models(deployments=deployments, **args) |
|
if litellm.model_alias_map and model in litellm.model_alias_map: |
|
model = litellm.model_alias_map[ |
|
model |
|
] |
|
model_response = ModelResponse() |
|
        if kwargs.get('azure', False) == True: # the boolean 'azure' kwarg forces the Azure provider

            custom_llm_provider = "azure"

        if deployment_id is not None: # an Azure deployment id also implies the Azure provider

            model = deployment_id

            custom_llm_provider = "azure"
|
model, custom_llm_provider, dynamic_api_key, api_base = get_llm_provider(model=model, custom_llm_provider=custom_llm_provider, api_base=api_base, api_key=api_key) |
|
|
|
|
|
if input_cost_per_token is not None and output_cost_per_token is not None: |
|
litellm.register_model({ |
|
model: { |
|
"input_cost_per_token": input_cost_per_token, |
|
"output_cost_per_token": output_cost_per_token, |
|
"litellm_provider": custom_llm_provider |
|
} |
|
}) |
|
|
|
custom_prompt_dict = {} |
|
if initial_prompt_value or roles or final_prompt_value or bos_token or eos_token: |
|
custom_prompt_dict = {model: {}} |
|
if initial_prompt_value: |
|
custom_prompt_dict[model]["initial_prompt_value"] = initial_prompt_value |
|
if roles: |
|
custom_prompt_dict[model]["roles"] = roles |
|
if final_prompt_value: |
|
custom_prompt_dict[model]["final_prompt_value"] = final_prompt_value |
|
if bos_token: |
|
custom_prompt_dict[model]["bos_token"] = bos_token |
|
if eos_token: |
|
custom_prompt_dict[model]["eos_token"] = eos_token |
|
model_api_key = get_api_key(llm_provider=custom_llm_provider, dynamic_api_key=api_key) |
|
if model_api_key and "sk-litellm" in model_api_key: |
|
api_base = "https://proxy.litellm.ai" |
|
custom_llm_provider = "openai" |
|
api_key = model_api_key |
|
|
|
if dynamic_api_key is not None: |
|
api_key = dynamic_api_key |
|
|
|
optional_params = get_optional_params( |
|
functions=functions, |
|
function_call=function_call, |
|
temperature=temperature, |
|
top_p=top_p, |
|
n=n, |
|
stream=stream, |
|
stop=stop, |
|
max_tokens=max_tokens, |
|
presence_penalty=presence_penalty, |
|
frequency_penalty=frequency_penalty, |
|
logit_bias=logit_bias, |
|
user=user, |
|
|
|
model=model, |
|
custom_llm_provider=custom_llm_provider, |
|
response_format=response_format, |
|
seed=seed, |
|
tools=tools, |
|
tool_choice=tool_choice, |
|
max_retries=max_retries, |
|
**non_default_params |
|
) |
|
|
|
if litellm.add_function_to_prompt and optional_params.get("functions_unsupported_model", None): |
|
functions_unsupported_model = optional_params.pop("functions_unsupported_model") |
|
messages = function_call_prompt(messages=messages, functions=functions_unsupported_model) |
|
|
|
|
|
litellm_params = get_litellm_params( |
|
return_async=return_async, |
|
api_key=api_key, |
|
force_timeout=force_timeout, |
|
logger_fn=logger_fn, |
|
verbose=verbose, |
|
custom_llm_provider=custom_llm_provider, |
|
api_base=api_base, |
|
litellm_call_id=kwargs.get('litellm_call_id', None), |
|
model_alias_map=litellm.model_alias_map, |
|
completion_call_id=id, |
|
metadata=metadata |
|
) |
|
logging.update_environment_variables(model=model, user=user, optional_params=optional_params, litellm_params=litellm_params) |
|
if custom_llm_provider == "azure": |
|
|
|
api_type = get_secret("AZURE_API_TYPE") or "azure" |
|
|
|
api_base = ( |
|
api_base |
|
or litellm.api_base |
|
or get_secret("AZURE_API_BASE") |
|
) |
|
|
|
api_version = ( |
|
api_version or |
|
litellm.api_version or |
|
get_secret("AZURE_API_VERSION") |
|
) |
|
|
|
api_key = ( |
|
api_key or |
|
litellm.api_key or |
|
litellm.azure_key or |
|
get_secret("AZURE_OPENAI_API_KEY") or |
|
get_secret("AZURE_API_KEY") |
|
) |
|
|
|
azure_ad_token = ( |
|
optional_params.pop("azure_ad_token", None) or |
|
get_secret("AZURE_AD_TOKEN") |
|
) |
|
|
|
headers = ( |
|
headers or |
|
litellm.headers |
|
) |
|
|
|
|
|
config=litellm.AzureOpenAIConfig.get_config() |
|
for k, v in config.items(): |
|
if k not in optional_params: |
|
optional_params[k] = v |
|
|
|
|
|
response = azure_chat_completions.completion( |
|
model=model, |
|
messages=messages, |
|
headers=headers, |
|
api_key=api_key, |
|
api_base=api_base, |
|
api_version=api_version, |
|
api_type=api_type, |
|
azure_ad_token=azure_ad_token, |
|
model_response=model_response, |
|
print_verbose=print_verbose, |
|
optional_params=optional_params, |
|
litellm_params=litellm_params, |
|
logger_fn=logger_fn, |
|
logging_obj=logging, |
|
acompletion=acompletion, |
|
timeout=timeout, |
|
client=client |
|
) |
|
|
|
|
|
logging.post_call( |
|
input=messages, |
|
api_key=api_key, |
|
original_response=response, |
|
additional_args={ |
|
"headers": headers, |
|
"api_version": api_version, |
|
"api_base": api_base, |
|
}, |
|
) |
|
elif ( |
|
model in litellm.open_ai_chat_completion_models |
|
or custom_llm_provider == "custom_openai" |
|
or custom_llm_provider == "deepinfra" |
|
or custom_llm_provider == "perplexity" |
|
or custom_llm_provider == "anyscale" |
|
or custom_llm_provider == "openai" |
|
or "ft:gpt-3.5-turbo" in model |
|
): |
|
|
|
|
|
api_base = ( |
|
api_base |
|
or litellm.api_base |
|
or get_secret("OPENAI_API_BASE") |
|
or "https://api.openai.com/v1" |
|
) |
|
openai.organization = ( |
|
litellm.organization |
|
or get_secret("OPENAI_ORGANIZATION") |
|
or None |
|
) |
|
|
|
api_key = ( |
|
api_key or |
|
litellm.api_key or |
|
litellm.openai_key or |
|
get_secret("OPENAI_API_KEY") |
|
) |
|
|
|
headers = ( |
|
headers or |
|
litellm.headers |
|
) |
|
|
|
|
|
config=litellm.OpenAIConfig.get_config() |
|
for k, v in config.items(): |
|
if k not in optional_params: |
|
optional_params[k] = v |
|
|
|
|
|
try: |
|
response = openai_chat_completions.completion( |
|
model=model, |
|
messages=messages, |
|
model_response=model_response, |
|
print_verbose=print_verbose, |
|
api_key=api_key, |
|
api_base=api_base, |
|
acompletion=acompletion, |
|
logging_obj=logging, |
|
optional_params=optional_params, |
|
litellm_params=litellm_params, |
|
logger_fn=logger_fn, |
|
timeout=timeout, |
|
custom_prompt_dict=custom_prompt_dict, |
|
client=client |
|
) |
|
except Exception as e: |
|
|
|
logging.post_call( |
|
input=messages, |
|
api_key=api_key, |
|
original_response=str(e), |
|
additional_args={"headers": headers}, |
|
) |
|
raise e |
|
|
|
|
|
logging.post_call( |
|
input=messages, |
|
api_key=api_key, |
|
original_response=response, |
|
additional_args={"headers": headers}, |
|
) |
|
elif ( |
|
custom_llm_provider == "text-completion-openai" |
|
or "ft:babbage-002" in model |
|
or "ft:davinci-002" in model |
|
): |
|
|
|
openai.api_type = "openai" |
|
|
|
api_base = ( |
|
api_base |
|
or litellm.api_base |
|
or get_secret("OPENAI_API_BASE") |
|
or "https://api.openai.com/v1" |
|
) |
|
|
|
openai.api_version = None |
|
|
|
|
|
api_key = ( |
|
api_key or |
|
litellm.api_key or |
|
litellm.openai_key or |
|
get_secret("OPENAI_API_KEY") |
|
) |
|
|
|
headers = ( |
|
headers or |
|
litellm.headers |
|
) |
|
|
|
|
|
config=litellm.OpenAITextCompletionConfig.get_config() |
|
for k, v in config.items(): |
|
if k not in optional_params: |
|
optional_params[k] = v |
|
if litellm.organization: |
|
openai.organization = litellm.organization |
|
|
|
            if len(messages) > 0 and "content" in messages[0] and isinstance(messages[0]["content"], list):
|
|
|
|
|
prompt = messages[0]["content"] |
|
else: |
|
prompt = " ".join([message["content"] for message in messages]) |
|
|
|
logging.pre_call( |
|
input=prompt, |
|
api_key=api_key, |
|
additional_args={ |
|
"openai_organization": litellm.organization, |
|
"headers": headers, |
|
"api_base": api_base, |
|
"api_type": openai.api_type, |
|
}, |
|
) |
|
|
|
model_response = openai_text_completions.completion( |
|
model=model, |
|
messages=messages, |
|
model_response=model_response, |
|
print_verbose=print_verbose, |
|
api_key=api_key, |
|
api_base=api_base, |
|
acompletion=acompletion, |
|
logging_obj=logging, |
|
optional_params=optional_params, |
|
litellm_params=litellm_params, |
|
logger_fn=logger_fn |
|
) |
|
|
|
|
|
|
|
|
|
response = model_response |
|
elif ( |
|
"replicate" in model or |
|
custom_llm_provider == "replicate" or |
|
model in litellm.replicate_models |
|
): |
|
|
|
replicate_key = None |
|
replicate_key = ( |
|
api_key |
|
or litellm.replicate_key |
|
or litellm.api_key |
|
or get_secret("REPLICATE_API_KEY") |
|
or get_secret("REPLICATE_API_TOKEN") |
|
) |
|
|
|
api_base = ( |
|
api_base |
|
or litellm.api_base |
|
or get_secret("REPLICATE_API_BASE") |
|
or "https://api.replicate.com/v1" |
|
) |
|
|
|
custom_prompt_dict = ( |
|
custom_prompt_dict |
|
or litellm.custom_prompt_dict |
|
) |
|
|
|
model_response = replicate.completion( |
|
model=model, |
|
messages=messages, |
|
api_base=api_base, |
|
model_response=model_response, |
|
print_verbose=print_verbose, |
|
optional_params=optional_params, |
|
litellm_params=litellm_params, |
|
logger_fn=logger_fn, |
|
encoding=encoding, |
|
api_key=replicate_key, |
|
logging_obj=logging, |
|
custom_prompt_dict=custom_prompt_dict |
|
) |
|
if "stream" in optional_params and optional_params["stream"] == True: |
|
|
|
response = CustomStreamWrapper(model_response, model, logging_obj=logging, custom_llm_provider="replicate") |
|
return response |
|
response = model_response |
|
|
|
elif custom_llm_provider=="anthropic": |
|
api_key = ( |
|
api_key |
|
or litellm.anthropic_key |
|
or litellm.api_key |
|
or os.environ.get("ANTHROPIC_API_KEY") |
|
) |
|
api_base = ( |
|
api_base |
|
or litellm.api_base |
|
or get_secret("ANTHROPIC_API_BASE") |
|
or "https://api.anthropic.com/v1/complete" |
|
) |
|
custom_prompt_dict = ( |
|
custom_prompt_dict |
|
or litellm.custom_prompt_dict |
|
) |
|
model_response = anthropic.completion( |
|
model=model, |
|
messages=messages, |
|
api_base=api_base, |
|
                custom_prompt_dict=custom_prompt_dict, # pass the merged custom_prompt_dict built above (falls back to litellm.custom_prompt_dict)
|
model_response=model_response, |
|
print_verbose=print_verbose, |
|
optional_params=optional_params, |
|
litellm_params=litellm_params, |
|
logger_fn=logger_fn, |
|
encoding=encoding, |
|
api_key=api_key, |
|
logging_obj=logging, |
|
) |
|
if "stream" in optional_params and optional_params["stream"] == True: |
|
|
|
response = CustomStreamWrapper(model_response, model, custom_llm_provider="anthropic", logging_obj=logging) |
|
return response |
|
response = model_response |
|
elif custom_llm_provider == "nlp_cloud": |
|
nlp_cloud_key = ( |
|
api_key or litellm.nlp_cloud_key or get_secret("NLP_CLOUD_API_KEY") or litellm.api_key |
|
) |
|
|
|
api_base = ( |
|
api_base |
|
or litellm.api_base |
|
or get_secret("NLP_CLOUD_API_BASE") |
|
or "https://api.nlpcloud.io/v1/gpu/" |
|
) |
|
|
|
model_response = nlp_cloud.completion( |
|
model=model, |
|
messages=messages, |
|
api_base=api_base, |
|
model_response=model_response, |
|
print_verbose=print_verbose, |
|
optional_params=optional_params, |
|
litellm_params=litellm_params, |
|
logger_fn=logger_fn, |
|
encoding=encoding, |
|
api_key=nlp_cloud_key, |
|
logging_obj=logging |
|
) |
|
|
|
if "stream" in optional_params and optional_params["stream"] == True: |
|
|
|
response = CustomStreamWrapper(model_response, model, custom_llm_provider="nlp_cloud", logging_obj=logging) |
|
return response |
|
response = model_response |
|
elif custom_llm_provider == "aleph_alpha": |
|
aleph_alpha_key = ( |
|
api_key or litellm.aleph_alpha_key or get_secret("ALEPH_ALPHA_API_KEY") or get_secret("ALEPHALPHA_API_KEY") or litellm.api_key |
|
) |
|
|
|
api_base = ( |
|
api_base |
|
or litellm.api_base |
|
or get_secret("ALEPH_ALPHA_API_BASE") |
|
or "https://api.aleph-alpha.com/complete" |
|
) |
|
|
|
model_response = aleph_alpha.completion( |
|
model=model, |
|
messages=messages, |
|
api_base=api_base, |
|
model_response=model_response, |
|
print_verbose=print_verbose, |
|
optional_params=optional_params, |
|
litellm_params=litellm_params, |
|
logger_fn=logger_fn, |
|
encoding=encoding, |
|
default_max_tokens_to_sample=litellm.max_tokens, |
|
api_key=aleph_alpha_key, |
|
logging_obj=logging |
|
) |
|
|
|
if "stream" in optional_params and optional_params["stream"] == True: |
|
|
|
response = CustomStreamWrapper(model_response, model, custom_llm_provider="aleph_alpha", logging_obj=logging) |
|
return response |
|
response = model_response |
|
elif custom_llm_provider == "cohere": |
|
cohere_key = ( |
|
api_key |
|
or litellm.cohere_key |
|
or get_secret("COHERE_API_KEY") |
|
or get_secret("CO_API_KEY") |
|
or litellm.api_key |
|
) |
|
|
|
api_base = ( |
|
api_base |
|
or litellm.api_base |
|
or get_secret("COHERE_API_BASE") |
|
or "https://api.cohere.ai/v1/generate" |
|
) |
|
|
|
model_response = cohere.completion( |
|
model=model, |
|
messages=messages, |
|
api_base=api_base, |
|
model_response=model_response, |
|
print_verbose=print_verbose, |
|
optional_params=optional_params, |
|
litellm_params=litellm_params, |
|
logger_fn=logger_fn, |
|
encoding=encoding, |
|
api_key=cohere_key, |
|
logging_obj=logging |
|
) |
|
|
|
if "stream" in optional_params and optional_params["stream"] == True: |
|
|
|
response = CustomStreamWrapper(model_response, model, custom_llm_provider="cohere", logging_obj=logging) |
|
return response |
|
response = model_response |
|
elif custom_llm_provider == "maritalk": |
|
maritalk_key = ( |
|
api_key |
|
or litellm.maritalk_key |
|
or get_secret("MARITALK_API_KEY") |
|
or litellm.api_key |
|
) |
|
|
|
api_base = ( |
|
api_base |
|
or litellm.api_base |
|
or get_secret("MARITALK_API_BASE") |
|
or "https://chat.maritaca.ai/api/chat/inference" |
|
) |
|
|
|
model_response = maritalk.completion( |
|
model=model, |
|
messages=messages, |
|
api_base=api_base, |
|
model_response=model_response, |
|
print_verbose=print_verbose, |
|
optional_params=optional_params, |
|
litellm_params=litellm_params, |
|
logger_fn=logger_fn, |
|
encoding=encoding, |
|
api_key=maritalk_key, |
|
logging_obj=logging |
|
) |
|
|
|
if "stream" in optional_params and optional_params["stream"] == True: |
|
|
|
response = CustomStreamWrapper(model_response, model, custom_llm_provider="maritalk", logging_obj=logging) |
|
return response |
|
response = model_response |
|
elif ( |
|
custom_llm_provider == "huggingface" |
|
): |
|
custom_llm_provider = "huggingface" |
|
huggingface_key = ( |
|
api_key |
|
or litellm.huggingface_key |
|
or os.environ.get("HF_TOKEN") |
|
or os.environ.get("HUGGINGFACE_API_KEY") |
|
or litellm.api_key |
|
) |
|
hf_headers = ( |
|
headers |
|
or litellm.headers |
|
) |
|
|
|
custom_prompt_dict = ( |
|
custom_prompt_dict |
|
or litellm.custom_prompt_dict |
|
) |
|
model_response = huggingface.completion( |
|
model=model, |
|
messages=messages, |
|
api_base=api_base, |
|
headers=hf_headers, |
|
model_response=model_response, |
|
print_verbose=print_verbose, |
|
optional_params=optional_params, |
|
litellm_params=litellm_params, |
|
logger_fn=logger_fn, |
|
encoding=encoding, |
|
api_key=huggingface_key, |
|
acompletion=acompletion, |
|
logging_obj=logging, |
|
custom_prompt_dict=custom_prompt_dict |
|
) |
|
if "stream" in optional_params and optional_params["stream"] == True and acompletion is False: |
|
|
|
response = CustomStreamWrapper( |
|
model_response, model, custom_llm_provider="huggingface", logging_obj=logging |
|
) |
|
return response |
|
response = model_response |
|
elif custom_llm_provider == "oobabooga": |
|
custom_llm_provider = "oobabooga" |
|
model_response = oobabooga.completion( |
|
model=model, |
|
messages=messages, |
|
model_response=model_response, |
|
api_base=api_base, |
|
print_verbose=print_verbose, |
|
optional_params=optional_params, |
|
litellm_params=litellm_params, |
|
api_key=None, |
|
logger_fn=logger_fn, |
|
encoding=encoding, |
|
logging_obj=logging |
|
) |
|
if "stream" in optional_params and optional_params["stream"] == True: |
|
|
|
response = CustomStreamWrapper( |
|
model_response, model, custom_llm_provider="oobabooga", logging_obj=logging |
|
) |
|
return response |
|
response = model_response |
|
elif custom_llm_provider == "openrouter": |
|
api_base = ( |
|
api_base |
|
or litellm.api_base |
|
or "https://openrouter.ai/api/v1" |
|
) |
|
|
|
api_key = ( |
|
api_key or |
|
litellm.api_key or |
|
litellm.openrouter_key or |
|
get_secret("OPENROUTER_API_KEY") or |
|
get_secret("OR_API_KEY") |
|
) |
|
|
|
openrouter_site_url = ( |
|
get_secret("OR_SITE_URL") |
|
or "https://litellm.ai" |
|
) |
|
|
|
openrouter_app_name = ( |
|
get_secret("OR_APP_NAME") |
|
or "liteLLM" |
|
) |
|
|
|
headers = ( |
|
headers or |
|
litellm.headers or |
|
{ |
|
"HTTP-Referer": openrouter_site_url, |
|
"X-Title": openrouter_app_name, |
|
} |
|
) |
|
|
|
data = { |
|
"model": model, |
|
"messages": messages, |
|
**optional_params |
|
} |
|
|
|
logging.pre_call(input=messages, api_key=openai.api_key, additional_args={"complete_input_dict": data, "headers": headers}) |
|
|
|
|
|
|
|
response = openai_chat_completions.completion( |
|
model=model, |
|
messages=messages, |
|
headers=headers, |
|
api_key=api_key, |
|
api_base=api_base, |
|
model_response=model_response, |
|
print_verbose=print_verbose, |
|
optional_params=optional_params, |
|
litellm_params=litellm_params, |
|
logger_fn=logger_fn, |
|
logging_obj=logging, |
|
acompletion=acompletion, |
|
timeout=timeout |
|
) |
|
|
|
logging.post_call( |
|
input=messages, api_key=openai.api_key, original_response=response |
|
) |
|
elif custom_llm_provider == "together_ai" or ("togethercomputer" in model) or (model in litellm.together_ai_models): |
|
custom_llm_provider = "together_ai" |
|
together_ai_key = ( |
|
api_key |
|
or litellm.togetherai_api_key |
|
or get_secret("TOGETHER_AI_TOKEN") |
|
or get_secret("TOGETHERAI_API_KEY") |
|
or litellm.api_key |
|
) |
|
|
|
api_base = ( |
|
api_base |
|
or litellm.api_base |
|
or get_secret("TOGETHERAI_API_BASE") |
|
or "https://api.together.xyz/inference" |
|
) |
|
|
|
custom_prompt_dict = ( |
|
custom_prompt_dict |
|
or litellm.custom_prompt_dict |
|
) |
|
|
|
model_response = together_ai.completion( |
|
model=model, |
|
messages=messages, |
|
api_base=api_base, |
|
model_response=model_response, |
|
print_verbose=print_verbose, |
|
optional_params=optional_params, |
|
litellm_params=litellm_params, |
|
logger_fn=logger_fn, |
|
encoding=encoding, |
|
api_key=together_ai_key, |
|
logging_obj=logging, |
|
custom_prompt_dict=custom_prompt_dict |
|
) |
|
if "stream_tokens" in optional_params and optional_params["stream_tokens"] == True: |
|
|
|
response = CustomStreamWrapper( |
|
model_response, model, custom_llm_provider="together_ai", logging_obj=logging |
|
) |
|
return response |
|
response = model_response |
|
elif custom_llm_provider == "palm": |
|
palm_api_key = ( |
|
api_key |
|
or get_secret("PALM_API_KEY") |
|
or litellm.api_key |
|
) |
|
|
|
|
|
model_response = palm.completion( |
|
model=model, |
|
messages=messages, |
|
model_response=model_response, |
|
print_verbose=print_verbose, |
|
optional_params=optional_params, |
|
litellm_params=litellm_params, |
|
logger_fn=logger_fn, |
|
encoding=encoding, |
|
api_key=palm_api_key, |
|
logging_obj=logging |
|
) |
|
|
|
if "stream" in optional_params and optional_params["stream"] == True: |
|
|
|
resp_string = model_response["choices"][0]["message"]["content"] |
|
response = CustomStreamWrapper( |
|
resp_string, model, custom_llm_provider="palm", logging_obj=logging |
|
) |
|
return response |
|
response = model_response |
|
elif model in litellm.vertex_chat_models or model in litellm.vertex_code_chat_models or model in litellm.vertex_text_models or model in litellm.vertex_code_text_models: |
|
vertex_ai_project = (litellm.vertex_project |
|
or get_secret("VERTEXAI_PROJECT")) |
|
vertex_ai_location = (litellm.vertex_location |
|
or get_secret("VERTEXAI_LOCATION")) |
|
|
|
model_response = vertex_ai.completion( |
|
model=model, |
|
messages=messages, |
|
model_response=model_response, |
|
print_verbose=print_verbose, |
|
optional_params=optional_params, |
|
litellm_params=litellm_params, |
|
logger_fn=logger_fn, |
|
encoding=encoding, |
|
vertex_location=vertex_ai_location, |
|
vertex_project=vertex_ai_project, |
|
logging_obj=logging |
|
) |
|
|
|
if "stream" in optional_params and optional_params["stream"] == True: |
|
response = CustomStreamWrapper( |
|
model_response, model, custom_llm_provider="vertex_ai", logging_obj=logging |
|
) |
|
return response |
|
response = model_response |
|
elif custom_llm_provider == "ai21": |
|
custom_llm_provider = "ai21" |
|
ai21_key = ( |
|
api_key |
|
or litellm.ai21_key |
|
or os.environ.get("AI21_API_KEY") |
|
or litellm.api_key |
|
) |
|
|
|
api_base = ( |
|
api_base |
|
or litellm.api_base |
|
or get_secret("AI21_API_BASE") |
|
or "https://api.ai21.com/studio/v1/" |
|
) |
|
|
|
model_response = ai21.completion( |
|
model=model, |
|
messages=messages, |
|
api_base=api_base, |
|
model_response=model_response, |
|
print_verbose=print_verbose, |
|
optional_params=optional_params, |
|
litellm_params=litellm_params, |
|
logger_fn=logger_fn, |
|
encoding=encoding, |
|
api_key=ai21_key, |
|
logging_obj=logging |
|
) |
|
|
|
if "stream" in optional_params and optional_params["stream"] == True: |
|
|
|
response = CustomStreamWrapper( |
|
model_response, model, custom_llm_provider="ai21", logging_obj=logging |
|
) |
|
return response |
|
|
|
|
|
response = model_response |
|
elif custom_llm_provider == "sagemaker": |
|
|
|
model_response = sagemaker.completion( |
|
model=model, |
|
messages=messages, |
|
model_response=model_response, |
|
print_verbose=print_verbose, |
|
optional_params=optional_params, |
|
litellm_params=litellm_params, |
|
logger_fn=logger_fn, |
|
encoding=encoding, |
|
logging_obj=logging |
|
) |
|
if "stream" in optional_params and optional_params["stream"]==True: |
|
|
|
|
|
|
|
|
|
|
|
resp_string = model_response["choices"][0]["message"]["content"] |
|
response = CustomStreamWrapper( |
|
resp_string, model, custom_llm_provider="sagemaker", logging_obj=logging |
|
) |
|
return response |
|
|
|
|
|
response = model_response |
|
elif custom_llm_provider == "bedrock": |
|
|
|
custom_prompt_dict = ( |
|
custom_prompt_dict |
|
or litellm.custom_prompt_dict |
|
) |
|
model_response = bedrock.completion( |
|
model=model, |
|
messages=messages, |
|
                custom_prompt_dict=custom_prompt_dict, # pass the merged custom_prompt_dict built above (falls back to litellm.custom_prompt_dict)
|
model_response=model_response, |
|
print_verbose=print_verbose, |
|
optional_params=optional_params, |
|
litellm_params=litellm_params, |
|
logger_fn=logger_fn, |
|
encoding=encoding, |
|
logging_obj=logging, |
|
) |
|
|
|
|
|
if "stream" in optional_params and optional_params["stream"] == True: |
|
|
|
if "ai21" in model: |
|
response = CustomStreamWrapper( |
|
model_response, model, custom_llm_provider="bedrock", logging_obj=logging |
|
) |
|
else: |
|
response = CustomStreamWrapper( |
|
iter(model_response), model, custom_llm_provider="bedrock", logging_obj=logging |
|
) |
|
return response |
|
|
|
|
|
response = model_response |
|
elif custom_llm_provider == "vllm": |
|
model_response = vllm.completion( |
|
model=model, |
|
messages=messages, |
|
model_response=model_response, |
|
print_verbose=print_verbose, |
|
optional_params=optional_params, |
|
litellm_params=litellm_params, |
|
logger_fn=logger_fn, |
|
encoding=encoding, |
|
logging_obj=logging |
|
) |
|
|
|
if "stream" in optional_params and optional_params["stream"] == True: |
|
|
|
response = CustomStreamWrapper( |
|
model_response, model, custom_llm_provider="vllm", logging_obj=logging |
|
) |
|
return response |
|
|
|
|
|
response = model_response |
|
elif custom_llm_provider == "ollama": |
|
api_base = ( |
|
litellm.api_base or |
|
api_base or |
|
get_secret("OLLAMA_API_BASE") or |
|
"http://localhost:11434" |
|
|
|
) |
|
custom_prompt_dict = ( |
|
custom_prompt_dict |
|
or litellm.custom_prompt_dict |
|
) |
|
if model in custom_prompt_dict: |
|
|
|
model_prompt_details = custom_prompt_dict[model] |
|
prompt = custom_prompt( |
|
role_dict=model_prompt_details["roles"], |
|
initial_prompt_value=model_prompt_details["initial_prompt_value"], |
|
final_prompt_value=model_prompt_details["final_prompt_value"], |
|
messages=messages |
|
) |
|
else: |
|
prompt = prompt_factory(model=model, messages=messages, custom_llm_provider=custom_llm_provider) |
|
|
|
if kwargs.get('acompletion', False) == True: |
|
if optional_params.get("stream", False) == True: |
|
|
|
async_generator = ollama.async_get_ollama_response_stream(api_base, model, prompt, optional_params, logging_obj=logging) |
|
return async_generator |
|
|
|
generator = ollama.get_ollama_response_stream(api_base, model, prompt, optional_params, logging_obj=logging) |
|
if optional_params.get("stream", False) == True: |
|
|
|
response = CustomStreamWrapper( |
|
generator, model, custom_llm_provider="ollama", logging_obj=logging |
|
) |
|
return response |
|
else: |
|
response_string = "" |
|
for chunk in generator: |
|
response_string+=chunk['content'] |
|
|
|
|
|
model_response["choices"][0]["finish_reason"] = "stop" |
|
model_response["choices"][0]["message"]["content"] = response_string |
|
model_response["created"] = int(time.time()) |
|
model_response["model"] = "ollama/" + model |
|
prompt_tokens = len(encoding.encode(prompt)) |
|
completion_tokens = len(encoding.encode(response_string)) |
|
model_response["usage"] = Usage(prompt_tokens=prompt_tokens, completion_tokens=completion_tokens, total_tokens=prompt_tokens + completion_tokens) |
|
response = model_response |
|
elif ( |
|
custom_llm_provider == "baseten" |
|
or litellm.api_base == "https://app.baseten.co" |
|
): |
|
custom_llm_provider = "baseten" |
|
baseten_key = ( |
|
api_key or litellm.baseten_key or os.environ.get("BASETEN_API_KEY") or litellm.api_key |
|
) |
|
|
|
model_response = baseten.completion( |
|
model=model, |
|
messages=messages, |
|
model_response=model_response, |
|
print_verbose=print_verbose, |
|
optional_params=optional_params, |
|
litellm_params=litellm_params, |
|
logger_fn=logger_fn, |
|
encoding=encoding, |
|
api_key=baseten_key, |
|
logging_obj=logging |
|
) |
|
if inspect.isgenerator(model_response) or ("stream" in optional_params and optional_params["stream"] == True): |
|
|
|
response = CustomStreamWrapper( |
|
model_response, model, custom_llm_provider="baseten", logging_obj=logging |
|
) |
|
return response |
|
response = model_response |
|
elif ( |
|
custom_llm_provider == "petals" |
|
or model in litellm.petals_models |
|
): |
|
api_base = ( |
|
api_base or |
|
litellm.api_base |
|
) |
|
|
|
custom_llm_provider = "petals" |
|
stream = optional_params.pop("stream", False) |
|
model_response = petals.completion( |
|
model=model, |
|
messages=messages, |
|
api_base=api_base, |
|
model_response=model_response, |
|
print_verbose=print_verbose, |
|
optional_params=optional_params, |
|
litellm_params=litellm_params, |
|
logger_fn=logger_fn, |
|
encoding=encoding, |
|
logging_obj=logging |
|
) |
|
if stream==True: |
|
|
|
resp_string = model_response["choices"][0]["message"]["content"] |
|
response = CustomStreamWrapper( |
|
resp_string, model, custom_llm_provider="petals", logging_obj=logging |
|
) |
|
return response |
|
response = model_response |
|
elif ( |
|
custom_llm_provider == "custom" |
|
): |
|
import requests |
|
|
|
url = ( |
|
litellm.api_base or |
|
api_base or |
|
"" |
|
) |
|
            if url is None or url == "":
|
raise ValueError("api_base not set. Set api_base or litellm.api_base for custom endpoints") |
|
|
|
""" |
|
assume input to custom LLM api bases follow this format: |
|
resp = requests.post( |
|
api_base, |
|
json={ |
|
'model': 'meta-llama/Llama-2-13b-hf', # model name |
|
'params': { |
|
'prompt': ["The capital of France is P"], |
|
'max_tokens': 32, |
|
'temperature': 0.7, |
|
'top_p': 1.0, |
|
'top_k': 40, |
|
} |
|
} |
|
) |
|
|
|
""" |
|
prompt = " ".join([message["content"] for message in messages]) |
|
resp = requests.post(url, json={ |
|
'model': model, |
|
'params': { |
|
'prompt': [prompt], |
|
'max_tokens': max_tokens, |
|
'temperature': temperature, |
|
'top_p': top_p, |
|
'top_k': kwargs.get('top_k', 40), |
|
} |
|
}) |
|
response_json = resp.json() |
|
""" |
|
assume all responses from custom api_bases of this format: |
|
{ |
|
'data': [ |
|
{ |
|
'prompt': 'The capital of France is P', |
|
'output': ['The capital of France is PARIS.\nThe capital of France is PARIS.\nThe capital of France is PARIS.\nThe capital of France is PARIS.\nThe capital of France is PARIS.\nThe capital of France is PARIS.\nThe capital of France is PARIS.\nThe capital of France is PARIS.\nThe capital of France is PARIS.\nThe capital of France is PARIS.\nThe capital of France is PARIS.\nThe capital of France is PARIS.\nThe capital of France is PARIS.\nThe capital of France'], |
|
'params': {'temperature': 0.7, 'top_k': 40, 'top_p': 1}}], |
|
'message': 'ok' |
|
} |
|
] |
|
} |
|
""" |
|
string_response = response_json['data'][0]['output'][0] |
|
|
|
model_response["choices"][0]["message"]["content"] = string_response |
|
model_response["created"] = int(time.time()) |
|
model_response["model"] = model |
|
response = model_response |
|
else: |
|
raise ValueError( |
|
f"Unable to map your input to a model. Check your input - {args}" |
|
) |
|
return response |
|
except Exception as e: |
|
|
|
raise exception_type( |
|
model=model, custom_llm_provider=custom_llm_provider, original_exception=e, completion_kwargs=args, |
|
) |
|
|
|
|
|
def completion_with_retries(*args, **kwargs): |
|
""" |
|
    Executes a litellm.completion() call, retrying on failure (3 attempts by default; configurable via num_retries and retry_strategy).
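
    Illustrative usage (a sketch; assumes OPENAI_API_KEY is set; retry_strategy may be "constant_retry" or "exponential_backoff_retry"):

        response = completion_with_retries(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": "Hello"}],
            num_retries=3,
            retry_strategy="exponential_backoff_retry",
        )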
|
""" |
|
try: |
|
import tenacity |
|
except Exception as e: |
|
raise Exception(f"tenacity import failed please run `pip install tenacity`. Error{e}") |
|
|
|
num_retries = kwargs.pop("num_retries", 3) |
|
retry_strategy = kwargs.pop("retry_strategy", "constant_retry") |
|
original_function = kwargs.pop("original_function", completion) |
|
if retry_strategy == "constant_retry": |
|
retryer = tenacity.Retrying(stop=tenacity.stop_after_attempt(num_retries), reraise=True) |
|
elif retry_strategy == "exponential_backoff_retry": |
|
retryer = tenacity.Retrying(wait=tenacity.wait_exponential(multiplier=1, max=10), stop=tenacity.stop_after_attempt(num_retries), reraise=True) |
|
return retryer(original_function, *args, **kwargs) |
|
|
|
async def acompletion_with_retries(*args, **kwargs): |
|
""" |
|
    Asynchronously executes a litellm.completion() call, retrying on failure (3 attempts by default; configurable via num_retries and retry_strategy).
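
    Illustrative usage (a sketch; assumes OPENAI_API_KEY is set and this is called from within a running event loop):

        response = await acompletion_with_retries(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": "Hello"}],
            num_retries=3,
        )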
|
""" |
|
try: |
|
import tenacity |
|
except Exception as e: |
|
raise Exception(f"tenacity import failed please run `pip install tenacity`. Error{e}") |
|
|
|
num_retries = kwargs.pop("num_retries", 3) |
|
retry_strategy = kwargs.pop("retry_strategy", "constant_retry") |
|
original_function = kwargs.pop("original_function", completion) |
|
if retry_strategy == "constant_retry": |
|
retryer = tenacity.Retrying(stop=tenacity.stop_after_attempt(num_retries), reraise=True) |
|
elif retry_strategy == "exponential_backoff_retry": |
|
retryer = tenacity.Retrying(wait=tenacity.wait_exponential(multiplier=1, max=10), stop=tenacity.stop_after_attempt(num_retries), reraise=True) |
|
    # retryer calls the (synchronous) original_function and returns its result directly;
    # the result itself is not awaitable, so return it as-is
    return retryer(original_function, *args, **kwargs)
|
|
|
|
|
|
|
def batch_completion( |
|
model: str, |
|
|
|
messages: List = [], |
|
functions: List = [], |
|
function_call: str = "", |
|
temperature: Optional[float] = None, |
|
top_p: Optional[float] = None, |
|
n: Optional[int] = None, |
|
stream: Optional[bool] = None, |
|
stop=None, |
|
max_tokens: Optional[float] = None, |
|
presence_penalty: Optional[float] = None, |
|
frequency_penalty: Optional[float]=None, |
|
logit_bias: Optional[dict] = None, |
|
user: Optional[str] = None, |
|
deployment_id = None, |
|
request_timeout: Optional[int] = None, |
|
|
|
**kwargs): |
|
""" |
|
Batch litellm.completion function for a given model. |
|
|
|
Args: |
|
model (str): The model to use for generating completions. |
|
messages (List, optional): List of messages to use as input for generating completions. Defaults to []. |
|
functions (List, optional): List of functions to use as input for generating completions. Defaults to []. |
|
function_call (str, optional): The function call to use as input for generating completions. Defaults to "". |
|
temperature (float, optional): The temperature parameter for generating completions. Defaults to None. |
|
top_p (float, optional): The top-p parameter for generating completions. Defaults to None. |
|
n (int, optional): The number of completions to generate. Defaults to None. |
|
stream (bool, optional): Whether to stream completions or not. Defaults to None. |
|
stop (optional): The stop parameter for generating completions. Defaults to None. |
|
max_tokens (float, optional): The maximum number of tokens to generate. Defaults to None. |
|
presence_penalty (float, optional): The presence penalty for generating completions. Defaults to None. |
|
frequency_penalty (float, optional): The frequency penalty for generating completions. Defaults to None. |
|
logit_bias (dict, optional): The logit bias for generating completions. Defaults to {}. |
|
user (str, optional): The user string for generating completions. Defaults to "". |
|
deployment_id (optional): The deployment ID for generating completions. Defaults to None. |
|
request_timeout (int, optional): The request timeout for generating completions. Defaults to None. |
|
|
|
Returns: |
|
list: A list of completion results. |
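
    Illustrative usage (a sketch; assumes OPENAI_API_KEY is set; `messages` is a list of conversations, one per completion):

        responses = batch_completion(
            model="gpt-3.5-turbo",
            messages=[
                [{"role": "user", "content": "Write a haiku about the sea"}],
                [{"role": "user", "content": "Write a haiku about the mountains"}],
            ],
        )
        for r in responses:
            print(r["choices"][0]["message"]["content"])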
|
""" |
|
args = locals() |
|
batch_messages = messages |
|
completions = [] |
|
model = model |
|
custom_llm_provider = None |
|
if model.split("/", 1)[0] in litellm.provider_list: |
|
custom_llm_provider = model.split("/", 1)[0] |
|
model = model.split("/", 1)[1] |
|
if custom_llm_provider == "vllm": |
|
optional_params = get_optional_params( |
|
functions=functions, |
|
function_call=function_call, |
|
temperature=temperature, |
|
top_p=top_p, |
|
n=n, |
|
stream=stream, |
|
stop=stop, |
|
max_tokens=max_tokens, |
|
presence_penalty=presence_penalty, |
|
frequency_penalty=frequency_penalty, |
|
logit_bias=logit_bias, |
|
user=user, |
|
|
|
model=model, |
|
custom_llm_provider=custom_llm_provider |
|
) |
|
results = vllm.batch_completions(model=model, messages=batch_messages, custom_prompt_dict=litellm.custom_prompt_dict, optional_params=optional_params) |
|
|
|
else: |
|
def chunks(lst, n): |
|
"""Yield successive n-sized chunks from lst.""" |
|
for i in range(0, len(lst), n): |
|
yield lst[i:i + n] |
|
with ThreadPoolExecutor(max_workers=100) as executor: |
|
for sub_batch in chunks(batch_messages, 100): |
|
for message_list in sub_batch: |
|
kwargs_modified = args.copy() |
|
kwargs_modified["messages"] = message_list |
|
original_kwargs = {} |
|
if "kwargs" in kwargs_modified: |
|
original_kwargs = kwargs_modified.pop("kwargs") |
|
future = executor.submit(completion, **kwargs_modified, **original_kwargs) |
|
completions.append(future) |
|
|
|
|
|
results = [future.result() for future in completions] |
|
return results |
|
|
|
|
|
|
|
def batch_completion_models(*args, **kwargs): |
|
""" |
|
Send a request to multiple language models concurrently and return the response |
|
as soon as one of the models responds. |
|
|
|
Args: |
|
*args: Variable-length positional arguments passed to the completion function. |
|
**kwargs: Additional keyword arguments: |
|
- models (str or list of str): The language models to send requests to. |
|
- Other keyword arguments to be passed to the completion function. |
|
|
|
Returns: |
|
str or None: The response from one of the language models, or None if no response is received. |
|
|
|
Note: |
|
This function utilizes a ThreadPoolExecutor to parallelize requests to multiple models. |
|
It sends requests concurrently and returns the response from the first model that responds. |
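
    Illustrative usage (a sketch; assumes the API keys for the listed providers are set in the environment):

        response = batch_completion_models(
            models=["gpt-3.5-turbo", "claude-instant-1"],
            messages=[{"role": "user", "content": "Hello, who are you?"}],
        )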
|
""" |
|
import concurrent |
|
if "model" in kwargs: |
|
kwargs.pop("model") |
|
if "models" in kwargs: |
|
models = kwargs["models"] |
|
kwargs.pop("models") |
|
futures = {} |
|
with concurrent.futures.ThreadPoolExecutor(max_workers=len(models)) as executor: |
|
for model in models: |
|
futures[model] = executor.submit(completion, *args, model=model, **kwargs) |
|
|
|
for model, future in sorted(futures.items(), key=lambda x: models.index(x[0])): |
|
if future.result() is not None: |
|
return future.result() |
|
elif "deployments" in kwargs: |
|
deployments = kwargs["deployments"] |
|
kwargs.pop("deployments") |
|
kwargs.pop("model_list") |
|
nested_kwargs = kwargs.pop("kwargs", {}) |
|
futures = {} |
|
with concurrent.futures.ThreadPoolExecutor(max_workers=len(deployments)) as executor: |
|
for deployment in deployments: |
|
for key in kwargs.keys(): |
|
if key not in deployment: |
|
deployment[key] = kwargs[key] |
|
kwargs = {**deployment, **nested_kwargs} |
|
futures[deployment["model"]] = executor.submit(completion, **kwargs) |
|
|
|
while futures: |
|
|
|
print_verbose("\n\n waiting for next result\n\n") |
|
done, _ = concurrent.futures.wait(futures.values(), return_when=concurrent.futures.FIRST_COMPLETED) |
|
print_verbose(f"done list\n{done}") |
|
for future in done: |
|
try: |
|
result = future.result() |
|
return result |
|
except Exception as e: |
|
|
|
print_verbose(f"\n\ngot an exception, ignoring, removing from futures") |
|
print_verbose(futures) |
|
new_futures = {} |
|
for key, value in futures.items(): |
|
if future == value: |
|
print_verbose(f"removing key{key}") |
|
continue |
|
else: |
|
new_futures[key] = value |
|
futures = new_futures |
|
print_verbose(f"new futures{futures}") |
|
continue |
|
|
|
|
|
print_verbose("\n\ndone looping through futures\n\n") |
|
print_verbose(futures) |
|
|
|
return None |
|
|
|
def batch_completion_models_all_responses(*args, **kwargs): |
|
""" |
|
Send a request to multiple language models concurrently and return a list of responses |
|
from all models that respond. |
|
|
|
Args: |
|
*args: Variable-length positional arguments passed to the completion function. |
|
**kwargs: Additional keyword arguments: |
|
- models (str or list of str): The language models to send requests to. |
|
- Other keyword arguments to be passed to the completion function. |
|
|
|
Returns: |
|
list: A list of responses from the language models that responded. |
|
|
|
Note: |
|
This function utilizes a ThreadPoolExecutor to parallelize requests to multiple models. |
|
It sends requests concurrently and collects responses from all models that respond. |
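
    Illustrative usage (a sketch; assumes the API keys for the listed providers are set in the environment):

        responses = batch_completion_models_all_responses(
            models=["gpt-3.5-turbo", "command-nightly"],
            messages=[{"role": "user", "content": "Hello, who are you?"}],
        )
        print(len(responses))  # one entry per model that returned a response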
|
""" |
|
import concurrent.futures |
|
|
|
|
|
GREEN = "\033[92m" |
|
RED = "\033[91m" |
|
RESET = "\033[0m" |
|
|
|
if "model" in kwargs: |
|
kwargs.pop("model") |
|
if "models" in kwargs: |
|
models = kwargs["models"] |
|
kwargs.pop("models") |
|
|
|
responses = [] |
|
|
|
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(models)) as executor:

        # submit every request before collecting results, so the calls actually run concurrently
        futures = [executor.submit(completion, *args, model=model, **kwargs) for model in models]

        for future in futures:

            result = future.result()

            if result is not None:

                responses.append(result)
|
|
|
return responses |
|
|
|
|
|
|
|
async def aembedding(*args, **kwargs): |
|
""" |
|
Asynchronously calls the `embedding` function with the given arguments and keyword arguments. |
|
|
|
Parameters: |
|
- `args` (tuple): Positional arguments to be passed to the `embedding` function. |
|
- `kwargs` (dict): Keyword arguments to be passed to the `embedding` function. |
|
|
|
Returns: |
|
- `response` (Any): The response returned by the `embedding` function. |
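
    Illustrative usage (a sketch; assumes OPENAI_API_KEY is set and this is called from within a running event loop):

        response = await aembedding(
            model="text-embedding-ada-002",
            input=["good morning from litellm"],
        )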
|
""" |
|
loop = asyncio.get_event_loop() |
|
model = args[0] if len(args) > 0 else kwargs["model"] |
|
|
|
kwargs["aembedding"] = True |
|
custom_llm_provider = None |
|
try: |
|
|
|
func = partial(embedding, *args, **kwargs) |
|
|
|
|
|
ctx = contextvars.copy_context() |
|
func_with_context = partial(ctx.run, func) |
|
|
|
_, custom_llm_provider, _, _ = get_llm_provider(model=model, api_base=kwargs.get("api_base", None)) |
|
|
|
if (custom_llm_provider == "openai" |
|
or custom_llm_provider == "azure" |
|
or custom_llm_provider == "custom_openai" |
|
or custom_llm_provider == "anyscale" |
|
or custom_llm_provider == "openrouter" |
|
or custom_llm_provider == "deepinfra" |
|
or custom_llm_provider == "perplexity" |
|
or custom_llm_provider == "huggingface"): |
|
|
|
init_response = await loop.run_in_executor(None, func_with_context) |
|
if isinstance(init_response, dict) or isinstance(init_response, ModelResponse): |
|
response = init_response |
|
elif asyncio.iscoroutine(init_response): |
|
response = await init_response |
|
else: |
|
|
|
response = await loop.run_in_executor(None, func_with_context) |
|
return response |
|
except Exception as e: |
|
custom_llm_provider = custom_llm_provider or "openai" |
|
raise exception_type( |
|
model=model, custom_llm_provider=custom_llm_provider, original_exception=e, completion_kwargs=args, |
|
) |
|
|
|
@client |
|
def embedding( |
|
model, |
|
input=[], |
|
|
|
timeout=600, |
|
|
|
api_base: Optional[str] = None, |
|
api_version: Optional[str] = None, |
|
api_key: Optional[str] = None, |
|
api_type: Optional[str] = None, |
|
caching: bool=False, |
|
user: Optional[str]=None, |
|
custom_llm_provider=None, |
|
litellm_call_id=None, |
|
litellm_logging_obj=None, |
|
logger_fn=None, |
|
**kwargs |
|
): |
|
""" |
|
Embedding function that calls an API to generate embeddings for the given input. |
|
|
|
Parameters: |
|
- model: The embedding model to use. |
|
- input: The input for which embeddings are to be generated. |
|
- timeout: The timeout value for the API call, in seconds (default 600, i.e. 10 minutes).
|
- litellm_call_id: The call ID for litellm logging. |
|
- litellm_logging_obj: The litellm logging object. |
|
- logger_fn: The logger function. |
|
- api_base: Optional. The base URL for the API. |
|
- api_version: Optional. The version of the API. |
|
- api_key: Optional. The API key to use. |
|
- api_type: Optional. The type of the API. |
|
- caching: A boolean indicating whether to enable caching. |
|
- custom_llm_provider: Optional. Explicitly set the provider (e.g. "azure", "huggingface", "bedrock") instead of inferring it from the model name.
|
|
|
Returns: |
|
- response: The response received from the API call. |
|
|
|
Raises: |
|
- exception_type: If an exception occurs during the API call. |
|
""" |
|
azure = kwargs.get("azure", None) |
|
client = kwargs.pop("client", None) |
|
rpm = kwargs.pop("rpm", None) |
|
tpm = kwargs.pop("tpm", None) |
|
aembedding = kwargs.pop("aembedding", None) |
|
|
|
optional_params = {} |
|
for param in kwargs: |
|
if param != "metadata": |
|
optional_params[param] = kwargs[param] |
|
model, custom_llm_provider, dynamic_api_key, api_base = get_llm_provider(model=model, custom_llm_provider=custom_llm_provider, api_base=api_base, api_key=api_key) |
|
try: |
|
response = None |
|
logging = litellm_logging_obj |
|
logging.update_environment_variables(model=model, user="", optional_params={}, litellm_params={"timeout": timeout, "azure": azure, "litellm_call_id": litellm_call_id, "logger_fn": logger_fn}) |
|
if azure == True or custom_llm_provider == "azure": |
|
|
|
api_type = get_secret("AZURE_API_TYPE") or "azure" |
|
|
|
api_base = ( |
|
api_base |
|
or litellm.api_base |
|
or get_secret("AZURE_API_BASE") |
|
) |
|
|
|
api_version = ( |
|
api_version or |
|
litellm.api_version or |
|
get_secret("AZURE_API_VERSION") |
|
) |
|
|
|
azure_ad_token = ( |
|
kwargs.pop("azure_ad_token", None) or |
|
get_secret("AZURE_AD_TOKEN") |
|
) |
|
|
|
api_key = ( |
|
api_key or |
|
litellm.api_key or |
|
litellm.azure_key or |
|
get_secret("AZURE_API_KEY") |
|
) |
|
|
|
response = azure_chat_completions.embedding( |
|
model=model, |
|
input=input, |
|
api_base=api_base, |
|
api_key=api_key, |
|
api_version=api_version, |
|
azure_ad_token=azure_ad_token, |
|
logging_obj=logging, |
|
timeout=timeout, |
|
model_response=EmbeddingResponse(), |
|
optional_params=optional_params, |
|
client=client, |
|
aembedding=aembedding |
|
) |
|
elif model in litellm.open_ai_embedding_models or custom_llm_provider == "openai": |
|
api_base = ( |
|
api_base |
|
or litellm.api_base |
|
or get_secret("OPENAI_API_BASE") |
|
or "https://api.openai.com/v1" |
|
) |
|
openai.organization = ( |
|
litellm.organization |
|
or get_secret("OPENAI_ORGANIZATION") |
|
or None |
|
) |
|
|
|
api_key = ( |
|
api_key or |
|
litellm.api_key or |
|
litellm.openai_key or |
|
get_secret("OPENAI_API_KEY") |
|
) |
|
api_type = "openai" |
|
api_version = None |
|
|
|
|
|
|
|
response = openai_chat_completions.embedding( |
|
model=model, |
|
input=input, |
|
api_base=api_base, |
|
api_key=api_key, |
|
logging_obj=logging, |
|
timeout=timeout, |
|
model_response=EmbeddingResponse(), |
|
optional_params=optional_params, |
|
client=client, |
|
aembedding=aembedding, |
|
) |
|
elif model in litellm.cohere_embedding_models: |
|
cohere_key = ( |
|
api_key |
|
or litellm.cohere_key |
|
or get_secret("COHERE_API_KEY") |
|
or get_secret("CO_API_KEY") |
|
or litellm.api_key |
|
) |
|
response = cohere.embedding( |
|
model=model, |
|
input=input, |
|
optional_params=optional_params, |
|
encoding=encoding, |
|
api_key=cohere_key, |
|
logging_obj=logging, |
|
model_response= EmbeddingResponse() |
|
|
|
) |
|
elif custom_llm_provider == "huggingface": |
|
api_key = ( |
|
api_key |
|
or litellm.huggingface_key |
|
or get_secret("HUGGINGFACE_API_KEY") |
|
or litellm.api_key |
|
) |
|
response = huggingface.embedding( |
|
model=model, |
|
input=input, |
|
encoding=encoding, |
|
api_key=api_key, |
|
api_base=api_base, |
|
logging_obj=logging, |
|
model_response= EmbeddingResponse() |
|
) |
|
elif custom_llm_provider == "bedrock": |
|
response = bedrock.embedding( |
|
model=model, |
|
input=input, |
|
encoding=encoding, |
|
logging_obj=logging, |
|
optional_params=kwargs, |
|
model_response= EmbeddingResponse() |
|
) |
|
else: |
|
args = locals() |
|
raise ValueError(f"No valid embedding model args passed in - {args}") |
|
return response |
|
except Exception as e: |
|
|
|
logging.post_call( |
|
input=input, |
|
api_key=openai.api_key, |
|
original_response=str(e), |
|
) |
|
|
|
raise exception_type( |
|
model=model, |
|
original_exception=e, |
|
custom_llm_provider="azure" if azure == True else None, |
|
) |
|
|
|
|
|
|
|
def text_completion( |
|
prompt: Union[str, List[Union[str, List[Union[str, List[int]]]]]], |
|
model: Optional[str]=None, |
|
best_of: Optional[int] = None, |
|
echo: Optional[bool] = None, |
|
frequency_penalty: Optional[float] = None, |
|
logit_bias: Optional[Dict[int, int]] = None, |
|
logprobs: Optional[int] = None, |
|
max_tokens: Optional[int] = None, |
|
n: Optional[int] = None, |
|
presence_penalty: Optional[float] = None, |
|
stop: Optional[Union[str, List[str]]] = None, |
|
stream: Optional[bool] = None, |
|
suffix: Optional[str] = None, |
|
temperature: Optional[float] = None, |
|
top_p: Optional[float] = None, |
|
user: Optional[str] = None, |
|
|
|
|
|
api_base: Optional[str] = None, |
|
api_version: Optional[str] = None, |
|
api_key: Optional[str] = None, |
|
model_list: Optional[list] = None, |
|
|
|
|
|
custom_llm_provider: Optional[str] = None, |
|
*args, |
|
**kwargs |
|
): |
|
global print_verbose |
|
import copy |
|
""" |
|
Generate text completions using the OpenAI API. |
|
|
|
Args: |
|
model (str): ID of the model to use. |
|
prompt (Union[str, List[Union[str, List[Union[str, List[int]]]]]]): The prompt(s) to generate completions for.
|
best_of (Optional[int], optional): Generates best_of completions server-side. Defaults to 1. |
|
echo (Optional[bool], optional): Echo back the prompt in addition to the completion. Defaults to False. |
|
frequency_penalty (Optional[float], optional): Penalize new tokens based on their existing frequency. Defaults to 0. |
|
logit_bias (Optional[Dict[int, int]], optional): Modify the likelihood of specified tokens. Defaults to None. |
|
logprobs (Optional[int], optional): Include the log probabilities on the most likely tokens. Defaults to None. |
|
max_tokens (Optional[int], optional): The maximum number of tokens to generate in the completion. Defaults to 16. |
|
n (Optional[int], optional): How many completions to generate for each prompt. Defaults to 1. |
|
presence_penalty (Optional[float], optional): Penalize new tokens based on whether they appear in the text so far. Defaults to 0. |
|
stop (Optional[Union[str, List[str]]], optional): Sequences where the API will stop generating further tokens. Defaults to None. |
|
stream (Optional[bool], optional): Whether to stream back partial progress. Defaults to False. |
|
suffix (Optional[str], optional): The suffix that comes after a completion of inserted text. Defaults to None. |
|
temperature (Optional[float], optional): Sampling temperature to use. Defaults to 1. |
|
top_p (Optional[float], optional): Nucleus sampling parameter. Defaults to 1. |
|
user (Optional[str], optional): A unique identifier representing your end-user. |
|
Returns: |
|
TextCompletionResponse: A response object containing the generated completion and associated metadata. |
|
|
|
Example: |
|
# model name is illustrative
response = text_completion(model="gpt-3.5-turbo-instruct", prompt="Once upon a time")
print(response["choices"][0]["text"])
|
""" |
|
if "engine" in kwargs: |
|
if model==None: |
|
|
|
model = kwargs["engine"] |
|
kwargs.pop("engine") |
|
|
|
text_completion_response = TextCompletionResponse() |
|
|
|
optional_params: Dict[str, Any] = {} |
|
|
|
if best_of is not None: |
|
optional_params["best_of"] = best_of |
|
if echo is not None: |
|
optional_params["echo"] = echo |
|
if frequency_penalty is not None: |
|
optional_params["frequency_penalty"] = frequency_penalty |
|
if logit_bias is not None: |
|
optional_params["logit_bias"] = logit_bias |
|
if logprobs is not None: |
|
optional_params["logprobs"] = logprobs |
|
if max_tokens is not None: |
|
optional_params["max_tokens"] = max_tokens |
|
if n is not None: |
|
optional_params["n"] = n |
|
if presence_penalty is not None: |
|
optional_params["presence_penalty"] = presence_penalty |
|
if stop is not None: |
|
optional_params["stop"] = stop |
|
if stream is not None: |
|
optional_params["stream"] = stream |
|
if suffix is not None: |
|
optional_params["suffix"] = suffix |
|
if temperature is not None: |
|
optional_params["temperature"] = temperature |
|
if top_p is not None: |
|
optional_params["top_p"] = top_p |
|
if user is not None: |
|
optional_params["user"] = user |
|
if api_base is not None: |
|
optional_params["api_base"] = api_base |
|
if api_version is not None: |
|
optional_params["api_version"] = api_version |
|
if api_key is not None: |
|
optional_params["api_key"] = api_key |
|
if custom_llm_provider is not None: |
|
optional_params["custom_llm_provider"] = custom_llm_provider |
|
|
|
|
|
_, custom_llm_provider, dynamic_api_key, api_base = get_llm_provider(model=model, custom_llm_provider=custom_llm_provider, api_base=api_base) |
|
|
|
if custom_llm_provider == "huggingface": |
|
|
|
if echo == True: |
|
|
|
if "top_n_tokens" not in kwargs: |
|
kwargs["top_n_tokens"] = 3 |
|
|
|
|
|
if isinstance(prompt, list):
|
import concurrent.futures |
|
tokenizer = tiktoken.encoding_for_model("text-davinci-003") |
|
|
|
if len(prompt) > 0 and isinstance(prompt[0], list):
|
responses = [None for x in prompt] |
|
def process_prompt(i, individual_prompt): |
|
decoded_prompt = tokenizer.decode(individual_prompt) |
|
all_params = {**kwargs, **optional_params} |
|
response = text_completion( |
|
model=model, |
|
prompt=decoded_prompt, |
|
num_retries=3, |
|
*args, |
|
**all_params, |
|
) |
|
|
|
text_completion_response["id"] = response.get("id", None) |
|
text_completion_response["object"] = "text_completion" |
|
text_completion_response["created"] = response.get("created", None) |
|
text_completion_response["model"] = response.get("model", None) |
|
return response["choices"][0] |
|
with concurrent.futures.ThreadPoolExecutor() as executor: |
|
futures = [executor.submit(process_prompt, i, individual_prompt) for i, individual_prompt in enumerate(prompt)] |
|
# iterate over futures in submission order so each response lines up with its prompt
for i, future in enumerate(futures):
    responses[i] = future.result()
|
text_completion_response.choices = responses |
|
|
|
return text_completion_response |
|
|
|
|
|
|
|
|
|
|
|
messages = [{"role": "system", "content": prompt}] |
|
kwargs.pop("prompt", None) |
|
response = completion( |
|
model = model, |
|
messages=messages, |
|
*args, |
|
**kwargs, |
|
**optional_params, |
|
) |
|
if stream == True or kwargs.get("stream", False) == True: |
|
response = TextCompletionStreamWrapper(completion_stream=response, model=model) |
|
return response |
|
|
|
transformed_logprobs = None |
|
|
|
try: |
|
raw_response = response._hidden_params.get("original_response", None) |
|
transformed_logprobs = litellm.utils.transform_logprobs(raw_response) |
|
except Exception as e: |
|
print_verbose(f"LiteLLM non blocking exception: {e}") |
|
text_completion_response["id"] = response.get("id", None) |
|
text_completion_response["object"] = "text_completion" |
|
text_completion_response["created"] = response.get("created", None) |
|
text_completion_response["model"] = response.get("model", None) |
|
text_choices = TextChoices() |
|
text_choices["text"] = response["choices"][0]["message"]["content"] |
|
text_choices["index"] = response["choices"][0]["index"] |
|
text_choices["logprobs"] = transformed_logprobs |
|
text_choices["finish_reason"] = response["choices"][0]["finish_reason"] |
|
text_completion_response["choices"] = [text_choices] |
|
text_completion_response["usage"] = response.get("usage", None) |
|
return text_completion_response |
|
|
|
|
|
def moderation(input: str, api_key: Optional[str]=None): |
|
|
|
api_key = ( |
|
api_key or |
|
litellm.api_key or |
|
litellm.openai_key or |
|
get_secret("OPENAI_API_KEY") |
|
) |
|
openai.api_key = api_key |
|
openai.api_type = "open_ai" |
|
openai.api_version = None |
|
openai.base_url = "https://api.openai.com/v1/" |
|
response = openai.moderations.create(input=input) |
|
return response |
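# Illustrative usage (a sketch; uses the OpenAI moderations response shape):
#
#   result = moderation(input="I want to do something harmful")
#   print(result.results[0].flagged)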
|
|
|
|
|
|
|
def print_verbose(print_statement): |
|
if litellm.set_verbose: |
|
print(print_statement) |
|
|
|
def config_completion(**kwargs): |
|
if litellm.config_path is not None:
|
config_args = read_config_args(litellm.config_path) |
|
|
|
return completion(**kwargs, **config_args) |
|
else: |
|
raise ValueError( |
|
"No config path set, please set a config path using `litellm.config_path = 'path/to/config.json'`" |
|
) |
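# Illustrative usage (a sketch; the path is a placeholder and the config file is
# assumed to supply the model and any other required completion params):
#
#   litellm.config_path = "path/to/config.json"
#   response = config_completion(messages=[{"role": "user", "content": "Hello"}])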
|
|
|
def stream_chunk_builder(chunks: list, messages: Optional[list]=None): |
|
id = chunks[0]["id"] |
|
object = chunks[0]["object"] |
|
created = chunks[0]["created"] |
|
model = chunks[0]["model"] |
|
system_fingerprint = chunks[0].get("system_fingerprint", None) |
|
role = chunks[0]["choices"][0]["delta"]["role"] |
|
finish_reason = chunks[-1]["choices"][0]["finish_reason"] |
|
|
|
|
|
response = { |
|
"id": id, |
|
"object": object, |
|
"created": created, |
|
"model": model, |
|
"system_fingerprint": system_fingerprint, |
|
"choices": [ |
|
{ |
|
"index": 0, |
|
"message": { |
|
"role": role, |
|
"content": "" |
|
}, |
|
"finish_reason": finish_reason, |
|
} |
|
], |
|
"usage": { |
|
"prompt_tokens": 0, |
|
"completion_tokens": 0, |
|
"total_tokens": 0 |
|
} |
|
} |
|
|
|
|
|
content_list = [] |
|
combined_content = "" |
|
combined_arguments = "" |
|
|
|
if "tool_calls" in chunks[0]["choices"][0]["delta"] and chunks[0]["choices"][0]["delta"]["tool_calls"] is not None: |
|
argument_list = [] |
|
delta = chunks[0]["choices"][0]["delta"] |
|
message = response["choices"][0]["message"] |
|
message["tool_calls"] = [] |
|
id = None |
|
name = None |
|
type = None |
|
tool_calls_list = [] |
|
prev_index = 0 |
|
prev_id = None |
|
curr_id = None |
|
curr_index = 0 |
|
for chunk in chunks: |
|
choices = chunk["choices"] |
|
for choice in choices: |
|
delta = choice.get("delta", {}) |
|
tool_calls = delta.get("tool_calls", "") |
|
|
|
if tool_calls and tool_calls[0].function is not None: |
|
if tool_calls[0].id: |
|
id = tool_calls[0].id |
|
curr_id = id |
|
if prev_id is None: |
|
prev_id = curr_id |
|
if tool_calls[0].index: |
|
curr_index = tool_calls[0].index |
|
if tool_calls[0].function.arguments: |
|
|
|
arguments = tool_calls[0].function.arguments |
|
argument_list.append(arguments) |
|
if tool_calls[0].function.name: |
|
name = tool_calls[0].function.name |
|
if tool_calls[0].type: |
|
type = tool_calls[0].type |
|
if curr_index != prev_index: |
|
combined_arguments = "".join(argument_list) |
|
tool_calls_list.append({"id": prev_id, "index": prev_index, "function": {"arguments": combined_arguments, "name": name}, "type": type}) |
|
argument_list = [] |
|
prev_index = curr_index |
|
prev_id = curr_id |
|
|
|
combined_arguments = "".join(argument_list) |
|
tool_calls_list.append({"id": id, "function": {"arguments": combined_arguments, "name": name}, "type": type}) |
|
response["choices"][0]["message"]["content"] = None |
|
response["choices"][0]["message"]["tool_calls"] = tool_calls_list |
|
elif "function_call" in chunks[0]["choices"][0]["delta"] and chunks[0]["choices"][0]["delta"]["function_call"] is not None: |
|
argument_list = [] |
|
delta = chunks[0]["choices"][0]["delta"] |
|
function_call = delta.get("function_call", "") |
|
function_call_name = function_call.name |
|
|
|
message = response["choices"][0]["message"] |
|
message["function_call"] = {} |
|
message["function_call"]["name"] = function_call_name |
|
|
|
for chunk in chunks: |
|
choices = chunk["choices"] |
|
for choice in choices: |
|
delta = choice.get("delta", {}) |
|
function_call = delta.get("function_call", "") |
|
|
|
|
|
if function_call: |
|
|
|
arguments = function_call.arguments |
|
argument_list.append(arguments) |
|
|
|
combined_arguments = "".join(argument_list) |
|
response["choices"][0]["message"]["content"] = None |
|
response["choices"][0]["message"]["function_call"]["arguments"] = combined_arguments |
|
else: |
|
for chunk in chunks: |
|
choices = chunk["choices"] |
|
for choice in choices: |
|
delta = choice.get("delta", {}) |
|
content = delta.get("content", "") |
|
if content is None:
|
continue |
|
content_list.append(content) |
|
|
|
|
|
combined_content = "".join(content_list) |
|
|
|
|
|
response["choices"][0]["message"]["content"] = combined_content |
|
|
|
completion_output = ""  # guard against streams with neither content nor arguments
if len(combined_content) > 0:
    completion_output = combined_content
elif len(combined_arguments) > 0:
    completion_output = combined_arguments
|
|
|
try: |
|
response["usage"]["prompt_tokens"] = token_counter(model=model, messages=messages) |
|
except Exception:
    print_verbose("token_counter failed, assuming prompt tokens is 0")
|
response["usage"]["prompt_tokens"] = 0 |
|
response["usage"]["completion_tokens"] = token_counter(model=model, text=completion_output) |
|
response["usage"]["total_tokens"] = response["usage"]["prompt_tokens"] + response["usage"]["completion_tokens"] |
|
return convert_to_model_response_object(response_object=response, model_response_object=litellm.ModelResponse()) |
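# Illustrative usage (a sketch; rebuilds a full ModelResponse from a streamed completion,
# model name is a placeholder):
#
#   messages = [{"role": "user", "content": "Tell me a joke"}]
#   chunks = list(completion(model="gpt-3.5-turbo", messages=messages, stream=True))
#   full_response = stream_chunk_builder(chunks, messages=messages)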
|
|