Spaces:
Runtime error
Runtime error
import threading | |
from queue import Queue, Empty | |
import numpy as np | |
import requests | |
import base64 | |
import time | |
from dataclasses import dataclass, field | |
import websocket | |
import threading | |
import ssl | |
import librosa | |
import os | |
class AudioStreamingClient:
    """Stream int16 audio to a WebSocket endpoint and queue the audio it returns.

    Outgoing chunks are handed over with ``put_audio`` and transmitted by a
    background thread (``send_audio``). Incoming binary messages are decoded
    in ``on_message`` and retrieved with ``get_audio``. The endpoint URL and
    bearer token are read from the ``HF_API_URL`` and ``HF_AUTH_TOKEN``
    environment variables.
    """

    # Sample rate used on the wire: outgoing audio is resampled to this rate
    # and incoming audio is treated as being at this rate.
    _WIRE_SAMPLE_RATE = 16000

    def __init__(self):
        self.auth_token = os.environ.get("HF_AUTH_TOKEN", None)
        self.api_url = os.environ.get("HF_API_URL", None)
        self.stop_event = threading.Event()
        self.send_queue = Queue()
        self.recv_queue = Queue()
        self.session_id = None
        self.headers = {
            "Accept": "application/json",
            "Authorization": f"Bearer {self.auth_token}",
            "Content-Type": "application/json"
        }
        # States assigned by this class: idle (initial), listen, processing.
        self.session_state = "idle"
        self.ws_ready = threading.Event()
        # Populated by start(); kept as None so stop() is safe beforehand.
        self.ws = None
        self.ws_thread = None
        self.send_thread = None

    def _websocket_url(self):
        """Return the ``/ws`` WebSocket URL derived from ``self.api_url``.

        Only a leading ``http`` scheme prefix is rewritten (``http`` ->
        ``ws``, ``https`` -> ``wss``), so an "http" substring elsewhere in
        the host or path is left intact.

        Raises:
            ValueError: if ``self.api_url`` is unset or empty.
        """
        if not self.api_url:
            raise ValueError(
                "HF_API_URL is not set; cannot open the WebSocket connection."
            )
        url = self.api_url
        if url.startswith("http"):
            url = "ws" + url[len("http"):]
        return url + "/ws"

    @staticmethod
    def _float_to_int16(samples):
        """Scale float samples by 32768 and convert to int16 with saturation.

        Clipping before the cast prevents out-of-range values (for example
        resampler overshoot beyond +/-1.0) from wrapping around.
        """
        return np.clip(samples * 32768.0, -32768, 32767).astype(np.int16)

    def start(self):
        """Open the WebSocket, wait for it to connect, then start the sender.

        Raises:
            ValueError: if no API URL is configured.
        """
        print("Starting audio streaming...")
        # Resolve the URL first so a missing configuration fails early.
        ws_url = self._websocket_url()
        self.ws = websocket.WebSocketApp(
            ws_url,
            header=[f"{key}: {value}" for key, value in self.headers.items()],
            on_open=self.on_open,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close
        )
        # NOTE(review): ssl.CERT_NONE disables certificate verification while
        # a bearer token is sent in the headers; confirm this is intended.
        self.ws_thread = threading.Thread(
            target=self.ws.run_forever,
            kwargs={'sslopt': {"cert_reqs": ssl.CERT_NONE}},
        )
        self.ws_thread.start()
        # Wait for the WebSocket to be ready.
        # NOTE(review): no timeout -- this blocks indefinitely if on_open is
        # never invoked.
        self.ws_ready.wait()
        self.send_thread = threading.Thread(target=self.send_audio)
        self.send_thread.start()

    def stop(self):
        """Stop the sender thread, close the WebSocket and join its thread.

        This logic was previously defined under the name ``on_close`` and was
        shadowed by the WebSocket callback of the same name, so it could
        never run. Safe to call even if ``start`` was never called.
        """
        self.stop_event.set()
        if self.send_thread is not None:
            self.send_thread.join()
        if self.ws is not None:
            self.ws.close()
        if self.ws_thread is not None:
            self.ws_thread.join()
        print("Audio streaming stopped.")

    def on_open(self, ws):
        """WebSocket callback: mark the connection as ready."""
        print("WebSocket connection opened.")
        self.ws_ready.set()  # Signal that the WebSocket is ready

    def on_message(self, ws, message):
        """WebSocket callback: update the session state and queue audio.

        A payload equal to ``b'DONE'`` switches the state to ``"listen"``.
        Any other payload switches the state to ``"processing"``, is decoded
        as int16 samples and placed on ``recv_queue``.
        """
        # message is bytes
        if message == b'DONE':
            print("listen")
            self.session_state = "listen"
        else:
            print("processing")
            self.session_state = "processing"
            audio_np = np.frombuffer(message, dtype=np.int16)
            self.recv_queue.put(audio_np)

    def on_error(self, ws, error):
        """WebSocket callback: report an error."""
        print(f"WebSocket error: {error}")

    def on_close(self, ws, close_status_code, close_msg):
        """WebSocket callback: report that the connection was closed."""
        print("WebSocket connection closed.")

    def send_audio(self):
        """Sender loop: forward queued chunks until ``stop_event`` is set.

        While the session state is ``"processing"`` a dequeued chunk is
        dropped and an empty binary frame is sent in its place.
        """
        while not self.stop_event.is_set():
            try:
                chunk = self.send_queue.get_nowait()
            except Empty:
                pass
            else:
                if self.session_state != "processing":
                    self.ws.send(chunk.tobytes(), opcode=websocket.ABNF.OPCODE_BINARY)
                else:
                    self.ws.send([], opcode=websocket.ABNF.OPCODE_BINARY)  # handshake
            time.sleep(0.01)

    def put_audio(self, chunk, sample_rate):
        """Queue an audio chunk for sending, resampled to the wire rate.

        Args:
            chunk: array of samples; values are clipped to the int16 range.
            sample_rate: sample rate of ``chunk`` in Hz.
        """
        chunk = np.clip(chunk, -32768, 32767).astype(np.int16)
        chunk = chunk.astype(np.float32) / 32768.0
        # Use the caller's sample rate instead of assuming 48 kHz input.
        chunk = librosa.resample(
            chunk, orig_sr=sample_rate, target_sr=self._WIRE_SAMPLE_RATE
        )
        self.send_queue.put(self._float_to_int16(chunk))

    def get_audio(self, sample_rate, output_size):
        """Return exactly ``output_size`` int16 samples of received audio.

        Received chunks are gathered until enough wire-rate samples are
        available. If none arrives within 0.1 s the remainder is zero-padded.
        The result is resampled to ``sample_rate`` and then truncated or
        zero-padded to ``output_size``; samples beyond ``output_size`` are
        discarded.

        Args:
            sample_rate: sample rate in Hz of the returned audio.
            output_size: number of samples to return.
        """
        output_chunk = np.array([], dtype=np.int16)
        output_sample_rate = self._WIRE_SAMPLE_RATE
        output_chunk_size = int(output_size*output_sample_rate/sample_rate)
        while output_chunk.size < output_chunk_size:
            try:
                self.ws.send([], opcode=websocket.ABNF.OPCODE_BINARY)  # handshake
                chunk = self.recv_queue.get(timeout=0.1)
            except Empty:
                chunk = None
            if chunk is not None:
                # Ensure chunk is int16 and clip to valid range
                chunk_int16 = np.clip(chunk, -32768, 32767).astype(np.int16)
                output_chunk = np.concatenate([output_chunk, chunk_int16])
            else:
                print("padding chunk of size ", len(output_chunk))
                output_chunk = np.pad(output_chunk, (0, output_chunk_size - len(output_chunk)))
        output_chunk = output_chunk.astype(np.float32) / 32768.0
        output_chunk = librosa.resample(output_chunk, orig_sr=output_sample_rate, target_sr=sample_rate)
        # Saturate rather than wrap if resampling overshoots the int16 range.
        output_chunk = self._float_to_int16(output_chunk)
        print("output_chunk size: ", len(output_chunk))
        output_chunk = output_chunk[:output_size]
        return np.pad(output_chunk, (0, output_size - len(output_chunk)))
if __name__ == "__main__":
    # Script entry point: create the streaming client and open its connection.
    AudioStreamingClient().start()