Spaces:
Sleeping
Sleeping
# Copyright 2020 The HuggingFace Datasets Authors and the current dataset script contributor. | |
# | |
# #TODO: license: MIT pending (evaluation suite itself can be completely open, nothing copyleft from the dataset reaches us here) | |
"""TODO: Add a description here.""" | |
# TODO: Add BibTeX citation | |
_CITATION = """\ | |
@InProceedings{huggingface:module, | |
title = {A great new module}, | |
authors={huggingface, Inc.}, | |
year={2020} | |
} | |
""" | |
# TODO: Add description of the module here | |
_DESCRIPTION = """\ | |
This EvaluationSuite currently solves {1} tasks to test code intelligence of genereative language models for "creative programming" (fragment shaders). | |
""" | |
# via https://huggingface.co/docs/evaluate/evaluation_suite | |
import evaluate | |
from evaluate import evaluator #used by Suite.run() | |
from evaluate.evaluator.utils import DatasetColumn # used in .prepare_data() | |
from evaluate.evaluation_suite import SubTask | |
from datasets import Dataset | |
from typing import Any, Callable, Dict, List, Optional, Union # used in .prepare_pipeline() | |
import transformers | |
from transformers import Pipeline, pipeline | |
from datasets import load_dataset #used by Suite.run() | |
# write a custom evaluator, inherent from: https://github.com/huggingface/evaluate/blob/v0.4.0/src/evaluate/evaluator/text_generation.py#L31 | |
class ReturnGenerationEvaluator(evaluate.TextGenerationEvaluator): | |
def __init__(self, task="text-generation", default_metric_name="exact_match", predictions_prefix: str = "generated"): | |
super().__init__(task=task, default_metric_name=default_metric_name) | |
self.predictions_prefix = predictions_prefix | |
PIPELINE_KWARGS = {"return_full_text":False, "do_sample":False} #these kwargs are for the pipeline call, not the pipeline init. | |
# for the pipeline init we need to copy the whole function and add two lines. this still prints errors due to the pad_toke_id = eos_token_id change. | |
# from: https://github.com/huggingface/evaluate/blob/v0.4.0/src/evaluate/evaluator/base.py#L375 | |
def prepare_pipeline( | |
self, | |
model_or_pipeline: Union[str, "Pipeline", Callable, "PreTrainedModel", "TFPreTrainedModel"], # noqa: F821 | |
tokenizer: Union["PreTrainedTokenizerBase", "FeatureExtractionMixin"] = None, # noqa: F821 | |
feature_extractor: Union["PreTrainedTokenizerBase", "FeatureExtractionMixin"] = None, # noqa: F821 | |
device: int = None, | |
): | |
""" | |
Prepare pipeline. | |
Args: | |
model_or_pipeline (`str` or `Pipeline` or `Callable` or `PreTrainedModel` or `TFPreTrainedModel`, | |
defaults to `None`): | |
If the argument in not specified, we initialize the default pipeline for the task. If the argument is of the type `str` or | |
is a model instance, we use it to initialize a new `Pipeline` with the given model. Otherwise we assume the | |
argument specifies a pre-initialized pipeline. | |
preprocessor (`PreTrainedTokenizerBase` or `FeatureExtractionMixin`, *optional*, defaults to `None`): | |
Argument can be used to overwrite a default preprocessor if `model_or_pipeline` represents a model for | |
which we build a pipeline. If `model_or_pipeline` is `None` or a pre-initialized pipeline, we ignore | |
this argument. | |
Returns: | |
The initialized pipeline, with modifications for the specific task of generating text, even with long inputs. | |
""" | |
if device is None: | |
device = self._infer_device() | |
if ( | |
isinstance(model_or_pipeline, str) | |
or isinstance(model_or_pipeline, transformers.PreTrainedModel) | |
or isinstance(model_or_pipeline, transformers.TFPreTrainedModel) | |
): | |
pipe = pipeline( | |
self.task, | |
model=model_or_pipeline, | |
tokenizer=tokenizer, | |
feature_extractor=feature_extractor, | |
device=device, | |
# my additions here: | |
handle_long_generation= "hole", #our solution? relevant: https://github.com/huggingface/transformers/issues/14033#issuecomment-948385227 | |
# pad_token_id=tokenizer.eos_token_id, #to avoid the warning, however there might be issues as tokenizers will call this differently. | |
do_sample=False, #important to get reproduceable results but we need to make sure the generator is deterministic | |
) | |
else: | |
if model_or_pipeline is None: | |
pipe = pipeline(self.task, device=device) | |
else: | |
pipe = model_or_pipeline | |
# if tokenizer is not None and feature_extractor is not None: | |
# logger.warning("Ignoring the value of the preprocessor argument (`tokenizer` or `feature_extractor`).") #excluded warning because I didn't import logger | |
if (pipe.task != self.task) and not (self.task == "translation" and pipe.task.startswith("translation")): | |
raise ValueError( | |
f"Incompatible `model_or_pipeline`. Please specify `model_or_pipeline` compatible with the `{self.task}` task." | |
) | |
return pipe | |
def _resolve_context_lenght(self, model_or_pipeline=None): #TODO should really copy the typing hints here. | |
# tokenizer needs to know the context length for our pipe strategy, but it has to be passed to the tokenizer, not model. | |
# the tokenizer should read from the model config, but that can be wrong, or it has a task overwrite (for "text-generation" for example you get 50) | |
#model_or_pipeline only exists via the .compute call, so we have to take it in | |
# model_or_pipeline.tokenier.config.max_new_tokens = 1024 # we shouldn't return it, but overwrite the tokenizer config, which the pipeline relies on. | |
return 1024 # we shouldn't return it, but overwrite the tokenizer config, which the pipeline relies on. | |
def _estimate_stopping(self, labels, **kwargs): | |
""" estimates max_new_tokens for the pipeline call | |
by counting the characters in the longest string of the references and multiplying by 2 (for good measure but probably not needed) | |
Args: | |
labels: A list of dicts by knowing the labels | |
Returns: | |
`int`: the estimated max_new_tokens, should be smaller than context_lenght in all cases | |
""" | |
context_lenght = self._resolve_context_lenght(**kwargs) | |
estimate = min(max([len(ref) for ref in labels])*2, context_lenght) | |
return estimate | |
# this one needs to be adjusted | |
def predictions_processor(self, predictions, *args, **kwargs): | |
""" | |
processes the output of the pipeline to be compatible with the metric. | |
generated texts cut off by the first semicolon and whitespaces are stripped (using python str builtins) | |
Args: | |
predictions: A list of lists of dicts | |
Returns: | |
`dict`: All the processed text are flattened and stored under the "predictions" key. | |
""" | |
return {"predictions": [pred[f"{self.predictions_prefix}_text"].split(";")[0].strip() for pred_list in predictions for pred in pred_list]} | |
# straight copy, doesn't seem to give me the | |
def prepare_data(self, data: Dataset, input_column: str, label_column: str, *args, **kwargs): | |
""" | |
Prepare data. | |
Args: | |
data (`Dataset`): Specifies the dataset we will run evaluation on. | |
input_column (`str`, defaults to `"text"`): | |
the name of the column containing the text feature in the dataset specified by `data`. | |
label_column (`str`, defaults to `"label"`): | |
the name of the column containing the labels in the dataset specified by `data`. | |
Returns: | |
`dict`: metric inputs. everything before the first semicolon and whitespaces are stripped (using python str builtins, just like the pred prep) | |
`list`: pipeline inputs. | |
""" | |
self.check_required_columns(data, {"input_column": input_column, "label_column": label_column}) #this will throw and exception with useful error messages | |
# don't put everything in the return statement, so you have the control... | |
references = [ref.split(";")[0].strip() for ref in data[label_column]] | |
self.PIPELINE_KWARGS.update({"max_new_tokens": self._estimate_stopping(references)}) #this is a hack, does it work tho? | |
return {"references": references}, data[input_column] #DatasetColumn(data, input_column) doesn't seem to work. data[input_column] does, but ignores any of the features of the helper class.. | |
# via: https://huggingface.co/docs/evaluate/evaluation_suite | |
# relevant source: https://github.com/huggingface/evaluate/blob/v0.4.0/src/evaluate/evaluation_suite/__init__.py | |
class Suite(evaluate.EvaluationSuite): | |
def __init__(self, name): | |
super().__init__(name) | |
self.preprocessor = lambda x: {"return_statement": x["return_statement"].split(";")[0]} #like this? refactored to RetrunGenerationEvaluator | |
self.suite = [ | |
# more subtasks are only possible once we can pass custom evaluators. -> https://github.com/huggingface/evaluate/pull/367 | |
SubTask( #this one is adjusted already | |
task_type="text-generation", #this call an evaluator, but can you specify your own custom evaluator instead? | |
data="Vipitis/Shadertoys-fine", | |
subset="return_completion", | |
split="test", # use this to select a subset of the data during testing, perhaps remove later? | |
args_for_task={ | |
# "metric": "exact_match", | |
"input_column": "body", | |
"label_column": "return_statement", | |
} | |
) | |
] | |
# from: https://github.com/huggingface/evaluate/blob/v0.4.0/src/evaluate/evaluation_suite/__init__.py#LL103C5-L129C27 | |
def run( | |
self, model_or_pipeline: Union[str, "Pipeline", Callable, "PreTrainedModel", "TFPreTrainedModel"] = "Vipitis/CodeGPT-small-java-adaptedGPT2-transfer-shadertoys", #not so useful default model? | |
snippet: int = "" # noqa: F821 | |
) -> Dict[str, float]: | |
self.assert_suite_nonempty() | |
results_all = [] | |
for task in self.suite: | |
task_name = task.data | |
if task.data_preprocessor: # task requires extra preprocessing is all done inside the Evaluator | |
ds = load_dataset(task.data, name=task.subset, split=(task.split + f"[:{snippet}]")) | |
task.data = ds.map(task.data_preprocessor) | |
task_evaluator = ReturnGenerationEvaluator() #this is the change we make: specify our custom evaluator from above. | |
args_for_task = task.args_for_task | |
args_for_task["model_or_pipeline"] = model_or_pipeline | |
args_for_task["data"] = task.data | |
args_for_task["subset"] = task.subset | |
args_for_task["split"] = (task.split + f"[:{snippet}]") #make a downselection of the split via keywordarg in the .run() call? | |
results = task_evaluator.compute(**args_for_task) | |
results["task_name"] = task_name + "/" + task.subset if task.subset else task_name | |
results["data_preprocessor"] = str(task.data_preprocessor) if task.data_preprocessor is not None else None | |
results_all.append(results) | |
return results_all |