import torch import torch.nn as nn import torch.nn.functional as F import math import torch.optim as optim from transformers import AutoModelForCausalLM from transformers.modeling_utils import PreTrainedModel from transformers.configuration_utils import PretrainedConfig # Update the DecoderLayer to use the grouped MultiHeadAttention class DecoderLayer(nn.Module): def __init__(self, d_model, n_heads, dim_feedforward, dropout=0.1, group_size=16): super(DecoderLayer, self).__init__() self.self_attn = MultiHeadAttention(d_model, n_heads, dropout, group_size) self.feed_forward = PositionwiseFeedForward(d_model, dim_feedforward, dropout) self.layer_norm1 = nn.LayerNorm(d_model) self.layer_norm2 = nn.LayerNorm(d_model) self.dropout = nn.Dropout(dropout) def forward(self, x): # Self-Attention Mechanism (SA) norm_x = self.layer_norm1(x) x = x + self.dropout(self.self_attn(norm_x, norm_x, norm_x)) # Feed-Forward Network (FFN) norm_x = self.layer_norm2(x) x = x + self.dropout(self.feed_forward(norm_x)) return x class MultiHeadAttention(nn.Module): def __init__(self, d_model, n_heads, dropout=0.1, group_size=16): super(MultiHeadAttention, self).__init__() self.query_linear = nn.Linear(d_model, d_model) self.key_linear = nn.Linear(d_model, d_model) self.value_linear = nn.Linear(d_model, d_model) self.dropout = nn.Dropout(dropout) self.n_heads = n_heads self.d_model = d_model self.group_size = group_size def forward(self, query, key, value): # Compute attention scores query = self.query_linear(query) key = self.key_linear(key) value = self.value_linear(value) # Split the input sequences into groups query_groups = query.chunk(self.group_size, dim=1) key_groups = key.chunk(self.group_size, dim=1) value_groups = value.chunk(self.group_size, dim=1) attention_scores = [] for q, k, v in zip(query_groups, key_groups, value_groups): scores = torch.matmul(q, k.transpose(-1, -2)) / math.sqrt(self.d_model) scores = F.softmax(scores, dim=-1) scores = self.dropout(scores) attention_scores.append(torch.matmul(scores, v)) # Concatenate the outputs from all groups output = torch.cat(attention_scores, dim=1) return output class PositionwiseFeedForward(nn.Module): def __init__(self, d_model, dim_feedforward, dropout=0.1): super(PositionwiseFeedForward, self).__init__() self.linear1 = nn.Linear(d_model, dim_feedforward) self.dropout = nn.Dropout(dropout) self.linear2 = nn.Linear(dim_feedforward, d_model) def forward(self, x): x = F.relu(self.linear1(x)) x = self.dropout(x) x = self.linear2(x) return x # Update the Decoder class to use the grouped MultiHeadAttention class Decoder(nn.Module): def __init__(self, num_layers, d_model, n_heads, dim_feedforward, dropout=0.1, group_size=16): super(Decoder, self).__init__() self.layers = nn.ModuleList([ DecoderLayer(d_model, n_heads, dim_feedforward, dropout, group_size) for _ in range(num_layers) ]) self.layer_norm = nn.LayerNorm(d_model) def forward(self, x): for layer in self.layers: x = layer(x) x = self.layer_norm(x) return x class Embeddings(nn.Module): def __init__(self, d_model, vocab_size): super(Embeddings, self).__init__() self.lut = nn.Embedding(vocab_size, d_model) self.d_model = d_model def forward(self, x): return self.lut(x) * math.sqrt(self.d_model) class PositionalEncoding(nn.Module): def __init__(self, d_model, dropout=0.1, max_len=5000): super(PositionalEncoding, self).__init__() self.dropout = nn.Dropout(dropout) pe = torch.zeros(max_len, d_model) position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1) div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)) pe[:, 0::2] = torch.sin(position * div_term) pe[:, 1::2] = torch.cos(position * div_term) pe = pe.unsqueeze(0).transpose(0, 1) self.register_buffer('pe', pe) def forward(self, x): x = x + self.pe[:x.size(0), :] return self.dropout(x) class RMSNorm(nn.Module): def __init__(self, dim, epsilon=1e-6, scale=True): super(RMSNorm, self).__init__() self.epsilon = epsilon self.scale = scale self.weight = nn.Parameter(torch.ones(dim)) def forward(self, x): rms = torch.sqrt(torch.mean(torch.square(x), dim=-1, keepdim=True)) if self.scale: weight = self.weight / (rms + self.epsilon) return weight * x else: return x / (rms + self.epsilon) class TransformerDecoder(nn.Module): def __init__(self, num_layers, d_model, n_heads, dim_feedforward, dropout=0.1, vocab_size=10000, group_size=16): super(TransformerDecoder, self).__init__() self.embeddings = Embeddings(d_model, vocab_size) self.positional_encoding = PositionalEncoding(d_model, dropout) self.decoder = Decoder(num_layers, d_model, n_heads, dim_feedforward, dropout) self.rms_norm = RMSNorm(d_model) self.group_size = group_size def forward(self, x): x = self.embeddings(x) x = self.positional_encoding(x) x = self.decoder(x) x = self.rms_norm(x) return x class TransformerDecoderLM(nn.Module): def __init__(self, num_layers, d_model, n_heads, dim_feedforward, dropout=0.1, vocab_size=10000, group_size=16): super(TransformerDecoderLM, self).__init__() self.transformer = TransformerDecoder(num_layers, d_model, n_heads, dim_feedforward, dropout, vocab_size, group_size) self.lm_head = nn.Linear(d_model, vocab_size) def forward(self, input_ids): transformer_output = self.transformer(input_ids) lm_logits = self.lm_head(transformer_output) return lm_logits class CustomConfig(PretrainedConfig): model_type = "custom_transformer" def __init__(self, num_layers=6, d_model=512, n_heads=8, dim_feedforward=2048, dropout=0.1, vocab_size=10000, group_size=16, **kwargs): self.num_layers = num_layers self.d_model = d_model self.n_heads = n_heads self.dim_feedforward = dim_feedforward self.dropout = dropout self.vocab_size = vocab_size self.group_size = group_size super().__init__(**kwargs) class CustomTransformerForCausalLM(PreTrainedModel): config_class = CustomConfig def __init__(self, config): super().__init__(config) self.transformer = TransformerDecoderLM( num_layers=config.num_layers, d_model=config.d_model, n_heads=config.n_heads, dim_feedforward=config.dim_feedforward, dropout=config.dropout, vocab_size=config.vocab_size, group_size=config.group_size ) def forward(self, input_ids, labels=None): logits = self.transformer(input_ids) loss = None if labels is not None: loss_fct = nn.CrossEntropyLoss() loss = loss_fct(logits.view(-1, logits.size(-1)), labels.view(-1)) return {"loss": loss, "logits": logits}