# Description: Script to run multiple experiments on runai
import argparse
import os
import re
import shutil
import subprocess
import time

try:
    from prettytable import PrettyTable
except ImportError:
    # prettytable is only needed to render the --stat table; every other
    # action keeps working without it (see pretty_table's fallback).
    PrettyTable = None


class Bcolors:
    """ANSI escape sequences used to colorize terminal output."""

    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKCYAN = '\033[96m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'
    ENDC = '\033[0m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'


def pretty_table(dct):
    """Print a two-column ``Job`` / ``Status`` table sorted by job name.

    Args:
        dct: Mapping of job name to status.
    """
    if PrettyTable is None:
        # Plain-text fallback when prettytable is not installed.
        for job in sorted(dct):
            print(f'{job}: {dct[job]}')
        return
    table = PrettyTable(['Job', 'Status'])
    for job in sorted(dct):
        table.add_row([job, dct[job]])
    print(table)


def init_parser(argv=None):
    """Build the command-line parser and parse the arguments.

    Args:
        argv: Optional list of argument strings. ``None`` (the default)
            makes argparse read ``sys.argv[1:]``.

    Returns:
        The parsed ``argparse.Namespace``.

    Exits with a usage error (via ``parser.error``) when neither
    ``--config`` nor ``--config_folder`` is given, because every action
    iterates over at least one config path.
    """
    parser = argparse.ArgumentParser(prog="RUNAI SCRIPT")
    parser.add_argument('action', type=str, default=None,
                        help='Action to perform: train, test or run',
                        choices=['train', 'test', 'run'])
    parser.add_argument('--config_folder', type=str, default=None,
                        help='Run all configs in folder')
    parser.add_argument('--config', type=str, default=None,
                        help='Run a single config file')
    parser.add_argument('--name', type=str, default=None,
                        help='Prefix prepended to job names')
    parser.add_argument('--delete', action='store_true', help='Delete job')
    parser.add_argument('--delete_fail', action='store_true',
                        help='Delete job only if it failed')
    parser.add_argument('--delete_pending', action='store_true',
                        help='Delete job only if it is pending')
    parser.add_argument('--log', action='store_true', help='Show logs')
    parser.add_argument('--delete_folder', action='store_true',
                        help='Delete workdir folder')
    parser.add_argument('--permute_keypoints', action='store_true',
                        help='Pass --permute_keypoints to test.py')
    parser.add_argument('--dist', action='store_true',
                        help='Distributed Training')
    parser.add_argument('--find_best', action='store_true',
                        help='Find best according to val')
    parser.add_argument('--results', action='store_true', help='Show Results')
    parser.add_argument('--no_base', action='store_true',
                        help='Skip base models')
    parser.add_argument('--show_cmd', action='store_true', help='Show command')
    parser.add_argument('--large', action='store_true', help='Use large node')
    parser.add_argument('--eval_three', action='store_true',
                        help='Evaluate on 3 ckpts')
    parser.add_argument('--pck', type=float, default=0.2, help='PCK threshold')
    parser.add_argument('--auc', action='store_true', help='Evaluate AUC')
    parser.add_argument('--mpck', action='store_true', help='Evaluate mPCK')
    parser.add_argument('--check_logs', action='store_true',
                        help='check runai logs instead of workdir')
    parser.add_argument('--stat', action='store_true',
                        help='check runai status')
    parser.add_argument('--CVPR24', action='store_true',
                        help='run on CVPR24 legacy folder')
    parser.add_argument('--run_best_ckpt', action='store_true',
                        help='Pass --best to run.py')
    parser.add_argument('--num_samples', type=int, default=32,
                        help='Value passed to test.py as --num_samples')
    parser.add_argument('--ft_epochs', type=int, default=None,
                        help='Num of FT epochs')
    parser.add_argument('--masking', type=float, default=None,
                        help='Masking ratio passed to run.py')
    parser.add_argument('--masking_lamda', type=float, default=None,
                        help='Masking lamda passed to run.py')
    args = parser.parse_args(argv)
    if not args.config_folder and not args.config:
        parser.error('one of --config or --config_folder is required')
    return args


def check_status(job_name):
    """Return the status word reported by ``runai describe job``.

    Args:
        job_name: Name of the runai job to query.

    Returns:
        The first word following ``Status:`` in the command output, or
        ``None`` when the output contains no such line.
    """
    status = None
    status_command = f'runai describe job {job_name}'
    log = subprocess.run(status_command, shell=True, capture_output=True)
    log = log.stdout.decode('utf-8')
    match = re.search(r"Status:\s+(\w+)", log)
    if match:
        status = match.group(1)
    return status


def train_is_running(job_name, status=('Running', 'Pending', 'Failed')):
    """Report whether a job's current status is one of ``status``.

    Args:
        job_name: Name of the runai job to query.
        status: Iterable of status words to match against. The default is
            an immutable tuple so it cannot be mutated between calls.

    Returns:
        ``True`` (after printing the matched status) when the job's status
        equals one of the given values, ``False`` otherwise.
    """
    run_status = check_status(job_name)
    if any(run_status == candidate for candidate in status):
        print(f'{Bcolors.FAIL}{job_name} is {run_status}{Bcolors.ENDC}')
        return True
    return False


def get_best_run(workdir_path, config, find_best):
    """Pick the checkpoint path to evaluate for a config.

    The local mirror ``work_dir_runai/<config stem>`` is inspected to decide
    which checkpoint file name to use; the returned path is rooted at
    ``workdir_path``.

    Args:
        workdir_path: Work directory the returned checkpoint path is built on.
        config: Config name; the part before the first ``.`` names the
            local work directory.
        find_best: ``'best'`` to look for a ``best_*`` file, ``'epoch_100'``
            for ``epoch_100.pth``; anything else selects ``latest.pth``.

    Returns:
        ``(ckpt_path, file_name)``. ``file_name`` is ``None`` and
        ``ckpt_path`` falls back to ``<workdir_path>/latest.pth`` when no
        specific checkpoint was selected.
    """
    file_name = None
    local_path = f'work_dir_runai/{config.split(".")[0]}'
    if os.path.isdir(local_path):
        if find_best == 'best':
            # os.listdir order is arbitrary; sort so the pick is deterministic.
            candidates = sorted(name for name in os.listdir(local_path)
                                if name.startswith("best_"))
            if candidates:
                file_name = candidates[0]
        elif find_best == 'epoch_100':
            file_name = 'epoch_100.pth'
    if file_name:
        return f'{workdir_path}/{file_name}', file_name
    return f'{workdir_path}/latest.pth', None


def check_runai_logs(job_name):
    """Return the stdout of ``runai logs <job_name>`` as text."""
    os_command = f'runai logs {job_name}'
    completed = subprocess.run(os_command, shell=True, capture_output=True,
                               text=True)
    return completed.stdout


def _job_slug(name):
    """Return the part of ``name`` before the first '.', with '_' as '-'."""
    return name.split(".")[0].replace("_", "-")


def get_run_name(config, args, run):
    """Derive the train and test job names for a config.

    Args:
        config: Config name (lower-cased internally).
        args: Parsed arguments; only ``args.name`` (optional prefix) is read.
        run: Checkpoint tag (e.g. ``'latest'``) embedded in the test job name.

    Returns:
        ``[train_job_name, test_job_name]``. Over-long names are abbreviated
        with ``name_abriviator`` and truncated, a trailing ``-`` is dropped,
        and the optional prefix is prepended afterwards.
    """
    run = run.replace('_', '-')
    lwr_config = config.lower()

    train_job_name = f'or-{_job_slug(lwr_config)}'
    if len(train_job_name) > 60:
        train_job_name = f'or-{_job_slug(name_abriviator(lwr_config))}'[:60]

    test_job_name = f'ev-{run}-{_job_slug(lwr_config)}'
    # NOTE(review): the 40 / 58 limits differ from the train limits above;
    # kept as in the original - confirm whether the asymmetry is intended.
    if len(test_job_name) > 40:
        test_job_name = f'ev-{run}-{_job_slug(name_abriviator(lwr_config))}'[:58]

    job_names = []
    for job_name in (train_job_name, test_job_name):
        if job_name.endswith('-'):
            job_name = job_name[:-1]
        if args.name is not None:
            job_name = f'{args.name}-{job_name}'
        job_names.append(job_name)
    return job_names


def name_abriviator(name):
    """Shorten ``name`` by replacing known long tokens with abbreviations."""
    replace_dict = {
        'encoder': 'enc',
        'decoder': 'dec',
        'look_twice': 'lt',
        'cross_category': 'cc',
        'max_hops': 'hops',
        'lamda': 'l',
        'symmetric': 'sym',
        'auxiliary': 'aux',
        'batch_size': 'bs',
    }
    for key, value in replace_dict.items():
        name = name.replace(key, value)
    return name


def check_skip(lwr_config, args):
    """Return ``True`` when the config should be skipped.

    A config is skipped only when ``args.no_base`` is set and its lower-cased
    name contains ``'base'``.
    NOTE: a former rule that skipped cross-category configs for non-train
    actions is disabled.
    """
    if args.no_base and 'base' in lwr_config:
        print(f'Skipping {Bcolors.OKCYAN}{lwr_config}{Bcolors.ENDC} - base model')
        return True
    return False


def print_results(results):
    """Print a table of scores per config.

    Args:
        results: Mapping ``config -> {key: score}``. The ``'latest'`` key (if
            present) is the latest-checkpoint score; the highest-scoring other
            key is reported as the best run. Scores may be ``None``; missing
            keys and an empty mapping are tolerated.
    """
    print(f'\n\n\n{Bcolors.OKGREEN}Scores{Bcolors.ENDC}')
    config_length = max(15, max((len(key) for key in results), default=0))
    config_column_width = config_length + 2
    print(f'| {"Config":<{config_column_width}} | {"Max Value":<11} | {"Latest Value":<13} | {"Best Value":<10} | {"Best Epoch":<10} |')
    print(f'|{"-" * (config_column_width + 2)}|{"-" * 13}|{"-" * 15}|{"-" * 13}|{"-" * 11}|')
    for config, val_dict in sorted(results.items()):
        config_print = config.split('/')[-1].replace('.py', '')
        latest_score = val_dict.get('latest')
        other_results = {k: v for k, v in val_dict.items() if k != 'latest'}
        if other_results:
            # Treat missing scores as the lowest value so None never gets
            # compared with a float.
            best_key = max(
                other_results,
                key=lambda k: float('-inf') if other_results[k] is None else other_results[k],
            )
            best_score = other_results[best_key]
        else:
            best_key, best_score = 'No Best', None
        latest_val = parse_result(latest_score, Bcolors.OKBLUE)
        best_val = parse_result(best_score, Bcolors.HEADER)
        if latest_score is None and best_score is None:
            max_val = f'{Bcolors.WARNING}No results{Bcolors.ENDC}'
        elif latest_score is None:
            max_val = best_val
        elif best_score is None:
            max_val = latest_val
        else:
            max_val = latest_val if latest_score > best_score else best_val
        # The wide field widths leave room for the (invisible) color codes.
        print(f'| {config_print:<{config_column_width}} | {max_val:<20} | {latest_val:<22} | {best_val:<20} |{best_key:<10} |')


def parse_result(value, color):
    """Format a score as a colored percentage, or a warning when missing."""
    if value is None:
        return f'{Bcolors.WARNING}No results{Bcolors.ENDC}'
    return f'{color}{round(value * 100, 2)}{Bcolors.ENDC}'


def _collect_configs(args):
    """Return the list of config paths selected by the CLI arguments."""
    if not args.config_folder:
        return [args.config]
    if '*' in args.config_folder:
        # Wildcard form: list the entries of the pattern's parent directory
        # whose joined path contains the text before the first '*'.
        config_folder = args.config_folder.strip("'")
        parent_folder = os.path.relpath(os.path.join(config_folder, os.pardir))
        prefix = config_folder.split('*')[0]
        return [os.path.join(parent_folder, f)
                for f in os.listdir(parent_folder)
                if prefix in os.path.join(parent_folder, f)]
    # Plain folder: every .py file in the folder and its subfolders.
    configs = []
    for root, _dirs, files in os.walk(args.config_folder):
        configs.extend(os.path.join(root, file)
                       for file in files if file.endswith(".py"))
    return configs


def _select_runs(args):
    """Return the checkpoint tags to iterate over for each config."""
    if args.action == "test" and not args.eval_three and not args.find_best:
        return ['latest', 'best']
    if args.eval_three:
        return ['latest', 'best', 'epoch_100']
    if args.find_best:
        return ['best']
    return ['latest']


def _config_name(config_path):
    """Build '<parent dir>_<file name without "_config">' from a path.

    A path without a directory component yields just the file part instead
    of raising ``IndexError``.
    """
    parts = config_path.split("/")
    file_part = parts[-1].replace('_config', '')
    if len(parts) < 2:
        return file_part
    return f'{parts[-2]}_{file_part}'


def _parse_metric(status, label):
    """Return the float that follows ``label`` on its line, else ``None``."""
    try:
        return float(status.split(label)[1].split('\n')[0])
    except (IndexError, ValueError):
        return None


def _extract_score(status, args):
    """Pick the requested metric (AUC, mPCK or PCK@<pck>) out of ``status``."""
    if args.auc and 'AUC' in status:
        return _parse_metric(status, 'AUC: ')
    if args.mpck and 'mPCK' in status:
        return _parse_metric(status, 'mPCK: ')
    if f'PCK@{args.pck}:' in status:
        return _parse_metric(status, f'PCK@{args.pck}: ')
    return None


def _checkpoint_label(best_run):
    """Turn a checkpoint file name into the key used in the score table."""
    if not best_run:
        return "No Best"
    label = best_run.replace('best_PCK_', '')
    # Remove the extension as a suffix; str.strip('.pth') would strip any of
    # the characters '.', 'p', 't', 'h' from both ends.
    if label.endswith('.pth'):
        label = label[:-len('.pth')]
    return label


def _read_results(args, config, run, job_name):
    """Fetch the text that scores are parsed from.

    Returns:
        ``(status, from_file)``. ``from_file`` is ``True`` when the text came
        from the local ``testing_log.txt`` rather than from ``runai logs``.
    """
    if args.check_logs:
        return check_runai_logs(job_name), False
    local_dir = f'work_dir_runai/{config.split(".")[0]}'
    if args.action == 'run':
        log_file = os.path.join(local_dir, 'base_skeleton_bias', 'testing_log.txt')
    else:
        log_file = os.path.join(local_dir, 'testing_log.txt')
    if not os.path.exists(log_file):
        return check_runai_logs(job_name), False
    with open(log_file, 'r') as f:
        text = f.read()
    # Capture the "AUC: ... mPCK: <number>" span of the log section whose
    # checkpoint line mentions this run.
    match = re.search(
        rf'\*\*[\s\S]*?checkpoint:\s*.*?{re.escape(run)}[\s\S]*?(AUC:[\s\S]*?mPCK:\s*[\d.]+)',
        text)
    return (match.group(1) if match else ''), True


def main(argv=None):
    """Entry point: build and execute one runai command per config and run.

    Depending on the flags this submits train/test/run jobs, shows logs,
    deletes jobs, collects job statuses (``--stat``) or gathers scores
    (``--results``) and prints a summary table at the end.

    Args:
        argv: Optional argument list forwarded to ``init_parser``; ``None``
            uses ``sys.argv``.
    """
    delay = 1
    args = init_parser(argv)
    scores = {}
    stat = {}

    configs = _collect_configs(args)
    print(f"{Bcolors.OKGREEN}Running {args.action} on {len(configs)} configs{Bcolors.ENDC}")
    runs = _select_runs(args)

    for config_path in sorted(configs):
        for run in runs:
            best_run = None
            train_job_name = None
            config = _config_name(config_path)
            config_stem = config.split(".")[0]
            if args.CVPR24:
                workdir_path = f'/storage/orhir/capeformer_legacy/{config_stem}'
            else:
                workdir_path = f'/storage/orhir/capeformer/{config_stem}'
            local_workdir_path = f'work_dir_runai/{config_stem}'
            lwr_config = config.lower()
            if check_skip(lwr_config, args):
                continue

            if args.action in ("train", "run"):
                gpu = 4 if args.dist else 1
                resource = f' -g {gpu}'
            else:
                resource = ' -g 0.3'
            if args.large:
                resource += ' --node-pools blaufer'

            if args.stat:
                train_job_name, job_name = get_run_name(config, args, run)
                if args.action in ("train", "run"):
                    job_name = train_job_name
                print(f'{"-" * 30 + Bcolors.OKCYAN + job_name + Bcolors.ENDC + "-" * 30}')
                stat[job_name] = check_status(job_name)
                continue

            if args.action == "train":
                job_name, _ = get_run_name(config, args, run)
                if args.dist:
                    py_command = (f'python -m torch.distributed.launch '
                                  f'--nproc_per_node={gpu} --master_port=29500 '
                                  f'train.py --gpus {gpu} --config {config_path} '
                                  f'--work-dir {workdir_path} --autoscale-lr '
                                  f'--launcher pytorch')
                else:
                    py_command = (f'python train.py '
                                  f' --config {config_path}'
                                  f' --work-dir {workdir_path}')
            elif args.action == "run":
                job_name, _ = get_run_name(config, args, run)
                if args.masking is not None:
                    masking_precent = int(args.masking * 100)
                    workdir_path = f'/storage/orhir/capeformer/CVPR25_ablation_mask_{masking_precent}'
                    job_name += f'-{masking_precent}'
                if args.masking_lamda:
                    workdir_path = f'/storage/orhir/capeformer/CVPR25_ablation_mask_lamda_{int(args.masking_lamda)}'
                    job_name += f'-lamda-{int(args.masking_lamda)}'
                py_command = (f'python run.py '
                              f' --config {config_path}'
                              f' --work_dir {workdir_path}')
                if args.run_best_ckpt:
                    py_command += ' --best'
                    job_name += '-best'
                if args.ft_epochs:
                    py_command += f' --ft_epochs {args.ft_epochs}'
                # "is not None" matches the workdir/job-name logic above, so
                # an explicit ratio of 0.0 is forwarded as well.
                if args.masking is not None:
                    py_command += f' --masking_ratio {args.masking}'
                if args.masking_lamda:
                    py_command += f' --lamda_masking {args.masking_lamda}'
            else:
                train_job_name, job_name = get_run_name(config, args, run)
                ckpt_path, best_run = get_best_run(workdir_path, config, run)
                py_command = f'python test.py {config_path} {ckpt_path} --num_samples {args.num_samples}'
                if args.permute_keypoints:
                    py_command += ' --permute_keypoints'
                    job_name = (job_name + '-permute-keypoints')[:60]

            print(f'{"-" * 30 + Bcolors.OKCYAN + job_name + Bcolors.ENDC + "-" * 30}')

            if args.log:
                os_command = f'runai logs {job_name}'
            elif args.delete_fail:
                if not train_is_running(job_name, ['Failed', 'Error']):
                    print("Job not failed, skipping...")
                    continue
                os_command = f'runai delete job {job_name}'
            elif args.delete_pending:
                if not train_is_running(job_name, ['Pending']):
                    continue
                os_command = f'runai delete job {job_name}'
            elif args.delete:
                os_command = f'runai delete job {job_name}'
            elif args.results:
                status, from_file = _read_results(args, config, run, job_name)
                if from_file:
                    # Local reads need no pause between iterations.
                    delay = 0
                score = _extract_score(status, args)
                key = 'latest' if run == 'latest' else _checkpoint_label(best_run)
                scores.setdefault(config, {})[key] = score
                continue
            else:
                if args.action == 'test':
                    # Only evaluate once the matching training job finished.
                    if not train_is_running(train_job_name, ['Completed', 'Succeeded']):
                        print('Train not completed')
                        continue
                os_command = (f'runai submit --pvc=storage:/storage -i orhir/capeformer '
                              f' --name {job_name} {resource} --large-shm '
                              f' --command -- {py_command}')

            if args.show_cmd:
                print(f'{Bcolors.OKGREEN}{os_command}{Bcolors.ENDC}')
            subprocess.run(os_command, shell=True)

            if args.delete_folder:
                if os.path.exists(local_workdir_path):
                    # Delete without a shell so unusual characters in the
                    # path cannot be misinterpreted.
                    shutil.rmtree(local_workdir_path, ignore_errors=True)
                else:
                    print(f'{Bcolors.WARNING}No workdir folder to delete{Bcolors.ENDC}')
            time.sleep(delay)

    if args.results:
        print_results(scores)
    if args.stat:
        pretty_table(stat)


if __name__ == "__main__":
    main()