The FugueBackend class enables distributed computation for StatsForecast using Fugue, which provides a unified interface for Spark, Dask, and Ray backends without requiring code rewrites.
Overview
With FugueBackend, you can:
- Distribute forecasting and cross-validation across clusters
- Switch between Spark, Dask, and Ray without changing your code
- Scale to large datasets with parallel processing
- Maintain the same API as the standard StatsForecast interface
API Reference
FugueBackend
FugueBackend(engine=None, conf=None, **transform_kwargs)
Bases: ParallelBackend
FugueBackend for Distributed Computation.
This class uses the Fugue backend, which can distribute
computation on Spark, Dask, and Ray without any rewrites.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| engine | ExecutionEngine | A selection between Spark, Dask, and Ray. | None |
| conf | Config | Engine configuration. | None |
| **transform_kwargs | Any | Additional kwargs for Fugue’s transform method. | |
FugueBackend.forecast
forecast(*, df, freq, models, fallback_model, X_df, h, level, fitted, prediction_intervals, id_col, time_col, target_col)
Memory Efficient core.StatsForecast predictions with FugueBackend.
This method uses Fugue’s transform function, in combination with
core.StatsForecast’s forecast to efficiently fit a list of StatsForecast models.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | Input DataFrame containing time series data. Must have columns for series identifiers, timestamps, and target values. Can optionally include exogenous features. | required |
| freq | str or int | Frequency of the time series data. Must be a valid pandas or polars offset alias (e.g., ‘D’ for daily, ‘M’ for monthly, ‘H’ for hourly), or an integer representing the number of observations per cycle. | required |
| models | List[Any] | List of instantiated StatsForecast model objects. Each model should implement the forecast interface. Models must have unique names, which can be set using the alias parameter. | required |
| fallback_model | Any | Model to use when a primary model fails during fitting or forecasting. Only works with the forecast and cross_validation methods. If None, exceptions from failing models will be raised. | required |
| X_df | DataFrame | DataFrame containing future exogenous variables. Required if any models use exogenous features. Must include future values for all time series and forecast horizon. | required |
| h | int | Forecast horizon, the number of time steps ahead to predict. | required |
| level | List[float] | Confidence levels between 0 and 100 for prediction intervals (e.g., [80, 95] for 80% and 95% intervals). | required |
| fitted | bool | If True, stores in-sample (fitted) predictions which can be retrieved using forecast_fitted_values(). | required |
| prediction_intervals | ConformalIntervals | Configuration for calibrating prediction intervals using Conformal Prediction. | required |
| id_col | str | Name of the column containing unique identifiers for each time series. Defaults to ‘unique_id’. | required |
| time_col | str | Name of the column containing timestamps or time indices. Values can be timestamps (datetime) or integers. Defaults to ‘ds’. | required |
| target_col | str | Name of the column containing the target variable to forecast. Defaults to ‘y’. | required |
Returns:
| Type | Description |
|---|---|
| Any | pandas.DataFrame: DataFrame with one column per model for point predictions and additional columns for probabilistic predictions, for all fitted models. |
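The fallback_model rule in the parameter table can be pictured with a small, library-free sketch. The classes FlakyModel and MeanModel and the helper forecast_with_fallback are hypothetical stand-ins, not StatsForecast's implementation. They only illustrate the documented behavior: use the fallback when the primary model raises, and re-raise when no fallback is given.

```python
from typing import Optional, Sequence


class FlakyModel:
    """Hypothetical primary model that fails on very short series."""

    def forecast(self, y: Sequence[float], h: int) -> list:
        if len(y) < 3:
            raise ValueError("series too short")
        # Repeat the last observed value h times.
        return [y[-1]] * h


class MeanModel:
    """Hypothetical fallback model: repeats the historical mean."""

    def forecast(self, y: Sequence[float], h: int) -> list:
        return [sum(y) / len(y)] * h


def forecast_with_fallback(model, fallback: Optional[object], y, h: int) -> list:
    """Try the primary model; on failure use the fallback, or re-raise if None."""
    try:
        return model.forecast(y, h)
    except Exception:
        if fallback is None:
            raise
        return fallback.forecast(y, h)


# The primary model succeeds on a long enough series.
print(forecast_with_fallback(FlakyModel(), MeanModel(), [1.0, 2.0, 3.0, 4.0], h=2))
# The primary model fails on a short series, so the fallback is used.
print(forecast_with_fallback(FlakyModel(), MeanModel(), [2.0, 4.0], h=2))
```

In this sketch, passing None as the fallback lets the original exception propagate, which mirrors the "exceptions from failing models will be raised" wording above.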
FugueBackend.cross_validation
cross_validation(*, df, freq, models, fallback_model, h, n_windows, step_size, test_size, input_size, level, refit, fitted, prediction_intervals, id_col, time_col, target_col)
Temporal Cross-Validation with core.StatsForecast and FugueBackend.
This method uses Fugue’s transform function, in combination with
core.StatsForecast’s cross-validation to efficiently fit a list of StatsForecast
models through multiple training windows, in either chained or rolled manner.
The speed of StatsForecast’s models, combined with Fugue’s distributed computation,
helps overcome the high computational cost of this evaluation technique. Temporal
cross-validation gives a better measure of a model’s generalization by increasing
the length and diversity of the test set.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | Input DataFrame containing time series data with columns for series identifiers, timestamps, and target values. | required |
| freq | str or int | Frequency of the time series data. Must be a valid pandas or polars offset alias (e.g., ‘D’ for daily, ‘M’ for monthly, ‘H’ for hourly), or an integer representing the number of observations per cycle. | required |
| models | List[Any] | List of instantiated StatsForecast model objects. Each model should implement the forecast interface. Models must have unique names, which can be set using the alias parameter. | required |
| fallback_model | Any | Model to use when a primary model fails during fitting or forecasting. Only works with the forecast and cross_validation methods. If None, exceptions from failing models will be raised. | required |
| h | int | Forecast horizon for each validation window. | required |
| n_windows | int | Number of validation windows to create. Cannot be specified together with test_size. | required |
| step_size | int | Number of time steps between consecutive validation windows. Smaller values create overlapping windows. | required |
| test_size | int | Total size of the test period. If provided, n_windows is computed automatically. Overrides n_windows if specified. | required |
| input_size | int | Maximum number of training observations to use for each window. If None, uses expanding windows with all available history. If specified, uses rolling windows of fixed size. | required |
| level | List[float] | Confidence levels between 0 and 100 for prediction intervals (e.g., [80, 95]). | required |
| refit | bool or int | Controls model refitting frequency. If True, refits models for every window. If False, fits once and uses the forward method. If an integer n, refits every n windows. Models must implement the forward method when refit is not True. | required |
| fitted | bool | If True, stores in-sample predictions for each window, accessible via cross_validation_fitted_values(). | required |
| prediction_intervals | ConformalIntervals | Configuration for calibrating prediction intervals using Conformal Prediction. Requires level to be specified. | required |
| id_col | str | Name of the column containing unique identifiers for each time series. Defaults to ‘unique_id’. | required |
| time_col | str | Name of the column containing timestamps or time indices. Defaults to ‘ds’. | required |
| target_col | str | Name of the column containing the target variable. Defaults to ‘y’. | required |
Returns:
| Type | Description |
|---|---|
| Any | pandas.DataFrame: DataFrame with one column per model for point predictions and additional columns for probabilistic predictions, for all fitted models. |
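How h, n_windows, step_size, input_size, and refit interact can be easier to see as index arithmetic on a single series. The sketch below is library-free and illustrative only: the function cv_windows and its return format are hypothetical, and it assumes that the test period covers h + step_size * (n_windows - 1) observations at the end of the series and that refit=n means "refit every n windows". It is not StatsForecast's implementation.

```python
from typing import Optional, Union


def cv_windows(
    n_obs: int,
    h: int,
    n_windows: int = 1,
    step_size: int = 1,
    input_size: Optional[int] = None,
    refit: Union[bool, int] = True,
) -> list:
    """Describe each validation window as half-open index ranges over one series."""
    # Assumed relationship between the test period and the window settings.
    test_size = h + step_size * (n_windows - 1)
    if test_size >= n_obs:
        raise ValueError("series is too short for the requested windows")

    # True -> 1 (refit every window), False -> 0 (fit once), n -> every n windows.
    refit_every = int(refit)

    windows = []
    for i in range(n_windows):
        train_end = n_obs - test_size + i * step_size  # exclusive end of training data
        # Expanding window when input_size is None, rolling window otherwise.
        train_start = 0 if input_size is None else max(0, train_end - input_size)
        if refit_every == 0:
            refit_here = i == 0
        else:
            refit_here = i % refit_every == 0
        windows.append(
            {
                "window": i,
                "train": (train_start, train_end),
                "test": (train_end, train_end + h),
                "refit": refit_here,
            }
        )
    return windows


# Same settings as the Quick Start example, on a series of 100 observations.
for w in cv_windows(n_obs=100, h=7, n_windows=2, step_size=24):
    print(w)
```

With h=7 and step_size=24 the two test windows do not overlap. Choosing a step_size smaller than h would make consecutive test windows share observations, which is the overlap mentioned in the step_size description.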
Quick Start
Basic Usage with Spark
from statsforecast.core import StatsForecast
from statsforecast.models import AutoETS
from statsforecast.utils import generate_series
from pyspark.sql import SparkSession
# Generate example data
n_series = 4
horizon = 7
series = generate_series(n_series)
# Create Spark session
spark = SparkSession.builder.getOrCreate()
# Convert unique_id to string and create Spark DataFrame
series['unique_id'] = series['unique_id'].astype(str)
sdf = spark.createDataFrame(series)
# Use StatsForecast with Spark DataFrame (automatically uses FugueBackend)
sf = StatsForecast(
    models=[AutoETS(season_length=7)],
    freq='D',
)
# Returns a Spark DataFrame
results = sf.cross_validation(
    df=sdf,
    h=horizon,
    step_size=24,
    n_windows=2,
    level=[90],
)
results.show()
Basic Forecasting
from statsforecast import StatsForecast
from statsforecast.models import AutoETS
from statsforecast.utils import generate_series
# Generate data
series = generate_series(n_series=4)
# Standard usage (pandas/polars)
sf = StatsForecast(
    models=[AutoETS(season_length=7)],
    freq='D',
)
# Cross-validate with a pandas DataFrame (runs locally, without FugueBackend)
sf.cross_validation(
    df=series,
    h=7,
    step_size=24,
    n_windows=2,
    level=[90],
).head()
Dask Distributed Example
Here’s a complete example using Dask for distributed predictions:
import dask.dataframe as dd
from dask.distributed import Client
from fugue_dask import DaskExecutionEngine
from statsforecast import StatsForecast
from statsforecast.models import Naive
from statsforecast.utils import generate_series
# Generate synthetic panel data
df = generate_series(10)
df['unique_id'] = df['unique_id'].astype(str)
df = dd.from_pandas(df, npartitions=10)
# Instantiate Dask client and execution engine
dask_client = Client()
engine = DaskExecutionEngine(dask_client=dask_client)
# Create StatsForecast instance
sf = StatsForecast(models=[Naive()], freq='D')
Distributed Forecast
The FugueBackend automatically handles distributed forecasting when you pass a Dask/Spark/Ray DataFrame:
# Distributed predictions
forecast_df = sf.forecast(df=df, h=12).compute()
# With fitted values
sf = StatsForecast(models=[Naive()], freq='D')
forecast_df = sf.forecast(df=df, h=12, fitted=True).compute()
fitted_df = sf.forecast_fitted_values().compute()
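The automatic dispatch described above can be pictured as a check on the type of the input DataFrame. The following is a minimal sketch under stated assumptions: the helper choose_backend, the LOCAL_PACKAGES set, and the fake DataFrame classes are all hypothetical and exist only so the example runs without pandas, Dask, or Spark installed. The library's actual detection logic may differ.

```python
# Packages whose DataFrames are assumed to be handled locally in this sketch.
LOCAL_PACKAGES = {"pandas", "polars"}


def choose_backend(df: object) -> str:
    """Return 'local' for pandas/polars-like inputs and 'fugue' for anything else."""
    top_level_package = type(df).__module__.split(".")[0]
    return "local" if top_level_package in LOCAL_PACKAGES else "fugue"


# Fake DataFrame types that only mimic the module path of the real classes.
class FakePandasFrame:
    __module__ = "pandas.core.frame"


class FakeSparkFrame:
    __module__ = "pyspark.sql.dataframe"


print(choose_backend(FakePandasFrame()))
print(choose_backend(FakeSparkFrame()))
```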
Distributed Cross-Validation
Perform distributed temporal cross-validation across your cluster:
# Distributed cross-validation
cv_results = sf.cross_validation(
    df=df,
    h=12,
    n_windows=2,
).compute()
How It Works
- Automatic Detection: When you pass a Spark, Dask, or Ray DataFrame to StatsForecast methods, the FugueBackend is automatically used.
- Data Partitioning: Data is partitioned by unique_id, allowing parallel processing across different time series.
- Distributed Execution: Each partition is processed independently using the standard StatsForecast logic.
- Result Aggregation: Results are collected and returned in the same format as the input (Spark/Dask/Ray DataFrame).
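The partition, execute, and aggregate steps above can be sketched with the standard library alone. This is an illustration of the pattern, not of Fugue or StatsForecast internals: the helpers partition_by_id, naive_forecast, and distributed_forecast are hypothetical, rows are plain (unique_id, ds, y) tuples with integer time indices, and a thread pool stands in for a Spark, Dask, or Ray cluster.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor


def partition_by_id(rows):
    """Group (unique_id, ds, y) rows into one list of (ds, y) pairs per series."""
    parts = defaultdict(list)
    for uid, ds, y in rows:
        parts[uid].append((ds, y))
    return dict(parts)


def naive_forecast(uid, series, h):
    """Forecast one series by repeating its last observed value h times."""
    last_ds, last_y = sorted(series)[-1]
    return [(uid, last_ds + step, last_y) for step in range(1, h + 1)]


def distributed_forecast(rows, h, max_workers=2):
    """Partition by unique_id, forecast each partition independently, then combine."""
    parts = partition_by_id(rows)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(naive_forecast, uid, s, h) for uid, s in parts.items()]
        results = [f.result() for f in futures]
    # Aggregate per-partition results into a single sorted table.
    return sorted(row for part in results for row in part)


rows = [("a", 1, 10.0), ("a", 2, 12.0), ("b", 1, 5.0), ("b", 2, 7.0)]
print(distributed_forecast(rows, h=2))
```

Because each series is forecast without reference to any other, the work divides cleanly along unique_id, which is why that column is used as the partition key.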
Supported Backends
- Apache Spark: For large-scale distributed processing
- Dask: For flexible distributed computing with Python
- Ray: For modern distributed machine learning workloads
Notes
- Ensure your cluster has sufficient resources for the number of time series and models
- The unique_id column should be string type for distributed operations
- Use .compute() on Dask DataFrames to materialize results
- Use .show() or .collect() on Spark DataFrames to view results