Spaces:
Running
Running
# %% | |
# Import necessary libraries | |
from moviepy.editor import VideoFileClip | |
import os | |
from PIL import Image | |
import numpy as np | |
def extract_frames(video, frame_dir, n_samples, start=-1, end=-1): | |
os.makedirs(frame_dir, exist_ok=True) | |
if start == -1: | |
start = 0 | |
if end == -1: | |
end = video.duration | |
duration = end - start | |
interval = duration / n_samples | |
for i in range(n_samples): | |
frame_time = start + i * interval | |
frame = video.get_frame(frame_time) | |
frame_image = Image.fromarray(np.uint8(frame)) | |
frame_path = os.path.join(frame_dir, f"frame_{i+1}.png") | |
frame_image.save(frame_path) | |
def extract_video_parts(video, out_dir): | |
os.makedirs(out_dir, exist_ok=True) | |
# Extract audio | |
audio_path = f"{out_dir}/audio.mp3" | |
video.audio.write_audiofile(audio_path) | |
# Extract 20 frames from the video | |
extract_frames(video, f"{out_dir}/frames", 20) | |
# Extract 20 frames from first 5 seconds | |
extract_frames(video, f"{out_dir}/5s_frames", 20, start=0, end=5) | |
# %% | |
tags = [] | |
with open("labels.txt", "r") as f: | |
for line in f: | |
tags.append(line.strip()) | |
# %% | |
from transformers import AutoTokenizer, AutoModel | |
import torch | |
import torch.nn.functional as F | |
# Load the tokenizer and model | |
tokenizer = AutoTokenizer.from_pretrained('nomic-ai/nomic-embed-text-v1.5') | |
text_model = AutoModel.from_pretrained('nomic-ai/nomic-embed-text-v1.5', trust_remote_code=True) | |
text_model.eval() | |
# Function to get embeddings for tags | |
def get_tag_embeddings(tags): | |
encoded_input = tokenizer(tags, padding=True, truncation=True, return_tensors='pt') | |
with torch.no_grad(): | |
model_output = text_model(**encoded_input) | |
text_embeddings = F.normalize(model_output.last_hidden_state[:, 0], p=2, dim=1) | |
return text_embeddings | |
tag_embeddings = get_tag_embeddings(tags) | |
# %% | |
from transformers import AutoImageProcessor, AutoModel | |
from PIL import Image | |
import os | |
from collections import Counter | |
processor = AutoImageProcessor.from_pretrained("nomic-ai/nomic-embed-vision-v1.5") | |
vision_model = AutoModel.from_pretrained("nomic-ai/nomic-embed-vision-v1.5", trust_remote_code=True) | |
def get_frames(frame_dir): | |
# Order frames by number but they will have numerical suffixes | |
found_frames = [frame for frame in os.listdir(frame_dir) if frame.startswith("frame_")] | |
frame_numbers = [int(frame.split("_")[-1].split(".")[0]) for frame in found_frames] | |
frames = [Image.open(os.path.join(frame_dir, f"frame_{frame_no}.png")) for frame_no in sorted(frame_numbers)] | |
return frames | |
def frames_to_embeddings(frames): | |
inputs = processor(frames, return_tensors="pt") | |
img_emb = vision_model(**inputs).last_hidden_state | |
img_embeddings = F.normalize(img_emb[:, 0], p=2, dim=1) | |
return img_embeddings | |
def compute_similarities(img_embeddings, tag_embeddings): | |
similarities = torch.matmul(img_embeddings, tag_embeddings.T) | |
return similarities | |
def get_top_tags(similarities, tags): | |
top_5_tags = similarities.topk(5).indices.tolist() | |
return [tags[tag_idx] for tag_idx in top_5_tags] | |
def analyze_frames(frame_dir, tag_embeddings): | |
frames = get_frames(frame_dir) | |
img_embeddings = frames_to_embeddings(frames) | |
cosine_similarities = compute_similarities(img_embeddings, tag_embeddings) | |
results = { | |
"images": [], | |
"summary": {} | |
} | |
summary = Counter() | |
for i, img in enumerate(frames): | |
top_5_tags = get_top_tags(cosine_similarities[i], tags) | |
results["images"].append({"image": img.filename, "tags": top_5_tags}) | |
summary.update(top_5_tags) | |
results["summary"]["tags"] = summary | |
return results | |
# %% | |
import openai | |
def transcribe(audio_path): | |
client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY")) | |
transcript = client.audio.transcriptions.create(model="whisper-1", file=open(audio_path, "rb")) | |
return transcript.text | |
# %% | |
# Load model directly | |
from transformers import AutoFeatureExtractor, AutoModelForAudioClassification | |
audio_extractor = AutoFeatureExtractor.from_pretrained("MIT/ast-finetuned-audioset-10-10-0.4593") | |
audio_feature_model = AutoModelForAudioClassification.from_pretrained("MIT/ast-finetuned-audioset-10-10-0.4593") | |
# %% | |
from pydub import AudioSegment | |
def extract_audio_features(audio_path): | |
with open(audio_path, "rb") as file: | |
audio = file.read() | |
# Convert to wav | |
audio = AudioSegment.from_file(audio_path, format="mp3") | |
audio = audio.get_array_of_samples() | |
inputs = audio_extractor(audio, return_tensors="pt") | |
with torch.no_grad(): | |
outputs = audio_feature_model(**inputs).logits | |
predicted_class_ids = outputs.topk(3).indices.tolist()[0] | |
predicted_labels = [audio_feature_model.config.id2label[class_id] for class_id in predicted_class_ids] | |
return predicted_labels | |
# %% | |
import base64 | |
from io import BytesIO | |
def base64_encode_image(image): | |
buffered = BytesIO() | |
new_width = image.width // 2 | |
new_height = image.height // 2 | |
resized_image = image.resize((new_width, new_height), Image.LANCZOS) | |
resized_image.save(buffered, format="JPEG") | |
img_str = base64.b64encode(buffered.getvalue()) | |
return 'data:image/jpeg;base64,' + img_str.decode('utf-8') | |
def ai_summary(transcript, frames, audio_description, extra_context=""): | |
client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY")) | |
messages=[ | |
{"role": "system", "content": "You are social media content analysis bot trying to uncover trends about what makes a video distinct. Given the transcript, frames, and a description of the audio, give a short analysis of the video content and what makes it unique."}, | |
{"role": "user", | |
"content": [{ | |
"type": "text", | |
"text": f"Transcript: {transcript}\n\n\n\nAudio: {audio_description}\n\nExtra Context?: {extra_context or 'n/a'}", | |
}, | |
*[ | |
{ | |
"type": "image_url", | |
"image_url": {"url": base64_encode_image(frame)}, | |
} for frame in frames | |
] | |
]} | |
] | |
return client.chat.completions.create( | |
model="gpt-4o", | |
messages=messages | |
) | |
# %% | |
import app as gr | |
# %% | |
import uuid, shutil | |
import tempfile | |
def tiktok_analyze(video_path): | |
results = { | |
"overview": "", | |
"ai_overview": "", | |
"first_5s_analysis": "", | |
"video_analysis": "", | |
"transcript": "", | |
} | |
video_id = str(uuid.uuid4()) | |
# copy video path to videos/video_id | |
path_root = f"{tempfile.gettempdir()}/videos/{video_id}" | |
os.makedirs(path_root, exist_ok=True) | |
shutil.copy(video_path, f"{path_root}.mp4") | |
video = VideoFileClip(f"{path_root}.mp4") | |
extract_video_parts(video, f"{path_root}_parts") | |
frames = get_frames(f"{path_root}_parts/frames") | |
first_5s_analysis = analyze_frames(f"{path_root}_parts/5s_frames", tag_embeddings) | |
whole_analysis = analyze_frames(f"{path_root}_parts/frames", tag_embeddings) | |
audio_features = extract_audio_features(f"{path_root}_parts/audio.mp3") | |
results["transcript"] = transcribe(f"{path_root}_parts/audio.mp3") | |
ai_summary_response = ai_summary(results["transcript"], frames, audio_features).choices[0].message.content | |
results["overview"] = f""" | |
## Overview | |
**duration:** {video.duration} | |
**major themes:** {", ".join(list(whole_analysis["summary"]["tags"])[:5])} | |
**audio:** {", ".join(audio_features)} | |
""" | |
results["ai_overview"] = "# AI Summary\n" + ai_summary_response | |
results["first_5s_analysis"] = f"Major themes: {', '.join(first_5s_analysis['summary']['tags'])}" | |
results["video_analysis"] = f"Major themes: {', '.join(whole_analysis['summary']['tags'])}" | |
return [ | |
results["overview"], | |
results["first_5s_analysis"], | |
results["video_analysis"], | |
results["ai_overview"], | |
results["transcript"], | |
] | |
demo = gr.Interface( | |
title="Tiktok Content Analyzer", | |
description="Start by uploading a video to analyze.", | |
fn=tiktok_analyze, | |
inputs="video", | |
outputs=[ | |
gr.Markdown(label="Overview"), | |
gr.Text(label="First 5s Content Analysis"), | |
gr.Text(label="Content Analysis"), | |
gr.Markdown(label="AI Summary"), | |
gr.Text(label="Transcript")] | |
) | |
demo.launch() | |
# %% | |