import os import subprocess import signal os.environ["GRADIO_ANALYTICS_ENABLED"] = "False" import gradio as gr import tempfile import torch from huggingface_hub import HfApi, ModelCard, whoami from gradio_huggingfacehub_search import HuggingfaceHubSearch from pathlib import Path from textwrap import dedent ########### import subprocess import threading from queue import Queue, Empty def stream_output(pipe, queue): """Read output from pipe and put it in the queue.""" for line in iter(pipe.readline, b''): queue.put(line.decode('utf-8').rstrip()) pipe.close() def run_command(command): # Create process with pipes for stdout and stderr process = subprocess.Popen( command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, #bufsize=1, universal_newlines=False ) # Create queues to store output stdout_queue = Queue() stderr_queue = Queue() # Create and start threads to read output stdout_thread = threading.Thread(target=stream_output, args=(process.stdout, stdout_queue)) stderr_thread = threading.Thread(target=stream_output, args=(process.stderr, stderr_queue)) stdout_thread.daemon = True stderr_thread.daemon = True stdout_thread.start() stderr_thread.start() output_stdout = "" output_stderr = "" # Monitor output in real-time while process.poll() is None: # Check stdout try: stdout_line = stdout_queue.get_nowait() print(f"STDOUT: {stdout_line}") output_stdout += stdout_line + "\n" except Empty: pass # Check stderr try: stderr_line = stderr_queue.get_nowait() print(f"STDERR: {stderr_line}") output_stderr += stderr_line + "\n" except Empty: pass # Get remaining lines stdout_thread.join() stderr_thread.join() return (process.returncode, output_stdout, output_stderr) ########### def process_model(ft_model_id: str, base_model_id: str, rank: str, private_repo, oauth_token: gr.OAuthToken | None): if oauth_token is None or oauth_token.token is None: raise gr.Error("You must be logged in") model_name = ft_model_id.split('/')[-1] if not os.path.exists("outputs"): os.makedirs("outputs") try: api = HfApi(token=oauth_token.token) with tempfile.TemporaryDirectory(dir="outputs") as outputdir: device = "cuda" if torch.cuda.is_available() else "cpu" cmd = [ "mergekit-extract-lora", ft_model_id, base_model_id, outputdir, f"--rank={rank}", f"--device={device}" ] print("cmd", cmd) (returncode, output_stdout, output_stderr) = run_command(cmd) print("returncode", returncode) print("output_stdout", output_stdout) print("output_stderr", output_stderr) if returncode != 0: raise Exception(f"Error converting to LoRA PEFT {q_method}: {output_stderr}") print("Model converted to LoRA PEFT successfully!") print(f"Converted model path: {outputdir}") # Check output dir if not os.listdir(outputdir): raise Exception("Output directory is empty!") # Create repo username = whoami(oauth_token.token)["name"] new_repo_url = api.create_repo(repo_id=f"{username}/LoRA-{model_name}", exist_ok=True, private=private_repo) new_repo_id = new_repo_url.repo_id print("Repo created successfully!", new_repo_url) # Upload files api.upload_folder( folder_path=outputdir, path_in_repo="", repo_id=new_repo_id, ) print("Uploaded", outputdir) return ( f'