import streamlit as st
import torch
import clip  # openai/CLIP package: pip install git+https://github.com/openai/CLIP.git
from PIL import Image
import face_recognition
import faiss
from sentence_transformers import SentenceTransformer
from transformers import pipeline
import cv2
import numpy as np
import subprocess
import tempfile
import os
import yt_dlp
# Helper functions
def get_video_id(url):
    # Handle both youtube.com/watch?v=<id> and youtu.be/<id> URLs.
    if "v=" in url:
        return url.split("v=")[1].split("&")[0]
    return url.rstrip("/").split("/")[-1].split("?")[0]
def download_youtube_video(url, output_path):
    ydl_opts = {
        'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best',
        'outtmpl': os.path.join(output_path, '%(id)s.%(ext)s'),
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=True)
        filename = ydl.prepare_filename(info)
    return filename
def process_video(video_url, output_dir, video_id):
    # Placeholder for the full processing pipeline. This should eventually
    # include face detection, object detection, transcription, etc.
    # For now, download the video and sample one frame per second.
    video_path = download_youtube_video(video_url, output_dir)
    video = cv2.VideoCapture(video_path)
    fps = video.get(cv2.CAP_PROP_FPS)
    frame_count = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
    duration = frame_count / fps if fps else 0
    frames = []
    frame_times = []
    step = max(int(fps), 1)  # guard against streams that report fps < 1
    for i in range(0, frame_count, step):  # extract one frame per second
        video.set(cv2.CAP_PROP_POS_FRAMES, i)
        ret, frame = video.read()
        if ret:
            frames.append(frame)
            frame_times.append(i / fps)
    video.release()
    return {
        'video_path': video_path,
        'frames': frames,
        'frame_times': frame_times,
        'duration': duration,
        'fps': fps
    }
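
# A minimal sketch (not part of the original pipeline) of how the sampled
# frames could be embedded with the CLIP model loaded below, to back the
# image and text similarity search. The helper name and in-memory storage
# are assumptions for illustration only.
def embed_frames_sketch(frames, clip_model, preprocess, device):
    embeddings = []
    with torch.no_grad():
        for frame in frames:
            # OpenCV frames are BGR; CLIP's preprocess expects an RGB PIL image.
            pil_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            image_tensor = preprocess(pil_image).unsqueeze(0).to(device)
            features = clip_model.encode_image(image_tensor)
            features = features / features.norm(dim=-1, keepdim=True)  # unit-length
            embeddings.append(features.cpu().numpy()[0])
    return np.stack(embeddings).astype("float32")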
def search(query, index_path, metadata_path, model):
    # Placeholder for search functionality.
    # This should use FAISS for efficient similarity search; a hedged sketch follows below.
    return [], []
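
# A hedged sketch of the intended FAISS search: build a flat inner-product
# index over L2-normalized embeddings (inner product on unit vectors equals
# cosine similarity) and return the timestamps of the best matches. It assumes
# the embeddings were produced by the same encoder used on the query (e.g.
# sentence-transformer embeddings of transcript segments); CLIP frame
# embeddings would need CLIP's own text encoder instead. The in-memory layout
# is an assumption; the real app may persist to index_path/metadata_path.
def search_sketch(query, embeddings, timestamps, model, top_k=5):
    query_vec = model.encode([query], convert_to_numpy=True).astype("float32")
    faiss.normalize_L2(query_vec)
    index = faiss.IndexFlatIP(embeddings.shape[1])
    index.add(embeddings)
    scores, ids = index.search(query_vec, min(top_k, len(timestamps)))
    return [timestamps[i] for i in ids[0]], scores[0].tolist()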
# Load models
@st.cache_resource
def load_models():
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    # OpenAI CLIP is not distributed via torch.hub; load it through the clip package instead.
    clip_model, preprocess = clip.load("ViT-B/32", device=device)
    text_model = SentenceTransformer("all-MiniLM-L6-v2").to(device)
    qa_model = pipeline("question-answering", model="distilbert-base-cased-distilled-squad", device=0 if torch.cuda.is_available() else -1)
    return clip_model, preprocess, text_model, qa_model
clip_model, preprocess, text_model, qa_model = load_models()
# Streamlit UI
st.title("Enhanced YouTube Video Analysis")

video_url = st.text_input("Enter YouTube Video URL")

if st.button("Analyze"):
    with st.spinner("Processing video..."):
        video_id = get_video_id(video_url)
        results = process_video(video_url, "output_dir", video_id)

    if results:
        st.success("Video processed successfully!")
        # Text search and question answering
        st.subheader("Text Search and Q&A")
        query = st.text_input("Enter a search query or question")
        if query:
            # Placeholder for text search and QA; a hedged sketch follows below.
            st.write("Text search and QA functionality to be implemented")
        # Image upload and similarity search
        st.subheader("Image Search")
        uploaded_image = st.file_uploader("Upload an image to find similar frames", type=["jpg", "jpeg", "png"])
        if uploaded_image:
            # Placeholder for image search; a hedged sketch follows below.
            st.write("Image search functionality to be implemented")
        # Face upload and recognition
        st.subheader("Face Search")
        uploaded_face = st.file_uploader("Upload a face image to find appearances", type=["jpg", "jpeg", "png"])
        if uploaded_face:
            face_image = face_recognition.load_image_file(uploaded_face)
            uploaded_encodings = face_recognition.face_encodings(face_image)
            if not uploaded_encodings:
                st.error("No face was detected in the uploaded image.")
            else:
                face_encoding = uploaded_encodings[0]
                face_appearances = []
                face_frames = []
                for i, frame in enumerate(results['frames']):
                    # OpenCV frames are BGR; face_recognition expects RGB.
                    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    face_locations = face_recognition.face_locations(rgb_frame)
                    face_encodings = face_recognition.face_encodings(rgb_frame, face_locations)
                    for encoding in face_encodings:
                        if face_recognition.compare_faces([face_encoding], encoding)[0]:
                            face_appearances.append(results['frame_times'][i])
                            face_frames.append(frame)
                            break  # one match per frame is enough
                st.write(f"Face appearances found at {len(face_appearances)} timestamps.")
                if face_frames:
                    # Create a temporary directory to store the matching frames
                    with tempfile.TemporaryDirectory() as temp_dir:
                        # Save frames as images
                        for i, frame in enumerate(face_frames):
                            cv2.imwrite(os.path.join(temp_dir, f"frame_{i:04d}.jpg"), frame)
                        # Use FFmpeg to create a video from the frames
                        output_video = "face_appearances.mp4"
                        ffmpeg_command = [
                            "ffmpeg", "-y",
                            "-framerate", str(results['fps']),
                            "-i", os.path.join(temp_dir, "frame_%04d.jpg"),
                            "-c:v", "libx264",
                            "-pix_fmt", "yuv420p",
                            output_video
                        ]
                        subprocess.run(ffmpeg_command, check=True)
                    # Display the generated video
                    st.video(output_video)
                    # Provide a download link for the video
                    with open(output_video, "rb") as file:
                        st.download_button(
                            label="Download Face Appearances Video",
                            data=file,
                            file_name="face_appearances.mp4",
                            mime="video/mp4"
                        )
                else:
                    st.write("No frames with the uploaded face were found in the video.")
        # Display original video
        st.subheader("Original Video")
        st.video(results['video_path'])
    else:
        st.warning("Please enter a valid YouTube URL and click 'Analyze'")