import csv
import json
import lzma
import os
import tarfile
import textwrap

import datasets
import pyarrow as pa
import pyarrow.parquet as pq
import pytest
from datasets import config
from datasets.arrow_dataset import Dataset
from datasets.features import ClassLabel, Features, Sequence, Value

# Autouse fixtures: every test runs against a temporary Hugging Face cache and
# never updates the public download counts.
@pytest.fixture(autouse=True)
def set_test_cache_config(tmp_path_factory, monkeypatch):
    # test_hf_cache_home = tmp_path_factory.mktemp("cache")  # TODO: why doesn't a cache dir per test function work?
    test_hf_cache_home = tmp_path_factory.getbasetemp() / "cache"
    test_hf_evaluate_cache = test_hf_cache_home / "datasets"
    test_hf_metrics_cache = test_hf_cache_home / "metrics"
    test_hf_modules_cache = test_hf_cache_home / "modules"
    monkeypatch.setattr("evaluate.config.HF_EVALUATE_CACHE", str(test_hf_evaluate_cache))
    monkeypatch.setattr("evaluate.config.HF_METRICS_CACHE", str(test_hf_metrics_cache))
    monkeypatch.setattr("evaluate.config.HF_MODULES_CACHE", str(test_hf_modules_cache))
    test_DOWNLOADED_EVALUATE_PATH = test_hf_evaluate_cache / "downloads"
    monkeypatch.setattr("evaluate.config.DOWNLOADED_EVALUATE_PATH", str(test_DOWNLOADED_EVALUATE_PATH))
    test_EXTRACTED_EVALUATE_PATH = test_hf_evaluate_cache / "downloads" / "extracted"
    monkeypatch.setattr("evaluate.config.EXTRACTED_EVALUATE_PATH", str(test_EXTRACTED_EVALUATE_PATH))


@pytest.fixture(autouse=True)
def disable_tqdm_output():
    # Keep test output readable by disabling the datasets progress bars.
    datasets.disable_progress_bar()


@pytest.fixture(autouse=True)
def set_update_download_counts_to_false(monkeypatch):
    # don't take tests into account when counting downloads
    monkeypatch.setattr("evaluate.config.HF_UPDATE_DOWNLOAD_COUNTS", False)
    monkeypatch.setattr("datasets.config.HF_UPDATE_DOWNLOAD_COUNTS", False)
FILE_CONTENT = """\ | |
Text data. | |
Second line of data.""" | |
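# In-memory Dataset with sequence, class-label, and nested answer features,
# plus an Arrow file materialized from it.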
@pytest.fixture(scope="session")
def dataset():
    n = 10
    features = Features(
        {
            "tokens": Sequence(Value("string")),
            "labels": Sequence(ClassLabel(names=["negative", "positive"])),
            "answers": Sequence(
                {
                    "text": Value("string"),
                    "answer_start": Value("int32"),
                }
            ),
            "id": Value("int64"),
        }
    )
    dataset = Dataset.from_dict(
        {
            "tokens": [["foo"] * 5] * n,
            "labels": [[1] * 5] * n,
            "answers": [{"answer_start": [97], "text": ["1976"]}] * 10,
            "id": list(range(n)),
        },
        features=features,
    )
    return dataset


@pytest.fixture(scope="session")
def arrow_file(tmp_path_factory, dataset):
    # Write the in-memory dataset to an Arrow cache file on disk.
    filename = str(tmp_path_factory.mktemp("data") / "file.arrow")
    dataset.map(cache_file_name=filename)
    return filename
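# Plain-text sample file plus compressed variants (xz, gzip, bz2, zstd, lz4) of the same content.
# File fixtures are session-scoped so each sample file is only written once per test run.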
@pytest.fixture(scope="session")
def text_file(tmp_path_factory):
    filename = tmp_path_factory.mktemp("data") / "file.txt"
    data = FILE_CONTENT
    with open(filename, "w") as f:
        f.write(data)
    return filename


@pytest.fixture(scope="session")
def xz_file(tmp_path_factory):
    filename = tmp_path_factory.mktemp("data") / "file.txt.xz"
    data = bytes(FILE_CONTENT, "utf-8")
    with lzma.open(filename, "wb") as f:
        f.write(data)
    return filename


@pytest.fixture(scope="session")
def gz_file(tmp_path_factory):
    import gzip

    path = str(tmp_path_factory.mktemp("data") / "file.txt.gz")
    data = bytes(FILE_CONTENT, "utf-8")
    with gzip.open(path, "wb") as f:
        f.write(data)
    return path


@pytest.fixture(scope="session")
def bz2_file(tmp_path_factory):
    import bz2

    path = tmp_path_factory.mktemp("data") / "file.txt.bz2"
    data = bytes(FILE_CONTENT, "utf-8")
    with bz2.open(path, "wb") as f:
        f.write(data)
    return path


@pytest.fixture(scope="session")
def zstd_file(tmp_path_factory):
    # Only created when the optional zstandard dependency is installed.
    if config.ZSTANDARD_AVAILABLE:
        import zstandard as zstd

        path = tmp_path_factory.mktemp("data") / "file.txt.zst"
        data = bytes(FILE_CONTENT, "utf-8")
        with zstd.open(path, "wb") as f:
            f.write(data)
        return path


@pytest.fixture(scope="session")
def lz4_file(tmp_path_factory):
    # Only created when the optional lz4 dependency is installed.
    if config.LZ4_AVAILABLE:
        import lz4.frame

        path = tmp_path_factory.mktemp("data") / "file.txt.lz4"
        data = bytes(FILE_CONTENT, "utf-8")
        with lz4.frame.open(path, "wb") as f:
            f.write(data)
        return path
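# TMX-style XML sample with Catalan/English translation units.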
@pytest.fixture(scope="session")
def xml_file(tmp_path_factory):
    filename = tmp_path_factory.mktemp("data") / "file.xml"
    data = textwrap.dedent(
        """\
    <?xml version="1.0" encoding="UTF-8" ?>
    <tmx version="1.4">
      <header segtype="sentence" srclang="ca" />
      <body>
        <tu>
          <tuv xml:lang="ca"><seg>Contingut 1</seg></tuv>
          <tuv xml:lang="en"><seg>Content 1</seg></tuv>
        </tu>
        <tu>
          <tuv xml:lang="ca"><seg>Contingut 2</seg></tuv>
          <tuv xml:lang="en"><seg>Content 2</seg></tuv>
        </tu>
        <tu>
          <tuv xml:lang="ca"><seg>Contingut 3</seg></tuv>
          <tuv xml:lang="en"><seg>Content 3</seg></tuv>
        </tu>
        <tu>
          <tuv xml:lang="ca"><seg>Contingut 4</seg></tuv>
          <tuv xml:lang="en"><seg>Content 4</seg></tuv>
        </tu>
        <tu>
          <tuv xml:lang="ca"><seg>Contingut 5</seg></tuv>
          <tuv xml:lang="en"><seg>Content 5</seg></tuv>
        </tu>
      </body>
    </tmx>"""
    )
    with open(filename, "w") as f:
        f.write(data)
    return filename
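# Small tabular payloads shared by the CSV, JSON, Parquet, and archive fixtures below.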
DATA = [
    {"col_1": "0", "col_2": 0, "col_3": 0.0},
    {"col_1": "1", "col_2": 1, "col_3": 1.0},
    {"col_1": "2", "col_2": 2, "col_3": 2.0},
    {"col_1": "3", "col_2": 3, "col_3": 3.0},
]
DATA2 = [
    {"col_1": "4", "col_2": 4, "col_3": 4.0},
    {"col_1": "5", "col_2": 5, "col_3": 5.0},
]
DATA_DICT_OF_LISTS = {
    "col_1": ["0", "1", "2", "3"],
    "col_2": [0, 1, 2, 3],
    "col_3": [0.0, 1.0, 2.0, 3.0],
}
DATA_312 = [
    {"col_3": 0.0, "col_1": "0", "col_2": 0},
    {"col_3": 1.0, "col_1": "1", "col_2": 1},
]
DATA_STR = [
    {"col_1": "s0", "col_2": 0, "col_3": 0.0},
    {"col_1": "s1", "col_2": 1, "col_3": 1.0},
    {"col_1": "s2", "col_2": 2, "col_3": 2.0},
    {"col_1": "s3", "col_2": 3, "col_3": 3.0},
]
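# Fixtures exposing DATA_DICT_OF_LISTS directly, as an Arrow cache file,
# and as CSV files (plain, bz2-compressed, zipped flat and under a directory).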
@pytest.fixture(scope="session")
def dataset_dict():
    return DATA_DICT_OF_LISTS


@pytest.fixture(scope="session")
def arrow_path(tmp_path_factory):
    dataset = Dataset.from_dict(DATA_DICT_OF_LISTS)
    path = str(tmp_path_factory.mktemp("data") / "dataset.arrow")
    dataset.map(cache_file_name=path)
    return path


@pytest.fixture(scope="session")
def csv_path(tmp_path_factory):
    path = str(tmp_path_factory.mktemp("data") / "dataset.csv")
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["col_1", "col_2", "col_3"])
        writer.writeheader()
        for item in DATA:
            writer.writerow(item)
    return path


@pytest.fixture(scope="session")
def csv2_path(tmp_path_factory):
    path = str(tmp_path_factory.mktemp("data") / "dataset2.csv")
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["col_1", "col_2", "col_3"])
        writer.writeheader()
        for item in DATA:
            writer.writerow(item)
    return path


@pytest.fixture(scope="session")
def bz2_csv_path(csv_path, tmp_path_factory):
    import bz2

    path = tmp_path_factory.mktemp("data") / "dataset.csv.bz2"
    with open(csv_path, "rb") as f:
        data = f.read()
    # data = bytes(FILE_CONTENT, "utf-8")
    with bz2.open(path, "wb") as f:
        f.write(data)
    return path


@pytest.fixture(scope="session")
def zip_csv_path(csv_path, csv2_path, tmp_path_factory):
    import zipfile

    path = tmp_path_factory.mktemp("data") / "dataset.csv.zip"
    with zipfile.ZipFile(path, "w") as f:
        f.write(csv_path, arcname=os.path.basename(csv_path))
        f.write(csv2_path, arcname=os.path.basename(csv2_path))
    return path


@pytest.fixture(scope="session")
def zip_csv_with_dir_path(csv_path, csv2_path, tmp_path_factory):
    import zipfile

    path = tmp_path_factory.mktemp("data") / "dataset_with_dir.csv.zip"
    with zipfile.ZipFile(path, "w") as f:
        f.write(csv_path, arcname=os.path.join("main_dir", os.path.basename(csv_path)))
        f.write(csv2_path, arcname=os.path.join("main_dir", os.path.basename(csv2_path)))
    return path
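# Parquet file written with an explicit Arrow schema matching the DATA columns.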
@pytest.fixture(scope="session")
def parquet_path(tmp_path_factory):
    path = str(tmp_path_factory.mktemp("data") / "dataset.parquet")
    schema = pa.schema(
        {
            "col_1": pa.string(),
            "col_2": pa.int64(),
            "col_3": pa.float64(),
        }
    )
    with open(path, "wb") as f:
        writer = pq.ParquetWriter(f, schema=schema)
        pa_table = pa.Table.from_pydict({k: [DATA[i][k] for i in range(len(DATA))] for k in DATA[0]}, schema=schema)
        writer.write_table(pa_table)
        writer.close()
    return path
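# JSON and JSON Lines files in several layouts: list of dicts, dict of lists,
# one record per line, reordered columns (DATA_312), and string-valued col_1 (DATA_STR).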
@pytest.fixture(scope="session")
def json_list_of_dicts_path(tmp_path_factory):
    path = str(tmp_path_factory.mktemp("data") / "dataset.json")
    data = {"data": DATA}
    with open(path, "w") as f:
        json.dump(data, f)
    return path


@pytest.fixture(scope="session")
def json_dict_of_lists_path(tmp_path_factory):
    path = str(tmp_path_factory.mktemp("data") / "dataset.json")
    data = {"data": DATA_DICT_OF_LISTS}
    with open(path, "w") as f:
        json.dump(data, f)
    return path


@pytest.fixture(scope="session")
def jsonl_path(tmp_path_factory):
    path = str(tmp_path_factory.mktemp("data") / "dataset.jsonl")
    with open(path, "w") as f:
        for item in DATA:
            f.write(json.dumps(item) + "\n")
    return path


@pytest.fixture(scope="session")
def jsonl2_path(tmp_path_factory):
    path = str(tmp_path_factory.mktemp("data") / "dataset2.jsonl")
    with open(path, "w") as f:
        for item in DATA:
            f.write(json.dumps(item) + "\n")
    return path


@pytest.fixture(scope="session")
def jsonl_312_path(tmp_path_factory):
    path = str(tmp_path_factory.mktemp("data") / "dataset_312.jsonl")
    with open(path, "w") as f:
        for item in DATA_312:
            f.write(json.dumps(item) + "\n")
    return path


@pytest.fixture(scope="session")
def jsonl_str_path(tmp_path_factory):
    path = str(tmp_path_factory.mktemp("data") / "dataset-str.jsonl")
    with open(path, "w") as f:
        for item in DATA_STR:
            f.write(json.dumps(item) + "\n")
    return path
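# Compressed and archived variants of the text and JSON Lines files (gzip, zip, tar, nested tar).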
@pytest.fixture(scope="session")
def text_gz_path(tmp_path_factory, text_path):
    import gzip

    path = str(tmp_path_factory.mktemp("data") / "dataset.txt.gz")
    with open(text_path, "rb") as orig_file:
        with gzip.open(path, "wb") as zipped_file:
            zipped_file.writelines(orig_file)
    return path


@pytest.fixture(scope="session")
def jsonl_gz_path(tmp_path_factory, jsonl_path):
    import gzip

    path = str(tmp_path_factory.mktemp("data") / "dataset.jsonl.gz")
    with open(jsonl_path, "rb") as orig_file:
        with gzip.open(path, "wb") as zipped_file:
            zipped_file.writelines(orig_file)
    return path


@pytest.fixture(scope="session")
def zip_jsonl_path(jsonl_path, jsonl2_path, tmp_path_factory):
    import zipfile

    path = tmp_path_factory.mktemp("data") / "dataset.jsonl.zip"
    with zipfile.ZipFile(path, "w") as f:
        f.write(jsonl_path, arcname=os.path.basename(jsonl_path))
        f.write(jsonl2_path, arcname=os.path.basename(jsonl2_path))
    return path


@pytest.fixture(scope="session")
def zip_jsonl_with_dir_path(jsonl_path, jsonl2_path, tmp_path_factory):
    import zipfile

    path = tmp_path_factory.mktemp("data") / "dataset_with_dir.jsonl.zip"
    with zipfile.ZipFile(path, "w") as f:
        f.write(jsonl_path, arcname=os.path.join("main_dir", os.path.basename(jsonl_path)))
        f.write(jsonl2_path, arcname=os.path.join("main_dir", os.path.basename(jsonl2_path)))
    return path


@pytest.fixture(scope="session")
def tar_jsonl_path(jsonl_path, jsonl2_path, tmp_path_factory):
    path = tmp_path_factory.mktemp("data") / "dataset.jsonl.tar"
    with tarfile.TarFile(path, "w") as f:
        f.add(jsonl_path, arcname=os.path.basename(jsonl_path))
        f.add(jsonl2_path, arcname=os.path.basename(jsonl2_path))
    return path


@pytest.fixture(scope="session")
def tar_nested_jsonl_path(tar_jsonl_path, jsonl_path, jsonl2_path, tmp_path_factory):
    path = tmp_path_factory.mktemp("data") / "dataset_nested.jsonl.tar"
    with tarfile.TarFile(path, "w") as f:
        f.add(tar_jsonl_path, arcname=os.path.join("nested", os.path.basename(tar_jsonl_path)))
    return path
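# Plain-text dataset files, zipped variants, and a file containing a Unicode paragraph-separator line break.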
@pytest.fixture(scope="session")
def text_path(tmp_path_factory):
    data = ["0", "1", "2", "3"]
    path = str(tmp_path_factory.mktemp("data") / "dataset.txt")
    with open(path, "w") as f:
        for item in data:
            f.write(item + "\n")
    return path


@pytest.fixture(scope="session")
def text2_path(tmp_path_factory):
    data = ["0", "1", "2", "3"]
    path = str(tmp_path_factory.mktemp("data") / "dataset2.txt")
    with open(path, "w") as f:
        for item in data:
            f.write(item + "\n")
    return path


@pytest.fixture(scope="session")
def zip_text_path(text_path, text2_path, tmp_path_factory):
    import zipfile

    path = tmp_path_factory.mktemp("data") / "dataset.text.zip"
    with zipfile.ZipFile(path, "w") as f:
        f.write(text_path, arcname=os.path.basename(text_path))
        f.write(text2_path, arcname=os.path.basename(text2_path))
    return path


@pytest.fixture(scope="session")
def zip_text_with_dir_path(text_path, text2_path, tmp_path_factory):
    import zipfile

    path = tmp_path_factory.mktemp("data") / "dataset_with_dir.text.zip"
    with zipfile.ZipFile(path, "w") as f:
        f.write(text_path, arcname=os.path.join("main_dir", os.path.basename(text_path)))
        f.write(text2_path, arcname=os.path.join("main_dir", os.path.basename(text2_path)))
    return path


@pytest.fixture(scope="session")
def text_path_with_unicode_new_lines(tmp_path_factory):
    text = "\n".join(["First", "Second\u2029with Unicode new line", "Third"])
    path = str(tmp_path_factory.mktemp("data") / "dataset_with_unicode_new_lines.txt")
    with open(path, "w", encoding="utf-8") as f:
        f.write(text)
    return path