rolling_ae

Classes:

RollingAutoencoder
    Rolling window autoencoder for streaming anomaly detection.

RollingAutoencoder

RollingAutoencoder(
    module: Module,
    loss_fn: Union[str, Callable] = "mse",
    optimizer_fn: Union[str, Callable] = "sgd",
    lr: float = 0.001,
    device: str = "cpu",
    seed: int = 42,
    window_size: int = 10,
    append_predict: bool = False,
    **kwargs
)

Bases: RollingDeepEstimator, AnomalyDetector

Rolling window autoencoder for streaming anomaly detection.

Maintains a fixed-size deque of the latest window_size observations and feeds them as a sequence tensor to the wrapped autoencoder module. The anomaly score is the reconstruction error for the current (or most recent) window. This design allows sequence context without retaining the full historical stream.

Parameters:

module : Module (required)
    Autoencoder (or encoder-only) style module operating on a rolling tensor.
loss_fn : str | Callable, default='mse'
    Loss used to measure the reconstruction error.
optimizer_fn : str | Callable, default='sgd'
    Optimizer specification.
lr : float, default=1e-3
    Learning rate.
device : str, default='cpu'
    Torch device.
seed : int, default=42
    Random seed.
window_size : int, default=10
    Number of past samples retained.
append_predict : bool, default=False
    If True, the scored sample is appended to the window during prediction.
**kwargs
    Forwarded to deep_river.base.RollingDeepEstimator.
Notes

The provided module should expect input of shape (seq_len, batch=1, n_features), which is what deque2rolling_tensor produces.
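
For orientation, here is a minimal usage sketch. The import path deep_river.anomaly.RollingAutoencoder, the LSTMAutoencoder module, and all parameter values are illustrative assumptions, not part of this reference; adapt them to your setup.

import torch
from torch import nn

from deep_river.anomaly import RollingAutoencoder  # import path assumed


class LSTMAutoencoder(nn.Module):
    """Hypothetical sequence autoencoder that reconstructs the rolling window."""

    def __init__(self, n_features: int, hidden_size: int = 16):
        super().__init__()
        self.encoder = nn.LSTM(n_features, hidden_size)
        self.decoder = nn.LSTM(hidden_size, n_features)

    def forward(self, x):          # x: (seq_len, batch=1, n_features)
        h, _ = self.encoder(x)     # (seq_len, 1, hidden_size)
        out, _ = self.decoder(h)   # (seq_len, 1, n_features)
        return out


model = RollingAutoencoder(
    module=LSTMAutoencoder(n_features=2),
    loss_fn="mse",
    optimizer_fn="adam",
    lr=1e-3,
    window_size=10,
)

stream = [{"x1": 0.1 * i, "x2": (0.1 * i) ** 2} for i in range(100)]
for x in stream:
    score = model.score_one(x)  # 0.0 until the rolling window has filled up
    model.learn_one(x)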

Methods:

clone
    Return a fresh estimator instance with (optionally) copied state.
draw
    Render a (partial) computational graph of the wrapped model.
learn_many
    Batch update; extends the window with rows from X and learns once it is full.
learn_one
    Update the model using a single sample appended to the rolling window.
load
    Load a previously saved estimator.
save
    Persist the estimator (architecture, weights, optimiser and runtime state).
score_many
    Return a list of reconstruction errors, one for each row in X.
score_one
    Return the reconstruction error for the current window plus the candidate sample.

Source code in deep_river/anomaly/rolling_ae.py
def __init__(
    self,
    module: torch.nn.Module,
    loss_fn: Union[str, Callable] = "mse",
    optimizer_fn: Union[str, Callable] = "sgd",
    lr: float = 1e-3,
    device: str = "cpu",
    seed: int = 42,
    window_size: int = 10,
    append_predict: bool = False,
    **kwargs,
):
    super().__init__(
        module=module,
        loss_fn=loss_fn,
        optimizer_fn=optimizer_fn,
        lr=lr,
        device=device,
        seed=seed,
        window_size=window_size,
        append_predict=append_predict,
        **kwargs,
    )

clone

clone(
    new_params=None,
    include_attributes: bool = False,
    copy_weights: bool = False,
)

Return a fresh estimator instance with (optionally) copied state.

Parameters:

new_params : dict | None, default=None
    Parameter overrides for the cloned instance.
include_attributes : bool, default=False
    If True, runtime state (observed features, buffers) is also copied.
copy_weights : bool, default=False
    If True, model weights are copied (otherwise the module is re-initialised).
Source code in deep_river/base.py
def clone(
    self,
    new_params=None,
    include_attributes: bool = False,
    copy_weights: bool = False,
):
    """Return a fresh estimator instance with (optionally) copied state.

    Parameters
    ----------
    new_params : dict | None
        Parameter overrides for the cloned instance.
    include_attributes : bool, default=False
        If True, runtime state (observed features, buffers) is also copied.
    copy_weights : bool, default=False
        If True, model weights are copied (otherwise the module is re‑initialised).
    """
    new_params = new_params or {}
    copy_weights = new_params.pop("copy_weights", copy_weights)

    params = {**self._get_all_init_params(), **new_params}

    if "module" not in new_params:
        params["module"] = self._rebuild_module()

    new_est = self.__class__(**self._filter_kwargs(self.__class__.__init__, params))

    if copy_weights and hasattr(self.module, "state_dict"):
        new_est.module.load_state_dict(self.module.state_dict())

    if include_attributes:
        new_est._restore_runtime_state(self._get_runtime_state())

    return new_est
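
A small usage sketch for clone, continuing the hypothetical model from the example above:

# Fresh copy with a higher learning rate, keeping the already-trained weights.
warm_clone = model.clone(new_params={"lr": 1e-2}, copy_weights=True)

# Completely fresh copy: re-initialised weights, no runtime state carried over.
cold_clone = model.clone()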

draw

draw()

Render a (partial) computational graph of the wrapped model.

Imports graphviz and torchviz lazily. Raises an informative ImportError if the optional dependencies are not installed.

Source code in deep_river/base.py
def draw(self):  # type: ignore[override]
    """Render a (partial) computational graph of the wrapped model.

    Imports ``graphviz`` and ``torchviz`` lazily. Raises an informative
    ImportError if the optional dependencies are not installed.
    """
    try:  # pragma: no cover
        from torchviz import make_dot  # type: ignore
    except Exception as err:  # noqa: BLE001
        raise ImportError(
            "graphviz and torchviz must be installed to draw the model."
        ) from err

    first_parameter = next(self.module.parameters())
    input_shape = first_parameter.size()
    y_pred = self.module(torch.rand(input_shape))
    return make_dot(y_pred.mean(), params=dict(self.module.named_parameters()))

learn_many

learn_many(X: DataFrame, y=None) -> None

Batch update; extends window with rows from X and learns if full.

Parameters:

X : DataFrame (required)
    DataFrame containing the input features for each sample.
y : None, default=None
    Ignored, present for compatibility.
Source code in deep_river/anomaly/rolling_ae.py
def learn_many(self, X: pd.DataFrame, y=None) -> None:
    """Batch update; extends window with rows from X and learns if full.

    Parameters
    ----------
    X : pd.DataFrame
        DataFrame containing the input features for each sample.
    y : None
        Ignored, present for compatibility.
    """
    self._update_observed_features(X)

    # Extend with one deque entry per row so the window stays sample-aligned.
    self._x_window.extend(X.values.tolist())
    if len(self._x_window) == self.window_size:
        X_t = deque2rolling_tensor(self._x_window, device=self.device)
        self._learn(x=X_t)
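
A minimal mini-batch update sketch, reusing the hypothetical two-feature model from the example above (the data values are illustrative):

import pandas as pd

X_batch = pd.DataFrame(
    {"x1": [0.1, 0.2, 0.3, 0.4], "x2": [0.01, 0.04, 0.09, 0.16]}
)
model.learn_many(X_batch)  # rows are pushed into the rolling window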

learn_one

learn_one(x: dict, y: Any = None, **kwargs) -> None

Update model using a single sample appended to the rolling window.

Parameters:

x : dict (required)
    Dictionary containing feature name-value pairs for the sample.
y : Any, default=None
    Target value (not used in autoencoder training).
**kwargs
    Additional keyword arguments.
Source code in deep_river/anomaly/rolling_ae.py
def learn_one(self, x: dict, y: Any = None, **kwargs) -> None:
    """Update model using a single sample appended to the rolling window.

    Parameters
    ----------
    x : dict
        Dictionary containing feature name-value pairs for the sample.
    y : Any, optional
        Target value (not used in autoencoder training).
    **kwargs
        Additional keyword arguments.
    """
    self._update_observed_features(x)
    self._x_window.append(list(x.values()))

    x_t = deque2rolling_tensor(self._x_window, device=self.device)
    self._learn(x=x_t)

load classmethod

load(filepath: Union[str, Path])

Load a previously saved estimator.

The method reconstructs the estimator class, its wrapped module, optimiser state and runtime information (feature names, buffers, etc.).

Source code in deep_river/base.py
@classmethod
def load(cls, filepath: Union[str, Path]):
    """Load a previously saved estimator.

    The method reconstructs the estimator class, its wrapped module, optimiser
    state and runtime information (feature names, buffers, etc.).
    """
    with open(filepath, "rb") as f:
        state = pickle.load(f)

    estimator_cls = cls._import_from_path(state["estimator_class"])
    init_params = state["init_params"]

    # Rebuild module if needed
    if "module" in init_params and isinstance(init_params["module"], dict):
        module_info = init_params.pop("module")
        module_cls = cls._import_from_path(module_info["class"])
        module = module_cls(
            **cls._filter_kwargs(module_cls.__init__, module_info["kwargs"])
        )
        if state.get("model_state_dict"):
            module.load_state_dict(state["model_state_dict"])
        init_params["module"] = module

    estimator = estimator_cls(
        **cls._filter_kwargs(estimator_cls.__init__, init_params)
    )

    if state.get("optimizer_state_dict") and hasattr(estimator, "optimizer"):
        try:
            estimator.optimizer.load_state_dict(
                state["optimizer_state_dict"]  # type: ignore[arg-type]
            )
        except Exception:  # noqa: E722
            pass

    estimator._restore_runtime_state(state.get("runtime_state", {}))
    return estimator

save

save(filepath: Union[str, Path]) -> None

Persist the estimator (architecture, weights, optimiser & runtime state).

Parameters:

filepath : str | Path (required)
    Destination file. Parent directories are created automatically.
Source code in deep_river/base.py
def save(self, filepath: Union[str, Path]) -> None:
    """Persist the estimator (architecture, weights, optimiser & runtime state).

    Parameters
    ----------
    filepath : str | Path
        Destination file. Parent directories are created automatically.
    """
    filepath = Path(filepath)
    filepath.parent.mkdir(parents=True, exist_ok=True)

    state = {
        "estimator_class": f"{type(self).__module__}.{type(self).__name__}",
        "init_params": self._get_all_init_params(),
        "model_state_dict": getattr(self.module, "state_dict", lambda: {})(),
        "optimizer_state_dict": getattr(self.optimizer, "state_dict", lambda: {})(),
        "runtime_state": self._get_runtime_state(),
    }

    with open(filepath, "wb") as f:
        pickle.dump(state, f)
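
A save/load round-trip sketch, continuing the hypothetical model from the example above (the file name is illustrative):

model.save("rolling_ae.pkl")                        # parent directories are created if missing
restored = RollingAutoencoder.load("rolling_ae.pkl")
score = restored.score_one({"x1": 0.5, "x2": 0.25})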

score_many

score_many(X: DataFrame) -> List[Any]

Return list of reconstruction errors for each row in X.

If the window is not yet full, zeros are returned for alignment.

Parameters:

X : DataFrame (required)
    DataFrame containing the input features for each sample.

Returns:

List[float]
    Computed anomaly scores (reconstruction errors), one for each sample in X.
Source code in deep_river/anomaly/rolling_ae.py
def score_many(self, X: pd.DataFrame) -> List[Any]:
    """Return list of reconstruction errors for each row in X.

    If the window is not yet full, zeros are returned for alignment.

    Parameters
    ----------
    X : pd.DataFrame
        DataFrame containing the input features for each sample.

    Returns
    -------
    List[float]
        List of computed anomaly scores (reconstruction errors) for each sample in X.
    """
    self._update_observed_features(X)
    x_win = self._x_window.copy()
    x_win.extend(X.values.tolist())  # one entry per row, keeping the deque sample-aligned
    if self.append_predict:
        self._x_window.extend(X.values.tolist())

    if len(self._x_window) == self.window_size:
        X_t = deque2rolling_tensor(x_win, device=self.device)
        self.module.eval()
        with torch.inference_mode():
            x_pred = self.module(X_t)
        # Per-timestep reconstruction error: compare the prediction with the input window.
        loss = torch.mean(
            self.loss_func(x_pred, X_t, reduction="none"),
            dim=list(range(1, x_pred.dim())),
        )
        # Keep only the scores belonging to the rows of X and move them to CPU.
        losses = loss.detach().cpu().numpy()[-len(X):]
        if len(losses) < len(X):
            losses = np.pad(losses, (len(X) - len(losses), 0))
        return losses.tolist()
    else:
        return np.zeros(len(X)).tolist()
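
A batch scoring sketch with the same hypothetical model and DataFrame as above; scores are 0.0 while the window is still filling:

scores = model.score_many(X_batch)  # one reconstruction error per row of X_batch
print(scores)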

score_one

score_one(x: dict) -> float

Return reconstruction error for current window + candidate sample.

Parameters:

x : dict (required)
    Dictionary containing feature name-value pairs for the candidate sample.

Returns:

float
    Computed anomaly score (reconstruction error).
Source code in deep_river/anomaly/rolling_ae.py
def score_one(self, x: dict) -> float:
    """Return reconstruction error for current window + candidate sample.

    Parameters
    ----------
    x : dict
        Dictionary containing feature name-value pairs for the candidate sample.

    Returns
    -------
    float
        Computed anomaly score (reconstruction error).
    """
    res = 0.0
    self._update_observed_features(x)
    if len(self._x_window) == self.window_size:
        x_win = self._x_window.copy()
        x_win.append(list(x.values()))
        x_t = deque2rolling_tensor(x_win, device=self.device)
        self.module.eval()
        with torch.inference_mode():
            x_pred = self.module(x_t)
        loss = self.loss_func(x_pred, x_t)
        res = loss.item()

    if self.append_predict:
        self._x_window.append(list(x.values()))
    return res