"""Gradio front end for podcastfy: turn text, URLs, PDFs and images into a podcast."""

import json
import logging
import os
import shutil
import tempfile

import gradio as gr
import litellm
import requests
from dotenv import load_dotenv
from podcastfy.client import generate_podcast

# Let litellm silently drop request parameters the target model does not support.
litellm.drop_params = True

# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Load environment variables
load_dotenv()

# Voice choices shown in the UI: "name" is the label displayed in the dropdowns,
# "id" is the identifier handed to the TTS backend.
# NOTE: the dropdown defaults below index into this list (26 and 27), so keep
# the order of the existing entries stable when adding new voices.
VOICE_OPTIONS = [
    {"id": "3b55b3d84d2f453a98d8ca9bb24182d6", "name": "邓紫琪"},
    {"id": "fa756c4628b94b7394d1822e5848cf59", "name": "杨幂"},
    {"id": "08f18a5692544543a6ca5fdd1eaa328c", "name": "宋雨琦"},
    {"id": "f2ed19ca0ea246bf9cbc6382be00e4fc", "name": "王志文"},
    {"id": "738d0cc1a3e9430a9de2b544a466a7fc", "name": "雷军"},
    {"id": "1512d05841734931bf905d0520c272b1", "name": "周杰伦"},
    {"id": "e4642e5edccd4d9ab61a69e82d4f8a14", "name": "蔡徐坤"},
    {"id": "e04a3dc718864c999ef7db3035764aa8", "name": "刘华强"},
    {"id": "7c66db6e457c4d53b1fe428a8c547953", "name": "郭德纲"},
    {"id": "f6f293aabfe24e46aff0fc309c233d31", "name": "曹操"},
    {"id": "22e8eb5f1f424c749592cd9db3927368", "name": "李云龙"},
    {"id": "5e680ebc2eeb4f78a2224f2e1003b8c6", "name": "刘备"},
    {"id": "zh-HK-HiuGaaiNeural", "name": "曉佳(粤语女声)"},
    {"id": "zh-HK-HiuMaanNeural", "name": "曉曼(粤语女声)"},
    {"id": "zh-HK-WanLungNeural", "name": "雲龍(粤语男声)"},
    {"id": "zh-CN-XiaoxiaoNeural", "name": "晓晓(活泼女声)"},
    {"id": "zh-CN-XiaoyiNeural", "name": "晓伊(女声)"},
    {"id": "zh-CN-YunjianNeural", "name": "云健(解说男声)"},
    {"id": "zh-CN-YunxiNeural", "name": "云希(阳光男声)"},
    {"id": "zh-CN-YunxiaNeural", "name": "云夏(少年男声)"},
    {"id": "zh-CN-YunyangNeural", "name": "云扬(专业男声)"},
    {"id": "zh-CN-liaoning-XiaobeiNeural", "name": "晓贝(辽宁女声)"},
    {"id": "zh-TW-HsiaoChenNeural", "name": "曉臻(湾湾女声)"},
    {"id": "zh-TW-YunJheNeural", "name": "雲哲(湾湾男声)"},
    {"id": "zh-TW-HsiaoYuNeural", "name": "曉雨(湾湾女声)"},
    {"id": "zh-CN-shaanxi-XiaoniNeural", "name": "晓妮(陕西女声)"},
    {"id": "alloy", "name": "alloy(用于官方)"},
    {"id": "echo", "name": "echo"},
    {"id": "fable", "name": "fable"},
    {"id": "onyx", "name": "onyx"},
    {"id": "nova", "name": "nova"},
    {"id": "shimmer", "name": "shimmer"},
    {"id": "male-botong", "name": "思远"},
    {"id": "Podcast_girl", "name": "心悦"},
    {"id": "boyan_new_hailuo", "name": "子轩"},
    {"id": "female-shaonv", "name": "灵儿"},
    {"id": "YaeMiko_hailuo", "name": "语嫣"},
    {"id": "xiaoyi_mix_hailuo", "name": "少泽"},
    {"id": "xiaomo_sft", "name": "芷溪"},
    {"id": "cove_test2_hailuo", "name": "浩翔"},
    {"id": "scarlett_hailuo", "name": "雅涵"},
    {"id": "Leishen2_hailuo", "name": "模仿雷电将军"},
    {"id": "Zhongli_hailuo", "name": "模仿钟离"},
    {"id": "Paimeng_hailuo", "name": "模仿派蒙"},
    {"id": "keli_hailuo", "name": "模仿可莉"},
    {"id": "Hutao_hailuo", "name": "模仿胡桃"},
    {"id": "Xionger_hailuo", "name": "模仿熊二"},
    {"id": "Haimian_hailuo", "name": "模仿海绵宝宝"},
    {"id": "Robot_hunter_hailuo", "name": "模仿变形金刚"},
    {"id": "Linzhiling_hailuo", "name": "小玲玲"},
    {"id": "huafei_hailuo", "name": "拽妃"},
    {"id": "lingfeng_hailuo", "name": "东北er"},
    {"id": "male_dongbei_hailuo", "name": "老铁"},
    {"id": "Beijing_hailuo", "name": "北京er"},
    {"id": "JayChou_hailuo", "name": "JayJay"},
    {"id": "Daniel_hailuo", "name": "潇然"},
    {"id": "Bingjiao_zongcai_hailuo", "name": "沉韵"},
    {"id": "female-yaoyao-hd", "name": "瑶瑶"},
    {"id": "murong_sft", "name": "晨曦"},
    {"id": "shangshen_sft", "name": "沐珊"},
    {"id": "kongchen_sft", "name": "祁辰"},
    {"id": "shenteng2_hailuo", "name": "夏洛特"},
    {"id": "Guodegang_hailuo", "name": "郭嘚嘚"},
    {"id": "yueyue_hailuo", "name": "小月月"},
]


def get_next_gemini_key(api_keys_str):
    """Return the next key from a comma-separated list, rotating round-robin.

    The rotation position is stored on the function object and is shared by
    every caller, whatever list they pass in.

    Args:
        api_keys_str: Comma-separated API keys; blanks and surrounding
            whitespace are ignored.

    Returns:
        The selected key, or None when the string holds no usable key.
    """
    if not api_keys_str:
        return None
    keys = [k.strip() for k in api_keys_str.split(",") if k.strip()]
    if not keys:
        return None
    # Clamp the saved position to the current list: a previous call may have
    # been made with a longer list, which would otherwise index out of range.
    index = getattr(get_next_gemini_key, "_current_index", 0) % len(keys)
    get_next_gemini_key._current_index = (index + 1) % len(keys)
    return keys[index]


def get_api_key(key_name, ui_value):
    """Resolve an API key (or similar setting) from the UI or the environment.

    For ``GEMINI_API_KEY`` the lookup order is: the UI value (rotating when it
    contains several comma-separated keys), then the ``GEMINI_API_KEYS``
    environment variable (rotating), then ``GEMINI_API_KEY``. The selected key
    is also exported to ``os.environ["GEMINI_API_KEY"]``.

    For any other name the UI value wins, falling back to the environment
    variable called ``key_name``.

    Args:
        key_name: Name of the setting / environment variable.
        ui_value: Value typed into the UI; may be empty or None.

    Returns:
        The resolved value, or None when nothing is configured.
    """
    # Treat a whitespace-only field the same as an empty one.
    ui_value = (ui_value or "").strip()

    if key_name == "GEMINI_API_KEY":
        if ui_value:
            # 1. Prefer the key(s) passed in from the UI.
            if "," in ui_value:
                selected_key = get_next_gemini_key(ui_value)
            else:
                selected_key = ui_value
        elif os.getenv("GEMINI_API_KEYS"):
            # 2. Then the multi-key environment variable.
            selected_key = get_next_gemini_key(os.getenv("GEMINI_API_KEYS"))
        else:
            # 3. Finally the single-key environment variable.
            selected_key = os.getenv("GEMINI_API_KEY")

        # Publish the key currently in use for downstream libraries.
        if selected_key:
            os.environ["GEMINI_API_KEY"] = selected_key
        return selected_key

    return ui_value or os.getenv(key_name)


def _set_env_if_present(name, value):
    """Export ``value`` as environment variable ``name`` unless it is empty."""
    # os.environ only accepts strings; assigning None raises TypeError.
    if value:
        os.environ[name] = value


def _voice_id_for_name(voice_name):
    """Map a voice display name from VOICE_OPTIONS to its backend id."""
    for voice in VOICE_OPTIONS:
        if voice["name"] == voice_name:
            return voice["id"]
    raise ValueError(f"Unknown voice name: {voice_name!r}")


def _write_upload(payload, path):
    """Write one uploaded payload (bytes, or a (name, bytes) pair) to ``path``."""
    data = payload[1] if isinstance(payload, (tuple, list)) else payload
    with open(path, "wb") as f:
        f.write(data)


def process_inputs(
    text_input,
    urls_input,
    pdf_files,
    image_files,
    gemini_key,
    openai_key,
    openai_base_url,
    elevenlabs_key,
    max_num_chunks,
    min_chunk_size,
    conversation_style,
    roles_person1,
    roles_person2,
    dialogue_structure,
    podcast_name,
    podcast_tagline,
    output_language,
    tts_model,
    creativity_level,
    user_instructions,
    engagement_techniques,
    tts_openai_question,
    tts_openai_answer,
    ending_message,
    longform,
    llm_model_name,
):
    """Gradio handler: collect the form values and generate a podcast.

    API keys are exported to environment variables, uploaded PDFs and images
    are written to temporary directories, and everything is forwarded to
    ``generate_podcast``. Temporary directories are always removed afterwards.

    Args:
        text_input: Raw text to use as source material.
        urls_input: Newline-separated URLs.
        pdf_files: Uploaded PDF payloads, or None.
        image_files: Uploaded image payloads, or None.
        gemini_key, openai_key, openai_base_url, elevenlabs_key: Credentials
            and endpoint from the UI; empty values fall back to the environment.
        max_num_chunks, min_chunk_size: Long-form generation limits.
        conversation_style, dialogue_structure: Comma-separated lists.
        roles_person1, roles_person2, podcast_name, podcast_tagline,
        output_language, creativity_level, user_instructions,
        engagement_techniques, ending_message: Passed through in the
            conversation config.
        tts_model: Selected text-to-speech backend.
        tts_openai_question, tts_openai_answer: Voice display names; they are
            translated to voice ids when ``tts_model`` is ``"openai"``.
        longform: Whether to use long-form generation.
        llm_model_name: Model name forwarded to ``generate_podcast``.

    Returns:
        Whatever ``generate_podcast`` returns (displayed by the audio output).

    Raises:
        gr.Error: On any failure, so the message is shown in the UI instead
            of being handed to the audio component as if it were a file path.
    """
    # Created before the try block so cleanup in `finally` can never hit an
    # unbound name when an early validation step fails.
    temp_dirs = []
    try:
        logger.info("Starting podcast generation process")

        # API key handling
        logger.debug("Setting API keys")
        _set_env_if_present("GEMINI_API_KEY", get_api_key("GEMINI_API_KEY", gemini_key))

        logger.debug("Setting OpenAI API key")
        if not openai_key and not os.getenv("OPENAI_API_KEY"):
            raise ValueError("OpenAI API key is required when using OpenAI TTS model")
        _set_env_if_present("OPENAI_API_KEY", get_api_key("OPENAI_API_KEY", openai_key))

        # The base URL is optional: leave the environment untouched when
        # neither the UI nor OPENAI_BASE_URL provides one.
        base_url = get_api_key("OPENAI_BASE_URL", openai_base_url)
        _set_env_if_present("OPENAI_API_BASE", base_url)

        if tts_model == "openai":
            _set_env_if_present("OPENAI_BASE_URL", base_url)
            # Translate the selected display names into voice ids.
            tts_openai_question = _voice_id_for_name(tts_openai_question)
            tts_openai_answer = _voice_id_for_name(tts_openai_answer)

        if tts_model == "elevenlabs":
            logger.debug("Setting ElevenLabs API key")
            if not elevenlabs_key and not os.getenv("ELEVENLABS_API_KEY"):
                raise ValueError("ElevenLabs API key is required when using ElevenLabs TTS model")
            _set_env_if_present(
                "ELEVENLABS_API_KEY", get_api_key("ELEVENLABS_API_KEY", elevenlabs_key)
            )

        # Process URLs
        urls = [url.strip() for url in (urls_input or "").split("\n") if url.strip()]
        logger.debug(f"Processed URLs: {urls}")

        # Handle PDF files: saved PDFs are passed along with the URLs.
        if pdf_files:
            logger.info(f"Processing {len(pdf_files)} PDF files")
            pdf_temp_dir = tempfile.mkdtemp()
            temp_dirs.append(pdf_temp_dir)
            for i, pdf_file in enumerate(pdf_files):
                pdf_path = os.path.join(pdf_temp_dir, f"input_pdf_{i}.pdf")
                _write_upload(pdf_file, pdf_path)
                urls.append(pdf_path)
                logger.debug(f"Saved PDF {i} to {pdf_path}")

        # Handle image files
        image_paths = []
        if image_files:
            logger.info(f"Processing {len(image_files)} image files")
            img_temp_dir = tempfile.mkdtemp()
            temp_dirs.append(img_temp_dir)
            for i, img_file in enumerate(image_files):
                # Use the original file name when the payload carries one;
                # otherwise assume a .jpg image.
                original_name = getattr(img_file, "orig_name", f"image_{i}.jpg")
                extension = os.path.splitext(original_name)[1].lstrip(".") or "jpg"
                logger.debug(f"Processing image file {i}: {original_name}")

                img_path = os.path.join(img_temp_dir, f"input_image_{i}.{extension}")
                try:
                    _write_upload(img_file, img_path)
                except Exception as e:
                    logger.error(f"Error saving image {i}: {str(e)}")
                    raise
                image_paths.append(img_path)
                logger.debug(f"Saved image {i} to {img_path}")

        # Prepare conversation config
        logger.debug("Preparing conversation config")
        conversation_config = {
            "max_num_chunks": max_num_chunks,
            "min_chunk_size": min_chunk_size,
            "conversation_style": conversation_style.split(","),
            "roles_person1": roles_person1,
            "roles_person2": roles_person2,
            "dialogue_structure": dialogue_structure.split(","),
            "podcast_name": podcast_name,
            "podcast_tagline": podcast_tagline,
            "output_language": output_language,
            "creativity": creativity_level,
            "user_instructions": user_instructions,
            "engagement_techniques": engagement_techniques,
            "text_to_speech": {
                "ending_message": ending_message,
                "openai": {
                    "default_voices": {
                        "question": tts_openai_question,
                        "answer": tts_openai_answer,
                    },
                    "model": "tts-1",
                },
            },
        }

        # Generate podcast
        logger.info("Calling generate_podcast function")
        logger.debug(f"URLs: {urls}")
        logger.debug(f"Image paths: {image_paths}")
        logger.debug(f"Text input present: {'Yes' if text_input else 'No'}")

        audio_file = generate_podcast(
            urls=urls if urls else None,
            text=text_input if text_input else None,
            image_paths=image_paths if image_paths else None,
            tts_model=tts_model,
            conversation_config=conversation_config,
            longform=longform,
            llm_model_name=llm_model_name,
            api_key_label="OPENAI_API_KEY",
        )

        logger.info("Podcast generation completed")
        return audio_file

    except Exception as e:
        logger.error(f"Error in process_inputs: {str(e)}", exc_info=True)
        # Surface the message in the UI; returning the text would make the
        # audio component try to open it as a file path.
        raise gr.Error(str(e)) from e

    finally:
        # Runs on success and on failure; rmtree also removes the files
        # inside, which os.rmdir cannot do.
        logger.debug("Cleaning up temporary files")
        for dir_path in temp_dirs:
            shutil.rmtree(dir_path, ignore_errors=True)
            logger.debug(f"Removed temp directory: {dir_path}")


# Create Gradio interface with updated theme
with gr.Blocks(
    title="AI播客plus",
    theme=gr.themes.Base(
        primary_hue="blue",
        secondary_hue="slate",
        neutral_hue="slate",
    ),
    css="""
    /* Move toggle arrow to left side */
    .gr-accordion {
        --accordion-arrow-size: 1.5em;
    }
    .gr-accordion > .label-wrap {
        flex-direction: row !important;
        justify-content: flex-start !important;
        gap: 1em;
    }
    .gr-accordion > .label-wrap > .icon {
        order: -1;
    }
    """,
) as demo:

    with gr.Tab("默认环境变量已设置 Gemini、OpenAI API Key "):
        # API Keys Section
        with gr.Row():
            gr.Markdown(
                """

🔑 API Keys

""",
                elem_classes=["section-header"],
            )
            theme_btn = gr.Button("🌓", scale=0, min_width=0)

        with gr.Accordion("配置 API Keys", open=False):
            gemini_key = gr.Textbox(
                label="Gemini API Key",
                type="password",
                value="",
                info="必须的,多个key请用逗号分隔",
            )
            openai_key = gr.Textbox(
                label="OpenAI API Key",
                type="password",
                value="",
                info="只有在使用OpenAI文本转语音模型的情况下才需要此项",
            )
            openai_base_url = gr.Textbox(
                label="OpenAI Base URL",
                value="",
                info="可选,留空使用默认URL:https://api.openai.com/v1",
            )
            elevenlabs_key = gr.Textbox(
                label="ElevenLabs API Key",
                type="password",
                value="",
                info="建议使用ElevenLabs TTS模型,仅在使用该模型时才需要此项",
            )

        # Content Input Section
        gr.Markdown(
            """

📝 输入内容

""",
            elem_classes=["section-header"],
        )
        with gr.Accordion("设置输入内容", open=False):
            with gr.Group():
                text_input = gr.Textbox(
                    label="文本输入",
                    placeholder="在此输入或粘贴文字...",
                    lines=3,
                )
                urls_input = gr.Textbox(
                    label="URLs",
                    placeholder="请逐行输入网址,支持网站和YouTube视频链接.",
                    lines=3,
                )

            # Place PDF and Image uploads side by side
            with gr.Row():
                with gr.Column():
                    pdf_files = gr.Files(
                        label="上传 PDFs",
                        file_types=[".pdf"],
                        type="binary",
                    )
                    gr.Markdown("*上传一个或多个PDF文件来创建播客*", elem_classes=["file-info"])

                with gr.Column():
                    image_files = gr.Files(
                        label="上传图片",
                        file_types=["image"],
                        type="binary",
                    )
                    gr.Markdown("*上传一个或多个图片文件来创建播客*", elem_classes=["file-info"])

        # Customization Section
        gr.Markdown(
            """

⚙️ 自定义选项

""",
            elem_classes=["section-header"],
        )
        with gr.Accordion("自定义选项", open=False):
            # Basic Settings
            gr.Markdown(
                """

📊 基本设置

""",
            )
            llm_model_name = gr.Radio(
                choices=[
                    "gemini-2.0-flash-exp",
                    "o1-preview",
                    "o1-mini",
                    "gpt-4o",
                    "gpt-4o-mini",
                    "claude-3-5-sonnet-20240620",
                    "claude-3-opus-20240229",
                    "claude-3-haiku-20240307",
                ],
                value="gemini-2.0-flash-exp",
                label="文本生成模型",
                info="默认使用 gemini-2.0-flash-exp ",
            )
            longform = gr.Checkbox(
                label="长篇模式",
                value=False,
                info="启用长篇内容生成模式,启用长篇需要Google Cloud支持,设置好GOOGLE_API_KEY",
            )
            with gr.Group(visible=False) as longform_settings_group:
                max_num_chunks = gr.Slider(
                    minimum=1,
                    maximum=20,
                    value=8,
                    step=1,
                    label="最大轮数",
                    info="长篇模式下,生成的最大轮数",
                )
                min_chunk_size = gr.Slider(
                    minimum=300,
                    maximum=2000,
                    value=600,
                    step=100,
                    label="一轮最小字符数",
                    info="长篇模式下,生成一轮所需的最小字符数",
                )

            def update_longform_settings(is_longform):
                """Show the long-form sliders only while long-form mode is on."""
                return gr.update(visible=is_longform)

            longform.change(
                fn=update_longform_settings,
                inputs=[longform],
                outputs=[longform_settings_group],
            )

            conversation_style = gr.Textbox(
                label="对话风格",
                value="engaging,fast-paced,enthusiastic",
                info="用于对话的风格列表(以逗号分隔)默认:生动活泼,节奏明快,热情洋溢。学术辩论: formal,analytical,critical;讲故事: narrative,suspenseful,descriptive",
            )

            # Roles and Structure
            gr.Markdown(
                """

👥 角色设定与结构安排

""",
            )
            roles_person1 = gr.Textbox(
                label="第一位发言者的角色",
                value="main summarizer",
                info="在对话中,第一个说话人扮演的角色,默认:主要负责总结的人。学术辩论: thesis presenter;讲故事: storyteller",
            )
            roles_person2 = gr.Textbox(
                label="第二位发言者的角色",
                value="questioner/clarifier",
                info="在对话中,第二个说话人所扮演的角色或承担的任务,默认:提问者/释疑者。学术辩论: counterargument provider;讲故事: audience participator",
            )
            dialogue_structure = gr.Textbox(
                label="对话结构",
                value="Introduction,Main Content Summary,Conclusion",
                info="对话结构的各个部分(用逗号隔开)默认:引言,主要内容的概括,总结。学术辩论: Opening Statements,Thesis Presentation,Counterarguments,Rebuttals,Closing Remarks;讲故事: Scene Setting,Character Introduction,Rising Action,Climax,Resolution",
            )
            engagement_techniques = gr.Textbox(
                label="沟通技巧",
                value="rhetorical questions,anecdotes,analogies,humor",
                info="一些沟通和交流方式(用逗号隔开)默认:各种修辞、生动例子、形象比喻、诙谐幽默。学术辩论: socratic questioning,historical references,thought experiments;讲故事: cliffhangers,vivid imagery,audience prompts",
            )
            creativity_level = gr.Slider(
                minimum=0,
                maximum=1,
                value=0.7,
                step=0.1,
                label="创意等级",
                info="调节生成对话的创意程度(0 为注重事实,1 为更具创意)。学术辩论:0。讲故事:0.9",
            )

            # Podcast Identity
            gr.Markdown(
                """

🎙️ 播客特色

""",
            )
            podcast_name = gr.Textbox(
                label="播客名",
                value="猛然间",
                info="播客的名字",
            )
            podcast_tagline = gr.Textbox(
                label="播客宣传语",
                value="猛然回首,太匆匆",
                info="播客的宣传语或副标题",
            )
            output_language = gr.Textbox(
                label="输出语言",
                value="Chinese",
                info="播客使用的语言",
            )

            # Voice Settings
            gr.Markdown(
                """

🗣️ 语音设置

""",
            )
            ending_message = gr.Textbox(
                label="结束语",
                value="欢迎下次继续收听!",
                info="结束语",
            )
            tts_model = gr.Radio(
                choices=["openai", "geminimulti", "elevenlabs", "gemini", "edge"],
                value="openai",
                label="文本转语音模型",
                info="选择语音合成模型 (edge 免费但音质较差, 其他模型音质更好但需申请 API keys)",
            )
            with gr.Group(visible=True) as openai_voice_group:
                tts_openai_question = gr.Dropdown(
                    choices=[voice["name"] for voice in VOICE_OPTIONS],
                    value=VOICE_OPTIONS[27]["name"],
                    label="第一位发言者的语音",
                    info="选择OpenAI TTS 第一位发言者的语音",
                )
                tts_openai_answer = gr.Dropdown(
                    choices=[voice["name"] for voice in VOICE_OPTIONS],
                    value=VOICE_OPTIONS[26]["name"],
                    label="第二位发言者的语音",
                    info="选择OpenAI TTS 第二位发言者的语音",
                )

            def update_voice_options(tts_model):
                """Show the voice dropdowns only for the "openai" TTS model."""
                return gr.update(visible=(tts_model == "openai"))

            tts_model.change(
                fn=update_voice_options,
                inputs=[tts_model],
                outputs=[openai_voice_group],
            )

            # Advanced Settings
            gr.Markdown(
                """

🔧 高级选项

""",
            )
            user_instructions = gr.Textbox(
                label="个性化指令",
                value="",
                lines=2,
                placeholder="在此处添加你希望AI遵循的具体指令,以控制对话的走向和内容...",
                info="一些额外的指令,用来帮助AI更好地理解你想要聊天的内容和方向",
            )
            # NOTE: a separate "LLM provider" selector (api_key_label with
            # per-provider model lists) used to be sketched here as
            # commented-out code. It is not wired up: generation always uses
            # llm_model_name with api_key_label="OPENAI_API_KEY".

    # Output Section
    gr.Markdown(
        """

🎵 生成结果

""",
        elem_classes=["section-header"],
    )
    with gr.Group():
        generate_btn = gr.Button("🎙️ 生成播客", variant="primary")
        audio_output = gr.Audio(
            type="filepath",
            label="生成的播客",
        )

    # Handle generation. The order of `inputs` must match the positional
    # parameters of process_inputs.
    generate_btn.click(
        process_inputs,
        inputs=[
            text_input,
            urls_input,
            pdf_files,
            image_files,
            gemini_key,
            openai_key,
            openai_base_url,
            elevenlabs_key,
            max_num_chunks,
            min_chunk_size,
            conversation_style,
            roles_person1,
            roles_person2,
            dialogue_structure,
            podcast_name,
            podcast_tagline,
            output_language,
            tts_model,
            creativity_level,
            user_instructions,
            engagement_techniques,
            tts_openai_question,
            tts_openai_answer,
            ending_message,
            longform,
            llm_model_name,
        ],
        outputs=audio_output,
    )

    # Add theme toggle functionality (runs purely client-side).
    theme_btn.click(
        None,
        None,
        None,
        js="""
        function() {
            document.querySelector('body').classList.toggle('dark');
            return [];
        }
        """,
    )

if __name__ == "__main__":
    demo.queue().launch(share=True)