import os
import subprocess
import tempfile
from urllib.parse import parse_qs, urlparse

# Third-party imports are kept in their original relative order.
import streamlit as st
import torch
from PIL import Image
import face_recognition
import faiss
from sentence_transformers import SentenceTransformer
from transformers import pipeline
import cv2
import numpy as np
import yt_dlp
from moviepy.editor import VideoFileClip
# Helper functions
def get_video_id(url):
    """Extract the YouTube video ID from *url*.

    Handles the common URL shapes: ``watch?v=<id>`` (with the ``v``
    parameter in any position), ``youtu.be/<id>`` short links, and
    ``/shorts/<id>``, ``/embed/<id>``, ``/live/<id>`` and ``/v/<id>`` paths.
    Fragments (``#...``) and extra query parameters are ignored.

    Args:
        url: The video URL as a string; a missing scheme is tolerated.

    Returns:
        The video ID as a string.

    Raises:
        ValueError: If no video ID can be found in *url*.
    """
    candidate = url.strip()
    # urlparse only recognises the host when a scheme (or "//") is present.
    if "://" not in candidate:
        candidate = "https://" + candidate
    parsed = urlparse(candidate)

    # Standard watch URLs: take the real ``v`` query parameter rather than
    # the first occurrence of the substring "v=".
    ids = parse_qs(parsed.query).get("v")
    if ids and ids[0]:
        return ids[0]

    host = (parsed.hostname or "").lower()
    segments = [segment for segment in parsed.path.split("/") if segment]
    if segments and (host == "youtu.be" or host.endswith(".youtu.be")):
        return segments[0]
    if len(segments) >= 2 and segments[0] in ("shorts", "embed", "live", "v"):
        return segments[1]

    # Legacy fallback: keep accepting any string the old substring-based
    # implementation accepted (e.g. a bare "v=<id>").
    if "v=" in url:
        legacy_id = url.split("v=")[1].split("&")[0]
        if legacy_id:
            return legacy_id

    raise ValueError(f"Could not find a YouTube video ID in URL: {url!r}")
def download_youtube_video(url, output_path):
    """Download *url* with yt-dlp and return the resulting file name.

    The output template places the file under *output_path*, named after the
    video's ID and extension. The requested format prefers an MP4 video
    stream plus an M4A audio stream, then a single MP4, then whatever is best.

    Args:
        url: URL handed to yt-dlp's ``extract_info``.
        output_path: Directory component of the output template.

    Returns:
        The file name yt-dlp's ``prepare_filename`` computes for the
        extracted info.
    """
    output_template = os.path.join(output_path, '%(id)s.%(ext)s')
    options = {
        'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best',
        'outtmpl': output_template,
    }
    with yt_dlp.YoutubeDL(options) as downloader:
        video_info = downloader.extract_info(url, download=True)
        return downloader.prepare_filename(video_info)
def process_video(video_url, output_dir, video_id):
    """Download a video and sample about one frame per second from it.

    This is still a placeholder for the full pipeline (face detection,
    object detection, transcription, ...); for now it only downloads the
    video and extracts frames.

    Args:
        video_url: URL passed to ``download_youtube_video``.
        output_dir: Directory the video is downloaded into.
        video_id: Currently unused; kept so existing callers keep working.

    Returns:
        A dict with the keys ``video_path``, ``frames`` (the frames as
        returned by ``cv2.VideoCapture.read``), ``frame_times`` (frame
        index divided by fps, one entry per kept frame), ``duration`` and
        ``fps``.

    Raises:
        RuntimeError: If the downloaded file cannot be opened or reports a
            frame rate that is not a positive number.
    """
    video_path = download_youtube_video(video_url, output_dir)

    # Extract frames (simplified version)
    video = cv2.VideoCapture(video_path)
    try:
        if not video.isOpened():
            raise RuntimeError(f"Could not open downloaded video: {video_path}")

        fps = video.get(cv2.CAP_PROP_FPS)
        frame_count = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
        # ``not fps > 0`` also rejects NaN; without this guard the division
        # below and the range() step would fail with an obscure error.
        if not fps > 0:
            raise RuntimeError(f"Video reports an invalid frame rate: {fps!r}")
        duration = frame_count / fps

        # A step of at least 1 keeps range() valid for videos under 1 fps.
        step = max(1, int(fps))
        frames = []
        frame_times = []
        for index in range(0, frame_count, step):  # One frame per second
            video.set(cv2.CAP_PROP_POS_FRAMES, index)
            ret, frame = video.read()
            if ret:
                frames.append(frame)
                frame_times.append(index / fps)
    finally:
        # Release the capture even when an error interrupts extraction.
        video.release()

    return {
        'video_path': video_path,
        'frames': frames,
        'frame_times': frame_times,
        'duration': duration,
        'fps': fps,
    }
def search(query, index_path, metadata_path, model):
    """Placeholder for the search feature; always returns two empty lists.

    None of the arguments are used yet. The intended implementation should
    use FAISS for efficient similarity search.

    Returns:
        A tuple of two new, empty lists.
    """
    placeholder_result = (list(), list())
    return placeholder_result
# Load models
def load_models():
    """Load the CLIP, sentence-embedding and question-answering models.

    Models are placed on the GPU when CUDA is available, otherwise on the
    CPU.

    Returns:
        A tuple ``(clip_model, preprocess, text_model, qa_model)``.
    """
    use_cuda = torch.cuda.is_available()
    device = torch.device('cuda' if use_cuda else 'cpu')
    # The CLIP repository's hubconf exposes each model under its name with
    # punctuation replaced by underscores, so "ViT-B/32" is "ViT_B_32";
    # "clip_vit_b32" is not one of its entry points.
    clip_model, preprocess = torch.hub.load('openai/CLIP', 'ViT_B_32', device=device)
    text_model = SentenceTransformer("all-MiniLM-L6-v2").to(device)
    qa_model = pipeline(
        "question-answering",
        model="distilbert-base-cased-distilled-squad",
        device=0 if use_cuda else -1,  # pipeline takes a GPU index, -1 = CPU
    )
    return clip_model, preprocess, text_model, qa_model
# Streamlit re-executes this script on every widget interaction. Loading the
# models through st.cache_resource keeps a single copy alive across reruns
# instead of reloading them each time.
clip_model, preprocess, text_model, qa_model = st.cache_resource(load_models)()

# Session-state key under which the processed video is kept between reruns.
_RESULTS_KEY = "analysis_results"


def _find_face_frames(reference_encoding, frames, frame_times):
    """Return ``(timestamps, frames)`` for frames showing the reference face.

    Each frame is reported at most once, even if several faces in it match.
    """
    timestamps = []
    matching_frames = []
    for frame, timestamp in zip(frames, frame_times):
        # OpenCV decodes frames as BGR, face_recognition works on RGB.
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        locations = face_recognition.face_locations(rgb_frame)
        encodings = face_recognition.face_encodings(rgb_frame, locations)
        if any(
            face_recognition.compare_faces([reference_encoding], encoding)[0]
            for encoding in encodings
        ):
            timestamps.append(timestamp)
            matching_frames.append(frame)  # keep BGR for cv2.imwrite
    return timestamps, matching_frames


def _encode_frames_to_mp4(frames, fps):
    """Encode *frames* into an H.264 MP4 with FFmpeg and return its bytes."""
    # Everything, including the output, lives in a temporary directory so
    # nothing is left behind and concurrent sessions cannot collide.
    with tempfile.TemporaryDirectory() as temp_dir:
        for index, frame in enumerate(frames):
            cv2.imwrite(os.path.join(temp_dir, f"frame_{index:04d}.jpg"), frame)
        output_video = os.path.join(temp_dir, "face_appearances.mp4")
        ffmpeg_command = [
            "ffmpeg",
            "-y",  # never block on an "overwrite?" prompt
            "-framerate", str(fps),
            "-i", os.path.join(temp_dir, "frame_%04d.jpg"),
            "-c:v", "libx264",
            "-pix_fmt", "yuv420p",
            output_video,
        ]
        subprocess.run(ffmpeg_command, check=True)
        with open(output_video, "rb") as file:
            return file.read()


# Streamlit UI
st.title("Enhanced YouTube Video Analysis")
video_url = st.text_input("Enter YouTube Video URL")

if st.button("Analyze"):
    # The button is only True for the rerun triggered by the click, so the
    # outcome is stored in session state for the widgets below to survive.
    st.session_state[_RESULTS_KEY] = None
    if video_url.strip():
        with st.spinner("Processing video..."):
            try:
                video_id = get_video_id(video_url)
                st.session_state[_RESULTS_KEY] = process_video(
                    video_url, "output_dir", video_id
                )
            except Exception as exc:  # top-level UI boundary: report, don't crash
                st.error(f"Video processing failed: {exc}")
        if st.session_state.get(_RESULTS_KEY):
            st.success("Video processed successfully!")

results = st.session_state.get(_RESULTS_KEY)
if results:
    # Text search and question answering
    st.subheader("Text Search and Q&A")
    query = st.text_input("Enter a search query or question")
    if query:
        # Placeholder for text search and QA
        st.write("Text search and QA functionality to be implemented")

    # Image upload and similarity search
    st.subheader("Image Search")
    uploaded_image = st.file_uploader(
        "Upload an image to find similar frames", type=["jpg", "jpeg", "png"]
    )
    if uploaded_image:
        # Placeholder for image search
        st.write("Image search functionality to be implemented")

    # Face upload and recognition
    st.subheader("Face Search")
    uploaded_face = st.file_uploader(
        "Upload a face image to find appearances", type=["jpg", "jpeg", "png"]
    )
    if uploaded_face:
        face_image = face_recognition.load_image_file(uploaded_face)
        reference_encodings = face_recognition.face_encodings(face_image)
        if not reference_encodings:
            # face_encodings returns an empty list when no face is detected.
            st.warning("No face could be detected in the uploaded image.")
        else:
            face_appearances, face_frames = _find_face_frames(
                reference_encodings[0], results['frames'], results['frame_times']
            )
            st.write(f"Face appearances found at {len(face_appearances)} timestamps.")
            if face_frames:
                try:
                    video_bytes = _encode_frames_to_mp4(face_frames, results['fps'])
                except (OSError, subprocess.CalledProcessError) as exc:
                    # OSError covers a missing ffmpeg executable.
                    st.error(f"Could not build the face appearances video: {exc}")
                else:
                    # Display the generated video
                    st.video(video_bytes)
                    # Provide download link for the video
                    st.download_button(
                        label="Download Face Appearances Video",
                        data=video_bytes,
                        file_name="face_appearances.mp4",
                        mime="video/mp4",
                    )
            else:
                st.write("No frames with the uploaded face were found in the video.")

    # Display original video
    st.subheader("Original Video")
    st.video(results['video_path'])
else:
    st.warning("Please enter a valid YouTube URL and click 'Analyze'")