import os
import pandas as pd
import numpy as np
import torch
from transformers import RobertaTokenizer, RobertaForSequenceClassification
from torch import nn
from torch.nn import init, MarginRankingLoss
from transformers import BertModel, RobertaModel
from transformers import BertTokenizer, RobertaTokenizer
from torch.optim import Adam
from distutils.version import LooseVersion
from torch.utils.data import Dataset, DataLoader
from torch.utils.tensorboard import SummaryWriter
from datetime import datetime
from torch.autograd import Variable
from transformers import AutoConfig, AutoModel, AutoTokenizer
import nltk
import re
import Levenshtein
import spacy
import en_core_web_sm
import torch.optim as optim
from torch.distributions import Categorical
from numpy import linalg as LA
from transformers import AutoModelForMaskedLM
from nltk.corpus import wordnet
import torch.nn.functional as F
import random
from transformers import get_linear_schedule_with_warmup
from sklearn.metrics import precision_recall_fscore_support
from nltk.corpus import words as wal
from sklearn.utils import resample


# In[56]:


class MyDataset(Dataset):
    """Dataset backed by a CSV file with an ``X`` and a ``y`` column.

    Missing cells are replaced by empty strings when the file is loaded.
    """

    def __init__(self, file_name):
        """Load ``file_name`` with pandas and keep both columns as arrays."""
        frame = pd.read_csv(file_name).fillna("")
        self.X_list = frame['X'].to_numpy()
        self.y_list = frame['y'].to_numpy()

    def __len__(self):
        """Return the number of rows read from the CSV file."""
        return len(self.X_list)

    def __getitem__(self, idx):
        """Return the row at ``idx`` as the two-element list ``[X, y]``."""
        return [self.X_list[idx], self.y_list[idx]]


# In[59]:


# Splits an identifier into capitalised words, upper-case runs, lower-case
# runs and digit runs (e.g. "parseHTTPResponse2" -> parse, HTTP, Response, 2).
_IDENTIFIER_PART_RE = re.compile(
    r'[A-Z](?:[a-z]+|[A-Z]*(?=[A-Z]|$))|[a-z]+|\d+'
)


class Step1_model(nn.Module):
    """Sequence classifier whose target class is a label's sub-token count.

    The input text is tokenised, cut into fixed-size chunks, every chunk is
    scored by ``RobertaForSequenceClassification`` and the chunk logits are
    averaged.  The target class is ``min(token_count, num_labels) - 1`` where
    ``token_count`` is the number of tokenizer tokens in the label after it
    has been split into lower-cased identifier parts.
    """

    def __init__(self, hidden_size=512, num_labels=6, chunk_size=512):
        """Load the GraphCodeBERT checkpoint, tokenizer and config.

        Args:
            hidden_size: Stored on the instance; not used by ``forward``.
            num_labels: Number of output classes of the classification head.
            chunk_size: Length of every chunk fed to the model, including
                the two special tokens that frame it.
        """
        super(Step1_model, self).__init__()
        self.hidden_size = hidden_size
        self.num_labels = num_labels
        self.chunk_size = chunk_size
        self.model = RobertaForSequenceClassification.from_pretrained(
            "microsoft/graphcodebert-base", num_labels=num_labels
        )
        self.tokenizer = AutoTokenizer.from_pretrained(
            "microsoft/graphcodebert-base"
        )
        self.config = AutoConfig.from_pretrained("microsoft/graphcodebert-base")
        # Fine-tune every weight of the underlying model.
        for _, param in self.model.named_parameters():
            param.requires_grad = True

    def _count_label_tokens(self, label):
        """Return how many tokenizer tokens the split, lower-cased label has."""
        parts = _IDENTIFIER_PART_RE.findall(label)
        return len(self.tokenizer.tokenize(' '.join(parts).lower()))

    def _encode_chunks(self, text):
        """Tokenise ``text`` into ``(n_chunks, chunk_size)`` ids and mask.

        Every chunk is framed by the tokenizer's own CLS/SEP ids and padded
        with its PAD id; padded positions get attention mask 0.
        """
        encoded = self.tokenizer(
            text, add_special_tokens=False, return_tensors='pt'
        )
        # .long() also normalises the dtype for an empty input.
        ids = encoded['input_ids'][0].long()
        mask = encoded['attention_mask'][0].long()

        body_len = self.chunk_size - 2  # room left after the CLS and SEP ids
        if ids.numel() == 0:
            # Keep one (empty) chunk so the model still receives an input.
            id_pieces, mask_pieces = [ids], [mask]
        else:
            id_pieces, mask_pieces = ids.split(body_len), mask.split(body_len)

        cls_id = self.tokenizer.cls_token_id
        sep_id = self.tokenizer.sep_token_id
        pad_id = self.tokenizer.pad_token_id

        id_chunks, mask_chunks = [], []
        for piece_ids, piece_mask in zip(id_pieces, mask_pieces):
            pad_len = body_len - piece_ids.shape[0]
            id_chunks.append(torch.cat([
                piece_ids.new_tensor([cls_id]),
                piece_ids,
                piece_ids.new_tensor([sep_id]),
                piece_ids.new_full((pad_len,), pad_id),
            ]))
            mask_chunks.append(torch.cat([
                piece_mask.new_ones(1),
                piece_mask,
                piece_mask.new_ones(1),
                piece_mask.new_zeros(pad_len),
            ]))
        return torch.stack(id_chunks), torch.stack(mask_chunks)

    def forward(self, mapi):
        """Compute the negative log-likelihood of the target class.

        Args:
            mapi: Two-element sequence ``[text, label]``.  Every ``"[MASK]"``
                in ``text`` is replaced by the tokenizer's mask token.

        Returns:
            ``{'loss': tensor}`` where the tensor is a scalar that stays
            attached to the autograd graph, so ``backward()`` can be called
            on it.  Wrap the call in ``torch.no_grad()`` for pure inference.
        """
        text, label = mapi[0], mapi[1]
        text = text.replace("[MASK]", self.tokenizer.mask_token)

        print(label)
        nlab = self._count_label_tokens(label)
        print(nlab)

        input_ids, attention_mask = self._encode_chunks(text)
        device = self.model.device
        outputs = self.model(
            input_ids=input_ids.to(device),
            attention_mask=attention_mask.to(device),
        )

        # Average over the chunk axis; this also covers the one-chunk case.
        logits = outputs.logits.mean(dim=0)
        log_probs = torch.log_softmax(logits, dim=0)

        # Clamp to a valid class: an empty label must not wrap around to the
        # last class through index -1, and long labels share the last class.
        target = min(max(nlab, 1), self.num_labels) - 1
        return {'loss': -log_probs[target]}


# In[60]:


epoch_number = 0
EPOCHS = 5
run_int = 8
tokenizer = AutoTokenizer.from_pretrained("microsoft/graphcodebert-base")
model = Step1_model()
optimizer = optim.AdamW(model.parameters(), lr=2e-5)
myDs = MyDataset('dat1.csv')
train_loader = DataLoader(myDs, batch_size=1, shuffle=False)
best_loss = float('inf')


# In[61]:


flag = 0


def train_one_epoch(transformer_model, dataset):
    """Run one optimisation pass over ``dataset`` and return the mean loss.

    Uses the module-level ``optimizer``.  Gradients of all samples in a batch
    are accumulated before a single optimizer step.

    Args:
        transformer_model: Callable taking ``[text, label]`` and returning a
            dict with a ``'loss'`` tensor.
        dataset: Iterable of batches; ``batch[0]`` holds the texts and
            ``batch[1]`` the labels.

    Returns:
        The mean per-sample loss as a float, or ``nan`` when no sample was
        seen.
    """
    total_loss = 0.0
    n_samples = 0
    for batch in dataset:
        texts, labels = batch[0], batch[1]
        optimizer.zero_grad()
        batch_loss = 0.0
        batch_size = 0
        for text, label in zip(texts, labels):
            loss = transformer_model([text, label])['loss']
            loss.backward()
            batch_loss += loss.item()
            batch_size += 1
        optimizer.step()
        if batch_size:
            print(' batch loss: {}'.format(batch_loss / batch_size))
        total_loss += batch_loss
        n_samples += batch_size
    return total_loss / n_samples if n_samples else float('nan')


# In[62]:


for epoch in range(EPOCHS):
    print('EPOCH {}:'.format(epoch_number + 1))
    model.train(True)
    avg_loss = train_one_epoch(model, train_loader)
    model.train(False)
    print('LOSS train {}'.format(avg_loss))
    if avg_loss < best_loss:
        best_loss = avg_loss
        model_path = 'var_runs_class/model_{}_{}'.format(run_int, epoch_number)
        # torch.save does not create missing parent directories.
        os.makedirs(os.path.dirname(model_path), exist_ok=True)
        torch.save(model.state_dict(), model_path)
    epoch_number += 1


# In[ ]: