rolling_regressor

Classes:

RollingRegressor
    Incremental regressor with a fixed-size rolling window.

RollingRegressor

RollingRegressor(
    module: Module,
    loss_fn: Union[str, Callable] = "mse",
    optimizer_fn: Union[str, Type[Optimizer]] = "sgd",
    lr: float = 0.001,
    is_feature_incremental: bool = False,
    device: str = "cpu",
    seed: int = 42,
    window_size: int = 10,
    append_predict: bool = False,
    **kwargs
)

Bases: RollingDeepEstimator, Regressor

Incremental regressor with a fixed-size rolling window.

Maintains the most recent window_size observations in a deque and feeds them to the wrapped PyTorch module as a tensor of shape (sequence_length, batch=1, n_features). This enables simple sequence-style conditioning for models such as RNNs, LSTMs and GRUs without storing the full historical stream.
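
To illustrate the shape contract, here is a minimal sketch of how a full window of feature rows becomes the tensor passed to the module (the values and dimensions are arbitrary; the library's internal conversion helper may differ in detail):

import torch
from collections import deque

window_size, n_features = 3, 2
window = deque(maxlen=window_size)                     # rolling buffer of feature rows
for row in ([0.1, 0.2], [0.3, 0.4], [0.5, 0.6]):
    window.append(row)
x_t = torch.tensor(list(window), dtype=torch.float32).unsqueeze(1)
print(x_t.shape)                                       # torch.Size([3, 1, 2]) == (sequence_length, batch, n_features)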

Parameters:

module : Module
    Wrapped regression module (expects the rolling tensor input shape). Required.
loss_fn : str | Callable, default='mse'
    Loss used for optimisation.
optimizer_fn : str | type, default='sgd'
    Optimizer specification.
lr : float, default=1e-3
    Learning rate.
is_feature_incremental : bool, default=False
    Whether to expand the first trainable layer when new feature names appear.
device : str, default='cpu'
    Torch device.
seed : int, default=42
    Random seed.
window_size : int, default=10
    Number of most recent samples kept in the rolling buffer.
append_predict : bool, default=False
    If True, samples passed to the predict methods are also appended to the window, enabling simple autoregressive rollouts (see the sketch after this list).
**kwargs
    Forwarded to RollingDeepEstimator.
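
The append_predict flag supports simple autoregressive rollouts: rows passed to the predict methods are kept in the rolling window, so each call extends the model's context. A minimal sketch (TinySeq is the module defined in the example below; the lag features and feedback scheme are hypothetical):

ar_model = RollingRegressor(module=TinySeq(2), window_size=4, append_predict=True)
x = {"lag_1": 0.0, "lag_2": 0.0}                       # hypothetical feedback features
rollout = []
for _ in range(3):
    y_hat = ar_model.predict_one(x)                    # the input row stays in the window
    rollout.append(y_hat)
    x = {"lag_1": y_hat, "lag_2": x["lag_1"]}          # feed the prediction back as a feature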

Examples:

Real-world regression example using the Bikes dataset from river. We keep only
the numeric features so the rolling tensor construction succeeds. A small GRU
is trained online and we track a running MAE. The exact value may vary across
library versions and hardware.

>>> import random, numpy as np
>>> from torch import nn, manual_seed
>>> from river import datasets, metrics
>>> from deep_river.regression.rolling_regressor import RollingRegressor
>>> _ = manual_seed(42)
>>> random.seed(42)
>>> np.random.seed(42)
>>> first_x, _ = next(iter(datasets.Bikes()))
>>> numeric_keys = sorted([k for k, v in first_x.items() if isinstance(v, (int, float))])
>>> class TinySeq(nn.Module):
...     def __init__(self, n_features):
...         super().__init__()
...         self.rnn = nn.GRU(n_features, 8)
...         self.head = nn.Linear(8, 1)
...     def forward(self, x):
...         out, _ = self.rnn(x)
...         return self.head(out[-1])
>>> model = RollingRegressor(module=TinySeq(len(numeric_keys)), window_size=8)
>>> mae = metrics.MAE()
>>> window_size = 8
>>> for i, (x, y) in enumerate(datasets.Bikes().take(200)):
...     x_num = {k: x[k] for k in numeric_keys}
...     if i >= window_size:
...         y_pred = model.predict_one(x_num)
...         mae.update(y, y_pred)
...     model.learn_one(x_num, y)
>>> assert 0.0 <= mae.get() < 15.0
>>> print(f"MAE: {mae.get():.4f}")
MAE: ...

Methods:

clone
    Return a fresh estimator instance with (optionally) copied state.
draw
    Render a (partial) computational graph of the wrapped model.
learn_many
    Batch update with multiple samples using the rolling window.
learn_one
    Update model using a single (x, y) and current rolling window.
load
    Load a previously saved estimator.
predict_many
    Predict targets for multiple samples (appends to a copy of the window).
predict_one
    Predict a single regression target using rolling context.
save
    Persist the estimator (architecture, weights, optimiser & runtime state).
Source code in deep_river/regression/rolling_regressor.py
def __init__(
    self,
    module: torch.nn.Module,
    loss_fn: Union[str, Callable] = "mse",
    optimizer_fn: Union[str, Type[optim.Optimizer]] = "sgd",
    lr: float = 1e-3,
    is_feature_incremental: bool = False,
    device: str = "cpu",
    seed: int = 42,
    window_size: int = 10,
    append_predict: bool = False,
    **kwargs,
):
    super().__init__(
        module=module,
        loss_fn=loss_fn,
        optimizer_fn=optimizer_fn,
        lr=lr,
        is_feature_incremental=is_feature_incremental,
        device=device,
        seed=seed,
        window_size=window_size,
        append_predict=append_predict,
        **kwargs,
    )

clone

clone(
    new_params=None,
    include_attributes: bool = False,
    copy_weights: bool = False,
)

Return a fresh estimator instance with (optionally) copied state.

Parameters:

new_params : dict | None, default=None
    Parameter overrides for the cloned instance.
include_attributes : bool, default=False
    If True, runtime state (observed features, buffers) is also copied.
copy_weights : bool, default=False
    If True, model weights are copied (otherwise the module is re-initialised).
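
A minimal usage sketch (assumes a trained RollingRegressor named model):

warm_copy = model.clone(copy_weights=True)             # same architecture, weights copied over
cold_copy = model.clone(new_params={"lr": 0.01})       # fresh weights with an overridden learning rate
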
Source code in deep_river/base.py
def clone(
    self,
    new_params=None,
    include_attributes: bool = False,
    copy_weights: bool = False,
):
    """Return a fresh estimator instance with (optionally) copied state.

    Parameters
    ----------
    new_params : dict | None
        Parameter overrides for the cloned instance.
    include_attributes : bool, default=False
        If True, runtime state (observed features, buffers) is also copied.
    copy_weights : bool, default=False
        If True, model weights are copied (otherwise the module is re‑initialised).
    """
    new_params = new_params or {}
    copy_weights = new_params.pop("copy_weights", copy_weights)

    params = {**self._get_all_init_params(), **new_params}

    if "module" not in new_params:
        params["module"] = self._rebuild_module()

    new_est = self.__class__(**self._filter_kwargs(self.__class__.__init__, params))

    if copy_weights and hasattr(self.module, "state_dict"):
        new_est.module.load_state_dict(self.module.state_dict())

    if include_attributes:
        new_est._restore_runtime_state(self._get_runtime_state())

    return new_est

draw

draw()

Render a (partial) computational graph of the wrapped model.

Imports graphviz and torchviz lazily. Raises an informative ImportError if the optional dependencies are not installed.
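
A minimal usage sketch (requires the optional graphviz and torchviz packages; model is a RollingRegressor and the output file name is hypothetical):

dot = model.draw()                                     # graphviz Digraph of the wrapped module
dot.render("rolling_regressor_graph", format="png")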

Source code in deep_river/base.py
def draw(self):  # type: ignore[override]
    """Render a (partial) computational graph of the wrapped model.

    Imports ``graphviz`` and ``torchviz`` lazily. Raises an informative
    ImportError if the optional dependencies are not installed.
    """
    try:  # pragma: no cover
        from torchviz import make_dot  # type: ignore
    except Exception as err:  # noqa: BLE001
        raise ImportError(
            "graphviz and torchviz must be installed to draw the model."
        ) from err

    first_parameter = next(self.module.parameters())
    input_shape = first_parameter.size()
    y_pred = self.module(torch.rand(input_shape))
    return make_dot(y_pred.mean(), params=dict(self.module.named_parameters()))

learn_many

learn_many(X: DataFrame, y: Series) -> None

Batch update with multiple samples using the rolling window.

Only performs an optimisation step once the internal window has reached window_size length to ensure a full sequence is available.
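
A minimal sketch (assumes the model and numeric_keys from the example above, plus pandas; the values are placeholders):

import pandas as pd

X_batch = pd.DataFrame([{k: 0.0 for k in numeric_keys} for _ in range(8)])
y_batch = pd.Series(range(8), dtype=float)
model.learn_many(X_batch, y_batch)                     # optimisation step runs once the window is full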

Source code in deep_river/regression/rolling_regressor.py
def learn_many(self, X: pd.DataFrame, y: pd.Series) -> None:
    """Batch update with multiple samples using the rolling window.

    Only performs an optimisation step once the internal window has reached
    ``window_size`` length to ensure a full sequence is available.
    """
    self._update_observed_features(X)

    X = X[list(self.observed_features)]
    self._x_window.extend(X.values.tolist())

    if len(self._x_window) == self.window_size:
        X_t = self._deque2rolling_tensor(self._x_window)

        # Convert y to tensor (ensuring proper shape for regression)
        y_t = torch.tensor(y.values, dtype=torch.float32, device=self.device).view(
            -1, 1
        )

        self._learn(x=X_t, y=y_t)

learn_one

learn_one(x: dict, y: RegTarget, **kwargs) -> None

Update model using a single (x, y) and current rolling window.

Parameters:

x : dict
    Feature mapping.
y : float
    Target value.

Source code in deep_river/regression/rolling_regressor.py
def learn_one(self, x: dict, y: base.typing.RegTarget, **kwargs) -> None:
    """Update model using a single (x, y) and current rolling window.

    Parameters
    ----------
    x : dict
        Feature mapping.
    y : float
        Target value.
    """
    self._update_observed_features(x)

    self._x_window.append([x.get(feature, 0) for feature in self.observed_features])

    x_t = self._deque2rolling_tensor(self._x_window)

    # Convert y to tensor (ensuring proper shape for regression)
    y_t = torch.tensor([y], dtype=torch.float32, device=self.device).view(-1, 1)

    self._learn(x=x_t, y=y_t)

load classmethod

load(filepath: Union[str, Path])

Load a previously saved estimator.

The method reconstructs the estimator class, its wrapped module, optimiser state and runtime information (feature names, buffers, etc.).
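
A minimal round-trip sketch (assumes a trained RollingRegressor named model; the file name is hypothetical):

from deep_river.regression.rolling_regressor import RollingRegressor

model.save("rolling_regressor.pkl")
restored = RollingRegressor.load("rolling_regressor.pkl")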

Source code in deep_river/base.py
@classmethod
def load(cls, filepath: Union[str, Path]):
    """Load a previously saved estimator.

    The method reconstructs the estimator class, its wrapped module, optimiser
    state and runtime information (feature names, buffers, etc.).
    """
    with open(filepath, "rb") as f:
        state = pickle.load(f)

    estimator_cls = cls._import_from_path(state["estimator_class"])
    init_params = state["init_params"]

    # Rebuild module if needed
    if "module" in init_params and isinstance(init_params["module"], dict):
        module_info = init_params.pop("module")
        module_cls = cls._import_from_path(module_info["class"])
        module = module_cls(
            **cls._filter_kwargs(module_cls.__init__, module_info["kwargs"])
        )
        if state.get("model_state_dict"):
            module.load_state_dict(state["model_state_dict"])
        init_params["module"] = module

    estimator = estimator_cls(
        **cls._filter_kwargs(estimator_cls.__init__, init_params)
    )

    if state.get("optimizer_state_dict") and hasattr(estimator, "optimizer"):
        try:
            estimator.optimizer.load_state_dict(
                state["optimizer_state_dict"]  # type: ignore[arg-type]
            )
        except Exception:  # noqa: E722
            pass

    estimator._restore_runtime_state(state.get("runtime_state", {}))
    return estimator

predict_many

predict_many(X: DataFrame) -> DataFrame

Predict targets for multiple samples (appends to a copy of the window).

Returns a single-column DataFrame named 'y_pred'.
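
A minimal sketch (assumes the model and numeric_keys from the example above):

import pandas as pd

X_new = pd.DataFrame([{k: 0.0 for k in numeric_keys} for _ in range(3)])   # hypothetical rows
preds = model.predict_many(X_new)
print(list(preds.columns))                             # ['y_pred']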

Source code in deep_river/regression/rolling_regressor.py
def predict_many(self, X: pd.DataFrame) -> pd.DataFrame:
    """Predict targets for multiple samples (appends to a copy of the window).

    Returns a single-column DataFrame named ``'y_pred'``.
    """

    self._update_observed_features(X)
    X = X[list(self.observed_features)]
    x_win = self._x_window.copy()
    x_win.extend(X.values.tolist())
    if self.append_predict:
        self._x_window = x_win

    self.module.eval()
    with torch.inference_mode():
        x_t = self._deque2rolling_tensor(x_win)
        y_preds = self.module(x_t)
        if isinstance(y_preds, torch.Tensor):
            y_preds = y_preds.detach().cpu().view(-1).numpy().tolist()

    return pd.DataFrame({"y_pred": y_preds})

predict_one

predict_one(x: dict) -> RegTarget

Predict a single regression target using rolling context.

Parameters:

x : dict
    Feature mapping.

Returns:

float
    Predicted target value.

Source code in deep_river/regression/rolling_regressor.py
def predict_one(self, x: dict) -> base.typing.RegTarget:
    """Predict a single regression target using rolling context.

    Parameters
    ----------
    x : dict
        Feature mapping.

    Returns
    -------
    float
        Predicted target value.
    """
    self._update_observed_features(x)

    x_win = self._x_window.copy()
    x_win.append([x.get(feature, 0) for feature in self.observed_features])
    if self.append_predict:
        self._x_window = x_win

    self.module.eval()
    with torch.inference_mode():
        x_t = self._deque2rolling_tensor(x_win)
        y_pred = self.module(x_t)
        if isinstance(y_pred, torch.Tensor):
            y_pred = y_pred.detach().view(-1)[-1].cpu().numpy().item()
        else:
            y_pred = float(y_pred)

    return y_pred

save

save(filepath: Union[str, Path]) -> None

Persist the estimator (architecture, weights, optimiser & runtime state).

Parameters:

filepath : str | Path
    Destination file. Parent directories are created automatically.

Source code in deep_river/base.py
def save(self, filepath: Union[str, Path]) -> None:
    """Persist the estimator (architecture, weights, optimiser & runtime state).

    Parameters
    ----------
    filepath : str | Path
        Destination file. Parent directories are created automatically.
    """
    filepath = Path(filepath)
    filepath.parent.mkdir(parents=True, exist_ok=True)

    state = {
        "estimator_class": f"{type(self).__module__}.{type(self).__name__}",
        "init_params": self._get_all_init_params(),
        "model_state_dict": getattr(self.module, "state_dict", lambda: {})(),
        "optimizer_state_dict": getattr(self.optimizer, "state_dict", lambda: {})(),
        "runtime_state": self._get_runtime_state(),
    }

    with open(filepath, "wb") as f:
        pickle.dump(state, f)