|
import json |
|
import os |
|
from pathlib import Path |
|
|
|
import torch |
|
from docarray.index import InMemoryExactNNIndex |
|
from common.repo_doc import RepoDoc |
|
import random |
|
from torchmetrics.classification import Accuracy, Precision, Recall, F1Score, AUROC |
|
from tqdm import tqdm |
|
|
|
# Data locations, resolved relative to this source file. The path segments are
# joined individually instead of embedding Windows-only backslashes in a single
# string, so the same locations are produced on every platform.
INDEX_PATH = Path(__file__).parent.joinpath("..", "data")
TOPIC_CLUSTER_PATH = INDEX_PATH.joinpath("repo_topic_clusters.json")
CODE_CLUSTER_PATH = INDEX_PATH.joinpath("repo_code_clusters.json")
|
|
|
|
|
def read_repo_cluster(filename):
    """
    load a cluster assignment file and return its parsed JSON content

    Args:
        filename: path of a UTF-8 encoded JSON file.

    Returns:
        Whatever object the JSON document decodes to.
    """
    with open(filename, mode='r', encoding='utf-8') as handle:
        return json.load(handle)
|
|
|
|
|
def find_files_in_directory(directory):
    """
    list the index shard files stored directly inside ``directory``

    An entry is selected when its name starts with ``"index"`` and the
    character right after that prefix is not a dot (so ``index.bin`` is
    skipped while ``index_1.bin`` is kept). An entry named exactly
    ``index`` has no character after the prefix and is therefore kept.

    Args:
        directory: directory to scan (not recursive).

    Returns:
        The matching paths, each joined with ``directory``, sorted by entry
        name so the result does not depend on the OS listing order.
    """
    prefix = "index"
    cut = len(prefix)
    return [
        os.path.join(directory, name)
        for name in sorted(os.listdir(directory))
        # Slicing (instead of indexing) avoids an IndexError when the name is
        # exactly the prefix.
        if name.startswith(prefix) and name[cut:cut + 1] != "."
    ]
|
|
|
|
|
def read_repo_embedding():
    """
    collect the repository embeddings stored in every index file under INDEX_PATH

    Returns:
        A dict mapping each document's ``name`` to its
        ``repository_embedding``. When the same name occurs more than once,
        the entry read last wins.
    """
    embedding_by_name = {}
    base_dir = Path(__file__).parent
    for index_file in find_files_in_directory(INDEX_PATH):
        index = InMemoryExactNNIndex[RepoDoc](index_file_path=base_dir.joinpath(index_file))
        # NOTE(review): reads the index's private ``_docs`` attribute — confirm
        # there is no public accessor that should be used instead.
        embedding_by_name.update(
            (doc.name, doc.repository_embedding) for doc in index._docs
        )
    return embedding_by_name
|
|
|
|
|
def build_cluster_repo_embedding(mode: str):
    """
    build the labelled repository records for one clustering scheme

    Args:
        mode: "code" to label repositories with the code clusters, or
            "topic" to label them with the topic clusters.

    Returns:
        A list with one dict per repository found in the index files, each
        holding 'name', 'embedding' and 'id' (the cluster id of that name).

    Raises:
        ValueError: if ``mode`` is neither "code" nor "topic".
        KeyError: if a repository from the index has no entry in the
            selected cluster file.
    """
    # Validate the argument first so a bad mode fails before any index file
    # is loaded.
    if mode == "code":
        cluster_path = CODE_CLUSTER_PATH
    elif mode == "topic":
        cluster_path = TOPIC_CLUSTER_PATH
    else:
        raise ValueError("parameter 'mode' must be 'code' or 'topic'")

    cluster_id = read_repo_cluster(cluster_path)
    embedding = read_repo_embedding()
    return [
        {'name': name, 'embedding': vector, 'id': cluster_id[name]}
        for name, vector in embedding.items()
    ]
|
|
|
|
|
def build_dataset(data, ratio=0.7):
    """
    return the train set and test set which are like (index1, index2, label)

    Every ordered pair of items in ``data`` is labelled ``(1.0, 0.0)`` when
    both items share the same 'id' and ``(0.0, 1.0)`` otherwise. Each ordered
    pair is generated exactly once (the self pair ``(i, i)`` included), and
    the negatives are then down-sampled to the number of positives.

    Args:
        data: sequence of dicts that each provide an 'id' entry.
        ratio: fraction of the positives (and of the sampled negatives)
            placed in the train set; the remainder forms the test set.

    Returns:
        ``(train_set, test_set)``, two shuffled lists of
        ``(index1, index2, label)`` tuples.
    """
    same_label = (1.0, 0.0)
    different_label = (0.0, 1.0)
    positive_repo = []
    negative_repo = []
    n = len(data)

    for i in range(n):
        for j in range(i, n):
            if data[i]['id'] == data[j]['id']:
                bucket, label = positive_repo, same_label
            else:
                bucket, label = negative_repo, different_label
            bucket.append((i, j, label))
            # The mirrored pair is only a distinct sample when i != j; adding
            # it for the self pair would store the same sample twice.
            if i != j:
                bucket.append((j, i, label))

    positive_length = len(positive_repo)
    if not negative_repo:
        # All items share one cluster: nothing to sample from, so the
        # dataset consists of positives only.
        negative_repo = []
    elif len(negative_repo) >= positive_length:
        # Sample without replacement so no negative pair is duplicated
        # (a duplicate could otherwise end up in both train and test).
        negative_repo = random.sample(negative_repo, positive_length)
    else:
        # Too few negatives to match the positives without repetition;
        # fall back to sampling with replacement to keep the classes balanced.
        negative_repo = random.choices(negative_repo, k=positive_length)

    random.shuffle(positive_repo)
    random.shuffle(negative_repo)
    split_index = int(positive_length * ratio)
    train_set = positive_repo[:split_index] + negative_repo[:split_index]
    random.shuffle(train_set)
    test_set = positive_repo[split_index:] + negative_repo[split_index:]
    random.shuffle(test_set)
    print("Positive data:", len(positive_repo))
    print("Negative data:", len(negative_repo))
    return train_set, test_set
|
|
|
|
|
def train_epoch(epoch, model, loader, device, criterion, optimizer):
    """
    run one optimisation pass over ``loader`` and print the epoch summary

    Args:
        epoch: value printed as the epoch label.
        model: callable taking the two repository batches; put in train mode.
        loader: iterable yielding ``(repo1, repo2, label)`` batches.
        device: device the batches and the metric objects are moved to.
        criterion: loss callable applied to ``(prediction, label)``.
        optimizer: optimizer stepped once per batch.

    The printed loss is the sum of the per-batch loss values divided by the
    number of labels seen.
    """
    model.train()
    # Fresh binary metrics for this epoch, kept in the order they are printed.
    trackers = [
        Accuracy(task='binary'),
        Precision(task='binary'),
        Recall(task='binary'),
        F1Score(task='binary'),
        AUROC(task='binary'),
    ]
    for tracker in trackers:
        tracker.to(device)

    loss_sum = 0
    sample_total = 0
    for batch_a, batch_b, target in tqdm(loader):
        sample_total += len(target)
        optimizer.zero_grad()
        batch_a = batch_a.to(device)
        batch_b = batch_b.to(device)
        target = target.to(device)
        output = model(batch_a, batch_b)

        batch_loss = criterion(output, target)
        batch_loss.backward()
        loss_sum += batch_loss.item()
        optimizer.step()

        for tracker in trackers:
            tracker(output, target)

    mean_loss = loss_sum / sample_total
    acc, prec, rec, f1_value, auc = (tracker.compute().item() for tracker in trackers)
    print("Epoch", epoch, "Train loss:", mean_loss, "Acc", acc, "Precision:",
          prec, "Recall:", rec, "F1:", f1_value,
          "AUROC:", auc)
|
|
|
|
|
def evaluate(model, loader, device, criterion):
    """
    score ``model`` on ``loader`` without gradient tracking and print the summary

    Args:
        model: callable taking the two repository batches; put in eval mode.
        loader: iterable yielding ``(repo1, repo2, label)`` batches.
        device: device the batches and the metric objects are moved to.
        criterion: loss callable applied to ``(prediction, label)``.

    Returns:
        ``(accuracy, loss, precision, recall, f1, auroc)`` as Python numbers,
        where ``loss`` is the sum of the per-batch loss values divided by the
        number of labels seen.
    """
    model.eval()
    with torch.no_grad():
        # Binary metrics in the order: accuracy, precision, recall, F1, AUROC.
        trackers = [
            Accuracy(task='binary'),
            Precision(task='binary'),
            Recall(task='binary'),
            F1Score(task='binary'),
            AUROC(task='binary'),
        ]
        for tracker in trackers:
            tracker.to(device)

        loss_sum = 0
        sample_total = 0
        for batch_a, batch_b, target in tqdm(loader):
            sample_total += len(target)
            batch_a = batch_a.to(device)
            batch_b = batch_b.to(device)
            target = target.to(device)
            output = model(batch_a, batch_b)
            loss_sum += criterion(output, target).item()

            for tracker in trackers:
                tracker(output, target)

        mean_loss = loss_sum / sample_total
        acc, prec, rec, f1_value, auc = (tracker.compute().item() for tracker in trackers)
        print("Test loss:", mean_loss, "Acc", acc, "Precision:",
              prec, "Recall:", rec, "F1:", f1_value,
              "AUROC:", auc)

        return acc, mean_loss, prec, rec, f1_value, auc
|
|
|
|
|
def calculate_similarity(model, repo_emb1, repo_emb2):
    """
    score a pair of repository embeddings independently of argument order

    The model is evaluated on both orderings of the pair, the two outputs
    are summed, and a softmax over dimension 1 is applied to the sum.
    """
    forward_out = model(repo_emb1, repo_emb2)
    backward_out = model(repo_emb2, repo_emb1)
    combined = forward_out + backward_out
    return torch.nn.functional.softmax(combined, dim=1)
|
|