import os
from collections import Counter, defaultdict

import torch
import gradio as gr
import pandas as pd
from transformers import GPT2LMHeadModel, GPT2Tokenizer
from huggingface_hub import login

# Authenticate with the Hugging Face Hub if a token is provided via the environment.
api_token = os.getenv("HF_TOKEN")
if api_token:
    login(token=api_token)

# Load pre-trained model and tokenizer.
model_name = "gpt2-large"
model = GPT2LMHeadModel.from_pretrained(model_name)
tokenizer = GPT2Tokenizer.from_pretrained(model_name)

# Use Apple's MPS backend when available, otherwise fall back to CPU.
device = torch.device("mps") if torch.backends.mps.is_available() else torch.device("cpu")
model.to(device)
model.eval()


def create_ngrams(tokens, n):
    """Return all overlapping n-grams (as tuples) from a token list."""
    return [tuple(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]


def calculate_probabilities(four_gram_counts, three_gram_counts):
    """Maximum-likelihood P(next token | 3-gram context) estimated from 4-gram counts."""
    probabilities = defaultdict(lambda: defaultdict(float))
    for four_gram, count in four_gram_counts.items():
        three_gram = four_gram[:-1]
        probabilities[three_gram][four_gram[-1]] = count / three_gram_counts[three_gram]
    return probabilities


def kneser_ney_smoothing(ngram_counts, lower_order_counts, discount=0.75):
    """Interpolated Kneser-Ney smoothing (absolute discounting) over the highest-order n-grams."""
    # Continuation counts: in how many distinct contexts each final token appears.
    continuation_counts = Counter()
    # Number of distinct continuations observed after each lower-order context.
    context_continuations = Counter()
    for ngram in ngram_counts:
        continuation_counts[ngram[-1]] += 1
        context_continuations[ngram[:-1]] += 1

    total_continuations = sum(continuation_counts.values())

    def continuation_probability(token):
        return continuation_counts[token] / total_continuations

    probabilities = defaultdict(lambda: defaultdict(float))
    for ngram, count in ngram_counts.items():
        lower_ngram = ngram[:-1]
        discounted_count = max(count - discount, 0)
        # Back-off weight: the probability mass removed by discounting,
        # redistributed according to the continuation distribution.
        lambda_factor = (discount / lower_order_counts[lower_ngram]) * context_continuations[lower_ngram]
        probabilities[lower_ngram][ngram[-1]] = (
            discounted_count / lower_order_counts[lower_ngram]
            + lambda_factor * continuation_probability(ngram[-1])
        )
    return probabilities


def generate_text_with_probs(initial_context, top_p, max_length, top_k):
    """Generate text with GPT-2 using nucleus (top-p) sampling and record, for each
    step, the top-k candidate tokens together with their probabilities."""
    input_ids = tokenizer.encode(initial_context, return_tensors="pt").to(device)
    token_tables = []
    token_no = 1

    with torch.no_grad():
        for _ in range(max_length):
            outputs = model(input_ids=input_ids)
            next_token_logits = outputs.logits[:, -1, :]

            # Top-p (nucleus) filtering: keep the smallest set of tokens whose
            # cumulative probability exceeds top_p and mask out the rest.
            sorted_logits, sorted_indices = torch.sort(next_token_logits, descending=True)
            cumulative_probs = torch.cumsum(torch.softmax(sorted_logits, dim=-1), dim=-1)
            sorted_indices_to_remove = cumulative_probs > top_p
            # Shift the mask right so the first token crossing the threshold is kept.
            sorted_indices_to_remove[..., 1:] = sorted_indices_to_remove[..., :-1].clone()
            sorted_indices_to_remove[..., 0] = 0
            indices_to_remove = sorted_indices[sorted_indices_to_remove]
            next_token_logits[:, indices_to_remove] = -float("Inf")

            # Sample the next token from the filtered distribution.
            probabilities = torch.softmax(next_token_logits, dim=-1)
            next_token = torch.multinomial(probabilities, num_samples=1)
            next_token_prob = probabilities[0, next_token].item()
            next_token_text = tokenizer.decode(next_token.item())

            # Record the top-k candidates at this step.
            top_tokens = sorted_indices[0, :top_k]
            top_probs = probabilities[0, top_tokens]
            top_token_probs = [
                (tokenizer.decode([token.item()]), prob.item())
                for token, prob in zip(top_tokens, top_probs)
            ]
            df = pd.DataFrame(top_token_probs, columns=["Token", "Probability"])
            df.index = df.index + 1
            token_tables.append(
                (f"{token_no}>> Next token: {next_token_text} (Probability: {next_token_prob:.8f})", df)
            )
            token_no += 1

            input_ids = torch.cat([input_ids, next_token], dim=-1)
            if next_token.item() == tokenizer.eos_token_id:
                break

    generated_text = tokenizer.decode(input_ids[0], skip_special_tokens=True)
    # Return only the newly generated continuation.
    return generated_text[len(initial_context):], token_tables
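
# Illustrative sketch (standalone toy example, not used by the app): shows how the
# n-gram helpers above turn a token list into 4-gram counts and conditional
# probabilities. The token list and the expected result are made up for illustration.
def _demo_ngram_probabilities():
    toy_tokens = ["the", "cat", "sat", "on", "the", "cat", "sat", "down"]
    four_grams = create_ngrams(toy_tokens, 4)  # e.g. ("the", "cat", "sat", "on")
    four_gram_counts = Counter(four_grams)
    three_gram_counts = Counter(gram[:-1] for gram in four_grams)
    probs = calculate_probabilities(four_gram_counts, three_gram_counts)
    # P(next | "the", "cat", "sat") -> {"on": 0.5, "down": 0.5}
    return dict(probs[("the", "cat", "sat")])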
def predict_next_token_ngram(input_text, context_text, max_length):
    """Greedy next-token prediction with a 4-gram model built from context_text."""
    ip = input_text
    # Build 4-gram statistics from the user-supplied context.
    context_tokens = tokenizer.tokenize(context_text)
    four_grams = create_ngrams(context_tokens, 4)
    four_gram_counts = Counter(four_grams)
    three_gram_counts = Counter([gram[:-1] for gram in four_grams])
    # MLE probabilities are computed for reference; the smoothed `probs` are used below.
    probabilities = calculate_probabilities(four_gram_counts, three_gram_counts)
    probs = kneser_ney_smoothing(four_gram_counts, three_gram_counts)

    input_tokens = tokenizer.tokenize(input_text)
    generated_tokens = input_tokens.copy()
    token_tables = []

    if len(input_tokens) >= (max_length + len(generated_tokens)):
        generated_text = tokenizer.convert_tokens_to_string(input_tokens)
        return generated_text, token_tables

    token_no = 1
    # Extend the input greedily, one token at a time, until max_length new tokens
    # have been added or no known 3-gram context remains.
    while len(input_tokens) < (max_length + len(generated_tokens)):
        input_3_gram = tuple(input_tokens[-3:])
        next_token_probs = probs.get(input_3_gram, {})
        if not next_token_probs:
            break
        next_token = max(next_token_probs, key=next_token_probs.get)
        input_tokens.append(next_token)

        # Record the top-4 candidate continuations for this step.
        top_k = 4
        top_k_tokens = sorted(next_token_probs.items(), key=lambda x: x[1], reverse=True)[:top_k]
        top_k_tokens_df = pd.DataFrame(top_k_tokens, columns=["Token", "Probability"])
        top_k_tokens_df.index = top_k_tokens_df.index + 1  # 1-based row numbering
        top_k_tokens_df["Token"] = top_k_tokens_df["Token"].apply(
            lambda x: tokenizer.convert_tokens_to_string([x])
        )
        token_tables.append((f"{token_no}>> Next token: {next_token}", top_k_tokens_df))
        token_no += 1

    generated_text = tokenizer.convert_tokens_to_string(input_tokens)
    # Return only the newly generated continuation.
    return generated_text[len(ip):], token_tables


def combined_model_predictions(context_text, initial_context, top_p, max_length, top_k):
    """Run both the GPT-2 sampler and the n-gram model on the same prompt."""
    generated_text, token_tables = generate_text_with_probs(initial_context, top_p, max_length, top_k)
    ngram_generated_text, ngram_token_tables = predict_next_token_ngram(initial_context, context_text, max_length)
    return generated_text, token_tables, ngram_generated_text, ngram_token_tables


iface = gr.Interface(
    fn=combined_model_predictions,
    inputs=[
        gr.Textbox(lines=4, placeholder="Enter context for N-gram model..."),
        gr.Textbox(lines=2, placeholder="Enter initial context here..."),
        gr.Slider(0, 1, step=0.01, value=0.9, label="Top-p (nucleus) sampling"),
        gr.Slider(1, 100, step=1, value=50, label="Max length"),
        gr.Slider(1, 50, step=1, value=10, label="Top-k"),
    ],
    outputs=[
        gr.Textbox(label="Generated Text"),
        gr.Dataframe(label="LLM Token Probabilities"),
        gr.Textbox(label="N-gram Generated Text"),
        gr.Dataframe(label="N-gram Token Predictions"),
    ],
    title="Next Token Visualizer (GPT-2-large - 774M param.)",
)

iface.launch()
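
# Example invocation (hypothetical inputs). Left commented out because iface.launch()
# above blocks until the Gradio server is stopped; uncomment and run without launch()
# to exercise the pipeline directly.
#
#   text, llm_tables, ngram_text, ngram_tables = combined_model_predictions(
#       context_text=("the quick brown fox jumps over the lazy dog. "
#                     "the quick brown fox jumps over the lazy cat."),
#       initial_context="the quick brown fox",
#       top_p=0.9,
#       max_length=5,
#       top_k=5,
#   )
#   print(text)        # GPT-2 continuation of the prompt
#   print(ngram_text)  # greedy 4-gram continuation built from context_text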