# -*- coding: utf-8 -*- """ Automatically generated by Colab. Original file is located at https://colab.research.google.com/drive/1p8LZ5eICRuSfjSRLGIDv4TDW32GSm4Wf """ #!pip install torch gradio transformers pandas langchain-fireworks fireworks stanza sentence_transformers anytree import torch from transformers import GPT2LMHeadModel, GPT2Tokenizer import gradio as gr import pandas as pd from collections import Counter, defaultdict import os from huggingface_hub import login import requests from bs4 import BeautifulSoup import numpy as np import re from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.metrics.pairwise import cosine_similarity from googlesearch import search import time import random from lxml import html import nltk nltk.download('punkt') from sentence_transformers import SentenceTransformer, util model_ranker = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2') Question = [ "RG Kar recent rape and murder case" # "Who won the physics nobel prize in 2023?", # "Who has been awarded the Nobel Prize in Physics in 2023 ] headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", } exclude=["Thank you for your patience","Subscribe","subscribe","trouble retrieving the article content","browser settings", "Thank you for your patience while we verify access. If you are in Reader mode please exit and log into your Times account, or subscribe for all of The Times.", "Thank you for your patience while we verify access.", "Already a subscriber? Log in.", "Want all of The Times? Subscribe.", "Advertisement", "Site Index", "Thank you for your patience while we verify access. If you are in Reader mode please exit andlog intoyour Times account, orsubscribefor all of The Times.", "Already a subscriber?Log in.", "Want all of The Times?Subscribe.", "Site Information Navigation", "Please enable JS and disable any ad blocker" ] def fetch_article_text_sequential(url): headers = { "Content-Type": "application/json", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", } exclude=[ "Thank you for your patience","Subscribe","subscribe","trouble retrieving the article content","browser settings", "Thank you for your patience while we verify access. If you are in Reader mode please exit and log into your Times account, or subscribe for all of The Times.", "Thank you for your patience while we verify access.", "Already a subscriber? Log in.", "Want all of The Times? Subscribe.", "Advertisement", "Site Index", "Thank you for your patience while we verify access. If you are in Reader mode please exit andlog intoyour Times account, orsubscribefor all of The Times.", "Already a subscriber?Log in.", "Want all of The Times?Subscribe.", "Site Information Navigation" ] try: # Send a request to the webpage with the specified headers response = requests.get(url, headers=headers,timeout=5) response.raise_for_status() # Check that the request was successful # Parse the webpage content soup = BeautifulSoup(response.text, 'html.parser') # Initialize an empty list to store the text sequentially article_content = [] # Define the tags we are interested in (headlines and paragraphs) tags_of_interest = ['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p'] # Find all tags of interest in the order they appear in the document for tag in soup.find_all(tags_of_interest): if not any(excluded_phrase in tag.get_text() for excluded_phrase in exclude): text = tag.get_text(strip=True) article_content.append(text) return '\n'.join(article_content) except: return None def fetch_article_text_sequential_new(url): user_agents = [ 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0.3 Safari/605.1.15', # Add more User-Agents here ] headers = { 'User-Agent': random.choice(user_agents) } try: response =requests.get(url,timeout=5,verify=False,headers=headers) response.raise_for_status() # Check for HTTP errors response.encoding = 'utf-8' content = response.text if not content.strip(): return "" try: tree = html.fromstring(content) except: return "" # Extract all paragraph scraped_data = [] tags = ['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p'] for tag in tags: for element in tree.xpath(f'//{tag}'): scraped_data.append(element.text_content()) return '\n'.join(scraped_data) except: return "" def get_google_search_results(query, start=0): search_url = "https://www.google.com/search" params = {"q": query, "start": start} response = requests.get(search_url,timeout=5,verify=False, params=params, headers=headers) soup = BeautifulSoup(response.text, "html.parser") search_results = [] for g in soup.find_all(class_="g"): title = g.find("h3").text if g.find("h3") else "No title" link = g.find("a")["href"] if g.find("a") else "No link" if not link.lower().endswith(('.pdf', '.PDF')): search_results.append({"title": title, "link": link}) return search_results def fetch_sentences_from_html(html): try: # Parse the string with BeautifulSoup if html == None: return [] soup = BeautifulSoup(html, 'html.parser') paragraphs = soup.find_all("p") text = " ".join(p.get_text() for p in paragraphs) sentences = re.split(r'(?8: web_context.append(line) top_sentences = rank_sentences(web_context) t2=time.time() minutes, seconds = divmod(t2-t1, 60) print(f"{minutes} minutes and {seconds} seconds") ans = "\n".join(sentence.strip() for sentence in top_sentences if sentence.strip()) return ans # Get the token from the environment variable api_token = os.getenv('HF_TOKEN') # Load pre-trained model and tokenizer model_name = "gpt2-large" model = GPT2LMHeadModel.from_pretrained(model_name) tokenizer = GPT2Tokenizer.from_pretrained(model_name) #device = torch.device("mps") #model.to(device) model.eval() def create_ngrams(tokens, n): return [tuple(tokens[i:i+n]) for i in range(len(tokens)-n+1)] ###Smoothing___ def kneser_ney_smoothing(ngram_counts, lower_order_counts, discount=0.75): """ Apply Kneser-Ney smoothing to n-gram counts. Args: ngram_counts (Counter): Counts of n-grams (e.g., 4-grams or 3-grams). lower_order_counts (Counter): Counts of (n-1)-grams (e.g., 3-grams or 2-grams). discount (float): Discounting parameter. Returns: defaultdict: Smoothed probabilities. """ continuation_counts = Counter() lower_counts = Counter() for ngram in ngram_counts: lower_ngram = ngram[1:] continuation_counts[lower_ngram] += 1 lower_counts[lower_ngram] += 1 def continuation_probability(word): return continuation_counts[word] / sum(continuation_counts.values()) probabilities = defaultdict(lambda: defaultdict(float)) for ngram, count in ngram_counts.items(): lower_ngram = ngram[:-1] lower_count = lower_order_counts[lower_ngram] discounted_count = max(count - discount, 0) lambda_factor = (discount / lower_count) * len(continuation_counts) probabilities[lower_ngram][ngram[-1]] = (discounted_count / lower_count) + lambda_factor * continuation_probability(ngram[-1]) return probabilities def get_probability_from_context(Context): context_tokens = tokenizer.tokenize(Context) four_grams = create_ngrams(context_tokens, 4) three_grams = create_ngrams(context_tokens, 3) four_gram_counts = Counter(four_grams) three_gram_counts = Counter(three_grams) probabilities = kneser_ney_smoothing(four_gram_counts, three_gram_counts) return probabilities, four_gram_counts, three_gram_counts def predict_next_token(probabilities, three_gram): return probabilities.get(three_gram, {}) def generate_text_with_probs(initial_context, context_text , top_p, max_length, top_k, threshold=0.6): Tokens = {} #input_ids = tokenizer.encode(initial_context, return_tensors="pt").to(device='mps') input_ids = tokenizer.encode(initial_context, return_tensors="pt").to(device='cpu') generated_text = initial_context token_tables = [] token_no = 1 context_tokens = tokenizer.tokenize(context_text) four_grams = create_ngrams(context_tokens, 4) three_grams = create_ngrams(context_tokens, 3) two_grams = create_ngrams(context_tokens, 2) one_grams = create_ngrams(context_tokens, 1) four_gram_counts = Counter(four_grams) three_gram_counts = Counter(three_grams) two_grams_counts = Counter(two_grams) one_grams_counts = Counter(one_grams) prob_list = ["four_gram", "three_gram", "two_gram", "one_gram"] # Define prob_list here prob = [four_gram_counts ,three_gram_counts ,two_grams_counts ,one_grams_counts] probs = kneser_ney_smoothing(four_gram_counts, three_gram_counts) use_llm = 0 use_llm_back_up = 0 use_ngram = 0 flag = False count = 0 Token_index = 0 colored_text = initial_context with torch.no_grad(): #while len(generated_text.split()) < max_length: for _ in range(max_length): outputs = model(input_ids=input_ids) next_token_logits = outputs.logits[:, -1, :] sorted_logits, sorted_indices = torch.sort(next_token_logits, descending=True) cumulative_probs = torch.cumsum(torch.softmax(sorted_logits, dim=-1), dim=-1) sorted_indices_to_remove = cumulative_probs > top_p sorted_indices_to_remove[..., 1:] = sorted_indices_to_remove[..., :-1].clone() sorted_indices_to_remove[..., 0] = 0 indices_to_remove = sorted_indices[sorted_indices_to_remove] next_token_logits[:, indices_to_remove] = -float('Inf') probabilities = torch.softmax(next_token_logits, dim=-1) top_tokens = sorted_indices[0, :top_k] top_probs = probabilities[0, top_tokens] top_token_probs = [(tokenizer.decode([token.item()]), prob.item()) for token, prob in zip(top_tokens, top_probs)] df = pd.DataFrame(top_token_probs, columns=["Token", "Probability"]) df.index = df.index + 1 token_tables.append((f"{token_no}>> Next token options from LLM", df)) ##print("Next token options from LLM") ##print(df) cumulative_prob = cumulative_probs[0, top_k - 1].item() ##print(f"cumulative_prob from LLM: {cumulative_prob}") entropy = (-1)*np.sum(np.array(df['Probability'])*np.log(df['Probability'])) ##print("LLM Entropy:",(-1)*np.sum(np.array(df['Probability'])*np.log(df['Probability']))) ##print("\n") input_text = tokenizer.decode(input_ids[0], skip_special_tokens=True) input_tokens = tokenizer.tokenize(input_text) use_llm += 1 __token_pob__ = {} num = 0 num_ = 4 while __token_pob__ == {} and num < 3: probs = kneser_ney_smoothing(prob[num],prob[num+1]) __inputs__ = tuple(input_tokens[-(3-num):]) __token_pob__ = probs.get(__inputs__, {}) ##print(num,"\n",num_) num += 1 num_ -= 1 ##print(f"Next word probs N_GRAM:{__token_pob__},\n input_{num_}_gram: {__inputs__},\n using {prob_list[num]}_counter and {prob_list[num-1]}_counter; probability exist: {__token_pob__ != {}}") df = pd.DataFrame(list(__token_pob__.items()), columns=['Token', 'Probability']) df.index = df.index + 1 token_tables.append((f"{token_no}>> Next token options from N_gram", df)) token_no +=1 ##print(f"Next token options from N_GRAM:") ##print(df) ##print("Cumulative Probability of N_gram:",np.sum(df['Probability'])) #print("\n") if cumulative_prob < threshold and __token_pob__ != {} and flag == True and count >= 4 or np.sum(df['Probability']) > cumulative_prob: Token_index+=1 #if cumulative_prob < threshold and __token_pob__ != {} and flag == True and count >= 4 or entropy >= 0.6: ##print("Using n-gram model") next_token = max(__token_pob__, key=__token_pob__.get) if next_token == 'Ċ': sorted_tokens = sorted(__token_pob__.items(), key=lambda x: x[1], reverse=True) if len(sorted_tokens) > 1: next_token = sorted_tokens[1][0] ##print("Second max token : ", next_token) Tokens[Token_index] = [next_token,"ngram",__token_pob__[next_token]] ####### color_code = "#78bfd3" # Light blue for n-gram colored_text += f"{tokenizer.convert_tokens_to_string(next_token)}" else: Tokens[Token_index] = [next_token,"ngram",__token_pob__[next_token]] ###### color_code = "#78bfd3" # Light blue for n-gram colored_text += f"{tokenizer.convert_tokens_to_string(next_token)}" ##print("n-gram token : ",next_token) input_tokens.append(next_token) generated_text = tokenizer.convert_tokens_to_string(input_tokens) ##print(generated_text) initial_context = generated_text #input_ids = tokenizer.encode(generated_text, return_tensors="pt").to(device='mps') input_ids = tokenizer.encode(generated_text, return_tensors="pt").to(device='cpu') use_ngram += 1 else: ##print("Using LLM") Token_index+=1 next_token = torch.multinomial(probabilities, num_samples=1) next_token_prob = probabilities[0, next_token].item() next_token_text = tokenizer.decode(next_token.item()) ##print("LLM token : ",next_token_text) Tokens[Token_index] = [next_token_text,"llm",next_token_prob] color_code = "#c99a6e" colored_text += f"{next_token_text}" count += 1 if count >= 4: flag = True #token_no += 1 input_ids = torch.cat([input_ids, next_token], dim=-1) if next_token.item() == tokenizer.eos_token_id: break generated_text = tokenizer.decode(input_ids[0], skip_special_tokens=True) ##print(generated_text) initial_context = generated_text use_llm_back_up += 1 ##print(initial_context) ##print('-------------------------------------------------------------------------------------------------------------------------------------------------------------\n\n') ##print("\n\n") generated_text = tokenizer.decode(input_ids[0], skip_special_tokens=True) #total = use_llm + use_llm_back_up + use_ngram ##print(f"total: {use_llm} ({(use_llm / total) * 100:.2f}%)") ##print(f"use_llms: {use_llm_back_up} ({(use_llm_back_up / total) * 100:.2f}%)") ##print(f"use_ngram: {use_ngram} ({(use_ngram / total) * 100:.2f}%)") ##print('-------------------------------------------------------------------------------------------------------------------------------------------------------------\n\n') return generated_text, Tokens, token_tables,colored_text def save_content_as_file(question, docs): # Fetch the web content based on the question content = get_web_content(question, docs) # Define file path to save the content file_path = "fetched_content.txt" # Write the content to a text file with open(file_path, "w") as f: f.write(content) # Return the file path to download return file_path '''def combined_model_predictions(query, initial_context, top_p, max_length, top_k, threshold, docs): Question = [query] context_text = get_web_content(Question[0], docs) print('Content Fetched') generated_text, tokens, token_tables, colored_html = generate_text_with_probs(initial_context, context_text, top_p, max_length, top_k, threshold) data_list = [(token_index, tupes[0], tupes[1], tupes[2]) for token_index, tupes in tokens.items()] df = pd.DataFrame(data_list, columns=['Token_pos', 'Token', 'Source Model', "Probability"]) return colored_html, df, token_tables iface = gr.Interface( fn=combined_model_predictions, inputs=[ gr.Textbox(lines=2,placeholder="Enter query here..."), gr.Textbox(lines=2,placeholder="Enter initial context here..."), gr.Slider(0, 1, step=0.01, value=0.9, label="Top-p (nucleus) sampling"), gr.Slider(1, 100, value= 4, step=1, label="Max Length"), gr.Slider(1, 50, value= 5, step=1, label="Top-k"), gr.Slider(0, 1, step=0.01, value=0.9, label="LLM cumulative Threshold"), gr.Slider(1, 50, step=1, value=10, label="Web_retrieved Docs to fetch") ], outputs=[ gr.HTML(label="Generated Text"), gr.Dataframe(label="Tokens"), gr.Dataframe(label="Token tables"), ], title="Next Token Visualizer (GPT-2-large - 812M param.)" ) iface.launch()''' import pandas as pd import gradio as gr def combined_model_predictions(query, initial_context, top_p, max_length, top_k, threshold, docs): Question = [query] context_text = get_web_content(Question[0], docs) print('Content Fetched') # Write context_text to a .txt file file_name = "context_corpora.txt" with open(file_name, "w") as file: file.write(context_text) # Generate the text using the model generated_text, tokens, token_tables, colored_html = generate_text_with_probs(initial_context, context_text, top_p, max_length, top_k, threshold) # Create a DataFrame for tokens data_list = [(token_index, tupes[0], tupes[1], tupes[2]) for token_index, tupes in tokens.items()] df = pd.DataFrame(data_list, columns=['Token_pos', 'Token', 'Source Model', "Probability"]) # Return the file path for download, colored HTML, and DataFrames return file_name, colored_html, df, token_tables # Gradio interface iface = gr.Interface( fn=combined_model_predictions, inputs=[ gr.Textbox(lines=2, placeholder="Enter query here..."), gr.Textbox(lines=2, placeholder="Enter initial context here..."), gr.Slider(0, 1, step=0.01, value=0.9, label="Top-p (nucleus) sampling"), gr.Slider(1, 100, value=4, step=1, label="Max Length"), gr.Slider(1, 50, value=5, step=1, label="Top-k"), gr.Slider(0, 1, step=0.01, value=0.9, label="LLM cumulative Threshold"), gr.Slider(1, 50, step=1, value=10, label="Web_retrieved Docs to fetch") ], outputs=[ gr.File(label="Download Context Corpora"), gr.HTML(label="Generated Text"), gr.Dataframe(label="Tokens"), gr.Dataframe(label="Token tables"), ], title="Next Token Visualizer (GPT-2-large - 812M param.)" ) iface.launch()