import sys
import os
import yaml
import random
from typing import Any, BinaryIO, Callable, Dict, List, Optional, Sequence, Tuple, Union
from pathlib import Path

import numpy as np
from scipy import stats
import pandas as pd
import darts
from darts import models
from darts import metrics
from darts import TimeSeries
from darts.dataprocessing.transformers import Scaler
from pytorch_lightning.callbacks import Callback
from sympy import pprint

# import data formatter (lives one directory above this file)
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
from data_formatter.base import *

pd.set_option('display.width', None)       # set display width to None to avoid truncation
pd.set_option('display.max_columns', None)  # display all columns


def make_series(data: Dict[str, pd.DataFrame],
                time_col: str,
                group_col: str,
                value_cols: Dict[str, List[str]],
                include_sid: bool = False,
                verbose: bool = False) -> Dict[str, darts.TimeSeries]:
    """
    Makes the TimeSeries from the data.

    Parameters
    ----------
    data
        dict of train, val, test dataframes
    time_col
        name of time column
    group_col
        name of group column
    value_cols
        dict with key specifying the type of covariate and value specifying
        the list of columns.
    value_cols
        dict with key specifying the type of covariate and value specifying the list of columns.
    include_sid
        whether to include segment id as static covariate
    verbose
        whether to print each dataframe head before conversion

    Returns
    -------
    series: Dict[str, Dict[str, darts.TimeSeries]]
        dict of train, val, test splits of target and covariates TimeSeries objects
    scalers: Dict[str, darts.preprocessing.Scaler]
        dict of scalers for target and covariates
    """
    # one entry per split, one sub-entry per covariate type; filled below
    series = {i: {j: None for j in value_cols} for i in data.keys()}
    scalers = {}
    for key, df in data.items():
        for name, cols in value_cols.items():
            if verbose:
                print(f"DATAFRAME for key {key} in NAME {name} and COLS {cols} and GROUP_COL {group_col}")
                pprint(df.head(1))
            # from_group_dataframe returns one TimeSeries per group (segment)
            series[key][name] = TimeSeries.from_group_dataframe(df=df,
                                                                group_cols=group_col,
                                                                time_col=time_col,
                                                                value_cols=cols) if cols is not None else None
            if series[key][name] is not None and include_sid is False:
                # strip the auto-attached segment-id static covariate
                for i in range(len(series[key][name])):
                    series[key][name][i] = series[key][name][i].with_static_covariates(None)
            if cols is not None:
                # fit scalers on the train split only; reuse them on the others
                if key == 'train':
                    scalers[name] = ScalerCustom()
                    series[key][name] = scalers[name].fit_transform(series[key][name])
                else:
                    series[key][name] = scalers[name].transform(series[key][name])
            else:
                scalers[name] = None
    return series, scalers


def load_data(url: str,
              config_path: Path,
              use_covs: bool = False,
              cov_type: str = 'past',
              use_static_covs: bool = False,
              seed=0):
    """
    Load data according to the specified config file and covert to Darts TimeSeries objects.

    Parameters
    ----------
    url: str
        Path / URL of the CSV file; written into the config as ``data_csv_path``.
    config_path: Path
        Path to the YAML config file (see the example schema below).
    use_covs: bool
        Whether to use covariates.
    cov_type: str
        Type of covariates to use. Can be 'past' or 'mixed' or 'dual'.
    use_static_covs: bool
        Whether to use static covariates.
    seed: int
        Random seed for data splitting.

    Returns
    -------
    formatter: DataFormatter
        Data formatter object.
    series: Dict[str, Dict[str, TimeSeries]]
        First dictionary specified the split, second dictionary specifies the
        type of series (target or covariate).
    scalers: Dict[str, Scaler]
        Dictionary of scalers with key indicating the type of series
        (target or covariate).
    """
    # Example of the expected YAML schema (kept for reference):
    """
    config={
            'data_csv_path':f'{url}',
            'drop': None,
            'ds_name': 'livia_mini',
            'index_col': -1,
            'observation_interval': '5min',
            'column_definition': {
                {'data_type': 'categorical', 'input_type':'id', 'name':'id' },
                {'date_type':'date', 'input_type':'time', 'name':'time' },
                {'date_type':'real_valued', 'input_type':'target', 'name':'gl' }
            },
            'encoding_params':{'date':['day','month','year','hour','minute','second'] },
            'nan_vals':None,
            'interpolation_params':{'gap_threshold': 45, 'min_drop_length': 240 },
            'scaling_params':{'scaler':None },
            'split_params':{'length_segment': 13, 'random_state':seed, 'test_percent_subjects': 0.1 },
            'max_length_input': 192,
            'length_pred': 12,
            'params':{
                'gluformer':{'in_len': 96, 'd_model': 512, 'n_heads': 10, 'd_fcn': 1024,
                             'num_enc_layers': 2, 'num_dec_layers': 2, 'length_pred': 12 }
            }
    }
    """
    with config_path.open("r") as f:
        config = yaml.safe_load(f)
    config["data_csv_path"] = url
    formatter = DataFormatter(config)

    assert use_covs is not None, 'use_covs must be specified in the load_data call'

    # convert to series
    time_col = formatter.get_column('time')
    group_col = formatter.get_column('sid')
    target_col = formatter.get_column('target')
    static_cols = formatter.get_column('static_covs')
    # the subject id is always carried as an additional static covariate
    static_cols = static_cols + [formatter.get_column('id')] if static_cols is not None else [formatter.get_column('id')]
    dynamic_cols = formatter.get_column('dynamic_covs')
    future_cols = formatter.get_column('future_covs')

    # in-distribution test rows are those NOT flagged as out-of-distribution
    data = {'train': formatter.train_data,
            'val': formatter.val_data,
            'test': formatter.test_data.loc[~formatter.test_data.index.isin(formatter.test_idx_ood)],
            'test_ood': formatter.test_data.loc[formatter.test_data.index.isin(formatter.test_idx_ood)]}
    value_cols = {'target': target_col,
                  'static': static_cols,
                  'dynamic': dynamic_cols,
                  'future': future_cols}

    # build series
    series, scalers = make_series(data, time_col, group_col, value_cols)

    if not use_covs:
        # set dynamic and future covariates to None
        for split in ['train', 'val', 'test', 'test_ood']:
            for cov in ['dynamic', 'future']:
                series[split][cov] = None
    elif use_covs and cov_type == 'mixed':
        pass  # this is the default for make_series()
    elif use_covs and cov_type == 'past':
        # use future covariates as dynamic (past) covariates
        if series['train']['dynamic'] is None:
            for split in ['train', 'val', 'test', 'test_ood']:
                series[split]['dynamic'] = series[split]['future']
        else:
            # append future covariates component-wise to the dynamic ones
            for split in ['train', 'val', 'test', 'test_ood']:
                for i in range(len(series[split]['future'])):
                    series[split]['dynamic'][i] = series[split]['dynamic'][i].concatenate(series[split]['future'][i], axis=1)
        # erase future covariates
        for split in ['train', 'val', 'test', 'test_ood']:
            series[split]['future'] = None
    elif use_covs and cov_type == 'dual':
        # erase dynamic (past) covariates
        for split in ['train', 'val', 'test', 'test_ood']:
            series[split]['dynamic'] = None

    if use_static_covs:
        # attach static covariates to series
        for split in ['train', 'val', 'test', 'test_ood']:
            for i in range(len(series[split]['target'])):
                # first time step of the static series holds the constant values
                static_covs = series[split]['static'][i][0].pd_dataframe()
                series[split]['target'][i] = series[split]['target'][i].with_static_covariates(static_covs)

    return formatter, series, scalers


def reshuffle_data(formatter: DataFormatter,
                   seed: int = 0,
                   use_covs: bool = None,
                   cov_type: str = 'past',
                   use_static_covs: bool = False,):
    """
    Reshuffle data according to the seed and covert to Darts TimeSeries objects.

    Parameters
    ----------
    formatter: DataFormatter
        Data formatter object containing the data
    seed: int
        Random seed for data splitting.
    use_covs: bool
        Whether to use covariates.
    cov_type: str
        Type of covariates to use. Can be 'past' or 'mixed' or 'dual'.
    use_static_covs: bool
        Whether to use static covariates.

    Returns
    -------
    formatter: DataFormatter
        Reshuffled data formatter object.
    series: Dict[str, Dict[str, TimeSeries]]
        First dictionary specified the split, second dictionary specifies the
        type of series (target or covariate).
    scalers: Dict[str, Scaler]
        Dictionary of scalers with key indicating the type of series
        (target or covariate).
    """
    # reshuffle
    formatter.reshuffle(seed)

    assert use_covs is not None, 'use_covs must be specified in the reshuffle_data call'

    # convert to series
    time_col = formatter.get_column('time')
    group_col = formatter.get_column('sid')
    target_col = formatter.get_column('target')
    static_cols = formatter.get_column('static_covs')
    static_cols = static_cols + [formatter.get_column('id')] if static_cols is not None else [formatter.get_column('id')]
    dynamic_cols = formatter.get_column('dynamic_covs')
    future_cols = formatter.get_column('future_covs')

    # build series
    series, scalers = make_series({'train': formatter.train_data,
                                   'val': formatter.val_data,
                                   'test': formatter.test_data.loc[~formatter.test_data.index.isin(formatter.test_idx_ood)],
                                   'test_ood': formatter.test_data.loc[formatter.test_data.index.isin(formatter.test_idx_ood)]},
                                  time_col,
                                  group_col,
                                  {'target': target_col,
                                   'static': static_cols,
                                   'dynamic': dynamic_cols,
                                   'future': future_cols})

    # NOTE(review): unlike load_data(), the 'past' branch below does not erase
    # the future covariates afterwards and there is no explicit 'mixed'
    # branch — preserved as-is, but confirm this asymmetry is intentional.
    if not use_covs:
        # set dynamic and future covariates to None
        for split in ['train', 'val', 'test', 'test_ood']:
            for cov in ['dynamic', 'future']:
                series[split][cov] = None
    elif use_covs and cov_type == 'past':
        # use future covariates as dynamic covariates
        if series['train']['dynamic'] is None:
            for split in ['train', 'val', 'test', 'test_ood']:
                series[split]['dynamic'] = series[split]['future']
        # or attach them to dynamic covariates
        else:
            for split in ['train', 'val', 'test', 'test_ood']:
                for i in range(len(series[split]['future'])):
                    series[split]['dynamic'][i] = series[split]['dynamic'][i].concatenate(series[split]['future'][i], axis=1)
    elif use_covs and cov_type == 'dual':
        # set dynamic covariates to None, because they are not supported
        for split in ['train', 'val', 'test', 'test_ood']:
            series[split]['dynamic'] = None

    if use_static_covs:
        # attach static covariates to series
        for split in ['train', 'val', 'test', 'test_ood']:
            for i in range(len(series[split]['target'])):
                static_covs = series[split]['static'][i][0].pd_dataframe()
                series[split]['target'][i] = series[split]['target'][i].with_static_covariates(static_covs)

    return formatter, series, scalers


class ScalerCustom:
    '''
    Min-max scaler for TimeSeries that fits on all sequences simultaneously.

    Default Darts scaler fits one scaler per sequence in the list; this class
    instead concatenates all sequences and fits a single scaler, so every
    sequence is transformed with the same min/scale.

    Attributes
    ----------
    scaler: Scaler
        Darts scaler object.
    min_: np.ndarray
        Per feature adjustment for minimum (see Scikit-learn).
    scale_: np.ndarray
        Per feature relative scaling of the data (see Scikit-learn).
    '''

    def __init__(self):
        self.scaler = Scaler()
        self.min_ = None
        self.scale_ = None

    def fit(self, time_series: Union[List[TimeSeries], TimeSeries]) -> None:
        """Fit a single scaler on one TimeSeries or on all series in a list."""
        if isinstance(time_series, list):
            # extract all series as one stacked Pandas dataframe
            df = pd.concat([ts.pd_dataframe() for ts in time_series])
            value_cols = df.columns
            # FIX: the original read df['time'] after reset_index(), which
            # crashes when the time index is not named exactly 'time'. The
            # actual timestamps are irrelevant to min-max fitting; we only
            # need a valid, equally spaced index, so take the start from the
            # index itself before resetting it.
            start = df.index.min()
            df.reset_index(inplace=True)
            # create new equally spaced time grid (concatenation breaks the
            # original grid, and TimeSeries requires a monotone index)
            df['new_time'] = pd.date_range(start=start, periods=len(df), freq='1h')
            # fit scaler
            series = TimeSeries.from_dataframe(df, time_col='new_time', value_cols=value_cols)
            series = self.scaler.fit(series)
        else:
            series = self.scaler.fit(time_series)
        # expose the underlying sklearn MinMaxScaler parameters
        self.min_ = self.scaler._fitted_params[0].min_
        self.scale_ = self.scaler._fitted_params[0].scale_

    def transform(self, time_series: Union[List[TimeSeries], TimeSeries]) -> Union[List[TimeSeries], TimeSeries]:
        """Scale one TimeSeries, or each series in a list one by one."""
        if isinstance(time_series, list):
            # transform one by one
            series = [self.scaler.transform(ts) for ts in time_series]
        else:
            series = self.scaler.transform(time_series)
        return series

    def inverse_transform(self, time_series: Union[List[TimeSeries], TimeSeries]) -> Union[List[TimeSeries], TimeSeries]:
        """Undo the scaling on one TimeSeries, or on each series in a list."""
        if isinstance(time_series, list):
            # transform one by one
            series = [self.scaler.inverse_transform(ts) for ts in time_series]
        else:
            series = self.scaler.inverse_transform(time_series)
        return series

    def fit_transform(self, time_series: Union[List[TimeSeries], TimeSeries]) -> Union[List[TimeSeries], TimeSeries]:
        """Convenience wrapper: fit on the input, then return it transformed."""
        self.fit(time_series)
        series = self.transform(time_series)
        return series