# Spaces: Running  (Hugging Face Spaces status banner captured with the source;
# kept as a comment so the module parses)
import pickle | |
import random | |
import shutil | |
from collections import Counter | |
from pathlib import Path | |
import numpy | |
import zipfile | |
# Base URL of the local FHE inference server.
SERVER_URL = "http://localhost:8000/"
# Size limit for input shown/accepted in the browser UI -- unit not visible here; TODO confirm.
INPUT_BROWSER_LIMIT = 550
# Directory layout: raw data, deployment artifacts, and per-user working dirs.
DATA_DIR = Path("./data")
DEPLOYMENT_DIR = Path("./deployment")
ROOT_DIR = DEPLOYMENT_DIR / "users"
SHARED_BASE_MODULE_DIR = DEPLOYMENT_DIR / "base_modules"
SHARED_SMOOTHER_MODULE_DIR = DEPLOYMENT_DIR / "smoother_module"
# Sub-directory names for per-user evaluation keys and encrypted payloads.
KEY_SMOOTHER_MODULE_DIR = "EvaluationKey_Smoother"
KEY_BASE_MODULE_DIR = "EvaluationKey_Base_Modules"
ENCRYPTED_INPUT_DIR = "Encrypt_Input"
ENCRYPTED_OUTPUT_DIR = "Encrypt_Output"
# Log file recording server-side FHE computation timings.
FHE_COMPUTATION_TIMELINE = Path("server_fhe_computation_timeline.txt")
# Ancestry class labels. NOTE(review): display order differs from the id order
# in ID_POPULATION below -- use the mappings, not positional indexing, to convert.
LABELS = ["European", "African", "Americas", "East Asian", "South Asian"]
# Mutually inverse mappings between numeric class ids and population names.
ID_POPULATION = {0: "European", 3: "African", 2: "Americas", 1: "East Asian", 4: "South Asian"}
POPULATION_ID = {"European": 0, "African": 3, "Americas": 2, "East Asian": 1, "South Asian": 4}
# Plot/UI color palette, one color per population.
COLORS = ["#FFD208", "#FFE46C", "#FFED9C", "#FFF6CE", "#FFD9A0"]
# load_pickle("data/meta_dict.pkl")
# Dataset metadata snapshot (see the commented load above). Usage in
# process_data_for_base_modules shows NW = number of windows, CT = context
# size in SNPs, M = window size, C = total SNP count; the remaining keys
# (A, CTR, WSCM, SS) are presumably #ancestries, context ratio, and smoother
# parameters -- TODO confirm against data/meta_dict.pkl.
META = {"A": 5, "C": 1059079, "M": 10589, "NW": 100, "CT": 1059, "CTR": 0.1, "WSCM": 0.2, "SS": 75}
# Generation counts used when building simulated admixed individuals -- TODO confirm.
BUILD_GENS = [1, 2, 4, 6, 8, 12, 16, 24, 32, 48]
import os | |
def load_pickle_from_zip(file_name, zip_path="data.zip"):
    """Load a pickled object stored as *file_name* inside the zip archive *zip_path*.

    Args:
        file_name: Name of the member file within the archive.
        zip_path: Path to the zip archive (defaults to "data.zip").

    Returns:
        The unpickled object.

    Raises:
        FileNotFoundError: If the archive itself does not exist.
        KeyError: If *file_name* is not a member of the archive.
    """
    if not os.path.exists(zip_path):
        raise FileNotFoundError(f"The zip file '{zip_path}' does not exist.")
    with zipfile.ZipFile(zip_path, 'r') as z:
        if file_name not in z.namelist():
            # (dropped a leftover debug print of the full namelist here)
            raise KeyError(f"The file '{file_name}' does not exist in the zip archive '{zip_path}'.")
        # NOTE: pickle.load is unsafe on untrusted archives -- only use with trusted data.
        with z.open(file_name) as f:
            return pickle.load(f)
def generate_weighted_percentages():
    """Build a 5-way ancestry percentage split that sums to exactly 100.

    The first (dominant) share is drawn uniformly from 50-70; the remaining
    mass is split among four random shares rounded to two decimals, and any
    rounding residue is folded back into the dominant share.
    """
    dominant = random.randint(50, 70)
    leftover = 100 - dominant
    raw = [random.random() for _ in range(4)]
    total = sum(raw)
    shares = [dominant] + [round(r / total * leftover, 2) for r in raw]
    # Fold rounding error (if any) back into the dominant share so sum == 100.
    residue = round(100 - sum(shares), 2)
    if residue:
        shares[0] += residue
    return shares
def select_random_ancestors():
    """Return every population id from ID_POPULATION in random order."""
    pool = list(ID_POPULATION)
    random.shuffle(pool)
    return pool
def read_pickle(path):
    """Deserialize and return the pickled object stored at *path*."""
    with open(path, "rb") as handle:
        return pickle.load(handle)
def compute_distribution(y, size=5):
    """Return the empirical frequency of each label in *y* as a length-*size* vector.

    Labels are used as indices into the output, so every label must lie in
    [0, size); labels absent from *y* get frequency 0.
    """
    dist = numpy.zeros(size)
    total = len(y)
    for label, count in Counter(y).items():
        dist[label] = count / total
    return dist
def slide_window(data, smooth_win_size, y=None):
    """Expand each per-window row into a flattened sliding context of neighbors.

    Args:
        data: array of shape (N, W, A) -- N samples, W windows, A features
            per window (shape fixed by the unpacking below).
        smooth_win_size: width of the smoothing context, in windows.
        y: optional labels of shape (N, W) -- assumed; flattened to match
            the rows of X_slide.

    Returns:
        Tuple (X_slide, y_slide): X_slide has N*W rows, each holding the
        smooth_win_size * A values around one window; y_slide is the
        flattened labels, or None when y is None.
    """
    N, W, A = data.shape
    # Reflect-pad along the window axis so edge windows still get full context.
    pad = (smooth_win_size + 1) // 2
    data_padded = numpy.pad(data, ((0, 0), (pad, pad), (0, 0)), mode="reflect")
    # All contiguous (1, smooth_win_size, A) patches; keep the first W per sample,
    # then flatten each patch into a single feature row.
    X_slide = numpy.lib.stride_tricks.sliding_window_view(data_padded, (1, smooth_win_size, A))
    X_slide = X_slide[:, :W, :].reshape(N * W, -1)
    y_slide = None if y is None else y.reshape(N * W)
    return X_slide, y_slide
# def read_vcf(vcf_file): | |
# return allel.read_vcf(vcf_file, region=None, fields="*") | |
def clean_dir(directory):
    """Remove the specified directory (a pathlib.Path) and its contents, if it exists."""
    if not (directory.exists() and directory.is_dir()):
        return
    print(f"Removing existing model directory: {directory}")
    shutil.rmtree(directory)
def process_data_for_base_modules(meta, X_t):
    """Cut the full SNP matrix into overlapping per-window slices for the base modules.

    Args:
        meta: dict providing "NW" (number of windows), "CT" (context size in
            SNPs), "M" (window size in SNPs) and "C" (total SNP count) --
            see the META constant in this module.
        X_t: array whose second axis spans the meta["C"] SNPs -- assumed
            shape (n_samples, meta["C"]); TODO confirm against callers.

    Returns:
        Tuple (X_b, n_windows, M_, rem): X_b holds, per sample, one slice of
        width M_ = M + 2*CT per retained window offset; rem is the count of
        trailing SNPs beyond the n_windows * M covered positions.
    """
    n_windows = meta["NW"]  # meta["C"] // meta["M"]
    context = meta["CT"]  # int(meta["M"] * meta['CTR'])
    # NOTE(review): context is an int but compared against the float 0.0 --
    # works, though the literal hints at an earlier float-valued context.
    if context != 0.0:
        # Reflect-pad both ends so edge windows get a full SNP context.
        pad_left = numpy.flip(X_t[:, 0:context], axis=1)
        pad_right = numpy.flip(X_t[:, -context:], axis=1)
        X_t = numpy.concatenate([pad_left, X_t, pad_right], axis=1)
    M_ = meta["M"] + 2 * context
    # Window start offsets, stepping by M over the C SNP positions; the last
    # two offsets are deliberately dropped ([:-2]).
    idx = numpy.arange(0, meta["C"], meta["M"])[:-2]
    X_b = numpy.lib.stride_tricks.sliding_window_view(X_t, M_, axis=1)[:, idx, :]
    rem = meta["C"] - meta["M"] * n_windows
    # print(f"{X_t.shape=} -> {X_b.shape=} | {n_windows=}, {context=}, {M_=}, {rem=}")
    return X_b, n_windows, M_, rem
def extract_model_number(path):
    """Return the integer suffix after the last '_' in *path*, or None if absent."""
    tail = path.rpartition("_")[2]
    try:
        return int(tail)
    except (ValueError, IndexError):
        print(f"Error: Unable to extract model number from path: {path}")
        return None
def is_none(obj) -> bool:
    """
    Check if the object is None or an empty sized container.

    Args:
        obj (any): The input to be checked.
    Returns:
        bool: True if the object is None, or if it has __len__ and its
        length is zero; False otherwise.
    """
    # The original also tested `obj is not None` here, which is always true
    # once the left side of `or` has failed -- the redundant check is dropped.
    return obj is None or (hasattr(obj, "__len__") and len(obj) == 0)
def load_pickle(path: str):
    """Deserialize and return the object pickled at *path*.

    The original annotated the return as ``numpy.array`` and documented it
    as ``Dict`` -- both wrong in general: pickle can return any object, so
    the misleading annotation is removed.

    Args:
        path (str): Filesystem path of the pickle file.

    Returns:
        The unpickled object.
    """
    with open(path, "rb") as f:
        data = pickle.load(f)
    return data
def write_pickle(path: str, data) -> None:
    """Pickle *data* to *path*, overwriting any existing file.

    The original declared ``-> numpy.array`` but returns nothing; the
    annotation is corrected to ``None``.

    Args:
        path (str): Destination file path.
        data: Any picklable object.
    """
    with open(path, "wb") as f:
        pickle.dump(data, f)
def write_bytes(path, data):
    """Save binary *data* to *path* (a pathlib.Path), replacing any existing file."""
    path.write_bytes(data)
def read_bytes(path):
    """Return the full binary contents of *path* (a pathlib.Path)."""
    return path.read_bytes()