Skip to content

rolling_regressor

Classes:

Name Description
RollingRegressor

Wrapper that feeds a sliding window of the most recent examples to the

RollingRegressorInitialized

RollingRegressorInitialized class built for regression tasks with a

RollingRegressor

RollingRegressor(
    module: Type[Module],
    loss_fn: Union[str, Callable] = "mse",
    optimizer_fn: Union[str, Callable] = "sgd",
    lr: float = 0.001,
    is_feature_incremental: bool = False,
    window_size: int = 10,
    append_predict: bool = False,
    device: str = "cpu",
    seed: int = 42,
    **kwargs
)

Bases: RollingDeepEstimator, Regressor

Wrapper that feeds a sliding window of the most recent examples to the wrapped PyTorch regression model.

Parameters:

Name Type Description Default
module Type[Module]

Torch Module that builds the autoencoder to be wrapped. The Module should accept parameter n_features so that the returned model's input shape can be determined based on the number of features in the initial training example.

required
loss_fn Union[str, Callable]

Loss function to be used for training the wrapped model. Can be a loss function provided by torch.nn.functional or one of the following: 'mse', 'l1', 'cross_entropy', 'binary_crossentropy', 'smooth_l1', 'kl_div'.

'mse'
optimizer_fn Union[str, Callable]

Optimizer to be used for training the wrapped model. Can be an optimizer class provided by torch.optim or one of the following: "adam", "adam_w", "sgd", "rmsprop", "lbfgs".

'sgd'
lr float

Learning rate of the optimizer.

0.001
is_feature_incremental bool

Whether the model should adapt to the appearance of previously features by adding units to the input layer of the network.

False
device str

Device to run the wrapped model on. Can be "cpu" or "cuda".

'cpu'
seed int

Random seed to be used for training the wrapped model.

42
window_size int

Number of recent examples to be fed to the wrapped model at each step.

10
append_predict bool

Whether to append inputs passed for prediction to the rolling window.

False
**kwargs

Parameters to be passed to the Module or the optimizer.

{}

Methods:

Name Description
clone

Clones the estimator.

draw

Draws the wrapped model.

initialize_module

Parameters

learn_one

Performs one step of training with the sliding

predict_one

Predicts the target value for the current sliding

Source code in deep_river/regression/rolling_regressor.py
def __init__(
    self,
    module: Type[torch.nn.Module],
    loss_fn: Union[str, Callable] = "mse",
    optimizer_fn: Union[str, Callable] = "sgd",
    lr: float = 1e-3,
    is_feature_incremental: bool = False,
    window_size: int = 10,
    append_predict: bool = False,
    device: str = "cpu",
    seed: int = 42,
    **kwargs,
):
    super().__init__(
        module=module,
        loss_fn=loss_fn,
        device=device,
        optimizer_fn=optimizer_fn,
        lr=lr,
        is_feature_incremental=is_feature_incremental,
        window_size=window_size,
        append_predict=append_predict,
        seed=seed,
        **kwargs,
    )

clone

clone(
    new_params: dict[Any, Any] | None = None,
    include_attributes=False,
)

Clones the estimator.

Parameters:

Name Type Description Default
new_params dict[Any, Any] | None

New parameters to be passed to the cloned estimator.

None
include_attributes

If True, the attributes of the estimator will be copied to the cloned estimator. This is useful when the estimator is a transformer and the attributes are the learned parameters.

False

Returns:

Type Description
DeepEstimator

The cloned estimator.

Source code in deep_river/base.py
def clone(
    self,
    new_params: dict[Any, Any] | None = None,
    include_attributes=False,
):
    """Clones the estimator.

    Parameters
    ----------
    new_params
        New parameters to be passed to the cloned estimator.
    include_attributes
        If True, the attributes of the estimator will be copied to the
        cloned estimator. This is useful when the estimator is a
        transformer and the attributes are the learned parameters.

    Returns
    -------
    DeepEstimator
        The cloned estimator.
    """
    new_params = new_params or {}
    new_params.update(self.kwargs)
    new_params.update(self._get_params())
    new_params.update({"module": self.module_cls})

    clone = self.__class__(**new_params)
    if include_attributes:
        clone.__dict__.update(self.__dict__)
    return clone

draw

draw() -> Digraph

Draws the wrapped model.

Source code in deep_river/base.py
def draw(self) -> Digraph:
    """Draws the wrapped model."""
    first_parameter = next(self.module.parameters())
    input_shape = first_parameter.size()
    y_pred = self.module(torch.rand(input_shape))
    return make_dot(y_pred.mean(), params=dict(self.module.named_parameters()))

initialize_module

initialize_module(x: dict | DataFrame, **kwargs)

Parameters:

Name Type Description Default
module

The instance or class or callable to be initialized, e.g. self.module.

required
kwargs dict

The keyword arguments to initialize the instance or class. Can be an empty dict.

{}

Returns:

Type Description
instance

The initialized component.

Source code in deep_river/base.py
def initialize_module(self, x: dict | pd.DataFrame, **kwargs):
    """
    Parameters
    ----------
    module
      The instance or class or callable to be initialized, e.g.
      ``self.module``.
    kwargs : dict
      The keyword arguments to initialize the instance or class. Can be an
      empty dict.
    Returns
    -------
    instance
      The initialized component.
    """
    torch.manual_seed(self.seed)
    if isinstance(x, Dict):
        n_features = len(x)
    elif isinstance(x, pd.DataFrame):
        n_features = len(x.columns)

    if not isinstance(self.module_cls, torch.nn.Module):
        self.module = self.module_cls(
            n_features=n_features,
            **self._filter_kwargs(self.module_cls, kwargs),
        )

    self.module.to(self.device)
    self.optimizer = self.optimizer_func(self.module.parameters(), lr=self.lr)
    self.module_initialized = True

    self._get_input_output_layers(n_features=n_features)

learn_one

learn_one(x: dict, y: RegTarget, **kwargs) -> None

Performs one step of training with the sliding window of the most recent examples.

Parameters:

Name Type Description Default
x dict

Input example.

required
y RegTarget

Target value.

required

Returns:

Type Description
RollingRegressor

The regressor itself.

Source code in deep_river/regression/rolling_regressor.py
def learn_one(self, x: dict, y: base.typing.RegTarget, **kwargs) -> None:
    """
    Performs one step of training with the sliding
    window of the most recent examples.

    Parameters
    ----------
    x
        Input example.
    y
        Target value.

    Returns
    -------
    RollingRegressor
        The regressor itself.
    """
    if not self.module_initialized:
        self._update_observed_features(x)
        self.initialize_module(x=x, **self.kwargs)

    self._adapt_input_dim(x)
    self._x_window.append([x.get(feature, 0) for feature in self.observed_features])

    if len(self._x_window) == self.window_size:
        x_t = deque2rolling_tensor(self._x_window, device=self.device)
        y_t = float2tensor(y, device=self.device)
        self._learn(x_t, y_t)

predict_one

predict_one(x: dict)

Predicts the target value for the current sliding window of most recent examples.

Parameters:

Name Type Description Default
x dict

Input example.

required

Returns:

Type Description
RegTarget

Predicted target value.

Source code in deep_river/regression/rolling_regressor.py
def predict_one(self, x: dict):
    """
    Predicts the target value for the current sliding
    window of most recent examples.

    Parameters
    ----------
    x
        Input example.

    Returns
    -------
    RegTarget
        Predicted target value.
    """
    if not self.module_initialized:

        self._update_observed_features(x)
        self.initialize_module(x=x, **self.kwargs)

    self._adapt_input_dim(x)
    x_win = self._x_window.copy()
    x_win.append([x.get(feature, 0) for feature in self.observed_features])
    if self.append_predict:
        self._x_window = x_win

    self.module.eval()
    with torch.inference_mode():
        x_t = deque2rolling_tensor(x_win, device=self.device)
        res = self.module(x_t).numpy(force=True).item()

    return res

RollingRegressorInitialized

RollingRegressorInitialized(
    module: Module,
    loss_fn: Union[str, Callable] = "mse",
    optimizer_fn: Union[str, Type[Optimizer]] = "sgd",
    lr: float = 0.001,
    is_feature_incremental: bool = False,
    device: str = "cpu",
    seed: int = 42,
    window_size: int = 10,
    append_predict: bool = False,
    **kwargs
)

Bases: RollingDeepEstimatorInitialized, RegressorInitialized

RollingRegressorInitialized class built for regression tasks with a window-based learning mechanism. Handles incremental learning by maintaining a sliding window of training data for both individual examples and batches of data. Enables feature incremental updates and compatibility with PyTorch modules. Ideal for time-series or sequential data tasks where the training set changes dynamically.

Attributes:

Name Type Description
module Module

A PyTorch neural network model that defines the architecture of the regressor.

loss_fn Union[str, Callable]

Loss function used for optimization. Either a string (e.g., "mse") or a callable.

optimizer_fn Union[str, Type[Optimizer]]

Optimizer function or string used for training the neural network model.

lr float

Learning rate for the optimizer.

is_feature_incremental bool

Whether the model incrementally updates its features during training.

device str

Target device for model training and inference (e.g., "cpu", "cuda").

seed int

Random seed for reproducibility.

window_size int

Size of the sliding window used for storing the most recent training examples.

append_predict bool

Whether predictions should contribute to the sliding window data.

Methods:

Name Description
learn_many

Performs one step of training with the most recent training examples

learn_one

Performs one step of training with the most recent training examples

predict_many

Predict the probability of each label given the most recent examples

predict_one

Predict the probability of each label given the most recent examples

Source code in deep_river/regression/rolling_regressor.py
def __init__(
    self,
    module: torch.nn.Module,
    loss_fn: Union[str, Callable] = "mse",
    optimizer_fn: Union[str, Type[optim.Optimizer]] = "sgd",
    lr: float = 1e-3,
    is_feature_incremental: bool = False,
    device: str = "cpu",
    seed: int = 42,
    window_size: int = 10,
    append_predict: bool = False,
    **kwargs,
):
    super().__init__(
        module=module,
        loss_fn=loss_fn,
        optimizer_fn=optimizer_fn,
        lr=lr,
        is_feature_incremental=is_feature_incremental,
        device=device,
        seed=seed,
        window_size=window_size,
        append_predict=append_predict,
        **kwargs,
    )

learn_many

learn_many(X: DataFrame, y: Series) -> None

Performs one step of training with the most recent training examples stored in the sliding window.

Parameters:

Name Type Description Default
X DataFrame

Input examples.

required
y Series

Target values.

required

Returns:

Type Description
Self

The regressor itself.

Source code in deep_river/regression/rolling_regressor.py
def learn_many(self, X: pd.DataFrame, y: pd.Series) -> None:
    """
    Performs one step of training with the most recent training examples
    stored in the sliding window.

    Parameters
    ----------
    X
        Input examples.
    y
        Target values.

    Returns
    -------
    Self
        The regressor itself.
    """
    self._update_observed_features(X)

    X = X[list(self.observed_features)]
    self._x_window.extend(X.values.tolist())

    if len(self._x_window) == self.window_size:
        X_t = self._deque2rolling_tensor(self._x_window)

        # Convert y to tensor (ensuring proper shape for regression)
        y_t = torch.tensor(y.values, dtype=torch.float32, device=self.device).view(
            -1, 1
        )

        self._learn(x=X_t, y=y_t)

learn_one

learn_one(x: dict, y: RegTarget, **kwargs) -> None

Performs one step of training with the most recent training examples stored in the sliding window.

Parameters:

Name Type Description Default
x dict

Input example.

required
y RegTarget

Target value.

required

Returns:

Type Description
Self

The regressor itself.

Source code in deep_river/regression/rolling_regressor.py
def learn_one(self, x: dict, y: base.typing.RegTarget, **kwargs) -> None:
    """
    Performs one step of training with the most recent training examples
    stored in the sliding window.

    Parameters
    ----------
    x
        Input example.
    y
        Target value.

    Returns
    -------
    Self
        The regressor itself.
    """
    self._update_observed_features(x)

    self._x_window.append([x.get(feature, 0) for feature in self.observed_features])

    x_t = self._deque2rolling_tensor(self._x_window)

    # Convert y to tensor (ensuring proper shape for regression)
    y_t = torch.tensor([y], dtype=torch.float32, device=self.device).view(-1, 1)

    self._learn(x=x_t, y=y_t)

predict_many

predict_many(X: DataFrame) -> DataFrame

Predict the probability of each label given the most recent examples

Parameters:

Name Type Description Default
X DataFrame
required

Returns:

Type Description
DataFrame

DataFrame of probabilities for each label.

Source code in deep_river/regression/rolling_regressor.py
def predict_many(self, X: pd.DataFrame) -> pd.DataFrame:
    """
    Predict the probability of each label given the most recent examples

    Parameters
    ----------
    X

    Returns
    -------
    pd.DataFrame
        DataFrame of probabilities for each label.
    """

    self._update_observed_features(X)
    X = X[list(self.observed_features)]
    x_win = self._x_window.copy()
    x_win.extend(X.values.tolist())
    if self.append_predict:
        self._x_window = x_win

    self.module.eval()
    with torch.inference_mode():
        x_t = self._deque2rolling_tensor(x_win)
        y_preds = self.module(x_t).detach().tolist()
    return pd.DataFrame(y_preds if not y_preds.is_cuda else y_preds.cpu().numpy())

predict_one

predict_one(x: dict) -> RegTarget

Predict the probability of each label given the most recent examples stored in the sliding window.

Parameters:

Name Type Description Default
x dict

Input example.

required

Returns:

Type Description
Dict[ClfTarget, float]

Dictionary of probabilities for each label.

Source code in deep_river/regression/rolling_regressor.py
def predict_one(self, x: dict) -> base.typing.RegTarget:
    """
    Predict the probability of each label given the most recent examples
    stored in the sliding window.

    Parameters
    ----------
    x
        Input example.

    Returns
    -------
    Dict[ClfTarget, float]
        Dictionary of probabilities for each label.
    """
    self._update_observed_features(x)

    x_win = self._x_window.copy()
    x_win.append([x.get(feature, 0) for feature in self.observed_features])
    if self.append_predict:
        self._x_window = x_win

    self.module.eval()
    with torch.inference_mode():
        x_t = self._deque2rolling_tensor(x_win)
        res = self.module(x_t).numpy(force=True).item()

    return res