Skip to content

rolling_ae

Classes:

Name Description
RollingAutoencoder

Wrapper for PyTorch autoencoder models that uses the network's reconstruction error for scoring the anomalousness of a given example.

RollingAutoencoderInitialized

RollingAutoencoder

RollingAutoencoder(
    module: Type[Module],
    loss_fn: Union[str, Callable] = "mse",
    optimizer_fn: Union[str, Callable] = "sgd",
    lr: float = 0.001,
    device: str = "cpu",
    seed: int = 42,
    window_size: int = 10,
    append_predict: bool = False,
    **kwargs
)

Bases: RollingDeepEstimator, AnomalyDetector

Wrapper for PyTorch autoencoder models that uses the network's reconstruction error for scoring the anomalousness of a given example. The class also features a rolling window to allow the model to make predictions based on the reconstructability of multiple previous examples.

Parameters:

Name Type Description Default
module Type[Module]

Torch module that builds the autoencoder to be wrapped. The module should accept inputs with shape (window_size, batch_size, n_features). It should also feature a parameter n_features used to adapt the network to the number of features in the initial training example.

required
loss_fn Union[str, Callable]

Loss function to be used for training the wrapped model. Can be a loss function provided by torch.nn.functional or one of the following: 'mse', 'l1', 'cross_entropy', 'binary_crossentropy', 'smooth_l1', 'kl_div'.

'mse'
optimizer_fn Union[str, Callable]

Optimizer to be used for training the wrapped model. Can be an optimizer class provided by torch.optim or one of the following: "adam", "adam_w", "sgd", "rmsprop", "lbfgs".

'sgd'
lr float

Learning rate of the optimizer.

0.001
device str

Device to run the wrapped model on. Can be "cpu" or "cuda".

'cpu'
seed int

Random seed to be used for training the wrapped model.

42
window_size int

Size of the rolling window used for storing previous examples.

10
append_predict bool

Whether to append inputs passed for prediction to the rolling window.

False
**kwargs

Parameters to be passed to the Module or the optimizer.

{}

Methods:

Name Description
clone

Clones the estimator.

draw

Draws the wrapped model.

initialize_module

Initializes the wrapped module and its optimizer from the first observed example or batch.

learn_many

Performs one step of training with a batch of examples.

learn_one

Performs one step of training with a single example.

Source code in deep_river/anomaly/rolling_ae.py
def __init__(
    self,
    module: Type[torch.nn.Module],
    loss_fn: Union[str, Callable] = "mse",
    optimizer_fn: Union[str, Callable] = "sgd",
    lr: float = 1e-3,
    device: str = "cpu",
    seed: int = 42,
    window_size: int = 10,
    append_predict: bool = False,
    **kwargs,
):
    """Create the deprecated rolling autoencoder wrapper.

    Emits a deprecation notice and forwards every argument unchanged to
    the parent constructor.

    Parameters
    ----------
    module
        Torch module class that builds the autoencoder to be wrapped.
    loss_fn
        Loss function used for training the wrapped model, given either
        as a name (e.g. ``"mse"``) or as a callable.
    optimizer_fn
        Optimizer used for training the wrapped model, given either as a
        name (e.g. ``"sgd"``) or as a callable.
    lr
        Learning rate of the optimizer.
    device
        Device to run the wrapped model on.
    seed
        Random seed to be used for training the wrapped model.
    window_size
        Size of the rolling window used for storing previous examples.
    append_predict
        Whether to append inputs passed for prediction to the rolling
        window.
    **kwargs
        Parameters to be passed to the module or the optimizer.
    """
    # The rolling counterpart of this class is RollingAutoencoderInitialized;
    # pointing users at the non-rolling AutoencoderInitialized would drop the
    # window_size / append_predict behaviour they are relying on.
    # The warning category is intentionally left at its default so existing
    # warning filters keep matching; stacklevel=2 attributes the warning to
    # the code that instantiates the estimator instead of this constructor.
    warnings.warn(
        "This is deprecated and will be removed in future releases. "
        "Please instead use the RollingAutoencoderInitialized class and "
        "initialize the module beforehand",
        stacklevel=2,
    )
    super().__init__(
        module=module,
        loss_fn=loss_fn,
        optimizer_fn=optimizer_fn,
        lr=lr,
        device=device,
        seed=seed,
        window_size=window_size,
        append_predict=append_predict,
        **kwargs,
    )

clone

clone(
    new_params: dict[Any, Any] | None = None,
    include_attributes=False,
)

Clones the estimator.

Parameters:

Name Type Description Default
new_params dict[Any, Any] | None

New parameters to be passed to the cloned estimator.

None
include_attributes

If True, the attributes of the estimator will be copied to the cloned estimator. This is useful when the estimator is a transformer and the attributes are the learned parameters.

False

Returns:

Type Description
DeepEstimator

The cloned estimator.

Source code in deep_river/base.py
def clone(
    self,
    new_params: dict[Any, Any] | None = None,
    include_attributes=False,
):
    """Clones the estimator.

    Parameters
    ----------
    new_params
        New parameters to be passed to the cloned estimator.
    include_attributes
        If True, the attributes of the estimator will be copied to the
        cloned estimator. This is useful when the estimator is a
        transformer and the attributes are the learned parameters.

    Returns
    -------
    DeepEstimator
        The cloned estimator.
    """
    new_params = new_params or {}
    new_params.update(self.kwargs)
    new_params.update(self._get_params())
    new_params.update({"module": self.module_cls})

    clone = self.__class__(**new_params)
    if include_attributes:
        clone.__dict__.update(self.__dict__)
    return clone

draw

draw() -> Digraph

Draws the wrapped model.

Source code in deep_river/base.py
def draw(self) -> Digraph:
    """Render the wrapped model as a graph.

    A random tensor with the same size as the module's first parameter is
    passed through the module, and the mean of the result is handed to
    ``make_dot`` together with the module's named parameters.
    """
    # The size of the first parameter is used as the size of the probe input.
    probe_size = next(self.module.parameters()).size()
    probe = torch.rand(probe_size)
    output = self.module(probe)
    reduced = output.mean()
    named = dict(self.module.named_parameters())
    return make_dot(reduced, params=named)

initialize_module

initialize_module(x: dict | DataFrame, **kwargs)

Parameters:

Name Type Description Default
module

The instance or class or callable to be initialized, e.g. self.module.

required
kwargs dict

The keyword arguments to initialize the instance or class. Can be an empty dict.

{}

Returns:

Type Description
instance

The initialized component.

Source code in deep_river/base.py
def initialize_module(self, x: dict | pd.DataFrame, **kwargs):
    """
    Parameters
    ----------
    module
      The instance or class or callable to be initialized, e.g.
      ``self.module``.
    kwargs : dict
      The keyword arguments to initialize the instance or class. Can be an
      empty dict.
    Returns
    -------
    instance
      The initialized component.
    """
    torch.manual_seed(self.seed)
    if isinstance(x, Dict):
        n_features = len(x)
    elif isinstance(x, pd.DataFrame):
        n_features = len(x.columns)

    if not isinstance(self.module_cls, torch.nn.Module):
        self.module = self.module_cls(
            n_features=n_features,
            **self._filter_kwargs(self.module_cls, kwargs),
        )

    self.module.to(self.device)
    self.optimizer = self.optimizer_func(self.module.parameters(), lr=self.lr)
    self.module_initialized = True

    self._get_input_output_layers(n_features=n_features)

learn_many

learn_many(X: DataFrame, y=None) -> None

Performs one step of training with a batch of examples.

Parameters:

Name Type Description Default
X DataFrame

Input batch of examples.

required
y

Should be None

None

Returns:

Type Description
RollingAutoencoder

The estimator itself.

Source code in deep_river/anomaly/rolling_ae.py
def learn_many(self, X: pd.DataFrame, y=None) -> None:
    """
    Add a batch to the rolling window and train once the window is full.

    Parameters
    ----------
    X
        Input batch of examples.

    y
        Should be None

    Returns
    -------
    None
    """
    # The module is built lazily from the first batch that is seen.
    if not self.module_initialized:
        self._update_observed_features(X)
        self.initialize_module(x=X, **self.kwargs)

    batch_rows = X.values.tolist()
    self._x_window.append(batch_rows)

    # No training step until the window holds exactly window_size entries.
    if len(self._x_window) != self.window_size:
        return

    window_tensor = deque2rolling_tensor(self._x_window, device=self.device)
    self._learn(x=window_tensor)

learn_one

learn_one(x: dict, y: Any = None, **kwargs) -> None

Performs one step of training with a single example.

Parameters:

Name Type Description Default
x dict

Input example.

required

Returns:

Type Description
RollingAutoencoder

The estimator itself.

Source code in deep_river/anomaly/rolling_ae.py
def learn_one(self, x: dict, y: Any = None, **kwargs) -> None:
    """
    Add a single example to the rolling window and train once it is full.

    Parameters
    ----------
    x
        Input example.

    Returns
    -------
    None
    """
    # The module is built lazily from the first example that is seen.
    if not self.module_initialized:
        self._update_observed_features(x)
        self.initialize_module(x=x, **self.kwargs)

    feature_values = list(x.values())
    self._x_window.append(feature_values)

    # No training step until the window holds exactly window_size entries.
    if len(self._x_window) != self.window_size:
        return

    window_tensor = deque2rolling_tensor(self._x_window, device=self.device)
    self._learn(x=window_tensor)

RollingAutoencoderInitialized

RollingAutoencoderInitialized(
    module: Module,
    loss_fn: Union[str, Callable] = "mse",
    optimizer_fn: Union[str, Callable] = "sgd",
    lr: float = 0.001,
    device: str = "cpu",
    seed: int = 42,
    window_size: int = 10,
    append_predict: bool = False,
    **kwargs
)

Bases: RollingDeepEstimatorInitialized, AnomalyDetector

Source code in deep_river/anomaly/rolling_ae.py
def __init__(
    self,
    module: torch.nn.Module,
    loss_fn: Union[str, Callable] = "mse",
    optimizer_fn: Union[str, Callable] = "sgd",
    lr: float = 1e-3,
    device: str = "cpu",
    seed: int = 42,
    window_size: int = 10,
    append_predict: bool = False,
    **kwargs,
):
    """Create a rolling autoencoder wrapper around a torch module.

    All arguments are forwarded unchanged to the parent constructor.

    Parameters
    ----------
    module
        Torch module to be wrapped.
    loss_fn
        Loss function, given as a name or as a callable.
    optimizer_fn
        Optimizer, given as a name or as a callable.
    lr
        Learning rate of the optimizer.
    device
        Device to run the wrapped model on.
    seed
        Random seed.
    window_size
        Size of the rolling window.
    append_predict
        Whether to append inputs passed for prediction to the rolling
        window.
    **kwargs
        Additional keyword arguments handed to the parent constructor.
    """
    # Gather the explicitly named settings first, then forward them together
    # with any extra keyword arguments in a single call.
    named_settings = {
        "module": module,
        "loss_fn": loss_fn,
        "optimizer_fn": optimizer_fn,
        "lr": lr,
        "device": device,
        "seed": seed,
        "window_size": window_size,
        "append_predict": append_predict,
    }
    super().__init__(**named_settings, **kwargs)