from itertools import combinations
import numpy as np
import pandas as pd

SUPPORTED_TYPES = [".csv", ".json", ".xlsx"]

def hello_world():
    return "hello world!"

def load_file(file):
    """
    Takes a file uploaded through Streamlit and loads it into a DataFrame.
    Returns a DataFrame, metadata, and a result string.
    @param file: File uploaded into Streamlit.
    @rtype: tuple
    @return: A tuple of format (pd.DataFrame, (str, str), str).
    """
    df = None
    if file is None:
        return df, ("", ""), ""
    filename = file.name
    extension = filename.split(".")[-1]
    metadata = (filename, extension)
    import_functions = {
        "csv": pd.read_csv,
        "json": pd.read_json,
        "xlsx": pd.read_excel
    }
    try:
        reader = import_functions.get(extension, None)
        if reader is None:
            return df, metadata, f"Error: Invalid extension '{extension}'"
        df = reader(file)
        rows, columns = df.shape
        return df, metadata, f"File '{filename}' loaded successfully.\nFound {rows} rows, {columns} columns."
    except Exception as error:
        return df, metadata, f"Error: Unable to read file '{filename}' ({type(error).__name__}: {error})"

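# Hedged usage sketch for load_file (assumes a Streamlit front end; the widget
# label and variable names below are illustrative, not part of this module):
#   import streamlit as st
#   uploaded = st.file_uploader("Upload data", type=[t.lstrip(".") for t in SUPPORTED_TYPES])
#   df, (filename, extension), message = load_file(uploaded)
#   st.text(message)
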
def data_cleaner(df, drop_missing=False, remove_duplicates=True):
    """
    Takes a DataFrame and removes empty and duplicate entries.
    @type df: pd.DataFrame
    @param df: A DataFrame of uncleaned data.
    @type drop_missing: bool
    @param drop_missing: If True, drop rows with any missing value (dropna how="any"); if False, drop only fully empty rows (how="all").
    @type remove_duplicates: bool
    @param remove_duplicates: Determines if duplicate rows are removed.
    @rtype: pd.DataFrame
    @return: A DataFrame with the requested cleaning applied.
    """
    df = df.dropna(how="any" if drop_missing else "all")
    if remove_duplicates:
        df = df.drop_duplicates()
    return df

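# Toy example of the defaults (behaviour follows dropna/drop_duplicates semantics):
#   raw = pd.DataFrame({"a": [1, 1, None], "b": [2, 2, None]})
#   data_cleaner(raw)  # the all-NaN row is dropped, then the duplicate row
#                      # collapses, leaving a single row (1, 2)
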
def unique_ratio(df, col):
    """Fraction of a column's non-null values that are distinct (1.0 means every value is unique)."""
    return df[col].nunique() / df[col].count()

def bin_numeric(df, name_col: str, num_bins: int):
    """Replace a numeric column with equal-frequency range labels like "low - high"."""
    values = df[name_col]
    sorted_values = values.sort_values()
    min_, max_ = sorted_values.min(), sorted_values.max()
    num_bins = min(num_bins, len(sorted_values))  # avoid empty chunks from array_split
    # Split the sorted values into num_bins roughly equal-sized chunks; the first
    # element of each chunk becomes a bin edge.
    chunks = np.array_split(sorted_values.values, num_bins)
    pivots = [min_] + [chunk[0] for chunk in chunks[1:]] + [max_]
    bins_list = [(pivots[i], pivots[i + 1]) for i in range(num_bins)]
    original = values.copy()  # keep the numeric values for the comparisons below
    df[name_col] = df[name_col].astype(object)  # the column will now hold string labels
    for i, (bin_min, bin_max) in enumerate(bins_list):
        last_bin = i == num_bins - 1
        for row in df.index:
            value = original.loc[row]
            # the final bin is closed on the right so the maximum value is binned too
            if bin_min <= value < bin_max or (last_bin and value == bin_max):
                df.loc[row, name_col] = f"{bin_min} - {bin_max}"
    return df

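# Illustrative run (toy numbers): with df = pd.DataFrame({"age": [21, 22, 35, 36, 50, 51]})
# and bin_numeric(df, "age", num_bins=3), the sorted values split into chunks
# [21, 22] / [35, 36] / [50, 51], giving bins (21, 35), (35, 50), (50, 51) and
# labels "21 - 35", "35 - 50", "50 - 51" in place of the exact ages.
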
def get_kanon_false(df, k=2):
    df = df.select_dtypes(include=np.number)
    k_anon_false = set()  # columns breaking k-anonymity, paired with their uniqueness ratio
    column_tuples = list(combinations(df.columns, k))  # k-wise combinations of all numeric columns
    for k_tuple in column_tuples:
        # value_counts gives how many rows share each combination of values; a
        # combination occurring fewer than k times identifies fewer than k rows
        k_tuple_counts = df.loc[:, list(k_tuple)].value_counts().tolist()
        if any(count < k for count in k_tuple_counts):
            for col in k_tuple:
                k_anon_false.add((col, unique_ratio(df, col)))
    return sorted(k_anon_false, key=lambda x: x[1], reverse=True)

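# Example of the flagging logic (hypothetical data):
#   df = pd.DataFrame({"age": [30, 30, 45], "zip": [100, 100, 200]})
#   get_kanon_false(df, k=2)
# The pair (45, 200) occurs once (< k), so both columns are flagged and
# returned with their uniqueness ratio (2/3 each), most unique first.
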
def k_anonymize(df, k=2):
    # Repeatedly bin the most identifying numeric columns until no k-wise
    # combination of numeric columns identifies fewer than k rows. Binned
    # columns become strings, so they drop out of the numeric check and the
    # loop eventually terminates.
    k_anon_false = get_kanon_false(df, k)
    while k_anon_false:
        for col, _ in k_anon_false:
            print(f"Binning {col}")
            df = bin_numeric(df, col, num_bins=15)
        k_anon_false = get_kanon_false(df, k)
        print(f"Updated sensitivity: {k_anon_false}")
    return df

def data_anonymizer(df, k=2):
    return k_anonymize(df, k)
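
# Hedged smoke test (synthetic data; not part of the app's Streamlit flow):
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    demo = pd.DataFrame({
        "age": rng.integers(18, 80, size=100),
        "income": rng.integers(20_000, 120_000, size=100),
    })
    print(data_anonymizer(demo, k=2).head())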