import torch
import torch.nn as nn
import torch.optim as optim
import pickle
from torch.utils.data import Dataset, DataLoader
from safetensors.torch import save_file
import logging
import json

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

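# Hyperparameters for the sliding-window dataset and the LSTM language model.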
sequence_length = 64
batch_size = 16
num_epochs = 1
learning_rate = 0.00001
embedding_dim = 256
hidden_dim = 800
num_layers = 4

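# fulltext.json is expected to contain a JSON array of word tokens (parsed with json.loads below).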
logging.info('Reading the text file...')
with open('fulltext.json', 'r') as file:
    text = file.read()
logging.info('Text file read successfully.')

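# Build the vocabulary and the word <-> index lookup tables; '<UNK>' is the fallback for
# out-of-vocabulary words and '<pad>' is reserved for padding.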
logging.info('Preprocessing the text...')
words = json.loads(text)
# Sort the vocabulary so that word indices are reproducible across runs.
vocab = sorted(set(words) | {'<pad>', '<UNK>'})
word2idx = {word: idx for idx, word in enumerate(vocab)}
idx2word = {idx: word for idx, word in enumerate(vocab)}
vocab_size = len(vocab)

logging.info(f'Vocabulary size: {vocab_size}')

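# Each training example pairs a window of sequence_length words with the word that immediately follows it.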
logging.info('Creating sequences...')
sequences = []
for i in range(len(words) - sequence_length):
    seq = words[i:i + sequence_length]
    label = words[i + sequence_length]
    sequences.append((seq, label))

logging.info(f'Number of sequences: {len(sequences)}')

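# Dataset that converts each (sequence, label) pair of words into index tensors on the fly.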
class TextDataset(Dataset):
    def __init__(self, sequences, word2idx):
        self.sequences = sequences
        self.word2idx = word2idx

    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, idx):
        seq, label = self.sequences[idx]
        seq_idx = [self.word2idx.get(word, self.word2idx['<UNK>']) for word in seq]
        label_idx = self.word2idx.get(label, self.word2idx['<UNK>'])
        return torch.tensor(seq_idx, dtype=torch.long), torch.tensor(label_idx, dtype=torch.long)

logging.info('Creating dataset and dataloader...')
dataset = TextDataset(sequences, word2idx)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

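# Next-word prediction model: embedding -> stacked LSTM -> linear projection onto the vocabulary.
# Only the LSTM output at the final time step is used to score the next word.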
class LSTMModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_layers):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, vocab_size)

    def forward(self, x):
        embeds = self.embedding(x)
        lstm_out, _ = self.lstm(embeds)
        logits = self.fc(lstm_out[:, -1, :])
        return logits

logging.info('Initializing the LSTM model...')
model = LSTMModel(vocab_size, embedding_dim, hidden_dim, num_layers)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

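# Training loop: cross-entropy loss on the predicted next word, with a progress log every 10 batches.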
logging.info('Starting training...')
for epoch in range(num_epochs):
    for batch_idx, (inputs, targets) in enumerate(dataloader):
        outputs = model(inputs)
        loss = criterion(outputs, targets)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch_idx % 10 == 0:
            logging.info(f'Epoch [{epoch+1}/{num_epochs}], Batch [{batch_idx}/{len(dataloader)}], Loss: {loss.item():.4f}')

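# Persist the trained weights in safetensors format and pickle the two vocabulary mappings,
# which are needed to encode prompts and decode predictions at inference time.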
logging.info('Saving the model...')
save_file(model.state_dict(), 'lstm_model.safetensors')
with open('word2idx.pkl', 'wb') as f:
    pickle.dump(word2idx, f)
with open('idx2word.pkl', 'wb') as f:
    pickle.dump(idx2word, f)

logging.info('Model and vocabulary saved successfully.')
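
# Optional sanity check: a minimal sketch of how the saved artifacts could be loaded back
# for inference. load_file is the safetensors counterpart of the save_file call above; the
# seed context (the last window of the training corpus) is chosen only for illustration.
from safetensors.torch import load_file

with open('word2idx.pkl', 'rb') as f:
    word2idx_loaded = pickle.load(f)
with open('idx2word.pkl', 'rb') as f:
    idx2word_loaded = pickle.load(f)

inference_model = LSTMModel(len(word2idx_loaded), embedding_dim, hidden_dim, num_layers)
inference_model.load_state_dict(load_file('lstm_model.safetensors'))
inference_model.eval()

seed = words[-sequence_length:]
seed_idx = [word2idx_loaded.get(w, word2idx_loaded['<UNK>']) for w in seed]
with torch.no_grad():
    logits = inference_model(torch.tensor([seed_idx], dtype=torch.long))
predicted_word = idx2word_loaded[logits.argmax(dim=-1).item()]
logging.info(f'Sample prediction for the final window of the corpus: {predicted_word}')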