# Voice options offered in the UI. "name" is the label the user selects;
# "id" is the voice identifier that process_inputs passes on in the TTS config.
VOICE_OPTIONS = [
    {"id": "3b55b3d84d2f453a98d8ca9bb24182d6", "name": "邓紫琪"},
    {"id": "fa756c4628b94b7394d1822e5848cf59", "name": "杨幂"},
    {"id": "08f18a5692544543a6ca5fdd1eaa328c", "name": "宋雨琦"},
    {"id": "f2ed19ca0ea246bf9cbc6382be00e4fc", "name": "王志文"},
    {"id": "738d0cc1a3e9430a9de2b544a466a7fc", "name": "雷军"},
    {"id": "1512d05841734931bf905d0520c272b1", "name": "周杰伦"},
    {"id": "e4642e5edccd4d9ab61a69e82d4f8a14", "name": "蔡徐坤"},
    {"id": "e04a3dc718864c999ef7db3035764aa8", "name": "刘华强"},
    {"id": "7c66db6e457c4d53b1fe428a8c547953", "name": "郭德纲"},
    {"id": "f6f293aabfe24e46aff0fc309c233d31", "name": "曹操"},
    {"id": "22e8eb5f1f424c749592cd9db3927368", "name": "李云龙"},
    {"id": "5e680ebc2eeb4f78a2224f2e1003b8c6", "name": "刘备"},
    {"id": "zh-HK-HiuGaaiNeural", "name": "曉佳(粤语女声)"},
    {"id": "zh-HK-HiuMaanNeural", "name": "曉曼(粤语女声)"},
    {"id": "zh-HK-WanLungNeural", "name": "雲龍(粤语男声)"},
    {"id": "zh-CN-XiaoxiaoNeural", "name": "晓晓(活泼女声)"},
    {"id": "zh-CN-XiaoyiNeural", "name": "晓伊(女声)"},
    {"id": "zh-CN-YunjianNeural", "name": "云健(解说男声)"},
    {"id": "zh-CN-YunxiNeural", "name": "云希(阳光男声)"},
    {"id": "zh-CN-YunxiaNeural", "name": "云夏(少年男声)"},
    {"id": "zh-CN-YunyangNeural", "name": "云扬(专业男声)"},
    {"id": "zh-CN-liaoning-XiaobeiNeural", "name": "晓贝(辽宁女声)"},
    {"id": "zh-TW-HsiaoChenNeural", "name": "曉臻(湾湾女声)"},
    {"id": "zh-TW-YunJheNeural", "name": "雲哲(湾湾男声)"},
    {"id": "zh-TW-HsiaoYuNeural", "name": "曉雨(湾湾女声)"},
    {"id": "zh-CN-shaanxi-XiaoniNeural", "name": "晓妮(陕西女声)"},
    {"id": "alloy", "name": "alloy(用于官方)"},
    {"id": "echo", "name": "echo"},
    {"id": "fable", "name": "fable"},
    {"id": "onyx", "name": "onyx"},
    {"id": "nova", "name": "nova"},
    {"id": "shimmer", "name": "shimmer"},
    {"id": "male-botong", "name": "思远"},
    {"id": "Podcast_girl", "name": "心悦"},
    {"id": "boyan_new_hailuo", "name": "子轩"},
    {"id": "female-shaonv", "name": "灵儿"},
    {"id": "YaeMiko_hailuo", "name": "语嫣"},
    {"id": "xiaoyi_mix_hailuo", "name": "少泽"},
    {"id": "xiaomo_sft", "name": "芷溪"},
    {"id": "cove_test2_hailuo", "name": "浩翔"},
    {"id": "scarlett_hailuo", "name": "雅涵"},
    {"id": "Leishen2_hailuo", "name": "模仿雷电将军"},
    {"id": "Zhongli_hailuo", "name": "模仿钟离"},
    {"id": "Paimeng_hailuo", "name": "模仿派蒙"},
    {"id": "keli_hailuo", "name": "模仿可莉"},
    {"id": "Hutao_hailuo", "name": "模仿胡桃"},
    {"id": "Xionger_hailuo", "name": "模仿熊二"},
    {"id": "Haimian_hailuo", "name": "模仿海绵宝宝"},
    {"id": "Robot_hunter_hailuo", "name": "模仿变形金刚"},
    {"id": "Linzhiling_hailuo", "name": "小玲玲"},
    {"id": "huafei_hailuo", "name": "拽妃"},
    {"id": "lingfeng_hailuo", "name": "东北er"},
    {"id": "male_dongbei_hailuo", "name": "老铁"},
    {"id": "Beijing_hailuo", "name": "北京er"},
    {"id": "JayChou_hailuo", "name": "JayJay"},
    {"id": "Daniel_hailuo", "name": "潇然"},
    {"id": "Bingjiao_zongcai_hailuo", "name": "沉韵"},
    {"id": "female-yaoyao-hd", "name": "瑶瑶"},
    {"id": "murong_sft", "name": "晨曦"},
    {"id": "shangshen_sft", "name": "沐珊"},
    {"id": "kongchen_sft", "name": "祁辰"},
    {"id": "shenteng2_hailuo", "name": "夏洛特"},
    {"id": "Guodegang_hailuo", "name": "郭嘚嘚"},
    {"id": "yueyue_hailuo", "name": "小月月"},
]


def get_next_gemini_key(api_keys_str):
    """Return the next key from a comma-separated list, rotating round-robin.

    The rotation position is stored on the function object as
    ``get_next_gemini_key._current_index`` and is shared by every caller,
    regardless of which key list is passed in.

    Args:
        api_keys_str: Comma-separated API keys. Surrounding whitespace and
            blank entries are ignored. ``None`` is treated as an empty list.

    Returns:
        The selected key, or ``None`` when the input holds no usable key.
    """
    keys = [k.strip() for k in (api_keys_str or "").split(",") if k.strip()]
    if not keys:
        return None

    # The saved position may stem from an earlier call with a longer list
    # (e.g. keys typed in the UI vs. the GEMINI_API_KEYS variable), so wrap it
    # into range before indexing instead of trusting it.
    index = getattr(get_next_gemini_key, "_current_index", 0) % len(keys)
    get_next_gemini_key._current_index = (index + 1) % len(keys)
    return keys[index]


def get_api_key(key_name, ui_value):
    """Resolve a key or setting, preferring the UI value over the environment.

    For ``"GEMINI_API_KEY"`` the sources are tried in this order:

    1. ``ui_value`` (a comma-separated list is rotated round-robin),
    2. the ``GEMINI_API_KEYS`` environment variable (also rotated),
    3. the ``GEMINI_API_KEY`` environment variable.

    The key that was picked is written back to
    ``os.environ["GEMINI_API_KEY"]``.

    For every other ``key_name`` the UI value is returned when it is
    non-blank, otherwise ``os.getenv(key_name)``.

    Args:
        key_name: Name of the environment variable backing the setting.
        ui_value: Value entered in the UI; may be ``None`` or blank.

    Returns:
        The resolved value, or ``None`` when no source provides one.
    """
    # Pasted keys often carry stray whitespace; a blank value counts as unset.
    if isinstance(ui_value, str):
        ui_value = ui_value.strip()

    if key_name != "GEMINI_API_KEY":
        return ui_value if ui_value else os.getenv(key_name)

    selected_key = None
    if ui_value:
        # A single key bypasses the rotation so the shared position is kept.
        selected_key = get_next_gemini_key(ui_value) if "," in ui_value else ui_value
    if not selected_key:
        selected_key = get_next_gemini_key(os.getenv("GEMINI_API_KEYS"))
    if not selected_key:
        selected_key = os.getenv("GEMINI_API_KEY")

    # Publish the key currently in use through the GEMINI_API_KEY variable.
    if selected_key:
        os.environ["GEMINI_API_KEY"] = selected_key
    return selected_key
def _set_env_if_present(name, value):
    """Export *value* as environment variable *name* unless it is empty."""
    # os.environ accepts only strings; assigning None raises TypeError.
    if value:
        os.environ[name] = value


def _voice_id_for(display_name):
    """Return the VOICE_OPTIONS id whose "name" equals *display_name*."""
    for voice in VOICE_OPTIONS:
        if voice["name"] == display_name:
            return voice["id"]
    # A bare next() would raise StopIteration, whose str() is empty and would
    # surface to the caller as a blank error message.
    raise ValueError(f"Unknown voice: {display_name!r}")


def process_inputs(
    text_input,
    urls_input,
    pdf_files,
    image_files,
    gemini_key,
    openai_key,
    openai_base_url,
    elevenlabs_key,
    max_num_chunks,
    min_chunk_size,
    conversation_style,
    roles_person1,
    roles_person2,
    dialogue_structure,
    podcast_name,
    podcast_tagline,
    output_language,
    tts_model,
    creativity_level,
    user_instructions,
    engagement_techniques,
    tts_openai_question,
    tts_openai_answer,
    ending_message,
    longform,
    llm_model_name,
):
    """Collect the UI inputs, configure credentials, and call generate_podcast.

    Steps performed:

    1. Resolve API keys / base URL via ``get_api_key`` and export them as
       environment variables (``GEMINI_API_KEY``, ``OPENAI_API_KEY``,
       ``OPENAI_API_BASE``, and depending on ``tts_model`` also
       ``OPENAI_BASE_URL`` or ``ELEVENLABS_API_KEY``).
    2. Write uploaded PDFs and images into temporary directories. PDF paths
       are appended to the URL list; image paths are passed separately.
    3. Build the conversation config and call ``generate_podcast`` with
       ``api_key_label="OPENAI_API_KEY"``.
    4. Remove the temporary directories, whether or not generation succeeded.

    Args:
        text_input: Free text to turn into a podcast; may be empty.
        urls_input: Newline-separated URLs; may be empty or ``None``.
        pdf_files: Uploaded PDFs; each item is written to disk with
            ``f.write`` (presumably raw bytes - confirm against the UI
            component configuration).
        image_files: Uploaded images; each item is either bytes-like or a
            tuple/list whose second element is the bytes content.
        gemini_key, openai_key, openai_base_url, elevenlabs_key: Credentials
            typed in the UI; environment variables are the fallback.
        max_num_chunks, min_chunk_size, roles_person1, roles_person2,
        podcast_name, podcast_tagline, output_language, creativity_level,
        user_instructions, engagement_techniques, ending_message: Copied
            unchanged into the conversation config.
        conversation_style, dialogue_structure: Comma-separated strings,
            split into lists for the conversation config.
        tts_model: TTS backend name; ``"openai"`` and ``"elevenlabs"`` get
            extra credential handling here.
        tts_openai_question, tts_openai_answer: Voice display names; mapped
            to voice ids through ``VOICE_OPTIONS`` when ``tts_model`` is
            ``"openai"``, otherwise passed through unchanged.
        longform: Forwarded to ``generate_podcast``.
        llm_model_name: Forwarded to ``generate_podcast``.

    Returns:
        The value returned by ``generate_podcast`` on success. On any error
        the exception is logged and its message is returned as a string.
    """
    import shutil  # local import: only needed for temp-directory cleanup

    # Created before the try block so the cleanup below can always run, even
    # when an error occurs before any temporary directory exists.
    temp_dirs = []
    try:
        logger.info("Starting podcast generation process")

        # API key handling
        logger.debug("Setting API keys")
        _set_env_if_present("GEMINI_API_KEY", get_api_key("GEMINI_API_KEY", gemini_key))

        logger.debug("Setting OpenAI API key")
        if not openai_key and not os.getenv("OPENAI_API_KEY"):
            raise ValueError("OpenAI API key is required when using OpenAI TTS model")
        _set_env_if_present("OPENAI_API_KEY", get_api_key("OPENAI_API_KEY", openai_key))

        base_url = get_api_key("OPENAI_BASE_URL", openai_base_url)
        _set_env_if_present("OPENAI_API_BASE", base_url)

        if tts_model == "openai":
            _set_env_if_present("OPENAI_BASE_URL", base_url)
            # Translate the selected display names into voice ids.
            tts_openai_question = _voice_id_for(tts_openai_question)
            tts_openai_answer = _voice_id_for(tts_openai_answer)

        if tts_model == "elevenlabs":
            logger.debug("Setting ElevenLabs API key")
            if not elevenlabs_key and not os.getenv("ELEVENLABS_API_KEY"):
                raise ValueError("ElevenLabs API key is required when using ElevenLabs TTS model")
            _set_env_if_present(
                "ELEVENLABS_API_KEY", get_api_key("ELEVENLABS_API_KEY", elevenlabs_key)
            )

        # Process URLs
        urls = [url.strip() for url in (urls_input or "").split('\n') if url.strip()]
        logger.debug(f"Processed URLs: {urls}")

        # Handle PDF files: saved to disk and fed in through the URL list.
        if pdf_files:
            logger.info(f"Processing {len(pdf_files)} PDF files")
            pdf_temp_dir = tempfile.mkdtemp()
            temp_dirs.append(pdf_temp_dir)
            for i, pdf_file in enumerate(pdf_files):
                pdf_path = os.path.join(pdf_temp_dir, f"input_pdf_{i}.pdf")
                with open(pdf_path, 'wb') as f:
                    f.write(pdf_file)
                urls.append(pdf_path)
                logger.debug(f"Saved PDF {i} to {pdf_path}")

        # Handle image files
        image_paths = []
        if image_files:
            logger.info(f"Processing {len(image_files)} image files")
            img_temp_dir = tempfile.mkdtemp()
            temp_dirs.append(img_temp_dir)
            for i, img_file in enumerate(image_files):
                original_name = getattr(img_file, 'orig_name', f"image_{i}.jpg")
                # Keep the original extension; fall back to "jpg" when the
                # name has none, and ignore any directory part of the name.
                extension = (
                    os.path.splitext(os.path.basename(original_name))[1].lstrip('.') or "jpg"
                )
                logger.debug(f"Processing image file {i}: {original_name}")

                img_path = os.path.join(img_temp_dir, f"input_image_{i}.{extension}")
                try:
                    with open(img_path, 'wb') as f:
                        if isinstance(img_file, (tuple, list)):
                            f.write(img_file[1])  # second element holds the bytes content
                        else:
                            f.write(img_file)
                    image_paths.append(img_path)
                    logger.debug(f"Saved image {i} to {img_path}")
                except Exception as e:
                    logger.error(f"Error saving image {i}: {str(e)}")
                    raise

        # Prepare conversation config
        logger.debug("Preparing conversation config")
        conversation_config = {
            "max_num_chunks": max_num_chunks,
            "min_chunk_size": min_chunk_size,
            "conversation_style": conversation_style.split(','),
            "roles_person1": roles_person1,
            "roles_person2": roles_person2,
            "dialogue_structure": dialogue_structure.split(','),
            "podcast_name": podcast_name,
            "podcast_tagline": podcast_tagline,
            "output_language": output_language,
            "creativity": creativity_level,
            "user_instructions": user_instructions,
            "engagement_techniques": engagement_techniques,
            'text_to_speech': {
                'ending_message': ending_message,
                'openai': {
                    'default_voices': {
                        'question': tts_openai_question,
                        'answer': tts_openai_answer
                    },
                    "model": "tts-1",
                },
            },
        }

        # Generate podcast
        logger.info("Calling generate_podcast function")
        logger.debug(f"URLs: {urls}")
        logger.debug(f"Image paths: {image_paths}")
        logger.debug(f"Text input present: {'Yes' if text_input else 'No'}")

        audio_file = generate_podcast(
            urls=urls if urls else None,
            text=text_input if text_input else None,
            image_paths=image_paths if image_paths else None,
            tts_model=tts_model,
            conversation_config=conversation_config,
            longform=longform,
            llm_model_name=llm_model_name,
            api_key_label="OPENAI_API_KEY",
        )

        logger.info("Podcast generation completed")
        return audio_file

    except Exception as e:
        logger.error(f"Error in process_inputs: {str(e)}", exc_info=True)
        return str(e)

    finally:
        # rmtree removes the directory together with whatever was written
        # into it, and ignore_errors keeps cleanup from masking the result.
        logger.debug("Cleaning up temporary files")
        for dir_path in temp_dirs:
            shutil.rmtree(dir_path, ignore_errors=True)
            logger.debug(f"Removed temp directory: {dir_path}")
1.5em; } .gr-accordion > .label-wrap { flex-direction: row !important; justify-content: flex-start !important; gap: 1em; } .gr-accordion > .label-wrap > .icon { order: -1; } """ ) as demo: with gr.Tab("默认环境变量已设置 Gemini、OpenAI API Key "): # API Keys Section with gr.Row(): gr.Markdown( """