import torch
from transformers import GPT2LMHeadModel, GPT2Tokenizer
import gradio as gr
import pandas as pd
from collections import Counter, defaultdict
import os
import requests
from bs4 import BeautifulSoup
import numpy as np
import re
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import time
import nltk
nltk.download('punkt')  # sentence-tokenizer models used by nltk.sent_tokenize
from sentence_transformers import SentenceTransformer, util

# Keep the sentence encoder under its own name so it is not clobbered when
# the GPT-2 model is assigned to `model` further down.
embedder = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
def fetch_article_text_sequential(url):
    headers = {
        "Content-Type": "application/json",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
    }
    # Boilerplate phrases (paywall banners, subscription prompts, etc.) to drop
    exclude = ["Thank you for your patience", "Subscribe", "subscribe", "trouble retrieving the article content", "browser settings",
        "Thank you for your patience while we verify access. If you are in Reader mode please exit and log into your Times account, or subscribe for all of The Times.",
        "Thank you for your patience while we verify access.",
        "Already a subscriber? Log in.",
        "Want all of The Times? Subscribe.",
        "Advertisement",
        "Site Index",
        "Thank you for your patience while we verify access. If you are in Reader mode please exit andlog intoyour Times account, orsubscribefor all of The Times.",
        "Already a subscriber?Log in.",
        "Want all of The Times?Subscribe.",
        "Site Information Navigation"
    ]
    try:
        # Send a request to the webpage with the specified headers
        response = requests.get(url, headers=headers)
        response.raise_for_status()  # Check that the request was successful
        # Parse the webpage content
        soup = BeautifulSoup(response.text, 'html.parser')
        # Collect headline and paragraph text in document order
        article_content = []
        tags_of_interest = ['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p']
        for tag in soup.find_all(tags_of_interest):
            if not any(excluded_phrase in tag.get_text() for excluded_phrase in exclude):
                article_content.append(tag.get_text(strip=True))
        return '\n'.join(article_content)
    except Exception:
        # Network errors, bad status codes, parse failures: caller treats None as "no text"
        return None
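# A minimal usage sketch (hypothetical URL; any live article page works,
# subject to the site's paywall and robots rules):
# text = fetch_article_text_sequential("https://www.bbc.com/news/some-article")
# if text:
#     print(text[:500])  # first 500 characters of headline + paragraph text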
def get_google_search_results(query, start=0):
    search_url = "https://www.google.com/search"
    params = {"q": query, "start": start}
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
    }
    response = requests.get(search_url, params=params, headers=headers)
    soup = BeautifulSoup(response.text, "html.parser")
    search_results = []
    for g in soup.find_all(class_="g"):
        title = g.find("h3").text if g.find("h3") else "No title"
        link = g.find("a")["href"] if g.find("a") else "No link"
        if not link.lower().endswith('.pdf'):  # skip PDFs; lower() already normalizes case
            search_results.append({"title": title, "link": link})
    return search_results
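# Each result is a plain dict, e.g. {"title": "...", "link": "https://..."}.
# Note that this scrapes Google's result HTML directly, so it can break
# whenever the markup changes or a CAPTCHA page is served; treat it as
# best-effort. Illustrative call:
# results = get_google_search_results("nobel prize physics 2023", start=0)
# for r in results[:3]:
#     print(r["title"], "->", r["link"])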
def fetch_sentences_from_html(html):
    try:
        if html is None:
            return []
        # Parse the string with BeautifulSoup
        soup = BeautifulSoup(html, 'html.parser')
        paragraphs = soup.find_all("p")
        text = " ".join(p.get_text() for p in paragraphs)
        # Split on sentence-ending punctuation while avoiding common abbreviations
        sentences = re.split(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?)\s', text)
        return sentences
    except Exception:
        return []
# Rank sentences by centrality: average cosine similarity to all other sentences
def rank_sentences(sentences):
    if not sentences:
        return []  # Return an empty list if no sentences are found
    embeddings = embedder.encode(sentences, convert_to_tensor=True)
    # Compute pairwise cosine similarity between sentences
    similarities = util.pytorch_cos_sim(embeddings, embeddings).cpu().numpy()
    # Calculate the average similarity for each sentence
    avg_similarities = np.mean(similarities, axis=1)
    # Rank sentences based on their average similarity
    ranked_sentences = sorted(zip(sentences, avg_similarities), key=lambda x: x[1], reverse=True)
    return [sentence for sentence, _ in ranked_sentences]
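# Illustrative call (sentences most similar, on average, to every other
# sentence come first):
# ranked = rank_sentences(["Paris is the capital of France.",
#                          "The capital of France is Paris.",
#                          "I like turtles."])
# print(ranked[0])  # one of the two semantically central sentences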
def rank_sentences_new(text, query, top_n=20):
    if text is None:
        return []
    # The fetcher returns newline-joined lines, so split on newlines
    sentences = text.strip().split("\n")
    # Remove any empty strings from the list
    sentences = [sentence.strip() for sentence in sentences if sentence.strip()]
    if not sentences:
        return []
    vectorizer = TfidfVectorizer().fit_transform([query] + sentences)
    vectors = vectorizer.toarray()
    query_vector = vectors[0]
    sentences_vectors = vectors[1:]
    cosine_similarities = cosine_similarity([query_vector], sentences_vectors).flatten()
    ranked_indices = cosine_similarities.argsort()[-top_n:][::-1]
    return [sentences[idx] for idx in ranked_indices]
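# Unlike rank_sentences above, this ranks lines against the *query* using
# TF-IDF cosine similarity rather than against each other. Sketch:
# top = rank_sentences_new("Line one about physics\nLine two about cooking",
#                          "physics nobel prize", top_n=1)
# print(top)  # -> the physics line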
# Mainstream news/reference domains (not currently referenced elsewhere in this script)
domains = [
    "wikipedia.org", "nytimes.com", "cnn.com", "bbc.com", "theguardian.com",
    "forbes.com", "reuters.com", "cnbc.com", "bloomberg.com", "foxnews.com",
    "npr.org", "washingtonpost.com", "wsj.com", "aljazeera.com", "ft.com",
    "huffpost.com", "nationalgeographic.com", "scientificamerican.com",
    "nature.com", "time.com", "usatoday.com", "apnews.com", "abcnews.go.com",
    "cbsnews.com", "nbcnews.com", "news.yahoo.com", "theatlantic.com",
    "vox.com", "politico.com", "economist.com"
]
# Module-level copy of the boilerplate filter, used by the get_web_content* functions below
exclude = ["Thank you for your patience", "Subscribe", "subscribe", "trouble retrieving the article content", "browser settings",
    "Thank you for your patience while we verify access. If you are in Reader mode please exit and log into your Times account, or subscribe for all of The Times.",
    "Thank you for your patience while we verify access.",
    "Already a subscriber? Log in.",
    "Want all of The Times? Subscribe.",
    "Advertisement",
    "Site Index",
    "Thank you for your patience while we verify access. If you are in Reader mode please exit andlog intoyour Times account, orsubscribefor all of The Times.",
    "Already a subscriber?Log in.",
    "Want all of The Times?Subscribe.",
    "Site Information Navigation",
    "Please enable JS and disable any ad blocker"
]
def get_web_content(user_query, num_results_needed):
    all_results = []
    start = 0
    t1 = time.time()
    # Page through Google results until we have enough links
    while len(all_results) < num_results_needed:
        results = get_google_search_results(user_query, start=start)
        if not results:
            break  # nothing more to fetch; avoid looping forever
        all_results.extend(results)
        all_results = all_results[:num_results_needed]  # Ensure no more than needed results
        start += 10
    all_sentences_2 = []
    for result in all_results:
        text = fetch_article_text_sequential(result['link'])
        top_sentences = rank_sentences_new(text, user_query)
        ans = []
        for sentence in top_sentences:
            if not any(excluded_phrase in sentence for excluded_phrase in exclude):
                ans.append(sentence)
                if len(ans) == 15:  # cap at 15 sentences per document
                    break
        all_sentences_2.extend(ans)
    t2 = time.time()
    minutes, seconds = divmod(t2 - t1, 60)
    # print(f"{minutes} minutes and {seconds} seconds")
    ans = "\n".join(sentence.strip() for sentence in all_sentences_2 if sentence.strip())
    return ans, all_sentences_2
def get_web_content_new(user_query, num_results_needed):
    all_results = []
    start = 0
    t1 = time.time()
    while len(all_results) < num_results_needed:
        results = get_google_search_results(user_query, start=start)
        if not results:
            break  # nothing more to fetch; avoid looping forever
        all_results.extend(results)
        all_results = all_results[:num_results_needed]  # Ensure no more than needed results
        start += 10
    all_sentences = []
    all_sentences_2 = []
    ranked_sentences = []
    for result in all_results:
        text = fetch_article_text_sequential(result['link'])
        if text is None:
            continue  # skip pages that could not be fetched
        sentences = nltk.sent_tokenize(text)
        sentences = sentences[:150]  # keep at most 150 sentences per document
        all_sentences.extend(sentences)
        # Re-rank the accumulated sentence pool after each document
        ranked_sentences = rank_sentences(all_sentences)
        ans2 = []
        for sentence in ranked_sentences:
            if not any(excluded_phrase in sentence for excluded_phrase in exclude):
                ans2.append(sentence)
                if len(ans2) == 15:
                    break
        all_sentences_2.extend(ans2)
    t2 = time.time()
    minutes, seconds = divmod(t2 - t1, 60)
    # print(f"{minutes} minutes and {seconds} seconds")
    # return "\n".join(sentence.strip() for sentence in all_sentences_2 if sentence.strip())
    return ranked_sentences
# sentences, sent = get_web_content("Who has been awarded the Nobel Prize in Physics in 2023", 2)
# res = get_web_content(Question[0], 10)
# Context = res[0]

# Get the token from the environment variable (read here but not used further;
# gpt2-large is a public checkpoint, so no Hub login is required)
api_token = os.getenv('HF_TOKEN')

# Load pre-trained model and tokenizer
model_name = "gpt2-large"
model = GPT2LMHeadModel.from_pretrained(model_name)
tokenizer = GPT2Tokenizer.from_pretrained(model_name)
# device = torch.device("mps")
# model.to(device)
model.eval()

# Default generation hyperparameters (the Gradio app passes slider values instead)
top_p = 0.9
threshold = 0.6
max_length = 100
# context_tokens = tokenizer.tokenize(Context)
def create_ngrams(tokens, n):
    # All consecutive n-token tuples from a token list
    return [tuple(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]
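# Example: create_ngrams(["the", "cat", "sat"], 2)
#   -> [("the", "cat"), ("cat", "sat")]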
# --- Smoothing ---
def kneser_ney_smoothing(ngram_counts, lower_order_counts, discount=0.75):
    """
    Apply Kneser-Ney smoothing to n-gram counts.

    Args:
        ngram_counts (Counter): Counts of n-grams (e.g., 4-grams or 3-grams).
        lower_order_counts (Counter): Counts of (n-1)-grams (e.g., 3-grams or 2-grams).
        discount (float): Discounting parameter.

    Returns:
        defaultdict: Smoothed probabilities, keyed by (n-1)-gram context and
        then by candidate next token.
    """
    # Continuation counts: in how many distinct n-grams does each token appear
    # as the final word?
    continuation_counts = Counter()
    # Distinct continuations observed after each (n-1)-gram context
    context_continuations = defaultdict(set)
    for ngram in ngram_counts:
        continuation_counts[ngram[-1]] += 1
        context_continuations[ngram[:-1]].add(ngram[-1])
    total_continuations = sum(continuation_counts.values())

    def continuation_probability(word):
        return continuation_counts[word] / total_continuations

    probabilities = defaultdict(lambda: defaultdict(float))
    for ngram, count in ngram_counts.items():
        lower_ngram = ngram[:-1]
        lower_count = lower_order_counts[lower_ngram]
        discounted_count = max(count - discount, 0)
        # Interpolation weight: the mass reserved by discounting, spread over
        # the continuation distribution
        lambda_factor = (discount / lower_count) * len(context_continuations[lower_ngram])
        probabilities[lower_ngram][ngram[-1]] = (discounted_count / lower_count) + lambda_factor * continuation_probability(ngram[-1])
    return probabilities
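# Sketch of the smoothing call on a toy token stream (counts built the same
# way get_probability_from_context does below):
# toks = tokenizer.tokenize("the cat sat on the mat because the cat was tired")
# p = kneser_ney_smoothing(Counter(create_ngrams(toks, 4)),
#                          Counter(create_ngrams(toks, 3)))
# p maps each 3-gram context to a {next_token: probability} dict.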
def get_probability_from_context(Context):
    context_tokens = tokenizer.tokenize(Context)
    four_grams = create_ngrams(context_tokens, 4)
    three_grams = create_ngrams(context_tokens, 3)
    four_gram_counts = Counter(four_grams)
    three_gram_counts = Counter(three_grams)
    probabilities = kneser_ney_smoothing(four_gram_counts, three_gram_counts)
    return probabilities, four_gram_counts, three_gram_counts

# _probabilities__, four_gram_counts, three_gram_counts = get_probability_from_context(Context)
# input_tokens = tokenizer.tokenize(initial_text)
# input_3_gram = tuple(input_tokens[-3:])

def predict_next_token(probabilities, three_gram):
    # Look up the smoothed next-token distribution for a context; empty dict if unseen
    return probabilities.get(three_gram, {})
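# Given the smoothed table `p` from the sketch above, prediction is a plain
# dict lookup:
# context = tuple(tokenizer.tokenize("the cat sat")[-3:])
# print(predict_next_token(p, context))  # {} if the 3-gram was never seen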
# next_token_probs = predict_next_token(_probabilities__, input_3_gram)
# top_k = 4
# top_k_tokens = sorted(next_token_probs.items(), key=lambda x: x[1], reverse=True)[:top_k]
# probs = kneser_ney_smoothing(four_gram_counts, three_gram_counts)
# next_token_probs = predict_next_token(probs, input_3_gram)
def generate_text_with_probs(initial_context, context_text, top_p, max_length, top_k, threshold=0.6):
    Tokens = {}
    # input_ids = tokenizer.encode(initial_context, return_tensors="pt").to(device='mps')
    input_ids = tokenizer.encode(initial_context, return_tensors="pt").to(device='cpu')
    generated_text = initial_context
    token_tables = []
    token_no = 1
    # Build n-gram statistics from the retrieved web context
    context_tokens = tokenizer.tokenize(context_text)
    four_grams = create_ngrams(context_tokens, 4)
    three_grams = create_ngrams(context_tokens, 3)
    two_grams = create_ngrams(context_tokens, 2)
    one_grams = create_ngrams(context_tokens, 1)
    four_gram_counts = Counter(four_grams)
    three_gram_counts = Counter(three_grams)
    two_grams_counts = Counter(two_grams)
    one_grams_counts = Counter(one_grams)
    prob_list = ["four_gram", "three_gram", "two_gram", "one_gram"]
    prob = [four_gram_counts, three_gram_counts, two_grams_counts, one_grams_counts]
    probs = kneser_ney_smoothing(four_gram_counts, three_gram_counts)
    # Bookkeeping: how often each source supplied the next token
    use_llm = 0
    use_llm_back_up = 0
    use_ngram = 0
    flag = False
    count = 0
    Token_index = 0
    colored_text = initial_context
    with torch.no_grad():
        for _ in range(max_length):
            # --- LLM proposal: top-p (nucleus) filtered next-token distribution ---
            outputs = model(input_ids=input_ids)
            next_token_logits = outputs.logits[:, -1, :]
            sorted_logits, sorted_indices = torch.sort(next_token_logits, descending=True)
            cumulative_probs = torch.cumsum(torch.softmax(sorted_logits, dim=-1), dim=-1)
            sorted_indices_to_remove = cumulative_probs > top_p
            sorted_indices_to_remove[..., 1:] = sorted_indices_to_remove[..., :-1].clone()
            sorted_indices_to_remove[..., 0] = 0
            indices_to_remove = sorted_indices[sorted_indices_to_remove]
            next_token_logits[:, indices_to_remove] = -float('Inf')
            probabilities = torch.softmax(next_token_logits, dim=-1)
            top_tokens = sorted_indices[0, :top_k]
            top_probs = probabilities[0, top_tokens]
            top_token_probs = [(tokenizer.decode([token.item()]), prob.item()) for token, prob in zip(top_tokens, top_probs)]
            df = pd.DataFrame(top_token_probs, columns=["Token", "Probability"])
            df.index = df.index + 1
            token_tables.append((f"{token_no}>> Next token options from LLM", df))
            # Cumulative probability mass of the LLM's top-k candidates
            cumulative_prob = cumulative_probs[0, top_k - 1].item()
            # Entropy of the top-k LLM candidates (epsilon guards log(0) after top-p filtering)
            entropy = -np.sum(np.array(df['Probability']) * np.log(np.array(df['Probability']) + 1e-12))
            input_text = tokenizer.decode(input_ids[0], skip_special_tokens=True)
            input_tokens = tokenizer.tokenize(input_text)
            use_llm += 1
            # --- N-gram proposal: back off from 4-grams to lower orders until a match is found ---
            __token_pob__ = {}
            num = 0
            while __token_pob__ == {} and num < 3:
                probs = kneser_ney_smoothing(prob[num], prob[num + 1])
                __inputs__ = tuple(input_tokens[-(3 - num):])
                __token_pob__ = probs.get(__inputs__, {})
                num += 1
            df = pd.DataFrame(list(__token_pob__.items()), columns=['Token', 'Probability'])
            df.index = df.index + 1
            token_tables.append((f"{token_no}>> Next token options from N_gram", df))
            token_no += 1
            # Use the n-gram model once the LLM has produced at least 4 tokens and its
            # top-k mass falls below the threshold, or whenever the n-gram candidates
            # carry more total mass than the LLM's top-k
            if (cumulative_prob < threshold and __token_pob__ != {} and flag and count >= 4) or np.sum(df['Probability']) > cumulative_prob:
                Token_index += 1
                next_token = max(__token_pob__, key=__token_pob__.get)
                if next_token == 'Ċ':  # GPT-2's newline token; prefer the runner-up if one exists
                    sorted_tokens = sorted(__token_pob__.items(), key=lambda x: x[1], reverse=True)
                    if len(sorted_tokens) > 1:
                        next_token = sorted_tokens[1][0]
                Tokens[Token_index] = [next_token, "ngram", __token_pob__[next_token]]
                color_code = "#78bfd3"  # Light blue for n-gram
                colored_text += f"<span style='color: {color_code}'>{tokenizer.convert_tokens_to_string([next_token])}</span>"
                input_tokens.append(next_token)
                generated_text = tokenizer.convert_tokens_to_string(input_tokens)
                initial_context = generated_text
                # input_ids = tokenizer.encode(generated_text, return_tensors="pt").to(device='mps')
                input_ids = tokenizer.encode(generated_text, return_tensors="pt").to(device='cpu')
                use_ngram += 1
            else:
                # Sample the next token from the LLM's filtered distribution
                Token_index += 1
                next_token = torch.multinomial(probabilities, num_samples=1)
                next_token_prob = probabilities[0, next_token].item()
                next_token_text = tokenizer.decode(next_token.item())
                Tokens[Token_index] = [next_token_text, "llm", next_token_prob]
                color_code = "#c99a6e"  # Tan for LLM
                colored_text += f"<span style='color: {color_code}'>{next_token_text}</span>"
                count += 1
                if count >= 4:
                    flag = True
                input_ids = torch.cat([input_ids, next_token], dim=-1)
                if next_token.item() == tokenizer.eos_token_id:
                    break
                generated_text = tokenizer.decode(input_ids[0], skip_special_tokens=True)
                initial_context = generated_text
                use_llm_back_up += 1
    generated_text = tokenizer.decode(input_ids[0], skip_special_tokens=True)
    # total = use_llm + use_llm_back_up + use_ngram
    # print(f"steps: {use_llm}, llm tokens: {use_llm_back_up}, ngram tokens: {use_ngram}")
    return generated_text, Tokens, token_tables, colored_text
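# Standalone usage sketch, bypassing Gradio (query and numbers are illustrative):
# context = get_web_content("Who won the 2023 Nobel Prize in Physics?", 2)[0]
# text, tokens, tables, html = generate_text_with_probs(
#     "The 2023 Nobel Prize in Physics was awarded to",
#     context, top_p=0.9, max_length=20, top_k=5, threshold=0.6)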
def combined_model_predictions(query, initial_context, top_p, max_length, top_k, threshold, docs):
    Question = [query]
    context_text = get_web_content(Question[0], docs)[0]
    print('Content Fetched')
    generated_text, tokens, token_tables, colored_html = generate_text_with_probs(initial_context, context_text, top_p, max_length, top_k, threshold)
    data_list = [(token_index, tupes[0], tupes[1], tupes[2]) for token_index, tupes in tokens.items()]
    df = pd.DataFrame(data_list, columns=['Token_pos', 'Token', 'Source Model', 'Probability'])
    return colored_html, df, token_tables
iface = gr.Interface(
    fn=combined_model_predictions,
    inputs=[
        gr.Textbox(lines=2, placeholder="Enter query here..."),
        gr.Textbox(lines=2, placeholder="Enter initial context here..."),
        gr.Slider(0, 1, step=0.01, value=0.9, label="Top-p (nucleus) sampling"),
        gr.Slider(1, 100, value=4, step=1, label="Max Length"),
        gr.Slider(1, 50, value=5, step=1, label="Top-k"),
        gr.Slider(0, 1, step=0.01, value=0.9, label="LLM cumulative threshold"),
        gr.Slider(1, 50, step=1, value=10, label="Web documents to fetch")
    ],
    outputs=[
        gr.HTML(label="Generated Text"),
        gr.Dataframe(label="Tokens"),
        gr.Dataframe(label="Token tables"),
    ],
    title="Next Token Visualizer (GPT-2-large, 774M params)"
)
iface.launch()