Spaces:
Running
Running
import os | |
import torch | |
import datasets | |
import transformers | |
import json | |
from .schema import ActionInfo, EnvException, EnhancedJSONEncoder | |
from reactagent.prompt2model.prompt_parser import MockPromptSpec, TaskType | |
from reactagent.prompt2model.dataset_retriever import DescriptionDatasetRetriever | |
from reactagent.prompt2model.dataset_generator import PromptBasedDatasetGenerator, DatasetSplit | |
from reactagent.prompt2model.dataset_processor import TextualizeProcessor | |
from reactagent.prompt2model.model_retriever import DescriptionModelRetriever | |
from reactagent.prompt2model.model_trainer import GenerationModelTrainer | |
from reactagent.prompt2model.model_executor import GenerationModelExecutor, ModelOutput | |
from reactagent.prompt2model.model_evaluator import Seq2SeqEvaluator | |
def generate_dataset(instruction, examples, save_dir, num_train, num_valid, num_test, work_dir = '.', **kwargs):
    """Generate a train/val/test dataset dict from a prompt and save it to disk.

    Args:
        instruction: Task instruction passed to the prompt spec.
        examples: Example input-output pairs passed to the prompt spec.
        save_dir: Directory (relative to ``work_dir``) to save the dataset to.
        num_train: Number of training examples; anything ``int()`` accepts.
        num_valid: Number of validation examples; anything ``int()`` accepts.
        num_test: Number of test examples; anything ``int()`` accepts.
        work_dir: Base directory that ``save_dir`` is joined onto.
        **kwargs: Accepted and ignored.

    Returns:
        A success message containing the path the dataset was saved to.

    Raises:
        EnvException: If any of the example counts cannot be parsed as an int.
    """
    try:
        # Converted in the order train, valid, test; the first failure aborts.
        train_count, valid_count, test_count = map(
            int, (num_train, num_valid, num_test)
        )
    except ValueError:
        raise EnvException("Number of examples should be an integer")

    spec = MockPromptSpec(
        TaskType.TEXT_GENERATION,
        instruction=instruction,
        examples=examples,
    )
    split_sizes = {
        DatasetSplit.TRAIN: train_count,
        DatasetSplit.VAL: valid_count,
        DatasetSplit.TEST: test_count,
    }
    generated = PromptBasedDatasetGenerator().generate_dataset_dict(spec, split_sizes)

    save_path = os.path.join(work_dir, save_dir)
    generated.save_to_disk(save_path)
    return f"Dataset successfully generated and saved to {save_path}"
def retrieve_dataset(instruction, save_dir, work_dir = '.', **kwargs):
    """Retrieve a dataset matching an instruction and save it to disk.

    Args:
        instruction: Task instruction used to describe the desired dataset.
        save_dir: Directory (relative to ``work_dir``) to save the dataset to.
        work_dir: Base directory that ``save_dir`` is joined onto.
        **kwargs: Accepted and ignored.

    Returns:
        A success message containing the path the dataset was saved to.

    Raises:
        EnvException: If the retriever returns no dataset.
    """
    prompt_spec = MockPromptSpec(TaskType.TEXT_GENERATION, instruction=instruction, examples="")
    retriever = DescriptionDatasetRetriever()
    dataset_dict = retriever.retrieve_dataset_dict(prompt_spec)
    # NOTE(review): the upstream prompt2model retriever is believed to return
    # None when nothing is selected - confirm for this fork. Without the guard
    # that case surfaces as an opaque AttributeError on save_to_disk.
    if dataset_dict is None:
        raise EnvException("No suitable dataset could be retrieved for the given instruction")
    save_path = os.path.join(work_dir, save_dir)
    dataset_dict.save_to_disk(save_path)
    # The message previously said "generated" (copied from generate_dataset).
    return f"Dataset successfully retrieved and saved to {save_path}"
def retrieve_model(instruction, work_dir = '.', **kwargs):
    """Return a numbered list of models retrieved for an instruction.

    Args:
        instruction: Task instruction used to describe the desired model.
        work_dir: Unused; accepted so the signature matches the other actions.
        **kwargs: Accepted and ignored.

    Returns:
        A string starting with ``"Top Models:"`` followed by one
        ``"<rank>. <model>"`` line per retrieved model, ranks starting at 1.
    """
    spec = MockPromptSpec(
        TaskType.TEXT_GENERATION,
        instruction=instruction,
        examples="",
    )
    ranked_models = DescriptionModelRetriever(use_bm25=True, use_HyDE=True).retrieve(spec)
    numbered = [f"{rank}. {model}\n" for rank, model in enumerate(ranked_models, start=1)]
    return "Top Models:\n" + "".join(numbered)
def process_dataset(instruction, load_dirs, save_dirs, work_dir = '.', **kwargs):
    """Textualize saved dataset dicts and write the processed copies to disk.

    Args:
        instruction: Task instruction passed to the prompt spec.
        load_dirs: Colon-separated directories (relative to ``work_dir``) to
            load dataset dicts from.
        save_dirs: Colon-separated directories (relative to ``work_dir``) to
            save the processed dataset dicts to, in the same order.
        work_dir: Base directory that every load/save directory is joined onto.
        **kwargs: Accepted and ignored.

    Returns:
        A success message containing the list of save paths.

    Raises:
        EnvException: If the number of load and save directories differ.
    """
    spec = MockPromptSpec(TaskType.TEXT_GENERATION, instruction=instruction, examples="")

    sources = load_dirs.split(':')
    targets = save_dirs.split(':')
    if len(sources) != len(targets):
        raise EnvException("Number of load directories should match number of save directories")

    load_paths = [os.path.join(work_dir, source) for source in sources]
    save_paths = [os.path.join(work_dir, target) for target in targets]

    # Load every input dataset dict before processing them together.
    loaded = [datasets.load_from_disk(path) for path in load_paths]

    processed = TextualizeProcessor(has_encoder=True).process_dataset_dict(spec, loaded)

    # Processed dicts are paired with save paths positionally.
    for processed_dict, destination in zip(processed, save_paths):
        processed_dict.save_to_disk(destination)

    return f"Data successfully processed and saved to {save_paths}"
def train_model(model_name, load_dirs, result_dir, epochs, batch_size, warmup_steps, weight_decay, learning_rate, work_dir = '.', **kwargs):
    """Train a generation model on saved dataset dicts and save model + tokenizer.

    Args:
        model_name: Name of the model handed to ``GenerationModelTrainer``.
        load_dirs: Colon-separated directories (relative to ``work_dir``) to
            load dataset dicts from; each must have ``"train"`` and ``"val"``
            splits.
        result_dir: Directory (relative to ``work_dir``) for training output,
            the trained model and the trained tokenizer.
        epochs: Number of training epochs; anything ``int()`` accepts.
        batch_size: Batch size for training and for the trainer's executor;
            anything ``int()`` accepts.
        warmup_steps: Number of warmup steps; anything ``int()`` accepts.
        weight_decay: Weight decay; anything ``float()`` accepts.
        learning_rate: Learning rate; anything ``float()`` accepts.
        work_dir: Base directory that all relative directories are joined onto.
        **kwargs: Accepted and ignored.

    Returns:
        A success message naming the model and tokenizer save locations.

    Raises:
        EnvException: If a numeric parameter cannot be converted.
    """
    try:
        epochs = int(epochs)
        batch_size = int(batch_size)
        warmup_steps = int(warmup_steps)
        weight_decay = float(weight_decay)
        learning_rate = float(learning_rate)
    except (TypeError, ValueError) as err:
        # TypeError covers non-string, non-numeric inputs such as None.
        raise EnvException("Numerical parameters should be integers or floats as appropriate") from err

    load_dirs = load_dirs.split(':')
    result_dir = os.path.join(work_dir, result_dir)

    # load the datasets
    load_paths = [os.path.join(work_dir, load_dir) for load_dir in load_dirs]
    dataset_dicts = [datasets.load_from_disk(load_path) for load_path in load_paths]
    training_datasets = [dataset_dict["train"] for dataset_dict in dataset_dicts]
    validation_datasets = [dataset_dict["val"] for dataset_dict in dataset_dicts]

    trainer = GenerationModelTrainer(
        model_name,
        has_encoder=True,
        executor_batch_size=batch_size,
        tokenizer_max_length=1024,
        sequence_max_length=1280,
    )

    # This must be a plain dict. A trailing comma after the closing brace
    # previously turned it into a one-element tuple.
    hparams = {
        "output_dir": os.path.join(result_dir, "training_output"),
        "save_strategy": "epoch",
        "num_train_epochs": epochs,
        "per_device_train_batch_size": batch_size,
        "evaluation_strategy": "epoch",
        "warmup_steps": warmup_steps,
        "weight_decay": weight_decay,
        "learning_rate": learning_rate,
    }

    trained_model, trained_tokenizer = trainer.train_model(
        hyperparameter_choices=hparams,
        training_datasets=training_datasets,
        validation_datasets=validation_datasets,
    )

    trained_model.save_pretrained(os.path.join(result_dir, "trained_model"))
    trained_tokenizer.save_pretrained(os.path.join(result_dir, "trained_tokenizer"))
    return f"Model and Tokenizer successfully trained and saved respectively to {result_dir}/trained_model and {result_dir}/trained_tokenizer"
def execute_model(result_dir, load_dirs, save_path, batch_size, input_column, work_dir = '.', **kwargs):
    """Run a saved model over the test splits of dataset dicts and dump predictions.

    Args:
        result_dir: Directory (relative to ``work_dir``) containing the
            ``trained_model`` and ``trained_tokenizer`` subdirectories.
        load_dirs: Colon-separated directories (relative to ``work_dir``) to
            load dataset dicts from; each must have a ``"test"`` split.
        save_path: File (relative to ``work_dir``) the predictions are written
            to as JSON.
        batch_size: Batch size for the executor; anything ``int()`` accepts.
        input_column: Name of the dataset column used as model input.
        work_dir: Base directory that all relative paths are joined onto.
        **kwargs: Accepted and ignored.

    Returns:
        A success message containing the path of the predictions file.

    Raises:
        EnvException: If ``batch_size`` cannot be parsed as an int.
    """
    dataset_dirs = load_dirs.split(':')
    result_dir = os.path.join(work_dir, result_dir)
    save_path = os.path.join(work_dir, save_path)

    try:
        batch_size = int(batch_size)
    except ValueError:
        raise EnvException("Batch size should be an integer")

    # Gather the test split of every dataset dict into one dataset.
    dataset_paths = [os.path.join(work_dir, dataset_dir) for dataset_dir in dataset_dirs]
    loaded_dicts = [datasets.load_from_disk(path) for path in dataset_paths]
    test_dataset = datasets.concatenate_datasets(
        [loaded["test"] for loaded in loaded_dicts]
    )

    model_dir = os.path.join(result_dir, "trained_model")
    tokenizer_dir = os.path.join(result_dir, "trained_tokenizer")

    # Use the GPU when one is available, otherwise fall back to the CPU.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = transformers.AutoModelForSeq2SeqLM.from_pretrained(model_dir).to(device)
    tokenizer = transformers.AutoTokenizer.from_pretrained(tokenizer_dir)

    executor = GenerationModelExecutor(
        model,
        tokenizer,
        batch_size,
        tokenizer_max_length=1024,
        sequence_max_length=1280,
    )
    predictions = executor.make_prediction(
        test_set=test_dataset,
        input_column=input_column,
    )

    # EnhancedJSONEncoder is supplied because the predictions are not plain
    # JSON types.
    with open(save_path, 'w') as out_file:
        json.dump(predictions, out_file, cls=EnhancedJSONEncoder)

    return f"Model successfully executed on the test sets of the specified datasets and saved to {save_path}"
def evaluate_model(load_dirs, save_path, output_column, work_dir = '.', **kwargs):
    """Score saved predictions against the test splits of dataset dicts.

    Args:
        load_dirs: Colon-separated directories (relative to ``work_dir``) to
            load dataset dicts from; each must have a ``"test"`` split.
        save_path: JSON file (relative to ``work_dir``) holding the saved
            predictions, a list of objects that are expanded into
            ``ModelOutput`` keyword arguments.
        output_column: Name of the dataset column holding the ground truth.
        work_dir: Base directory that all relative paths are joined onto.
        **kwargs: Accepted and ignored.

    Returns:
        A string of the form ``"Evaluation metrics: <metric values>"``.
    """
    dataset_dirs = load_dirs.split(':')

    # Gather the test split of every dataset dict into one dataset.
    dataset_paths = [os.path.join(work_dir, dataset_dir) for dataset_dir in dataset_dirs]
    loaded_dicts = [datasets.load_from_disk(path) for path in dataset_paths]
    test_dataset = datasets.concatenate_datasets(
        [loaded["test"] for loaded in loaded_dicts]
    )

    predictions_file = os.path.join(work_dir, save_path)
    with open(predictions_file, 'r') as in_file:
        raw_predictions = json.load(in_file)

    # Rebuild ModelOutput objects from their JSON dictionaries.
    predictions = [ModelOutput(**fields) for fields in raw_predictions]

    metric_values = Seq2SeqEvaluator().evaluate_model(
        test_dataset,
        gt_column=output_column,
        predictions=predictions,
        encoder_model_name="xlm-roberta-base",
    )
    return f"Evaluation metrics: {metric_values}"
# Actions exposed to the agent. Only "Retrieve Model" is registered here.
P2M_ACTIONS = [
    ActionInfo(
        function=retrieve_model,
        is_primitive=False,
        name="Retrieve Model",
        description=(
            "Retrieve a suitable model based on a detailed description of the requirements. "
            "You can obtain the model given the name using the transformers.AutoModel.from_pretrained function."
        ),
        usage=dict(
            instruction="an instruction on how to generate the output from the input",
        ),
        return_value=(
            "The observation will be a list of suitable models. "
            "You can choose one of them based on the requirements."
        ),
    ),
]
# P2M_ACTIONS = [ | |
# ActionInfo( | |
# name="Generate Dataset", | |
# description="Generate a dataset based on an instruction and examples. You can load the dataset later from `save_dir` using the load_from_disk function of the HuggingFace datasets library.", | |
# usage={ | |
# "instruction": "an instruction on how to generate the output from the input", | |
# "examples": "examples of input-output pairs", | |
# "save_dir": "directory to save the generated dataset dict to. We recommend saving to data/generated/", | |
# "num_train": "number of examples to generate in the training set", | |
# "num_valid": "number of examples to generate in the validation set", | |
# "num_test": "number of examples to generate in the test set", | |
# }, | |
# return_value="The observation will be a success message if the dataset was generated successfully. Otherwise, an error message will be returned.", | |
# is_primitive=False, | |
# function=generate_dataset | |
# ), | |
# ActionInfo( | |
# name="Retrieve Dataset", | |
# description="Retrieve a suitable dataset based on a detailed description of the requirements. You can load the dataset later from `save_dir` using the load_from_disk function of the HuggingFace datasets library.", | |
# usage={ | |
# "instruction": "an instruction on how to generate the output from the input", | |
# "save_dir": "directory to save the generated dataset dict to. We recommend saving to data/retrieved/", | |
# }, | |
# return_value="The observation will be a success message if the dataset was retrieved successfully. Otherwise, an error message will be returned.", | |
# is_primitive=False, | |
# function=retrieve_dataset | |
# ), | |
# ActionInfo( | |
# name="Retrieve Model", | |
# description="Retrieve a suitable model based on a detailed description of the requirements. You can obtain the model given the name using the transformers.AutoModelForSeq2SeqLM.from_pretrained function.", | |
# usage={ | |
# "instruction": "an instruction on how to generate the output from the input", | |
# }, | |
# return_value="The observation will be a list of suitable models. You can choose one of them based on the requirements.", | |
# is_primitive=False, | |
# function=retrieve_model | |
# ), | |
# ActionInfo( | |
# name="Process Dataset", | |
# description="Process dataset based on a detailed description of the requirements. You can load the processed data later from `save_dirs` using the load_from_disk function of the HuggingFace datasets library. The input text will be in the `model_input` column and the output text will be in the `model_output` column.", | |
# usage={ | |
# "instruction": "an instruction on how to generate the output from the input", | |
# "load_dirs": "directories to load the dataset dicts from, separated by colons", | |
# "save_dirs": "directories to save the processed dataset dicts to, separated by colons. The order should match the order of the loaded datasets. We recommend saving to data/processed/", | |
# }, | |
# return_value="The observation will be a success message if the data was processed successfully. Otherwise, an error message will be returned.", | |
# is_primitive=False, | |
# function=process_dataset | |
# ), | |
# ActionInfo( | |
# name="Train Model", | |
# description="Train a Seq2Seq model from HuggingFace transformers library using the processed datasets and given hyperparameters.", | |
# usage={ | |
# "model_name": "name of the model to train", | |
# "load_dirs": "directories to load the dataset dicts from, separated by colons", | |
# "result_dir": "directory to save the trained model and tokenizer to. We recommend using results/{trial_id}/. The trained model will be available as `{result_dir}/trained_model/` and the tokenizer will be available as `{result_dir}/trained_tokenizer/`.", | |
# "epochs": "number of epochs to train the model for", | |
# "batch_size": "batch size for training the model", | |
# "warmup_steps": "number of warmup steps for the optimizer", | |
# "weight_decay": "weight decay for the optimizer", | |
# "learning_rate": "learning rate for the optimizer", | |
# }, | |
# return_value="The observation will be a success message if the model was trained successfully. Otherwise, an error message will be returned.", | |
# is_primitive=False, | |
# function=train_model | |
# ), | |
# ActionInfo( | |
# name="Execute Model on Test Set", | |
# description="Execute a trained model on the test sets of specified dataset dicts.", | |
# usage={ | |
# "result_dir": "directory where the trained model and tokenizer are saved", | |
# "load_dirs": "directories to load the dataset dicts from, separated by colons", | |
# "save_path": "file to save the results of the model execution in json format", | |
# "batch_size": "batch size for executing the model", | |
# "input_column": "column name of the input text", | |
# }, | |
# return_value="The observation will be a success message if the model was executed successfully. Otherwise, an error message will be returned.", | |
# is_primitive=False, | |
# function=execute_model, | |
# ), | |
# ActionInfo( | |
# name="Evaluate Model", | |
# description="Evaluate a trained model on the test sets of specified dataset dicts.", | |
# usage={ | |
# "load_dirs": "directories to load the dataset dicts from, separated by colons", | |
# "save_path": "file to load the results of the model execution in json format", | |
# "output_column": "column name of the output text", | |
# }, | |
# return_value="The values for various evaluation metrics will be returned.", | |
# is_primitive=False, | |
# function=evaluate_model, | |
# ) | |
# ] | |