video-search / app.py
Abhilashvj's picture
Update app.py
6a8900c verified
raw
history blame
7.55 kB
import streamlit as st
import json
import faiss
import numpy as np
from sentence_transformers import SentenceTransformer
import base64
from PIL import Image
import io
import cv2
from insightface.app import FaceAnalysis
from moviepy.editor import VideoFileClip
# Load models
@st.cache_resource
def load_models():
    """Construct the two models the dashboard searches with.

    Decorated with ``st.cache_resource`` so construction goes through
    Streamlit's resource cache.

    Returns:
        A ``(unified_model, face_app)`` tuple: the ``clip-ViT-B-32``
        SentenceTransformer and an InsightFace ``FaceAnalysis`` instance
        that has already been prepared with a 640x640 detection size.
    """
    clip_encoder = SentenceTransformer("clip-ViT-B-32")

    # The face analyser is restricted to the CPU execution provider.
    analyzer = FaceAnalysis(providers=['CPUExecutionProvider'])
    analyzer.prepare(ctx_id=0, det_size=(640, 640))

    return clip_encoder, analyzer


unified_model, face_app = load_models()
# Load data
@st.cache_data
def load_data(video_id):
    """Load the four precomputed JSON artifacts that describe one video.

    Reads ``{video_id}_summary.json``, ``{video_id}_transcription.json``,
    ``{video_id}_unified_metadata.json`` and
    ``{video_id}_face_metadata.json`` relative to the working directory.

    Args:
        video_id: Prefix shared by the artifact file names.

    Returns:
        ``(summary, transcription, unified_metadata, face_metadata)``,
        each exactly as parsed by ``json.load``.

    Raises:
        FileNotFoundError: If any of the four files is missing.
        json.JSONDecodeError: If a file does not contain valid JSON.
    """
    artifacts = []
    for suffix in ("summary", "transcription", "unified_metadata", "face_metadata"):
        # Decode explicitly as UTF-8: without it open() uses the locale's
        # preferred encoding, which can fail on (or garble) non-ASCII
        # transcript text on platforms whose default is not UTF-8.
        with open(f"{video_id}_{suffix}.json", "r", encoding="utf-8") as f:
            artifacts.append(json.load(f))
    return tuple(artifacts)


video_id = "IMFUOexuEXw"
video_path = "avengers_interview.mp4"
summary, transcription, unified_metadata, face_metadata = load_data(video_id)
# Load FAISS indexes
@st.cache_resource
def load_indexes(video_id):
    """Read the unified and face FAISS indexes stored for a video.

    Args:
        video_id: Prefix of the ``*_unified_index.faiss`` and
            ``*_face_index.faiss`` files in the working directory.

    Returns:
        ``(unified_index, face_index)`` as returned by ``faiss.read_index``.
    """
    unified_file = f"{video_id}_unified_index.faiss"
    face_file = f"{video_id}_face_index.faiss"
    return faiss.read_index(unified_file), faiss.read_index(face_file)


unified_index, face_index = load_indexes(video_id)
# Search functions
def unified_search(query, index, metadata, model, n_results=5):
    """Embed a text or image query and look up its nearest indexed entries.

    Args:
        query: A string, or any other input ``model.encode`` accepts
            (the UI passes a single PIL image).
        index: Object exposing ``search(vectors, k) -> (D, I)``, such as a
            FAISS index.
        metadata: Sequence addressed by the row ids the index returns.
        model: Encoder exposing ``encode(..., convert_to_tensor=True)``
            whose result supports ``.cpu().numpy()``.
        n_results: Number of neighbours requested from the index.

    Returns:
        A list of ``{'data': metadata[i], 'distance': d}`` dicts in the
        order the index returned them. Slots the index could not fill are
        omitted, so the list may be shorter than ``n_results``.
    """
    if isinstance(query, str):
        encoded = model.encode([query], convert_to_tensor=True)
    else:  # Assume it's an image
        encoded = model.encode(query, convert_to_tensor=True)

    query_vector = np.array(encoded.cpu().numpy(), dtype=np.float32)
    # SentenceTransformer.encode returns a 1-D vector for a single
    # non-list input (e.g. one image), but index.search unpacks a 2-D
    # (n_queries, dim) batch - promote the lone vector to a one-row batch.
    if query_vector.ndim == 1:
        query_vector = query_vector.reshape(1, -1)

    D, I = index.search(query_vector, n_results)

    # FAISS pads missing neighbours with label -1; indexing metadata with
    # it would silently return the *last* entry as a bogus hit.
    return [
        {'data': metadata[i], 'distance': d}
        for i, d in zip(I[0], D[0])
        if i >= 0
    ]
def face_search(face_embedding, index, metadata, n_results=5):
    """Find the indexed faces closest to a face embedding.

    Args:
        face_embedding: Flat sequence or array of floats (a list loaded
            from JSON or an array produced by the face model).
        index: Object exposing ``search(vectors, k) -> (D, I)``, such as a
            FAISS index.
        metadata: Sequence addressed by the row ids the index returns.
        n_results: Number of neighbours requested from the index.

    Returns:
        ``(results, distances)`` where ``results`` is a list of metadata
        entries and ``distances`` is the matching array of distances.
        Both have the same length, which may be less than ``n_results``
        when the index cannot fill every slot.
    """
    # FAISS operates on float32; a list parsed from JSON would otherwise
    # become a float64 array.
    query_vector = np.array(face_embedding, dtype=np.float32).reshape(1, -1)
    D, I = index.search(query_vector, n_results)

    labels = np.asarray(I)[0]
    # Unfilled slots carry label -1; drop them (and their distances, so
    # the two return values stay aligned) rather than letting -1 index
    # the last metadata entry.
    found = labels >= 0
    results = [metadata[i] for i in labels[found]]
    return results, np.asarray(D)[0][found]
def detect_and_embed_face(image, face_app):
    """Return the embedding of the largest face found in an image.

    Args:
        image: A PIL image (anything with ``convert``) or an object
            ``np.array`` can turn into a pixel array.
        face_app: Detector exposing ``get(array)`` that returns a sequence
            of faces, each with ``bbox`` (x1, y1, x2, y2) and ``embedding``.

    Returns:
        The ``embedding`` of the face with the largest bounding-box area,
        or ``None`` when no face is detected.
    """
    # Uploaded PNGs may be RGBA, palette or grayscale; normalise to three
    # channels so the detector always receives an H x W x 3 array.
    if hasattr(image, "convert"):
        image = image.convert("RGB")
    # NOTE(review): the array is handed over in RGB order. InsightFace's
    # examples feed cv2-style BGR frames - confirm which order was used
    # when the face index was built before changing this.
    img_array = np.array(image)

    faces = face_app.get(img_array)
    if len(faces) == 0:
        return None

    def bbox_area(face):
        x1, y1, x2, y2 = face.bbox[0], face.bbox[1], face.bbox[2], face.bbox[3]
        return (x2 - x1) * (y2 - y1)

    return max(faces, key=bbox_area).embedding
def create_video_clip(video_path, start_time, end_time, output_path):
    """Cut a time range out of a video and write it to a new file.

    Args:
        video_path: Path of the source video.
        start_time: Start of the excerpt, passed to ``subclip``.
        end_time: End of the excerpt, passed to ``subclip``.
        output_path: Where the excerpt is written (``libx264`` video,
            ``aac`` audio).

    Returns:
        ``output_path``, unchanged, for convenient chaining.
    """
    # The context manager closes the source clip once the excerpt is written.
    with VideoFileClip(video_path) as source:
        excerpt = source.subclip(start_time, end_time)
        excerpt.write_videofile(
            output_path,
            codec="libx264",
            audio_codec="aac",
        )
    return output_path
# Streamlit UI
st.title("Video Analysis Dashboard")

# Sidebar: the full source video first, then its transcript in a
# fixed-height, scrollable text area.
sidebar = st.sidebar
sidebar.header("Full Video")
sidebar.video(video_path)
sidebar.header("Video Transcript")
transcript_text = transcription['transcription']
sidebar.text_area(
    "Full Transcript",
    transcript_text,
    height=300,
)
# Main content
st.header("Video Summary")
col1, col2 = st.columns(2)

# Left column: one line per prominent face, plus its thumbnail when the
# summary carries one (stored as a base64-encoded image).
with col1:
    st.subheader("Prominent Faces")
    for face in summary['prominent_faces']:
        face_label = f"Face ID: {face['id']}"
        st.write(f"{face_label}, Appearances: {face['appearances']}")
        if 'thumbnail' not in face:
            continue
        thumbnail_bytes = base64.b64decode(face['thumbnail'])
        image = Image.open(io.BytesIO(thumbnail_bytes))
        st.image(image, caption=face_label, width=100)

# Right column: one line per theme with its comma-separated keywords.
with col2:
    st.subheader("Themes")
    for theme in summary['themes']:
        theme_id = theme['id']
        keyword_list = ', '.join(theme['keywords'])
        st.write(f"Theme ID: {theme_id}, Keywords: {keyword_list}")
# Search functionality
def _show_clip(start, end, prefix):
    """Cut the [start, end] segment from the source video and embed it."""
    clip_path = create_video_clip(video_path, start, end, f"{prefix}_{start}.mp4")
    st.video(clip_path)
    st.write("---")


def _show_unified_results(results, heading, show_text):
    """Render unified-search hits: time range, distance, optional text, clip."""
    st.subheader(heading)
    for result in results:
        segment = result['data']
        st.write(f"Time: {segment['start']:.2f}s - {segment['end']:.2f}s, Distance: {result['distance']:.4f}")
        if show_text and 'text' in segment:
            st.write(f"Text: {segment['text']}")
        _show_clip(segment['start'], segment['end'], "temp_clip")


def _show_face_results(face_results, face_distances):
    """Render face-search hits: time range, distance and clip for each match."""
    st.subheader("Face Search Results")
    for result, distance in zip(face_results, face_distances):
        st.write(f"Time: {result['start']:.2f}s - {result['end']:.2f}s, Distance: {distance:.4f}")
        _show_clip(result['start'], result['end'], "temp_face_clip")


st.header("Search")
search_type = st.selectbox("Select search type", ["Unified", "Face"])

if search_type == "Unified":
    search_method = st.radio("Choose search method", ["Text", "Image"])
    if search_method == "Text":
        query = st.text_input("Enter your search query")
        if st.button("Search"):
            if query.strip():
                results = unified_search(query, unified_index, unified_metadata, unified_model)
                _show_unified_results(results, "Search Results", show_text=True)
            else:
                # An empty query would still be embedded and return
                # arbitrary neighbours; ask for input instead.
                st.warning("Please enter a search query.")
    else:
        uploaded_file = st.file_uploader("Choose an image...", type=["jpg", "jpeg", "png"])
        if uploaded_file is not None:
            image = Image.open(uploaded_file)
            st.image(image, caption="Uploaded Image", use_column_width=True)
            if st.button("Search"):
                results = unified_search(image, unified_index, unified_metadata, unified_model)
                # Image hits list only the time range and clip, not the text.
                _show_unified_results(results, "Image Search Results", show_text=False)
elif search_type == "Face":
    face_search_type = st.radio("Choose face search method", ["Select from video", "Upload image"])
    if face_search_type == "Select from video":
        face_id = st.selectbox("Select a face", [face['id'] for face in summary['prominent_faces']])
        if st.button("Search"):
            # face_id was picked from this same list, so a match exists.
            selected_face = next(face for face in summary['prominent_faces'] if face['id'] == face_id)
            face_results, face_distances = face_search(selected_face['embedding'], face_index, face_metadata)
            _show_face_results(face_results, face_distances)
    else:
        uploaded_file = st.file_uploader("Choose a face image...", type=["jpg", "jpeg", "png"])
        if uploaded_file is not None:
            image = Image.open(uploaded_file)
            st.image(image, caption="Uploaded Image", use_column_width=True)
            if st.button("Search"):
                face_embedding = detect_and_embed_face(image, face_app)
                if face_embedding is not None:
                    face_results, face_distances = face_search(face_embedding, face_index, face_metadata)
                    _show_face_results(face_results, face_distances)
                else:
                    st.error("No face detected in the uploaded image. Please try another image.")