# prediction_sequences.py
import tensorflow as tf
import cv2
import numpy as np
import dlib
from imutils import face_utils
import os
import pickle
from collections import deque
import threading
import queue
def load_model(model_path='final_model_sequences.keras'):
"""
Loads the trained model.
Args:
model_path (str): Path to the saved model.
Returns:
tensorflow.keras.Model: Loaded model.
"""
model = tf.keras.models.load_model(model_path)
return model
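# Inference-only use does not need the training configuration; if loading
# ever fails on a custom loss or metric, passing compile=False to
# tf.keras.models.load_model is a common workaround.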
def get_facial_landmarks(detector, predictor, image):
"""
Detects facial landmarks in an image.
Args:
detector: dlib face detector.
predictor: dlib shape predictor.
image (numpy.ndarray): Input image.
Returns:
        dict or None: Coordinates of the eyes and eyebrows, or None if no face is detected.
"""
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
rects = detector(gray, 1)
if len(rects) == 0:
return None # No face detected
# Assuming the first detected face is the target
rect = rects[0]
shape = predictor(gray, rect)
shape = face_utils.shape_to_np(shape)
landmarks = {}
    # Landmark index ranges from the dlib 68-point model. Names follow
    # image position: points 36-41 are the eye on the image's left side
    # (the subject's right eye), 42-47 the image-right eye, and likewise
    # 17-21 / 22-26 for the eyebrows.
    landmarks['left_eye'] = shape[36:42]
    landmarks['right_eye'] = shape[42:48]
    landmarks['left_eyebrow'] = shape[17:22]
    landmarks['right_eyebrow'] = shape[22:27]
return landmarks
def extract_roi(image, landmarks, region='left_eye', padding=5):
"""
Extracts a region of interest (ROI) from the image based on landmarks.
Args:
image (numpy.ndarray): Input image.
landmarks (dict): Facial landmarks.
region (str): Region to extract ('left_eye', 'right_eye', 'left_eyebrow', 'right_eyebrow').
padding (int): Padding around the ROI.
Returns:
        numpy.ndarray or None: Extracted ROI, or None if the region is missing or the box is empty.
"""
points = landmarks.get(region)
if points is None:
return None
    # Compute a padded bounding box around the landmark points
    x, y, w, h = cv2.boundingRect(points)
    x = max(x - padding, 0)
    y = max(y - padding, 0)
    w = w + 2 * padding
    h = h + 2 * padding
    roi = image[y:y+h, x:x+w]
    if roi.size == 0:
        return None  # Degenerate box, e.g. landmarks at the image border
    return roi
def preprocess_frame(image, detector, predictor, img_size=(64, 64)):
"""
Preprocesses a single frame: detects landmarks, extracts ROIs, and prepares the input.
Args:
image (numpy.ndarray): Input frame.
detector: dlib face detector.
predictor: dlib shape predictor.
img_size (tuple): Desired image size for ROIs.
Returns:
        numpy.ndarray or None: Four grayscale ROIs concatenated horizontally, or None if no face or any region is missing.
"""
landmarks = get_facial_landmarks(detector, predictor, image)
if landmarks is None:
return None # No face detected
# Extract ROIs for eyes and eyebrows
rois = {}
rois['left_eye'] = extract_roi(image, landmarks, 'left_eye')
rois['right_eye'] = extract_roi(image, landmarks, 'right_eye')
rois['left_eyebrow'] = extract_roi(image, landmarks, 'left_eyebrow')
rois['right_eyebrow'] = extract_roi(image, landmarks, 'right_eyebrow')
    # Process ROIs; all four regions are required so the combined image
    # always has the width (4 * img_size[1]) the model was trained on
    roi_images = []
    for region in ['left_eye', 'right_eye', 'left_eyebrow', 'right_eyebrow']:
        roi = rois.get(region)
        if roi is None:
            return None  # A missing region would change the combined shape
        roi = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)  # Convert to grayscale
        roi = cv2.resize(roi, img_size)
        roi = roi.astype('float32') / 255.0  # Normalize to [0, 1]
        roi = np.expand_dims(roi, axis=-1)  # Add channel dimension
        roi_images.append(roi)
# Concatenate ROIs horizontally to form a single image
combined_roi = np.hstack(roi_images)
return combined_roi
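# Note: with the default img_size of (64, 64), preprocess_frame returns an
# array of shape (64, 256, 1): four grayscale 64x64 ROIs side by side.
# main() relies on this shape when it substitutes zero frames for frames
# where no face was detected.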
def movement_to_text(label_map):
"""
Creates a mapping from class indices to text.
Args:
label_map (dict): Mapping from class names to indices.
Returns:
dict: Mapping from indices to text descriptions.
"""
movement_to_text_map = {
'upward_eyebrow': 'Eyebrow Raised',
'downward_eyebrow': 'Eyebrow Lowered',
'left_eye': 'Left Eye Movement',
'right_eye': 'Right Eye Movement',
# Add more mappings as needed
}
# Create index to text mapping
index_to_text = {}
for cls, idx in label_map.items():
text = movement_to_text_map.get(cls, cls)
index_to_text[idx] = text
return index_to_text
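# Example: with label_map {'left_eye': 0, 'upward_eyebrow': 1}, this yields
# {0: 'Left Eye Movement', 1: 'Eyebrow Raised'}; class names without an
# entry in movement_to_text_map fall back to the raw class name.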
def prediction_worker(model, input_queue, output_queue, max_seq_length):
"""
Worker thread for handling model predictions.
Args:
model (tensorflow.keras.Model): Trained model.
input_queue (queue.Queue): Queue to receive sequences for prediction.
output_queue (queue.Queue): Queue to send prediction results.
max_seq_length (int): Fixed sequence length for the model.
"""
while True:
sequence = input_queue.get()
if sequence is None:
break # Sentinel to stop the thread
# Pad or truncate the sequence to match the model's expected input
if sequence.shape[0] < max_seq_length:
pad_width = max_seq_length - sequence.shape[0]
padding = np.zeros((pad_width, *sequence.shape[1:]), dtype=sequence.dtype)
sequence_padded = np.concatenate((sequence, padding), axis=0)
else:
sequence_padded = sequence[:max_seq_length]
# Expand dimensions to match model input (1, frames, height, width, channels)
sequence_padded = np.expand_dims(sequence_padded, axis=0)
        # Perform prediction (verbose=0 suppresses Keras' per-call logging)
        prediction = model.predict(sequence_padded, verbose=0)
        class_idx = int(np.argmax(prediction))
        confidence = float(np.max(prediction))
        # Put the result in the output queue
        output_queue.put((class_idx, confidence))
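# A minimal off-line sketch of exercising the worker (assumes a loaded
# `model`; the dummy sequence follows the (frames, 64, 256, 1) contract):
#
#   in_q, out_q = queue.Queue(), queue.Queue()
#   t = threading.Thread(target=prediction_worker, args=(model, in_q, out_q, 20))
#   t.start()
#   in_q.put(np.zeros((20, 64, 256, 1), dtype='float32'))
#   print(out_q.get())   # -> (class_idx, confidence)
#   in_q.put(None)       # sentinel stops the worker
#   t.join()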
def main():
# Load the trained model
model = load_model('final_model_sequences.keras')
# Load label map
with open('dataset_sequences.pkl', 'rb') as f:
data = pickle.load(f)
label_map = data['label_map']
index_to_text = movement_to_text(label_map)
# Initialize dlib's face detector and landmark predictor
detector = dlib.get_frontal_face_detector()
predictor_path = 'shape_predictor_68_face_landmarks.dat'
if not os.path.exists(predictor_path):
print(f"Error: {predictor_path} not found. Download it from http://dlib.net/files/shape_predictor_68_face_landmarks.dat.bz2")
return
predictor = dlib.shape_predictor(predictor_path)
# Initialize queues for communication between threads
input_queue = queue.Queue()
output_queue = queue.Queue()
# Define sequence length (number of frames)
    max_seq_length = 20  # Must match the sequence length the model was trained with
# Start the prediction worker thread
pred_thread = threading.Thread(target=prediction_worker, args=(model, input_queue, output_queue, max_seq_length))
pred_thread.daemon = True
pred_thread.start()
# Start video capture
cap = cv2.VideoCapture(0)
if not cap.isOpened():
print("Error: Could not open webcam.")
return
print("Starting real-time prediction. Press 'q' to quit.")
# Initialize a deque to store the sequence of preprocessed frames
frame_buffer = deque(maxlen=max_seq_length)
# Variable to store the latest prediction result
latest_prediction = "Initializing..."
while True:
ret, frame = cap.read()
if not ret:
print("Failed to grab frame.")
break
# Preprocess the current frame
preprocessed_frame = preprocess_frame(frame, detector, predictor, img_size=(64, 64))
if preprocessed_frame is not None:
frame_buffer.append(preprocessed_frame)
        else:
            # No face detected: append a zero frame of the same shape,
            # (64, 256, 1) = height 64, width 4 * 64 for the four ROIs
            frame_buffer.append(np.zeros((64, 256, 1), dtype='float32'))
        # When the buffer is full, send the sequence to the prediction
        # thread, but only if the worker is idle; otherwise requests pile
        # up faster than the model can serve them and latency grows
        if len(frame_buffer) == max_seq_length and input_queue.empty():
            sequence_array = np.array(frame_buffer)
            input_queue.put(sequence_array)
# Check if there's a new prediction result
try:
while True:
class_idx, confidence = output_queue.get_nowait()
movement = index_to_text.get(class_idx, "Unknown")
latest_prediction = f"{movement} ({confidence*100:.2f}%)"
except queue.Empty:
pass # No new prediction
# Display the prediction on the frame
cv2.putText(frame, latest_prediction, (30, 30), cv2.FONT_HERSHEY_SIMPLEX,
0.8, (0, 255, 0), 2, cv2.LINE_AA)
# Display the frame
cv2.imshow('Real-time Movement Prediction', frame)
# Exit condition
if cv2.waitKey(1) & 0xFF == ord('q'):
break
# Cleanup
cap.release()
cv2.destroyAllWindows()
# Stop the prediction thread
input_queue.put(None) # Sentinel to stop the thread
pred_thread.join()
if __name__ == "__main__":
main()