Spaces:
Runtime error
Runtime error
import json | |
import os | |
import shutil | |
import requests | |
import threading | |
import time | |
import random | |
import inspect | |
import logging | |
import warnings | |
from requests import HTTPError | |
from dataclasses import asdict | |
from pathlib import Path | |
from typing import Callable, Literal, AsyncGenerator | |
from gradio.utils import SyncToAsyncIterator, async_iteration | |
from gradio.components import ( | |
Button, | |
Chatbot, | |
# IOComponent, | |
Markdown, | |
State, | |
Textbox, | |
get_component_instance, | |
) | |
from huggingface_hub.utils import ( | |
BadRequestError, | |
build_hf_headers, | |
get_session, | |
hf_raise_for_status, | |
) | |
from huggingface_hub.inference._common import ( | |
TASKS_EXPECTING_IMAGES, | |
ContentT, | |
# InferenceTimeoutError, | |
ModelStatus, | |
_b64_encode, | |
_b64_to_image, | |
_bytes_to_dict, | |
_bytes_to_image, | |
_bytes_to_list, | |
_import_numpy, | |
_is_tgi_server, | |
_open_as_binary, | |
_set_as_non_tgi, | |
_stream_text_generation_response, | |
) | |
from huggingface_hub.inference._text_generation import ( | |
TextGenerationParameters, | |
TextGenerationRequest, | |
TextGenerationResponse, | |
TextGenerationStreamResponse, | |
raise_text_generation_error, | |
) | |
from huggingface_hub.inference._types import ( | |
ClassificationOutput, | |
ConversationalOutput, | |
FillMaskOutput, | |
ImageSegmentationOutput, | |
ObjectDetectionOutput, | |
QuestionAnsweringOutput, | |
TableQuestionAnsweringOutput, | |
TokenClassificationOutput, | |
) | |
from gradio.themes import ThemeClass as Theme | |
import gradio as gr | |
from gradio.helpers import special_args | |
from gradio.routes import Request | |
import anyio | |
from huggingface_hub import HfApi, Repository, InferenceClient | |
from utils import force_git_push | |
from typing import ( | |
TYPE_CHECKING, | |
Any, | |
Dict, | |
Iterable, | |
List, | |
Literal, | |
Optional, | |
Union, | |
overload, | |
) | |
logger = logging.getLogger(__name__) | |
HF_TOKEN = os.environ.get("HF_TOKEN", None) | |
DATASET_REPO_URL = os.getenv("DATASET_REPO_URL") | |
MODEL_NAME = os.getenv("MODEL_NAME") | |
ENDPOINT = os.getenv("ENDPOINT") | |
FORCE_PUSH = os.getenv("FORCE_PUSH") | |
REVISION = os.getenv("REVISION") | |
BOT_NAME = "PersianGPT-FT" | |
PUSH_FREQUENCY = 60 # every minute | |
HISTORY = "" | |
PROMPT = "" | |
NAME = "" | |
SYSTEM_PROMPT = "" | |
TEMPERATURE = "" | |
MAX_NEW_TOKENS = "" | |
TOP_P = "" | |
TOP_K = "" | |
REPETITION_PENALTY = "" | |
MODEL_REPO_URL = f"https://huggingface.co/{MODEL_NAME}" | |
if len(ENDPOINT)>0: | |
API_URL = f"{ENDPOINT}" #/revision/{REVISION}" if len(REVISION)>0 else f"{ENDPOINT}" | |
MODEL_VERSION = f"{ENDPOINT}" if len(REVISION)==0 else REVISION | |
print(f'ENDPOINT: {ENDPOINT}') | |
else: | |
API_URL = f"/static-proxy?url=https%3A%2F%2Fapi-inference.huggingface.co%2Fmodels%2F%3Cspan class="hljs-subst">{MODEL_NAME}/revision/{REVISION}" if len(REVISION)>0 else f"/static-proxy?url=https%3A%2F%2Fapi-inference.huggingface.co%2Fmodels%2F%3Cspan class="hljs-subst">{MODEL_NAME}" | |
model_repo = Repository(local_dir="model", | |
clone_from=MODEL_REPO_URL, | |
use_auth_token=HF_TOKEN, | |
skip_lfs_files=True, | |
git_user="vpcom", | |
revision=REVISION) | |
MODEL_VERSION = model_repo.git_head_commit_url() if len(REVISION)==0 else REVISION | |
print(f'API URL: {API_URL}') | |
#print(f'Model Version: {MODEL_VERSION}') | |
DATA_FILENAME = "data.jsonl" | |
DATA_FILE = os.path.join("data", DATA_FILENAME) | |
data_repo = Repository(local_dir="data", | |
clone_from=DATASET_REPO_URL, | |
use_auth_token=HF_TOKEN) | |
stop_sequences = ["<|endoftext|>"] | |
class InferenceClientUS(InferenceClient): | |
def __init__( | |
self, | |
model: Optional[str] = None, | |
token: Union[str, bool, None] = None, | |
timeout: Optional[float] = None, | |
headers: Optional[Dict[str, str]] = None, | |
cookies: Optional[Dict[str, str]] = None, | |
) -> None: | |
super().__init__( | |
model=model, | |
token=token, | |
timeout=timeout, | |
headers=headers, | |
cookies=cookies, | |
) | |
def post( | |
self, | |
*, | |
json: Optional[Union[str, Dict, List]] = None, | |
data: Optional[ContentT] = None, | |
model: Optional[str] = None, | |
task: Optional[str] = None, | |
stream: bool = False, | |
) -> Union[bytes, Iterable[bytes]]: | |
""" | |
Make a POST request to the inference server. | |
Args: | |
json (`Union[str, Dict, List]`, *optional*): | |
The JSON data to send in the request body. Defaults to None. | |
data (`Union[str, Path, bytes, BinaryIO]`, *optional*): | |
The content to send in the request body. It can be raw bytes, a pointer to an opened file, a local file | |
path, or a URL to an online resource (image, audio file,...). If both `json` and `data` are passed, | |
`data` will take precedence. At least `json` or `data` must be provided. Defaults to None. | |
model (`str`, *optional*): | |
The model to use for inference. Can be a model ID hosted on the Hugging Face Hub or a URL to a deployed | |
Inference Endpoint. Will override the model defined at the instance level. Defaults to None. | |
task (`str`, *optional*): | |
The task to perform on the inference. Used only to default to a recommended model if `model` is not | |
provided. At least `model` or `task` must be provided. Defaults to None. | |
stream (`bool`, *optional*): | |
Whether to iterate over streaming APIs. | |
Returns: | |
bytes: The raw bytes returned by the server. | |
Raises: | |
[`InferenceTimeoutError`]: | |
If the model is unavailable or the request times out. | |
`HTTPError`: | |
If the request fails with an HTTP error status code other than HTTP 503. | |
""" | |
url = self._resolve_url(model, task) | |
if data is not None and json is not None: | |
warnings.warn("Ignoring `json` as `data` is passed as binary.") | |
# Set Accept header if relevant | |
headers = self.headers.copy() | |
if task in TASKS_EXPECTING_IMAGES and "Accept" not in headers: | |
headers["Accept"] = "image/png" | |
t0 = time.time() | |
timeout = self.timeout | |
while True: | |
with _open_as_binary(data) as data_as_binary: | |
try: | |
response = get_session().post( | |
url, | |
json=json, | |
data=data_as_binary, | |
headers=headers, | |
cookies=self.cookies, | |
timeout=self.timeout, | |
stream=stream, | |
) | |
except TimeoutError as error: | |
# Convert any `TimeoutError` to a `InferenceTimeoutError` | |
raise ValueError(f"Inference call timed out: {url}, {error}") # type: ignore | |
try: | |
hf_raise_for_status(response) | |
return response.iter_lines() if stream else response.content | |
except HTTPError as error: | |
if error.response.status_code == 503: | |
# If Model is unavailable, either raise a TimeoutError... | |
if timeout is not None and time.time() - t0 > timeout: | |
# raise InferenceTimeoutError( | |
# f"Model not loaded on the server: {url}. Please retry with a higher timeout (current:" | |
# f" {self.timeout}).", | |
# request=error.request, | |
# response=error.response, | |
# ) from error | |
raise ValueError( | |
f"Model not loaded on the server: {url}. Please retry with a higher timeout (current:" | |
f" {self.timeout}), Err:{error}" | |
# request=error.request, | |
# response=error.response, | |
) | |
# ...or wait 1s and retry | |
logger.info(f"Waiting for model to be loaded on the server: {error}") | |
time.sleep(1) | |
if timeout is not None: | |
timeout = max(self.timeout - (time.time() - t0), 1) # type: ignore | |
continue | |
raise | |
def text_generation( | |
self, | |
prompt: str, | |
*, | |
details: bool = False, | |
stream: bool = False, | |
model: Optional[str] = None, | |
do_sample: bool = False, | |
max_new_tokens: int = 20, | |
best_of: Optional[int] = None, | |
repetition_penalty: Optional[float] = None, | |
return_full_text: bool = False, | |
seed: Optional[int] = None, | |
stop_sequences: Optional[List[str]] = None, | |
temperature: Optional[float] = None, | |
top_k: Optional[int] = None, | |
top_p: Optional[float] = None, | |
truncate: Optional[int] = None, | |
typical_p: Optional[float] = None, | |
watermark: bool = False, | |
decoder_input_details: bool = False, | |
) -> Union[str, TextGenerationResponse, Iterable[str], Iterable[TextGenerationStreamResponse]]: | |
""" | |
Given a prompt, generate the following text. | |
It is recommended to have Pydantic installed in order to get inputs validated. This is preferable as it allow | |
early failures. | |
API endpoint is supposed to run with the `text-generation-inference` backend (TGI). This backend is the | |
go-to solution to run large language models at scale. However, for some smaller models (e.g. "gpt2") the | |
default `transformers` + `api-inference` solution is still in use. Both approaches have very similar APIs, but | |
not exactly the same. This method is compatible with both approaches but some parameters are only available for | |
`text-generation-inference`. If some parameters are ignored, a warning message is triggered but the process | |
continues correctly. | |
To learn more about the TGI project, please refer to https://github.com/huggingface/text-generation-inference. | |
Args: | |
prompt (`str`): | |
Input text. | |
details (`bool`, *optional*): | |
By default, text_generation returns a string. Pass `details=True` if you want a detailed output (tokens, | |
probabilities, seed, finish reason, etc.). Only available for models running on with the | |
`text-generation-inference` backend. | |
stream (`bool`, *optional*): | |
By default, text_generation returns the full generated text. Pass `stream=True` if you want a stream of | |
tokens to be returned. Only available for models running on with the `text-generation-inference` | |
backend. | |
model (`str`, *optional*): | |
The model to use for inference. Can be a model ID hosted on the Hugging Face Hub or a URL to a deployed | |
Inference Endpoint. This parameter overrides the model defined at the instance level. Defaults to None. | |
do_sample (`bool`): | |
Activate logits sampling | |
max_new_tokens (`int`): | |
Maximum number of generated tokens | |
best_of (`int`): | |
Generate best_of sequences and return the one if the highest token logprobs | |
repetition_penalty (`float`): | |
The parameter for repetition penalty. 1.0 means no penalty. See [this | |
paper](https://arxiv.org/pdf/1909.05858.pdf) for more details. | |
return_full_text (`bool`): | |
Whether to prepend the prompt to the generated text | |
seed (`int`): | |
Random sampling seed | |
stop_sequences (`List[str]`): | |
Stop generating tokens if a member of `stop_sequences` is generated | |
temperature (`float`): | |
The value used to module the logits distribution. | |
top_k (`int`): | |
The number of highest probability vocabulary tokens to keep for top-k-filtering. | |
top_p (`float`): | |
If set to < 1, only the smallest set of most probable tokens with probabilities that add up to `top_p` or | |
higher are kept for generation. | |
truncate (`int`): | |
Truncate inputs tokens to the given size | |
typical_p (`float`): | |
Typical Decoding mass | |
See [Typical Decoding for Natural Language Generation](https://arxiv.org/abs/2202.00666) for more information | |
watermark (`bool`): | |
Watermarking with [A Watermark for Large Language Models](https://arxiv.org/abs/2301.10226) | |
decoder_input_details (`bool`): | |
Return the decoder input token logprobs and ids. You must set `details=True` as well for it to be taken | |
into account. Defaults to `False`. | |
Returns: | |
`Union[str, TextGenerationResponse, Iterable[str], Iterable[TextGenerationStreamResponse]]`: | |
Generated text returned from the server: | |
- if `stream=False` and `details=False`, the generated text is returned as a `str` (default) | |
- if `stream=True` and `details=False`, the generated text is returned token by token as a `Iterable[str]` | |
- if `stream=False` and `details=True`, the generated text is returned with more details as a [`~huggingface_hub.inference._text_generation.TextGenerationResponse`] | |
- if `details=True` and `stream=True`, the generated text is returned token by token as a iterable of [`~huggingface_hub.inference._text_generation.TextGenerationStreamResponse`] | |
Raises: | |
`ValidationError`: | |
If input values are not valid. No HTTP call is made to the server. | |
[`InferenceTimeoutError`]: | |
If the model is unavailable or the request times out. | |
`HTTPError`: | |
If the request fails with an HTTP error status code other than HTTP 503. | |
Example: | |
```py | |
>>> from huggingface_hub import InferenceClient | |
>>> client = InferenceClient() | |
# Case 1: generate text | |
>>> client.text_generation("The huggingface_hub library is ", max_new_tokens=12) | |
'100% open source and built to be easy to use.' | |
# Case 2: iterate over the generated tokens. Useful for large generation. | |
>>> for token in client.text_generation("The huggingface_hub library is ", max_new_tokens=12, stream=True): | |
... print(token) | |
100 | |
% | |
open | |
source | |
and | |
built | |
to | |
be | |
easy | |
to | |
use | |
. | |
# Case 3: get more details about the generation process. | |
>>> client.text_generation("The huggingface_hub library is ", max_new_tokens=12, details=True) | |
TextGenerationResponse( | |
generated_text='100% open source and built to be easy to use.', | |
details=Details( | |
finish_reason=<FinishReason.Length: 'length'>, | |
generated_tokens=12, | |
seed=None, | |
prefill=[ | |
InputToken(id=487, text='The', logprob=None), | |
InputToken(id=53789, text=' hugging', logprob=-13.171875), | |
(...) | |
InputToken(id=204, text=' ', logprob=-7.0390625) | |
], | |
tokens=[ | |
Token(id=1425, text='100', logprob=-1.0175781, special=False), | |
Token(id=16, text='%', logprob=-0.0463562, special=False), | |
(...) | |
Token(id=25, text='.', logprob=-0.5703125, special=False) | |
], | |
best_of_sequences=None | |
) | |
) | |
# Case 4: iterate over the generated tokens with more details. | |
# Last object is more complete, containing the full generated text and the finish reason. | |
>>> for details in client.text_generation("The huggingface_hub library is ", max_new_tokens=12, details=True, stream=True): | |
... print(details) | |
... | |
TextGenerationStreamResponse(token=Token(id=1425, text='100', logprob=-1.0175781, special=False), generated_text=None, details=None) | |
TextGenerationStreamResponse(token=Token(id=16, text='%', logprob=-0.0463562, special=False), generated_text=None, details=None) | |
TextGenerationStreamResponse(token=Token(id=1314, text=' open', logprob=-1.3359375, special=False), generated_text=None, details=None) | |
TextGenerationStreamResponse(token=Token(id=3178, text=' source', logprob=-0.28100586, special=False), generated_text=None, details=None) | |
TextGenerationStreamResponse(token=Token(id=273, text=' and', logprob=-0.5961914, special=False), generated_text=None, details=None) | |
TextGenerationStreamResponse(token=Token(id=3426, text=' built', logprob=-1.9423828, special=False), generated_text=None, details=None) | |
TextGenerationStreamResponse(token=Token(id=271, text=' to', logprob=-1.4121094, special=False), generated_text=None, details=None) | |
TextGenerationStreamResponse(token=Token(id=314, text=' be', logprob=-1.5224609, special=False), generated_text=None, details=None) | |
TextGenerationStreamResponse(token=Token(id=1833, text=' easy', logprob=-2.1132812, special=False), generated_text=None, details=None) | |
TextGenerationStreamResponse(token=Token(id=271, text=' to', logprob=-0.08520508, special=False), generated_text=None, details=None) | |
TextGenerationStreamResponse(token=Token(id=745, text=' use', logprob=-0.39453125, special=False), generated_text=None, details=None) | |
TextGenerationStreamResponse(token=Token( | |
id=25, | |
text='.', | |
logprob=-0.5703125, | |
special=False), | |
generated_text='100% open source and built to be easy to use.', | |
details=StreamDetails(finish_reason=<FinishReason.Length: 'length'>, generated_tokens=12, seed=None) | |
) | |
``` | |
""" | |
# NOTE: Text-generation integration is taken from the text-generation-inference project. It has more features | |
# like input/output validation (if Pydantic is installed). See `_text_generation.py` header for more details. | |
if decoder_input_details and not details: | |
warnings.warn( | |
"`decoder_input_details=True` has been passed to the server but `details=False` is set meaning that" | |
" the output from the server will be truncated." | |
) | |
decoder_input_details = False | |
# Validate parameters | |
parameters = TextGenerationParameters( | |
best_of=best_of, | |
details=details, | |
do_sample=do_sample, | |
max_new_tokens=max_new_tokens, | |
repetition_penalty=repetition_penalty, | |
return_full_text=return_full_text, | |
seed=seed, | |
stop=stop_sequences if stop_sequences is not None else [], | |
temperature=temperature, | |
top_k=top_k, | |
top_p=top_p, | |
truncate=truncate, | |
typical_p=typical_p, | |
watermark=watermark, | |
decoder_input_details=decoder_input_details, | |
) | |
request = TextGenerationRequest(inputs=prompt, stream=stream, parameters=parameters) | |
payload = asdict(request) | |
# add the use_cache option | |
print(f"payload:{payload}") | |
payload["options"] = {} | |
payload["options"]['use_cache'] = False | |
# Remove some parameters if not a TGI server | |
if not _is_tgi_server(model): | |
ignored_parameters = [] | |
for key in "watermark", "stop", "details", "decoder_input_details": | |
if payload["parameters"][key] is not None: | |
ignored_parameters.append(key) | |
del payload["parameters"][key] | |
if len(ignored_parameters) > 0: | |
warnings.warn( | |
"API endpoint/model for text-generation is not served via TGI. Ignoring parameters" | |
f" {ignored_parameters}.", | |
UserWarning, | |
) | |
if details: | |
warnings.warn( | |
"API endpoint/model for text-generation is not served via TGI. Parameter `details=True` will" | |
" be ignored meaning only the generated text will be returned.", | |
UserWarning, | |
) | |
details = False | |
if stream: | |
raise ValueError( | |
"API endpoint/model for text-generation is not served via TGI. Cannot return output as a stream." | |
" Please pass `stream=False` as input." | |
) | |
# Handle errors separately for more precise error messages | |
try: | |
bytes_output = self.post(json=payload, model=model, task="text-generation", stream=stream) # type: ignore | |
except HTTPError as e: | |
if isinstance(e, BadRequestError) and "The following `model_kwargs` are not used by the model" in str(e): | |
_set_as_non_tgi(model) | |
return self.text_generation( # type: ignore | |
prompt=prompt, | |
details=details, | |
stream=stream, | |
model=model, | |
do_sample=do_sample, | |
max_new_tokens=max_new_tokens, | |
best_of=best_of, | |
repetition_penalty=repetition_penalty, | |
return_full_text=return_full_text, | |
seed=seed, | |
stop_sequences=stop_sequences, | |
temperature=temperature, | |
top_k=top_k, | |
top_p=top_p, | |
truncate=truncate, | |
typical_p=typical_p, | |
watermark=watermark, | |
decoder_input_details=decoder_input_details, | |
) | |
raise_text_generation_error(e) | |
# Parse output | |
if stream: | |
return _stream_text_generation_response(bytes_output, details) # type: ignore | |
data = _bytes_to_dict(bytes_output)[0] | |
return TextGenerationResponse(**data) if details else data["generated_text"] | |
client = InferenceClientUS( | |
API_URL, | |
headers={"Authorization": f"Bearer {HF_TOKEN}"}, | |
) | |
def asynchronous_push(f_stop): | |
if not data_repo.is_repo_clean(): | |
# print("Repo currently clean. Ignoring push_to_hub") | |
# print(data_repo.huggingface_token) | |
# else: | |
data_repo.git_add(auto_lfs_track=True) | |
data_repo.git_commit("Auto commit by space") | |
if FORCE_PUSH == "yes": | |
force_git_push(data_repo) | |
else: | |
data_repo.git_push() | |
if not f_stop.is_set(): | |
# call again in 60 seconds | |
threading.Timer(PUSH_FREQUENCY, asynchronous_push, [f_stop]).start() | |
f_stop = threading.Event() | |
asynchronous_push(f_stop) | |
def format_prompt(message, history, system_prompt): | |
prompt = "" # ؛ | |
if system_prompt: | |
prompt += f"{system_prompt}" | |
for user_prompt, bot_response in history: | |
prompt += f"{user_prompt}" | |
prompt += f"{bot_response}" | |
prompt += f"""{message}""" | |
return prompt.replace('\n','؛').replace('\t','/').replace(' * ','/').replace('\u200c',' ').strip() | |
def generate( | |
prompt, history, name, system_prompt, | |
temperature=0.9, max_new_tokens=100, top_p=0.95, top_k=100, | |
repetition_penalty=1.0, seed=42, | |
): | |
global PROMPT | |
PROMPT = prompt | |
global HISTORY | |
HISTORY = history | |
global NAME | |
NAME = name | |
global SYSTEM_PROMPT | |
SYSTEM_PROMPT = system_prompt | |
global TEMPERATURE | |
TEMPERATURE = temperature | |
global MAX_NEW_TOKENS | |
MAX_NEW_TOKENS = max_new_tokens | |
global TOP_P | |
TOP_P = top_p | |
global TOP_K | |
TOP_K = top_k | |
global REPETITION_PENALTY | |
REPETITION_PENALTY = repetition_penalty | |
temperature = float(temperature) | |
if temperature < 1e-2: | |
temperature = 1e-2 | |
top_p = float(top_p) | |
generate_kwargs = dict( | |
max_new_tokens=max_new_tokens, | |
temperature=None if temperature==1 else temperature, | |
top_p=None if top_p==0 else top_p, | |
repetition_penalty=None if repetition_penalty==1 else repetition_penalty, | |
top_k=None if top_k==0 else top_k, | |
stop_sequences=stop_sequences, | |
do_sample=True, | |
#best_of=2, | |
#typical_p=0.9, | |
#seed=seed, | |
) | |
#seed = seed + 1 | |
history = [] # explicitly set the history as none so no history is important anymore | |
formatted_prompt = format_prompt(prompt, history, system_prompt) | |
print(f"Formatted Prompt: {formatted_prompt}") | |
if len(ENDPOINT)>0: | |
stream = client.text_generation(formatted_prompt, **generate_kwargs, stream=False, details=True, return_full_text=False) | |
stream = [stream] # if stream false | |
else: | |
stream = client.text_generation(formatted_prompt, **generate_kwargs, stream=False, details=True, return_full_text=False) | |
output = "" #f"{prompt}" | |
for response in stream: | |
#print('stream',response) | |
if len(ENDPOINT)>0: | |
if isinstance(response, str): | |
output += response if response else "" | |
else: | |
output += response.generated_text if response.generated_text else "" | |
else: | |
output += response | |
for stop_str in stop_sequences: | |
if output.endswith(stop_str): | |
output = output[:-len(stop_str)] | |
output = output.rstrip() | |
yield output | |
yield output | |
return output | |
additional_inputs=[ | |
gr.Textbox( | |
"", | |
label="Who are you?", | |
placeholder="...Please write your name here", | |
), | |
gr.Textbox("<|endoftext|>", label="Optional system prompt"), #<|endoftext|> | |
gr.Slider( | |
label="Temperature", | |
value=1.0, | |
minimum=0.0, | |
maximum=1.0, | |
step=0.01, | |
interactive=True, | |
info="Higher values produce more diverse outputs", | |
), | |
gr.Slider( | |
label="Max new tokens", | |
value=64, | |
minimum=0, | |
maximum=250, | |
step=64, | |
interactive=True, | |
info="The maximum numbers of new tokens", | |
), | |
gr.Slider( | |
label="Top-p (nucleus sampling)", | |
value=0, | |
minimum=0, | |
maximum=1, | |
step=0.05, | |
interactive=True, | |
info="Higher values sample more low-probability tokens", | |
), | |
gr.Slider( | |
label="Top-k", | |
value=0, | |
minimum=0, | |
maximum=1000, | |
step=10, | |
interactive=True, | |
info="Higher values sample more low-probability tokens", | |
), | |
gr.Slider( | |
label="Repetition penalty", | |
value=1.0, | |
minimum=1.0001, | |
maximum=2.0, | |
step=0.05, | |
interactive=True, | |
info="Penalize repeated tokens", | |
), | |
gr.Markdown( | |
""" | |
Model: Sorvãd (سرواد) | |
سرواد. [ س رْ ] (اِ) کلام منظوم و شعر. (برهان ) (غیاث ). شعر پارسی . (تفلیسی ) : | |
دگر نخواهم گفتن همی ثنا وغزل | |
که رفت یکرهه بازار و قیمت سرواد. | |
لبیبی (از لغت فرس ص 108). | |
زهی به عدل تو مرهون عمارت دنیا | |
خهی به مدح تومشحون رسایل و سرواد. | |
شمس فخری . | |
- Mojtaba Valipour: Model Design and Pretraining, Data Collection | |
- Ali Ghodsi: Advising | |
- Amir Mohammad Marshal Pirgheybi: Data Processing | |
""" | |
) | |
] | |
CSS = """ | |
.gradio-container textarea {direction: rtl; white-space: pre-line;} | |
#component-11 #component-12 {direction: rtl; white-space: pre-line;} | |
p {direction: rtl; white-space: pre-line;} | |
""" | |
class Chatbot(gr.Chatbot): | |
def __init__( | |
self, | |
value: list[list[str | tuple[str] | tuple[str | Path, str] | None]] | |
| Callable | |
| None = None, | |
color_map: dict[str, str] | None = None, | |
*, | |
label: str | None = None, | |
every: float | None = None, | |
show_label: bool | None = None, | |
container: bool = True, | |
scale: int | None = None, | |
min_width: int = 160, | |
visible: bool = True, | |
elem_id: str | None = None, | |
elem_classes: list[str] | str | None = None, | |
height: int | None = None, | |
latex_delimiters: list[dict[str, str | bool]] | None = None, | |
rtl: bool = False, | |
show_share_button: bool | None = None, | |
show_copy_button: bool = False, | |
avatar_images: tuple[str | Path | None, str | Path | None] | None = None, | |
sanitize_html: bool = True, | |
render_markdown: bool = True, | |
bubble_full_width: bool = True, | |
line_breaks: bool = True, | |
layout: Literal["panel", "bubble"] | None = None, | |
**kwargs, | |
): | |
""" | |
Parameters: | |
value: Default value to show in chatbot. If callable, the function will be called whenever the app loads to set the initial value of the component. | |
color_map: This parameter is deprecated. | |
label: component name in interface. | |
every: If `value` is a callable, run the function 'every' number of seconds while the client connection is open. Has no effect otherwise. Queue must be enabled. The event can be accessed (e.g. to cancel it) via this component's .load_event attribute. | |
show_label: if True, will display label. | |
container: If True, will place the component in a container - providing some extra padding around the border. | |
scale: relative width compared to adjacent Components in a Row. For example, if Component A has scale=2, and Component B has scale=1, A will be twice as wide as B. Should be an integer. | |
min_width: minimum pixel width, will wrap if not sufficient screen space to satisfy this value. If a certain scale value results in this Component being narrower than min_width, the min_width parameter will be respected first. | |
visible: If False, component will be hidden. | |
elem_id: An optional string that is assigned as the id of this component in the HTML DOM. Can be used for targeting CSS styles. | |
elem_classes: An optional list of strings that are assigned as the classes of this component in the HTML DOM. Can be used for targeting CSS styles. | |
height: height of the component in pixels. | |
latex_delimiters: A list of dicts of the form {"left": open delimiter (str), "right": close delimiter (str), "display": whether to display in newline (bool)} that will be used to render LaTeX expressions. If not provided, `latex_delimiters` is set to `[{ "left": "$$", "right": "$$", "display": True }]`, so only expressions enclosed in $$ delimiters will be rendered as LaTeX, and in a new line. Pass in an empty list to disable LaTeX rendering. For more information, see the [KaTeX documentation](https://katex.org/docs/autorender.html). | |
rtl: If True, sets the direction of the rendered text to right-to-left. Default is False, which renders text left-to-right. | |
show_share_button: If True, will show a share icon in the corner of the component that allows user to share outputs to Hugging Face Spaces Discussions. If False, icon does not appear. If set to None (default behavior), then the icon appears if this Gradio app is launched on Spaces, but not otherwise. | |
show_copy_button: If True, will show a copy button for each chatbot message. | |
avatar_images: Tuple of two avatar image paths or URLs for user and bot (in that order). Pass None for either the user or bot image to skip. Must be within the working directory of the Gradio app or an external URL. | |
sanitize_html: If False, will disable HTML sanitization for chatbot messages. This is not recommended, as it can lead to security vulnerabilities. | |
render_markdown: If False, will disable Markdown rendering for chatbot messages. | |
bubble_full_width: If False, the chat bubble will fit to the content of the message. If True (default), the chat bubble will be the full width of the component. | |
line_breaks: If True (default), will enable Github-flavored Markdown line breaks in chatbot messages. If False, single new lines will be ignored. Only applies if `render_markdown` is True. | |
layout: If "panel", will display the chatbot in a llm style layout. If "bubble", will display the chatbot with message bubbles, with the user and bot messages on alterating sides. Will default to "bubble". | |
""" | |
super().__init__( | |
value = value, | |
color_map = color_map, | |
label = label, | |
every = every, | |
show_label = show_label, | |
container = container, | |
scale = scale, | |
min_width = min_width, | |
visible = visible, | |
elem_id = elem_id, | |
elem_classes = elem_classes, | |
height = height, | |
latex_delimiters = latex_delimiters, | |
rtl = rtl, | |
show_share_button = show_share_button, | |
show_copy_button = show_copy_button, | |
avatar_images = avatar_images, | |
sanitize_html = sanitize_html, | |
render_markdown = render_markdown, | |
bubble_full_width = bubble_full_width, | |
line_breaks = line_breaks, | |
layout = layout, | |
kwargs=kwargs, | |
) | |
def _preprocess_chat_messages( | |
self, chat_message: str | dict | None | |
) -> str | tuple[str] | tuple[str, str] | None: | |
if chat_message is None: | |
return None | |
elif isinstance(chat_message, dict): | |
if chat_message["alt_text"] is not None: | |
return (chat_message["name"], chat_message["alt_text"]) | |
else: | |
return (chat_message["name"],) | |
else: # string | |
return chat_message | |
def preprocess( | |
self, | |
y: list[list[str | dict | None] | tuple[str | dict | None, str | dict | None]], | |
) -> list[list[str | tuple[str] | tuple[str, str] | None]]: | |
if y is None: | |
return y | |
processed_messages = [] | |
for message_pair in y: | |
if not isinstance(message_pair, (tuple, list)): | |
raise TypeError( | |
f"Expected a list of lists or list of tuples. Received: {message_pair}" | |
) | |
if len(message_pair) != 2: | |
raise TypeError( | |
f"Expected a list of lists of length 2 or list of tuples of length 2. Received: {message_pair}" | |
) | |
processed_messages.append( | |
[ | |
self._preprocess_chat_messages(message_pair[0]), | |
self._preprocess_chat_messages(message_pair[1]), | |
] | |
) | |
return processed_messages | |
def _postprocess_chat_messages( | |
self, chat_message: str | tuple | list | None | |
) -> str | dict | None: | |
if chat_message is None: | |
return None | |
elif isinstance(chat_message, (tuple, list)): | |
file_uri = str(chat_message[0]) | |
if utils.validate_url(file_uri): | |
filepath = file_uri | |
else: | |
filepath = self.make_temp_copy_if_needed(file_uri) | |
mime_type = client_utils.get_mimetype(filepath) | |
return { | |
"name": filepath, | |
"mime_type": mime_type, | |
"alt_text": chat_message[1] if len(chat_message) > 1 else None, | |
"data": None, # These last two fields are filled in by the frontend | |
"is_file": True, | |
} | |
elif isinstance(chat_message, str): | |
chat_message = inspect.cleandoc(chat_message) | |
return chat_message | |
else: | |
raise ValueError(f"Invalid message for Chatbot component: {chat_message}") | |
def postprocess( | |
self, | |
y: list[list[str | tuple[str] | tuple[str, str] | None] | tuple], | |
) -> list[list[str | dict | None]]: | |
""" | |
Parameters: | |
y: List of lists representing the message and response pairs. Each message and response should be a string, which may be in Markdown format. It can also be a tuple whose first element is a string or pathlib.Path filepath or URL to an image/video/audio, and second (optional) element is the alt text, in which case the media file is displayed. It can also be None, in which case that message is not displayed. | |
Returns: | |
List of lists representing the message and response. Each message and response will be a string of HTML, or a dictionary with media information. Or None if the message is not to be displayed. | |
""" | |
if y is None: | |
return [] | |
processed_messages = [] | |
for message_pair in y: | |
result = "" | |
if message_pair[0] is not None: | |
result += message_pair[0] | |
if message_pair[1] is not None: | |
result += message_pair[1] | |
if not isinstance(message_pair, (tuple, list)): | |
raise TypeError( | |
f"Expected a list of lists or list of tuples. Received: {message_pair}" | |
) | |
if len(message_pair) != 2: | |
raise TypeError( | |
f"Expected a list of lists of length 2 or list of tuples of length 2. Received: {message_pair}" | |
) | |
result = result.replace('؛','\n').replace('/',' * ').replace('\u200c',' ').strip() | |
result = result[:result.rfind('\n')] # filter till the end verse, do not publish not complete verses | |
#print('Message Pairs: ',message_pair[0],message_pair[1], result) | |
processed_messages.append( | |
[ | |
None,self._postprocess_chat_messages(result) | |
#self._postprocess_chat_messages(message_pair[1])), | |
] | |
) | |
return processed_messages | |
chatbot = Chatbot(label="PersianGPT", | |
rtl=True, | |
show_share_button=True, | |
show_copy_button=True, | |
#layout="panel", | |
bubble_full_width = False) | |
textbox = gr.Textbox( | |
label="textbox", | |
container=False, | |
show_label=False, | |
lines=3, | |
scale=7, | |
placeholder="...Type something here", | |
rtl=True, | |
) | |
class ChatInterface(gr.ChatInterface): | |
def __init__( | |
self, | |
fn: Callable, | |
*, | |
chatbot: Chatbot | None = None, | |
textbox: Textbox | None = None, | |
additional_inputs = None, | |
additional_inputs_accordion_name: str = "Additional Inputs", | |
examples: list[str] | None = None, | |
cache_examples: bool | None = None, | |
title: str | None = None, | |
description: str | None = None, | |
theme: Theme | str | None = None, | |
css: str | None = None, | |
analytics_enabled: bool | None = None, | |
submit_btn: str | None | Button = "Submit", | |
stop_btn: str | None | Button = "Stop", | |
retry_btn: str | None | Button = "🔄 Retry", | |
undo_btn: str | None | Button = "↩️ Undo", | |
clear_btn: str | None | Button = "🗑️ Clear", | |
autofocus: bool = True, | |
): | |
""" | |
Parameters: | |
fn: the function to wrap the chat interface around. Should accept two parameters: a string input message and list of two-element lists of the form [[user_message, bot_message], ...] representing the chat history, and return a string response. See the Chatbot documentation for more information on the chat history format. | |
chatbot: an instance of the gr.Chatbot component to use for the chat interface, if you would like to customize the chatbot properties. If not provided, a default gr.Chatbot component will be created. | |
textbox: an instance of the gr.Textbox component to use for the chat interface, if you would like to customize the textbox properties. If not provided, a default gr.Textbox component will be created. | |
additional_inputs: an instance or list of instances of gradio components (or their string shortcuts) to use as additional inputs to the chatbot. If components are not already rendered in a surrounding Blocks, then the components will be displayed under the chatbot, in an accordion. | |
additional_inputs_accordion_name: the label of the accordion to use for additional inputs, only used if additional_inputs is provided. | |
examples: sample inputs for the function; if provided, appear below the chatbot and can be clicked to populate the chatbot input. | |
cache_examples: If True, caches examples in the server for fast runtime in examples. The default option in HuggingFace Spaces is True. The default option elsewhere is False. | |
title: a title for the interface; if provided, appears above chatbot in large font. Also used as the tab title when opened in a browser window. | |
description: a description for the interface; if provided, appears above the chatbot and beneath the title in regular font. Accepts Markdown and HTML content. | |
theme: Theme to use, loaded from gradio.themes. | |
css: custom css or path to custom css file to use with interface. | |
analytics_enabled: Whether to allow basic telemetry. If None, will use GRADIO_ANALYTICS_ENABLED environment variable if defined, or default to True. | |
submit_btn: Text to display on the submit button. If None, no button will be displayed. If a Button object, that button will be used. | |
stop_btn: Text to display on the stop button, which replaces the submit_btn when the submit_btn or retry_btn is clicked and response is streaming. Clicking on the stop_btn will halt the chatbot response. If set to None, stop button functionality does not appear in the chatbot. If a Button object, that button will be used as the stop button. | |
retry_btn: Text to display on the retry button. If None, no button will be displayed. If a Button object, that button will be used. | |
undo_btn: Text to display on the delete last button. If None, no button will be displayed. If a Button object, that button will be used. | |
clear_btn: Text to display on the clear button. If None, no button will be displayed. If a Button object, that button will be used. | |
autofocus: If True, autofocuses to the textbox when the page loads. | |
""" | |
super().__init__( | |
fn = fn, | |
chatbot = chatbot, | |
textbox= textbox, | |
additional_inputs = additional_inputs, | |
additional_inputs_accordion_name = additional_inputs_accordion_name, | |
examples = examples, | |
cache_examples = cache_examples, | |
title = title, | |
description = description, | |
theme = theme, | |
css = css, | |
analytics_enabled = analytics_enabled, | |
submit_btn = submit_btn, | |
stop_btn = stop_btn, | |
retry_btn = retry_btn, | |
undo_btn = undo_btn, | |
clear_btn = clear_btn, | |
autofocus = autofocus, | |
) | |
async def _stream_fn( | |
self, | |
message: str, | |
history_with_input: list[list[str | None]], | |
request: Request, | |
*args, | |
) -> AsyncGenerator: | |
history = history_with_input[:-1] | |
print(f'Message is {message}') | |
if len(message)==0: | |
message = random.choice(["ا","ب","پ","ت","ث","ج","چ","ح","خ","ل","م","ن","و", | |
"د","ذ","ر","ز","ژ","س","ش","ص","ض","ط","ظ","ع","غ", | |
"ف","ق","ه","ی", | |
]) | |
inputs, _, _ = special_args( | |
self.fn, inputs=[message, history, *args], request=request | |
) | |
if self.is_async: | |
generator = self.fn(*inputs) | |
else: | |
generator = await anyio.to_thread.run_sync( | |
self.fn, *inputs, limiter=self.limiter | |
) | |
generator = SyncToAsyncIterator(generator, self.limiter) | |
try: | |
first_response = await async_iteration(generator) | |
update = history + [[message, first_response]] | |
yield update, update | |
except StopIteration: | |
update = history + [[message, None]] | |
yield update, update | |
async for response in generator: | |
update = history + [[message, response]] | |
yield update, update | |
async def _submit_fn( | |
self, | |
message: str, | |
history_with_input: list[list[str | None]], | |
request: Request, | |
*args, | |
) -> tuple[list[list[str | None]], list[list[str | None]]]: | |
history = history_with_input[:-1] | |
inputs, _, _ = special_args( | |
self.fn, inputs=[message, history, *args], request=request | |
) | |
if self.is_async: | |
response = await self.fn(*inputs) | |
else: | |
response = await anyio.to_thread.run_sync( | |
self.fn, *inputs, limiter=self.limiter | |
) | |
history.append([message,response]) | |
return history, history | |
chat_interface = ChatInterface( | |
generate, | |
chatbot=chatbot, | |
textbox=textbox, | |
#examples=examples, | |
additional_inputs=additional_inputs, | |
submit_btn = "Generate", | |
stop_btn = None, | |
retry_btn = None, | |
undo_btn = None, | |
clear_btn = None, | |
cache_examples=False, | |
) | |
def vote(data: gr.LikeData): | |
if data.liked: | |
print("You upvoted this response: " + data.value) | |
else: | |
print("You downvoted this response: " + data.value) | |
with open(DATA_FILE, "a") as jsonlfile: | |
json_data = [ | |
json.dumps( | |
{ | |
"time_stamp": time.time(), | |
"model_version":MODEL_VERSION, | |
"name":NAME, | |
"prompt": PROMPT.replace('\n','؛').replace('\t','/').replace(' * ','/').replace('\u200c',' ').strip(), | |
"system prompt": SYSTEM_PROMPT, | |
"temperature": TEMPERATURE, | |
"max_new_tokens": MAX_NEW_TOKENS, | |
"top_p": TOP_P, | |
"top_k": TOP_K, | |
"repetition_penalty": REPETITION_PENALTY, | |
"response": data.value.replace('\n','؛').replace('\t','/').replace(' * ','/').replace('\u200c',' ').strip(), | |
"label": data.liked, | |
}, ensure_ascii=False | |
) | |
] | |
print(f'we are writing this: {json_data}') | |
jsonlfile.write("\n".join(json_data) + "\n") | |
with gr.Blocks(css=CSS) as demo: | |
chatbot.like(vote, None, None) | |
chat_interface.render() | |
demo.queue(concurrency_count=100, api_open=False).launch(show_api=False) #, share=True) |