import hashlib
import os
from glob import glob

import laion_clap
from diskcache import Cache
from qdrant_client import QdrantClient
from qdrant_client.http import models
from tqdm import tqdm


# Utility functions ################################################################################
def get_md5(fpath):
    """Return the hexadecimal MD5 digest of the file at *fpath*.

    The file is opened in binary mode and consumed in 8192-byte pieces, so
    the whole file never has to be held in memory at once.
    """
    digest = hashlib.md5()
    with open(fpath, "rb") as stream:
        # iter(callable, sentinel) keeps reading until read() returns b"" (EOF).
        for block in iter(lambda: stream.read(8192), b""):
            digest.update(block)
    return digest.hexdigest()


# PARAMETERS #######################################################################################
CACHE_FOLDER = '/home/nahia/data/audio/'
KAGGLE_TRAIN_PATH = '/home/nahia/Documents/audio/actor/Actor_01/'

# Loading the CLAP model ###########################################################################
print("[INFO] Loading the model...")
# NOTE(review): model_name is never passed to load_ckpt() in this file;
# presumably it records the intended checkpoint -- confirm or remove.
model_name = 'music_speech_epoch_15_esc_89.25.pt'
model = laion_clap.CLAP_Module(enable_fusion=False)
model.load_ckpt()  # download the default pretrained checkpoint.

# Initialize the cache: make sure its directory exists, then open it there.
os.makedirs(CACHE_FOLDER, exist_ok=True)
cache = Cache(CACHE_FOLDER)

# Embed the audio files
# Collect the .wav files to index. Sorting makes the file -> point-id mapping
# reproducible between runs (glob's result order depends on the file system).
audio_files = sorted(glob(os.path.join(KAGGLE_TRAIN_PATH, '*.wav')))

audio_embeddings = []
chunk_size = 100
# Ceiling division so a final, partially filled chunk is counted in the
# progress bar total.
total_chunks = (len(audio_files) + chunk_size - 1) // chunk_size

try:
    # Use tqdm for a progress bar (one tick per chunk of files).
    for i in tqdm(range(0, len(audio_files), chunk_size), total=total_chunks):
        chunk = audio_files[i:i + chunk_size]  # Get a chunk of audio files
        chunk_embeddings = []

        for audio_file in chunk:
            # The cache is keyed by the MD5 of the file contents.
            file_key = get_md5(audio_file)

            if file_key in cache:
                # If the embedding for this file is cached, retrieve it
                embedding = cache[file_key]
            else:
                # Otherwise, compute the embedding and cache it. A one-file
                # list is passed in and the first result is taken out.
                embedding = model.get_audio_embedding_from_filelist(
                    x=[audio_file], use_tensor=False
                )[0]
                cache[file_key] = embedding

            chunk_embeddings.append(embedding)

        audio_embeddings.extend(chunk_embeddings)
finally:
    # Close the cache even if hashing or embedding a file fails.
    cache.close()

# Without at least one embedding the vector size below cannot be determined;
# stop with a clear message instead of an IndexError.
if not audio_embeddings:
    raise SystemExit(f"[ERROR] No .wav files found in {KAGGLE_TRAIN_PATH}")

# Creating a qdrant collection #####################################################################
COLLECTION_NAME = "demo_db7"

client = QdrantClient("localhost", port=6333)
print("[INFO] Client created...")

print("[INFO] Creating qdrant data collection...")
client.create_collection(
    collection_name=COLLECTION_NAME,
    vectors_config=models.VectorParams(
        # Vector dimensionality is taken from the first embedding.
        size=audio_embeddings[0].shape[0],
        distance=models.Distance.COSINE,
    ),
)

# Build Qdrant records from the embeddings. Each point stores the source path
# and a "style" label taken from the second-to-last '/'-separated path
# component (the name of the directory containing the file).
records = [
    models.PointStruct(
        id=idx,
        # Embeddings expose .shape (array-like); pass a plain list of floats
        # to the point model, as the qdrant-client examples do.
        vector=embedding.tolist(),
        payload={"audio_path": audio_path, "style": audio_path.split('/')[-2]},
    )
    for idx, (audio_path, embedding) in enumerate(zip(audio_files, audio_embeddings))
]

# Upload the records to the Qdrant collection
print("[INFO] Uploading data records to data collection...")
client.upload_points(collection_name=COLLECTION_NAME, points=records)
print("[INFO] Successfully uploaded data records to data collection!")