File size: 3,824 Bytes
a51662f
 
7ad6c98
 
 
 
9ac3994
 
7ad6c98
 
 
 
 
 
 
 
 
3b7db7f
7ad6c98
9ac3994
7ad6c98
 
 
 
 
9ac3994
 
 
 
 
7ad6c98
9ac3994
 
 
 
7ad6c98
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6c3e9dd
 
a51662f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
from itertools import combinations
import numpy as np
import pandas as pd

# Dot-prefixed extensions of the file formats load_file has a reader for
# (CSV, JSON, Excel).
SUPPORTED_TYPES = [
    ".csv",
    ".json",
    ".xlsx",
]

def hello_world():
    """
    Returns a fixed greeting.

    @rtype: str
    @return: The literal string "hello world!".
    """
    greeting = "hello world!"
    return greeting

def load_file(file):
    """
    Takes a file given by Streamlit and loads it into a DataFrame.
    Returns a DataFrame, metadata, and result string.

    The reader is chosen from the text after the last "." in the file name.
    The match is case-insensitive, so "data.CSV" is read the same way as
    "data.csv". The extension is reported in the metadata exactly as it
    appears in the file name.

    No exception is raised for an unreadable file: the failure is described
    in the result string and the DataFrame slot is None.

    @param file: File uploaded into streamlit (any object with a `name`
        attribute that the pandas readers accept), or None.
    @rtype: tuple
    @return: A tuple of format (pd.DataFrame, (str, str), str), i.e.
        (data or None, (filename, extension), status message).
        When `file` is None the result is (None, ("", ""), "").
    """
    if file is None:
        return None, ("", ""), ""

    filename = file.name
    extension = filename.split(".")[-1]
    metadata = (filename, extension)

    import_functions = {
        "csv": pd.read_csv,
        "json": pd.read_json,
        "xlsx": pd.read_excel,
    }
    # Look the reader up case-insensitively; upper-case extensions such as
    # ".CSV" are otherwise rejected even though pandas can read them.
    reader = import_functions.get(extension.lower())
    if reader is None:
        return None, metadata, f"Error: Invalid extension '{extension}'"

    # Any parsing failure is reported through the result string rather than
    # propagated, so the broad catch here is deliberate.
    try:
        df = reader(file)
    except Exception as error:
        return None, metadata, f"Error: Unable to read file '{filename}' ({type(error)}: {error})"

    rows, columns = df.shape
    return df, metadata, f"File '{filename}' loaded successfully.\nFound {rows} rows, {columns} columns."

def data_cleaner(df, drop_missing=False, remove_duplicates=True):
    """
    Drops incomplete rows and, optionally, duplicate rows from a DataFrame.

    @type df: pd.DataFrame
    @param df: The data to clean. It is not modified; a new frame is returned.
    @type drop_missing: bool
    @param drop_missing: When True, every row holding at least one missing
        value is dropped. When False, only rows whose values are all missing
        are dropped.
    @type remove_duplicates: bool
    @param remove_duplicates: When True, repeated rows are removed after the
        missing-value pass.
    @rtype: pd.DataFrame
    @return: The cleaned DataFrame.
    """
    if drop_missing:
        cleaned = df.dropna(how="any")
    else:
        cleaned = df.dropna(how="all")

    if not remove_duplicates:
        return cleaned
    return cleaned.drop_duplicates()

def unique_ratio(df, col):
    """
    Computes the share of distinct values among the non-missing entries of a column.

    @type df: pd.DataFrame
    @param df: The frame holding the column.
    @param col: Label of the column to measure.
    @return: Number of distinct values divided by the number of non-missing
        values in the column.
    """
    series = df[col]
    distinct = series.nunique()
    filled = series.count()
    return distinct / filled

def bin_numeric(df, name_col: str, num_bins: int):
    """
    Replaces the values of a numeric column with equal-frequency range labels.

    The non-missing values are sorted and split into `num_bins` chunks of
    (nearly) equal size. The bin edges are the column minimum, the first
    value of every later chunk, and the column maximum. Each value is
    replaced by the label "<low> - <high>" of the bin containing it. Bins are
    closed on the left and open on the right, except the last one, which
    also includes the maximum so that no value is left unbinned.

    Missing values are left as they are. A column that is not numeric is
    left untouched. When there are fewer distinct edges than requested bins
    (few rows, or many repeated values), fewer bins are produced.

    The column is overwritten in `df` itself, and that same object is returned.

    @type df: pd.DataFrame
    @param df: The frame holding the column; modified in place.
    @type name_col: str
    @param name_col: Label of the column to bin.
    @type num_bins: int
    @param num_bins: Requested number of bins; must be at least 1.
    @rtype: pd.DataFrame
    @return: `df`, with `name_col` holding range labels instead of numbers.
    @raise KeyError: If `name_col` is not a column of `df`.
    @raise ValueError: If `num_bins` is not positive (raised by np.array_split).
    """
    column = df[name_col]

    # Only numeric columns can be binned; this also makes a second call on an
    # already-binned (now textual) column a harmless no-op.
    if name_col not in df.select_dtypes(include=np.number).columns:
        return df

    present = column.notna().to_numpy()
    observed = column.to_numpy()[present]
    if observed.size == 0:
        return df

    ordered = np.sort(observed)
    # array_split yields empty chunks when there are fewer values than bins;
    # those carry no edge and are skipped.
    chunks = [chunk for chunk in np.array_split(ordered, num_bins) if chunk.size]
    # np.unique collapses repeated edges, which would only describe empty bins.
    edges = np.unique([ordered[0]] + [chunk[0] for chunk in chunks[1:]] + [ordered[-1]])
    if edges.size == 1:
        # Every value is identical: use the single degenerate bin "v - v".
        edges = np.repeat(edges, 2)

    labels = np.array(
        [f"{low} - {high}" for low, high in zip(edges[:-1], edges[1:])],
        dtype=object,
    )
    # side="right" gives left-closed bins; clipping folds the maximum (which
    # lands one past the last bin) into the last bin.
    positions = np.searchsorted(edges, observed, side="right") - 1
    positions = np.clip(positions, 0, labels.size - 1)

    # Build the new column separately and assign it whole, rather than writing
    # strings cell by cell into a numeric column.
    binned = column.to_numpy(dtype=object, copy=True)
    binned[present] = labels[positions]
    df[name_col] = binned
    return df

def get_kanon_false(df, k=2):
    """
    Finds the numeric columns that take part in a combination violating k-anonymity.

    Only numeric columns are examined. Every combination of `k` of them is
    grouped by value; if any group of identical value tuples has fewer than
    `k` rows, all columns of that combination are reported. Note that `k` is
    used both as the number of columns per combination and as the minimum
    group size.

    @type df: pd.DataFrame
    @param df: The data to inspect. It is not modified.
    @type k: int
    @param k: Combination size and minimum acceptable group size.
    @rtype: list
    @return: A list of (column, unique_ratio) tuples, one per offending
        column, sorted by unique ratio from highest to lowest. Empty when no
        combination violates the threshold.
    """
    numeric = df.select_dtypes(include=np.number)
    # column -> unique ratio; a dict keeps one entry per column and avoids
    # recomputing the ratio for columns seen in several combinations.
    ratios = {}

    for columns in combinations(numeric.columns, k):
        group_sizes = numeric[list(columns)].value_counts()

        # A group smaller than k means those rows are not hidden among at
        # least k identical ones. For k=2 this is the "appears exactly once" test.
        if (group_sizes < k).any():
            for column in columns:
                if column not in ratios:
                    ratios[column] = unique_ratio(numeric, column)

    return sorted(ratios.items(), key=lambda item: item[1], reverse=True)

def k_anonymize(df, k=2, num_bins=15):
    """
    Bins numeric columns until get_kanon_false reports no remaining violations.

    On each pass the reported column with the highest unique ratio that has
    not been binned yet is handed to bin_numeric, and the violations are then
    recomputed with the same `k`. Re-evaluating after every single column
    avoids binning columns that stopped being a problem once an earlier one
    was generalized. The loop also stops when every reported column has
    already been binned once, so it cannot run forever if binning fails to
    clear a violation.

    Progress is printed to standard output.

    @type df: pd.DataFrame
    @param df: The data to anonymize. The frame returned by bin_numeric is
        carried forward; since bin_numeric writes into the frame it receives,
        the caller's frame may be modified.
    @type k: int
    @param k: Threshold passed to get_kanon_false.
    @type num_bins: int
    @param num_bins: Number of bins requested from bin_numeric for each column.
    @rtype: pd.DataFrame
    @return: The frame with the offending numeric columns replaced by range labels.
    """
    already_binned = set()
    k_anon_false = get_kanon_false(df, k)

    while k_anon_false:
        pending = [col for col, _ in k_anon_false if col not in already_binned]
        if not pending:
            # Everything still reported was binned before; another pass
            # would not change anything.
            break

        col = pending[0]
        print(f"Binning {col}")
        df = bin_numeric(df, col, num_bins=num_bins)
        already_binned.add(col)

        k_anon_false = get_kanon_false(df, k)
        print(f"Updated sensitivity: {k_anon_false}")

    return df

def data_anonymizer(df, k=2):
    """
    Anonymizes a DataFrame by delegating to k_anonymize.

    @type df: pd.DataFrame
    @param df: The data to anonymize.
    @type k: int
    @param k: Value forwarded to k_anonymize.
    @return: Whatever k_anonymize returns for these arguments.
    """
    anonymized = k_anonymize(df, k)
    return anonymized