import requests from huggingface_hub import InferenceClient, login from transformers import AutoTokenizer from langchain.chat_models import ChatOpenAI import os, sys, json import gradio as gr from langchain.evaluation import load_evaluator from pprint import pprint as print import time from utils import * from beschreibungen import * #from langchain.chains import LLMChain, RetrievalQA #from langchain.retrievers.self_query.base import SelfQueryRetriever from langchain.document_loaders import PyPDFLoader, WebBaseLoader, UnstructuredWordDocumentLoader, DirectoryLoader from langchain.document_loaders.blob_loaders.youtube_audio import YoutubeAudioLoader from langchain.document_loaders.generic import GenericLoader from langchain.document_loaders.parsers import OpenAIWhisperParser from langchain.schema import AIMessage, HumanMessage from langchain.embeddings import HuggingFaceInstructEmbeddings, HuggingFaceEmbeddings, HuggingFaceBgeEmbeddings, HuggingFaceInferenceAPIEmbeddings from langchain.prompts import PromptTemplate from langchain.embeddings.openai import OpenAIEmbeddings from langchain.text_splitter import RecursiveCharacterTextSplitter #from langchain.retrievers.self_query.base import SelfQueryRetriever from langchain.vectorstores import Chroma from chromadb.errors import InvalidDimensionException from dotenv import load_dotenv, find_dotenv _ = load_dotenv(find_dotenv()) # access token with permission to access the model and PRO subscription HUGGINGFACEHUB_API_TOKEN = os.getenv("HF_ACCESS_READ") os.environ["HUGGINGFACEHUB_API_TOKEN"] = HUGGINGFACEHUB_API_TOKEN #login(token=os.environ["HF_ACCESS_READ"]) OAI_API_KEY=os.getenv("OPENAI_API_KEY") ################################################# #Prompt Zusätze ################################################# template = """Antworte in deutsch, wenn es nicht explizit anders gefordert wird. Wenn du die Antwort nicht kennst, antworte einfach, dass du es nicht weißt. Versuche nicht, die Antwort zu erfinden oder aufzumocken. Halte die Antwort kurz aber exakt. """ llm_template = "Beantworte die Frage am Ende. " + template + "Frage: {question} Hilfreiche Antwort: " rag_template = "Nutze die folgenden Kontext Teile, um die Frage zu beantworten am Ende. Findest du die Antwort in den Kontext Teilen nicht, versuche die Antwort selbst zu finden. Mache das aber in deiner Antwort deutlich." + template + "{context} Frage: {question} Hilfreiche Antwort: " ################################################# # Konstanten #RAG: Pfad, wo Docs/Bilder/Filme abgelegt werden können - lokal, also hier im HF Space (sonst auf eigenem Rechner) ################################################# PATH_WORK = "." CHROMA_DIR = "/chroma" YOUTUBE_DIR = "/youtube" ############################################### #URLs zu Dokumenten oder andere Inhalte, die einbezogen werden sollen PDF_URL = "https://arxiv.org/pdf/2303.08774.pdf" WEB_URL = "https://openai.com/research/gpt-4" YOUTUBE_URL_1 = "https://www.youtube.com/watch?v=--khbXchTeE" YOUTUBE_URL_2 = "https://www.youtube.com/watch?v=hdhZwyf24mE" #YOUTUBE_URL_3 = "https://www.youtube.com/watch?v=vw-KWfKwvTQ" ############################################### #globale Variablen ############################################## #nur bei ersten Anfrage splitten der Dokumente - um die Vektordatenbank entsprechend zu füllen splittet = False ############################################## # inference client ############################################## print ("Inf.Client") #client = InferenceClient("/static-proxy?url=https%3A%2F%2Fapi-inference.huggingface.co%2Fmodels%2Fmeta-llama%2FLlama-2-70b-chat-hf") #client = InferenceClient("https://ybdhvwle4ksrawzo.eu-west-1.aws.endpoints.huggingface.cloud") #Inference mit Authorisation: #API_URL = "/static-proxy?url=https%3A%2F%2Fapi-inference.huggingface.co%2Fmodels%2FHuggingFaceH4%2Fzephyr-7b-beta" #braucht großen Space mit mind. 93GB #API_URL = "/static-proxy?url=https%3A%2F%2Fapi-inference.huggingface.co%2Fmodels%2Fargilla%2Fnotux-8x7b-v1" HEADERS = {"Authorization": f"Bearer {HUGGINGFACEHUB_API_TOKEN}"} ############################################## # tokenizer for generating prompt ############################################## print ("Tokenizer") #tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-2-70b-chat-hf") #tokenizer = AutoTokenizer.from_pretrained("HuggingFaceH4/zephyr-7b-beta") #tokenizer = AutoTokenizer.from_pretrained("mistralai/Mistral-7B-Instruct-v0.1") #tokenizer = AutoTokenizer.from_pretrained("mistralai/Mixtral-8x7B-Instruct-v0.1") ############################################## # Zum Testen: #list of models available #client = InferenceClient() #print("List of models ......................:") #print(client.list_deployed_models("text-generation-inference")) #angezeigt am 17.12.2023: #{'text-generation': ['bigcode/starcoder','bigscience/bloom','codellama/CodeLlama-13b-hf','codellama/CodeLlama-34b-Instruct-hf','HuggingFaceH4/zephyr-7b-beta','HuggingFaceM4/idefics-80b-instruct', 'meta-llama/Llama-2-70b-chat-hf', #'mistralai/Mistral-7B-Instruct-v0.1','mistralai/Mistral-7B-v0.1', 'OpenAssistant/oasst-sft-4-pythia-12b-epoch-3.5','openchat/openchat_3.5','TheBloke/vicuna-7B-v1.5-GPTQ','tiiuae/falcon-180B-chat','tiiuae/falcon-7b', #'tiiuae/falcon-7b-instruct'],'text2text-generation': ['google/flan-t5-xxl']} ################################################# ################################################# ################################################# #Funktionen zur Verarbeitung ################################################ def add_text(history, text): history = history + [(text, None)] return history #, gr.Textbox(value="", interactive=False) def add_file(history, file): history = history + [((file.name,), None)] return history ################################################ ################################################ # Für den Vektorstore... # Funktion, um für einen best. File-typ ein directory-loader zu definieren def create_directory_loader(file_type, directory_path): #verschiedene Dokument loaders: loaders = { '.pdf': PyPDFLoader, '.word': UnstructuredWordDocumentLoader, } return DirectoryLoader( path=directory_path, glob=f"**/*{file_type}", loader_cls=loaders[file_type], ) #die Inhalte splitten, um in Vektordatenbank entsprechend zu laden als Splits def document_loading_splitting(): global splittet ############################## # Document loading docs = [] # kreiere einen DirectoryLoader für jeden file type pdf_loader = create_directory_loader('.pdf', './chroma/pdf') word_loader = create_directory_loader('.word', './chroma/word') # Laden der files pdf_documents = pdf_loader.load() word_documents = word_loader.load() #alle zusammen in docs (s.o.)... docs.extend(pdf_documents) docs.extend(word_documents) #andere loader - für URLs zu Web, Video, PDF im Web... # Load PDF loader = PyPDFLoader(PDF_URL) docs.extend(loader.load()) # Load Web loader = WebBaseLoader(WEB_URL) docs.extend(loader.load()) # Load YouTube loader = GenericLoader(YoutubeAudioLoader([YOUTUBE_URL_1,YOUTUBE_URL_2], PATH_WORK + YOUTUBE_DIR), OpenAIWhisperParser()) docs.extend(loader.load()) ################################ # Vektorstore Vorbereitung: Document splitting text_splitter = RecursiveCharacterTextSplitter(chunk_overlap = 150, chunk_size = 1500) splits = text_splitter.split_documents(docs) #nur bei erster Anfrage mit "choma" wird gesplittet... splittet = True return splits #Vektorstore anlegen... #Chroma DB die splits ablegen - vektorisiert... def document_storage_chroma(splits): #OpenAi embeddings---------------------------------- Chroma.from_documents(documents = splits, embedding = OpenAIEmbeddings(disallowed_special = ()), persist_directory = PATH_WORK + CHROMA_DIR) #HF embeddings-------------------------------------- #Chroma.from_documents(documents = splits, embedding = HuggingFaceEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2", model_kwargs={"device": "cpu"}, encode_kwargs={'normalize_embeddings': False}), persist_directory = PATH_WORK + CHROMA_DIR) #Vektorstore vorbereiten... #dokumente in chroma db vektorisiert ablegen können - die Db vorbereiten daüfur def document_retrieval_chroma(): #OpenAI embeddings ------------------------------- embeddings = OpenAIEmbeddings() #HF embeddings ----------------------------------- #Alternative Embedding - für Vektorstore, um Ähnlichkeitsvektoren zu erzeugen - die ...InstructEmbedding ist sehr rechenaufwendig #embeddings = HuggingFaceInstructEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2", model_kwargs={"device": "cpu"}) #etwas weniger rechenaufwendig: #embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2", model_kwargs={"device": "cpu"}, encode_kwargs={'normalize_embeddings': False}) #oder einfach ohne Langchain: #embeddings = SentenceTransformerEmbeddings(model_name="all-MiniLM-L6-v2") #ChromaDb um die embedings zu speichern db = Chroma(embedding_function = embeddings, persist_directory = PATH_WORK + CHROMA_DIR) print ("Chroma DB bereit ...................") return db ############################################### #Langchain anlegen #langchain nutzen, um prompt an LLM zu leiten - llm und prompt sind austauschbar #prompt ohne RAG!!! def llm_chain(prompt): llm_template = "Beantworte die Frage am Ende. " + template + "Frage: " + prompt return llm_template #langchain nutzen, um prompt an llm zu leiten, aber vorher in der VektorDB suchen, um passende splits zum Prompt hinzuzufügen #prompt mit RAG!!! def rag_chain(prompt, db, k=3): rag_template = "Nutze die folgenden Kontext Teile am Ende, um die Frage zu beantworten . " + template + "Frage: " + prompt + "Kontext Teile: " retrieved_chunks = db.similarity_search(prompt, k) neu_prompt = rag_template for i, chunk in enumerate(retrieved_chunks): neu_prompt += f"{i+1}. {chunk}\n" return neu_prompt ######################################## # Bot- test gegen schädliche Bots die die Anwendung testen... # Funktion zur Überprüfung der Benutzereingabe # Funktion zur Überprüfung der Eingabe und Aktivierung der Hauptanwendung def validate_input(user_input_validate, validate=False): user_input_hashed = hash_input(user_input_validate) if user_input_hashed == hash_input(ANTI_BOT_PW): return "Richtig! Weiter gehts... ", True, gr.Textbox(visible=False), gr.Button(visible=False) else: return "Falsche Antwort!!!!!!!!!", False, gr.Textbox(label = "", placeholder="Bitte tippen Sie das oben im Moodle Kurs angegebene Wort ein, um zu beweisen, dass Sie kein Bot sind.", visible=True, scale= 5), gr.Button("Validieren", visible = True) ################################################### #Prompts mit History erzeugen für verschiednee Modelle ################################################### #Funktion, die einen Prompt mit der history zusammen erzeugt - allgemein def generate_prompt_with_history(text, history, max_length=4048): prompt="" history = ["\n{}\n{}".format(x[0],x[1]) for x in history] history.append("\n{}\n".format(text)) history_text = "" flag = False for x in history[::-1]: history_text = x + history_text flag = True print ("Prompt: ..........................") print(prompt+history_text) if flag: return prompt+history_text else: return None #Prompt und History für OPenAi Schnittstelle def generate_prompt_with_history_openai(prompt, history): history_openai_format = [] for human, assistant in history: history_openai_format.append({"role": "user", "content": human }) history_openai_format.append({"role": "assistant", "content":assistant}) history_openai_format.append({"role": "user", "content": prompt}) return history_openai_format ############################################## #History - die Frage oder das File eintragen... ############################################## def add_text(history, prompt): history = history + [(prompt, None)] return history, prompt, "" #gr.Textbox(value="", interactive=False) def add_file(history, file, prompt): if (prompt == ""): history = history + [((file.name,), None)] else: history = history + [((file.name,), None), (prompt, None)] return history, prompt, "" def transfer_input(inputs): textbox = reset_textbox() return ( inputs, gr.update(value=""), gr.Button.update(visible=True), ) ############################################## ############################################## ############################################## # generate function ############################################## def generate(text, history, rag_option, model_option, k=3, top_p=0.6, temperature=0.5, max_new_tokens=4048, max_context_length_tokens=2048, repetition_penalty=1.3,): #mit RAG if (rag_option is None): raise gr.Error("Retrieval Augmented Generation ist erforderlich.") if (text == ""): raise gr.Error("Prompt ist erforderlich.") try: if (model_option == "HF1"): #Anfrage an InferenceEndpoint1 ---------------------------- API_URL = "/static-proxy?url=https%3A%2F%2Fapi-inference.huggingface.co%2Fmodels%2FHuggingFaceH4%2Fzephyr-7b-beta" print("HF1") else: API_URL = "/static-proxy?url=https%3A%2F%2Fapi-inference.huggingface.co%2Fmodels%2Ftiiuae%2Ffalcon-180B-chat" print("HF2") if (rag_option == "An"): #muss nur einmal ausgeführt werden... if not splittet: splits = document_loading_splitting() document_storage_chroma(splits) db = document_retrieval_chroma() #mit RAG: neu_text_mit_chunks = rag_chain(text, db, k) #für Chat LLM: #prompt = generate_prompt_with_history_openai(neu_text_mit_chunks, history) #als reiner prompt: prompt = generate_prompt_with_history(neu_text_mit_chunks, history) else: #für Chat LLM: #prompt = generate_prompt_with_history_openai(text, history) #als reiner prompt: prompt = generate_prompt_with_history(text, history) print("prompt:....................................") print (prompt) #Anfrage an Modell (mit RAG: mit chunks aus Vektorstore, ohne: nur promt und history) #payload = tokenizer.apply_chat_template([{"role":"user","content":prompt}],tokenize=False) #Für LLAMA: #payload = tokenizer.apply_chat_template(prompt,tokenize=False) #result = client.text_generation(payload, do_sample=True,return_full_text=False, max_new_tokens=2048,top_p=0.9,temperature=0.6,) #inference allg: data = { "inputs": prompt, "options": {"max_new_tokens": max_new_tokens}, } response= requests.post(API_URL, headers=HEADERS, json=data) result = response.json() print("result:------------------") chatbot_response = result[0]['generated_text'] print("anzahl tokens gesamt antwort:------------------") print (len(chatbot_response.split())) except Exception as e: raise gr.Error(e) chatbot_message = chatbot_response[len(prompt):].strip() print("history/chatbot_rsponse:--------------------------------") print(history) print(chatbot_message) """ #Antwort als Stream ausgeben... for i in range(len(chatbot_message)): time.sleep(0.03) yield chatbot_message[: i+1], "Generating" if shared_state.interrupted: shared_state.recover() try: yield chatbot_message[: i+1], "Stop: Success" return except: pass """ #Antwort als Stream ausgeben... history[-1][1] = "" for character in chatbot_message: history[-1][1] += character time.sleep(0.03) yield history, "Generating" if shared_state.interrupted: shared_state.recover() try: yield history, "Stop: Success" return except: pass #zum Evaluieren: # custom eli5 criteria #custom_criterion = {"eli5": "Is the output explained in a way that a 5 yeard old would unterstand it?"} #eval_result = evaluator.evaluate_strings(prediction=res.strip(), input=text, criteria=custom_criterion, requires_reference=True) #print ("eval_result:............ ") #print(eval_result) #return res.strip() ######################################## #Evaluation ######################################## evaluation_llm = ChatOpenAI(model="Inference Endpoint") # create evaluator evaluator = load_evaluator("criteria", criteria="conciseness", llm=evaluation_llm) ################################################ #GUI ############################################### #Beschreibung oben in GUI ################################################ print ("Start GUI") with open("custom.css", "r", encoding="utf-8") as f: customCSS = f.read() with gr.Blocks(css=customCSS, theme=themeAlex) as demo: history = gr.State([]) user_question = gr.State("") #validiert speichern validate = gr.State(False) with gr.Row(): user_input_validate =gr.Textbox(label= "Bitte das oben im Moodle Kurs angegebene Wort eingeben, um die Anwendung zu starten", visible=True, interactive=True, scale= 7) validate_btn = gr.Button("Validieren", visible = True) #validation_result = gr.Text(label="Validierungsergebnis") with gr.Row(): gr.HTML("LI Chatot") status_display = gr.Markdown("Success", elem_id="status_display") gr.Markdown(description_top) with gr.Row(): with gr.Column(scale=5): with gr.Row(): chatbot = gr.Chatbot(elem_id="chuanhu_chatbot") with gr.Row(): with gr.Column(scale=12): user_input = gr.Textbox( show_label=False, placeholder="Gib hier deinen Prompt ein...", container=False ) with gr.Column(min_width=70, scale=1): submitBtn = gr.Button("Senden") with gr.Column(min_width=70, scale=1): cancelBtn = gr.Button("Stop") with gr.Row(): emptyBtn = gr.ClearButton( [user_input, chatbot], value="🧹 Neue Session") btn = gr.UploadButton("📁", file_types=["image", "video", "audio"]) with gr.Column(): with gr.Column(min_width=50, scale=1): with gr.Tab(label="Parameter Einstellung"): gr.Markdown("# Parameters") rag_option = gr.Radio(["Aus", "An"], label="RAG - LI Erweiterungen", value = "Aus") model_option = gr.Radio(["HF1", "HF2"], label="Modellauswahl", value = "HF1") top_p = gr.Slider( minimum=-0, maximum=1.0, value=0.95, step=0.05, interactive=True, label="Top-p", ) temperature = gr.Slider( minimum=0.1, maximum=2.0, value=1, step=0.1, interactive=True, label="Temperature", ) max_length_tokens = gr.Slider( minimum=0, maximum=512, value=512, step=8, interactive=True, label="Max Generation Tokens", ) max_context_length_tokens = gr.Slider( minimum=0, maximum=4096, value=2048, step=128, interactive=True, label="Max History Tokens", ) repetition_penalty=gr.Slider(label="Repetition penalty", value=1.2, minimum=1.0, maximum=2.0, step=0.05, interactive=True, info="Strafe für wiederholte Tokens", visible=True) anzahl_docs = gr.Slider(label="Anzahl Dokumente", value=3, minimum=1, maximum=10, step=1, interactive=True, info="wie viele Dokumententeile aus dem Vektorstore an den prompt gehängt werden", visible=True) gr.Markdown(description) #Argumente für generate Funktion als Input predict_args = dict( fn=generate, inputs=[ user_question, chatbot, #history, rag_option, model_option, anzahl_docs, top_p, temperature, max_length_tokens, max_context_length_tokens, repetition_penalty, validate ], outputs=[ chatbot, status_display], #[ chatbot, history, status_display], show_progress=True, ) reset_args = dict( fn=reset_textbox, inputs=[], outputs=[user_input, status_display] ) # Chatbot transfer_input_args_text = dict( fn=add_text, inputs=[chatbot, user_input], outputs=[chatbot, user_question, user_input], show_progress=True ) transfer_input_args_file = dict( fn=add_file, inputs=[chatbot, btn, user_input], outputs=[chatbot, user_question, user_input], show_progress=True ) predict_event1 = user_input.submit(**transfer_input_args_text, queue=False,).then(**predict_args) predict_event3 = btn.upload(**transfer_input_args_file,queue=False,).then(**predict_args) predict_event2 = submitBtn.click(**transfer_input_args_text, queue=False,).then(**predict_args) #Validation Button # Event-Handler für die Validierung validate_btn.click(validate_input, inputs=[user_input_validate, validate], outputs=[status_display, validate, user_input_validate, validate_btn]) user_input_validate.submit(validate_input, inputs=[user_input_validate, validate], outputs=[status_display, validate, user_input_validate, validate_btn]) cancelBtn.click( cancel_outputing, [], [status_display], #cancels=[predict_event1,predict_event2, predict_event3 ] ) demo.title = "HH-ChatBot" demo.queue().launch(debug=True)