# coding=utf-8
# Copyright 2019 The Google Research Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Lint as: python3
"""Generic helper functions used across codebase.""" | |
import math
import os
import pathlib
import warnings
from collections import namedtuple
from datetime import datetime
from typing import List, Tuple

import numpy as np
import pandas as pd
import torch
from sklearn import preprocessing

import data_formatter
from data_formatter import types

# Silence pandas chained-assignment warnings raised by the group-wise edits below.
pd.options.mode.chained_assignment = None

DataTypes = types.DataTypes
InputTypes = types.InputTypes

MINUTE = 60


# OS related functions.
def create_folder_if_not_exist(directory):
    """Creates folder if it doesn't exist.

    Args:
        directory: Folder path to create.
    """
    # Also creates intermediate directories recursively.
    pathlib.Path(directory).mkdir(parents=True, exist_ok=True)


def csv_path_to_folder(path: str):
    """Returns the folder containing a CSV file, with a trailing slash."""
    return "/".join(path.split('/')[:-1]) + "/"


def interpolate(data: pd.DataFrame,
                column_definition: List[Tuple[str, DataTypes, InputTypes]],
                gap_threshold: int = 0,
                min_drop_length: int = 0,
                interval_length: int = 0):
    """Interpolates missing values in data.

    Args:
        data: DataFrame to interpolate on, sorted by id and then time (a DateTime object).
        column_definition: List of tuples describing columns (column_name, data_type, input_type).
        gap_threshold: Number in minutes, maximum allowed gap for interpolation.
        min_drop_length: Number of points, minimum number within an interval to interpolate.
        interval_length: Number in minutes, length of the interpolation interval.

    Returns:
        data: DataFrame with missing values interpolated and an
            additional column ('id_segment') indicating continuous segments.
        column_definition: Updated list of tuples (column_name, data_type, input_type).
    """
    # Select all real-valued columns that are not id, time, or static.
    interpolation_columns = [
        column_name for column_name, data_type, input_type in column_definition
        if data_type == DataTypes.REAL_VALUED and
        input_type not in {InputTypes.ID, InputTypes.TIME, InputTypes.STATIC_INPUT}
    ]
    # Select all other columns except time.
    constant_columns = [
        column_name for column_name, data_type, input_type in column_definition
        if input_type not in {InputTypes.TIME}
    ]
    constant_columns += ['id_segment']
    # Get the id and time column names.
    id_col = [column_name for column_name, _, input_type in column_definition
              if input_type == InputTypes.ID][0]
    time_col = [column_name for column_name, _, input_type in column_definition
                if input_type == InputTypes.TIME][0]
    # Round times to the nearest minute.
    data[time_col] = data[time_col].dt.round('1min')
    # Count dropped segments.
    dropped_segments = 0
    # Count the number of values that are interpolated.
    interpolation_count = 0
    # Store the final output.
    output = []
    for id, id_data in data.groupby(id_col):
        # Sort each subject's rows by time.
        id_data.sort_values(time_col, inplace=True)
        # Get the time difference (in minutes) between consecutive rows.
        lag = (id_data[time_col].diff().dt.total_seconds().fillna(0) / 60.0).astype(int)
        # Start a new segment wherever the gap exceeds gap_threshold.
        id_segment = (lag > gap_threshold).cumsum()
        id_data['id_segment'] = id_segment
        for segment, segment_data in id_data.groupby('id_segment'):
            # If the segment is too short, we don't interpolate it.
            if len(segment_data) < min_drop_length:
                dropped_segments += 1
                continue
            # Find and report duplicated times.
            duplicates = segment_data.duplicated(time_col, keep=False)
            if duplicates.any():
                print(segment_data[duplicates])
                raise ValueError('Duplicate times in segment {} of id {}'.format(segment, id))
            # Reindex at interval_length-minute intervals.
            segment_data = segment_data.set_index(time_col)
            index_new = pd.date_range(start=segment_data.index[0],
                                      end=segment_data.index[-1],
                                      freq='{}min'.format(interval_length))
            index_union = index_new.union(segment_data.index)
            segment_data = segment_data.reindex(index_union)
            # Count missing values, using the first interpolation column as a proxy.
            interpolation_count += segment_data[interpolation_columns[0]].isna().sum()
            # Interpolate linearly with respect to the time index.
            segment_data[interpolation_columns] = segment_data[interpolation_columns].interpolate(method='index')
            # Forward-fill constant columns with the last observed value.
            segment_data[constant_columns] = segment_data[constant_columns].ffill()
            # Drop rows not conforming to the new frequency.
            segment_data = segment_data.reindex(index_new)
            # Reset the index, making the time a column named time_col.
            segment_data = segment_data.reset_index().rename(columns={'index': time_col})
            # Set the id_segment to its position in the output.
            segment_data['id_segment'] = len(output)
            # Add the segment to the output.
            output.append(segment_data)
    # Report the number of dropped and extracted segments.
    print('\tDropped segments: {}'.format(dropped_segments))
    print('\tExtracted segments: {}'.format(len(output)))
    # Concatenate all segments and reset the index.
    output = pd.concat(output)
    output.reset_index(drop=True, inplace=True)
    # Report the number of interpolated values.
    print('\tInterpolated values: {}'.format(interpolation_count))
    print('\tPercent of values interpolated: {:.2f}%'.format(interpolation_count / len(output) * 100))
    # Register the id_segment column in column_definition as a segment id (SID).
    column_definition += [('id_segment', DataTypes.CATEGORICAL, InputTypes.SID)]
    return output, column_definition
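

# A minimal usage sketch of `interpolate`, wrapped in a helper so importing this
# module stays side-effect free. Column names and values here are hypothetical;
# only DataTypes / InputTypes members already used in this module are assumed.
def _example_interpolate():
    toy = pd.DataFrame({
        'id': [0, 0, 0, 0],
        'time': pd.to_datetime(['2020-01-01 00:00', '2020-01-01 00:05',
                                '2020-01-01 00:15', '2020-01-01 00:20']),
        'gl': [100.0, 110.0, 130.0, 140.0],
    })
    cols = [('id', DataTypes.CATEGORICAL, InputTypes.ID),
            ('time', DataTypes.DATE, InputTypes.TIME),
            ('gl', DataTypes.REAL_VALUED, InputTypes.KNOWN_INPUT)]
    # The missing 00:10 point lies inside the gap_threshold, so it is filled
    # by index-based interpolation (gl = 120.0 at 00:10).
    return interpolate(toy, cols, gap_threshold=30,
                       min_drop_length=2, interval_length=5)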


def create_index(time_col: pd.Series, interval_length: int):
    """Creates a new index at interval_length minute intervals.

    Args:
        time_col: Series of times.
        interval_length: Number in minutes, length of the interpolation interval.

    Returns:
        index: New index.
    """
    # Margin of error when comparing gaps.
    eps = pd.Timedelta('1min')
    new_time_col = [time_col.iloc[0]]
    for time in time_col.iloc[1:]:
        if time - new_time_col[-1] <= pd.Timedelta(minutes=interval_length) + eps:
            new_time_col.append(time)
        else:
            # Pad large gaps with evenly spaced filler timestamps.
            filler = new_time_col[-1] + pd.Timedelta(minutes=interval_length)
            while filler < time:
                new_time_col.append(filler)
                filler += pd.Timedelta(minutes=interval_length)
            new_time_col.append(time)
    return pd.to_datetime(new_time_col)
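

# A small sketch of `create_index` on toy timestamps: gaps wider than
# interval_length minutes (plus the one-minute tolerance) are padded with
# evenly spaced filler timestamps.
def _example_create_index():
    times = pd.Series(pd.to_datetime(['2020-01-01 00:00',
                                      '2020-01-01 00:05',
                                      '2020-01-01 00:20']))
    # Expected result: fillers at 00:10 and 00:15 are inserted before 00:20.
    return create_index(times, interval_length=5)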


def split(df: pd.DataFrame,
          column_definition: List[Tuple[str, DataTypes, InputTypes]],
          test_percent_subjects: float,
          length_segment: int,
          max_length_input: int,
          random_state: int = 42):
    """Splits data into train, validation and test sets.

    Args:
        df: DataFrame to split.
        column_definition: List of tuples describing columns (column_name, data_type, input_type).
        test_percent_subjects: Float in [0, 1], fraction of subjects to hold out for the OOD test set.
        length_segment: Number of points, length of segments saved for validation / test sets.
        max_length_input: Number of points, maximum length of input sequences for models.
        random_state: Number, random state for reproducibility.

    Returns:
        train_idx: Training set indices.
        val_idx: Validation set indices.
        test_idx: Test set indices.
        test_idx_ood: Out-of-distribution test set indices (held-out subjects).
    """
    # Set the random state.
    np.random.seed(random_state)
    # Get the id and id_segment column names.
    id_col = [column_name for column_name, _, input_type in column_definition
              if input_type == InputTypes.ID][0]
    id_segment_col = [column_name for column_name, _, input_type in column_definition
                      if input_type == InputTypes.SID][0]
    # Get unique subject ids.
    ids = df[id_col].unique()
    # Select some subjects for the out-of-distribution test set.
    test_ids = np.random.choice(ids, math.ceil(len(ids) * test_percent_subjects), replace=False)
    test_idx_ood = list(df[df[id_col].isin(test_ids)].index)
    # Keep the remaining data for training and validation.
    df = df[~df[id_col].isin(test_ids)]
    # Iterate through subjects and split each into train, val and test.
    train_idx, val_idx, test_idx = [], [], []
    for id, id_data in df.groupby(id_col):
        segment_ids = id_data[id_segment_col].unique()
        if len(segment_ids) >= 2:
            # All but the last two segments go to training.
            train_idx += list(id_data.loc[id_data[id_segment_col].isin(segment_ids[:-2])].index)
            penultimate_segment = id_data[id_data[id_segment_col] == segment_ids[-2]]
            last_segment = id_data[id_data[id_segment_col] == segment_ids[-1]]
            if len(last_segment) >= max_length_input + 3 * length_segment:
                # The last segment is long enough to host both val and test.
                train_idx += list(penultimate_segment.index)
                train_idx += list(last_segment.iloc[:-2*length_segment].index)
                val_idx += list(last_segment.iloc[-2*length_segment-max_length_input:-length_segment].index)
                test_idx += list(last_segment.iloc[-length_segment-max_length_input:].index)
            elif len(last_segment) >= max_length_input + 2 * length_segment:
                train_idx += list(penultimate_segment.index)
                val_idx += list(last_segment.iloc[:-length_segment].index)
                test_idx += list(last_segment.iloc[-length_segment-max_length_input:].index)
            else:
                test_idx += list(last_segment.index)
                if len(penultimate_segment) >= max_length_input + 2 * length_segment:
                    val_idx += list(penultimate_segment.iloc[-length_segment-max_length_input:].index)
                    train_idx += list(penultimate_segment.iloc[:-length_segment].index)
                else:
                    train_idx += list(penultimate_segment.index)
        else:
            # Only one segment: carve the val / test windows from its tail.
            if len(id_data) >= max_length_input + 3 * length_segment:
                train_idx += list(id_data.iloc[:-2*length_segment].index)
                val_idx += list(id_data.iloc[-2*length_segment-max_length_input:-length_segment].index)
                test_idx += list(id_data.iloc[-length_segment-max_length_input:].index)
            elif len(id_data) >= max_length_input + 2 * length_segment:
                train_idx += list(id_data.iloc[:-length_segment].index)
                test_idx += list(id_data.iloc[-length_segment-max_length_input:].index)
            else:
                train_idx += list(id_data.index)
    total_len = len(train_idx) + len(val_idx) + len(test_idx) + len(test_idx_ood)
    print('\tTrain: {} ({:.2f}%)'.format(len(train_idx), len(train_idx) / total_len * 100))
    print('\tVal: {} ({:.2f}%)'.format(len(val_idx), len(val_idx) / total_len * 100))
    print('\tTest: {} ({:.2f}%)'.format(len(test_idx), len(test_idx) / total_len * 100))
    print('\tTest OOD: {} ({:.2f}%)'.format(len(test_idx_ood), len(test_idx_ood) / total_len * 100))
    return train_idx, val_idx, test_idx, test_idx_ood
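

# A hedged sketch of calling `split` (toy single-subject frame with the
# 'id_segment' column that `interpolate` produces; all numbers hypothetical).
def _example_split():
    n = 200
    toy = pd.DataFrame({'id': [0] * n, 'id_segment': [0] * n,
                        'gl': np.random.rand(n)})
    cols = [('id', DataTypes.CATEGORICAL, InputTypes.ID),
            ('id_segment', DataTypes.CATEGORICAL, InputTypes.SID),
            ('gl', DataTypes.REAL_VALUED, InputTypes.KNOWN_INPUT)]
    # With a single subject nothing is held out for the OOD test set; the val
    # and test windows are carved from the tail and overlap the preceding
    # split by max_length_input points to provide input context.
    return split(toy, cols, test_percent_subjects=0.0,
                 length_segment=30, max_length_input=20)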


def encode(df: pd.DataFrame,
           column_definition: List[Tuple[str, DataTypes, InputTypes]],
           date: List[str]):
    """Encodes categorical and date columns.

    Args:
        df: DataFrame to encode.
        column_definition: List of tuples describing columns (column_name, data_type, input_type).
        date: List of str, names of pandas .dt attributes to extract (e.g. 'year', 'month').

    Returns:
        df: DataFrame with encoded columns.
        column_definition: Updated list of tuples containing column name and types.
        encoders: Dictionary containing the fitted label encoders.
    """
    encoders = {}
    new_columns = []
    for i in range(len(column_definition)):
        column, column_type, input_type = column_definition[i]
        if column_type == DataTypes.DATE:
            # Expand the date column into one real-valued column per attribute.
            for extract_col in date:
                df[column + '_' + extract_col] = getattr(df[column].dt, extract_col)
                df[column + '_' + extract_col] = df[column + '_' + extract_col].astype(np.float32)
                new_columns.append((column + '_' + extract_col, DataTypes.REAL_VALUED, InputTypes.KNOWN_INPUT))
        elif column_type == DataTypes.CATEGORICAL:
            # Label-encode categorical columns in place.
            encoders[column] = preprocessing.LabelEncoder()
            df[column] = encoders[column].fit_transform(df[column]).astype(np.float32)
            column_definition[i] = (column, DataTypes.REAL_VALUED, input_type)
        else:
            continue
    column_definition += new_columns
    # Print the updated column definition.
    print('\tUpdated column definition:')
    for column, column_type, input_type in column_definition:
        print('\t\t{}: {} ({})'.format(column,
                                       DataTypes(column_type).name,
                                       InputTypes(input_type).name))
    return df, column_definition, encoders
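

# A minimal sketch of `encode` on a toy frame: the DATE column is expanded
# into float 'time_year' / 'time_month' known inputs, and the categorical
# 'id' column is label-encoded in place. Names are hypothetical.
def _example_encode():
    toy = pd.DataFrame({
        'id': ['a', 'b', 'a'],
        'time': pd.to_datetime(['2020-01-01', '2020-02-01', '2020-03-01']),
    })
    cols = [('id', DataTypes.CATEGORICAL, InputTypes.ID),
            ('time', DataTypes.DATE, InputTypes.TIME)]
    return encode(toy, cols, date=['year', 'month'])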


def scale(train_data: pd.DataFrame,
          val_data: pd.DataFrame,
          test_data: pd.DataFrame,
          column_definition: List[Tuple[str, DataTypes, InputTypes]],
          scaler: str):
    """Scales numerical data.

    Args:
        train_data: pd.DataFrame of training data.
        val_data: pd.DataFrame of validation data.
        test_data: pd.DataFrame of testing data.
        column_definition: List of tuples describing columns (column_name, data_type, input_type).
        scaler: String, name of the sklearn.preprocessing scaler class to use.

    Returns:
        train_data: pd.DataFrame of scaled training data.
        val_data: pd.DataFrame of scaled validation data.
        test_data: pd.DataFrame of scaled testing data.
        scalers: Dictionary indexed by column names containing the fitted scalers.
    """
    # Select all real-valued columns.
    columns_to_scale = [column for column, data_type, input_type in column_definition
                        if data_type == DataTypes.REAL_VALUED]
    # Handle the no-scaling case.
    if scaler == 'None':
        print('\tNo scaling applied')
        return train_data, val_data, test_data, None
    scalers = {}
    for column in columns_to_scale:
        # Fit on the training data only, then apply to the other splits.
        scaler_column = getattr(preprocessing, scaler)()
        train_data[column] = scaler_column.fit_transform(train_data[column].values.reshape(-1, 1))
        # Handle empty validation and test sets.
        val_data[column] = scaler_column.transform(val_data[column].values.reshape(-1, 1)) if val_data.shape[0] > 0 else val_data[column]
        test_data[column] = scaler_column.transform(test_data[column].values.reshape(-1, 1)) if test_data.shape[0] > 0 else test_data[column]
        scalers[column] = scaler_column
    # Print the columns that were scaled.
    print('\tScaled columns: {}'.format(columns_to_scale))
    return train_data, val_data, test_data, scalers
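

# A hedged sketch of `scale` on toy splits. 'StandardScaler' is passed by name
# and looked up on sklearn.preprocessing; the scaler is fit on the training
# split only, and the empty test split is passed through untouched.
def _example_scale():
    cols = [('gl', DataTypes.REAL_VALUED, InputTypes.KNOWN_INPUT)]
    train = pd.DataFrame({'gl': [1.0, 2.0, 3.0]})
    val = pd.DataFrame({'gl': [2.0]})
    test = pd.DataFrame({'gl': []})  # empty splits are handled gracefully
    return scale(train, val, test, cols, scaler='StandardScaler')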