Alpaca / app.py
hieult44's picture
Add application file
113ebc1
import streamlit as st
import openai
from streamlit_chat import message as st_message
from transformers import BlenderbotTokenizer
from transformers import BlenderbotForConditionalGeneration
from io import StringIO
from io import BytesIO
import requests
import torch
import PyPDF2
from transformers import GenerationConfig, LlamaTokenizer, LlamaForCausalLM
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores import Chroma
from langchain.text_splitter import CharacterTextSplitter
from langchain.llms import OpenAI
from langchain.chains import RetrievalQA
from langchain.document_loaders import TextLoader
import os
os.environ['OPENAI_API_KEY']="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"
import warnings
warnings.filterwarnings("ignore")
st.markdown(
"""
<style>
[data-testid="stSidebar"][aria-expanded="true"] > div:first-child {
width: 325px;
}
[data-testid="stSidebar"][aria-expanded="false"] > div:first-child {
width: 325px;
margin-left: -350px;
}
</style>
""",
unsafe_allow_html=True,
)
st.sidebar.title('ChatFAQ')
st.sidebar.subheader('Parameters')
@st.cache_resource
def get_models():
# it may be necessary for other frameworks to cache the model
# seems pytorch keeps an internal state of the conversation
model_name = "facebook/blenderbot-400M-distill"
tokenizer = BlenderbotTokenizer.from_pretrained(model_name)
model = BlenderbotForConditionalGeneration.from_pretrained(model_name)
return tokenizer, model
st.title("ChatFAQ")
app_mode = st.sidebar.selectbox('Choose the App mode',
['Blenderbot_1B', 'Blenderbot-400M-distill', 'ChatGPT-3.5', 'Fine-tune Alpaca 7B', 'Customized Alpaca 7B', 'Alpaca-LORA']
)
# app_mode = st.sidebar.selectbox('Choose the domain',
# ['Law','Economic','Technology']
# )
uploaded_file = st.sidebar.file_uploader("Choose a file")
if uploaded_file is not None:
string_data = ""
file_type = uploaded_file.type
if file_type == "application/pdf":
bytes_data = uploaded_file.getvalue()
# Create a BytesIO object from the bytes data
bytes_io = BytesIO(bytes_data)
# Create a PDF reader object
pdf_reader = PyPDF2.PdfReader(bytes_io)
# Get the number of pages in the PDF file
num_pages = len(pdf_reader.pages)
# Loop through each page and extract the text
for i in range(num_pages):
page = pdf_reader.pages[i]
text = page.extract_text()
string_data = string_data + text
elif file_type == "text/plain":
with st.spinner('Loading the document...'):
# To convert to a string based IO:
stringio = StringIO(uploaded_file.getvalue().decode("utf-8"))
# To read file as string:
string_data = stringio.read()
st.success('Loading successfully!')
if app_mode =='Blenderbot_1B':
st.markdown('In this application, **Blenderbot_1B API** is used and **StreamLit** is to create the Web Graphical User Interface (GUI).')
st.markdown(
"""
<style>
[data-testid="stSidebar"][aria-expanded="true"] > div:first-child {
width: 300px;
}
[data-testid="stSidebar"][aria-expanded="false"] > div:first-child {
width: 300px;
margin-left: -400px;
}
</style>
""",
unsafe_allow_html=True,
)
if 'history1' not in st.session_state:
st.session_state['history1'] = []
API_TOKEN = "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"
API_URL = "/static-proxy?url=https%3A%2F%2Fapi-inference.huggingface.co%2Fmodels%2Ffacebook%2Fblenderbot-1B-distill%26quot%3B%3C%2Fspan%3E%3C!-- HTML_TAG_END -->
headers = {"Authorization": f"Bearer {API_TOKEN}"}
def query(payload):
response = requests.post(API_URL, headers=headers, json=payload)
return response.json()
def generate_answer():
historyInputs = {"past_user_inputs": [],
"generated_responses": []}
for element in st.session_state["history1"]:
if element["is_user"] == True:
historyInputs["past_user_inputs"].append(element["message"])
else:
historyInputs["generated_responses"].append(element["message"])
user_message = st.session_state.input_text
historyInputs["text"] = user_message if user_message != "" else " "
print(historyInputs)
output = query({
"inputs": historyInputs,
})
print(output)
print(output["generated_text"])
st.session_state['history1'].append({"message": user_message, "is_user": True})
st.session_state['history1'].append({"message": output["generated_text"], "is_user": False})
print(st.session_state['history1'])
for chat in st.session_state['history1']:
st_message(**chat) # unpacking
st.text_input("Talk to the bot", key="input_text", on_change=generate_answer)
if st.button("Clear"):
st.session_state["history1"] = []
for chat in st.session_state['history1']:
st_message(**chat) # unpacking
if app_mode =='Blenderbot-400M-distill':
st.markdown('In this application, **Blenderbot-400M-distill API** is used and **StreamLit** is to create the Web Graphical User Interface (GUI).')
st.markdown(
"""
<style>
[data-testid="stSidebar"][aria-expanded="true"] > div:first-child {
width: 300px;
}
[data-testid="stSidebar"][aria-expanded="false"] > div:first-child {
width: 300px;
margin-left: -400px;
}
</style>
""",
unsafe_allow_html=True,
)
if 'history2' not in st.session_state:
st.session_state['history2'] = []
def generate_answer():
tokenizer, model = get_models()
user_message = st.session_state.input_text
print(type(user_message), user_message)
History_inputs = []
for element in st.session_state["history2"]:
if element["is_user"] == True:
History_inputs.append(element["message"])
historyInputs = ". ".join(History_inputs)
print(historyInputs + " " + st.session_state.input_text)
inputs = tokenizer(historyInputs + " . " + st.session_state.input_text, return_tensors="pt")
result = model.generate(**inputs)
message_bot = tokenizer.decode(
result[0], skip_special_tokens=True
) # .replace("<s>", "").replace("</s>", "")
st.session_state['history2'].append({"message": user_message, "is_user": True})
st.session_state['history2'].append({"message": message_bot, "is_user": False})
for chat in st.session_state['history2']:
st_message(**chat) # unpacking
st.text_input("Talk to the bot", key="input_text", on_change=generate_answer)
if st.button("Clear"):
st.session_state["history2"] = []
for chat in st.session_state['history2']:
st_message(**chat) # unpacking
if app_mode =='ChatGPT-3.5':
counter = 0
def get_unique_key():
global counter
counter += 1
return f"chat{counter}"
OPENAI_KEY="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"
openai.api_key = OPENAI_KEY
openai_engine = openai.ChatCompletion()
if 'history3' not in st.session_state:
st.session_state['history3'] = []
if "messages" not in st.session_state:
st.session_state["messages"] = []
if "messagesDocument" not in st.session_state:
st.session_state["messagesDocument"] = []
def generate_answer():
st.session_state["messages"] += [{"role": "user", "content": st.session_state.input_text}]
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo", messages=st.session_state["messages"]
)
message_response = response["choices"][0]["message"]["content"]
st.session_state["messages"] += [
{"role": "system", "content": message_response}
]
st.session_state['history3'].append({"message": st.session_state.input_text, "is_user": True})
st.session_state['history3'].append({"message": message_response, "is_user": False})
print(st.session_state['history3'])
print(st.session_state["messages"])
if st.button("Retrieve the document's content"):
if uploaded_file is None:
st.error("Please input the document!", icon="🚨")
else:
with st.spinner('Wait for processing the document...'):
with open("my_text.txt", "w", encoding='utf-8') as f:
f.write(string_data)
loader = TextLoader("my_text.txt", encoding='utf-8')
documents = loader.load()
print(type(documents), documents)
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
texts = text_splitter.split_documents(documents)
embeddings = OpenAIEmbeddings()
docsearch = Chroma.from_documents(texts, embeddings)
qa = RetrievalQA.from_chain_type(llm=OpenAI(), chain_type="stuff", retriever=docsearch.as_retriever(search_kwargs={"k": 1}))
st.success('Successful!')
def generate_answer():
query = st.session_state.input_text
docs = qa.run(query)
system_prompt_first = """
You are a helpful assisant that help user with answering questions over a content that was pulled from a database
---CONTENT START---\n
"""
system_prompt_second = """
\n---CONTENT END---
Based on information pulled from the database, answer the question below from the user. If the content pulled from the database is not related to the question, say "I do not have enough information for this question"
Question:
"""
system_prompt_ans = "\nAnswer:"
prompt = system_prompt_first + docs + system_prompt_second + query + system_prompt_ans
print(prompt)
st.session_state["messagesDocument"] += [{"role": "user", "content": prompt}]
message_response = openai_engine.create(model='gpt-3.5-turbo',messages=st.session_state["messagesDocument"])
st.session_state['history3'].append({"message": st.session_state.input_text, "is_user": True})
st.session_state['history3'].append({"message": message_response.choices[0].message.content, "is_user": False})
st.session_state["messagesDocument"] += [
{"role": "system", "content": message_response.choices[0].message.content}
]
print(st.session_state["messagesDocument"])
st.markdown("""
<style>
.chatbox {
max-height: 300px;
overflow-y: auto;
}
</style>
""", unsafe_allow_html=True)
for chat in st.session_state['history3']:
st_message(**chat, key=get_unique_key()) # unpacking
st.text_input("Talk to the bot: ",placeholder = "Ask me anything ...", key="input_text", on_change=generate_answer)
if st.button("Clear"):
st.session_state["history3"] = []
st.session_state["messages"] = []
for chat in st.session_state['history3']:
st_message(**chat, key=get_unique_key()) # unpacking
if app_mode =='Fine-tune Alpaca 7B':
st.markdown('In this application, we are using **Fine-tune Alpaca 7B API** and **StreamLit** is to create the Web Graphical User Interface (GUI). ')
st.markdown(
"""
<style>
[data-testid="stSidebar"][aria-expanded="true"] > div:first-child {
width: 300px;
}
[data-testid="stSidebar"][aria-expanded="false"] > div:first-child {
width: 300px;
margin-left: -400px;
}
</style>
""",
unsafe_allow_html=True,
)
if app_mode =='Customized Alpaca 7B':
st.markdown('In this application, we are using **PART - Part Attention Regressor for 3D Human Body Estimation [ICCV 2021]** for creating Body Mesh and **Dynamic Time Warping** for comparing poses. **StreamLit** is to create the Web Graphical User Interface (GUI). ')
st.markdown(
"""
<style>
[data-testid="stSidebar"][aria-expanded="true"] > div:first-child {
width: 300px;
}
[data-testid="stSidebar"][aria-expanded="false"] > div:first-child {
width: 300px;
margin-left: -400px;
}
</style>
""",
unsafe_allow_html=True,
)
if app_mode =='Alpaca-LORA':
st.markdown('In this application, we are using **PART - Part Attention Regressor for 3D Human Body Estimation [ICCV 2021]** for creating Body Mesh and **Dynamic Time Warping** for comparing poses. **StreamLit** is to create the Web Graphical User Interface (GUI). ')
st.markdown(
"""
<style>
[data-testid="stSidebar"][aria-expanded="true"] > div:first-child {
width: 300px;
}
[data-testid="stSidebar"][aria-expanded="false"] > div:first-child {
width: 300px;
margin-left: -400px;
}
</style>
""",
unsafe_allow_html=True,
)
def generate_prompt(instruction: str, input_ctxt: str = None) -> str:
if input_ctxt:
return f"""Below is an instruction that describes a task, paired with an input that provides further context. Write a response that appropriately completes the request.
### Instruction:
{instruction}
### Input:
{input_ctxt}
### Response:"""
else:
return f"""Below is an instruction that describes a task. Write a response that appropriately completes the request.
### Instruction:
{instruction}
### Response:"""
tokenizer = LlamaTokenizer.from_pretrained("chainyo/alpaca-lora-7b")
model = LlamaForCausalLM.from_pretrained(
"chainyo/alpaca-lora-7b",
load_in_8bit=True,
torch_dtype=torch.float16,
device_map="auto",
)
generation_config = GenerationConfig(
temperature=0.2,
top_p=0.75,
top_k=40,
num_beams=4,
max_new_tokens=128,
)
model.eval()
if torch.__version__ >= "2":
model = torch.compile(model)
instruction = "What is the meaning of life?"
input_ctxt = None # For some tasks, you can provide an input context to help the model generate a better response.
prompt = generate_prompt(instruction, input_ctxt)
input_ids = tokenizer(prompt, return_tensors="pt").input_ids
input_ids = input_ids.to(model.device)
with torch.no_grad():
outputs = model.generate(
input_ids=input_ids,
generation_config=generation_config,
return_dict_in_generate=True,
output_scores=True,
)
response = tokenizer.decode(outputs.sequences[0], skip_special_tokens=True)
print(response)
# def generate_answer():
# tokenizer, model = get_models()
# user_message = st.session_state.input_text
# print(type(user_message), user_message)
# History_inputs = []
# for element in st.session_state["history"]:
# if element["is_user"] == True:
# History_inputs.append(element["message"])
# historyInputs = ". ".join(History_inputs)
# print(historyInputs + " " + st.session_state.input_text)
# inputs = tokenizer(historyInputs + " . " + st.session_state.input_text, return_tensors="pt")
# result = model.generate(**inputs)
# message_bot = tokenizer.decode(
# result[0], skip_special_tokens=True
# ) # .replace("<s>", "").replace("</s>", "")
# st.session_state['history'].append({"message": user_message, "is_user": True})
# st.session_state['history'].append({"message": message_bot, "is_user": False})
# for chat in st.session_state['history']:
# st_message(**chat) # unpacking
# st.text_input("Talk to the bot", key="input_text", on_change=generate_answer)