# glucosedao_gpu/utils/darts_dataset.py
# Livia_Zaharia
# added code for the first time
# bacf16b
# raw
# history blame
# 42.5 kB
import sys
import os
import yaml
import random
from typing import Any, BinaryIO, Callable, Dict, List, Optional, Sequence, Tuple, Union
import numpy as np
from scipy import stats
import pandas as pd
import darts
from darts import models
from darts import metrics
from darts import TimeSeries
from darts.dataprocessing.transformers import Scaler
from pytorch_lightning.callbacks import Callback
# for darts dataset
from darts.logging import get_logger, raise_if_not
from darts.utils.data.training_dataset import PastCovariatesTrainingDataset, \
DualCovariatesTrainingDataset, \
MixedCovariatesTrainingDataset
from darts.utils.data.inference_dataset import PastCovariatesInferenceDataset, \
DualCovariatesInferenceDataset, \
MixedCovariatesInferenceDataset
from darts.utils.data.utils import CovariateType
# import data formatter
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
from data_formatter.base import *
def get_valid_sampling_locations(target_series: Union[TimeSeries, Sequence[TimeSeries]],
                                 output_chunk_length: int = 12,
                                 input_chunk_length: int = 12,
                                 random_state: Optional[int] = 0,
                                 max_samples_per_ts: Optional[int] = None):
    """
    Compute, for every series, the start offsets of all valid sampling windows.

    A window spans ``input_chunk_length + output_chunk_length`` consecutive
    entries, so a series with ``len(series)`` entries admits the start offsets
    ``0 .. len(series) - window_length``. Series that are too short to hold a
    single window get no entry in the result.

    Parameters
    ----------
    target_series
        The series to sample from. The argument is enumerated, so it must be
        a sequence of series; the position in that sequence is used as key.
    output_chunk_length
        The length of the output chunk.
    input_chunk_length
        The length of the input chunk.
    random_state
        Seed handed to ``random.seed`` before any sub-sampling takes place.
        Note that this re-seeds the module-level ``random`` generator.
    max_samples_per_ts
        The maximum number of start offsets kept per series. Series with more
        candidates are sub-sampled without replacement; ``None`` keeps all.

    Returns
    -------
    dict
        Maps the position of a series in ``target_series`` to the list of its
        window start offsets. Offsets are in increasing order unless the
        series was sub-sampled, in which case they are in sampling order.
    """
    # Seed the global generator so that the sub-sampling below is reproducible.
    random.seed(random_state)
    window_length = input_chunk_length + output_chunk_length
    valid_sampling_locations = {}
    for series_idx, series in enumerate(target_series):
        num_windows = len(series) - window_length + 1
        if num_windows <= 0:
            # too short for even one (input, output) window
            continue
        starts = list(range(num_windows))
        if max_samples_per_ts is not None and num_windows > max_samples_per_ts:
            starts = random.sample(starts, max_samples_per_ts)
        valid_sampling_locations[series_idx] = starts
    return valid_sampling_locations
class SamplingDatasetPast(PastCovariatesTrainingDataset):
    """
    Training dataset of fixed-length windows with optional past covariates.

    Every sample consists of an ``input_chunk_length`` window of the target
    followed by the ``output_chunk_length`` window to predict. The window
    start offsets are computed once, at construction time, by
    ``get_valid_sampling_locations``.
    """

    def __init__(
        self,
        target_series: Union[TimeSeries, Sequence[TimeSeries]],
        covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
        output_chunk_length: int = 12,
        input_chunk_length: int = 12,
        use_static_covariates: bool = True,
        random_state: Optional[int] = 0,
        max_samples_per_ts: Optional[int] = None,
        remove_nan: bool = False,
    ) -> None:
        """
        Parameters
        ----------
        target_series
            One or a sequence of target `TimeSeries`.
        covariates
            Optionally, one or a sequence of `TimeSeries` containing past-observed covariates. If this parameter is
            set, the provided sequence must have the same length as that of `target_series`. The covariates are
            sliced with the same positional offsets as the target, so every covariate series is expected to be
            aligned with its target series and long enough to contain all the required slices.
        output_chunk_length
            The length of the "output" series emitted by the model
        input_chunk_length
            The length of the "input" series fed to the model
        use_static_covariates
            Whether to use/include static covariate data from input series.
        random_state
            The random state to use for sampling.
        max_samples_per_ts
            The maximum number of samples to be drawn from each time series. If None, all samples will be drawn.
        remove_nan
            Whether to remove None from the output. E.g. if no covariates are provided, the covariates output will
            be None or (optionally) removed from the __getitem__ output.
        """
        super().__init__()
        self.remove_nan = remove_nan
        self.target_series = (
            [target_series] if isinstance(target_series, TimeSeries) else target_series
        )
        self.covariates = (
            [covariates] if isinstance(covariates, TimeSeries) else covariates
        )
        # checks
        raise_if_not(
            covariates is None or len(self.target_series) == len(self.covariates),
            "The provided sequence of target series must have the same length as "
            "the provided sequence of covariate series.",
        )
        # Use the normalized list: enumerating a bare `TimeSeries` would walk
        # over its time steps instead of treating it as a single series.
        self.valid_sampling_locations = get_valid_sampling_locations(self.target_series,
                                                                     output_chunk_length,
                                                                     input_chunk_length,
                                                                     random_state,
                                                                     max_samples_per_ts)
        # set parameters
        self.output_chunk_length = output_chunk_length
        self.input_chunk_length = input_chunk_length
        self.total_length = input_chunk_length + output_chunk_length
        self.total_number_samples = sum(len(v) for v in self.valid_sampling_locations.values())
        self.use_static_covariates = use_static_covariates

    def __len__(self):
        """
        Returns the total number of possible (input, target) splits.
        """
        return self.total_number_samples

    def _locate(self, idx: int) -> Tuple[int, int]:
        """
        Map a flat sample index to ``(series index, window start offset)``.

        The stored locations are walked in insertion order. Series that are
        too short to hold a window have no entry at all, so the keys must not
        be assumed to be contiguous.

        Raises
        ------
        IndexError
            If ``idx`` does not address one of the ``len(self)`` samples.
        """
        position = idx + self.total_number_samples if idx < 0 else idx
        if not 0 <= position < self.total_number_samples:
            raise IndexError(
                f"index {idx} is out of range for a dataset with {self.total_number_samples} samples"
            )
        for series_idx, locations in self.valid_sampling_locations.items():
            if position < len(locations):
                return series_idx, locations[position]
            position -= len(locations)
        # only reachable if the stored locations were modified after __init__
        raise IndexError(f"index {idx} could not be resolved to a sampling location")

    def __getitem__(self, idx: int):
        """
        Return ``(past_target, past_covariates, static_covariates, future_target)``.

        With ``remove_nan`` set, entries that are None are dropped from the tuple.
        """
        target_idx, start = self._locate(idx)
        past_stop = start + self.input_chunk_length
        future_stop = start + self.total_length
        # get target series
        target_values = self.target_series[target_idx].values()
        past_target_series = target_values[start:past_stop]
        future_target_series = target_values[past_stop:future_stop]
        # get covariates (input window only)
        covariates = None
        if self.covariates is not None:
            covariates = self.covariates[target_idx].values()[start:past_stop]
        # get static covariates
        static_covariates = None
        if self.use_static_covariates:
            static_covariates = self.target_series[target_idx].static_covariates_values(copy=True)
        sample = (past_target_series, covariates, static_covariates, future_target_series)
        if self.remove_nan:
            return tuple(item for item in sample if item is not None)
        return sample
class SamplingDatasetDual(DualCovariatesTrainingDataset):
    """
    Training dataset of fixed-length windows with optional future-known covariates.

    Every sample consists of an ``input_chunk_length`` window of the target
    followed by the ``output_chunk_length`` window to predict; the covariates
    are returned for both windows. The window start offsets are computed
    once, at construction time, by ``get_valid_sampling_locations``.
    """

    def __init__(
        self,
        target_series: Union[TimeSeries, Sequence[TimeSeries]],
        covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
        output_chunk_length: int = 12,
        input_chunk_length: int = 12,
        use_static_covariates: bool = True,
        random_state: Optional[int] = 0,
        max_samples_per_ts: Optional[int] = None,
        remove_nan: bool = False,
    ) -> None:
        """
        Parameters
        ----------
        target_series
            One or a sequence of target `TimeSeries`.
        covariates
            Optionally, one or a sequence of `TimeSeries` containing future-known covariates. If this parameter is
            set, the provided sequence must have the same length as that of `target_series`. The covariates are
            sliced with the same positional offsets as the target, so every covariate series is expected to be
            aligned with its target series and long enough to contain all the required slices.
        output_chunk_length
            The length of the "output" series emitted by the model
        input_chunk_length
            The length of the "input" series fed to the model
        use_static_covariates
            Whether to use/include static covariate data from input series.
        random_state
            The random state to use for sampling.
        max_samples_per_ts
            The maximum number of samples to be drawn from each time series. If None, all samples will be drawn.
        remove_nan
            Whether to remove None from the output. E.g. if no covariates are provided, the covariates output will
            be None or (optionally) removed from the __getitem__ output.
        """
        super().__init__()
        self.remove_nan = remove_nan
        self.target_series = (
            [target_series] if isinstance(target_series, TimeSeries) else target_series
        )
        self.covariates = (
            [covariates] if isinstance(covariates, TimeSeries) else covariates
        )
        # checks
        raise_if_not(
            covariates is None or len(self.target_series) == len(self.covariates),
            "The provided sequence of target series must have the same length as "
            "the provided sequence of covariate series.",
        )
        # Use the normalized list: enumerating a bare `TimeSeries` would walk
        # over its time steps instead of treating it as a single series.
        self.valid_sampling_locations = get_valid_sampling_locations(self.target_series,
                                                                     output_chunk_length,
                                                                     input_chunk_length,
                                                                     random_state,
                                                                     max_samples_per_ts)
        # set parameters
        self.output_chunk_length = output_chunk_length
        self.input_chunk_length = input_chunk_length
        self.total_length = input_chunk_length + output_chunk_length
        self.total_number_samples = sum(len(v) for v in self.valid_sampling_locations.values())
        self.use_static_covariates = use_static_covariates

    def __len__(self):
        """
        Returns the total number of possible (input, target) splits.
        """
        return self.total_number_samples

    def _locate(self, idx: int) -> Tuple[int, int]:
        """
        Map a flat sample index to ``(series index, window start offset)``.

        The stored locations are walked in insertion order. Series that are
        too short to hold a window have no entry at all, so the keys must not
        be assumed to be contiguous.

        Raises
        ------
        IndexError
            If ``idx`` does not address one of the ``len(self)`` samples.
        """
        position = idx + self.total_number_samples if idx < 0 else idx
        if not 0 <= position < self.total_number_samples:
            raise IndexError(
                f"index {idx} is out of range for a dataset with {self.total_number_samples} samples"
            )
        for series_idx, locations in self.valid_sampling_locations.items():
            if position < len(locations):
                return series_idx, locations[position]
            position -= len(locations)
        # only reachable if the stored locations were modified after __init__
        raise IndexError(f"index {idx} could not be resolved to a sampling location")

    def __getitem__(self, idx: int):
        """
        Return ``(past_target, past_covariates, future_covariates, static_covariates, future_target)``.

        Both covariate entries are slices of the same covariate series: one
        over the input window and one over the output window. With
        ``remove_nan`` set, entries that are None are dropped from the tuple.
        """
        target_idx, start = self._locate(idx)
        past_stop = start + self.input_chunk_length
        future_stop = start + self.total_length
        # get target series
        target_values = self.target_series[target_idx].values()
        past_target_series = target_values[start:past_stop]
        future_target_series = target_values[past_stop:future_stop]
        # get covariates over the input and the output window
        past_covariates = None
        future_covariates = None
        if self.covariates is not None:
            covariate_values = self.covariates[target_idx].values()
            past_covariates = covariate_values[start:past_stop]
            future_covariates = covariate_values[past_stop:future_stop]
        # get static covariates
        static_covariates = None
        if self.use_static_covariates:
            static_covariates = self.target_series[target_idx].static_covariates_values(copy=True)
        sample = (past_target_series,
                  past_covariates,
                  future_covariates,
                  static_covariates,
                  future_target_series)
        if self.remove_nan:
            return tuple(item for item in sample if item is not None)
        return sample
class SamplingDatasetMixed(MixedCovariatesTrainingDataset):
    """
    Training dataset of fixed-length windows with optional past and future covariates.

    Every sample consists of an ``input_chunk_length`` window of the target
    followed by the ``output_chunk_length`` window to predict. The window
    start offsets are computed once, at construction time, by
    ``get_valid_sampling_locations``.
    """

    def __init__(
        self,
        target_series: Union[TimeSeries, Sequence[TimeSeries]],
        past_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
        future_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
        output_chunk_length: int = 12,
        input_chunk_length: int = 12,
        use_static_covariates: bool = True,
        random_state: Optional[int] = 0,
        max_samples_per_ts: Optional[int] = None,
        remove_nan: bool = False,
    ) -> None:
        """
        Parameters
        ----------
        target_series
            One or a sequence of target `TimeSeries`.
        past_covariates
            Optionally, one or a sequence of `TimeSeries` containing past-observed covariates. If this parameter is
            set, the provided sequence must have the same length as that of `target_series`. The covariates are
            sliced with the same positional offsets as the target, so every covariate series is expected to be
            aligned with its target series and long enough to contain all the required slices.
        future_covariates
            Optionally, one or a sequence of `TimeSeries` containing future-known covariates. This has to follow
            the same constraints as `past_covariates`.
        output_chunk_length
            The length of the "output" series emitted by the model
        input_chunk_length
            The length of the "input" series fed to the model
        use_static_covariates
            Whether to use/include static covariate data from input series.
        random_state
            The random state to use for sampling.
        max_samples_per_ts
            The maximum number of samples to be drawn from each time series. If None, all samples will be drawn.
        remove_nan
            Whether to remove None from the output. E.g. if no covariates are provided, the covariates output will
            be None or (optionally) removed from the __getitem__ output.
        """
        super().__init__()
        self.remove_nan = remove_nan
        self.target_series = (
            [target_series] if isinstance(target_series, TimeSeries) else target_series
        )
        self.past_covariates = (
            [past_covariates] if isinstance(past_covariates, TimeSeries) else past_covariates
        )
        self.future_covariates = (
            [future_covariates] if isinstance(future_covariates, TimeSeries) else future_covariates
        )
        # checks
        raise_if_not(
            future_covariates is None or len(self.target_series) == len(self.future_covariates),
            "The provided sequence of target series must have the same length as "
            "the provided sequence of covariate series.",
        )
        raise_if_not(
            past_covariates is None or len(self.target_series) == len(self.past_covariates),
            "The provided sequence of target series must have the same length as "
            "the provided sequence of covariate series.",
        )
        # Use the normalized list: enumerating a bare `TimeSeries` would walk
        # over its time steps instead of treating it as a single series.
        self.valid_sampling_locations = get_valid_sampling_locations(self.target_series,
                                                                     output_chunk_length,
                                                                     input_chunk_length,
                                                                     random_state,
                                                                     max_samples_per_ts)
        # set parameters
        self.output_chunk_length = output_chunk_length
        self.input_chunk_length = input_chunk_length
        self.total_length = input_chunk_length + output_chunk_length
        self.total_number_samples = sum(len(v) for v in self.valid_sampling_locations.values())
        self.use_static_covariates = use_static_covariates

    def __len__(self):
        """
        Returns the total number of possible (input, target) splits.
        """
        return self.total_number_samples

    def _locate(self, idx: int) -> Tuple[int, int]:
        """
        Map a flat sample index to ``(series index, window start offset)``.

        The stored locations are walked in insertion order. Series that are
        too short to hold a window have no entry at all, so the keys must not
        be assumed to be contiguous.

        Raises
        ------
        IndexError
            If ``idx`` does not address one of the ``len(self)`` samples.
        """
        position = idx + self.total_number_samples if idx < 0 else idx
        if not 0 <= position < self.total_number_samples:
            raise IndexError(
                f"index {idx} is out of range for a dataset with {self.total_number_samples} samples"
            )
        for series_idx, locations in self.valid_sampling_locations.items():
            if position < len(locations):
                return series_idx, locations[position]
            position -= len(locations)
        # only reachable if the stored locations were modified after __init__
        raise IndexError(f"index {idx} could not be resolved to a sampling location")

    def __getitem__(self, idx: int):
        """
        Return ``(past_target, past_covariates, historic_future_covariates,
        future_covariates, static_covariates, future_target)``.

        ``historic_future_covariates`` is the slice of the future-known
        covariates over the input window. With ``remove_nan`` set, entries
        that are None are dropped from the tuple.
        """
        target_idx, start = self._locate(idx)
        past_stop = start + self.input_chunk_length
        future_stop = start + self.total_length
        # get target series
        target_values = self.target_series[target_idx].values()
        past_target_series = target_values[start:past_stop]
        future_target_series = target_values[past_stop:future_stop]
        # get past covariates (input window only)
        past_covariates = None
        if self.past_covariates is not None:
            past_covariates = self.past_covariates[target_idx].values()[start:past_stop]
        # get future covariates over the input and the output window
        historic_future_covariates = None
        future_covariates = None
        if self.future_covariates is not None:
            future_values = self.future_covariates[target_idx].values()
            historic_future_covariates = future_values[start:past_stop]
            future_covariates = future_values[past_stop:future_stop]
        # get static covariates
        static_covariates = None
        if self.use_static_covariates:
            static_covariates = self.target_series[target_idx].static_covariates_values(copy=True)
        sample = (past_target_series,
                  past_covariates,
                  historic_future_covariates,
                  future_covariates,
                  static_covariates,
                  future_target_series)
        if self.remove_nan:
            return tuple(item for item in sample if item is not None)
        return sample
class SamplingDatasetInferenceMixed(MixedCovariatesInferenceDataset):
    """
    Inference dataset of fixed-length windows with optional past and future covariates.

    Samples are drawn from the start offsets computed once, at construction
    time, by ``get_valid_sampling_locations``. The ground truth following
    each input window is available through ``evalsample``.
    """

    def __init__(
        self,
        target_series: Union[TimeSeries, Sequence[TimeSeries]],
        past_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
        future_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
        n: int = 1,
        input_chunk_length: int = 12,
        output_chunk_length: int = 1,
        use_static_covariates: bool = True,
        random_state: Optional[int] = 0,
        max_samples_per_ts: Optional[int] = None,
        array_output_only: bool = False,
    ):
        """
        Parameters
        ----------
        target_series
            One or a sequence of target `TimeSeries`.
        past_covariates
            Optionally, one or a sequence of `TimeSeries` containing past-observed covariates. If this parameter is
            set, the provided sequence must have the same length as that of `target_series`. The covariates are
            sliced with the same positional offsets as the target, so every covariate series is expected to be
            aligned with its target series and long enough to contain all the required slices.
        future_covariates
            Optionally, one or a sequence of `TimeSeries` containing future-known covariates. This has to follow
            the same constraints as `past_covariates`.
        n
            Number of predictions into the future, could be greater than the output chunk length, in which case,
            the model will be called autoregressively.
        output_chunk_length
            The length of the "output" series emitted by the model
        input_chunk_length
            The length of the "input" series fed to the model
        use_static_covariates
            Whether to use/include static covariate data from input series.
        random_state
            The random state to use for sampling.
        max_samples_per_ts
            The maximum number of samples to be drawn from each time series. If None, all samples will be drawn.
        array_output_only
            Whether __getitem__ returns only the arrays or adds the full `TimeSeries` object to the output tuple
            This may cause problems with the torch collate and loader functions but works for Darts.
        """
        super().__init__(target_series=target_series,
                         past_covariates=past_covariates,
                         future_covariates=future_covariates,
                         n=n,
                         input_chunk_length=input_chunk_length,
                         output_chunk_length=output_chunk_length)
        self.target_series = (
            [target_series] if isinstance(target_series, TimeSeries) else target_series
        )
        self.past_covariates = (
            [past_covariates] if isinstance(past_covariates, TimeSeries) else past_covariates
        )
        self.future_covariates = (
            [future_covariates] if isinstance(future_covariates, TimeSeries) else future_covariates
        )
        # checks
        raise_if_not(
            future_covariates is None or len(self.target_series) == len(self.future_covariates),
            "The provided sequence of target series must have the same length as "
            "the provided sequence of covariate series.",
        )
        raise_if_not(
            past_covariates is None or len(self.target_series) == len(self.past_covariates),
            "The provided sequence of target series must have the same length as "
            "the provided sequence of covariate series.",
        )
        # Use the normalized list: enumerating a bare `TimeSeries` would walk
        # over its time steps instead of treating it as a single series.
        self.valid_sampling_locations = get_valid_sampling_locations(self.target_series,
                                                                     output_chunk_length,
                                                                     input_chunk_length,
                                                                     random_state,
                                                                     max_samples_per_ts)
        # set parameters
        self.output_chunk_length = output_chunk_length
        self.input_chunk_length = input_chunk_length
        self.total_length = input_chunk_length + output_chunk_length
        self.total_number_samples = sum(len(v) for v in self.valid_sampling_locations.values())
        self.use_static_covariates = use_static_covariates
        self.array_output_only = array_output_only

    def __len__(self):
        """
        Returns the total number of possible (input, target) splits.
        """
        return self.total_number_samples

    def _locate(self, idx: int) -> Tuple[int, int]:
        """
        Map a flat sample index to ``(series index, window start offset)``.

        The stored locations are walked in insertion order. Series that are
        too short to hold a window have no entry at all, so the keys must not
        be assumed to be contiguous.

        Raises
        ------
        IndexError
            If ``idx`` does not address one of the ``len(self)`` samples.
        """
        position = idx + self.total_number_samples if idx < 0 else idx
        if not 0 <= position < self.total_number_samples:
            raise IndexError(
                f"index {idx} is out of range for a dataset with {self.total_number_samples} samples"
            )
        for series_idx, locations in self.valid_sampling_locations.items():
            if position < len(locations):
                return series_idx, locations[position]
            position -= len(locations)
        # only reachable if the stored locations were modified after __init__
        raise IndexError(f"index {idx} could not be resolved to a sampling location")

    def __getitem__(self, idx: int):
        """
        Return the model inputs for sample ``idx``.

        The arrays are, in order: ``past_target``, ``past_covariates``,
        ``historic_future_covariates``, ``future_covariates``,
        ``future_past_covariates`` and ``static_covariates``. With
        ``array_output_only`` set, only the arrays that are not None are
        returned. Otherwise all six entries are returned (None included),
        followed by the input window as a `TimeSeries` and the time stamp of
        the first step after that window.
        """
        target_idx, start = self._locate(idx)
        past_stop = start + self.input_chunk_length
        future_stop = start + self.total_length
        # get target series, once with its time axis and once as plain values
        series = self.target_series[target_idx]
        past_target_series_with_time = series[start:past_stop]
        past_end = past_target_series_with_time.time_index[-1]
        past_target_series = series.values()[start:past_stop]
        # get past covariates; both windows are cut from the full array so
        # that the output-window slice is not taken from the truncated input
        # window
        past_covariates = None
        future_past_covariates = None
        if self.past_covariates is not None:
            past_values = self.past_covariates[target_idx].values()
            past_covariates = past_values[start:past_stop]
            future_past_covariates = past_values[past_stop:future_stop]
        # get future covariates over the input and the output window
        historic_future_covariates = None
        future_covariates = None
        if self.future_covariates is not None:
            future_values = self.future_covariates[target_idx].values()
            historic_future_covariates = future_values[start:past_stop]
            future_covariates = future_values[past_stop:future_stop]
        # get static covariates
        static_covariates = None
        if self.use_static_covariates:
            static_covariates = series.static_covariates_values(copy=True)
        arrays = (past_target_series,
                  past_covariates,
                  historic_future_covariates,
                  future_covariates,
                  future_past_covariates,
                  static_covariates)
        # whether to remove TimeSeries and None and return only arrays
        if self.array_output_only:
            return tuple(item for item in arrays if item is not None)
        return arrays + (past_target_series_with_time,
                         past_end + past_target_series_with_time.freq)

    def evalsample(
        self, idx: int
    ) -> TimeSeries:
        """
        Returns the future target series at the given index.
        """
        target_idx, start = self._locate(idx)
        return self.target_series[target_idx][start + self.input_chunk_length : start + self.total_length]
class SamplingDatasetInferencePast(PastCovariatesInferenceDataset):
    """
    Inference dataset of fixed-length windows with optional past covariates.

    Samples are drawn from the start offsets computed once, at construction
    time, by ``get_valid_sampling_locations``. The ground truth following
    each input window is available through ``evalsample``.
    """

    def __init__(
        self,
        target_series: Union[TimeSeries, Sequence[TimeSeries]],
        covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
        n: int = 1,
        input_chunk_length: int = 12,
        output_chunk_length: int = 1,
        use_static_covariates: bool = True,
        random_state: Optional[int] = 0,
        max_samples_per_ts: Optional[int] = None,
        array_output_only: bool = False,
    ):
        """
        Parameters
        ----------
        target_series
            One or a sequence of target `TimeSeries`.
        covariates
            Optionally, one or a sequence of `TimeSeries` containing past-observed covariates. If this parameter is
            set, the provided sequence must have the same length as that of `target_series`. The covariates are
            sliced with the same positional offsets as the target, so every covariate series is expected to be
            aligned with its target series and long enough to contain all the required slices.
        n
            Number of predictions into the future, could be greater than the output chunk length, in which case,
            the model will be called autoregressively.
        output_chunk_length
            The length of the "output" series emitted by the model
        input_chunk_length
            The length of the "input" series fed to the model
        use_static_covariates
            Whether to use/include static covariate data from input series.
        random_state
            The random state to use for sampling.
        max_samples_per_ts
            The maximum number of samples to be drawn from each time series. If None, all samples will be drawn.
        array_output_only
            Whether __getitem__ returns only the arrays or adds the full `TimeSeries` object to the output tuple
            This may cause problems with the torch collate and loader functions but works for Darts.
        """
        super().__init__(target_series=target_series,
                         covariates=covariates,
                         n=n,
                         input_chunk_length=input_chunk_length,
                         output_chunk_length=output_chunk_length)
        self.target_series = (
            [target_series] if isinstance(target_series, TimeSeries) else target_series
        )
        self.covariates = (
            [covariates] if isinstance(covariates, TimeSeries) else covariates
        )
        # checks
        raise_if_not(
            covariates is None or len(self.target_series) == len(self.covariates),
            "The provided sequence of target series must have the same length as "
            "the provided sequence of covariate series.",
        )
        # Use the normalized list: enumerating a bare `TimeSeries` would walk
        # over its time steps instead of treating it as a single series.
        self.valid_sampling_locations = get_valid_sampling_locations(self.target_series,
                                                                     output_chunk_length,
                                                                     input_chunk_length,
                                                                     random_state,
                                                                     max_samples_per_ts)
        # set parameters
        self.output_chunk_length = output_chunk_length
        self.input_chunk_length = input_chunk_length
        self.total_length = input_chunk_length + output_chunk_length
        self.total_number_samples = sum(len(v) for v in self.valid_sampling_locations.values())
        self.use_static_covariates = use_static_covariates
        self.array_output_only = array_output_only

    def __len__(self):
        """
        Returns the total number of possible (input, target) splits.
        """
        return self.total_number_samples

    def _locate(self, idx: int) -> Tuple[int, int]:
        """
        Map a flat sample index to ``(series index, window start offset)``.

        The stored locations are walked in insertion order. Series that are
        too short to hold a window have no entry at all, so the keys must not
        be assumed to be contiguous.

        Raises
        ------
        IndexError
            If ``idx`` does not address one of the ``len(self)`` samples.
        """
        position = idx + self.total_number_samples if idx < 0 else idx
        if not 0 <= position < self.total_number_samples:
            raise IndexError(
                f"index {idx} is out of range for a dataset with {self.total_number_samples} samples"
            )
        for series_idx, locations in self.valid_sampling_locations.items():
            if position < len(locations):
                return series_idx, locations[position]
            position -= len(locations)
        # only reachable if the stored locations were modified after __init__
        raise IndexError(f"index {idx} could not be resolved to a sampling location")

    def __getitem__(self, idx: int):
        """
        Return the model inputs for sample ``idx``.

        The arrays are, in order: ``past_target``, ``past_covariates``,
        ``future_past_covariates`` and ``static_covariates``. With
        ``array_output_only`` set, only the arrays that are not None are
        returned. Otherwise all four entries are returned (None included),
        followed by the input window as a `TimeSeries` and the time stamp of
        the first step after that window.
        """
        target_idx, start = self._locate(idx)
        past_stop = start + self.input_chunk_length
        future_stop = start + self.total_length
        # get target series, once with its time axis and once as plain values
        series = self.target_series[target_idx]
        past_target_series_with_time = series[start:past_stop]
        past_end = past_target_series_with_time.time_index[-1]
        past_target_series = series.values()[start:past_stop]
        # get past covariates; both windows are cut from the full array so
        # that the output-window slice is not taken from the truncated input
        # window
        past_covariates = None
        future_past_covariates = None
        if self.covariates is not None:
            covariate_values = self.covariates[target_idx].values()
            past_covariates = covariate_values[start:past_stop]
            future_past_covariates = covariate_values[past_stop:future_stop]
        # get static covariates
        static_covariates = None
        if self.use_static_covariates:
            static_covariates = series.static_covariates_values(copy=True)
        arrays = (past_target_series,
                  past_covariates,
                  future_past_covariates,
                  static_covariates)
        # return arrays or arrays with TimeSeries
        if self.array_output_only:
            return tuple(item for item in arrays if item is not None)
        return arrays + (past_target_series_with_time,
                         past_end + past_target_series_with_time.freq)

    def evalsample(
        self, idx: int
    ) -> TimeSeries:
        """
        Returns the future target series at the given index.
        """
        target_idx, start = self._locate(idx)
        return self.target_series[target_idx][start + self.input_chunk_length : start + self.total_length]
class SamplingDatasetInferenceDual(DualCovariatesInferenceDataset):
    """
    Inference dataset of fixed-length windows with optional future-known covariates.

    Samples are drawn from the start offsets computed once, at construction
    time, by ``get_valid_sampling_locations``. The ground truth following
    each input window is available through ``evalsample``.
    """

    def __init__(
        self,
        target_series: Union[TimeSeries, Sequence[TimeSeries]],
        covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
        n: int = 12,
        input_chunk_length: int = 12,
        output_chunk_length: int = 1,
        use_static_covariates: bool = True,
        random_state: Optional[int] = 0,
        max_samples_per_ts: Optional[int] = None,
        array_output_only: bool = False,
    ):
        """
        Parameters
        ----------
        target_series
            One or a sequence of target `TimeSeries`.
        covariates
            Optionally, some future-known covariates that are used for predictions. This argument is required
            if the model was trained with future-known covariates. The covariates are sliced with the same
            positional offsets as the target, so every covariate series is expected to be aligned with its
            target series and long enough to contain all the required slices.
        n
            Number of predictions into the future, could be greater than the output chunk length, in which case,
            the model will be called autoregressively.
        output_chunk_length
            The length of the "output" series emitted by the model
        input_chunk_length
            The length of the "input" series fed to the model
        use_static_covariates
            Whether to use/include static covariate data from input series.
        random_state
            The random state to use for sampling.
        max_samples_per_ts
            The maximum number of samples to be drawn from each time series. If None, all samples will be drawn.
        array_output_only
            Whether __getitem__ returns only the arrays or adds the full `TimeSeries` object to the output tuple
            This may cause problems with the torch collate and loader functions but works for Darts.
        """
        super().__init__(target_series=target_series,
                         covariates=covariates,
                         n=n,
                         input_chunk_length=input_chunk_length,
                         output_chunk_length=output_chunk_length)
        self.target_series = (
            [target_series] if isinstance(target_series, TimeSeries) else target_series
        )
        self.covariates = (
            [covariates] if isinstance(covariates, TimeSeries) else covariates
        )
        # checks
        raise_if_not(
            covariates is None or len(self.target_series) == len(self.covariates),
            "The provided sequence of target series must have the same length as "
            "the provided sequence of covariate series.",
        )
        # Use the normalized list: enumerating a bare `TimeSeries` would walk
        # over its time steps instead of treating it as a single series.
        self.valid_sampling_locations = get_valid_sampling_locations(self.target_series,
                                                                     output_chunk_length,
                                                                     input_chunk_length,
                                                                     random_state,
                                                                     max_samples_per_ts)
        # set parameters
        self.output_chunk_length = output_chunk_length
        self.input_chunk_length = input_chunk_length
        self.total_length = input_chunk_length + output_chunk_length
        self.total_number_samples = sum(len(v) for v in self.valid_sampling_locations.values())
        self.use_static_covariates = use_static_covariates
        self.array_output_only = array_output_only

    def __len__(self):
        """
        Returns the total number of possible (input, target) splits.
        """
        return self.total_number_samples

    def _locate(self, idx: int) -> Tuple[int, int]:
        """
        Map a flat sample index to ``(series index, window start offset)``.

        The stored locations are walked in insertion order. Series that are
        too short to hold a window have no entry at all, so the keys must not
        be assumed to be contiguous.

        Raises
        ------
        IndexError
            If ``idx`` does not address one of the ``len(self)`` samples.
        """
        position = idx + self.total_number_samples if idx < 0 else idx
        if not 0 <= position < self.total_number_samples:
            raise IndexError(
                f"index {idx} is out of range for a dataset with {self.total_number_samples} samples"
            )
        for series_idx, locations in self.valid_sampling_locations.items():
            if position < len(locations):
                return series_idx, locations[position]
            position -= len(locations)
        # only reachable if the stored locations were modified after __init__
        raise IndexError(f"index {idx} could not be resolved to a sampling location")

    def __getitem__(self, idx: int):
        """
        Return the model inputs for sample ``idx``.

        The arrays are, in order: ``past_target``,
        ``historic_future_covariates``, ``future_covariates`` and
        ``static_covariates``. With ``array_output_only`` set, only the
        arrays that are not None are returned. Otherwise all four entries are
        returned (None included), followed by the input window as a
        `TimeSeries` and the time stamp of the first step after that window.
        """
        target_idx, start = self._locate(idx)
        past_stop = start + self.input_chunk_length
        future_stop = start + self.total_length
        # get target series, once with its time axis and once as plain values
        series = self.target_series[target_idx]
        past_target_series_with_time = series[start:past_stop]
        past_end = past_target_series_with_time.time_index[-1]
        past_target_series = series.values()[start:past_stop]
        # get future-known covariates over the input and the output window
        historic_future_covariates = None
        future_covariates = None
        if self.covariates is not None:
            covariate_values = self.covariates[target_idx].values()
            historic_future_covariates = covariate_values[start:past_stop]
            future_covariates = covariate_values[past_stop:future_stop]
        # get static covariates
        static_covariates = None
        if self.use_static_covariates:
            static_covariates = series.static_covariates_values(copy=True)
        arrays = (past_target_series,
                  historic_future_covariates,
                  future_covariates,
                  static_covariates)
        # return arrays or arrays with TimeSeries
        if self.array_output_only:
            return tuple(item for item in arrays if item is not None)
        return arrays + (past_target_series_with_time,
                         past_end + past_target_series_with_time.freq)

    def evalsample(
        self, idx: int
    ) -> TimeSeries:
        """
        Returns the future target series at the given index.
        """
        target_idx, start = self._locate(idx)
        return self.target_series[target_idx][start + self.input_chunk_length : start + self.total_length]