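"""Real-time facial-movement prediction from a webcam.

dlib locates the eyes and eyebrows in each frame, the four regions are
resized to 64x64 grayscale patches and stitched into a single 64x256 strip,
and a sliding window of strips is classified by a trained Keras sequence
model running on a background thread.
"""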
import os
import pickle
import queue
import threading
from collections import deque

import cv2
import dlib
import numpy as np
import tensorflow as tf
from imutils import face_utils
|
|
|
def load_model(model_path='final_model_sequences.keras'):
    """
    Loads the trained model.

    Args:
        model_path (str): Path to the saved model.

    Returns:
        tensorflow.keras.Model: Loaded model.
    """
    model = tf.keras.models.load_model(model_path)
    return model
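# Note: the plain load_model call above assumes the model is built only from
# standard Keras layers; a model saved with custom layers or losses would also
# need a `custom_objects=...` argument.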
|
|
|
def get_facial_landmarks(detector, predictor, image):
    """
    Detects facial landmarks in an image.

    Args:
        detector: dlib face detector.
        predictor: dlib shape predictor.
        image (numpy.ndarray): Input image.

    Returns:
        dict: Coordinates of eyes and eyebrows, or None if no face is found.
    """
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    rects = detector(gray, 1)

    if len(rects) == 0:
        return None

    # Use only the first detected face.
    rect = rects[0]
    shape = predictor(gray, rect)
    shape = face_utils.shape_to_np(shape)

    # Slice the 68-point dlib layout. In that layout, indices 36-41 are the
    # subject's right eye and 42-47 the left; the names below follow
    # on-screen (image) left/right instead.
    landmarks = {}
    landmarks['left_eye'] = shape[36:42]
    landmarks['right_eye'] = shape[42:48]
    landmarks['left_eyebrow'] = shape[17:22]
    landmarks['right_eyebrow'] = shape[22:27]

    return landmarks
|
|
def extract_roi(image, landmarks, region='left_eye', padding=5):
    """
    Extracts a region of interest (ROI) from the image based on landmarks.

    Args:
        image (numpy.ndarray): Input image.
        landmarks (dict): Facial landmarks.
        region (str): Region to extract ('left_eye', 'right_eye', 'left_eyebrow', 'right_eyebrow').
        padding (int): Padding in pixels around the ROI.

    Returns:
        numpy.ndarray: Extracted ROI, or None if the region is missing.
    """
    points = landmarks.get(region)
    if points is None:
        return None

    # Pad the landmark bounding box; clamp the top-left corner to the image.
    # (NumPy slicing clips the bottom-right edge automatically.)
    x, y, w, h = cv2.boundingRect(points)
    x = max(x - padding, 0)
    y = max(y - padding, 0)
    w = w + 2 * padding
    h = h + 2 * padding

    roi = image[y:y+h, x:x+w]
    return roi
|
|
def preprocess_frame(image, detector, predictor, img_size=(64, 64)):
    """
    Preprocesses a single frame: detects landmarks, extracts ROIs, and prepares the input.

    Args:
        image (numpy.ndarray): Input frame.
        detector: dlib face detector.
        predictor: dlib shape predictor.
        img_size (tuple): Desired image size for each ROI.

    Returns:
        numpy.ndarray: Preprocessed frame as a horizontally concatenated ROI
        strip, or None if the face or any ROI could not be extracted.
    """
    landmarks = get_facial_landmarks(detector, predictor, image)
    if landmarks is None:
        return None

    rois = {}
    rois['left_eye'] = extract_roi(image, landmarks, 'left_eye')
    rois['right_eye'] = extract_roi(image, landmarks, 'right_eye')
    rois['left_eyebrow'] = extract_roi(image, landmarks, 'left_eyebrow')
    rois['right_eyebrow'] = extract_roi(image, landmarks, 'right_eyebrow')

    roi_images = []
    for region in ['left_eye', 'right_eye', 'left_eyebrow', 'right_eyebrow']:
        roi = rois.get(region)
        # All four regions are required so the combined strip always has the
        # same width; a missing or empty ROI (e.g., a face at the frame edge)
        # would otherwise crash cv2.resize or yield mismatched sequences.
        if roi is None or roi.size == 0:
            return None
        roi = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
        roi = cv2.resize(roi, img_size)
        roi = roi.astype('float32') / 255.0
        roi = np.expand_dims(roi, axis=-1)
        roi_images.append(roi)

    # Stack the four ROIs side by side: (64, 256, 1) for img_size=(64, 64).
    combined_roi = np.hstack(roi_images)
    return combined_roi
|
|
def movement_to_text(label_map):
    """
    Creates a mapping from class indices to text descriptions.

    Args:
        label_map (dict): Mapping from class names to indices.

    Returns:
        dict: Mapping from indices to text descriptions.
    """
    movement_to_text_map = {
        'upward_eyebrow': 'Eyebrow Raised',
        'downward_eyebrow': 'Eyebrow Lowered',
        'left_eye': 'Left Eye Movement',
        'right_eye': 'Right Eye Movement',
    }

    # Fall back to the raw class name for any movement without a description.
    index_to_text = {}
    for cls, idx in label_map.items():
        text = movement_to_text_map.get(cls, cls)
        index_to_text[idx] = text
    return index_to_text
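# For reference, a label_map of the shape this function expects (hypothetical
# example values; the real mapping is loaded from dataset_sequences.pkl):
#   {'upward_eyebrow': 0, 'downward_eyebrow': 1, 'left_eye': 2, 'right_eye': 3}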
|
|
|
def prediction_worker(model, input_queue, output_queue, max_seq_length):
    """
    Worker thread for handling model predictions.

    Args:
        model (tensorflow.keras.Model): Trained model.
        input_queue (queue.Queue): Queue to receive sequences for prediction.
        output_queue (queue.Queue): Queue to send prediction results.
        max_seq_length (int): Fixed sequence length for the model.
    """
    while True:
        sequence = input_queue.get()
        # A None sentinel from main() shuts the worker down.
        if sequence is None:
            break

        # Zero-pad short sequences and truncate long ones to max_seq_length.
        if sequence.shape[0] < max_seq_length:
            pad_width = max_seq_length - sequence.shape[0]
            padding = np.zeros((pad_width, *sequence.shape[1:]), dtype=sequence.dtype)
            sequence_padded = np.concatenate((sequence, padding), axis=0)
        else:
            sequence_padded = sequence[:max_seq_length]

        # Add the batch dimension expected by the model.
        sequence_padded = np.expand_dims(sequence_padded, axis=0)

        prediction = model.predict(sequence_padded, verbose=0)
        class_idx = int(np.argmax(prediction))
        confidence = float(np.max(prediction))

        output_queue.put((class_idx, confidence))
|
|
def main():
    model = load_model('final_model_sequences.keras')

    # Recover the class-name -> index mapping saved with the training data.
    with open('dataset_sequences.pkl', 'rb') as f:
        data = pickle.load(f)
    label_map = data['label_map']
    index_to_text = movement_to_text(label_map)

    detector = dlib.get_frontal_face_detector()
    predictor_path = 'shape_predictor_68_face_landmarks.dat'

    if not os.path.exists(predictor_path):
        print(f"Error: {predictor_path} not found. Download it from http://dlib.net/files/shape_predictor_68_face_landmarks.dat.bz2")
        return

    predictor = dlib.shape_predictor(predictor_path)

    input_queue = queue.Queue()
    output_queue = queue.Queue()

    # Sequence length fed to the model; assumed to match the length used
    # during training.
    max_seq_length = 20

    # Run predictions on a background thread so frame capture stays smooth.
    pred_thread = threading.Thread(
        target=prediction_worker,
        args=(model, input_queue, output_queue, max_seq_length),
        daemon=True,
    )
    pred_thread.start()

    cap = cv2.VideoCapture(0)

    if not cap.isOpened():
        print("Error: Could not open webcam.")
        return

    print("Starting real-time prediction. Press 'q' to quit.")

    # Sliding window of the most recent preprocessed frames.
    frame_buffer = deque(maxlen=max_seq_length)

    latest_prediction = "Initializing..."

    while True:
        ret, frame = cap.read()
        if not ret:
            print("Failed to grab frame.")
            break

        preprocessed_frame = preprocess_frame(frame, detector, predictor, img_size=(64, 64))
        if preprocessed_frame is not None:
            frame_buffer.append(preprocessed_frame)
        else:
            # No usable face: append a blank frame so the sequence keeps its
            # shape (four 64x64 ROIs side by side -> 64x256x1).
            frame_buffer.append(np.zeros((64, 256, 1), dtype='float32'))

        # Only enqueue a new sequence when the worker is idle, so stale
        # sequences do not pile up behind slow predictions.
        if len(frame_buffer) == max_seq_length and input_queue.empty():
            sequence_array = np.array(frame_buffer)
            input_queue.put(sequence_array)

        # Drain any finished predictions, keeping only the most recent.
        try:
            while True:
                class_idx, confidence = output_queue.get_nowait()
                movement = index_to_text.get(class_idx, "Unknown")
                latest_prediction = f"{movement} ({confidence*100:.2f}%)"
        except queue.Empty:
            pass

        cv2.putText(frame, latest_prediction, (30, 30), cv2.FONT_HERSHEY_SIMPLEX,
                    0.8, (0, 255, 0), 2, cv2.LINE_AA)

        cv2.imshow('Real-time Movement Prediction', frame)

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()

    # Signal the worker to exit and wait for it to finish.
    input_queue.put(None)
    pred_thread.join()
|
|
if __name__ == "__main__":
    main()
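# Expected files in the working directory (the default paths used above):
#   final_model_sequences.keras             - trained sequence model
#   dataset_sequences.pkl                   - pickle containing 'label_map'
#   shape_predictor_68_face_landmarks.dat   - dlib 68-point landmark model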
|
|