# -*- coding: utf-8 -*-
"""classification.ipynb

Automatically generated by Colab.

Original file is located at
    https://colab.research.google.com/drive/1JuZNV3fqC5XQ0L-jhIyVRbIDPfWWGkVI
"""

import copy
import os
import random

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from PIL import Image
from torch.utils.data import DataLoader, random_split
from torchvision import datasets, models, transforms

# Define the data directories
data_dir = 'drive/MyDrive/Ai_Hackathon_2024/plant_data/data_for_training'
augmented_data_dir = 'drive/MyDrive/Ai_Hackathon_2024/plant_data/augmented_data'

# Define the desired (minimum) number of images per class
N = 50

# Define the augmentation transforms
augmentation_transforms = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(30),
    transforms.RandomAffine(degrees=0, translate=(0.1, 0.1)),
    transforms.RandomResizedCrop(size=(224, 224), scale=(0.8, 1.0)),
    transforms.Pad(padding=10, padding_mode='reflect'),  # Add padding with reflection
    transforms.ToTensor(),
])

# Load the dataset
print('loading dataset...')
dataset = datasets.ImageFolder(data_dir)
class_names = dataset.classes
print('loaded dataset.')


def save_image(img, path, idx):
    """Save the PIL image *img* as ``<path>/<idx>.png``."""
    img.save(os.path.join(path, f'{idx}.png'))


# Augment the dataset: copy every original image into the augmented directory,
# then top each class up to N images with randomly transformed copies.
os.makedirs(augmented_data_dir, exist_ok=True)

print('starting augmentation process...')
to_pil_image = transforms.ToPILImage()
for class_idx, class_name in enumerate(dataset.classes):
    print(f"class_idx = {class_idx}")
    class_dir = os.path.join(augmented_data_dir, class_name)
    os.makedirs(class_dir, exist_ok=True)

    class_images = [img_path for img_path, label in dataset.samples if label == class_idx]
    current_count = 0

    # Save original images first
    for img_path in class_images:
        # Context manager closes the underlying file handle once the copy is written.
        with Image.open(img_path) as img:
            save_image(img, class_dir, current_count)
        current_count += 1

    # If there are fewer than N images, augment the dataset
    while current_count < N:
        img_path = random.choice(class_images)
        with Image.open(img_path) as img:
            augmented = augmentation_transforms(img)
        augmented = to_pil_image(augmented)  # Convert back to PIL Image
        save_image(augmented, class_dir, current_count)
        current_count += 1

print('Data augmentation completed.')

# From here on, train on the augmented copy of the data
data_dir = augmented_data_dir  # 'drive/MyDrive/Ai_Hackathon_2024/plant_data/data_for_training'

# Set the random seed for reproducibility
seed = 42
torch.manual_seed(seed)

# Define transforms
data_transforms = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(30),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])

# Create the dataset
full_dataset = datasets.ImageFolder(data_dir, transform=data_transforms)

# Define the train-validation split ratio
train_size = int(0.8 * len(full_dataset))
val_size = len(full_dataset) - train_size

# Split the dataset
# NOTE(review): the split is random over files that include augmented copies, so
# variants of one source image can land in both train and val — confirm this is
# acceptable for the reported validation accuracy.
train_dataset, val_dataset = random_split(
    full_dataset,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(seed),
)

# Create data loaders
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

# Load the pre-trained ResNet50 model
resnet50 = models.resnet50(weights='ResNet50_Weights.DEFAULT')

# Freeze the parameters of the pre-trained model
for param in resnet50.parameters():
    param.requires_grad = False

# Remove the final fully connected layer
num_ftrs = resnet50.fc.in_features
resnet50.fc = nn.Identity()  # Replace the final layer with an identity function to get the feature vectors


class CustomNet(nn.Module):
    """Frozen ResNet50 feature extractor followed by a one-hidden-layer classifier.

    Every instance reuses the module-level ``resnet50`` object as its backbone.
    Attribute names are part of the saved ``state_dict`` keys and must not be
    renamed.
    """

    def __init__(self, num_ftrs, num_classes):
        super().__init__()
        self.resnet50 = resnet50
        self.hidden = nn.Linear(num_ftrs, 512)
        self.relu = nn.ReLU()
        self.output = nn.Linear(512, num_classes)

    def forward(self, x):
        x = self.resnet50(x)  # Extract features using the pre-trained model
        x = self.hidden(x)    # Pass through the hidden layer
        x = self.relu(x)      # Apply ReLU activation
        x = self.output(x)    # Output layer
        return x


# Instantiate the custom network
num_classes = len(full_dataset.classes)
model = CustomNet(num_ftrs, num_classes)

# Move the model to the appropriate device
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# Define criterion and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)


def train_model(model, dataloaders, criterion, optimizer, num_epochs=10):
    """Train *model* and return it loaded with the best-validation weights.

    Args:
        model: network to train; inputs are moved to the module-level ``device``.
        dataloaders: dict with ``'train'`` and ``'val'`` DataLoaders.
        criterion: loss function applied to ``(outputs, labels)``.
        optimizer: optimizer stepped once per training batch.
        num_epochs: number of train+validation passes.

    Returns:
        The same *model* object, with the weights of the epoch that achieved the
        highest validation accuracy. Also plots the per-epoch losses.
    """
    # state_dict() returns references to the live tensors, so the snapshot must
    # be deep-copied or it would silently track the latest weights.
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0
    train_losses = []
    val_losses = []

    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            if phase == 'train':
                # NOTE(review): train() also switches the frozen backbone's
                # BatchNorm layers to training mode — confirm that is intended.
                model.train()
            else:
                model.eval()

            running_loss = 0.0
            running_corrects = 0

            for inputs, labels in dataloaders[phase]:
                inputs, labels = inputs.to(device), labels.to(device)

                # Zero the parameter gradients
                optimizer.zero_grad()

                # Forward; gradients are tracked only while training
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    # Backward + optimize only if in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                # loss is a batch mean, so weight it by the batch size
                running_loss += loss.item() * inputs.size(0)
                running_corrects += (preds == labels).sum().item()

            dataset_size = len(dataloaders[phase].dataset)
            epoch_loss = running_loss / dataset_size
            epoch_acc = running_corrects / dataset_size

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            if phase == 'train':
                train_losses.append(epoch_loss)
            else:
                val_losses.append(epoch_loss)

            # Deep copy the model weights whenever validation accuracy improves
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

    print(f'Best val Acc: {best_acc:.4f}')

    # Load best model weights
    model.load_state_dict(best_model_wts)

    # Plot the training and validation loss
    plt.figure(figsize=(10, 5))
    plt.plot(train_losses, label='Training Loss')
    plt.plot(val_losses, label='Validation Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()

    return model


# Create a dictionary to hold the dataloaders
dataloaders = {'train': train_loader, 'val': val_loader}

# Train and evaluate the model
model = train_model(model, dataloaders, criterion, optimizer, num_epochs=10)

# Save the model
torch.save(model.state_dict(), 'drive/MyDrive/Ai_Hackathon_2024/plant_data/fine_tuned_plant_classifier.pth')


def evaluate_model(model, dataloader):
    """Run *model* over *dataloader* without gradients.

    Returns:
        ``(accuracy, all_preds, all_labels)`` where accuracy is the fraction of
        correct predictions (0.0 for an empty loader) and the two lists hold the
        predicted and true class indices in iteration order.
    """
    model.eval()
    correct = 0
    total = 0
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
            correct += (preds == labels).sum().item()
            total += labels.size(0)

    # Guard against an empty loader instead of raising ZeroDivisionError
    accuracy = correct / total if total else 0.0
    return accuracy, all_preds, all_labels


# Evaluate the model on the full augmented dataset (train + val)
dataloader = DataLoader(full_dataset, batch_size=32, shuffle=True)
accuracy, all_preds, all_labels = evaluate_model(model, dataloader)

# Calculate the number of correct and incorrect predictions
correct_preds = sum(np.array(all_preds) == np.array(all_labels))
incorrect_preds = len(all_labels) - correct_preds

print(f'Total images: {len(all_labels)}')
print(f'Correct predictions: {correct_preds}')
print(f'Incorrect predictions: {incorrect_preds}')
print(f'Accuracy: {accuracy:.4f}')

##-----------------------------------------------------------##

# Repeat the evaluation on the original (non-augmented) images
real_dataset = datasets.ImageFolder('drive/MyDrive/Ai_Hackathon_2024/plant_data/data_for_training', transform=data_transforms)

# Evaluate the model
dataloader = DataLoader(real_dataset, batch_size=32, shuffle=True)
accuracy, all_preds, all_labels = evaluate_model(model, dataloader)

# Calculate the number of correct and incorrect predictions
correct_preds = sum(np.array(all_preds) == np.array(all_labels))
incorrect_preds = len(all_labels) - correct_preds

print('-'*10)
print(f'Total images: {len(all_labels)}')
print(f'Correct predictions: {correct_preds}')
print(f'Incorrect predictions: {incorrect_preds}')
print(f'Accuracy: {accuracy:.4f}')


def process_image(image_path):
    """Load *image_path* as RGB and return a normalized ``(1, 3, 224, 224)`` tensor."""
    # Deliberately not `data_transforms`: that pipeline contains random
    # flips/rotations, which must not be applied at inference time.
    data_transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ])
    # Context manager closes the file once the pixel data has been converted.
    with Image.open(image_path) as raw_image:
        image = raw_image.convert('RGB')
    image = data_transform(image)
    image = image.unsqueeze(0)  # Add batch dimension
    return image


#----------------------------INFERENCE PART----------------------------

def predict_single_image(image_path, model):
    """Return class probabilities for one image as a descending ``pd.Series``.

    The series is indexed by the module-level ``class_names``.
    """
    # Load the image and preprocess it
    image = process_image(image_path)

    # Prepare the model for inference on the available device
    model.eval()
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    model = model.to(device)

    # Pass the image through the model
    with torch.no_grad():
        image = image.to(device)
        outputs = model(image)
        probabilities = torch.nn.functional.softmax(outputs[0], dim=0)

    # Return the class names and their probabilities as a Pandas Series
    return pd.Series(probabilities.cpu().numpy(), index=class_names).sort_values(ascending=False)


def classify(img_path):
    """Load the trained weights from disk and classify the image at *img_path*.

    Returns:
        ``pd.Series`` of class probabilities, highest first.
    """
    # Path to the single image
    image_path = img_path

    # Initialize your custom model
    model = CustomNet(num_ftrs, num_classes)

    # Load the trained model weights. The file name previously contained a ':'
    # typo ('fine_tuned:plant_classifier.pth') that did not match the saved
    # file name. map_location lets GPU-trained weights load on CPU-only hosts.
    state_dict = torch.load('./fine_tuned_plant_classifier.pth', map_location=device)
    model.load_state_dict(state_dict)

    # Predict the class probabilities
    class_probabilities = predict_single_image(image_path, model)

    return class_probabilities

#----------------------------INFERENCE PART----------------------------

## script to automatically
## include larger drone images
# For each file in `source_dir` whose ID matches a file already sorted into a
# class folder under `target_base_dir`, write a centre crop of it into the
# same-named class folder under `new_base_dir`.
import os
import shutil
from PIL import Image

# Define the paths
source_dir = 'path/to/source_images'  # The directory with new images
target_base_dir = 'path/to/training_images'  # The base directory containing original class folders
new_base_dir = 'path/to/training_images_2'  # The base directory for the new substructure


def extract_id(filename):
    """Return the part of *filename* before the first '_' (the whole name if none)."""
    file_id, _, _ = filename.partition('_')
    return file_id


def crop_middle_section(image):
    """Return the central region of *image*, one third of its width and height."""
    full_w, full_h = image.size
    crop_w, crop_h = full_w // 3, full_h // 3
    x0 = (full_w - crop_w) // 2
    y0 = (full_h - crop_h) // 2
    return image.crop((x0, y0, x0 + crop_w, y0 + crop_h))


# Extract the class folders (sub-directories of the original training set)
class_folders = []
for entry in os.listdir(target_base_dir):
    if os.path.isdir(os.path.join(target_base_dir, entry)):
        class_folders.append(entry)

# Create the new base directory if it does not exist
os.makedirs(new_base_dir, exist_ok=True)

# Map each file ID to the class folder it was found in; if an ID occurs in
# several folders, the folder visited last wins.
id_to_class_folder = {}
for class_folder in class_folders:
    class_folder_path = os.path.join(target_base_dir, class_folder)
    id_to_class_folder.update(
        (extract_id(name), class_folder)
        for name in os.listdir(class_folder_path)
        if os.path.isfile(os.path.join(class_folder_path, name))
    )

# Crop every source image with a known ID into its class folder
for filename in os.listdir(source_dir):
    image_path = os.path.join(source_dir, filename)
    if not os.path.isfile(image_path):
        continue

    target_class_folder = id_to_class_folder.get(extract_id(filename))
    if target_class_folder is None:
        continue  # ID not present in the original training set

    new_class_folder_path = os.path.join(new_base_dir, target_class_folder)
    os.makedirs(new_class_folder_path, exist_ok=True)  # Create the class folder if it doesn't exist
    target_path = os.path.join(new_class_folder_path, filename)

    # Open, crop and save under the original file name
    with Image.open(image_path) as img:
        crop_middle_section(img).save(target_path)
    print(f'Copied and cropped {filename} to {new_class_folder_path}')

print('Image processing and copying completed.')