ae

Classes:

| Name | Description |
| ---- | ----------- |
| `Autoencoder` | An autoencoder for anomaly detection and feature learning. |

Autoencoder

Autoencoder(
    module: Module,
    loss_fn: Union[str, Callable] = "mse",
    optimizer_fn: Union[str, Callable] = "sgd",
    lr: float = 0.001,
    is_feature_incremental: bool = False,
    device: str = "cpu",
    seed: int = 42,
    **kwargs
)

Bases: DeepEstimator, AnomalyDetector

An autoencoder for anomaly detection and feature learning.

This class builds on the DeepEstimator and AnomalyDetector base classes. It trains the autoencoder on input data without supervision and computes anomaly scores from the reconstruction error: the worse an example is reconstructed, the higher its score. Learning can happen on individual examples or on entire batches.
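
For orientation, here is a minimal streaming sketch. The `TinyAutoencoder` module, the river `CreditCard` dataset, and the hyperparameters are illustrative assumptions, not part of this class's API:

```python
# Minimal usage sketch; module architecture and dataset are illustrative.
import torch
from river import datasets
from deep_river.anomaly import Autoencoder


class TinyAutoencoder(torch.nn.Module):
    def __init__(self, n_features: int = 30):
        super().__init__()
        self.encoder = torch.nn.Sequential(
            torch.nn.Linear(n_features, 8),
            torch.nn.ReLU(),
        )
        self.decoder = torch.nn.Linear(8, n_features)

    def forward(self, x):
        return self.decoder(self.encoder(x))


model = Autoencoder(module=TinyAutoencoder(), lr=0.005)

for x, _ in datasets.CreditCard().take(1000):
    score = model.score_one(x)  # reconstruction error before the update
    model.learn_one(x)          # unsupervised update on the same example
```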

Attributes:

| Name | Type | Description |
| ---- | ---- | ----------- |
| `is_feature_incremental` | `bool` | Whether the model dynamically adapts to newly observed features. |
| `module` | `Module` | The PyTorch model representing the autoencoder architecture. |
| `loss_fn` | `Union[str, Callable]` | Loss function used to compute the reconstruction error. |
| `optimizer_fn` | `Union[str, Callable]` | Optimizer used for training the autoencoder. |
| `lr` | `float` | Learning rate for optimization. |
| `device` | `str` | Device the model is loaded and trained on (e.g. `"cpu"`, `"cuda"`). |
| `seed` | `int` | Random seed for reproducibility. |

Methods:

| Name | Description |
| ---- | ----------- |
| `clone` | Return a fresh estimator instance with (optionally) copied state. |
| `draw` | Render a (partial) computational graph of the wrapped model. |
| `learn_many` | Performs one step of training with a batch of examples. |
| `learn_one` | Performs one step of training with a single example. |
| `load` | Load a previously saved estimator. |
| `save` | Persist the estimator (architecture, weights, optimiser & runtime state). |
| `score_many` | Returns anomaly scores for the provided batch of examples in the form of the autoencoder's reconstruction error. |
| `score_one` | Returns an anomaly score for the provided example in the form of the autoencoder's reconstruction error. |

Source code in deep_river/anomaly/ae.py
def __init__(
    self,
    module: torch.nn.Module,
    loss_fn: Union[str, Callable] = "mse",
    optimizer_fn: Union[str, Callable] = "sgd",
    lr: float = 1e-3,
    is_feature_incremental: bool = False,
    device: str = "cpu",
    seed: int = 42,
    **kwargs,
):
    super().__init__(
        module=module,
        loss_fn=loss_fn,
        optimizer_fn=optimizer_fn,
        lr=lr,
        is_feature_incremental=is_feature_incremental,
        device=device,
        seed=seed,
        **kwargs,
    )
    self.is_feature_incremental = is_feature_incremental

clone

clone(
    new_params=None,
    include_attributes: bool = False,
    copy_weights: bool = False,
)

Return a fresh estimator instance with (optionally) copied state.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| `new_params` | `dict \| None` | Parameter overrides for the cloned instance. | `None` |
| `include_attributes` | `bool` | If `True`, runtime state (observed features, buffers) is also copied. | `False` |
| `copy_weights` | `bool` | If `True`, model weights are copied (otherwise the module is re‑initialised). | `False` |
Source code in deep_river/base.py
def clone(
    self,
    new_params=None,
    include_attributes: bool = False,
    copy_weights: bool = False,
):
    """Return a fresh estimator instance with (optionally) copied state.

    Parameters
    ----------
    new_params : dict | None
        Parameter overrides for the cloned instance.
    include_attributes : bool, default=False
        If True, runtime state (observed features, buffers) is also copied.
    copy_weights : bool, default=False
        If True, model weights are copied (otherwise the module is re‑initialised).
    """
    new_params = new_params or {}
    copy_weights = new_params.pop("copy_weights", copy_weights)

    params = {**self._get_all_init_params(), **new_params}

    if "module" not in new_params:
        params["module"] = self._rebuild_module()

    new_est = self.__class__(**self._filter_kwargs(self.__class__.__init__, params))

    if copy_weights and hasattr(self.module, "state_dict"):
        new_est.module.load_state_dict(self.module.state_dict())

    if include_attributes:
        new_est._restore_runtime_state(self._get_runtime_state())

    return new_est
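
Building on the sketch above, cloning with an overridden learning rate might look like this (the values are illustrative):

```python
# Re-initialised module, new learning rate:
fresh = model.clone(new_params={"lr": 1e-4})

# Same, but keep the trained weights and the observed runtime state:
finetuned = model.clone(
    new_params={"lr": 1e-4}, copy_weights=True, include_attributes=True
)
```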

draw

draw()

Render a (partial) computational graph of the wrapped model.

Imports graphviz and torchviz lazily. Raises an informative ImportError if the optional dependencies are not installed.

Source code in deep_river/base.py
def draw(self):  # type: ignore[override]
    """Render a (partial) computational graph of the wrapped model.

    Imports ``graphviz`` and ``torchviz`` lazily. Raises an informative
    ImportError if the optional dependencies are not installed.
    """
    try:  # pragma: no cover
        from torchviz import make_dot  # type: ignore
    except Exception as err:  # noqa: BLE001
        raise ImportError(
            "graphviz and torchviz must be installed to draw the model."
        ) from err

    first_parameter = next(self.module.parameters())
    input_shape = first_parameter.size()
    y_pred = self.module(torch.rand(input_shape))
    return make_dot(y_pred.mean(), params=dict(self.module.named_parameters()))
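
Assuming the optional `graphviz` and `torchviz` dependencies are installed, a usage sketch (the output name is illustrative):

```python
graph = model.draw()  # a graphviz Digraph built by torchviz.make_dot
graph.render("autoencoder_graph", format="png")  # writes autoencoder_graph.png
```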

learn_many

learn_many(X: DataFrame) -> None

Performs one step of training with a batch of examples.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| `X` | `DataFrame` | Input batch of examples. | required |
Source code in deep_river/anomaly/ae.py
def learn_many(self, X: pd.DataFrame) -> None:
    """
    Performs one step of training with a batch of examples.

    Parameters
    ----------
    X
        Input batch of examples.
    """

    self._update_observed_features(X)
    X_t = self._df2tensor(X)
    self._learn(X_t)
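
A small batch-training sketch; the column names are illustrative and must match the features the wrapped module expects:

```python
import pandas as pd

# One optimisation step over a three-row mini-batch; column names
# serve as feature names.
X_batch = pd.DataFrame({"f1": [0.1, 0.4, 0.2], "f2": [1.0, 0.9, 1.1]})
model.learn_many(X_batch)
```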

learn_one

learn_one(x: dict, y: Any = None, **kwargs) -> None

Performs one step of training with a single example.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| `x` | `dict` | Input example. | required |
| `y` | `Any` | Ignored; the autoencoder is trained without labels. Present for API consistency. | `None` |
| `**kwargs` | | Additional keyword arguments (unused). | `{}` |
Source code in deep_river/anomaly/ae.py
def learn_one(self, x: dict, y: Any = None, **kwargs) -> None:
    """
    Performs one step of training with a single example.

    Parameters
    ----------
    x
        Input example.
    y
        Ignored; the autoencoder is trained without labels.
    **kwargs
        Additional keyword arguments (unused).
    """
    self._update_observed_features(x)
    self._learn(self._dict2tensor(x))
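
A single-example sketch with illustrative feature names; as the source above shows, the label argument is accepted but never used:

```python
model.learn_one({"f1": 0.1, "f2": 1.0})
model.learn_one({"f1": 0.4, "f2": 0.9}, y=None)  # y is ignored
```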

load classmethod

load(filepath: Union[str, Path])

Load a previously saved estimator.

The method reconstructs the estimator class, its wrapped module, optimiser state and runtime information (feature names, buffers, etc.).

Source code in deep_river/base.py
@classmethod
def load(cls, filepath: Union[str, Path]):
    """Load a previously saved estimator.

    The method reconstructs the estimator class, its wrapped module, optimiser
    state and runtime information (feature names, buffers, etc.).
    """
    with open(filepath, "rb") as f:
        state = pickle.load(f)

    estimator_cls = cls._import_from_path(state["estimator_class"])
    init_params = state["init_params"]

    # Rebuild module if needed
    if "module" in init_params and isinstance(init_params["module"], dict):
        module_info = init_params.pop("module")
        module_cls = cls._import_from_path(module_info["class"])
        module = module_cls(
            **cls._filter_kwargs(module_cls.__init__, module_info["kwargs"])
        )
        if state.get("model_state_dict"):
            module.load_state_dict(state["model_state_dict"])
        init_params["module"] = module

    estimator = estimator_cls(
        **cls._filter_kwargs(estimator_cls.__init__, init_params)
    )

    if state.get("optimizer_state_dict") and hasattr(estimator, "optimizer"):
        try:
            estimator.optimizer.load_state_dict(
                state["optimizer_state_dict"]  # type: ignore[arg-type]
            )
        except Exception:  # noqa: E722
            pass

    estimator._restore_runtime_state(state.get("runtime_state", {}))
    return estimator

save

save(filepath: Union[str, Path]) -> None

Persist the estimator (architecture, weights, optimiser & runtime state).

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| `filepath` | `str \| Path` | Destination file. Parent directories are created automatically. | required |
Source code in deep_river/base.py
def save(self, filepath: Union[str, Path]) -> None:
    """Persist the estimator (architecture, weights, optimiser & runtime state).

    Parameters
    ----------
    filepath : str | Path
        Destination file. Parent directories are created automatically.
    """
    filepath = Path(filepath)
    filepath.parent.mkdir(parents=True, exist_ok=True)

    state = {
        "estimator_class": f"{type(self).__module__}.{type(self).__name__}",
        "init_params": self._get_all_init_params(),
        "model_state_dict": getattr(self.module, "state_dict", lambda: {})(),
        "optimizer_state_dict": getattr(self.optimizer, "state_dict", lambda: {})(),
        "runtime_state": self._get_runtime_state(),
    }

    with open(filepath, "wb") as f:
        pickle.dump(state, f)
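
A persistence round trip might look like the following sketch (the path is illustrative; parent directories are created automatically):

```python
model.save("checkpoints/ae.pkl")                   # architecture, weights, state
restored = Autoencoder.load("checkpoints/ae.pkl")  # rebuilds module & optimiser
```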

score_many

score_many(X: DataFrame) -> ndarray

Returns an anomaly score for the provided batch of examples in the form of the autoencoder's reconstruction error.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| `X` | `DataFrame` | Input batch of examples. | required |

Returns:

| Type | Description |
| ---- | ----------- |
| `ndarray` | Anomaly scores for the given batch of examples. Larger values indicate more anomalous examples. |

Source code in deep_river/anomaly/ae.py
def score_many(self, X: pd.DataFrame) -> np.ndarray:
    """
    Returns an anomaly score for the provided batch of examples in
    the form of the autoencoder's reconstruction error.

    Parameters
    ----------
    X
        Input batch of examples.

    Returns
    -------
    np.ndarray
        Anomaly scores for the given batch of examples. Larger values
        indicate more anomalous examples.
    """
    self._update_observed_features(X)
    x_t = self._df2tensor(X)

    self.module.eval()
    with torch.inference_mode():
        x_pred = self.module(x_t)
    loss = torch.mean(
        self.loss_func(x_pred, x_t, reduction="none"),
        dim=list(range(1, x_t.dim())),
    )
    score = loss.cpu().detach().numpy()
    return score
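
A batch-scoring sketch reusing the illustrative `X_batch` from the `learn_many` example; the three-sigma cut-off is an arbitrary choice, not part of the API:

```python
scores = model.score_many(X_batch)  # ndarray, one score per row
threshold = scores.mean() + 3 * scores.std()
anomalous_rows = X_batch[scores > threshold]
```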

score_one

score_one(x: dict) -> float

Returns an anomaly score for the provided example in the form of the autoencoder's reconstruction error.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| `x` | `dict` | Input example. | required |

Returns:

| Type | Description |
| ---- | ----------- |
| `float` | Anomaly score for the given example. Larger values indicate more anomalous examples. |

Source code in deep_river/anomaly/ae.py
def score_one(self, x: dict) -> float:
    """
    Returns an anomaly score for the provided example in the form of
    the autoencoder's reconstruction error.

    Parameters
    ----------
    x
        Input example.

    Returns
    -------
    float
        Anomaly score for the given example. Larger values indicate
        more anomalous examples.

    """

    self._update_observed_features(x)
    x_t = self._dict2tensor(x)
    self.module.eval()
    with torch.inference_mode():
        x_pred = self.module(x_t)
    loss = self.loss_func(x_pred, x_t).item()
    return loss