Spaces:
Sleeping
Sleeping
import sys | |
import os | |
import yaml | |
import random | |
from typing import Any, BinaryIO, Callable, Dict, List, Optional, Sequence, Tuple, Union | |
import numpy as np | |
from scipy import stats | |
import pandas as pd | |
import darts | |
from darts import models | |
from darts import metrics | |
from darts import TimeSeries | |
from darts.dataprocessing.transformers import Scaler | |
from pytorch_lightning.callbacks import Callback | |
# for darts dataset | |
from darts.logging import get_logger, raise_if_not | |
from darts.utils.data.training_dataset import PastCovariatesTrainingDataset, \ | |
DualCovariatesTrainingDataset, \ | |
MixedCovariatesTrainingDataset | |
from darts.utils.data.inference_dataset import PastCovariatesInferenceDataset, \ | |
DualCovariatesInferenceDataset, \ | |
MixedCovariatesInferenceDataset | |
from darts.utils.data.utils import CovariateType | |
# import data formatter | |
sys.path.append(os.path.join(os.path.dirname(__file__), '..')) | |
from data_formatter.base import * | |
def get_valid_sampling_locations(target_series: Union[TimeSeries, Sequence[TimeSeries]],
                                 output_chunk_length: int = 12,
                                 input_chunk_length: int = 12,
                                 random_state: Optional[int] = 0,
                                 max_samples_per_ts: Optional[int] = None):
    """
    List, for every series, the start offsets of the windows that fit inside it.

    A window spans `input_chunk_length + output_chunk_length` consecutive entries.

    Parameters
    ----------
    target_series
        The target time series. The argument is walked with `enumerate`, so it is
        treated as a sequence of series.
    output_chunk_length
        The length of the output chunk.
    input_chunk_length
        The length of the input chunk.
    random_state
        Seed handed to `random.seed` before any sub-sampling takes place. Note that
        this seeds the module-level generator of `random`.
    max_samples_per_ts
        The maximum number of samples per time series. Series offering more start
        offsets than this are randomly sub-sampled (without replacement); `None`
        keeps every offset.

    Returns
    -------
    dict
        Maps the position of a series in `target_series` to its list of window start
        offsets. Series shorter than one window have no entry.
    """
    random.seed(random_state)
    window = input_chunk_length + output_chunk_length

    starts_by_series = {}
    for series_pos, series in enumerate(target_series):
        n_starts = len(series) - window + 1
        if n_starts >= 1:
            starts_by_series[series_pos] = list(range(n_starts))

    if max_samples_per_ts is None:
        return starts_by_series

    capped = {}
    for series_pos, starts in starts_by_series.items():
        too_many = len(starts) > max_samples_per_ts
        capped[series_pos] = random.sample(starts, max_samples_per_ts) if too_many else starts
    return capped
class SamplingDatasetPast(PastCovariatesTrainingDataset):
    """
    Training dataset of fixed-length windows cut from the target series, optionally
    paired with the matching window of past covariates.

    The window start offsets are computed once, at construction time, by
    `get_valid_sampling_locations`.
    """

    def __init__(
        self,
        target_series: Union[TimeSeries, Sequence[TimeSeries]],
        covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
        output_chunk_length: int = 12,
        input_chunk_length: int = 12,
        use_static_covariates: bool = True,
        random_state: Optional[int] = 0,
        max_samples_per_ts: Optional[int] = None,
        remove_nan: bool = False,
    ) -> None:
        """
        Parameters
        ----------
        target_series
            One or a sequence of target `TimeSeries`.
        covariates
            Optionally, one or a sequence of `TimeSeries` containing past-observed covariates. If this
            parameter is set, the provided sequence must have the same length as that of `target_series`.
            Covariates are sliced with the same integer positions as the target, so every covariate
            series is expected to be position-aligned with its target series and long enough to
            contain all the required slices.
        output_chunk_length
            The length of the "output" series emitted by the model
        input_chunk_length
            The length of the "input" series fed to the model
        use_static_covariates
            Whether to use/include static covariate data from input series.
        random_state
            The random state to use for sampling.
        max_samples_per_ts
            The maximum number of samples to be drawn from each time series. If None, all samples will be drawn.
        remove_nan
            Whether to remove None from the output. E.g. if no covariates are provided, the covariates
            output will be None or (optionally) removed from the __getitem__ output.
        """
        super().__init__()
        self.remove_nan = remove_nan
        self.target_series = (
            [target_series] if isinstance(target_series, TimeSeries) else target_series
        )
        self.covariates = (
            [covariates] if isinstance(covariates, TimeSeries) else covariates
        )
        # checks
        raise_if_not(
            covariates is None or len(self.target_series) == len(self.covariates),
            "The provided sequence of target series must have the same length as "
            "the provided sequence of covariate series.",
        )
        # Use the normalised list: a bare TimeSeries must count as ONE series and
        # must not be enumerated entry by entry.
        self.valid_sampling_locations = get_valid_sampling_locations(
            self.target_series,
            output_chunk_length,
            input_chunk_length,
            random_state,
            max_samples_per_ts,
        )
        # set parameters
        self.output_chunk_length = output_chunk_length
        self.input_chunk_length = input_chunk_length
        self.total_length = input_chunk_length + output_chunk_length
        self.total_number_samples = sum(
            len(locations) for locations in self.valid_sampling_locations.values()
        )
        self.use_static_covariates = use_static_covariates

    def __len__(self):
        """
        Returns the total number of sampled (input, target) windows over all series.
        """
        return self.total_number_samples

    def _locate_sample(self, idx: int) -> Tuple[int, int]:
        """
        Translate a flat sample index into (series index, window start offset).

        Raises
        ------
        IndexError
            If `idx` lies outside the dataset.
        """
        if idx < 0:
            idx += self.total_number_samples
        if not 0 <= idx < self.total_number_samples:
            raise IndexError(
                f"sample index out of range for a dataset of {self.total_number_samples} samples"
            )
        # Walk the stored entries rather than counting 0, 1, 2, ...: series that are
        # too short for a single window have no entry in the mapping.
        for target_idx, locations in self.valid_sampling_locations.items():
            if idx < len(locations):
                return target_idx, locations[idx]
            idx -= len(locations)
        raise IndexError("sample index out of range")

    def __getitem__(self, idx: int):
        """
        Return the sample at flat index `idx` as the tuple
        (past target, past covariates, static covariates, future target).

        Entries that are None are dropped from the tuple when `remove_nan` is set.
        """
        target_idx, sampling_location = self._locate_sample(idx)
        split = sampling_location + self.input_chunk_length
        end = sampling_location + self.total_length
        # get target series
        target_values = self.target_series[target_idx].values()
        past_target_series = target_values[sampling_location:split]
        future_target_series = target_values[split:end]
        # get covariates
        if self.covariates is not None:
            covariates = self.covariates[target_idx].values()[sampling_location:split]
        else:
            covariates = None
        # get static covariates
        if self.use_static_covariates:
            static_covariates = self.target_series[target_idx].static_covariates_values(copy=True)
        else:
            static_covariates = None
        sample = (past_target_series, covariates, static_covariates, future_target_series)
        if self.remove_nan:
            # return elements that are not None
            return tuple(item for item in sample if item is not None)
        return sample
class SamplingDatasetDual(DualCovariatesTrainingDataset):
    """
    Training dataset of fixed-length windows cut from the target series, optionally
    paired with the input-part and output-part windows of future-known covariates.

    The window start offsets are computed once, at construction time, by
    `get_valid_sampling_locations`.
    """

    def __init__(
        self,
        target_series: Union[TimeSeries, Sequence[TimeSeries]],
        covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
        output_chunk_length: int = 12,
        input_chunk_length: int = 12,
        use_static_covariates: bool = True,
        random_state: Optional[int] = 0,
        max_samples_per_ts: Optional[int] = None,
        remove_nan: bool = False,
    ) -> None:
        """
        Parameters
        ----------
        target_series
            One or a sequence of target `TimeSeries`.
        covariates
            Optionally, one or a sequence of `TimeSeries` containing future-known covariates. If this
            parameter is set, the provided sequence must have the same length as that of `target_series`.
            Covariates are sliced with the same integer positions as the target, so every covariate
            series is expected to be position-aligned with its target series and long enough to
            contain all the required slices.
        output_chunk_length
            The length of the "output" series emitted by the model
        input_chunk_length
            The length of the "input" series fed to the model
        use_static_covariates
            Whether to use/include static covariate data from input series.
        random_state
            The random state to use for sampling.
        max_samples_per_ts
            The maximum number of samples to be drawn from each time series. If None, all samples will be drawn.
        remove_nan
            Whether to remove None from the output. E.g. if no covariates are provided, the covariates
            output will be None or (optionally) removed from the __getitem__ output.
        """
        super().__init__()
        self.remove_nan = remove_nan
        self.target_series = (
            [target_series] if isinstance(target_series, TimeSeries) else target_series
        )
        self.covariates = (
            [covariates] if isinstance(covariates, TimeSeries) else covariates
        )
        # checks
        raise_if_not(
            covariates is None or len(self.target_series) == len(self.covariates),
            "The provided sequence of target series must have the same length as "
            "the provided sequence of covariate series.",
        )
        # Use the normalised list: a bare TimeSeries must count as ONE series and
        # must not be enumerated entry by entry.
        self.valid_sampling_locations = get_valid_sampling_locations(
            self.target_series,
            output_chunk_length,
            input_chunk_length,
            random_state,
            max_samples_per_ts,
        )
        # set parameters
        self.output_chunk_length = output_chunk_length
        self.input_chunk_length = input_chunk_length
        self.total_length = input_chunk_length + output_chunk_length
        self.total_number_samples = sum(
            len(locations) for locations in self.valid_sampling_locations.values()
        )
        self.use_static_covariates = use_static_covariates

    def __len__(self):
        """
        Returns the total number of sampled (input, target) windows over all series.
        """
        return self.total_number_samples

    def _locate_sample(self, idx: int) -> Tuple[int, int]:
        """
        Translate a flat sample index into (series index, window start offset).

        Raises
        ------
        IndexError
            If `idx` lies outside the dataset.
        """
        if idx < 0:
            idx += self.total_number_samples
        if not 0 <= idx < self.total_number_samples:
            raise IndexError(
                f"sample index out of range for a dataset of {self.total_number_samples} samples"
            )
        # Walk the stored entries rather than counting 0, 1, 2, ...: series that are
        # too short for a single window have no entry in the mapping.
        for target_idx, locations in self.valid_sampling_locations.items():
            if idx < len(locations):
                return target_idx, locations[idx]
            idx -= len(locations)
        raise IndexError("sample index out of range")

    def __getitem__(self, idx: int):
        """
        Return the sample at flat index `idx` as the tuple
        (past target, covariates over the input part, covariates over the output part,
        static covariates, future target).

        Entries that are None are dropped from the tuple when `remove_nan` is set.
        """
        target_idx, sampling_location = self._locate_sample(idx)
        split = sampling_location + self.input_chunk_length
        end = sampling_location + self.total_length
        # get target series
        target_values = self.target_series[target_idx].values()
        past_target_series = target_values[sampling_location:split]
        future_target_series = target_values[split:end]
        # get covariates
        if self.covariates is not None:
            covariate_values = self.covariates[target_idx].values()
            past_covariates = covariate_values[sampling_location:split]
            future_covariates = covariate_values[split:end]
        else:
            past_covariates = None
            future_covariates = None
        # get static covariates
        if self.use_static_covariates:
            static_covariates = self.target_series[target_idx].static_covariates_values(copy=True)
        else:
            static_covariates = None
        sample = (
            past_target_series,
            past_covariates,
            future_covariates,
            static_covariates,
            future_target_series,
        )
        if self.remove_nan:
            # return elements that are not None
            return tuple(item for item in sample if item is not None)
        return sample
class SamplingDatasetMixed(MixedCovariatesTrainingDataset):
    """
    Training dataset of fixed-length windows cut from the target series, optionally
    paired with windows of past-observed and future-known covariates.

    The window start offsets are computed once, at construction time, by
    `get_valid_sampling_locations`.
    """

    def __init__(
        self,
        target_series: Union[TimeSeries, Sequence[TimeSeries]],
        past_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
        future_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
        output_chunk_length: int = 12,
        input_chunk_length: int = 12,
        use_static_covariates: bool = True,
        random_state: Optional[int] = 0,
        max_samples_per_ts: Optional[int] = None,
        remove_nan: bool = False,
    ) -> None:
        """
        Parameters
        ----------
        target_series
            One or a sequence of target `TimeSeries`.
        past_covariates
            Optionally, one or a sequence of `TimeSeries` containing past-observed covariates. If this
            parameter is set, the provided sequence must have the same length as that of `target_series`.
            Covariates are sliced with the same integer positions as the target, so every covariate
            series is expected to be position-aligned with its target series and long enough to
            contain all the required slices.
        future_covariates
            Optionally, one or a sequence of `TimeSeries` containing future-known covariates. This has to follow
            the same constraints as `past_covariates`.
        output_chunk_length
            The length of the "output" series emitted by the model
        input_chunk_length
            The length of the "input" series fed to the model
        use_static_covariates
            Whether to use/include static covariate data from input series.
        random_state
            The random state to use for sampling.
        max_samples_per_ts
            The maximum number of samples to be drawn from each time series. If None, all samples will be drawn.
        remove_nan
            Whether to remove None from the output. E.g. if no covariates are provided, the covariates
            output will be None or (optionally) removed from the __getitem__ output.
        """
        super().__init__()
        self.remove_nan = remove_nan
        self.target_series = (
            [target_series] if isinstance(target_series, TimeSeries) else target_series
        )
        self.past_covariates = (
            [past_covariates] if isinstance(past_covariates, TimeSeries) else past_covariates
        )
        self.future_covariates = (
            [future_covariates] if isinstance(future_covariates, TimeSeries) else future_covariates
        )
        # checks
        raise_if_not(
            future_covariates is None or len(self.target_series) == len(self.future_covariates),
            "The provided sequence of target series must have the same length as "
            "the provided sequence of covariate series.",
        )
        raise_if_not(
            past_covariates is None or len(self.target_series) == len(self.past_covariates),
            "The provided sequence of target series must have the same length as "
            "the provided sequence of covariate series.",
        )
        # Use the normalised list: a bare TimeSeries must count as ONE series and
        # must not be enumerated entry by entry.
        self.valid_sampling_locations = get_valid_sampling_locations(
            self.target_series,
            output_chunk_length,
            input_chunk_length,
            random_state,
            max_samples_per_ts,
        )
        # set parameters
        self.output_chunk_length = output_chunk_length
        self.input_chunk_length = input_chunk_length
        self.total_length = input_chunk_length + output_chunk_length
        self.total_number_samples = sum(
            len(locations) for locations in self.valid_sampling_locations.values()
        )
        self.use_static_covariates = use_static_covariates

    def __len__(self):
        """
        Returns the total number of sampled (input, target) windows over all series.
        """
        return self.total_number_samples

    def _locate_sample(self, idx: int) -> Tuple[int, int]:
        """
        Translate a flat sample index into (series index, window start offset).

        Raises
        ------
        IndexError
            If `idx` lies outside the dataset.
        """
        if idx < 0:
            idx += self.total_number_samples
        if not 0 <= idx < self.total_number_samples:
            raise IndexError(
                f"sample index out of range for a dataset of {self.total_number_samples} samples"
            )
        # Walk the stored entries rather than counting 0, 1, 2, ...: series that are
        # too short for a single window have no entry in the mapping.
        for target_idx, locations in self.valid_sampling_locations.items():
            if idx < len(locations):
                return target_idx, locations[idx]
            idx -= len(locations)
        raise IndexError("sample index out of range")

    def __getitem__(self, idx: int):
        """
        Return the sample at flat index `idx` as the tuple
        (past target, past covariates, future covariates over the input part,
        future covariates over the output part, static covariates, future target).

        Entries that are None are dropped from the tuple when `remove_nan` is set.
        """
        target_idx, sampling_location = self._locate_sample(idx)
        split = sampling_location + self.input_chunk_length
        end = sampling_location + self.total_length
        # get target series
        target_values = self.target_series[target_idx].values()
        past_target_series = target_values[sampling_location:split]
        future_target_series = target_values[split:end]
        # get past covariates
        if self.past_covariates is not None:
            past_covariates = self.past_covariates[target_idx].values()[sampling_location:split]
        else:
            past_covariates = None
        # get future covariates
        if self.future_covariates is not None:
            future_values = self.future_covariates[target_idx].values()
            historic_future_covariates = future_values[sampling_location:split]
            future_covariates = future_values[split:end]
        else:
            historic_future_covariates = None
            future_covariates = None
        # get static covariates
        if self.use_static_covariates:
            static_covariates = self.target_series[target_idx].static_covariates_values(copy=True)
        else:
            static_covariates = None
        sample = (
            past_target_series,
            past_covariates,
            historic_future_covariates,
            future_covariates,
            static_covariates,
            future_target_series,
        )
        if self.remove_nan:
            # return elements that are not None
            return tuple(item for item in sample if item is not None)
        return sample
class SamplingDatasetInferenceMixed(MixedCovariatesInferenceDataset):
    """
    Inference dataset of fixed-length windows cut from the target series, optionally
    paired with windows of past-observed and future-known covariates.

    The window start offsets are computed once, at construction time, by
    `get_valid_sampling_locations`; `evalsample` returns the ground-truth continuation
    of a window.
    """

    def __init__(
        self,
        target_series: Union[TimeSeries, Sequence[TimeSeries]],
        past_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
        future_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
        n: int = 1,
        input_chunk_length: int = 12,
        output_chunk_length: int = 1,
        use_static_covariates: bool = True,
        random_state: Optional[int] = 0,
        max_samples_per_ts: Optional[int] = None,
        array_output_only: bool = False,
    ):
        """
        Parameters
        ----------
        target_series
            One or a sequence of target `TimeSeries`.
        past_covariates
            Optionally, one or a sequence of `TimeSeries` containing past-observed covariates. If this
            parameter is set, the provided sequence must have the same length as that of `target_series`.
            Covariates are sliced with the same integer positions as the target, so every covariate
            series is expected to be position-aligned with its target series and long enough to
            contain all the required slices.
        future_covariates
            Optionally, one or a sequence of `TimeSeries` containing future-known covariates. This has to follow
            the same constraints as `past_covariates`.
        n
            Number of predictions into the future, could be greater than the output chunk length, in which
            case, the model will be called autoregressively. It is only forwarded to the parent
            constructor; the windows built by this class do not depend on it.
        output_chunk_length
            The length of the "output" series emitted by the model
        input_chunk_length
            The length of the "input" series fed to the model
        use_static_covariates
            Whether to use/include static covariate data from input series.
        random_state
            The random state to use for sampling.
        max_samples_per_ts
            The maximum number of samples to be drawn from each time series. If None, all samples will be drawn.
        array_output_only
            Whether __getitem__ returns only the arrays or adds the full `TimeSeries` object to the output tuple
            This may cause problems with the torch collate and loader functions but works for Darts.
        """
        super().__init__(
            target_series=target_series,
            past_covariates=past_covariates,
            future_covariates=future_covariates,
            n=n,
            input_chunk_length=input_chunk_length,
            output_chunk_length=output_chunk_length,
        )
        self.target_series = (
            [target_series] if isinstance(target_series, TimeSeries) else target_series
        )
        self.past_covariates = (
            [past_covariates] if isinstance(past_covariates, TimeSeries) else past_covariates
        )
        self.future_covariates = (
            [future_covariates] if isinstance(future_covariates, TimeSeries) else future_covariates
        )
        # checks
        raise_if_not(
            future_covariates is None or len(self.target_series) == len(self.future_covariates),
            "The provided sequence of target series must have the same length as "
            "the provided sequence of covariate series.",
        )
        raise_if_not(
            past_covariates is None or len(self.target_series) == len(self.past_covariates),
            "The provided sequence of target series must have the same length as "
            "the provided sequence of covariate series.",
        )
        # Use the normalised list: a bare TimeSeries must count as ONE series and
        # must not be enumerated entry by entry.
        self.valid_sampling_locations = get_valid_sampling_locations(
            self.target_series,
            output_chunk_length,
            input_chunk_length,
            random_state,
            max_samples_per_ts,
        )
        # set parameters
        self.output_chunk_length = output_chunk_length
        self.input_chunk_length = input_chunk_length
        self.total_length = input_chunk_length + output_chunk_length
        self.total_number_samples = sum(
            len(locations) for locations in self.valid_sampling_locations.values()
        )
        self.use_static_covariates = use_static_covariates
        self.array_output_only = array_output_only

    def __len__(self):
        """
        Returns the total number of sampled windows over all series.
        """
        return self.total_number_samples

    def _locate_sample(self, idx: int) -> Tuple[int, int]:
        """
        Translate a flat sample index into (series index, window start offset).

        Raises
        ------
        IndexError
            If `idx` lies outside the dataset.
        """
        if idx < 0:
            idx += self.total_number_samples
        if not 0 <= idx < self.total_number_samples:
            raise IndexError(
                f"sample index out of range for a dataset of {self.total_number_samples} samples"
            )
        # Walk the stored entries rather than counting 0, 1, 2, ...: series that are
        # too short for a single window have no entry in the mapping.
        for target_idx, locations in self.valid_sampling_locations.items():
            if idx < len(locations):
                return target_idx, locations[idx]
            idx -= len(locations)
        raise IndexError("sample index out of range")

    def __getitem__(self, idx: int):
        """
        Return the sample at flat index `idx`.

        The arrays come in the order (past target, past covariates, future covariates over
        the input part, future covariates over the output part, past covariates over the
        output part, static covariates). With `array_output_only` the None entries are
        dropped; otherwise the tuple keeps them and is extended with the input window as a
        `TimeSeries` and the time stamp that follows that window.
        """
        target_idx, sampling_location = self._locate_sample(idx)
        split = sampling_location + self.input_chunk_length
        end = sampling_location + self.total_length
        # get target series
        series = self.target_series[target_idx]
        past_target_series_with_time = series[sampling_location:split]
        past_end = past_target_series_with_time.time_index[-1]
        past_target_series = series.values()[sampling_location:split]
        # get past covariates
        if self.past_covariates is not None:
            # Both windows are cut from the FULL covariate array; cutting the second one
            # from the already-windowed array would apply absolute offsets to a short array.
            past_values = self.past_covariates[target_idx].values()
            past_covariates = past_values[sampling_location:split]
            future_past_covariates = past_values[split:end]
        else:
            past_covariates = None
            future_past_covariates = None
        # get future covariates
        if self.future_covariates is not None:
            future_values = self.future_covariates[target_idx].values()
            historic_future_covariates = future_values[sampling_location:split]
            future_covariates = future_values[split:end]
        else:
            historic_future_covariates = None
            future_covariates = None
        # get static covariates
        if self.use_static_covariates:
            static_covariates = series.static_covariates_values(copy=True)
        else:
            static_covariates = None
        arrays = (
            past_target_series,
            past_covariates,
            historic_future_covariates,
            future_covariates,
            future_past_covariates,
            static_covariates,
        )
        # whether to remove Timeseries and None and return only arrays
        if self.array_output_only:
            return tuple(item for item in arrays if item is not None)
        return arrays + (
            past_target_series_with_time,
            past_end + past_target_series_with_time.freq,
        )

    def evalsample(
        self, idx: int
    ) -> TimeSeries:
        """
        Returns the future target series at the given index.
        """
        target_idx, sampling_location = self._locate_sample(idx)
        split = sampling_location + self.input_chunk_length
        end = sampling_location + self.total_length
        return self.target_series[target_idx][split:end]
class SamplingDatasetInferencePast(PastCovariatesInferenceDataset):
    """
    Inference dataset of fixed-length windows cut from the target series, optionally
    paired with windows of past-observed covariates.

    The window start offsets are computed once, at construction time, by
    `get_valid_sampling_locations`; `evalsample` returns the ground-truth continuation
    of a window.
    """

    def __init__(
        self,
        target_series: Union[TimeSeries, Sequence[TimeSeries]],
        covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
        n: int = 1,
        input_chunk_length: int = 12,
        output_chunk_length: int = 1,
        use_static_covariates: bool = True,
        random_state: Optional[int] = 0,
        max_samples_per_ts: Optional[int] = None,
        array_output_only: bool = False,
    ):
        """
        Parameters
        ----------
        target_series
            One or a sequence of target `TimeSeries`.
        covariates
            Optionally, one or a sequence of `TimeSeries` containing past-observed covariates. If this
            parameter is set, the provided sequence must have the same length as that of `target_series`.
            Covariates are sliced with the same integer positions as the target, so every covariate
            series is expected to be position-aligned with its target series and long enough to
            contain all the required slices.
        n
            Number of predictions into the future, could be greater than the output chunk length, in which
            case, the model will be called autoregressively. It is only forwarded to the parent
            constructor; the windows built by this class do not depend on it.
        output_chunk_length
            The length of the "output" series emitted by the model
        input_chunk_length
            The length of the "input" series fed to the model
        use_static_covariates
            Whether to use/include static covariate data from input series.
        random_state
            The random state to use for sampling.
        max_samples_per_ts
            The maximum number of samples to be drawn from each time series. If None, all samples will be drawn.
        array_output_only
            Whether __getitem__ returns only the arrays or adds the full `TimeSeries` object to the output tuple
            This may cause problems with the torch collate and loader functions but works for Darts.
        """
        super().__init__(
            target_series=target_series,
            covariates=covariates,
            n=n,
            input_chunk_length=input_chunk_length,
            output_chunk_length=output_chunk_length,
        )
        self.target_series = (
            [target_series] if isinstance(target_series, TimeSeries) else target_series
        )
        self.covariates = (
            [covariates] if isinstance(covariates, TimeSeries) else covariates
        )
        raise_if_not(
            covariates is None or len(self.target_series) == len(self.covariates),
            "The provided sequence of target series must have the same length as "
            "the provided sequence of covariate series.",
        )
        # Use the normalised list: a bare TimeSeries must count as ONE series and
        # must not be enumerated entry by entry.
        self.valid_sampling_locations = get_valid_sampling_locations(
            self.target_series,
            output_chunk_length,
            input_chunk_length,
            random_state,
            max_samples_per_ts,
        )
        # set parameters
        self.output_chunk_length = output_chunk_length
        self.input_chunk_length = input_chunk_length
        self.total_length = input_chunk_length + output_chunk_length
        self.total_number_samples = sum(
            len(locations) for locations in self.valid_sampling_locations.values()
        )
        self.use_static_covariates = use_static_covariates
        self.array_output_only = array_output_only

    def __len__(self):
        """
        Returns the total number of sampled windows over all series.
        """
        return self.total_number_samples

    def _locate_sample(self, idx: int) -> Tuple[int, int]:
        """
        Translate a flat sample index into (series index, window start offset).

        Raises
        ------
        IndexError
            If `idx` lies outside the dataset.
        """
        if idx < 0:
            idx += self.total_number_samples
        if not 0 <= idx < self.total_number_samples:
            raise IndexError(
                f"sample index out of range for a dataset of {self.total_number_samples} samples"
            )
        # Walk the stored entries rather than counting 0, 1, 2, ...: series that are
        # too short for a single window have no entry in the mapping.
        for target_idx, locations in self.valid_sampling_locations.items():
            if idx < len(locations):
                return target_idx, locations[idx]
            idx -= len(locations)
        raise IndexError("sample index out of range")

    def __getitem__(self, idx: int):
        """
        Return the sample at flat index `idx`.

        The arrays come in the order (past target, past covariates over the input part,
        past covariates over the output part, static covariates). With `array_output_only`
        the None entries are dropped; otherwise the tuple keeps them and is extended with
        the input window as a `TimeSeries` and the time stamp that follows that window.
        """
        target_idx, sampling_location = self._locate_sample(idx)
        split = sampling_location + self.input_chunk_length
        end = sampling_location + self.total_length
        # get target series
        series = self.target_series[target_idx]
        past_target_series_with_time = series[sampling_location:split]
        past_end = past_target_series_with_time.time_index[-1]
        past_target_series = series.values()[sampling_location:split]
        # get past covariates
        if self.covariates is not None:
            # Both windows are cut from the FULL covariate array; cutting the second one
            # from the already-windowed array would apply absolute offsets to a short array.
            covariate_values = self.covariates[target_idx].values()
            past_covariates = covariate_values[sampling_location:split]
            future_past_covariates = covariate_values[split:end]
        else:
            past_covariates = None
            future_past_covariates = None
        # get static covariates
        if self.use_static_covariates:
            static_covariates = series.static_covariates_values(copy=True)
        else:
            static_covariates = None
        arrays = (
            past_target_series,
            past_covariates,
            future_past_covariates,
            static_covariates,
        )
        # return arrays or arrays with TimeSeries
        if self.array_output_only:
            return tuple(item for item in arrays if item is not None)
        return arrays + (
            past_target_series_with_time,
            past_end + past_target_series_with_time.freq,
        )

    def evalsample(
        self, idx: int
    ) -> TimeSeries:
        """
        Returns the future target series at the given index.
        """
        target_idx, sampling_location = self._locate_sample(idx)
        split = sampling_location + self.input_chunk_length
        end = sampling_location + self.total_length
        return self.target_series[target_idx][split:end]
class SamplingDatasetInferenceDual(DualCovariatesInferenceDataset):
    """
    Inference dataset of fixed-length windows cut from the target series, optionally
    paired with the input-part and output-part windows of future-known covariates.

    The window start offsets are computed once, at construction time, by
    `get_valid_sampling_locations`; `evalsample` returns the ground-truth continuation
    of a window.
    """

    def __init__(
        self,
        target_series: Union[TimeSeries, Sequence[TimeSeries]],
        covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
        n: int = 12,
        input_chunk_length: int = 12,
        output_chunk_length: int = 1,
        use_static_covariates: bool = True,
        random_state: Optional[int] = 0,
        max_samples_per_ts: Optional[int] = None,
        array_output_only: bool = False,
    ):
        """
        Parameters
        ----------
        target_series
            One or a sequence of target `TimeSeries`.
        covariates
            Optionally, some future-known covariates that are used for predictions. This argument is required
            if the model was trained with future-known covariates. If given, the sequence must have the
            same length as that of `target_series`. Covariates are sliced with the same integer
            positions as the target, so every covariate series is expected to be position-aligned
            with its target series.
        n
            Number of predictions into the future, could be greater than the output chunk length, in which
            case, the model will be called autoregressively. It is only forwarded to the parent
            constructor; the windows built by this class do not depend on it.
        output_chunk_length
            The length of the "output" series emitted by the model
        input_chunk_length
            The length of the "input" series fed to the model
        use_static_covariates
            Whether to use/include static covariate data from input series.
        random_state
            The random state to use for sampling.
        max_samples_per_ts
            The maximum number of samples to be drawn from each time series. If None, all samples will be drawn.
        array_output_only
            Whether __getitem__ returns only the arrays or adds the full `TimeSeries` object to the output tuple
            This may cause problems with the torch collate and loader functions but works for Darts.
        """
        super().__init__(
            target_series=target_series,
            covariates=covariates,
            n=n,
            input_chunk_length=input_chunk_length,
            output_chunk_length=output_chunk_length,
        )
        self.target_series = (
            [target_series] if isinstance(target_series, TimeSeries) else target_series
        )
        self.covariates = (
            [covariates] if isinstance(covariates, TimeSeries) else covariates
        )
        raise_if_not(
            covariates is None or len(self.target_series) == len(self.covariates),
            "The provided sequence of target series must have the same length as "
            "the provided sequence of covariate series.",
        )
        # Use the normalised list: a bare TimeSeries must count as ONE series and
        # must not be enumerated entry by entry.
        self.valid_sampling_locations = get_valid_sampling_locations(
            self.target_series,
            output_chunk_length,
            input_chunk_length,
            random_state,
            max_samples_per_ts,
        )
        # set parameters
        self.output_chunk_length = output_chunk_length
        self.input_chunk_length = input_chunk_length
        self.total_length = input_chunk_length + output_chunk_length
        self.total_number_samples = sum(
            len(locations) for locations in self.valid_sampling_locations.values()
        )
        self.use_static_covariates = use_static_covariates
        self.array_output_only = array_output_only

    def __len__(self):
        """
        Returns the total number of sampled windows over all series.
        """
        return self.total_number_samples

    def _locate_sample(self, idx: int) -> Tuple[int, int]:
        """
        Translate a flat sample index into (series index, window start offset).

        Raises
        ------
        IndexError
            If `idx` lies outside the dataset.
        """
        if idx < 0:
            idx += self.total_number_samples
        if not 0 <= idx < self.total_number_samples:
            raise IndexError(
                f"sample index out of range for a dataset of {self.total_number_samples} samples"
            )
        # Walk the stored entries rather than counting 0, 1, 2, ...: series that are
        # too short for a single window have no entry in the mapping.
        for target_idx, locations in self.valid_sampling_locations.items():
            if idx < len(locations):
                return target_idx, locations[idx]
            idx -= len(locations)
        raise IndexError("sample index out of range")

    def __getitem__(self, idx: int):
        """
        Return the sample at flat index `idx`.

        The arrays come in the order (past target, covariates over the input part,
        covariates over the output part, static covariates). With `array_output_only` the
        None entries are dropped; otherwise the tuple keeps them and is extended with the
        input window as a `TimeSeries` and the time stamp that follows that window.
        """
        target_idx, sampling_location = self._locate_sample(idx)
        split = sampling_location + self.input_chunk_length
        end = sampling_location + self.total_length
        # get target series
        series = self.target_series[target_idx]
        past_target_series_with_time = series[sampling_location:split]
        past_end = past_target_series_with_time.time_index[-1]
        past_target_series = series.values()[sampling_location:split]
        # get future-known covariates
        if self.covariates is not None:
            covariate_values = self.covariates[target_idx].values()
            historic_future_covariates = covariate_values[sampling_location:split]
            future_covariates = covariate_values[split:end]
        else:
            historic_future_covariates = None
            future_covariates = None
        # get static covariates
        if self.use_static_covariates:
            static_covariates = series.static_covariates_values(copy=True)
        else:
            static_covariates = None
        arrays = (
            past_target_series,
            historic_future_covariates,
            future_covariates,
            static_covariates,
        )
        # return arrays or arrays with TimeSeries
        if self.array_output_only:
            return tuple(item for item in arrays if item is not None)
        return arrays + (
            past_target_series_with_time,
            past_end + past_target_series_with_time.freq,
        )

    def evalsample(
        self, idx: int
    ) -> TimeSeries:
        """
        Returns the future target series at the given index.
        """
        target_idx, sampling_location = self._locate_sample(idx)
        split = sampling_location + self.input_chunk_length
        end = sampling_location + self.total_length
        return self.target_series[target_idx][split:end]