charlesyu108's picture
Create app.py
a3b1cb3 verified
raw
history blame
8.5 kB
# %%
# Import necessary libraries
from moviepy.editor import VideoFileClip
import os
from PIL import Image
import numpy as np
def extract_frames(video, frame_dir, n_samples, start=-1, end=-1):
    """Save ``n_samples`` evenly spaced frames of ``video`` as PNG files.

    Frames are written to ``frame_dir`` as ``frame_1.png`` .. ``frame_<n>.png``.
    Sample times are ``start + i * (end - start) / n_samples``, so the first
    frame is taken at ``start`` and ``end`` itself is never sampled.

    Args:
        video: Clip object exposing ``duration`` and ``get_frame(t)``.
        frame_dir: Output directory; created if it does not exist.
        n_samples: Number of frames to save; must be positive.
        start: Window start in seconds; ``-1`` means the start of the clip.
        end: Window end in seconds; ``-1`` means the end of the clip. A value
            past the clip's duration is clamped to the duration.

    Raises:
        ValueError: If ``n_samples`` is not positive.
    """
    # Validate before touching the filesystem; 0 would otherwise divide by zero.
    if n_samples <= 0:
        raise ValueError(f"n_samples must be positive, got {n_samples}")
    os.makedirs(frame_dir, exist_ok=True)
    if start == -1:
        start = 0
    if end == -1:
        end = video.duration
    elif video.duration is not None:
        # Do not sample past the end of clips shorter than the requested window.
        end = min(end, video.duration)
    interval = (end - start) / n_samples
    for i in range(n_samples):
        frame = video.get_frame(start + i * interval)
        frame_image = Image.fromarray(np.uint8(frame))
        frame_image.save(os.path.join(frame_dir, f"frame_{i + 1}.png"))
def extract_video_parts(video, out_dir):
    """Write the audio track and two sets of sampled frames of ``video``.

    Produces, under ``out_dir``:
      * ``audio.mp3``  - the clip's audio track
      * ``frames/``    - 20 frames sampled across the whole clip
      * ``5s_frames/`` - 20 frames sampled from the window 0-5 seconds

    Args:
        video: Clip object exposing ``audio`` and what ``extract_frames`` needs.
        out_dir: Output directory; created if it does not exist.

    Raises:
        ValueError: If the clip has no audio track.
    """
    # Fail with a clear message instead of an AttributeError on None.
    if video.audio is None:
        raise ValueError("video has no audio track to extract")
    os.makedirs(out_dir, exist_ok=True)
    video.audio.write_audiofile(f"{out_dir}/audio.mp3")
    extract_frames(video, f"{out_dir}/frames", 20)
    extract_frames(video, f"{out_dir}/5s_frames", 20, start=0, end=5)
# %%
# Candidate tags, one per line of labels.txt. Blank lines are skipped so an
# empty string is never embedded or reported as a tag.
with open("labels.txt", "r", encoding="utf-8") as f:
    tags = [line.strip() for line in f if line.strip()]
# %%
from transformers import AutoTokenizer, AutoModel
import torch
import torch.nn.functional as F
# Text side of the pipeline: the tokenizer and the embedding model are loaded
# from the same checkpoint, and the model is put into evaluation mode.
_TEXT_MODEL_ID = 'nomic-ai/nomic-embed-text-v1.5'
tokenizer = AutoTokenizer.from_pretrained(_TEXT_MODEL_ID)
text_model = AutoModel.from_pretrained(_TEXT_MODEL_ID, trust_remote_code=True)
text_model.eval()
def _mean_pool(token_embeddings, attention_mask):
    """Average token embeddings over the non-padding positions of each row."""
    mask = attention_mask.unsqueeze(-1).to(token_embeddings.dtype)
    return (token_embeddings * mask).sum(dim=1) / mask.sum(dim=1).clamp(min=1e-9)


def get_tag_embeddings(tags, task_prefix="search_query: "):
    """Embed tag strings with the text model and L2-normalize the result.

    Pooling follows the checkpoint's published usage: mean pooling over
    non-padding tokens, then layer normalisation, then L2 normalisation.

    Args:
        tags: List of tag strings to embed.
        task_prefix: Task instruction prepended to every tag. The default
            follows the model card's text-query example; pass ``""`` to
            embed the raw tags. NOTE(review): confirm this prefix suits
            label-style inputs.

    Returns:
        Tensor with one unit-length embedding row per tag.
    """
    prefixed = [f"{task_prefix}{tag}" for tag in tags]
    encoded_input = tokenizer(prefixed, padding=True, truncation=True, return_tensors='pt')
    with torch.no_grad():
        model_output = text_model(**encoded_input)
    pooled = _mean_pool(model_output.last_hidden_state, encoded_input['attention_mask'])
    pooled = F.layer_norm(pooled, normalized_shape=(pooled.shape[1],))
    return F.normalize(pooled, p=2, dim=1)
# Embed every tag once at module load; the result is passed to analyze_frames
# for each analysed video.
tag_embeddings = get_tag_embeddings(tags)
# %%
from transformers import AutoImageProcessor, AutoModel
from PIL import Image
import os
from collections import Counter
# Image side of the pipeline: the preprocessor and the embedding model are
# loaded from the same vision checkpoint.
_VISION_MODEL_ID = "nomic-ai/nomic-embed-vision-v1.5"
processor = AutoImageProcessor.from_pretrained(_VISION_MODEL_ID)
vision_model = AutoModel.from_pretrained(_VISION_MODEL_ID, trust_remote_code=True)
def get_frames(frame_dir):
    """Load the ``frame_<n>.png`` images in ``frame_dir`` in numeric order.

    Files that do not match the ``frame_<n>.png`` pattern are ignored. Each
    image is fully loaded so the file handle Pillow opened is released, while
    ``Image.filename`` stays available to callers.

    Args:
        frame_dir: Directory containing the extracted frames.

    Returns:
        List of PIL images ordered by frame number (empty if none are found).
    """
    prefix, suffix = "frame_", ".png"
    numbered = []
    for name in os.listdir(frame_dir):
        if not (name.startswith(prefix) and name.endswith(suffix)):
            continue
        stem = name[len(prefix):-len(suffix)]
        # isdecimal() accepts only characters that int() can parse.
        if stem.isdecimal():
            numbered.append((int(stem), name))

    frames = []
    # Sort numerically so frame_10 follows frame_9 rather than frame_1.
    for _, name in sorted(numbered):
        image = Image.open(os.path.join(frame_dir, name))
        # Image.open is lazy; load now so the underlying file is closed.
        image.load()
        frames.append(image)
    return frames
def frames_to_embeddings(frames):
    """Embed images with the vision model and L2-normalize the result.

    Args:
        frames: List of images accepted by the image processor.

    Returns:
        Tensor with one unit-length embedding row per frame, taken from the
        first token position of the model's last hidden state.
    """
    inputs = processor(frames, return_tensors="pt")
    # Inference only: skip autograd bookkeeping, as get_tag_embeddings does.
    with torch.no_grad():
        img_emb = vision_model(**inputs).last_hidden_state
    return F.normalize(img_emb[:, 0], p=2, dim=1)
def compute_similarities(img_embeddings, tag_embeddings):
    """Return the matrix product of image embeddings and transposed tag embeddings.

    Entry ``[i, j]`` is the dot product of image row ``i`` and tag row ``j``.
    """
    return img_embeddings @ tag_embeddings.T
def get_top_tags(similarities, tags, k=5):
    """Return the tags with the highest similarity scores, best first.

    Args:
        similarities: 1-D tensor with one score per entry of ``tags``.
        tags: List of tag strings indexed like ``similarities``.
        k: Maximum number of tags to return. It is capped at the number of
            available scores, so short tag lists do not make ``topk`` raise.

    Returns:
        List of up to ``k`` tag strings in descending score order.
    """
    k = min(k, similarities.shape[-1])
    if k <= 0:
        return []
    top_indices = similarities.topk(k).indices.tolist()
    return [tags[tag_idx] for tag_idx in top_indices]
def analyze_frames(frame_dir, tag_embeddings):
    """Tag every frame in ``frame_dir`` and tally the tags across frames.

    Args:
        frame_dir: Directory of extracted frames, read with ``get_frames``.
        tag_embeddings: Embeddings of the module-level ``tags`` list.

    Returns:
        Dict with ``"images"`` (one ``{"image": filename, "tags": [...]}``
        entry per frame, in frame order) and ``"summary"`` (``{"tags":
        Counter}`` counting how often each tag was selected).
    """
    frames = get_frames(frame_dir)
    scores = compute_similarities(frames_to_embeddings(frames), tag_embeddings)
    # Tag names are looked up in the module-level ``tags`` list.
    tags_per_frame = [get_top_tags(scores[idx], tags) for idx in range(len(frames))]
    image_entries = [
        {"image": frame.filename, "tags": frame_tags}
        for frame, frame_tags in zip(frames, tags_per_frame)
    ]
    tag_counts = Counter(tag for frame_tags in tags_per_frame for tag in frame_tags)
    return {"images": image_entries, "summary": {"tags": tag_counts}}
# %%
import openai
def transcribe(audio_path):
    """Transcribe an audio file with OpenAI's ``whisper-1`` model.

    The API key is read from the ``OPENAI_API_KEY`` environment variable.

    Args:
        audio_path: Path of the audio file to upload.

    Returns:
        The transcript text from the API response.
    """
    client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
    # The context manager closes the file even if the API call fails.
    with open(audio_path, "rb") as audio_file:
        transcript = client.audio.transcriptions.create(model="whisper-1", file=audio_file)
    return transcript.text
# %%
# Load model directly
from transformers import AutoFeatureExtractor, AutoModelForAudioClassification
# Audio classifier: the feature extractor and the classification model are
# loaded from the same checkpoint.
_AUDIO_MODEL_ID = "MIT/ast-finetuned-audioset-10-10-0.4593"
audio_extractor = AutoFeatureExtractor.from_pretrained(_AUDIO_MODEL_ID)
audio_feature_model = AutoModelForAudioClassification.from_pretrained(_AUDIO_MODEL_ID)
# %%
from pydub import AudioSegment
def extract_audio_features(audio_path):
    """Return the three highest-scoring audio-classifier labels for an MP3 file.

    The audio is decoded with pydub, downmixed to mono, resampled to the
    feature extractor's sampling rate and scaled to floats before being
    classified.

    Args:
        audio_path: Path of the MP3 file to classify.

    Returns:
        List of three label strings, highest logit first.
    """
    target_rate = audio_extractor.sampling_rate
    segment = AudioSegment.from_file(audio_path, format="mp3")
    # get_array_of_samples() interleaves channels and keeps the file's native
    # rate, so normalise both before handing the waveform to the extractor.
    segment = segment.set_channels(1).set_frame_rate(target_rate)
    samples = np.asarray(segment.get_array_of_samples(), dtype=np.float32)
    # Scale signed integer PCM into the [-1.0, 1.0) float range.
    samples /= float(1 << (8 * segment.sample_width - 1))
    inputs = audio_extractor(samples, sampling_rate=target_rate, return_tensors="pt")
    with torch.no_grad():
        logits = audio_feature_model(**inputs).logits
    top_ids = logits.topk(3).indices.tolist()[0]
    return [audio_feature_model.config.id2label[class_id] for class_id in top_ids]
# %%
import base64
from io import BytesIO
def base64_encode_image(image):
    """Encode a half-size JPEG copy of ``image`` as a base64 ``data:`` URL.

    Args:
        image: PIL image to downscale and encode.

    Returns:
        String of the form ``data:image/jpeg;base64,<payload>``.
    """
    # Keep each side at least 1 px so tiny images do not resize to zero.
    half_size = (max(1, image.width // 2), max(1, image.height // 2))
    resized_image = image.resize(half_size, Image.LANCZOS)
    # JPEG cannot store alpha or palette modes (e.g. RGBA, P); convert first.
    if resized_image.mode not in ("RGB", "L", "CMYK"):
        resized_image = resized_image.convert("RGB")
    buffered = BytesIO()
    resized_image.save(buffered, format="JPEG")
    encoded = base64.b64encode(buffered.getvalue()).decode('utf-8')
    return 'data:image/jpeg;base64,' + encoded
def ai_summary(transcript, frames, audio_description, extra_context=""):
    """Ask ``gpt-4o`` for a short analysis of the video from text and frames.

    Args:
        transcript: Transcript text of the video's audio.
        frames: Images to attach; each is sent as a base64 data URL.
        audio_description: Description of the audio, interpolated into the prompt.
        extra_context: Optional extra text; ``'n/a'`` is sent when it is empty.

    Returns:
        The chat-completion response object returned by the client.
    """
    client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
    system_prompt = "You are social media content analysis bot trying to uncover trends about what makes a video distinct. Given the transcript, frames, and a description of the audio, give a short analysis of the video content and what makes it unique."
    text_part = {
        "type": "text",
        "text": f"Transcript: {transcript}\n\n\n\nAudio: {audio_description}\n\nExtra Context?: {extra_context or 'n/a'}",
    }
    image_parts = [
        {"type": "image_url", "image_url": {"url": base64_encode_image(frame)}}
        for frame in frames
    ]
    conversation = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": [text_part, *image_parts]},
    ]
    return client.chat.completions.create(model="gpt-4o", messages=conversation)
# %%
# Gradio provides the Interface, Markdown and Text components used for the
# demo; this file is itself app.py, so importing "app" would import itself.
import gradio as gr
# %%
import uuid, shutil
import tempfile
def _rank_tags(tag_counts, limit=None):
    """Return tags ordered from most to least frequent, optionally truncated."""
    return [tag for tag, _ in tag_counts.most_common(limit)]


def tiktok_analyze(video_path):
    """Run the full analysis pipeline on a video file.

    The video is copied into a private working directory, split into audio
    and frames, tagged, transcribed and summarised. The working directory is
    removed and the clip closed before returning, whether or not a step fails.

    Args:
        video_path: Path of the video file to analyse.

    Returns:
        ``[overview_markdown, first_5s_themes, whole_video_themes,
        ai_summary_markdown, transcript]``.
    """
    work_dir = f"{tempfile.gettempdir()}/videos/{uuid.uuid4()}"
    os.makedirs(work_dir, exist_ok=True)
    video = None
    try:
        local_video = f"{work_dir}/video.mp4"
        shutil.copy(video_path, local_video)
        video = VideoFileClip(local_video)
        parts_dir = f"{work_dir}/parts"
        extract_video_parts(video, parts_dir)
        duration = video.duration

        frames = get_frames(f"{parts_dir}/frames")
        first_5s_analysis = analyze_frames(f"{parts_dir}/5s_frames", tag_embeddings)
        whole_analysis = analyze_frames(f"{parts_dir}/frames", tag_embeddings)
        audio_features = extract_audio_features(f"{parts_dir}/audio.mp3")
        transcript = transcribe(f"{parts_dir}/audio.mp3")
        # Frames are encoded inside ai_summary, so this must run before cleanup.
        ai_summary_response = ai_summary(transcript, frames, audio_features).choices[0].message.content
    finally:
        if video is not None:
            video.close()
        shutil.rmtree(work_dir, ignore_errors=True)

    first_5s_tags = first_5s_analysis["summary"]["tags"]
    whole_tags = whole_analysis["summary"]["tags"]
    # Rank by frequency: plain iteration over a Counter yields insertion
    # order, which would report the first tags seen rather than the major ones.
    overview = (
        "\n## Overview\n"
        f"**duration:** {duration}\n"
        f"**major themes:** {', '.join(_rank_tags(whole_tags, 5))}\n"
        f"**audio:** {', '.join(audio_features)}\n"
    )
    # The API may return no text content; fall back to an empty summary.
    ai_overview = "# AI Summary\n" + (ai_summary_response or "")
    first_5s_themes = f"Major themes: {', '.join(_rank_tags(first_5s_tags))}"
    whole_themes = f"Major themes: {', '.join(_rank_tags(whole_tags))}"
    return [
        overview,
        first_5s_themes,
        whole_themes,
        ai_overview,
        transcript,
    ]
# Output components, in the same order as the list returned by tiktok_analyze.
_output_components = [
    gr.Markdown(label="Overview"),
    gr.Text(label="First 5s Content Analysis"),
    gr.Text(label="Content Analysis"),
    gr.Markdown(label="AI Summary"),
    gr.Text(label="Transcript"),
]
demo = gr.Interface(
    fn=tiktok_analyze,
    inputs="video",
    outputs=_output_components,
    title="Tiktok Content Analyzer",
    description="Start by uploading a video to analyze.",
)
demo.launch()
# %%