|
|
|
|
|
|
|
from pathlib import Path |
|
|
|
import caveclient as cc |
|
import matplotlib.pyplot as plt |
|
import numpy as np |
|
import pandas as pd |
|
import seaborn as sns |
|
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis |
|
from sklearn.ensemble import RandomForestClassifier |
|
from sklearn.metrics import classification_report |
|
from sklearn.model_selection import KFold |
|
from sklearn.pipeline import Pipeline |
|
from sklearn.preprocessing import QuantileTransformer |
|
from skops.io import dump |
|
|
|
client = cc.CAVEclient("minnie65_phase3_v1") |
|
|
|
out_path = Path("./troglobyte-sandbox/models/") |
|
|
|
model_name = "local_compartment_classifier_bd_boxes" |
|
|
|
data_path = Path("./troglobyte-sandbox/data/bounding_box_labels") |
|
|
|
files = list(data_path.glob("*.csv")) |
|
|
|
|
|
label_df = pd.read_csv(out_path / model_name / "labels.csv", index_col=[0, 1]) |
|
label_df = label_df.rename(columns=lambda x: x.replace(".1", "")) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
X_df = pd.read_csv(out_path / model_name / "features_new.csv", index_col=[0, 1]) |
|
|
|
|
|
|
|
|
|
|
|
def box_train_test_split( |
|
train_box_indices, test_box_indices, X_df, label_df, label_column |
|
): |
|
train_label_df = label_df.loc[train_box_indices + 1].droplevel("bbox_id") |
|
test_label_df = label_df.loc[test_box_indices + 1].droplevel("bbox_id") |
|
|
|
train_X_df = X_df.loc[train_label_df["root_id"]] |
|
test_X_df = X_df.loc[test_label_df["root_id"]] |
|
train_X_df = train_X_df.dropna() |
|
test_X_df = test_X_df.dropna() |
|
|
|
train_l2_y = train_X_df.index.get_level_values("object_id").map( |
|
train_label_df[label_column] |
|
) |
|
test_l2_y = test_X_df.index.get_level_values("object_id").map( |
|
test_label_df[label_column] |
|
) |
|
|
|
|
|
train_X_df = train_X_df.loc[train_l2_y.notna()] |
|
train_l2_y = train_l2_y[train_l2_y.notna()].values.astype(str) |
|
|
|
test_X_df = test_X_df.loc[test_l2_y.notna()] |
|
test_l2_y = test_l2_y[test_l2_y.notna()].values.astype(str) |
|
|
|
return train_X_df, test_X_df, train_l2_y, test_l2_y |
|
|
|
|
|
def aggregate_votes_by_object(X_df, l2_node_predictions): |
|
l2_node_predictions = pd.Series( |
|
index=X_df.index, data=l2_node_predictions, name="label" |
|
) |
|
|
|
object_prediction_counts = ( |
|
l2_node_predictions.groupby(level="object_id").value_counts().to_frame() |
|
) |
|
|
|
object_n_predictions = object_prediction_counts.groupby("object_id").sum() |
|
|
|
sufficient_data_index = object_n_predictions.query("count > 3").index |
|
|
|
object_prediction_counts = object_prediction_counts.loc[sufficient_data_index] |
|
|
|
object_prediction_probs = object_prediction_counts.unstack(fill_value=0) |
|
object_prediction_probs = object_prediction_probs.div( |
|
object_prediction_probs.sum(axis=1), axis=0 |
|
) |
|
|
|
object_prediction_counts.reset_index(drop=False, inplace=True) |
|
|
|
max_locs = object_prediction_counts.groupby("object_id")["count"].idxmax() |
|
|
|
max_predictions = object_prediction_counts.loc[max_locs] |
|
max_predictions["proportion"] = ( |
|
max_predictions["count"] |
|
/ object_n_predictions.loc[max_predictions["object_id"]]["count"].values |
|
) |
|
max_predictions = max_predictions.set_index("object_id") |
|
return max_predictions, object_prediction_probs |
|
|
|
|
|
|
|
def get_lda(n_classes): |
|
lda = Pipeline( |
|
[ |
|
("transformer", QuantileTransformer(output_distribution="normal")), |
|
("lda", LinearDiscriminantAnalysis(n_components=n_classes - 1)), |
|
] |
|
) |
|
return lda |
|
|
|
|
|
rf = RandomForestClassifier(n_estimators=500, max_depth=4) |
|
|
|
box_indices = np.arange(1, 4) |
|
|
|
rows = [] |
|
for fold, (train_box_indices, test_box_indices) in enumerate( |
|
KFold(n_splits=3).split(box_indices.reshape(-1, 1)) |
|
): |
|
for label_column in ["axon_label", "simple_label"]: |
|
train_X_df, test_X_df, train_l2_y, test_l2_y = box_train_test_split( |
|
train_box_indices, test_box_indices, X_df, label_df, label_column |
|
) |
|
n_classes = label_df[label_column].nunique() |
|
models = {"rf": rf, "lda": get_lda(n_classes)} |
|
for model_type, model in models.items(): |
|
model.fit(train_X_df, train_l2_y) |
|
train_preds = model.predict(train_X_df) |
|
test_preds = model.predict(test_X_df) |
|
|
|
|
|
train_report = classification_report( |
|
train_l2_y, train_preds, output_dict=True |
|
) |
|
rows.append( |
|
{ |
|
"model": model_type, |
|
"fold": fold, |
|
"accuracy": train_report["accuracy"], |
|
"macro_f1": train_report["macro avg"]["f1-score"], |
|
"weighted_f1": train_report["weighted avg"]["f1-score"], |
|
"evaluation": "train", |
|
"labeling": label_column, |
|
"level": "level2", |
|
} |
|
) |
|
|
|
test_report = classification_report(test_l2_y, test_preds, output_dict=True) |
|
rows.append( |
|
{ |
|
"model": model_type, |
|
"fold": fold, |
|
"accuracy": test_report["accuracy"], |
|
"macro_f1": test_report["macro avg"]["f1-score"], |
|
"weighted_f1": test_report["weighted avg"]["f1-score"], |
|
"evaluation": "test", |
|
"labeling": label_column, |
|
"level": "level2", |
|
} |
|
) |
|
|
|
|
|
train_object_predictions, train_object_probs = aggregate_votes_by_object( |
|
train_X_df, train_preds |
|
) |
|
train_object_y = ( |
|
label_df.droplevel(0) |
|
.loc[train_object_predictions.index, label_column] |
|
.values.astype(str) |
|
) |
|
train_object_report = classification_report( |
|
train_object_y, train_object_predictions["label"], output_dict=True |
|
) |
|
rows.append( |
|
{ |
|
"model": model_type + "-vote", |
|
"fold": fold, |
|
"accuracy": train_object_report["accuracy"], |
|
"macro_f1": train_object_report["macro avg"]["f1-score"], |
|
"weighted_f1": train_object_report["weighted avg"]["f1-score"], |
|
"evaluation": "train", |
|
"labeling": label_column, |
|
"level": "root", |
|
} |
|
) |
|
|
|
test_object_predictions, test_object_probs = aggregate_votes_by_object( |
|
test_X_df, test_preds |
|
) |
|
test_object_y = ( |
|
label_df.droplevel(0) |
|
.loc[test_object_predictions.index, label_column] |
|
.values.astype(str) |
|
) |
|
test_object_report = classification_report( |
|
test_object_y, test_object_predictions["label"], output_dict=True |
|
) |
|
rows.append( |
|
{ |
|
"model": model_type + "-vote", |
|
"fold": fold, |
|
"accuracy": test_object_report["accuracy"], |
|
"macro_f1": train_object_report["macro avg"]["f1-score"], |
|
"weighted_f1": train_object_report["weighted avg"]["f1-score"], |
|
"evaluation": "test", |
|
"labeling": label_column, |
|
"level": "root", |
|
} |
|
) |
|
|
|
|
|
|
|
|
|
evaluation_df = pd.DataFrame(rows) |
|
|
|
sns.set_context("talk") |
|
|
|
fig, axs = plt.subplots(2, 3, figsize=(15, 10), constrained_layout=True, sharey="col") |
|
for i, labeling in enumerate(["simple_label", "axon_label"]): |
|
for j, metric in enumerate(["accuracy", "weighted_f1", "macro_f1"]): |
|
ax = axs[i, j] |
|
show_legend = (i == 0) & (j == 0) |
|
sns.stripplot( |
|
data=evaluation_df.query("labeling == @labeling"), |
|
x="model", |
|
y=metric, |
|
hue="evaluation", |
|
ax=ax, |
|
legend=show_legend, |
|
s=10, |
|
jitter=True, |
|
) |
|
ax.spines[["right", "top"]].set_visible(False) |
|
if j == 1: |
|
ax.set_title("Labeling: " + labeling) |
|
|
|
|
|
|
|
lda = model |
|
train_X_transformed = lda.transform(train_X_df) |
|
|
|
|
|
fig, ax = plt.subplots(1, 1, figsize=(10, 10)) |
|
sns.scatterplot( |
|
x=train_X_transformed[:, 0], |
|
y=train_X_transformed[:, 1], |
|
hue=train_l2_y, |
|
ax=ax, |
|
s=10, |
|
alpha=0.7, |
|
) |
|
ax.set(xticks=[], yticks=[], xlabel="LDA1", ylabel="LDA2") |
|
ax.spines[["right", "top"]].set_visible(False) |
|
|
|
fig, ax = plt.subplots(1, 1, figsize=(10, 10)) |
|
sns.scatterplot( |
|
x=train_X_transformed[:, 0], |
|
y=train_X_transformed[:, 2], |
|
hue=train_l2_y, |
|
ax=ax, |
|
s=10, |
|
alpha=0.7, |
|
) |
|
ax.set(xticks=[], yticks=[], xlabel="LDA1", ylabel="LDA3") |
|
ax.spines[["right", "top"]].set_visible(False) |
|
|
|
|
|
|
|
final_lda = Pipeline( |
|
[ |
|
("transformer", QuantileTransformer(output_distribution="normal")), |
|
("lda", LinearDiscriminantAnalysis(n_components=n_classes - 1)), |
|
] |
|
) |
|
|
|
train_X_df, test_X_df, train_l2_y, test_l2_y = box_train_test_split( |
|
np.array([0, 1, 2]), np.array([]), X_df, label_df, label_column |
|
) |
|
|
|
final_lda.fit(train_X_df, train_l2_y) |
|
|
|
report = classification_report( |
|
train_l2_y, final_lda.predict(train_X_df), output_dict=True |
|
) |
|
|
|
|
|
report_table = pd.DataFrame(report).T |
|
|
|
report_overall = report_table.loc[["accuracy", "macro avg", "weighted avg"]] |
|
report_overall.index.name = "type" |
|
report_overall.reset_index(inplace=True) |
|
report_by_class = report_table.drop(index=["accuracy", "macro avg", "weighted avg"]) |
|
report_by_class.index.name = "class" |
|
report_by_class.reset_index(inplace=True) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import os |
|
from pathlib import Path |
|
|
|
from skops import card, hub_utils |
|
|
|
hub_out_path = Path( |
|
"troglobyte-sandbox/models/local_compartment_classifier_bd_boxes/hub_model" |
|
) |
|
if not hub_out_path.exists(): |
|
hub_utils.init( |
|
model=model_pickle_file, |
|
requirements=["scikit-learn", "caveclient"], |
|
dst=hub_out_path, |
|
task="tabular-classification", |
|
data=train_X_df, |
|
) |
|
|
|
hub_utils.add_files(__file__, dst=hub_out_path, exist_ok=True) |
|
|
|
|
|
if not os.path.exists(hub_out_path / "README.md"): |
|
model_card = card.Card(model, metadata=card.metadata_from_config(hub_out_path)) |
|
model_card.metadata.license = "mit" |
|
model_description = ( |
|
"This is a model trained to classify pieces of neuron as axon, dendrite, soma, or" |
|
"glia, " |
|
"based only on their local shape and synapse features." |
|
"The model is a linear discriminant classifier which was trained on compartment " |
|
"labels generated by Bethanny Danskin for 3 6x6x6 um boxes in the Minnie65 Phase3 " |
|
"dataset." |
|
) |
|
model_card_authors = "bdpedigo" |
|
model_card.add( |
|
model_card_authors=model_card_authors, |
|
model_description=model_description, |
|
) |
|
model_card.add_table( |
|
folded=False, |
|
**{ |
|
"Classification Report (overall)": report_overall, |
|
"Classification Report (by class)": report_by_class, |
|
}, |
|
) |
|
model_card.save(hub_out_path / "README.md") |
|
|
|
from huggingface_hub import HfApi |
|
|
|
api = HfApi() |
|
api.upload_folder( |
|
repo_id=f"bdpedigo/{model_name}", |
|
folder_path=hub_out_path, |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
syn_features = [col for col in X_df.columns if "syn" in col] |
|
train_X_df_no_syn = train_X_df.drop(columns=syn_features) |
|
|
|
final_lda_no_syn = Pipeline( |
|
[ |
|
("transformer", QuantileTransformer(output_distribution="normal")), |
|
("lda", LinearDiscriminantAnalysis(n_components=n_classes - 1)), |
|
] |
|
) |
|
|
|
final_lda_no_syn.fit(train_X_df_no_syn, train_l2_y) |
|
|
|
print(classification_report(train_l2_y, final_lda_no_syn.predict(train_X_df_no_syn))) |
|
|
|
with open(out_path / model_name / f"{model_name}_no_syn.skops", mode="bw") as f: |
|
dump(final_lda_no_syn, file=f) |
|
|