import sys
import os
import yaml
import random
from typing import Any, BinaryIO, Callable, Dict, List, Optional, Sequence, Tuple, Union

import numpy as np
import pandas as pd
from scipy import stats

import darts
from darts import models
from darts import metrics
from darts import TimeSeries

# Make the repository root importable so the local packages below resolve.
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
from data_formatter.base import *
from utils.darts_processing import *


def _get_values(
    series: TimeSeries,
    stochastic_quantile: Optional[float] = 0.5,
) -> np.ndarray:
    """
    Return the numpy values of a time series.

    For stochastic series, return either all sample values
    (``stochastic_quantile=None``) or the values of the requested quantile
    (``stochastic_quantile`` in [0, 1]).
    """
    if series.is_deterministic:
        series_values = series.univariate_values()
    else:  # stochastic: either keep every sample or collapse to one quantile
        if stochastic_quantile is None:
            series_values = series.all_values(copy=False)
        else:
            series_values = series.quantile_timeseries(
                quantile=stochastic_quantile
            ).univariate_values()
    return series_values


def _get_values_or_raise(
    series_a: TimeSeries,
    series_b: TimeSeries,
    intersect: bool,
    stochastic_quantile: Optional[float] = 0.5,
    remove_nan_union: bool = False,
) -> Tuple[np.ndarray, np.ndarray]:
    """Return the processed numpy values of two time series.

    Processing can be customized with arguments
    `intersect, stochastic_quantile, remove_nan_union`.

    Raises a ValueError if the two time series (or their intersection)
    do not have the same time index.

    Parameters
    ----------
    series_a
        A univariate deterministic ``TimeSeries`` instance (the actual series).
    series_b
        A univariate (deterministic or stochastic) ``TimeSeries`` instance
        (the predicted series).
    intersect
        A boolean for whether or not to only consider the time intersection
        between `series_a` and `series_b`.
    stochastic_quantile
        Optionally, for stochastic predicted series, return either all sample
        values with (`stochastic_quantile=None`) or any deterministic quantile
        sample values by setting `stochastic_quantile=quantile` {>=0,<=1}.
    remove_nan_union
        By setting `remove_nan_union` to True, remove all indices from
        `series_a` and `series_b` which have a NaN value in either of the
        two input series.
    """
    series_a_common = series_a.slice_intersect(series_b) if intersect else series_a
    series_b_common = series_b.slice_intersect(series_a) if intersect else series_b

    series_a_det = _get_values(series_a_common, stochastic_quantile=stochastic_quantile)
    series_b_det = _get_values(series_b_common, stochastic_quantile=stochastic_quantile)

    if not remove_nan_union:
        return series_a_det, series_b_det

    # A deterministic result is 1-D; a full stochastic sample block is 3-D
    # (time, component, sample), so NaNs are reduced over the sample axis.
    b_is_deterministic = bool(len(series_b_det.shape) == 1)
    if b_is_deterministic:
        isnan_mask = np.logical_or(np.isnan(series_a_det), np.isnan(series_b_det))
    else:
        isnan_mask = np.logical_or(
            np.isnan(series_a_det), np.isnan(series_b_det).any(axis=2).flatten()
        )
    return np.delete(series_a_det, isnan_mask), np.delete(
        series_b_det, isnan_mask, axis=0
    )


def rescale_and_backtest(
    series: Union[TimeSeries, Sequence[TimeSeries]],
    forecasts: Union[TimeSeries, Sequence[TimeSeries], Sequence[Sequence[TimeSeries]]],
    metric: Union[
        Callable[[TimeSeries, TimeSeries], float],
        List[Callable[[TimeSeries, TimeSeries], float]],
    ],
    scaler: Callable[[TimeSeries], TimeSeries] = None,
    reduction: Union[Callable[[np.ndarray], float], None] = np.mean,
    likelihood: str = "GaussianMean",
    cal_thresholds: Optional[np.ndarray] = np.linspace(0, 1, 11),
):
    """
    Backtest the historical forecasts (as provided by Darts) on the series.

    Parameters
    ----------
    series
        The target time series.
    forecasts
        The forecasts.
    metric
        The metric or metrics to use for backtesting.
    scaler
        The scaler used to scale the series; if given, series and forecasts
        are inverse-transformed before the metrics are computed.
    reduction
        The reduction to apply to the metric (per series); ``None`` keeps
        the per-forecast errors.
    likelihood
        The likelihood to use for evaluating the model. Only "GaussianMean"
        is supported here.
    cal_thresholds
        The thresholds to use for computing the calibration error.

    Returns
    -------
    np.ndarray
        Error array. If the reduction is None, array is of shape (n, p)
        where n is the total number of samples (forecasts) and p is the
        number of metrics. If the reduction is not None, array is of shape
        (k, p), where k is the number of series.
    float
        The estimated log-likelihood of the model on the data.
    np.ndarray
        The ECE for each time point in the forecast.

    Raises
    ------
    ValueError
        If `likelihood` is not "GaussianMean". (Previously an unsupported
        value crashed with a NameError on return.)
    """
    # Validate up front: log_likelihood / cal_error are only computed for
    # the Gaussian-mean likelihood.
    if likelihood != "GaussianMean":
        raise ValueError(f"Unsupported likelihood: {likelihood!r}. Only 'GaussianMean' is supported.")

    series = [series] if isinstance(series, TimeSeries) else series
    forecasts = [forecasts] if isinstance(forecasts, TimeSeries) else forecasts
    metric = [metric] if not isinstance(metric, list) else metric

    # Compute errors: 1) reverse scaling of forecasts and true values,
    # 2) compute errors.
    # NOTE(review): when `scaler` is given, the caller-provided `series` and
    # `forecasts` lists are mutated in place with the rescaled values.
    backtest_list = []
    for idx in range(len(series)):
        if scaler is not None:
            series[idx] = scaler.inverse_transform(series[idx])
            forecasts[idx] = [scaler.inverse_transform(f) for f in forecasts[idx]]
        errors = [
            [metric_f(series[idx], f) for metric_f in metric]
            if len(metric) > 1
            else metric[0](series[idx], f)
            for f in forecasts[idx]
        ]
        if reduction is None:
            backtest_list.append(np.array(errors))
        else:
            backtest_list.append(reduction(np.array(errors), axis=0))
    backtest_list = np.vstack(backtest_list)

    # Estimate a single (homoscedastic) variance from the mean squared error
    # of all forecasts, then evaluate the Gaussian likelihood with it.
    est_var = []
    for idx, target_ts in enumerate(series):
        est_var += [metrics.mse(target_ts, f) for f in forecasts[idx]]
    est_var = np.mean(est_var)
    forecast_len = forecasts[0][0].n_timesteps
    # NOTE(review): for an i.i.d. Gaussian the log-normalizer term would be
    # scaled by forecast_len as well; kept numerically as originally written —
    # confirm the intended formula before changing.
    log_likelihood = -0.5 * forecast_len - 0.5 * np.log(2 * np.pi * est_var)

    # Compute calibration error: 1) cdf values, 2) calibration error.
    cdf_vals = []
    for idx in range(len(series)):
        for forecast in forecasts[idx]:
            y_true, y_pred = _get_values_or_raise(
                series[idx], forecast, intersect=True, remove_nan_union=True
            )
            y_true, y_pred = y_true.flatten(), y_pred.flatten()
            cdf_vals.append(stats.norm.cdf(y_true, loc=y_pred, scale=np.sqrt(est_var)))
    cdf_vals = np.vstack(cdf_vals)

    # Expected calibration error per forecast time step: squared deviation of
    # the empirical coverage from each nominal threshold p.
    cal_error = np.zeros(forecasts[0][0].n_timesteps)
    for p in cal_thresholds:
        est_p = (cdf_vals <= p).astype(float)
        est_p = np.mean(est_p, axis=0)
        cal_error += (est_p - p) ** 2

    return backtest_list, log_likelihood, cal_error


def rescale_and_test(
    series: Union[TimeSeries, Sequence[TimeSeries]],
    forecasts: Union[TimeSeries, Sequence[TimeSeries]],
    metric: Union[
        Callable[[TimeSeries, TimeSeries], float],
        List[Callable[[TimeSeries, TimeSeries], float]],
    ],
    scaler: Callable[[TimeSeries], TimeSeries] = None,
    likelihood: str = "GaussianMean",
    cal_thresholds: Optional[np.ndarray] = np.linspace(0, 1, 11),
):
    """
    Test the forecasts on the series.

    Parameters
    ----------
    series
        The target time series.
    forecasts
        The forecasts.
    metric
        The metric or metrics to use for backtesting.
    scaler
        The scaler used to scale the series; if given, series and forecasts
        are inverse-transformed before the metrics are computed.
    likelihood
        The likelihood to use for evaluating the likelihood and calibration
        of the model: "GaussianMean" or "Quantile".
    cal_thresholds
        The thresholds to use for computing the calibration error.

    Returns
    -------
    np.ndarray
        Error array of shape (n, p) where n is the number of (series,
        forecast) pairs and p is the number of metrics.
    float
        The estimated log-likelihood of the model on the data
        (0 for "Quantile", which has no parametric likelihood).
    np.ndarray
        The ECE for each time point in the forecast.

    Raises
    ------
    ValueError
        If `likelihood` is neither "GaussianMean" nor "Quantile".
        (Previously an unsupported value crashed with a NameError on return.)
    """
    if likelihood not in ("GaussianMean", "Quantile"):
        raise ValueError(
            f"Unsupported likelihood: {likelihood!r}. Expected 'GaussianMean' or 'Quantile'."
        )

    series = [series] if isinstance(series, TimeSeries) else series
    forecasts = [forecasts] if isinstance(forecasts, TimeSeries) else forecasts
    metric = [metric] if not isinstance(metric, list) else metric

    # Compute errors: 1) reverse scaling of forecasts and true values,
    # 2) compute errors.
    # BUGFIX: guard against scaler=None (the default) — previously this
    # called inverse_transform unconditionally and crashed.
    if scaler is not None:
        series = scaler.inverse_transform(series)
        forecasts = scaler.inverse_transform(forecasts)
    errors = [
        [metric_f(t, f) for metric_f in metric]
        if len(metric) > 1
        else metric[0](t, f)
        for (t, f) in zip(series, forecasts)
    ]
    errors = np.array(errors)

    if likelihood == "GaussianMean":
        # Estimate a single variance from the MSE over all pairs, then
        # evaluate the Gaussian likelihood with it.
        est_var = [metrics.mse(t, f) for (t, f) in zip(series, forecasts)]
        est_var = np.mean(est_var)
        forecast_len = forecasts[0].n_timesteps
        # NOTE(review): kept numerically as originally written — see the
        # matching note in rescale_and_backtest.
        log_likelihood = -0.5 * forecast_len - 0.5 * np.log(2 * np.pi * est_var)

        # Compute calibration error: 1) cdf values, 2) calibration error.
        cdf_vals = []
        for t, f in zip(series, forecasts):
            t, f = _get_values_or_raise(t, f, intersect=True, remove_nan_union=True)
            t, f = t.flatten(), f.flatten()
            cdf_vals.append(stats.norm.cdf(t, loc=f, scale=np.sqrt(est_var)))
        cdf_vals = np.vstack(cdf_vals)

        cal_error = np.zeros(forecasts[0].n_timesteps)
        for p in cal_thresholds:
            est_p = (cdf_vals <= p).astype(float)
            est_p = np.mean(est_p, axis=0)
            cal_error += (est_p - p) ** 2
    else:  # likelihood == "Quantile"
        # No log-likelihood since we don't have a parametric model.
        log_likelihood = 0

        # Compute calibration error: 1) get quantiles, 2) calibration error.
        cal_error = np.zeros(forecasts[0].n_timesteps)
        for p in cal_thresholds:
            est_p = 0
            for t, f in zip(series, forecasts):
                q = f.quantile(p)
                t, q = _get_values_or_raise(t, q, intersect=True, remove_nan_union=True)
                t, q = t.flatten(), q.flatten()
                # Empirical coverage: fraction of truths at or below the
                # predicted p-quantile.
                est_p += (t <= q).astype(float)
            est_p = (est_p / len(series)).flatten()
            cal_error += (est_p - p) ** 2

    return errors, log_likelihood, cal_error