Spaces:
Runtime error
Runtime error
import os | |
import sys | |
import fire | |
import gradio as gr | |
import torch | |
import transformers | |
from peft import PeftModel | |
from transformers import GenerationConfig, LlamaForCausalLM, LlamaTokenizer, AutoModel, AutoTokenizer, AutoModelForCausalLM,AutoConfig | |
from utils.callbacks import Iteratorize, Stream | |
from utils.prompter import Prompter | |
# if torch.cuda.is_available(): | |
# device = "cuda" | |
# else: | |
# device = "cpu" | |
# try: | |
# if torch.backends.mps.is_available(): | |
# device = "mps" | |
# except: | |
# pass | |
device = "xpu" | |
def main( | |
load_8bit: bool = False, | |
base_model: str = "ziqingyang/chinese-alpaca-2-7b", | |
lora_weights: str = "entity303/lawgpt-lora-7b-v2", | |
prompt_template: str = "", # The prompt template to use, will default to alpaca. | |
server_name: str = "0.0.0.0", # Allows to listen on all interfaces by providing '0. | |
share_gradio: bool = False, | |
): | |
base_model = base_model or os.environ.get("BASE_MODEL", "") | |
assert ( | |
base_model | |
), "Please specify a --base_model, e.g. --base_model='huggyllama/llama-7b'" | |
prompter = Prompter(prompt_template) | |
tokenizer = LlamaTokenizer.from_pretrained(base_model) | |
prompter = Prompter(prompt_template) | |
tokenizer = LlamaTokenizer.from_pretrained(base_model) | |
config = AutoConfig.from_pretrained(base_model) | |
model = AutoModelForCausalLM.from_pretrained( | |
base_model, | |
config=config, | |
load_in_8bit=load_8bit, | |
torch_dtype=torch.float16, | |
device_map="auto", | |
) | |
try: | |
print(f"Using lora {lora_weights}") | |
model = PeftModel.from_pretrained( | |
model, | |
lora_weights, | |
torch_dtype=torch.float16, | |
) | |
except: | |
print("*"*50, "\n Attention! No Lora Weights \n", "*"*50) | |
# unwind broken decapoda-research config | |
model.config.pad_token_id = tokenizer.pad_token_id = 0 # unk | |
model.config.bos_token_id = 1 | |
model.config.eos_token_id = 2 | |
# if not load_8bit: | |
# model.half() # seems to fix bugs for some users. | |
model.eval() | |
if torch.__version__ >= "2" and sys.platform != "win32": | |
model = torch.compile(model) | |
def evaluate( | |
instruction, | |
# input=None, | |
temperature=0.1, | |
top_p=0.75, | |
top_k=40, | |
num_beams=4, | |
max_new_tokens=128, | |
stream_output=False, | |
**kwargs, | |
): | |
input=None | |
prompt = prompter.generate_prompt(instruction, input) | |
inputs = tokenizer(prompt, return_tensors="pt") | |
input_ids = inputs["input_ids"].to(device) | |
generation_config = GenerationConfig( | |
temperature=temperature, | |
top_p=top_p, | |
top_k=top_k, | |
num_beams=num_beams, | |
**kwargs, | |
) | |
generate_params = { | |
"input_ids": input_ids, | |
"generation_config": generation_config, | |
"return_dict_in_generate": True, | |
"output_scores": True, | |
"max_new_tokens": max_new_tokens, | |
} | |
if stream_output: | |
# Stream the reply 1 token at a time. | |
# This is based on the trick of using 'stopping_criteria' to create an iterator, | |
# from https://github.com/oobabooga/text-generation-webui/blob/ad37f396fc8bcbab90e11ecf17c56c97bfbd4a9c/modules/text_generation.py#L216-L243. | |
def generate_with_callback(callback=None, **kwargs): | |
kwargs.setdefault( | |
"stopping_criteria", transformers.StoppingCriteriaList() | |
) | |
kwargs["stopping_criteria"].append( | |
Stream(callback_func=callback) | |
) | |
with torch.no_grad(): | |
model.generate(**kwargs) | |
def generate_with_streaming(**kwargs): | |
return Iteratorize( | |
generate_with_callback, kwargs, callback=None | |
) | |
with generate_with_streaming(**generate_params) as generator: | |
for output in generator: | |
# new_tokens = len(output) - len(input_ids[0]) | |
decoded_output = tokenizer.decode(output) | |
if output[-1] in [tokenizer.eos_token_id]: | |
break | |
yield prompter.get_response(decoded_output) | |
print(decoded_output) | |
return # early return for stream_output | |
# Without streaming | |
with torch.no_grad(): | |
generation_output = model.generate( | |
input_ids=input_ids, | |
generation_config=generation_config, | |
return_dict_in_generate=True, | |
output_scores=True, | |
max_new_tokens=max_new_tokens, | |
) | |
s = generation_output.sequences[0] | |
output = tokenizer.decode(s) | |
print(output) | |
yield prompter.get_response(output) | |
gr.Interface( | |
fn=evaluate, | |
inputs=[ | |
gr.components.Textbox( | |
lines=2, | |
label="Instruction", | |
placeholder="ๆญคๅค่พๅ ฅๆณๅพ็ธๅ ณ้ฎ้ข", | |
), | |
# gr.components.Textbox(lines=2, label="Input", placeholder="none"), | |
gr.components.Slider( | |
minimum=0, maximum=1, value=0.1, label="Temperature" | |
), | |
gr.components.Slider( | |
minimum=0, maximum=1, value=0.75, label="Top p" | |
), | |
gr.components.Slider( | |
minimum=0, maximum=100, step=1, value=40, label="Top k" | |
), | |
gr.components.Slider( | |
minimum=1, maximum=4, step=1, value=1, label="Beams" | |
), | |
gr.components.Slider( | |
minimum=1, maximum=2000, step=1, value=256, label="Max tokens" | |
), | |
gr.components.Checkbox(label="Stream output", value=True), | |
], | |
outputs=[ | |
gr.inputs.Textbox( | |
lines=8, | |
label="Output", | |
) | |
], | |
title="๐ฆ๐ฒ LaWGPT", | |
description="", | |
).queue().launch(server_name="0.0.0.0", share=share_gradio) | |
if __name__ == "__main__": | |
fire.Fire(main) | |