|
from transformers import PreTrainedModel, AutoModel
|
|
import torch
|
|
import torch.nn as nn
|
|
import math
|
|
from .config import BERTMultiAttentionConfig
|
|
|
|
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention with layer-normalised projections.

    Each of the query, key and value inputs goes through its own linear
    projection followed by a ``LayerNorm``. The results are split into
    ``num_heads`` heads, combined with scaled dot-product attention, merged
    back to ``hidden_size`` features, and passed through an output
    projection, a ``LayerNorm`` and dropout.

    Args:
        config: Object exposing ``hidden_size``, ``num_heads`` and
            ``dropout`` attributes.

    Raises:
        ValueError: If ``hidden_size`` is not divisible by ``num_heads``.
    """

    def __init__(self, config):
        super().__init__()

        # Fail early: a non-divisible size would otherwise only surface as an
        # opaque shape error from ``view`` inside ``forward``.
        if config.hidden_size % config.num_heads != 0:
            raise ValueError(
                f"hidden_size ({config.hidden_size}) must be divisible by "
                f"num_heads ({config.num_heads})"
            )

        self.hidden_size = config.hidden_size
        self.num_heads = config.num_heads
        self.head_dim = config.hidden_size // config.num_heads

        self.query = nn.Linear(config.hidden_size, config.hidden_size)
        self.key = nn.Linear(config.hidden_size, config.hidden_size)
        self.value = nn.Linear(config.hidden_size, config.hidden_size)
        self.out = nn.Linear(config.hidden_size, config.hidden_size)

        self.layer_norm_q = nn.LayerNorm(config.hidden_size)
        self.layer_norm_k = nn.LayerNorm(config.hidden_size)
        self.layer_norm_v = nn.LayerNorm(config.hidden_size)
        self.layer_norm_out = nn.LayerNorm(config.hidden_size)

        # The same dropout module is used on the attention weights and on the
        # final output.
        self.dropout = nn.Dropout(config.dropout)

    def _split_heads(self, tensor, batch_size):
        """Reshape ``(batch, seq, hidden)`` to ``(batch, heads, seq, head_dim)``."""
        return tensor.view(
            batch_size, -1, self.num_heads, self.head_dim
        ).permute(0, 2, 1, 3)

    def forward(self, query, key, value, attention_mask=None):
        """Attend from ``query`` positions over ``key``/``value`` positions.

        Args:
            query: Tensor of shape ``(batch, query_len, hidden_size)``.
            key: Tensor of shape ``(batch, key_len, hidden_size)``.
            value: Tensor of shape ``(batch, key_len, hidden_size)``.
            attention_mask: Optional mask over key positions where zero marks
                a position to ignore. Accepts shape ``(batch, key_len)`` or
                ``(batch, query_len, key_len)``. ``None`` (the default)
                attends over every key position.

        Returns:
            Tensor of shape ``(batch, query_len, hidden_size)``.
        """
        batch_size = query.size(0)

        query = self._split_heads(self.layer_norm_q(self.query(query)), batch_size)
        key = self._split_heads(self.layer_norm_k(self.key(key)), batch_size)
        value = self._split_heads(self.layer_norm_v(self.value(value)), batch_size)

        attention_scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(self.head_dim)

        if attention_mask is not None:
            # Broadcast the mask over the head (and, for 2-D masks, query) axes.
            if attention_mask.dim() == 2:
                attention_mask = attention_mask[:, None, None, :]
            elif attention_mask.dim() == 3:
                attention_mask = attention_mask[:, None, :, :]
            # Use the dtype's most negative finite value rather than -inf so a
            # row whose keys are all masked yields uniform weights, not NaN.
            attention_scores = attention_scores.masked_fill(
                attention_mask == 0, torch.finfo(attention_scores.dtype).min
            )

        attention_weights = torch.softmax(attention_scores, dim=-1)
        attention_weights = self.dropout(attention_weights)

        # Merge the heads back: (batch, heads, query_len, head_dim)
        # -> (batch, query_len, hidden_size).
        attended_values = torch.matmul(attention_weights, value).permute(0, 2, 1, 3).contiguous()
        attended_values = attended_values.view(batch_size, -1, self.hidden_size)

        out = self.layer_norm_out(self.out(attended_values))
        out = self.dropout(out)

        return out
|
|
|
|
class BERTMultiAttentionModel(PreTrainedModel):
    """Sequence-pair scorer built on a shared pretrained transformer encoder.

    Both inputs are encoded by the same transformer. The first sequence then
    attends over the second through ``MultiHeadAttention``; the encoder output
    and the attended output are concatenated, mean-pooled over the sequence
    axis, and passed through a dense layer, an LSTM, a projection and a final
    single-unit linear layer followed by a sigmoid.

    Args:
        config: A ``BERTMultiAttentionConfig``. This class reads
            ``transformer`` (passed to ``AutoModel.from_pretrained``),
            ``hidden_size``, ``dropout``, ``rnn_hidden_size``,
            ``rnn_num_layers`` and ``rnn_bidirectional`` from it.
            NOTE(review): ``hidden_size`` presumably has to match the hidden
            size of the loaded transformer - confirm against the config.
    """

    config_class = BERTMultiAttentionConfig

    def __init__(self, config):
        super().__init__(config)
        self.config = config

        self.transformer = AutoModel.from_pretrained(config.transformer)
        self.cross_attention = MultiHeadAttention(config)

        self.fc1 = nn.Linear(config.hidden_size * 2, 256)
        self.layer_norm_fc1 = nn.LayerNorm(256)
        self.dropout1 = nn.Dropout(config.dropout)

        self.rnn = nn.LSTM(
            input_size=256,
            hidden_size=config.rnn_hidden_size,
            num_layers=config.rnn_num_layers,
            batch_first=True,
            bidirectional=config.rnn_bidirectional,
            # Inter-layer dropout has no effect on a single-layer LSTM and
            # PyTorch warns about it, so only request it for stacked layers.
            dropout=config.dropout if config.rnn_num_layers > 1 else 0.0,
        )
        # Width of the final hidden state fed to ``fc_proj``: one
        # ``rnn_hidden_size`` vector per direction.
        rnn_output_size = config.rnn_hidden_size * (2 if config.rnn_bidirectional else 1)

        # Not used in ``forward``; kept registered so existing checkpoints
        # that contain its parameters still load without unexpected keys.
        self.layer_norm_rnn = nn.LayerNorm(256)
        self.dropout2 = nn.Dropout(config.dropout)

        self.fc_proj = nn.Linear(rnn_output_size, 256)
        self.layer_norm_proj = nn.LayerNorm(256)
        # Kept as an attribute for compatibility; it is no longer applied to
        # the output logit (see ``forward``).
        self.dropout3 = nn.Dropout(config.dropout)

        self.fc_final = nn.Linear(256, 1)

    @staticmethod
    def _masked_mean(hidden_states, attention_mask):
        """Average ``hidden_states`` over axis 1, skipping masked-out positions."""
        if attention_mask is None:
            return hidden_states.mean(dim=1)
        weights = attention_mask.unsqueeze(-1).to(hidden_states.dtype)
        # Clamp so an all-zero mask row cannot cause a division by zero.
        token_count = weights.sum(dim=1).clamp(min=1.0)
        return (hidden_states * weights).sum(dim=1) / token_count

    def forward(self, input_ids1, attention_mask1, input_ids2, attention_mask2):
        """Score a pair of tokenised sequences.

        Args:
            input_ids1: Token ids of the first sequence.
            attention_mask1: Attention mask of the first sequence, forwarded
                to the transformer and used to exclude padding from pooling.
                Assumed to be ``(batch, seq_len)`` with non-zero marking real
                tokens - TODO confirm against the tokenizer output.
            input_ids2: Token ids of the second sequence.
            attention_mask2: Attention mask of the second sequence, forwarded
                to the transformer.

        Returns:
            Tensor of sigmoid scores in ``[0, 1]`` with last dimension 1.
        """
        output1 = self.transformer(input_ids1, attention_mask=attention_mask1)[0]
        output2 = self.transformer(input_ids2, attention_mask=attention_mask2)[0]

        # NOTE(review): no mask for the second sequence is passed to this
        # call, so its padding positions still receive attention weight.
        attended_output = self.cross_attention(output1, output2, output2)
        combined_output = torch.cat([output1, attended_output], dim=2)
        # Pool over real tokens only so the result does not depend on how
        # much padding the batch happens to contain.
        combined_output = self._masked_mean(combined_output, attention_mask1)

        combined_output = self.layer_norm_fc1(self.fc1(combined_output))
        combined_output = self.dropout1(torch.relu(combined_output))
        # The LSTM is fed a length-1 sequence per example.
        combined_output = combined_output.unsqueeze(1)

        _, (hidden_state, _) = self.rnn(combined_output)
        # ``hidden_state`` is laid out layer-major, so the last layer's
        # direction(s) sit at the end regardless of ``num_layers``.
        if self.rnn.bidirectional:
            hidden_state_concat = torch.cat([hidden_state[-2], hidden_state[-1]], dim=-1)
        else:
            hidden_state_concat = hidden_state[-1]

        hidden_state_proj = self.layer_norm_proj(self.fc_proj(hidden_state_concat))
        hidden_state_proj = self.dropout2(hidden_state_proj)

        # No dropout on the logit itself: zeroing or rescaling the single
        # output unit only injects noise into the training signal.
        final = self.fc_final(hidden_state_proj)

        return torch.sigmoid(final)
|
|
|
|
|
|
# Register the custom model class with the ``AutoModel`` factory, keyed by its
# config class. NOTE(review): presumably the config's model type also has to be
# registered with ``AutoConfig`` for auto-loading to work - confirm in
# ``.config``.
AutoModel.register(BERTMultiAttentionConfig, BERTMultiAttentionModel)
|
|
|