# -*- coding: utf-8 -*-
"""
Automatically generated by Colab.
Original file is located at
https://colab.research.google.com/drive/1p8LZ5eICRuSfjSRLGIDv4TDW32GSm4Wf
"""
#!pip install torch gradio transformers pandas langchain-fireworks fireworks stanza sentence_transformers anytree
import torch
from transformers import GPT2LMHeadModel, GPT2Tokenizer
import gradio as gr
import pandas as pd
from collections import Counter, defaultdict
import os
from huggingface_hub import login
import requests
from bs4 import BeautifulSoup
import numpy as np
import re
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from googlesearch import search
import time
import random
from lxml import html
import nltk
nltk.download('punkt')
from sentence_transformers import SentenceTransformer, util
model_ranker = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
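# Quick sanity check of the ranker (commented out; the sentences are made-up
# examples): semantically related sentences should get a higher cosine score.
# emb = model_ranker.encode(["The court announced a verdict.",
#                            "A judge delivered the ruling."], convert_to_tensor=True)
# print(util.cos_sim(emb[0], emb[1]))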
Question = [
    "RG Kar recent rape and murder case"
    # "Who won the physics nobel prize in 2023?",
    # "Who has been awarded the Nobel Prize in Physics in 2023?",
]
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
}
exclude=["Thank you for your patience","Subscribe","subscribe","trouble retrieving the article content","browser settings",
"Thank you for your patience while we verify access. If you are in Reader mode please exit and log into your Times account, or subscribe for all of The Times.",
"Thank you for your patience while we verify access.",
"Already a subscriber? Log in.",
"Want all of The Times? Subscribe.",
"Advertisement",
"Site Index",
"Thank you for your patience while we verify access. If you are in Reader mode please exit andlog intoyour Times account, orsubscribefor all of The Times.",
"Already a subscriber?Log in.",
"Want all of The Times?Subscribe.",
"Site Information Navigation",
"Please enable JS and disable any ad blocker"
]
def fetch_article_text_sequential(url):
headers = {
"Content-Type": "application/json",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
}
exclude=[
"Thank you for your patience","Subscribe","subscribe","trouble retrieving the article content","browser settings",
"Thank you for your patience while we verify access. If you are in Reader mode please exit and log into your Times account, or subscribe for all of The Times.",
"Thank you for your patience while we verify access.",
"Already a subscriber? Log in.",
"Want all of The Times? Subscribe.",
"Advertisement",
"Site Index",
"Thank you for your patience while we verify access. If you are in Reader mode please exit andlog intoyour Times account, orsubscribefor all of The Times.",
"Already a subscriber?Log in.",
"Want all of The Times?Subscribe.",
"Site Information Navigation"
]
try:
# Send a request to the webpage with the specified headers
        response = requests.get(url, headers=headers, timeout=5)
        response.raise_for_status()  # Check that the request was successful
# Parse the webpage content
soup = BeautifulSoup(response.text, 'html.parser')
# Initialize an empty list to store the text sequentially
article_content = []
# Define the tags we are interested in (headlines and paragraphs)
tags_of_interest = ['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p']
# Find all tags of interest in the order they appear in the document
for tag in soup.find_all(tags_of_interest):
if not any(excluded_phrase in tag.get_text() for excluded_phrase in exclude):
text = tag.get_text(strip=True)
article_content.append(text)
return '\n'.join(article_content)
    except Exception:
        return None
def fetch_article_text_sequential_new(url):
user_agents = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0.3 Safari/605.1.15',
# Add more User-Agents here
]
headers = {
'User-Agent': random.choice(user_agents)
}
try:
        response = requests.get(url, timeout=5, verify=False, headers=headers)
        response.raise_for_status()  # Check for HTTP errors
response.encoding = 'utf-8'
content = response.text
if not content.strip():
return ""
try:
tree = html.fromstring(content)
        except Exception:
            return ""
        # Extract headlines and paragraphs in document order (a single XPath
        # union keeps the tags in the sequence they appear on the page)
        scraped_data = []
        for element in tree.xpath('//h1 | //h2 | //h3 | //h4 | //h5 | //h6 | //p'):
            scraped_data.append(element.text_content())
        return '\n'.join(scraped_data)
    except Exception:
        return ""
def get_google_search_results(query, start=0):
search_url = "https://www.google.com/search"
params = {"q": query, "start": start}
    response = requests.get(search_url, timeout=5, verify=False, params=params, headers=headers)
soup = BeautifulSoup(response.text, "html.parser")
search_results = []
for g in soup.find_all(class_="g"):
title = g.find("h3").text if g.find("h3") else "No title"
link = g.find("a")["href"] if g.find("a") else "No link"
        if not link.lower().endswith('.pdf'):
search_results.append({"title": title, "link": link})
return search_results
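# Example (commented out; live Google queries may be rate-limited or the
# result markup may change):
# for hit in get_google_search_results("Nobel Prize in Physics 2023")[:3]:
#     print(hit["title"], "->", hit["link"])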
def fetch_sentences_from_html(html):
    try:
        # Parse the string with BeautifulSoup
        if html is None:
            return []
        soup = BeautifulSoup(html, 'html.parser')
        paragraphs = soup.find_all("p")
        text = " ".join(p.get_text() for p in paragraphs)
        # Split on sentence-ending punctuation followed by whitespace
        sentences = re.split(r'(?<=[.!?])\s+', text)
        return [s.strip() for s in sentences if s.strip()]
    except Exception:
        return []
def rank_sentences(sentences, query, top_n=50):
    # Rank candidate sentences by semantic similarity to the query using the
    # SentenceTransformer loaded above (assumed criterion; top_n is an assumed cutoff)
    if not sentences:
        return []
    query_emb = model_ranker.encode(query, convert_to_tensor=True)
    sent_embs = model_ranker.encode(sentences, convert_to_tensor=True)
    scores = util.cos_sim(query_emb, sent_embs)[0]
    ranked = sorted(zip(scores.tolist(), sentences), key=lambda p: p[0], reverse=True)
    return [s for _, s in ranked[:top_n]]
def get_web_content(question, docs):
    t1 = time.time()
    web_context = []
    for result in get_google_search_results(question)[:docs]:  # assumed: first results page only
        article = fetch_article_text_sequential(result["link"]) or fetch_article_text_sequential_new(result["link"])
        for line in (article or "").split('\n'):
            if len(line.split()) > 8:  # keep only reasonably long lines
                web_context.append(line)
    top_sentences = rank_sentences(web_context, question)
    t2 = time.time()
    minutes, seconds = divmod(t2 - t1, 60)
    print(f"{int(minutes)} minutes and {seconds:.1f} seconds")
    ans = "\n".join(sentence.strip() for sentence in top_sentences if sentence.strip())
    return ans
# Get the token from the environment variable
api_token = os.getenv('HF_TOKEN')
if api_token:
    login(api_token)  # authenticate to the Hugging Face Hub when a token is provided
# Load pre-trained model and tokenizer
model_name = "gpt2-large"
model = GPT2LMHeadModel.from_pretrained(model_name)
tokenizer = GPT2Tokenizer.from_pretrained(model_name)
#device = torch.device("mps")
#model.to(device)
model.eval()
def create_ngrams(tokens, n):
    return [tuple(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]
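# Sanity check: a 2-gram pass over four tokens yields three overlapping pairs.
assert create_ngrams(["a", "b", "c", "d"], 2) == [("a", "b"), ("b", "c"), ("c", "d")]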
### Kneser-Ney smoothing ###
def kneser_ney_smoothing(ngram_counts, lower_order_counts, discount=0.75):
    """
    Apply Kneser-Ney smoothing to n-gram counts.
    Args:
        ngram_counts (Counter): Counts of n-grams (e.g., 4-grams or 3-grams).
        lower_order_counts (Counter): Counts of (n-1)-grams (e.g., 3-grams or 2-grams).
        discount (float): Discounting parameter.
    Returns:
        defaultdict: Smoothed probabilities.
    """
    # Continuation counts are keyed by the continuing word itself, so the
    # lookup in continuation_probability below actually finds them.
    continuation_counts = Counter()
    for ngram in ngram_counts:
        continuation_counts[ngram[-1]] += 1
    total_continuations = sum(continuation_counts.values()) or 1
    def continuation_probability(word):
        return continuation_counts[word] / total_continuations
    probabilities = defaultdict(lambda: defaultdict(float))
    for ngram, count in ngram_counts.items():
        lower_ngram = ngram[:-1]
        lower_count = lower_order_counts[lower_ngram]
        discounted_count = max(count - discount, 0)
        # Simplified interpolation weight, spread over the continuation vocabulary
        lambda_factor = (discount / lower_count) * len(continuation_counts)
        probabilities[lower_ngram][ngram[-1]] = (discounted_count / lower_count) + lambda_factor * continuation_probability(ngram[-1])
    return probabilities
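# Toy illustration on a six-token stream: smooth the 3-gram counts against
# their 2-gram prefixes, then inspect what can follow the context ("a", "b").
_toy = ["a", "b", "c", "a", "b", "d"]
_toy_probs = kneser_ney_smoothing(Counter(create_ngrams(_toy, 3)), Counter(create_ngrams(_toy, 2)))
# _toy_probs[("a", "b")] -> {"c": 0.5, "d": 0.5} with the default discount of 0.75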
def get_probability_from_context(Context):
context_tokens = tokenizer.tokenize(Context)
four_grams = create_ngrams(context_tokens, 4)
three_grams = create_ngrams(context_tokens, 3)
four_gram_counts = Counter(four_grams)
three_gram_counts = Counter(three_grams)
probabilities = kneser_ney_smoothing(four_gram_counts, three_gram_counts)
return probabilities, four_gram_counts, three_gram_counts
def predict_next_token(probabilities, three_gram):
return probabilities.get(three_gram, {})
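# Example (commented out; uses the GPT-2 tokenizer loaded above): build smoothed
# 4-gram probabilities from a context string and query one observed 3-gram prefix.
# probs, _, _ = get_probability_from_context("the cat sat on the mat and the cat slept")
# prefix = next(iter(probs))                # one 3-gram context seen in the text
# print(predict_next_token(probs, prefix))  # its smoothed next-token scores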
def generate_text_with_probs(initial_context, context_text, top_p, max_length, top_k, threshold=0.6):
Tokens = {}
#input_ids = tokenizer.encode(initial_context, return_tensors="pt").to(device='mps')
input_ids = tokenizer.encode(initial_context, return_tensors="pt").to(device='cpu')
generated_text = initial_context
token_tables = []
token_no = 1
context_tokens = tokenizer.tokenize(context_text)
four_grams = create_ngrams(context_tokens, 4)
three_grams = create_ngrams(context_tokens, 3)
two_grams = create_ngrams(context_tokens, 2)
one_grams = create_ngrams(context_tokens, 1)
four_gram_counts = Counter(four_grams)
three_gram_counts = Counter(three_grams)
two_grams_counts = Counter(two_grams)
one_grams_counts = Counter(one_grams)
prob_list = ["four_gram", "three_gram", "two_gram", "one_gram"] # Define prob_list here
prob = [four_gram_counts ,three_gram_counts ,two_grams_counts ,one_grams_counts]
probs = kneser_ney_smoothing(four_gram_counts, three_gram_counts)
use_llm = 0
use_llm_back_up = 0
use_ngram = 0
flag = False
count = 0
Token_index = 0
colored_text = initial_context
with torch.no_grad():
#while len(generated_text.split()) < max_length:
for _ in range(max_length):
outputs = model(input_ids=input_ids)
next_token_logits = outputs.logits[:, -1, :]
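            # Nucleus (top-p) filtering: sort the logits, keep the smallest set of
            # tokens whose cumulative probability exceeds top_p, and mask the rest
            # to -inf so sampling stays inside that nucleus.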
sorted_logits, sorted_indices = torch.sort(next_token_logits, descending=True)
cumulative_probs = torch.cumsum(torch.softmax(sorted_logits, dim=-1), dim=-1)
sorted_indices_to_remove = cumulative_probs > top_p
sorted_indices_to_remove[..., 1:] = sorted_indices_to_remove[..., :-1].clone()
sorted_indices_to_remove[..., 0] = 0
indices_to_remove = sorted_indices[sorted_indices_to_remove]
next_token_logits[:, indices_to_remove] = -float('Inf')
probabilities = torch.softmax(next_token_logits, dim=-1)
top_tokens = sorted_indices[0, :top_k]
top_probs = probabilities[0, top_tokens]
top_token_probs = [(tokenizer.decode([token.item()]), prob.item()) for token, prob in zip(top_tokens, top_probs)]
df = pd.DataFrame(top_token_probs, columns=["Token", "Probability"])
df.index = df.index + 1
token_tables.append((f"{token_no}>> Next token options from LLM", df))
##print("Next token options from LLM")
##print(df)
cumulative_prob = cumulative_probs[0, top_k - 1].item()
##print(f"cumulative_prob from LLM: {cumulative_prob}")
            entropy = -np.sum(np.array(df['Probability']) * np.log(df['Probability']))
##print("LLM Entropy:",(-1)*np.sum(np.array(df['Probability'])*np.log(df['Probability'])))
##print("\n")
input_text = tokenizer.decode(input_ids[0], skip_special_tokens=True)
input_tokens = tokenizer.tokenize(input_text)
use_llm += 1
__token_pob__ = {}
num = 0
num_ = 4
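            # Back off through lower-order n-gram models until the current
            # context suffix has an entry in the smoothed table.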
while __token_pob__ == {} and num < 3:
probs = kneser_ney_smoothing(prob[num],prob[num+1])
__inputs__ = tuple(input_tokens[-(3-num):])
__token_pob__ = probs.get(__inputs__, {})
##print(num,"\n",num_)
num += 1
num_ -= 1
##print(f"Next word probs N_GRAM:{__token_pob__},\n input_{num_}_gram: {__inputs__},\n using {prob_list[num]}_counter and {prob_list[num-1]}_counter; probability exist: {__token_pob__ != {}}")
df = pd.DataFrame(list(__token_pob__.items()), columns=['Token', 'Probability'])
df.index = df.index + 1
token_tables.append((f"{token_no}>> Next token options from N_gram", df))
token_no +=1
##print(f"Next token options from N_GRAM:")
##print(df)
##print("Cumulative Probability of N_gram:",np.sum(df['Probability']))
#print("\n")
            if (cumulative_prob < threshold and __token_pob__ != {} and flag and count >= 4) or np.sum(df['Probability']) > cumulative_prob:
Token_index+=1
#if cumulative_prob < threshold and __token_pob__ != {} and flag == True and count >= 4 or entropy >= 0.6:
##print("Using n-gram model")
                next_token = max(__token_pob__, key=__token_pob__.get)
                if next_token == 'Ċ':
                    # 'Ċ' is GPT-2's newline marker; fall back to the second-best option
                    sorted_tokens = sorted(__token_pob__.items(), key=lambda x: x[1], reverse=True)
                    if len(sorted_tokens) > 1:
                        next_token = sorted_tokens[1][0]
                Tokens[Token_index] = [next_token, "ngram", __token_pob__[next_token]]
                color_code = "#78bfd3"  # Light blue for n-gram tokens
                colored_text += f"<span style='color: {color_code}'>{tokenizer.convert_tokens_to_string([next_token])}</span>"
##print("n-gram token : ",next_token)
input_tokens.append(next_token)
generated_text = tokenizer.convert_tokens_to_string(input_tokens)
##print(generated_text)
initial_context = generated_text
#input_ids = tokenizer.encode(generated_text, return_tensors="pt").to(device='mps')
input_ids = tokenizer.encode(generated_text, return_tensors="pt").to(device='cpu')
use_ngram += 1
else:
##print("Using LLM")
Token_index+=1
next_token = torch.multinomial(probabilities, num_samples=1)
next_token_prob = probabilities[0, next_token].item()
next_token_text = tokenizer.decode(next_token.item())
##print("LLM token : ",next_token_text)
Tokens[Token_index] = [next_token_text,"llm",next_token_prob]
color_code = "#c99a6e"
colored_text += f"{next_token_text}"
count += 1
if count >= 4:
flag = True
#token_no += 1
input_ids = torch.cat([input_ids, next_token], dim=-1)
if next_token.item() == tokenizer.eos_token_id:
break
generated_text = tokenizer.decode(input_ids[0], skip_special_tokens=True)
##print(generated_text)
initial_context = generated_text
use_llm_back_up += 1
##print(initial_context)
##print('-------------------------------------------------------------------------------------------------------------------------------------------------------------\n\n')
##print("\n\n")
generated_text = tokenizer.decode(input_ids[0], skip_special_tokens=True)
#total = use_llm + use_llm_back_up + use_ngram
##print(f"total: {use_llm} ({(use_llm / total) * 100:.2f}%)")
##print(f"use_llms: {use_llm_back_up} ({(use_llm_back_up / total) * 100:.2f}%)")
##print(f"use_ngram: {use_ngram} ({(use_ngram / total) * 100:.2f}%)")
##print('-------------------------------------------------------------------------------------------------------------------------------------------------------------\n\n')
    return generated_text, Tokens, token_tables, colored_text
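# Example invocation (commented out; it runs a full generation pass and assumes
# a previously saved context corpus on disk):
# text, tokens, tables, html_out = generate_text_with_probs(
#     "The verdict in the case was", open("context_corpora.txt").read(),
#     top_p=0.9, max_length=20, top_k=5, threshold=0.6)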
def save_content_as_file(question, docs):
# Fetch the web content based on the question
content = get_web_content(question, docs)
# Define file path to save the content
file_path = "fetched_content.txt"
# Write the content to a text file
with open(file_path, "w") as f:
f.write(content)
# Return the file path to download
return file_path
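# Example (commented out; performs live web requests):
# path = save_content_as_file("Nobel Prize in Physics 2023", docs=5)
# print("context saved to", path)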
def combined_model_predictions(query, initial_context, top_p, max_length, top_k, threshold, docs):
Question = [query]
context_text = get_web_content(Question[0], docs)
print('Content Fetched')
# Write context_text to a .txt file
file_name = "context_corpora.txt"
with open(file_name, "w") as file:
file.write(context_text)
# Generate the text using the model
generated_text, tokens, token_tables, colored_html = generate_text_with_probs(initial_context, context_text, top_p, max_length, top_k, threshold)
# Create a DataFrame for tokens
data_list = [(token_index, tupes[0], tupes[1], tupes[2]) for token_index, tupes in tokens.items()]
df = pd.DataFrame(data_list, columns=['Token_pos', 'Token', 'Source Model', "Probability"])
# Return the file path for download, colored HTML, and DataFrames
return file_name, colored_html, df, token_tables
# Gradio interface
iface = gr.Interface(
fn=combined_model_predictions,
inputs=[
gr.Textbox(lines=2, placeholder="Enter query here..."),
gr.Textbox(lines=2, placeholder="Enter initial context here..."),
gr.Slider(0, 1, step=0.01, value=0.9, label="Top-p (nucleus) sampling"),
gr.Slider(1, 100, value=4, step=1, label="Max Length"),
gr.Slider(1, 50, value=5, step=1, label="Top-k"),
gr.Slider(0, 1, step=0.01, value=0.9, label="LLM cumulative Threshold"),
gr.Slider(1, 50, step=1, value=10, label="Web_retrieved Docs to fetch")
],
outputs=[
gr.File(label="Download Context Corpora"),
gr.HTML(label="Generated Text"),
gr.Dataframe(label="Tokens"),
gr.Dataframe(label="Token tables"),
],
title="Next Token Visualizer (GPT-2-large - 812M param.)"
)
iface.launch()