PrarthanaTS's picture
Update app.py
2cbbdd2
import time
import torch
import lightning as L
from torch.utils.data import DataLoader
from lightning.fabric.loggers import CSVLogger
from lightning.fabric.strategies import FSDPStrategy
from tsai_gpt.model import GPT, Block, Config
from tsai_gpt.tokenizer import Tokenizer
from tsai_gpt.packed_dataset import CombinedDataset, PackedDataset
from tsai_gpt.speed_monitor import SpeedMonitorBase, estimate_flops, measure_flops
from tsai_gpt.speed_monitor import SpeedMonitorFabric as SpeedMonitor
from tsai_gpt.utils import chunked_cross_entropy, get_default_supported_precision, num_parameters, load_checkpoint, gptq_quantization
import torch.nn as nn
from pathlib import Path
import sys
import random
from torch import nn
import lightning.pytorch as pl
from torch.nn import functional as F
model_name = "pythia-160m"
name = "redpajama"
def _init_weights(module: nn.Module) -> None:
"""Meant to be used with `gpt.apply(gpt._init_weights)`."""
if isinstance(module, nn.Linear):
torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
if module.bias is not None:
torch.nn.init.zeros_(module.bias)
elif isinstance(module, nn.Embedding):
torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
config = Config.from_name(model_name)
model = GPT(config)
next(model.parameters()).sum() #-25 -2 -860
model.apply(_init_weights)
model.load_state_dict
checkpoint_dir = Path("final-gpt-model-ckpt.pth")
strategy = "auto"
quantize = None
devices = 1
precision = None
precision = get_default_supported_precision(training=False)
plugins = None
fabric = L.Fabric(devices=devices, precision=precision, strategy=strategy, plugins=plugins)
fabric.launch()
fabric.print(f"Loading model {str(checkpoint_dir)!r} with {config.__dict__}", file=sys.stderr)
with fabric.init_module(empty_init=True), gptq_quantization(quantize=="gptq.int4"):
model = GPT(config)
model.eval()
model = fabric.setup_module(model)
load_checkpoint(fabric, model, checkpoint_dir)
tokenizer = Tokenizer(Path('tokenizer_config'))
@torch.inference_mode()
def generate(
model: GPT,
idx: torch.Tensor,
max_returned_tokens: int,
*,
temperature: float = 1.0,
top_k:int = None,
eos_id:int = None,
) -> torch.Tensor:
"""Takes a conditioning sequence (prompt) as input and continues to generate as many tokens as requested.
The implementation of this function is modified from A. Karpathy's nanoGPT.
Args:
model: The model to use.
idx: Tensor of shape (T) with indices of the prompt sequence.
max_returned_tokens: The maximum number of tokens to return (given plus generated).
temperature: Scales the predicted logits by 1 / temperature.
top_k: If specified, only sample among the tokens with the k highest probabilities.
eos_id: If specified, stop generating any more token once the <eos> token is triggered.
"""
T = idx.size(0)
assert max_returned_tokens > T
if model.max_seq_length < max_returned_tokens - 1:
# rolling the kv cache based on the `input_pos` value would be necessary. However, doing so would introduce a
# data dependency on the `input_pos` tensor and impact model compilation. Since this setting is uncommon, we do
# not support it to avoid negatively impacting the overall speed
raise NotImplementedError(f"max_seq_length {model.max_seq_length} needs to be >= {max_returned_tokens - 1}")
device, dtype = idx.device, idx.dtype
# create an empty tensor of the expected final shape and fill in the current tokens
empty = torch.empty(max_returned_tokens, dtype=dtype, device=device)
empty[:T] = idx
idx = empty
input_pos = torch.arange(0, T, device=device)
# generate up to a fixed number of tokens
for _ in range(max_returned_tokens - T):
x = idx.index_select(0, input_pos).view(1, -1)
# forward
logits = model(x, input_pos)
logits = logits[0, -1] / temperature
# optionally crop the logits to only the top k options
if top_k is not None:
v, _ = torch.topk(logits, min(top_k, logits.size(-1)))
logits = torch.where(logits < v[[-1]], -float("Inf"), logits)
probs = torch.nn.functional.softmax(logits, dim=-1)
idx_next = torch.multinomial(probs, num_samples=1).to(dtype=dtype)
# advance
input_pos = input_pos[-1:] + 1
# concatenate the new generation
idx = idx.index_copy(0, input_pos, idx_next)
# if <eos> token is triggered, return the output (stop generation)
if idx_next == eos_id:
return idx[:input_pos] # include the EOS token
return idx
device = 'cuda' if torch.cuda.is_available() else 'cpu'
def generate_text(input_text, temperature=0.8, max_tokens=200, top_k=None):
encoded = tokenizer.encode(input_text, device=fabric.device)
max_returned_tokens = encoded.size(0) + max_tokens
with fabric.init_tensor():
# set the max_seq_length to limit the memory usage to what we need
model.max_seq_length = max_returned_tokens
with fabric.init_tensor():
model.set_kv_cache(batch_size=1)
y = generate(model, encoded, max_returned_tokens, temperature=temperature, top_k=top_k)
return(tokenizer.decode(y))
import gradio as gr
title = "GPT from scratch"
demo = gr.Interface(generate_text,
inputs=[gr.Textbox(label="Enter any prompt ", type="text", value="Once upon a time,"),
gr.Slider(minimum=0, maximum=1, step=0.1, value=0.8, label="Temperature"),
gr.Slider(minimum=200, maximum=1000, step=50, value=300, label="Max Tokens"),
gr.Slider(minimum=10, maximum=100, step=5, value=20, label="Top K")],
outputs=gr.Textbox(label="Text generated", type="text"), title=title)
demo.launch()