# NXTtokenViz / app3.py
# Author: billusanda007
# Last commit: 70b9637 (verified) - "Rename app.py to app3.py"
import torch
from transformers import GPT2LMHeadModel, GPT2Tokenizer
import gradio as gr
import pandas as pd
from collections import Counter, defaultdict
import os
from huggingface_hub import login
import requests
from bs4 import BeautifulSoup
import numpy as np
import re
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import googlesearch
import time
import nltk
# Download the NLTK "punkt" data at import time; presumably this is what
# nltk.sent_tokenize (used further down in this file) needs to run.
nltk.download('punkt')
from sentence_transformers import SentenceTransformer, util
# Sentence-embedding model (MiniLM).
# NOTE(review): the name `model` is assigned again later in this file (to the
# GPT-2 language model), so this binding only lasts until that statement runs.
model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
def fetch_article_text_sequential(url, timeout=15):
    """Download a web page and return its headings and paragraphs as plain text.

    Every ``h1``-``h6`` and ``p`` element is visited in document order.
    Elements whose text contains one of the paywall / boilerplate phrases
    listed below are skipped.

    Args:
        url: Address of the page to download.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The stripped text of the kept elements joined with newlines, or
        ``None`` when the page could not be downloaded or parsed.
    """
    headers = {
        "Content-Type": "application/json",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
    }
    exclude = [
        "Thank you for your patience",
        "Subscribe",
        "subscribe",
        "trouble retrieving the article content",
        "browser settings",
        "Thank you for your patience while we verify access. If you are in Reader mode please exit and log into your Times account, or subscribe for all of The Times.",
        "Thank you for your patience while we verify access.",
        "Already a subscriber? Log in.",
        "Want all of The Times? Subscribe.",
        "Advertisement",
        "Site Index",
        "Thank you for your patience while we verify access. If you are in Reader mode please exit andlog intoyour Times account, orsubscribefor all of The Times.",
        "Already a subscriber?Log in.",
        "Want all of The Times?Subscribe.",
        "Site Information Navigation",
    ]
    # Headlines and paragraphs are the only elements treated as article text.
    tags_of_interest = ['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p']
    try:
        # Without a timeout a single unresponsive site would block the caller forever.
        response = requests.get(url, headers=headers, timeout=timeout)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        article_content = []
        for tag in soup.find_all(tags_of_interest):
            raw_text = tag.get_text()
            if not any(excluded_phrase in raw_text for excluded_phrase in exclude):
                article_content.append(tag.get_text(strip=True))
        return '\n'.join(article_content)
    except Exception:
        # Deliberate best-effort: any download/parse failure means "no text".
        # Unlike a bare ``except:``, this lets KeyboardInterrupt/SystemExit through.
        return None
def get_google_search_results(query, start=0, timeout=15):
    """Scrape one page of Google results for ``query``.

    Args:
        query: Search string sent as the ``q`` parameter.
        start: Offset of the first result, sent as the ``start`` parameter.
        timeout: Seconds to wait for the response before giving up.

    Returns:
        A list of ``{"title": ..., "link": ...}`` dicts, one per element with
        CSS class ``g`` on the page. Links ending in ``.pdf`` (any case) are
        dropped. Missing titles/links are reported with the placeholders
        ``"No title"`` / ``"No link"``.
    """
    search_url = "https://www.google.com/search"
    params = {"q": query, "start": start}
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
    }
    # A timeout prevents an unanswered request from hanging the caller.
    response = requests.get(search_url, params=params, headers=headers, timeout=timeout)
    soup = BeautifulSoup(response.text, "html.parser")
    search_results = []
    for g in soup.find_all(class_="g"):
        heading = g.find("h3")
        anchor = g.find("a")
        title = heading.text if heading is not None else "No title"
        # An <a> without an href attribute must not raise KeyError.
        link = anchor.get("href", "No link") if anchor is not None else "No link"
        # The link is lower-cased first, so a single suffix covers every casing.
        if not link.lower().endswith('.pdf'):
            search_results.append({"title": title, "link": link})
    return search_results
def fetch_sentences_from_html(html):
    """Split the paragraph text of an HTML document into sentences.

    Args:
        html: HTML markup as a string, or ``None``.

    Returns:
        A list of sentences taken from the ``<p>`` elements, split after
        ``.`` or ``?`` followed by whitespace (with look-behinds that avoid
        splitting inside abbreviations). Returns ``[]`` for ``None`` input or
        when parsing fails.
    """
    if html is None:
        return []
    try:
        soup = BeautifulSoup(html, 'html.parser')
        text = " ".join(p.get_text() for p in soup.find_all("p"))
        return re.split(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?)\s', text)
    except Exception:
        # Best-effort helper: a parse failure yields no sentences.
        return []
# Sentence encoder used only by rank_sentences; created on first use.
_SENTENCE_ENCODER = None


def _get_sentence_encoder():
    """Return the MiniLM sentence encoder, loading it on the first call."""
    global _SENTENCE_ENCODER
    if _SENTENCE_ENCODER is None:
        _SENTENCE_ENCODER = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
    return _SENTENCE_ENCODER


def rank_sentences(sentences):
    """Order sentences by how similar each one is to all the others.

    Each sentence is embedded, the pairwise cosine similarities are computed,
    and sentences are sorted by their mean similarity, highest first.

    The encoder is held in a dedicated module variable rather than the global
    ``model``: that name is rebound to the GPT-2 language model later in this
    file, which has no ``encode`` method.

    Args:
        sentences: List of sentence strings.

    Returns:
        The same sentences sorted from most to least central; ``[]`` when
        ``sentences`` is empty.
    """
    if not sentences:
        return []
    embeddings = _get_sentence_encoder().encode(sentences, convert_to_tensor=True)
    # Pairwise cosine similarity between all sentences.
    similarities = util.pytorch_cos_sim(embeddings, embeddings).cpu().numpy()
    avg_similarities = np.mean(similarities, axis=1)
    ranked = sorted(zip(sentences, avg_similarities), key=lambda pair: pair[1], reverse=True)
    return [sentence for sentence, _ in ranked]
def rank_sentences_new(sentences, query, top_n=20):
    """Return the lines of ``sentences`` most similar to ``query`` (TF-IDF).

    Args:
        sentences: Newline-separated text, or ``None``.
        query: Text the lines are compared against.
        top_n: Maximum number of lines to return.

    Returns:
        Up to ``top_n`` non-blank, stripped lines ordered by decreasing cosine
        similarity between their TF-IDF vector and the query's. Returns ``[]``
        when there is no text, no non-blank line, ``top_n`` is not positive,
        or no vocabulary can be built from the input.
    """
    if sentences is None or top_n <= 0:
        return []
    # Keep the filtered list: blank lines carry no information and would
    # otherwise compete for the top_n slots.
    lines = [line.strip() for line in sentences.split("\n")]
    lines = [line for line in lines if line]
    if not lines:
        return []
    try:
        matrix = TfidfVectorizer().fit_transform([query] + lines)
    except ValueError:
        # Raised by scikit-learn when the documents contain no usable terms.
        return []
    # Row 0 is the query; the remaining rows are the candidate lines.
    cosine_similarities = cosine_similarity(matrix[0:1], matrix[1:]).flatten()
    ranked_indices = cosine_similarities.argsort()[-top_n:][::-1]
    return [lines[idx] for idx in ranked_indices]
# News / reference sites of interest, one domain per whitespace-separated entry.
domains = """
    wikipedia.org nytimes.com cnn.com bbc.com theguardian.com
    forbes.com reuters.com cnbc.com bloomberg.com foxnews.com
    npr.org washingtonpost.com wsj.com aljazeera.com ft.com
    huffpost.com nationalgeographic.com scientificamerican.com
    nature.com time.com usatoday.com apnews.com abcnews.go.com
    cbsnews.com nbcnews.com news.yahoo.com theatlantic.com
    vox.com politico.com economist.com
""".split()

# Paywall / boilerplate phrases; any sentence containing one of them is
# dropped from the retrieved content.
exclude = [
    "Thank you for your patience",
    "Subscribe",
    "subscribe",
    "trouble retrieving the article content",
    "browser settings",
    "Thank you for your patience while we verify access. If you are in Reader mode please exit and log into your Times account, or subscribe for all of The Times.",
    "Thank you for your patience while we verify access.",
    "Already a subscriber? Log in.",
    "Want all of The Times? Subscribe.",
    "Advertisement",
    "Site Index",
    "Thank you for your patience while we verify access. If you are in Reader mode please exit andlog intoyour Times account, orsubscribefor all of The Times.",
    "Already a subscriber?Log in.",
    "Want all of The Times?Subscribe.",
    "Site Information Navigation",
    "Please enable JS and disable any ad blocker",
]

# Module-level defaults for a search session: how many results to collect,
# the results gathered so far, and the paging offset.
num_results_needed = 10
all_results = []
start = 0

# Ask the user for a search query
# user_query = input("Enter a search query: ")
def get_web_content(user_query, num_results_needed):
    """Search the web for ``user_query`` and collect the most relevant sentences.

    Search result pages are requested ten results at a time until enough
    links have been gathered. Each linked page is downloaded, its lines are
    ranked against the query, and up to 15 lines per page that contain none
    of the module-level ``exclude`` phrases are kept.

    Args:
        user_query: The search query.
        num_results_needed: Number of search results (pages) to use.

    Returns:
        A ``(text, sentences)`` tuple: ``sentences`` is the list of kept
        lines across all pages, and ``text`` is those lines stripped,
        with blanks removed, joined by newlines.
    """
    wanted = int(num_results_needed)
    all_results = []
    start = 0
    while len(all_results) < wanted:
        results = get_google_search_results(user_query, start=start)
        if not results:
            # No further hits (or the search page could not be parsed):
            # stop instead of requesting pages forever.
            break
        all_results.extend(results)
        start += 10
    all_results = all_results[:wanted]  # Never use more results than requested

    all_sentences_2 = []
    for result in all_results:
        # A failed download yields None, which rank_sentences_new maps to [].
        text = fetch_article_text_sequential(result['link'])
        top_sentences = rank_sentences_new(text, user_query)
        kept = []
        for sentence in top_sentences:
            if not any(excluded_phrase in sentence for excluded_phrase in exclude):
                kept.append(sentence)
                if len(kept) == 15:
                    break
        all_sentences_2.extend(kept)

    ans = "\n".join(sentence.strip() for sentence in all_sentences_2 if sentence.strip())
    return ans, all_sentences_2
def get_web_content_new(user_query, num_results_needed):
    """Search the web for ``user_query`` and rank all retrieved sentences.

    Search result pages are requested ten results at a time until enough
    links have been gathered. Each linked page is downloaded and split into
    sentences (at most 150 per page); the pooled sentences are then ranked
    with ``rank_sentences``.

    Args:
        user_query: The search query.
        num_results_needed: Number of search results (pages) to use.

    Returns:
        The list of pooled sentences in ranked order. NOTE: the result is not
        filtered against the module-level ``exclude`` phrases.
    """
    wanted = int(num_results_needed)
    all_results = []
    start = 0
    while len(all_results) < wanted:
        results = get_google_search_results(user_query, start=start)
        if not results:
            # No further hits (or the search page could not be parsed):
            # stop instead of requesting pages forever.
            break
        all_results.extend(results)
        start += 10
    all_results = all_results[:wanted]  # Never use more results than requested

    all_sentences = []
    for result in all_results:
        text = fetch_article_text_sequential(result['link'])
        if not text:
            # Download failed or the page had no usable text; sent_tokenize
            # cannot handle None.
            continue
        all_sentences.extend(nltk.sent_tokenize(text)[:150])

    # Rank once, after every page has contributed its sentences.
    return rank_sentences(all_sentences)
#sentences, sent = get_web_content("Who has been awarded the Nobel Prize in Physics in 2023",2)
#res = get_web_content(Question[0],10)
#Context = res[0]
# Read the Hugging Face token from the HF_TOKEN environment variable.
# NOTE(review): `api_token` is not referenced anywhere else in the visible code.
api_token = os.getenv('HF_TOKEN')
# Load the pre-trained GPT-2 language model and its tokenizer.
# NOTE(review): this rebinds the module-level name `model`, which was bound to
# a SentenceTransformer instance earlier in the file.
model_name = "gpt2-large"
model = GPT2LMHeadModel.from_pretrained(model_name)
tokenizer = GPT2Tokenizer.from_pretrained(model_name)
#device = torch.device("mps")
#model.to(device)
# Switch the model to evaluation mode.
model.eval()
# Module-level defaults. The functions visible in this file take same-named
# parameters instead, so these values appear to be unused - confirm.
top_p = 0.9
threshold = 0.6
max_length = 100
#context_tokens = tokenizer.tokenize(Context)
def create_ngrams(tokens, n):
    """Return every contiguous window of ``n`` items of ``tokens`` as a tuple.

    Windows are listed in order of their starting position; when ``tokens``
    has fewer than ``n`` items the result is empty.
    """
    window_count = len(tokens) - n + 1
    ngrams = []
    for begin in range(window_count):
        ngrams.append(tuple(tokens[begin:begin + n]))
    return ngrams
###Smoothing___
def kneser_ney_smoothing(ngram_counts, lower_order_counts, discount=0.75):
    """
    Apply interpolated Kneser-Ney smoothing to n-gram counts.

    For every observed n-gram ``(context, word)`` the probability is

        max(c(context, word) - discount, 0) / c(context)
        + (discount * N(context) / c(context)) * P_cont(word)

    where ``N(context)`` is the number of distinct words seen after the
    context and ``P_cont(word)`` is the share of distinct n-gram types that
    end in ``word``.

    Args:
        ngram_counts (Counter): Counts of n-grams (e.g., 4-grams or 3-grams),
            keyed by token tuples.
        lower_order_counts (Counter): Counts of (n-1)-grams (e.g., 3-grams or
            2-grams), keyed by token tuples.
        discount (float): Discounting parameter.
    Returns:
        defaultdict: ``probabilities[context][word]`` for every observed
        n-gram, where ``context`` is the n-gram without its last token.
    """
    continuation_counts = Counter()  # distinct n-gram types ending in each word
    follower_types = Counter()       # distinct words observed after each context
    context_totals = Counter()       # occurrences of each context as an n-gram prefix
    for ngram, count in ngram_counts.items():
        context, word = ngram[:-1], ngram[-1]
        continuation_counts[word] += 1
        follower_types[context] += 1
        context_totals[context] += count
    # Computed once: normaliser of the continuation probability.
    total_ngram_types = len(ngram_counts)

    probabilities = defaultdict(lambda: defaultdict(float))
    for ngram, count in ngram_counts.items():
        context, word = ngram[:-1], ngram[-1]
        # Prefer the supplied (n-1)-gram count; fall back to the total derived
        # from the n-grams so a missing context cannot divide by zero.
        context_count = max(lower_order_counts.get(context, 0), context_totals[context])
        if context_count <= 0:
            continue
        discounted = max(count - discount, 0) / context_count
        backoff_weight = discount * follower_types[context] / context_count
        continuation_probability = continuation_counts[word] / total_ngram_types
        probabilities[context][word] = discounted + backoff_weight * continuation_probability
    return probabilities
def get_probability_from_context(Context):
    """Build smoothed 4-gram probabilities from a context string.

    The text is tokenized with the module-level tokenizer, its 3-grams and
    4-grams are counted, and the counts are passed to
    ``kneser_ney_smoothing``.

    Returns:
        A ``(probabilities, four_gram_counts, three_gram_counts)`` tuple.
    """
    tokens = tokenizer.tokenize(Context)
    four_gram_counts = Counter(create_ngrams(tokens, 4))
    three_gram_counts = Counter(create_ngrams(tokens, 3))
    smoothed = kneser_ney_smoothing(four_gram_counts, three_gram_counts)
    return smoothed, four_gram_counts, three_gram_counts
#_probabilities__, four_gram_counts, three_gram_counts = get_probability_from_context(Context)
#input_tokens = tokenizer.tokenize(initial_text)
#input_3_gram = tuple(input_tokens[-3:])
def predict_next_token(probabilities, three_gram):
    """Look up the next-token distribution stored for ``three_gram``.

    Returns the mapping held under that key, or an empty dict when the key
    is absent.
    """
    no_candidates = {}
    next_token_distribution = probabilities.get(three_gram, no_candidates)
    return next_token_distribution
#next_token_probs = predict_next_token(_probabilities__, input_3_gram)
#top_k = 4
#top_k_tokens = sorted(next_token_probs.items(), key=lambda x: x[1], reverse=True)[:top_k]
#probs = (kneser_ney_smoothing(four_gram_counts, three_gram_counts))
#next_token_probs = predict_next_token(probs, input_3_gram)
def generate_text_with_probs(initial_context, context_text , top_p, max_length, top_k, threshold=0.6):
    """Continue ``initial_context`` token by token, mixing GPT-2 with an n-gram model.

    At every step the language model's next-token distribution is computed
    and nucleus-filtered, and an n-gram distribution is looked up from counts
    built on ``context_text``. One of the two sources then supplies the next
    token (see the condition inside the loop).

    Args:
        initial_context: Prompt text the generation starts from.
        context_text: Text the n-gram counts are built from.
        top_p: Cumulative-probability cut-off for nucleus filtering.
        max_length: Maximum number of generation steps.
        top_k: Number of highest-ranked LLM tokens shown in the per-step table.
        threshold: Bound compared against the LLM's top-k cumulative probability.

    Returns:
        ``(generated_text, Tokens, token_tables, colored_text)`` where
        ``Tokens`` maps a 1-based token index to
        ``[token, "ngram" | "llm", probability]``, ``token_tables`` is a list
        of ``(title, DataFrame)`` pairs (two per step), and ``colored_text``
        is the prompt followed by one colored ``<span>`` per generated token.
    """
    Tokens = {}
    #input_ids = tokenizer.encode(initial_context, return_tensors="pt").to(device='mps')
    input_ids = tokenizer.encode(initial_context, return_tensors="pt").to(device='cpu')
    generated_text = initial_context
    token_tables = []
    token_no = 1
    # n-gram counts (orders 4 down to 1) over the tokenized context text.
    context_tokens = tokenizer.tokenize(context_text)
    four_grams = create_ngrams(context_tokens, 4)
    three_grams = create_ngrams(context_tokens, 3)
    two_grams = create_ngrams(context_tokens, 2)
    one_grams = create_ngrams(context_tokens, 1)
    four_gram_counts = Counter(four_grams)
    three_gram_counts = Counter(three_grams)
    two_grams_counts = Counter(two_grams)
    one_grams_counts = Counter(one_grams)
    prob_list = ["four_gram", "three_gram", "two_gram", "one_gram"] # Define prob_list here
    # Counters ordered from highest to lowest order; indexed by the back-off level below.
    prob = [four_gram_counts ,three_gram_counts ,two_grams_counts ,one_grams_counts]
    probs = kneser_ney_smoothing(four_gram_counts, three_gram_counts)
    # Bookkeeping counters; they are only incremented, never read, in this function.
    use_llm = 0
    use_llm_back_up = 0
    use_ngram = 0
    # `flag` becomes True once `count` (number of LLM-chosen tokens) reaches 4.
    flag = False
    count = 0
    Token_index = 0
    colored_text = initial_context
    with torch.no_grad():
        #while len(generated_text.split()) < max_length:
        for _ in range(max_length):
            outputs = model(input_ids=input_ids)
            next_token_logits = outputs.logits[:, -1, :]
            # Nucleus (top-p) filtering: sort the logits, accumulate their
            # softmax, and mark everything past the top_p mass for removal.
            sorted_logits, sorted_indices = torch.sort(next_token_logits, descending=True)
            cumulative_probs = torch.cumsum(torch.softmax(sorted_logits, dim=-1), dim=-1)
            sorted_indices_to_remove = cumulative_probs > top_p
            # Shift the mask right by one so the first token that crosses
            # top_p is kept, and never remove the highest-ranked token.
            sorted_indices_to_remove[..., 1:] = sorted_indices_to_remove[..., :-1].clone()
            sorted_indices_to_remove[..., 0] = 0
            indices_to_remove = sorted_indices[sorted_indices_to_remove]
            next_token_logits[:, indices_to_remove] = -float('Inf')
            # Distribution over the surviving tokens only.
            probabilities = torch.softmax(next_token_logits, dim=-1)
            # Table of the top_k highest-logit tokens with their filtered probabilities.
            top_tokens = sorted_indices[0, :top_k]
            top_probs = probabilities[0, top_tokens]
            top_token_probs = [(tokenizer.decode([token.item()]), prob.item()) for token, prob in zip(top_tokens, top_probs)]
            df = pd.DataFrame(top_token_probs, columns=["Token", "Probability"])
            df.index = df.index + 1
            token_tables.append((f"{token_no}>> Next token options from LLM", df))
            # Probability mass of the top_k tokens, taken from the cumulative
            # sum computed before the filtered logits were set to -inf.
            cumulative_prob = cumulative_probs[0, top_k - 1].item()
            # Entropy of the displayed probabilities; not read afterwards.
            # NOTE(review): a top-k token removed by the nucleus filter has
            # probability 0, which makes np.log produce -inf here - confirm.
            entropy = (-1)*np.sum(np.array(df['Probability'])*np.log(df['Probability']))
            # Recover the string tokens of the current text for the n-gram lookup.
            input_text = tokenizer.decode(input_ids[0], skip_special_tokens=True)
            input_tokens = tokenizer.tokenize(input_text)
            use_llm += 1
            __token_pob__ = {}
            num = 0
            num_ = 4
            # Back-off: try the last 3 tokens against the 4-gram table, then
            # the last 2 against the 3-gram table, then the last 1 against the
            # 2-gram table, stopping at the first non-empty distribution.
            # NOTE(review): the smoothing is recomputed from the same counters
            # on every generation step; it looks loop-invariant.
            while __token_pob__ == {} and num < 3:
                probs = kneser_ney_smoothing(prob[num],prob[num+1])
                __inputs__ = tuple(input_tokens[-(3-num):])
                __token_pob__ = probs.get(__inputs__, {})
                num += 1
                num_ -= 1
            df = pd.DataFrame(list(__token_pob__.items()), columns=['Token', 'Probability'])
            df.index = df.index + 1
            token_tables.append((f"{token_no}>> Next token options from N_gram", df))
            token_no +=1
            # NOTE(review): `and` binds tighter than `or`, so this reads as
            # (LLM mass below threshold AND n-gram candidates exist AND at
            # least 4 LLM tokens so far) OR (n-gram mass > LLM top-k mass).
            if cumulative_prob < threshold and __token_pob__ != {} and flag == True and count >= 4 or np.sum(df['Probability']) > cumulative_prob:
                Token_index+=1
                #if cumulative_prob < threshold and __token_pob__ != {} and flag == True and count >= 4 or entropy >= 0.6:
                # n-gram branch: take the most probable n-gram continuation.
                next_token = max(__token_pob__, key=__token_pob__.get)
                # 'Ċ' is presumably the tokenizer's newline symbol; when it
                # wins, the runner-up is used instead if there is one.
                # NOTE(review): the nesting of this if/else was ambiguous in
                # the reviewed copy of the file - confirm against the original.
                if next_token == 'Ċ':
                    sorted_tokens = sorted(__token_pob__.items(), key=lambda x: x[1], reverse=True)
                    if len(sorted_tokens) > 1:
                        next_token = sorted_tokens[1][0]
                    Tokens[Token_index] = [next_token,"ngram",__token_pob__[next_token]]
                    #######
                    color_code = "#78bfd3" # Light blue for n-gram
                    # NOTE(review): token text is placed into HTML without escaping.
                    colored_text += f"<span style='color: {color_code}'>{tokenizer.convert_tokens_to_string(next_token)}</span>"
                else:
                    Tokens[Token_index] = [next_token,"ngram",__token_pob__[next_token]]
                    ######
                    color_code = "#78bfd3" # Light blue for n-gram
                    colored_text += f"<span style='color: {color_code}'>{tokenizer.convert_tokens_to_string(next_token)}</span>"
                # Append the chosen token and re-encode the whole text to get new input ids.
                input_tokens.append(next_token)
                generated_text = tokenizer.convert_tokens_to_string(input_tokens)
                initial_context = generated_text
                #input_ids = tokenizer.encode(generated_text, return_tensors="pt").to(device='mps')
                input_ids = tokenizer.encode(generated_text, return_tensors="pt").to(device='cpu')
                use_ngram += 1
            else:
                # LLM branch: sample one token from the nucleus-filtered distribution.
                Token_index+=1
                next_token = torch.multinomial(probabilities, num_samples=1)
                next_token_prob = probabilities[0, next_token].item()
                next_token_text = tokenizer.decode(next_token.item())
                Tokens[Token_index] = [next_token_text,"llm",next_token_prob]
                color_code = "#c99a6e"
                colored_text += f"<span style='color: {color_code}'>{next_token_text}</span>"
                count += 1
                if count >= 4:
                    flag = True
                #token_no += 1
                input_ids = torch.cat([input_ids, next_token], dim=-1)
                # Stop generating once the end-of-sequence token is sampled.
                if next_token.item() == tokenizer.eos_token_id:
                    break
                generated_text = tokenizer.decode(input_ids[0], skip_special_tokens=True)
                initial_context = generated_text
                use_llm_back_up += 1
    # Final text is decoded from the ids, whichever branch produced the last token.
    generated_text = tokenizer.decode(input_ids[0], skip_special_tokens=True)
    #total = use_llm + use_llm_back_up + use_ngram
    return generated_text, Tokens, token_tables,colored_text
def combined_model_predictions(query, initial_context, top_p, max_length, top_k, threshold, docs):
    """Gradio callback: retrieve web context for ``query`` and generate text.

    The web content returned by ``get_web_content`` for ``docs`` results is
    used as n-gram context for ``generate_text_with_probs``.

    Returns:
        ``(colored_html, df, token_tables)``: the colored generation markup,
        a DataFrame with one row per generated token (position, token,
        source model, probability), and the per-step token tables.
    """
    context_text, _sentences = get_web_content(query, docs)
    print('Content Fetched')
    _generated_text, tokens, token_tables, colored_html = generate_text_with_probs(
        initial_context, context_text, top_p, max_length, top_k, threshold
    )
    rows = []
    for position, (token, source, probability) in tokens.items():
        rows.append((position, token, source, probability))
    df = pd.DataFrame(rows, columns=['Token_pos', 'Token', 'Source Model', "Probability"])
    return colored_html, df, token_tables
# Input widgets, in the positional order of combined_model_predictions' parameters.
_input_components = [
    gr.Textbox(lines=2, placeholder="Enter query here..."),
    gr.Textbox(lines=2, placeholder="Enter initial context here..."),
    gr.Slider(0, 1, step=0.01, value=0.9, label="Top-p (nucleus) sampling"),
    gr.Slider(1, 100, value=4, step=1, label="Max Length"),
    gr.Slider(1, 50, value=5, step=1, label="Top-k"),
    gr.Slider(0, 1, step=0.01, value=0.9, label="LLM cumulative Threshold"),
    gr.Slider(1, 50, step=1, value=10, label="Web_retrieved Docs to fetch"),
]

# Output widgets, matching the three values the callback returns.
_output_components = [
    gr.HTML(label="Generated Text"),
    gr.Dataframe(label="Tokens"),
    gr.Dataframe(label="Token tables"),
]

# Build the web UI and start serving it.
iface = gr.Interface(
    fn=combined_model_predictions,
    inputs=_input_components,
    outputs=_output_components,
    title="Next Token Visualizer (GPT-2-large - 812M param.)",
)
iface.launch()