# coding=utf-8 # Copyright 2019 The Google Research Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # Lint as: python3 """Generic helper functions used across codebase.""" import warnings from collections import namedtuple from datetime import datetime import os import math import pathlib import torch import numpy as np import pandas as pd pd.options.mode.chained_assignment = None from typing import List, Tuple from sklearn import preprocessing import data_formatter from data_formatter import types DataTypes = types.DataTypes InputTypes = types.InputTypes MINUTE = 60 # OS related functions. def create_folder_if_not_exist(directory): """Creates folder if it doesn't exist. Args: directory: Folder path to create. """ # Also creates directories recursively pathlib.Path(directory).mkdir(parents=True, exist_ok=True) def csv_path_to_folder(path: str): return "/".join(path.split('/')[:-1]) + "/" def interpolate(data: pd.DataFrame, column_definition: List[Tuple[str, DataTypes, InputTypes]], gap_threshold: int = 0, min_drop_length: int = 0, interval_length: int = 0): """Interpolates missing values in data. Args: df: Dataframe to interpolate on. Sorted by id and then time (a DateTime object). column_definition: List of tuples describing columns (column_name, data_type, input_type). gap_threshold: Number in minutes, maximum allowed gap for interpolation. min_drop_length: Number of points, minimum number within an interval to interpolate. interval_length: Number in minutes, length of interpolation. Returns: data: DataFrame with missing values interpolated and additional column ('segment') indicating continuous segments. column_definition: Updataed list of tuples (column_name, data_type, input_type). """ # select all real-valued columns that are not id, time, or static interpolation_columns = [column_name for column_name, data_type, input_type in column_definition if data_type == DataTypes.REAL_VALUED and input_type not in set([InputTypes.ID, InputTypes.TIME, InputTypes.STATIC_INPUT])] # select all other columns except time constant_columns = [column_name for column_name, data_type, input_type in column_definition if input_type not in set([InputTypes.TIME])] constant_columns += ['id_segment'] # get id and time columns id_col = [column_name for column_name, data_type, input_type in column_definition if input_type == InputTypes.ID][0] time_col = [column_name for column_name, data_type, input_type in column_definition if input_type == InputTypes.TIME][0] # round to minute data[time_col] = data[time_col].dt.round('1min') # count dropped segments dropped_segments = 0 # count number of values that are interpolated interpolation_count = 0 # store final output output = [] for id, id_data in data.groupby(id_col): # sort values id_data.sort_values(time_col, inplace=True) # get time difference between consecutive rows lag = (id_data[time_col].diff().dt.total_seconds().fillna(0) / 60.0).astype(int) # if lag > gap_threshold id_segment = (lag > gap_threshold).cumsum() id_data['id_segment'] = id_segment for segment, segment_data in id_data.groupby('id_segment'): # if segment is too short, then we don't interpolate if len(segment_data) < min_drop_length: dropped_segments += 1 continue # find and print duplicated times duplicates = segment_data.duplicated(time_col, keep=False) if duplicates.any(): print(segment_data[duplicates]) raise ValueError('Duplicate times in segment {} of id {}'.format(segment, id)) # reindex at interval_length minute intervals segment_data = segment_data.set_index(time_col) index_new = pd.date_range(start = segment_data.index[0], end = segment_data.index[-1], freq = interval_length) index_union = index_new.union(segment_data.index) segment_data = segment_data.reindex(index_union) # count nan values in interpolation columns interpolation_count += segment_data[interpolation_columns[0]].isna().sum() # interpolate segment_data[interpolation_columns] = segment_data[interpolation_columns].interpolate(method='index') # fill constant columns with last value segment_data[constant_columns] = segment_data[constant_columns].ffill() # delete rows not conforming to frequency segment_data = segment_data.reindex(index_new) # reset index, make the time a column with name time_col segment_data = segment_data.reset_index().rename(columns={'index': time_col}) # set the id_segment to position in output segment_data['id_segment'] = len(output) # add to output output.append(segment_data) # print number of dropped segments and number of segments print('\tDropped segments: {}'.format(dropped_segments)) print('\tExtracted segments: {}'.format(len(output))) # concat all segments and reset index output = pd.concat(output) output.reset_index(drop=True, inplace=True) # count number of interpolated values print('\tInterpolated values: {}'.format(interpolation_count)) print('\tPercent of values interpolated: {:.2f}%'.format(interpolation_count / len(output) * 100)) # add id_segment column to column_definition as ID column_definition += [('id_segment', DataTypes.CATEGORICAL, InputTypes.SID)] return output, column_definition def create_index(time_col: pd.Series, interval_length: int): """Creates a new index at interval_length minute intervals. Args: time_col: Series of times. interval_length: Number in minutes, length of interpolation. Returns: index: New index. """ # margin of error eps = pd.Timedelta('1min') new_time_col = [time_col.iloc[0]] for time in time_col.iloc[1:]: if time - new_time_col[-1] <= pd.Timedelta(interval_length) + eps: new_time_col.append(time) else: filler = new_time_col[-1] + pd.Timedelta(interval_length) while filler < time: new_time_col.append(filler) filler += pd.Timedelta(interval_length) new_time_col.append(time) return pd.to_datetime(new_time_col) def split(df: pd.DataFrame, column_definition: List[Tuple[str, DataTypes, InputTypes]], test_percent_subjects: float, length_segment: int, max_length_input: int, random_state: int = 42): """Splits data into train, validation and test sets. Args: df: Dataframe to split. column_definition: List of tuples describing columns (column_name, data_type, input_type). test_percent_subjects: Float number from [0, 1], percentage of subjects to use for test set. length_segment: Number of points, length of segments saved for validation / test sets. max_length_input: Number of points, maximum length of input sequences for models. random_state: Number, Random state for reproducibility. Returns: train_idx: Training set indices. val_idx: Validation set indices. test_idx: Test set indices. """ # set random state np.random.seed(random_state) # get id and id_segment columns id_col = [column_name for column_name, data_type, input_type in column_definition if input_type == InputTypes.ID][0] id_segment_col = [column_name for column_name, data_type, input_type in column_definition if input_type == InputTypes.SID][0] # get unique ids ids = df[id_col].unique() # select some subjects for test data set test_ids = np.random.choice(ids, math.ceil(len(ids) * test_percent_subjects), replace=False) test_idx_ood = list(df[df[id_col].isin(test_ids)].index) # get the remaning data for training and validation df = df[~df[id_col].isin(test_ids)] # iterate through subjects and split into train, val and test train_idx = []; val_idx = []; test_idx = [] for id, id_data in df.groupby(id_col): segment_ids = id_data[id_segment_col].unique() if len(segment_ids) >= 2: train_idx += list(id_data.loc[id_data[id_segment_col].isin(segment_ids[:-2])].index) penultimate_segment = id_data[id_data[id_segment_col] == segment_ids[-2]] last_segment = id_data[id_data[id_segment_col] == segment_ids[-1]] if len(last_segment) >= max_length_input + 3 * length_segment: train_idx += list(penultimate_segment.index) train_idx += list(last_segment.iloc[:-2*length_segment].index) val_idx += list(last_segment.iloc[-2*length_segment-max_length_input:-length_segment].index) test_idx += list(last_segment.iloc[-length_segment-max_length_input:].index) elif len(last_segment) >= max_length_input + 2 * length_segment: train_idx += list(penultimate_segment.index) val_idx += list(last_segment.iloc[:-length_segment].index) test_idx += list(last_segment.iloc[-length_segment-max_length_input:].index) else: test_idx += list(last_segment.index) if len(penultimate_segment) >= max_length_input + 2 * length_segment: val_idx += list(penultimate_segment.iloc[-length_segment-max_length_input:].index) train_idx += list(penultimate_segment.iloc[:-length_segment].index) else: train_idx += list(penultimate_segment.index) else: if len(id_data) >= max_length_input + 3 * length_segment: train_idx += list(id_data.iloc[:-2*length_segment].index) val_idx += list(id_data.iloc[-2*length_segment-max_length_input:-length_segment].index) test_idx += list(id_data.iloc[-length_segment-max_length_input:].index) elif len(id_data) >= max_length_input + 2 * length_segment: train_idx += list(id_data.iloc[:-length_segment].index) test_idx += list(id_data.iloc[-length_segment-max_length_input:].index) else: train_idx += list(id_data.index) total_len = len(train_idx) + len(val_idx) + len(test_idx) + len(test_idx_ood) print('\tTrain: {} ({:.2f}%)'.format(len(train_idx), len(train_idx) / total_len * 100)) print('\tVal: {} ({:.2f}%)'.format(len(val_idx), len(val_idx) / total_len * 100)) print('\tTest: {} ({:.2f}%)'.format(len(test_idx), len(test_idx) / total_len * 100)) print('\tTest OOD: {} ({:.2f}%)'.format(len(test_idx_ood), len(test_idx_ood) / total_len * 100)) return train_idx, val_idx, test_idx, test_idx_ood def encode(df: pd.DataFrame, column_definition: List[Tuple[str, DataTypes, InputTypes]], date: List,): """Encodes categorical columns. Args: df: Dataframe to split. column_definition: List of tuples describing columns (column_name, data_type, input_type). date: List of str, list containing date info to extract. Returns: df: Dataframe with encoded columns. column_definition: Updated list of tuples containing column name and types. encoders: dictionary containing encoders. """ encoders = {} new_columns = [] for i in range(len(column_definition)): column, column_type, input_type = column_definition[i] if column_type == DataTypes.DATE: for extract_col in date: df[column + '_' + extract_col] = getattr(df[column].dt, extract_col) df[column + '_' + extract_col] = df[column + '_' + extract_col].astype(np.float32) new_columns.append((column + '_' + extract_col, DataTypes.REAL_VALUED, InputTypes.KNOWN_INPUT)) elif column_type == DataTypes.CATEGORICAL: encoders[column] = preprocessing.LabelEncoder() df[column] = encoders[column].fit_transform(df[column]).astype(np.float32) column_definition[i] = (column, DataTypes.REAL_VALUED, input_type) else: continue column_definition += new_columns # print updated column definition print('\tUpdated column definition:') for column, column_type, input_type in column_definition: print('\t\t{}: {} ({})'.format(column, DataTypes(column_type).name, InputTypes(input_type).name)) return df, column_definition, encoders def scale(train_data: pd.DataFrame, val_data: pd.DataFrame, test_data: pd.DataFrame, column_definition: List[Tuple[str, DataTypes, InputTypes]], scaler: str): """Scales numerical data. Args: train_data: pd.Dataframe, DataFrame of training data. val_data: pd.Dataframe, DataFrame of validation data. test_data: pd.Dataframe, DataFrame of testing data. column_definition: List of tuples describing columns (column_name, data_type, input_type). scaler: String, scaler to use. Returns: train_data: pd.Dataframe, DataFrame of scaled training data. val_data: pd.Dataframe, DataFrame of scaled validation data. test_data: pd.Dataframe, DataFrame of scaled testing data. scalers: dictionary index by column names containing scalers. """ # select all real-valued columns columns_to_scale = [column for column, data_type, input_type in column_definition if data_type == DataTypes.REAL_VALUED] # handle no scaling case if scaler == 'None': print('\tNo scaling applied') return train_data, val_data, test_data, None scalers = {} for column in columns_to_scale: scaler_column = getattr(preprocessing, scaler)() train_data[column] = scaler_column.fit_transform(train_data[column].values.reshape(-1, 1)) # handle empty validation and test sets val_data[column] = scaler_column.transform(val_data[column].values.reshape(-1, 1)) if val_data.shape[0] > 0 else val_data[column] test_data[column] = scaler_column.transform(test_data[column].values.reshape(-1, 1)) if test_data.shape[0] > 0 else test_data[column] scalers[column] = scaler_column # print columns that were scaled print('\tScaled columns: {}'.format(columns_to_scale)) return train_data, val_data, test_data, scalers