# %%
# Import necessary libraries
from moviepy.editor import VideoFileClip
import os
from PIL import Image
import numpy as np


def extract_frames(video, frame_dir, n_samples, start=-1, end=-1):
    """Sample n_samples frames evenly between start and end and save them as PNGs."""
    os.makedirs(frame_dir, exist_ok=True)
    if start == -1:
        start = 0
    if end == -1:
        end = video.duration
    duration = end - start
    interval = duration / n_samples
    for i in range(n_samples):
        frame_time = start + i * interval
        frame = video.get_frame(frame_time)
        frame_image = Image.fromarray(np.uint8(frame))
        frame_path = os.path.join(frame_dir, f"frame_{i+1}.png")
        frame_image.save(frame_path)


def extract_video_parts(video, out_dir):
    os.makedirs(out_dir, exist_ok=True)

    # Extract audio
    audio_path = f"{out_dir}/audio.mp3"
    video.audio.write_audiofile(audio_path)

    # Extract 20 frames from the whole video
    extract_frames(video, f"{out_dir}/frames", 20)

    # Extract 20 frames from the first 5 seconds (or the whole clip if it is shorter)
    extract_frames(video, f"{out_dir}/5s_frames", 20, start=0, end=min(5, video.duration))

# %%
# Load the tag vocabulary: one tag per line in labels.txt
tags = []
with open("labels.txt", "r") as f:
    for line in f:
        tags.append(line.strip())

# %%
from transformers import AutoTokenizer, AutoModel
import torch
import torch.nn.functional as F

# Load the tokenizer and model
tokenizer = AutoTokenizer.from_pretrained('nomic-ai/nomic-embed-text-v1.5')
text_model = AutoModel.from_pretrained('nomic-ai/nomic-embed-text-v1.5', trust_remote_code=True)
text_model.eval()


# Function to get embeddings for tags
def get_tag_embeddings(tags):
    encoded_input = tokenizer(tags, padding=True, truncation=True, return_tensors='pt')
    with torch.no_grad():
        model_output = text_model(**encoded_input)
    text_embeddings = F.normalize(model_output.last_hidden_state[:, 0], p=2, dim=1)
    return text_embeddings


tag_embeddings = get_tag_embeddings(tags)

# %%
from transformers import AutoImageProcessor, AutoModel
from PIL import Image
import os
from collections import Counter

processor = AutoImageProcessor.from_pretrained("nomic-ai/nomic-embed-vision-v1.5")
vision_model = AutoModel.from_pretrained("nomic-ai/nomic-embed-vision-v1.5", trust_remote_code=True)


def get_frames(frame_dir):
    # Sort frames by their numeric suffix so frame_10 follows frame_9
    # (a plain lexicographic sort of the filenames would not).
    found_frames = [frame for frame in os.listdir(frame_dir) if frame.startswith("frame_")]
    frame_numbers = [int(frame.split("_")[-1].split(".")[0]) for frame in found_frames]
    frames = [Image.open(os.path.join(frame_dir, f"frame_{frame_no}.png")) for frame_no in sorted(frame_numbers)]
    return frames


def frames_to_embeddings(frames):
    inputs = processor(frames, return_tensors="pt")
    img_emb = vision_model(**inputs).last_hidden_state
    img_embeddings = F.normalize(img_emb[:, 0], p=2, dim=1)
    return img_embeddings


def compute_similarities(img_embeddings, tag_embeddings):
    similarities = torch.matmul(img_embeddings, tag_embeddings.T)
    return similarities


def get_top_tags(similarities, tags):
    top_5_tags = similarities.topk(5).indices.tolist()
    return [tags[tag_idx] for tag_idx in top_5_tags]


def analyze_frames(frame_dir, tag_embeddings):
    frames = get_frames(frame_dir)
    img_embeddings = frames_to_embeddings(frames)
    cosine_similarities = compute_similarities(img_embeddings, tag_embeddings)
    results = {"images": [], "summary": {}}
    summary = Counter()
    for i, img in enumerate(frames):
        top_5_tags = get_top_tags(cosine_similarities[i], tags)
        results["images"].append({"image": img.filename, "tags": top_5_tags})
        summary.update(top_5_tags)
    results["summary"]["tags"] = summary
    return results

# %%
import openai


def transcribe(audio_path):
    client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
    with open(audio_path, "rb") as audio_file:
        transcript = client.audio.transcriptions.create(model="whisper-1", file=audio_file)
    return transcript.text
# %%
# Load model directly
from transformers import AutoFeatureExtractor, AutoModelForAudioClassification

audio_extractor = AutoFeatureExtractor.from_pretrained("MIT/ast-finetuned-audioset-10-10-0.4593")
audio_feature_model = AutoModelForAudioClassification.from_pretrained("MIT/ast-finetuned-audioset-10-10-0.4593")

# %%
from pydub import AudioSegment


def extract_audio_features(audio_path):
    # Decode the mp3, downmix to mono, and resample to the 16 kHz rate the AST model expects
    audio = AudioSegment.from_file(audio_path, format="mp3")
    audio = audio.set_channels(1).set_frame_rate(16000)
    # Scale int16 PCM samples to floats in [-1, 1]
    samples = np.array(audio.get_array_of_samples()).astype(np.float32) / 32768.0
    inputs = audio_extractor(samples, sampling_rate=16000, return_tensors="pt")
    with torch.no_grad():
        outputs = audio_feature_model(**inputs).logits
    predicted_class_ids = outputs.topk(3).indices.tolist()[0]
    predicted_labels = [audio_feature_model.config.id2label[class_id] for class_id in predicted_class_ids]
    return predicted_labels

# %%
import base64
from io import BytesIO


def base64_encode_image(image):
    # Downscale by half and re-encode as JPEG to keep the data URL small
    buffered = BytesIO()
    new_width = image.width // 2
    new_height = image.height // 2
    resized_image = image.resize((new_width, new_height), Image.LANCZOS)
    resized_image.save(buffered, format="JPEG")
    img_str = base64.b64encode(buffered.getvalue())
    return 'data:image/jpeg;base64,' + img_str.decode('utf-8')


def ai_summary(transcript, frames, audio_description, extra_context=""):
    client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
    messages = [
        {
            "role": "system",
            "content": "You are a social media content analysis bot trying to uncover trends about what makes a video distinct. Given the transcript, frames, and a description of the audio, give a short analysis of the video content and what makes it unique.",
        },
        {
            "role": "user",
            "content": [
                {
                    "type": "text",
                    "text": f"Transcript: {transcript}\n\n\n\nAudio: {audio_description}\n\nExtra Context?: {extra_context or 'n/a'}",
                },
                *[
                    {
                        "type": "image_url",
                        "image_url": {"url": base64_encode_image(frame)},
                    }
                    for frame in frames
                ],
            ],
        },
    ]
    return client.chat.completions.create(model="gpt-4o", messages=messages)

# %%
import gradio as gr

# %%
import uuid, shutil
import tempfile


def tiktok_analyze(video_path):
    results = {
        "overview": "",
        "ai_overview": "",
        "first_5s_analysis": "",
        "video_analysis": "",
        "transcript": "",
    }

    video_id = str(uuid.uuid4())
    # Copy the uploaded video to a temp location under videos/<video_id>.mp4
    path_root = f"{tempfile.gettempdir()}/videos/{video_id}"
    os.makedirs(os.path.dirname(path_root), exist_ok=True)
    shutil.copy(video_path, f"{path_root}.mp4")

    video = VideoFileClip(f"{path_root}.mp4")
    extract_video_parts(video, f"{path_root}_parts")
    frames = get_frames(f"{path_root}_parts/frames")

    first_5s_analysis = analyze_frames(f"{path_root}_parts/5s_frames", tag_embeddings)
    whole_analysis = analyze_frames(f"{path_root}_parts/frames", tag_embeddings)
    audio_features = extract_audio_features(f"{path_root}_parts/audio.mp3")
    results["transcript"] = transcribe(f"{path_root}_parts/audio.mp3")
    ai_summary_response = ai_summary(
        results["transcript"], frames, ", ".join(audio_features)
    ).choices[0].message.content

    # Report the five most common tags across all sampled frames as the major themes
    major_themes = [tag for tag, _ in whole_analysis["summary"]["tags"].most_common(5)]
    results["overview"] = f"""
## Overview

**duration:** {video.duration}s

**major themes:** {", ".join(major_themes)}

**audio:** {", ".join(audio_features)}
"""
    results["ai_overview"] = "# AI Summary\n" + ai_summary_response
    results["first_5s_analysis"] = f"Major themes: {', '.join(first_5s_analysis['summary']['tags'])}"
    results["video_analysis"] = f"Major themes: {', '.join(whole_analysis['summary']['tags'])}"

    return [
        results["overview"],
        results["first_5s_analysis"],
        results["video_analysis"],
        results["ai_overview"],
        results["transcript"],
    ]

demo = gr.Interface(
    title="TikTok Content Analyzer",
    description="Start by uploading a video to analyze.",
    fn=tiktok_analyze,
    inputs="video",
    outputs=[
        gr.Markdown(label="Overview"),
        gr.Text(label="First 5s Content Analysis"),
        gr.Text(label="Content Analysis"),
        gr.Markdown(label="AI Summary"),
        gr.Text(label="Transcript"),
    ],
)

demo.launch()

# %%
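# Optional sanity check (a sketch, not part of the pipeline above): run the full
# analysis directly on a local clip without going through the Gradio UI.
# "sample.mp4" is a hypothetical filename; point it at any short video on disk.
sample_path = "sample.mp4"
if os.path.exists(sample_path):
    overview_md, first_5s_text, whole_video_text, ai_overview_md, transcript_text = tiktok_analyze(sample_path)
    print(overview_md)
    print(transcript_text)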