|
import base64
import json
import os

import gradio as gr
import numpy as np
import websockets.exceptions
import websockets.sync.client
from dotenv import load_dotenv
from gradio_webrtc import WebRTC, StreamHandler, get_twilio_turn_credentials
|
|
|
class GeminiConfig:
    """Connection settings for the Gemini bidirectional streaming endpoint.

    On construction the ``.env`` file is loaded, the API key is read from the
    ``GOOGLE_API_KEY`` environment variable, and the WebSocket URL is built
    from the host and that key.

    Raises:
        ValueError: if ``GOOGLE_API_KEY`` is unset or blank.
    """

    def __init__(self):
        load_dotenv()
        self.api_key = self._get_api_key()
        self.host = 'generativelanguage.googleapis.com'
        self.model = 'models/gemini-2.0-flash-exp'
        # The key travels as a query parameter of the WebSocket URL.
        self.ws_url = f'wss://{self.host}/ws/google.ai.generativelanguage.v1alpha.GenerativeService.BidiGenerateContent?key={self.api_key}'

    def _get_api_key(self):
        """Return the API key from ``GOOGLE_API_KEY`` with surrounding whitespace removed.

        Raises:
            ValueError: if the variable is missing, empty, or whitespace-only.
        """
        # Strip before validating: a stray newline or space around the key
        # would otherwise be interpolated verbatim into the URL, and a
        # whitespace-only value would pass a plain truthiness check.
        api_key = (os.getenv('GOOGLE_API_KEY') or '').strip()
        if not api_key:
            raise ValueError("GOOGLE_API_KEY not found in environment variables. Please set it in your .env file.")
        return api_key
|
|
|
class AudioProcessor:
    """Stateless helpers that convert between NumPy audio buffers and base64 payloads."""

    @staticmethod
    def encode_audio(data, sample_rate):
        """Wrap an audio buffer in a ``realtimeInput`` message.

        Args:
            data: object exposing ``tobytes()`` (a NumPy array in this file);
                its raw bytes are sent as-is, with no dtype conversion.
            sample_rate: value written into the ``rate=`` part of the MIME type.

        Returns:
            A dict with a single media chunk holding the base64-encoded bytes.
        """
        raw_pcm = data.tobytes()
        payload = base64.b64encode(raw_pcm).decode('UTF-8')
        chunk = {
            'mimeType': f'audio/pcm;rate={sample_rate}',
            'data': payload,
        }
        return {'realtimeInput': {'mediaChunks': [chunk]}}

    @staticmethod
    def process_audio_response(data):
        """Decode a base64 payload and view the bytes as an ``int16`` array."""
        return np.frombuffer(base64.b64decode(data), dtype=np.int16)
|
|
|
class GeminiHandler(StreamHandler):
    """Relay audio between a WebRTC stream and the Gemini WebSocket endpoint.

    ``receive`` forwards each incoming frame to the server as a base64 audio
    chunk; ``emit`` returns the server's decoded reply audio, re-chunked into
    frames of ``output_frame_size`` samples. The WebSocket is opened lazily on
    the first received frame and dropped on any send error so that a later
    frame can reconnect.
    """

    def __init__(self,
                 expected_layout="mono",
                 output_sample_rate=24000,
                 output_frame_size=480) -> None:
        super().__init__(expected_layout, output_sample_rate, output_frame_size,
                         input_sample_rate=24000)
        self.config = GeminiConfig()
        self.ws = None
        # Decoded reply samples not yet handed out by ``emit``; None until the
        # first audio arrives (and again after ``reset``).
        self.all_output_data = None
        self.audio_processor = AudioProcessor()
        # Lazily created by ``emit``; cleared by ``reset``.
        self._generator = None

    def copy(self):
        """Return a fresh handler with the same layout and output settings."""
        return GeminiHandler(
            expected_layout=self.expected_layout,
            output_sample_rate=self.output_sample_rate,
            output_frame_size=self.output_frame_size,
        )

    def _close_websocket(self) -> None:
        """Close the current socket, if any, and forget it."""
        ws, self.ws = self.ws, None
        if ws is None:
            return
        try:
            ws.close()
        except Exception as e:
            # Best effort: the socket is being discarded either way.
            print(f"Error closing WebSocket: {str(e)}")

    def _initialize_websocket(self):
        """Open the WebSocket and perform the setup handshake.

        Never raises: on any failure the error is printed and ``self.ws`` is
        left as ``None``, so callers must check ``self.ws`` afterwards.
        """
        try:
            self.ws = websockets.sync.client.connect(
                self.config.ws_url,
                timeout=30,
            )
            initial_request = {
                'setup': {
                    'model': self.config.model,
                },
            }
            self.ws.send(json.dumps(initial_request))
            setup_response = json.loads(self.ws.recv())
            print(f"Setup response: {setup_response}")
        except websockets.exceptions.WebSocketException as e:
            print(f"WebSocket connection failed: {str(e)}")
            # Close rather than just drop: the handshake may have failed after
            # the connection itself was established.
            self._close_websocket()
        except Exception as e:
            print(f"Setup failed: {str(e)}")
            self._close_websocket()

    def receive(self, frame: tuple[int, np.ndarray]) -> None:
        """Send one incoming audio frame to the server.

        Args:
            frame: ``(sample_rate, samples)`` pair. The first element is used
                as the declared rate of the chunk (assumes the stream delivers
                frames as (sample_rate, array), mirroring what ``emit``
                returns; confirm against the StreamHandler contract).

        Errors are printed and the socket is dropped so that the next frame
        triggers a reconnect; nothing is raised to the caller.
        """
        try:
            if not self.ws:
                self._initialize_websocket()
            if not self.ws:
                # Connecting failed and was already reported; drop this frame
                # instead of failing on ``None.send``.
                return

            sample_rate, array = frame
            array = array.squeeze()
            audio_message = self.audio_processor.encode_audio(array, sample_rate)
            self.ws.send(json.dumps(audio_message))
        except Exception as e:
            print(f"Error in receive: {str(e)}")
            self._close_websocket()

    def _process_server_content(self, content):
        """Yield output frames decoded from one ``modelTurn`` payload.

        Audio from every part carrying ``inlineData.data`` is appended to the
        pending buffer; complete frames of ``output_frame_size`` samples are
        yielded as ``(output_sample_rate, array of shape (1, frame_size))``.
        A remainder shorter than one frame stays buffered for the next call.
        Content without audio yields nothing.
        """
        for part in content.get('parts', []):
            data = part.get('inlineData', {}).get('data', '')
            if not data:
                continue

            audio_array = self.audio_processor.process_audio_response(data)
            if self.all_output_data is None:
                self.all_output_data = audio_array
            else:
                self.all_output_data = np.concatenate((self.all_output_data, audio_array))

            # Draining only after audio was appended guarantees the buffer is
            # an array here, never None. ``reset`` may clear it while this
            # generator is suspended, hence the None re-check each iteration.
            while (self.all_output_data is not None
                   and self.all_output_data.shape[-1] >= self.output_frame_size):
                yield (self.output_sample_rate,
                       self.all_output_data[:self.output_frame_size].reshape(1, -1))
                self.all_output_data = self.all_output_data[self.output_frame_size:]

    def generator(self):
        """Endlessly yield output frames, or ``None`` when nothing is available.

        ``None`` is yielded when the socket is not connected, when no message
        arrives within 5 seconds, or when receiving/decoding fails.
        """
        while True:
            # Work on a local reference: ``receive`` may clear ``self.ws``.
            ws = self.ws
            if not ws:
                print("WebSocket not connected")
                yield None
                continue

            try:
                message = ws.recv(timeout=5)
                msg = json.loads(message)

                if 'serverContent' in msg:
                    content = msg['serverContent'].get('modelTurn', {})
                    yield from self._process_server_content(content)
            except TimeoutError:
                print("Timeout waiting for server response")
                yield None
            except Exception as e:
                print(f"Error in generator: {str(e)}")
                yield None

    def emit(self) -> tuple[int, np.ndarray] | None:
        """Return the next output frame, or ``None`` if none is available."""
        if not self.ws:
            return None
        if self._generator is None:
            self._generator = self.generator()
        try:
            return next(self._generator)
        except StopIteration:
            self.reset()
            return None

    def reset(self) -> None:
        """Discard the frame generator and any buffered output audio."""
        self._generator = None
        self.all_output_data = None

    def shutdown(self) -> None:
        """Close the WebSocket, if open, and forget it."""
        self._close_websocket()

    def check_connection(self):
        """Ensure a socket exists, reconnecting if needed.

        Returns:
            True if ``self.ws`` is set after the check, False otherwise.
        """
        try:
            # ``closed`` is read defensively: NOTE(review) the sync client
            # connection may not expose this attribute; confirm against the
            # installed websockets version.
            if not self.ws or getattr(self.ws, "closed", False):
                self._initialize_websocket()
            # ``_initialize_websocket`` swallows its own errors, so success is
            # judged by whether a socket is actually present.
            return self.ws is not None
        except Exception as e:
            print(f"Connection check failed: {str(e)}")
            return False
|
|
|
class GeminiVoiceChat:
    """Builds the Gradio interface for the voice chat and exposes it as ``self.demo``."""

    def __init__(self):
        load_dotenv()
        self.demo = self._create_interface()

    def _create_interface(self):
        """Assemble the Blocks layout: a header and one send-receive audio WebRTC component.

        Returns:
            The constructed ``gr.Blocks`` object.
        """
        header_html = """
            <div style='text-align: center'>
                <h1>Gemini 2.0 Voice Chat</h1>
                <p>Speak with Gemini using real-time audio streaming</p>
            </div>
        """

        with gr.Blocks() as blocks:
            gr.HTML(header_html)

            conversation = WebRTC(
                label="Conversation",
                modality="audio",
                mode="send-receive",
                rtc_configuration=get_twilio_turn_credentials(),
            )

            # The same component is both the input and the output of the stream.
            conversation.stream(
                GeminiHandler(),
                inputs=[conversation],
                outputs=[conversation],
                time_limit=90,
                concurrency_limit=10,
            )

        return blocks

    def launch(self):
        """Launch the Gradio app with default settings."""
        self.demo.launch()
|
|
|
def demo():
    """Build the voice-chat application and return its Gradio Blocks interface."""
    return GeminiVoiceChat().demo


# Rebinds the module-level name ``demo`` from the factory function above to
# the Blocks object it returns; from here on ``demo`` is the interface itself.
demo = demo()