Livia_Zaharia
added code for the first time
bacf16b
raw
history blame
14.4 kB
# coding=utf-8
# Copyright 2019 The Google Research Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Lint as: python3
"""Generic helper functions used across codebase."""
import warnings
from collections import namedtuple
from datetime import datetime
import os
import math
import pathlib
import torch
import numpy as np
import pandas as pd
pd.options.mode.chained_assignment = None
from typing import List, Tuple
from sklearn import preprocessing
import data_formatter
from data_formatter import types
DataTypes = types.DataTypes
InputTypes = types.InputTypes
MINUTE = 60
# OS related functions.
def create_folder_if_not_exist(directory):
"""Creates folder if it doesn't exist.
Args:
directory: Folder path to create.
"""
# Also creates directories recursively
pathlib.Path(directory).mkdir(parents=True, exist_ok=True)
def csv_path_to_folder(path: str):
return "/".join(path.split('/')[:-1]) + "/"
def interpolate(data: pd.DataFrame,
column_definition: List[Tuple[str, DataTypes, InputTypes]],
gap_threshold: int = 0,
min_drop_length: int = 0,
interval_length: int = 0):
"""Interpolates missing values in data.
Args:
df: Dataframe to interpolate on. Sorted by id and then time (a DateTime object).
column_definition: List of tuples describing columns (column_name, data_type, input_type).
gap_threshold: Number in minutes, maximum allowed gap for interpolation.
min_drop_length: Number of points, minimum number within an interval to interpolate.
interval_length: Number in minutes, length of interpolation.
Returns:
data: DataFrame with missing values interpolated and
additional column ('segment') indicating continuous segments.
column_definition: Updataed list of tuples (column_name, data_type, input_type).
"""
# select all real-valued columns that are not id, time, or static
interpolation_columns = [column_name for column_name, data_type, input_type in column_definition if
data_type == DataTypes.REAL_VALUED and
input_type not in set([InputTypes.ID, InputTypes.TIME, InputTypes.STATIC_INPUT])]
# select all other columns except time
constant_columns = [column_name for column_name, data_type, input_type in column_definition if
input_type not in set([InputTypes.TIME])]
constant_columns += ['id_segment']
# get id and time columns
id_col = [column_name for column_name, data_type, input_type in column_definition if input_type == InputTypes.ID][0]
time_col = [column_name for column_name, data_type, input_type in column_definition if input_type == InputTypes.TIME][0]
# round to minute
data[time_col] = data[time_col].dt.round('1min')
# count dropped segments
dropped_segments = 0
# count number of values that are interpolated
interpolation_count = 0
# store final output
output = []
for id, id_data in data.groupby(id_col):
# sort values
id_data.sort_values(time_col, inplace=True)
# get time difference between consecutive rows
lag = (id_data[time_col].diff().dt.total_seconds().fillna(0) / 60.0).astype(int)
# if lag > gap_threshold
id_segment = (lag > gap_threshold).cumsum()
id_data['id_segment'] = id_segment
for segment, segment_data in id_data.groupby('id_segment'):
# if segment is too short, then we don't interpolate
if len(segment_data) < min_drop_length:
dropped_segments += 1
continue
# find and print duplicated times
duplicates = segment_data.duplicated(time_col, keep=False)
if duplicates.any():
print(segment_data[duplicates])
raise ValueError('Duplicate times in segment {} of id {}'.format(segment, id))
# reindex at interval_length minute intervals
segment_data = segment_data.set_index(time_col)
index_new = pd.date_range(start = segment_data.index[0],
end = segment_data.index[-1],
freq = interval_length)
index_union = index_new.union(segment_data.index)
segment_data = segment_data.reindex(index_union)
# count nan values in interpolation columns
interpolation_count += segment_data[interpolation_columns[0]].isna().sum()
# interpolate
segment_data[interpolation_columns] = segment_data[interpolation_columns].interpolate(method='index')
# fill constant columns with last value
segment_data[constant_columns] = segment_data[constant_columns].ffill()
# delete rows not conforming to frequency
segment_data = segment_data.reindex(index_new)
# reset index, make the time a column with name time_col
segment_data = segment_data.reset_index().rename(columns={'index': time_col})
# set the id_segment to position in output
segment_data['id_segment'] = len(output)
# add to output
output.append(segment_data)
# print number of dropped segments and number of segments
print('\tDropped segments: {}'.format(dropped_segments))
print('\tExtracted segments: {}'.format(len(output)))
# concat all segments and reset index
output = pd.concat(output)
output.reset_index(drop=True, inplace=True)
# count number of interpolated values
print('\tInterpolated values: {}'.format(interpolation_count))
print('\tPercent of values interpolated: {:.2f}%'.format(interpolation_count / len(output) * 100))
# add id_segment column to column_definition as ID
column_definition += [('id_segment', DataTypes.CATEGORICAL, InputTypes.SID)]
return output, column_definition
def create_index(time_col: pd.Series, interval_length: int):
"""Creates a new index at interval_length minute intervals.
Args:
time_col: Series of times.
interval_length: Number in minutes, length of interpolation.
Returns:
index: New index.
"""
# margin of error
eps = pd.Timedelta('1min')
new_time_col = [time_col.iloc[0]]
for time in time_col.iloc[1:]:
if time - new_time_col[-1] <= pd.Timedelta(interval_length) + eps:
new_time_col.append(time)
else:
filler = new_time_col[-1] + pd.Timedelta(interval_length)
while filler < time:
new_time_col.append(filler)
filler += pd.Timedelta(interval_length)
new_time_col.append(time)
return pd.to_datetime(new_time_col)
def split(df: pd.DataFrame,
column_definition: List[Tuple[str, DataTypes, InputTypes]],
test_percent_subjects: float,
length_segment: int,
max_length_input: int,
random_state: int = 42):
"""Splits data into train, validation and test sets.
Args:
df: Dataframe to split.
column_definition: List of tuples describing columns (column_name, data_type, input_type).
test_percent_subjects: Float number from [0, 1], percentage of subjects to use for test set.
length_segment: Number of points, length of segments saved for validation / test sets.
max_length_input: Number of points, maximum length of input sequences for models.
random_state: Number, Random state for reproducibility.
Returns:
train_idx: Training set indices.
val_idx: Validation set indices.
test_idx: Test set indices.
"""
# set random state
np.random.seed(random_state)
# get id and id_segment columns
id_col = [column_name for column_name, data_type, input_type in column_definition if input_type == InputTypes.ID][0]
id_segment_col = [column_name for column_name, data_type, input_type in column_definition if input_type == InputTypes.SID][0]
# get unique ids
ids = df[id_col].unique()
# select some subjects for test data set
test_ids = np.random.choice(ids, math.ceil(len(ids) * test_percent_subjects), replace=False)
test_idx_ood = list(df[df[id_col].isin(test_ids)].index)
# get the remaning data for training and validation
df = df[~df[id_col].isin(test_ids)]
# iterate through subjects and split into train, val and test
train_idx = []; val_idx = []; test_idx = []
for id, id_data in df.groupby(id_col):
segment_ids = id_data[id_segment_col].unique()
if len(segment_ids) >= 2:
train_idx += list(id_data.loc[id_data[id_segment_col].isin(segment_ids[:-2])].index)
penultimate_segment = id_data[id_data[id_segment_col] == segment_ids[-2]]
last_segment = id_data[id_data[id_segment_col] == segment_ids[-1]]
if len(last_segment) >= max_length_input + 3 * length_segment:
train_idx += list(penultimate_segment.index)
train_idx += list(last_segment.iloc[:-2*length_segment].index)
val_idx += list(last_segment.iloc[-2*length_segment-max_length_input:-length_segment].index)
test_idx += list(last_segment.iloc[-length_segment-max_length_input:].index)
elif len(last_segment) >= max_length_input + 2 * length_segment:
train_idx += list(penultimate_segment.index)
val_idx += list(last_segment.iloc[:-length_segment].index)
test_idx += list(last_segment.iloc[-length_segment-max_length_input:].index)
else:
test_idx += list(last_segment.index)
if len(penultimate_segment) >= max_length_input + 2 * length_segment:
val_idx += list(penultimate_segment.iloc[-length_segment-max_length_input:].index)
train_idx += list(penultimate_segment.iloc[:-length_segment].index)
else:
train_idx += list(penultimate_segment.index)
else:
if len(id_data) >= max_length_input + 3 * length_segment:
train_idx += list(id_data.iloc[:-2*length_segment].index)
val_idx += list(id_data.iloc[-2*length_segment-max_length_input:-length_segment].index)
test_idx += list(id_data.iloc[-length_segment-max_length_input:].index)
elif len(id_data) >= max_length_input + 2 * length_segment:
train_idx += list(id_data.iloc[:-length_segment].index)
test_idx += list(id_data.iloc[-length_segment-max_length_input:].index)
else:
train_idx += list(id_data.index)
total_len = len(train_idx) + len(val_idx) + len(test_idx) + len(test_idx_ood)
print('\tTrain: {} ({:.2f}%)'.format(len(train_idx), len(train_idx) / total_len * 100))
print('\tVal: {} ({:.2f}%)'.format(len(val_idx), len(val_idx) / total_len * 100))
print('\tTest: {} ({:.2f}%)'.format(len(test_idx), len(test_idx) / total_len * 100))
print('\tTest OOD: {} ({:.2f}%)'.format(len(test_idx_ood), len(test_idx_ood) / total_len * 100))
return train_idx, val_idx, test_idx, test_idx_ood
def encode(df: pd.DataFrame,
column_definition: List[Tuple[str, DataTypes, InputTypes]],
date: List,):
"""Encodes categorical columns.
Args:
df: Dataframe to split.
column_definition: List of tuples describing columns (column_name, data_type, input_type).
date: List of str, list containing date info to extract.
Returns:
df: Dataframe with encoded columns.
column_definition: Updated list of tuples containing column name and types.
encoders: dictionary containing encoders.
"""
encoders = {}
new_columns = []
for i in range(len(column_definition)):
column, column_type, input_type = column_definition[i]
if column_type == DataTypes.DATE:
for extract_col in date:
df[column + '_' + extract_col] = getattr(df[column].dt, extract_col)
df[column + '_' + extract_col] = df[column + '_' + extract_col].astype(np.float32)
new_columns.append((column + '_' + extract_col, DataTypes.REAL_VALUED, InputTypes.KNOWN_INPUT))
elif column_type == DataTypes.CATEGORICAL:
encoders[column] = preprocessing.LabelEncoder()
df[column] = encoders[column].fit_transform(df[column]).astype(np.float32)
column_definition[i] = (column, DataTypes.REAL_VALUED, input_type)
else:
continue
column_definition += new_columns
# print updated column definition
print('\tUpdated column definition:')
for column, column_type, input_type in column_definition:
print('\t\t{}: {} ({})'.format(column,
DataTypes(column_type).name,
InputTypes(input_type).name))
return df, column_definition, encoders
def scale(train_data: pd.DataFrame,
val_data: pd.DataFrame,
test_data: pd.DataFrame,
column_definition: List[Tuple[str, DataTypes, InputTypes]],
scaler: str):
"""Scales numerical data.
Args:
train_data: pd.Dataframe, DataFrame of training data.
val_data: pd.Dataframe, DataFrame of validation data.
test_data: pd.Dataframe, DataFrame of testing data.
column_definition: List of tuples describing columns (column_name, data_type, input_type).
scaler: String, scaler to use.
Returns:
train_data: pd.Dataframe, DataFrame of scaled training data.
val_data: pd.Dataframe, DataFrame of scaled validation data.
test_data: pd.Dataframe, DataFrame of scaled testing data.
scalers: dictionary index by column names containing scalers.
"""
# select all real-valued columns
columns_to_scale = [column for column, data_type, input_type in column_definition if data_type == DataTypes.REAL_VALUED]
# handle no scaling case
if scaler == 'None':
print('\tNo scaling applied')
return train_data, val_data, test_data, None
scalers = {}
for column in columns_to_scale:
scaler_column = getattr(preprocessing, scaler)()
train_data[column] = scaler_column.fit_transform(train_data[column].values.reshape(-1, 1))
# handle empty validation and test sets
val_data[column] = scaler_column.transform(val_data[column].values.reshape(-1, 1)) if val_data.shape[0] > 0 else val_data[column]
test_data[column] = scaler_column.transform(test_data[column].values.reshape(-1, 1)) if test_data.shape[0] > 0 else test_data[column]
scalers[column] = scaler_column
# print columns that were scaled
print('\tScaled columns: {}'.format(columns_to_scale))
return train_data, val_data, test_data, scalers