zoo

Classes:

Name Description
LSTMClassifier

Rolling LSTM classifier with dynamic class expansion.

LogisticRegression

Incremental logistic regression with optional dynamic class expansion.

MultiLayerPerceptron

Configurable multi-layer perceptron with dynamic class expansion.

RNNClassifier

Rolling RNN classifier with dynamic class expansion.

LSTMClassifier

LSTMClassifier(
    n_features: int = 10,
    hidden_size: int = 16,
    n_init_classes: int = 2,
    loss_fn: Union[str, Callable] = "cross_entropy",
    optimizer_fn: Union[str, Type[Optimizer]] = "sgd",
    lr: float = 0.001,
    output_is_logit: bool = True,
    is_feature_incremental: bool = False,
    is_class_incremental: bool = True,
    device: str = "cpu",
    seed: int = 42,
    gradient_clip_value: float | None = None,
    **kwargs
)

Bases: RollingClassifier

Rolling LSTM classifier with dynamic class expansion.

An LSTM backbone feeds into a linear head that produces logits. Designed for sequential/temporal streams processed via a rolling window (see RollingClassifier). The output layer (head) expands when new classes are observed (if enabled).

Parameters:

Name Type Description Default
n_features int

Number of input features per timestep.

10
hidden_size int

Hidden state dimensionality of the LSTM.

16
n_init_classes int

Initial number of output classes.

2
loss_fn str | Callable

Training loss.

'cross_entropy'
optimizer_fn str | type

Optimizer specification.

'sgd'
lr float

Learning rate.

1e-3
output_is_logit bool

Indicates outputs are logits (enables proper conversion in predict_proba).

True
is_feature_incremental bool

Whether to dynamically expand the input layer when new features appear.

False
is_class_incremental bool

Whether to expand the output layer for new class labels.

True
device str

Torch device.

'cpu'
seed int

Random seed.

42
gradient_clip_value float | None

Optional gradient norm clipping value.

None

Examples:

Deterministic smoke test on the Phishing data stream: with a learning rate of 0 the parameters never change after initialisation, so the run only exercises the prediction path. (For illustration only.)::

>>> import torch, random, numpy as np
>>> from torch import manual_seed
>>> from river import datasets
>>> from river import metrics
>>> from deep_river.classification.zoo import LSTMClassifier
>>> _ = manual_seed(42); random.seed(42); np.random.seed(42)
>>> stream = datasets.Phishing()
>>> samples = {}
>>> for x, y in stream:
...     if y not in samples:
...         samples[y] = x
...     if len(samples) == 2:
...         break
>>> x0, x1 = samples[0], samples[1]
>>> n_features = len(x0)
>>> lstm_clf = LSTMClassifier(n_features=n_features, hidden_size=3, n_init_classes=2,
...                           is_class_incremental=False, is_feature_incremental=False,
...                           lr=0.0, optimizer_fn='sgd')
>>> lstm_clf.learn_one(x0, 0)
>>> acc = metrics.Accuracy()
>>> for i, (x, y) in enumerate(datasets.Phishing().take(200)):
...     lstm_clf.learn_one(x, y)
...     if i > 0:
...         y_pred = lstm_clf.predict_one(x)
...         acc.update(y, y_pred)
>>> assert 0.0 <= acc.get() <= 1.0
>>> print(f"Accuracy: {acc.get():.4f}")  # doctest: +ELLIPSIS
Accuracy: ...
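
A batch-oriented variant of the same workflow using learn_many and predict_proba_many. The shape of the returned frame depends on the rolling window contents (a sketch, reusing lstm_clf from above)::

>>> import pandas as pd
>>> rows, labels = zip(*datasets.Phishing().take(20))
>>> X, y = pd.DataFrame(rows), pd.Series(labels)
>>> lstm_clf.learn_many(X, y)
>>> out = lstm_clf.predict_proba_many(X)
>>> assert len(out) > 0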

Methods:

Name Description
clone

Return a fresh estimator instance with (optionally) copied state.

draw

Render a (partial) computational graph of the wrapped model.

learn_many

Batch update: extend window with rows of X and perform a step.

learn_one

Learn from a single (x, y) updating the rolling window.

load

Load a previously saved estimator.

predict_proba_many

Return probability DataFrame for multiple samples with rolling context.

predict_proba_one

Return class probability mapping for one sample using rolling context.

save

Persist the estimator (architecture, weights, optimiser & runtime state).

Source code in deep_river/classification/zoo.py
def __init__(
    self,
    n_features: int = 10,
    hidden_size: int = 16,
    n_init_classes: int = 2,
    loss_fn: Union[str, Callable] = "cross_entropy",
    optimizer_fn: Union[str, Type[optim.Optimizer]] = "sgd",
    lr: float = 1e-3,
    output_is_logit: bool = True,
    is_feature_incremental: bool = False,
    is_class_incremental: bool = True,
    device: str = "cpu",
    seed: int = 42,
    gradient_clip_value: float | None = None,
    **kwargs,
):
    self.n_features = n_features
    self.hidden_size = hidden_size
    self.n_init_classes = n_init_classes
    module = LSTMClassifier.LSTMModule(
        n_features=n_features,
        hidden_size=hidden_size,
        n_init_classes=n_init_classes,
    )
    if "module" in kwargs:
        del kwargs["module"]
    super().__init__(
        module=module,
        loss_fn=loss_fn,
        optimizer_fn=optimizer_fn,
        output_is_logit=output_is_logit,
        is_feature_incremental=is_feature_incremental,
        is_class_incremental=is_class_incremental,
        device=device,
        lr=lr,
        seed=seed,
        gradient_clip_value=gradient_clip_value,
        **kwargs,
    )

clone

clone(
    new_params=None,
    include_attributes: bool = False,
    copy_weights: bool = False,
)

Return a fresh estimator instance with (optionally) copied state.

Parameters:

Name Type Description Default
new_params dict | None

Parameter overrides for the cloned instance.

None
include_attributes bool

If True, runtime state (observed features, buffers) is also copied.

False
copy_weights bool

If True, model weights are copied (otherwise the module is re‑initialised).

False
Source code in deep_river/base.py
def clone(
    self,
    new_params=None,
    include_attributes: bool = False,
    copy_weights: bool = False,
):
    """Return a fresh estimator instance with (optionally) copied state.

    Parameters
    ----------
    new_params : dict | None
        Parameter overrides for the cloned instance.
    include_attributes : bool, default=False
        If True, runtime state (observed features, buffers) is also copied.
    copy_weights : bool, default=False
        If True, model weights are copied (otherwise the module is re‑initialised).
    """
    new_params = new_params or {}
    copy_weights = new_params.pop("copy_weights", copy_weights)

    params = {**self._get_all_init_params(), **new_params}

    if "module" not in new_params:
        params["module"] = self._rebuild_module()

    new_est = self.__class__(**self._filter_kwargs(self.__class__.__init__, params))

    if copy_weights and hasattr(self.module, "state_dict"):
        new_est.module.load_state_dict(self.module.state_dict())

    if include_attributes:
        new_est._restore_runtime_state(self._get_runtime_state())

    return new_est
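
For example, spawning independent copies of the estimator defined in the Examples section above (sketch)::

>>> fresh = lstm_clf.clone()                  # module re-initialised
>>> warm = lstm_clf.clone(copy_weights=True)  # weights carried over
>>> assert type(fresh) is type(lstm_clf)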

draw

draw()

Render a (partial) computational graph of the wrapped model.

Imports graphviz and torchviz lazily. Raises an informative ImportError if the optional dependencies are not installed.

Source code in deep_river/base.py
def draw(self):  # type: ignore[override]
    """Render a (partial) computational graph of the wrapped model.

    Imports ``graphviz`` and ``torchviz`` lazily. Raises an informative
    ImportError if the optional dependencies are not installed.
    """
    try:  # pragma: no cover
        from torchviz import make_dot  # type: ignore
    except Exception as err:  # noqa: BLE001
        raise ImportError(
            "graphviz and torchviz must be installed to draw the model."
        ) from err

    first_parameter = next(self.module.parameters())
    input_shape = first_parameter.size()
    y_pred = self.module(torch.rand(input_shape))
    return make_dot(y_pred.mean(), params=dict(self.module.named_parameters()))
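
Guarded usage, since graphviz and torchviz are optional extras (sketch)::

>>> try:
...     dot = lstm_clf.draw()  # a graphviz object when the extras are installed
... except Exception:  # missing extras (or an unsupported dummy input shape)
...     dot = None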

learn_many

learn_many(X: DataFrame, y: Series) -> None

Batch update: extend window with rows of X and perform a step.

Source code in deep_river/classification/rolling_classifier.py
def learn_many(self, X: pd.DataFrame, y: pd.Series) -> None:
    """Batch update: extend window with rows of X and perform a step."""
    self._update_observed_targets(y)
    self._update_observed_features(X)
    X = X[list(self.observed_features)]
    self._x_window.extend(X.values.tolist())
    X_t = self._deque2rolling_tensor(self._x_window)
    self._learn(x=X_t, y=y)

learn_one

learn_one(x: dict, y: ClfTarget, **kwargs) -> None

Learn from a single (x, y) updating the rolling window.

Source code in deep_river/classification/rolling_classifier.py
def learn_one(self, x: dict, y: ClfTarget, **kwargs) -> None:
    """Learn from a single (x, y) updating the rolling window."""
    self._update_observed_features(x)
    self._update_observed_targets(y)
    self._x_window.append([x.get(feature, 0) for feature in self.observed_features])
    x_t = self._deque2rolling_tensor(self._x_window)
    self._learn(x=x_t, y=y)

load classmethod

load(filepath: Union[str, Path])

Load a previously saved estimator.

The method reconstructs the estimator class, its wrapped module, optimiser state and runtime information (feature names, buffers, etc.).

Source code in deep_river/base.py
@classmethod
def load(cls, filepath: Union[str, Path]):
    """Load a previously saved estimator.

    The method reconstructs the estimator class, its wrapped module, optimiser
    state and runtime information (feature names, buffers, etc.).
    """
    with open(filepath, "rb") as f:
        state = pickle.load(f)

    estimator_cls = cls._import_from_path(state["estimator_class"])
    init_params = state["init_params"]

    # Rebuild module if needed
    if "module" in init_params and isinstance(init_params["module"], dict):
        module_info = init_params.pop("module")
        module_cls = cls._import_from_path(module_info["class"])
        module = module_cls(
            **cls._filter_kwargs(module_cls.__init__, module_info["kwargs"])
        )
        if state.get("model_state_dict"):
            module.load_state_dict(state["model_state_dict"])
        init_params["module"] = module

    estimator = estimator_cls(
        **cls._filter_kwargs(estimator_cls.__init__, init_params)
    )

    if state.get("optimizer_state_dict") and hasattr(estimator, "optimizer"):
        try:
            estimator.optimizer.load_state_dict(
                state["optimizer_state_dict"]  # type: ignore[arg-type]
            )
        except Exception:  # noqa: E722
            pass

    estimator._restore_runtime_state(state.get("runtime_state", {}))
    return estimator

predict_proba_many

predict_proba_many(X: DataFrame) -> DataFrame

Return probability DataFrame for multiple samples with rolling context.

Source code in deep_river/classification/rolling_classifier.py
def predict_proba_many(self, X: pd.DataFrame) -> pd.DataFrame:
    """Return probability DataFrame for multiple samples with rolling context."""
    self._update_observed_features(X)
    X = X[list(self.observed_features)]
    x_win = self._x_window.copy()
    x_win.extend(X.values.tolist())
    if self.append_predict:
        self._x_window = x_win
    self.module.eval()
    with torch.inference_mode():
        x_t = self._deque2rolling_tensor(x_win)
        probas = self.module(x_t).detach().tolist()
    return pd.DataFrame(probas)

predict_proba_one

predict_proba_one(x: dict) -> Dict[ClfTarget, float]

Return class probability mapping for one sample using rolling context.

Source code in deep_river/classification/rolling_classifier.py
def predict_proba_one(self, x: dict) -> Dict[ClfTarget, float]:
    """Return class probability mapping for one sample using rolling context."""
    self._update_observed_features(x)
    x_win = self._x_window.copy()
    x_win.append([x.get(feature, 0) for feature in self.observed_features])
    if self.append_predict:
        self._x_window = x_win
    self.module.eval()
    with torch.inference_mode():
        x_t = self._deque2rolling_tensor(x_win)
        y_pred = self.module(x_t)
        proba = output2proba(y_pred, self.observed_classes, self.output_is_logit)
    return cast(Dict[ClfTarget, float], proba[0])

save

save(filepath: Union[str, Path]) -> None

Persist the estimator (architecture, weights, optimiser & runtime state).

Parameters:

Name Type Description Default
filepath str | Path

Destination file. Parent directories are created automatically.

required
Source code in deep_river/base.py
def save(self, filepath: Union[str, Path]) -> None:
    """Persist the estimator (architecture, weights, optimiser & runtime state).

    Parameters
    ----------
    filepath : str | Path
        Destination file. Parent directories are created automatically.
    """
    filepath = Path(filepath)
    filepath.parent.mkdir(parents=True, exist_ok=True)

    state = {
        "estimator_class": f"{type(self).__module__}.{type(self).__name__}",
        "init_params": self._get_all_init_params(),
        "model_state_dict": getattr(self.module, "state_dict", lambda: {})(),
        "optimizer_state_dict": getattr(self.optimizer, "state_dict", lambda: {})(),
        "runtime_state": self._get_runtime_state(),
    }

    with open(filepath, "wb") as f:
        pickle.dump(state, f)
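
A save/load round trip persists weights, optimiser and runtime state (a sketch using a temporary directory and the lstm_clf instance from the Examples section)::

>>> import tempfile
>>> from pathlib import Path
>>> with tempfile.TemporaryDirectory() as tmp:
...     path = Path(tmp) / "lstm_clf.pkl"
...     lstm_clf.save(path)
...     restored = LSTMClassifier.load(path)
>>> assert isinstance(restored, LSTMClassifier)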

LogisticRegression

LogisticRegression(
    n_features: int = 10,
    n_init_classes: int = 2,
    loss_fn: Union[str, Callable] = "cross_entropy",
    optimizer_fn: Union[str, Type[Optimizer]] = "sgd",
    lr: float = 0.001,
    output_is_logit: bool = True,
    is_feature_incremental: bool = False,
    is_class_incremental: bool = True,
    device: str = "cpu",
    seed: int = 42,
    gradient_clip_value: float | None = None,
    **kwargs
)

Bases: Classifier

Incremental logistic regression with optional dynamic class expansion.

This variant outputs raw logits (no internal softmax) so that losses like cross_entropy can be applied directly. The output layer can grow in response to newly observed class labels when is_class_incremental=True.

Parameters:

Name Type Description Default
n_features int

Initial number of input features.

10
n_init_classes int

Initial number of output units/classes. Expanded automatically if new classes appear and class incrementality is enabled.

2
loss_fn str | Callable

Training loss.

'cross_entropy'
optimizer_fn str | type

Optimizer specification.

'sgd'
lr float

Learning rate.

1e-3
output_is_logit bool

Indicates outputs are logits (enables proper conversion in predict_proba).

True
is_feature_incremental bool

Whether to dynamically expand the input layer when new features appear.

False
is_class_incremental bool

Whether to expand the output layer for new class labels.

True
device str

Torch device.

'cpu'
seed int

Random seed.

42
gradient_clip_value float | None

Optional gradient norm clipping value.

None
**kwargs

Forwarded to the parent constructor.

{}

Examples:

Streaming binary classification on the Phishing dataset. The exact Accuracy value may vary depending on library version and hardware::

>>> import random, numpy as np, torch
>>> from torch import manual_seed
>>> from river import datasets, metrics
>>> from deep_river.classification.zoo import LogisticRegression
>>> _ = manual_seed(42); random.seed(42); np.random.seed(42)
>>> first_x, _ = next(iter(datasets.Phishing()))
>>> clf = LogisticRegression(
...     n_features=len(first_x), n_init_classes=2,
...     optimizer_fn='sgd', lr=1e-2, is_class_incremental=True,
... )
>>> acc = metrics.Accuracy()
>>> for i, (x, y) in enumerate(datasets.Phishing().take(200)):
...     clf.learn_one(x, y)
...     if i > 0:
...         y_pred = clf.predict_one(x)
...         acc.update(y, y_pred)
>>> assert 0.5 <= acc.get() <= 1.0
>>> print(f"Accuracy: {acc.get():.4f}")  # doctest: +ELLIPSIS
Accuracy: ...
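
Dynamic class expansion in isolation: a third, previously unseen label grows the output head (a sketch with toy features, assuming expansion behaves as documented)::

>>> clf3 = LogisticRegression(n_features=2, n_init_classes=2, is_class_incremental=True)
>>> clf3.learn_one({'a': 1.0, 'b': 0.0}, 0)
>>> clf3.learn_one({'a': 0.0, 'b': 1.0}, 1)
>>> clf3.learn_one({'a': 1.0, 'b': 1.0}, 2)  # new label -> head expands to three units
>>> proba = clf3.predict_proba_one({'a': 1.0, 'b': 0.0})
>>> assert len(proba) == 3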

Methods:

Name Description
clone

Return a fresh estimator instance with (optionally) copied state.

draw

Render a (partial) computational graph of the wrapped model.

learn_many

Learn from a batch of instances.

learn_one

Learn from a single instance.

load

Load a previously saved estimator.

predict_proba_many

Predict probabilities for a batch of instances.

predict_proba_one

Predict class membership probabilities for one instance.

save

Persist the estimator (architecture, weights, optimiser & runtime state).

Source code in deep_river/classification/zoo.py
def __init__(
    self,
    n_features: int = 10,
    n_init_classes: int = 2,
    loss_fn: Union[str, Callable] = "cross_entropy",
    optimizer_fn: Union[str, Type[optim.Optimizer]] = "sgd",
    lr: float = 1e-3,
    output_is_logit: bool = True,
    is_feature_incremental: bool = False,
    is_class_incremental: bool = True,
    device: str = "cpu",
    seed: int = 42,
    gradient_clip_value: float | None = None,
    **kwargs,
):
    self.n_features = n_features
    self.n_init_classes = n_init_classes
    module = LogisticRegression.LRModule(
        n_features=n_features, n_init_classes=n_init_classes
    )
    if "module" in kwargs:
        del kwargs["module"]
    super().__init__(
        module=module,
        loss_fn=loss_fn,
        optimizer_fn=optimizer_fn,
        output_is_logit=output_is_logit,
        is_feature_incremental=is_feature_incremental,
        is_class_incremental=is_class_incremental,
        device=device,
        lr=lr,
        seed=seed,
        gradient_clip_value=gradient_clip_value,
        **kwargs,
    )

clone

clone(
    new_params=None,
    include_attributes: bool = False,
    copy_weights: bool = False,
)

Return a fresh estimator instance with (optionally) copied state.

Parameters:

Name Type Description Default
new_params dict | None

Parameter overrides for the cloned instance.

None
include_attributes bool

If True, runtime state (observed features, buffers) is also copied.

False
copy_weights bool

If True, model weights are copied (otherwise the module is re‑initialised).

False
Source code in deep_river/base.py
def clone(
    self,
    new_params=None,
    include_attributes: bool = False,
    copy_weights: bool = False,
):
    """Return a fresh estimator instance with (optionally) copied state.

    Parameters
    ----------
    new_params : dict | None
        Parameter overrides for the cloned instance.
    include_attributes : bool, default=False
        If True, runtime state (observed features, buffers) is also copied.
    copy_weights : bool, default=False
        If True, model weights are copied (otherwise the module is re‑initialised).
    """
    new_params = new_params or {}
    copy_weights = new_params.pop("copy_weights", copy_weights)

    params = {**self._get_all_init_params(), **new_params}

    if "module" not in new_params:
        params["module"] = self._rebuild_module()

    new_est = self.__class__(**self._filter_kwargs(self.__class__.__init__, params))

    if copy_weights and hasattr(self.module, "state_dict"):
        new_est.module.load_state_dict(self.module.state_dict())

    if include_attributes:
        new_est._restore_runtime_state(self._get_runtime_state())

    return new_est

draw

draw()

Render a (partial) computational graph of the wrapped model.

Imports graphviz and torchviz lazily. Raises an informative ImportError if the optional dependencies are not installed.

Source code in deep_river/base.py
def draw(self):  # type: ignore[override]
    """Render a (partial) computational graph of the wrapped model.

    Imports ``graphviz`` and ``torchviz`` lazily. Raises an informative
    ImportError if the optional dependencies are not installed.
    """
    try:  # pragma: no cover
        from torchviz import make_dot  # type: ignore
    except Exception as err:  # noqa: BLE001
        raise ImportError(
            "graphviz and torchviz must be installed to draw the model."
        ) from err

    first_parameter = next(self.module.parameters())
    input_shape = first_parameter.size()
    y_pred = self.module(torch.rand(input_shape))
    return make_dot(y_pred.mean(), params=dict(self.module.named_parameters()))

learn_many

learn_many(X: DataFrame, y: Series) -> None

Learn from a batch of instances.

Parameters:

Name Type Description Default
X DataFrame

Batch of feature rows.

required
y Series

Corresponding labels.

required
Source code in deep_river/classification/classifier.py
def learn_many(self, X: pd.DataFrame, y: pd.Series) -> None:
    """Learn from a batch of instances.

    Parameters
    ----------
    X : pandas.DataFrame
        Batch of feature rows.
    y : pandas.Series
        Corresponding labels.
    """
    self._update_observed_features(X)
    self._update_observed_targets(y)
    x_t = self._df2tensor(X)
    if self.loss_fn == "cross_entropy":
        self._classification_step_cross_entropy(x_t, y)
    else:
        self._learn(x_t, y)

learn_one

learn_one(x: dict, y: ClfTarget) -> None

Learn from a single instance.

Parameters:

Name Type Description Default
x dict

Feature dictionary.

required
y hashable

Class label.

required
Source code in deep_river/classification/classifier.py
def learn_one(self, x: dict, y: base.typing.ClfTarget) -> None:
    """Learn from a single instance.

    Parameters
    ----------
    x : dict
        Feature dictionary.
    y : hashable
        Class label.
    """
    self._update_observed_features(x)
    self._update_observed_targets(y)
    x_t = self._dict2tensor(x)
    if self.loss_fn == "cross_entropy":
        self._classification_step_cross_entropy(x_t, y)
    else:
        # One-hot pathway / other losses
        self._learn(x_t, y)

load classmethod

load(filepath: Union[str, Path])

Load a previously saved estimator.

The method reconstructs the estimator class, its wrapped module, optimiser state and runtime information (feature names, buffers, etc.).

Source code in deep_river/base.py
@classmethod
def load(cls, filepath: Union[str, Path]):
    """Load a previously saved estimator.

    The method reconstructs the estimator class, its wrapped module, optimiser
    state and runtime information (feature names, buffers, etc.).
    """
    with open(filepath, "rb") as f:
        state = pickle.load(f)

    estimator_cls = cls._import_from_path(state["estimator_class"])
    init_params = state["init_params"]

    # Rebuild module if needed
    if "module" in init_params and isinstance(init_params["module"], dict):
        module_info = init_params.pop("module")
        module_cls = cls._import_from_path(module_info["class"])
        module = module_cls(
            **cls._filter_kwargs(module_cls.__init__, module_info["kwargs"])
        )
        if state.get("model_state_dict"):
            module.load_state_dict(state["model_state_dict"])
        init_params["module"] = module

    estimator = estimator_cls(
        **cls._filter_kwargs(estimator_cls.__init__, init_params)
    )

    if state.get("optimizer_state_dict") and hasattr(estimator, "optimizer"):
        try:
            estimator.optimizer.load_state_dict(
                state["optimizer_state_dict"]  # type: ignore[arg-type]
            )
        except Exception:  # noqa: E722
            pass

    estimator._restore_runtime_state(state.get("runtime_state", {}))
    return estimator

predict_proba_many

predict_proba_many(X: DataFrame) -> DataFrame

Predict probabilities for a batch of instances.

Parameters:

Name Type Description Default
X DataFrame

Feature matrix.

required

Returns:

Type Description
DataFrame

Each row sums to 1 (multi-class) or has two columns for binary.

Source code in deep_river/classification/classifier.py
def predict_proba_many(self, X: pd.DataFrame) -> pd.DataFrame:
    """Predict probabilities for a batch of instances.

    Parameters
    ----------
    X : pandas.DataFrame
        Feature matrix.

    Returns
    -------
    pandas.DataFrame
        Each row sums to 1 (multi-class) or has two columns for binary.
    """
    self._update_observed_features(X)
    x_t = self._df2tensor(X)
    self.module.eval()
    with torch.inference_mode():
        y_preds = self.module(x_t)
    return pd.DataFrame(
        output2proba(y_preds, self.observed_classes, self.output_is_logit)
    )
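
Batch prediction mirrors the single-instance API (a sketch, continuing from the Examples section above)::

>>> import pandas as pd
>>> X_batch = pd.DataFrame([x for x, _ in datasets.Phishing().take(5)])
>>> proba_df = clf.predict_proba_many(X_batch)
>>> assert len(proba_df) == 5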

predict_proba_one

predict_proba_one(x: dict) -> dict[ClfTarget, float]

Predict class membership probabilities for one instance.

Parameters:

Name Type Description Default
x dict

Feature dictionary.

required

Returns:

Type Description
dict

Mapping from label -> probability.

Source code in deep_river/classification/classifier.py
def predict_proba_one(self, x: dict) -> dict[base.typing.ClfTarget, float]:
    """Predict class membership probabilities for one instance.

    Parameters
    ----------
    x : dict
        Feature dictionary.

    Returns
    -------
    dict
        Mapping from label -> probability.
    """
    self._update_observed_features(x)
    x_t = self._dict2tensor(x)
    self.module.eval()
    with torch.inference_mode():
        y_pred = self.module(x_t)
    raw = output2proba(y_pred, self.observed_classes, self.output_is_logit)[0]
    return cast(dict[base.typing.ClfTarget, float], raw)

save

save(filepath: Union[str, Path]) -> None

Persist the estimator (architecture, weights, optimiser & runtime state).

Parameters:

Name Type Description Default
filepath str | Path

Destination file. Parent directories are created automatically.

required
Source code in deep_river/base.py
def save(self, filepath: Union[str, Path]) -> None:
    """Persist the estimator (architecture, weights, optimiser & runtime state).

    Parameters
    ----------
    filepath : str | Path
        Destination file. Parent directories are created automatically.
    """
    filepath = Path(filepath)
    filepath.parent.mkdir(parents=True, exist_ok=True)

    state = {
        "estimator_class": f"{type(self).__module__}.{type(self).__name__}",
        "init_params": self._get_all_init_params(),
        "model_state_dict": getattr(self.module, "state_dict", lambda: {})(),
        "optimizer_state_dict": getattr(self.optimizer, "state_dict", lambda: {})(),
        "runtime_state": self._get_runtime_state(),
    }

    with open(filepath, "wb") as f:
        pickle.dump(state, f)

MultiLayerPerceptron

MultiLayerPerceptron(
    n_features: int = 10,
    n_width: int = 5,
    n_layers: int = 5,
    n_init_classes: int = 2,
    loss_fn: Union[str, Callable] = "cross_entropy",
    optimizer_fn: Union[str, Type[Optimizer]] = "sgd",
    lr: float = 0.001,
    output_is_logit: bool = True,
    is_feature_incremental: bool = False,
    is_class_incremental: bool = True,
    device: str = "cpu",
    seed: int = 42,
    gradient_clip_value: float | None = None,
    **kwargs
)

Bases: Classifier

Configurable multi-layer perceptron with dynamic class expansion.

Hidden layers use ReLU activations; the output layer emits raw logits.

Parameters:

Name Type Description Default
n_features int

Initial number of features.

10
n_width int

Width (units) of each hidden layer.

5
n_layers int

Number of hidden layers (>=1). If 1, only the input layer feeds the output.

5
n_init_classes int

Initial number of classes/output units.

2
loss_fn str | Callable

Training loss.

'cross_entropy'
optimizer_fn str | type

Optimizer specification.

'sgd'
lr float

Learning rate.

1e-3
output_is_logit bool

Indicates outputs are logits (enables proper conversion in predict_proba).

True
is_feature_incremental bool

Whether to dynamically expand the input layer when new features appear.

False
is_class_incremental bool

Whether to expand the output layer for new class labels.

True
device str

Torch device.

'cpu'
seed int

Random seed.

42
gradient_clip_value float | None

Optional gradient norm clipping value.

None

Examples:

Phishing dataset stream with online Accuracy. The exact value may vary
depending on library version and hardware::

>>> import random, numpy as np, torch
>>> from torch import manual_seed
>>> from river import datasets, metrics
>>> from deep_river.classification.zoo import MultiLayerPerceptron
>>> _ = manual_seed(42); random.seed(42); np.random.seed(42)
>>> first_x, _ = next(iter(datasets.Phishing()))
>>> mlp = MultiLayerPerceptron(
...     n_features=len(first_x), n_width=8, n_layers=2, n_init_classes=2,
...     optimizer_fn='sgd', lr=5e-3, is_class_incremental=True,
... )
>>> acc = metrics.Accuracy()
>>> for i, (x, y) in enumerate(datasets.Phishing().take(200)):
...     mlp.learn_one(x, y)
...     if i > 0:
...         y_pred = mlp.predict_one(x)
...         acc.update(y, y_pred)
>>> assert 0.5 <= acc.get() <= 1.0
>>> print(f"Accuracy: {acc.get():.4f}")  # doctest: +ELLIPSIS
Accuracy: ...
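
For orientation, the wrapped module corresponds roughly to the following stack (a hypothetical reconstruction, not the library's exact MLPModule)::

>>> import torch
>>> from torch import nn
>>> n_features, n_width, n_layers, n_classes = 4, 8, 2, 2
>>> layers = [nn.Linear(n_features, n_width), nn.ReLU()]
>>> for _ in range(n_layers - 1):
...     layers += [nn.Linear(n_width, n_width), nn.ReLU()]
>>> net = nn.Sequential(*layers, nn.Linear(n_width, n_classes))
>>> net(torch.rand(3, n_features)).shape
torch.Size([3, 2])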

Methods:

Name Description
clone

Return a fresh estimator instance with (optionally) copied state.

draw

Render a (partial) computational graph of the wrapped model.

learn_many

Learn from a batch of instances.

learn_one

Learn from a single instance.

load

Load a previously saved estimator.

predict_proba_many

Predict probabilities for a batch of instances.

predict_proba_one

Predict class membership probabilities for one instance.

save

Persist the estimator (architecture, weights, optimiser & runtime state).

Source code in deep_river/classification/zoo.py
def __init__(
    self,
    n_features: int = 10,
    n_width: int = 5,
    n_layers: int = 5,
    n_init_classes: int = 2,
    loss_fn: Union[str, Callable] = "cross_entropy",
    optimizer_fn: Union[str, Type[optim.Optimizer]] = "sgd",
    lr: float = 1e-3,
    output_is_logit: bool = True,
    is_feature_incremental: bool = False,
    is_class_incremental: bool = True,
    device: str = "cpu",
    seed: int = 42,
    gradient_clip_value: float | None = None,
    **kwargs,
):
    self.n_features = n_features
    self.n_width = n_width
    self.n_layers = n_layers
    self.n_init_classes = n_init_classes
    module = MultiLayerPerceptron.MLPModule(
        n_width=n_width,
        n_layers=n_layers,
        n_features=n_features,
        n_init_classes=n_init_classes,
    )
    if "module" in kwargs:
        del kwargs["module"]
    super().__init__(
        module=module,
        loss_fn=loss_fn,
        optimizer_fn=optimizer_fn,
        output_is_logit=output_is_logit,
        is_feature_incremental=is_feature_incremental,
        is_class_incremental=is_class_incremental,
        device=device,
        lr=lr,
        seed=seed,
        gradient_clip_value=gradient_clip_value,
        **kwargs,
    )

clone

clone(
    new_params=None,
    include_attributes: bool = False,
    copy_weights: bool = False,
)

Return a fresh estimator instance with (optionally) copied state.

Parameters:

Name Type Description Default
new_params dict | None

Parameter overrides for the cloned instance.

None
include_attributes bool

If True, runtime state (observed features, buffers) is also copied.

False
copy_weights bool

If True, model weights are copied (otherwise the module is re‑initialised).

False
Source code in deep_river/base.py
def clone(
    self,
    new_params=None,
    include_attributes: bool = False,
    copy_weights: bool = False,
):
    """Return a fresh estimator instance with (optionally) copied state.

    Parameters
    ----------
    new_params : dict | None
        Parameter overrides for the cloned instance.
    include_attributes : bool, default=False
        If True, runtime state (observed features, buffers) is also copied.
    copy_weights : bool, default=False
        If True, model weights are copied (otherwise the module is re‑initialised).
    """
    new_params = new_params or {}
    copy_weights = new_params.pop("copy_weights", copy_weights)

    params = {**self._get_all_init_params(), **new_params}

    if "module" not in new_params:
        params["module"] = self._rebuild_module()

    new_est = self.__class__(**self._filter_kwargs(self.__class__.__init__, params))

    if copy_weights and hasattr(self.module, "state_dict"):
        new_est.module.load_state_dict(self.module.state_dict())

    if include_attributes:
        new_est._restore_runtime_state(self._get_runtime_state())

    return new_est

draw

draw()

Render a (partial) computational graph of the wrapped model.

Imports graphviz and torchviz lazily. Raises an informative ImportError if the optional dependencies are not installed.

Source code in deep_river/base.py
def draw(self):  # type: ignore[override]
    """Render a (partial) computational graph of the wrapped model.

    Imports ``graphviz`` and ``torchviz`` lazily. Raises an informative
    ImportError if the optional dependencies are not installed.
    """
    try:  # pragma: no cover
        from torchviz import make_dot  # type: ignore
    except Exception as err:  # noqa: BLE001
        raise ImportError(
            "graphviz and torchviz must be installed to draw the model."
        ) from err

    first_parameter = next(self.module.parameters())
    input_shape = first_parameter.size()
    y_pred = self.module(torch.rand(input_shape))
    return make_dot(y_pred.mean(), params=dict(self.module.named_parameters()))

learn_many

learn_many(X: DataFrame, y: Series) -> None

Learn from a batch of instances.

Parameters:

Name Type Description Default
X DataFrame

Batch of feature rows.

required
y Series

Corresponding labels.

required
Source code in deep_river/classification/classifier.py
def learn_many(self, X: pd.DataFrame, y: pd.Series) -> None:
    """Learn from a batch of instances.

    Parameters
    ----------
    X : pandas.DataFrame
        Batch of feature rows.
    y : pandas.Series
        Corresponding labels.
    """
    self._update_observed_features(X)
    self._update_observed_targets(y)
    x_t = self._df2tensor(X)
    if self.loss_fn == "cross_entropy":
        self._classification_step_cross_entropy(x_t, y)
    else:
        self._learn(x_t, y)

learn_one

learn_one(x: dict, y: ClfTarget) -> None

Learn from a single instance.

Parameters:

Name Type Description Default
x dict

Feature dictionary.

required
y hashable

Class label.

required
Source code in deep_river/classification/classifier.py
def learn_one(self, x: dict, y: base.typing.ClfTarget) -> None:
    """Learn from a single instance.

    Parameters
    ----------
    x : dict
        Feature dictionary.
    y : hashable
        Class label.
    """
    self._update_observed_features(x)
    self._update_observed_targets(y)
    x_t = self._dict2tensor(x)
    if self.loss_fn == "cross_entropy":
        self._classification_step_cross_entropy(x_t, y)
    else:
        # One-hot pathway / other losses
        self._learn(x_t, y)

load classmethod

load(filepath: Union[str, Path])

Load a previously saved estimator.

The method reconstructs the estimator class, its wrapped module, optimiser state and runtime information (feature names, buffers, etc.).

Source code in deep_river/base.py
@classmethod
def load(cls, filepath: Union[str, Path]):
    """Load a previously saved estimator.

    The method reconstructs the estimator class, its wrapped module, optimiser
    state and runtime information (feature names, buffers, etc.).
    """
    with open(filepath, "rb") as f:
        state = pickle.load(f)

    estimator_cls = cls._import_from_path(state["estimator_class"])
    init_params = state["init_params"]

    # Rebuild module if needed
    if "module" in init_params and isinstance(init_params["module"], dict):
        module_info = init_params.pop("module")
        module_cls = cls._import_from_path(module_info["class"])
        module = module_cls(
            **cls._filter_kwargs(module_cls.__init__, module_info["kwargs"])
        )
        if state.get("model_state_dict"):
            module.load_state_dict(state["model_state_dict"])
        init_params["module"] = module

    estimator = estimator_cls(
        **cls._filter_kwargs(estimator_cls.__init__, init_params)
    )

    if state.get("optimizer_state_dict") and hasattr(estimator, "optimizer"):
        try:
            estimator.optimizer.load_state_dict(
                state["optimizer_state_dict"]  # type: ignore[arg-type]
            )
        except Exception:  # noqa: E722
            pass

    estimator._restore_runtime_state(state.get("runtime_state", {}))
    return estimator

predict_proba_many

predict_proba_many(X: DataFrame) -> DataFrame

Predict probabilities for a batch of instances.

Parameters:

Name Type Description Default
X DataFrame

Feature matrix.

required

Returns:

Type Description
DataFrame

Each row sums to 1 (multi-class) or has two columns for binary.

Source code in deep_river/classification/classifier.py
def predict_proba_many(self, X: pd.DataFrame) -> pd.DataFrame:
    """Predict probabilities for a batch of instances.

    Parameters
    ----------
    X : pandas.DataFrame
        Feature matrix.

    Returns
    -------
    pandas.DataFrame
        Each row sums to 1 (multi-class) or has two columns for binary.
    """
    self._update_observed_features(X)
    x_t = self._df2tensor(X)
    self.module.eval()
    with torch.inference_mode():
        y_preds = self.module(x_t)
    return pd.DataFrame(
        output2proba(y_preds, self.observed_classes, self.output_is_logit)
    )

predict_proba_one

predict_proba_one(x: dict) -> dict[ClfTarget, float]

Predict class membership probabilities for one instance.

Parameters:

Name Type Description Default
x dict

Feature dictionary.

required

Returns:

Type Description
dict

Mapping from label -> probability.

Source code in deep_river/classification/classifier.py
def predict_proba_one(self, x: dict) -> dict[base.typing.ClfTarget, float]:
    """Predict class membership probabilities for one instance.

    Parameters
    ----------
    x : dict
        Feature dictionary.

    Returns
    -------
    dict
        Mapping from label -> probability.
    """
    self._update_observed_features(x)
    x_t = self._dict2tensor(x)
    self.module.eval()
    with torch.inference_mode():
        y_pred = self.module(x_t)
    raw = output2proba(y_pred, self.observed_classes, self.output_is_logit)[0]
    return cast(dict[base.typing.ClfTarget, float], raw)

save

save(filepath: Union[str, Path]) -> None

Persist the estimator (architecture, weights, optimiser & runtime state).

Parameters:

Name Type Description Default
filepath str | Path

Destination file. Parent directories are created automatically.

required
Source code in deep_river/base.py
def save(self, filepath: Union[str, Path]) -> None:
    """Persist the estimator (architecture, weights, optimiser & runtime state).

    Parameters
    ----------
    filepath : str | Path
        Destination file. Parent directories are created automatically.
    """
    filepath = Path(filepath)
    filepath.parent.mkdir(parents=True, exist_ok=True)

    state = {
        "estimator_class": f"{type(self).__module__}.{type(self).__name__}",
        "init_params": self._get_all_init_params(),
        "model_state_dict": getattr(self.module, "state_dict", lambda: {})(),
        "optimizer_state_dict": getattr(self.optimizer, "state_dict", lambda: {})(),
        "runtime_state": self._get_runtime_state(),
    }

    with open(filepath, "wb") as f:
        pickle.dump(state, f)

RNNClassifier

RNNClassifier(
    n_features: int = 10,
    hidden_size: int = 16,
    num_layers: int = 1,
    nonlinearity: str = "tanh",
    n_init_classes: int = 2,
    loss_fn: Union[str, Callable] = "cross_entropy",
    optimizer_fn: Union[str, Type[Optimizer]] = "adam",
    lr: float = 0.001,
    output_is_logit: bool = True,
    is_feature_incremental: bool = False,
    is_class_incremental: bool = True,
    device: str = "cpu",
    seed: int = 42,
    gradient_clip_value: float | None = None,
    **kwargs
)

Bases: RollingClassifier

Rolling RNN classifier with dynamic class expansion.

Uses a (stacked) nn.RNN backbone followed by a linear head that produces raw logits. Designed for streaming sequential data via a fixed-size rolling window handled by RollingClassifier.

Parameters:

Name Type Description Default
n_features int

Number of input features per timestep.

10
hidden_size int

Hidden state dimensionality of the RNN.

16
num_layers int

Number of stacked RNN layers.

1
nonlinearity str

Non-linearity used inside the RNN ('tanh' or 'relu').

'tanh'
n_init_classes int

Initial number of classes/output units.

2
loss_fn str | Callable

Training loss.

'cross_entropy'
optimizer_fn str | type

Optimizer specification.

'adam'
lr float

Learning rate.

1e-3
output_is_logit bool

Indicates outputs are logits (enables proper conversion in predict_proba).

True
is_feature_incremental bool

Whether to dynamically expand the input layer when new features appear.

False
is_class_incremental bool

Whether to expand the output layer for new class labels.

True
device str

Torch device.

'cpu'
seed int

Random seed.

42
gradient_clip_value float | None

Optional gradient norm clipping value.

None

Examples:

Streaming evaluation on the Phishing data stream. The exact Accuracy value may vary depending on library version and hardware::

>>> import torch, random, numpy as np
>>> from torch import manual_seed
>>> from river import metrics
>>> from river import datasets
>>> from deep_river.classification.zoo import RNNClassifier
>>> _ = manual_seed(42); random.seed(42); np.random.seed(42)
>>> stream = datasets.Phishing()
>>> samples = {}
>>> for x, y in stream:
...     if y not in samples:
...         samples[y] = x
...     if len(samples) == 2:
...         break
>>> x0, x1 = samples[0], samples[1]
>>> n_features = len(x0)
>>> rnn_clf = RNNClassifier(n_features=n_features, hidden_size=3, n_init_classes=2,
...                         is_class_incremental=False, is_feature_incremental=False)
>>> acc = metrics.Accuracy()
>>> for i, (x, y) in enumerate(datasets.Phishing().take(200)):
...     rnn_clf.learn_one(x, y)
...     if i > 0:
...         y_pred = rnn_clf.predict_one(x)
...         acc.update(y, y_pred)
>>> assert 0.0 <= acc.get() <= 1.0
>>> print(f"Accuracy: {acc.get():.4f}")  # doctest: +ELLIPSIS
Accuracy: ...
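
A stacked variant with the 'relu' nonlinearity, reusing x0 and x1 from above (sketch)::

>>> relu_rnn = RNNClassifier(n_features=n_features, hidden_size=4, num_layers=2,
...                          nonlinearity='relu')
>>> relu_rnn.learn_one(x0, 0)
>>> relu_rnn.learn_one(x1, 1)
>>> proba = relu_rnn.predict_proba_one(x0)
>>> assert 0 in proba and 1 in proba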

Methods:

Name Description
clone

Return a fresh estimator instance with (optionally) copied state.

draw

Render a (partial) computational graph of the wrapped model.

learn_many

Batch update: extend window with rows of X and perform a step.

learn_one

Learn from a single (x, y) updating the rolling window.

load

Load a previously saved estimator.

predict_proba_many

Return probability DataFrame for multiple samples with rolling context.

predict_proba_one

Return class probability mapping for one sample using rolling context.

save

Persist the estimator (architecture, weights, optimiser & runtime state).

Source code in deep_river/classification/zoo.py
def __init__(
    self,
    n_features: int = 10,
    hidden_size: int = 16,
    num_layers: int = 1,
    nonlinearity: str = "tanh",
    n_init_classes: int = 2,
    loss_fn: Union[str, Callable] = "cross_entropy",
    optimizer_fn: Union[str, Type[optim.Optimizer]] = "adam",
    lr: float = 1e-3,
    output_is_logit: bool = True,
    is_feature_incremental: bool = False,
    is_class_incremental: bool = True,
    device: str = "cpu",
    seed: int = 42,
    gradient_clip_value: float | None = None,
    **kwargs,
):
    self.n_features = n_features
    self.hidden_size = hidden_size
    self.num_layers = num_layers
    self.nonlinearity = nonlinearity
    self.n_init_classes = n_init_classes
    module = RNNClassifier.RNNModule(
        n_features=n_features,
        hidden_size=hidden_size,
        num_layers=num_layers,
        nonlinearity=nonlinearity,
        n_init_classes=n_init_classes,
    )
    if "module" in kwargs:
        del kwargs["module"]
    super().__init__(
        module=module,
        loss_fn=loss_fn,
        optimizer_fn=optimizer_fn,
        output_is_logit=output_is_logit,
        is_feature_incremental=is_feature_incremental,
        is_class_incremental=is_class_incremental,
        device=device,
        lr=lr,
        seed=seed,
        gradient_clip_value=gradient_clip_value,
        **kwargs,
    )

clone

clone(
    new_params=None,
    include_attributes: bool = False,
    copy_weights: bool = False,
)

Return a fresh estimator instance with (optionally) copied state.

Parameters:

Name Type Description Default
new_params dict | None

Parameter overrides for the cloned instance.

None
include_attributes bool

If True, runtime state (observed features, buffers) is also copied.

False
copy_weights bool

If True, model weights are copied (otherwise the module is re‑initialised).

False
Source code in deep_river/base.py
def clone(
    self,
    new_params=None,
    include_attributes: bool = False,
    copy_weights: bool = False,
):
    """Return a fresh estimator instance with (optionally) copied state.

    Parameters
    ----------
    new_params : dict | None
        Parameter overrides for the cloned instance.
    include_attributes : bool, default=False
        If True, runtime state (observed features, buffers) is also copied.
    copy_weights : bool, default=False
        If True, model weights are copied (otherwise the module is re‑initialised).
    """
    new_params = new_params or {}
    copy_weights = new_params.pop("copy_weights", copy_weights)

    params = {**self._get_all_init_params(), **new_params}

    if "module" not in new_params:
        params["module"] = self._rebuild_module()

    new_est = self.__class__(**self._filter_kwargs(self.__class__.__init__, params))

    if copy_weights and hasattr(self.module, "state_dict"):
        new_est.module.load_state_dict(self.module.state_dict())

    if include_attributes:
        new_est._restore_runtime_state(self._get_runtime_state())

    return new_est

draw

draw()

Render a (partial) computational graph of the wrapped model.

Imports graphviz and torchviz lazily. Raises an informative ImportError if the optional dependencies are not installed.

Source code in deep_river/base.py
def draw(self):  # type: ignore[override]
    """Render a (partial) computational graph of the wrapped model.

    Imports ``graphviz`` and ``torchviz`` lazily. Raises an informative
    ImportError if the optional dependencies are not installed.
    """
    try:  # pragma: no cover
        from torchviz import make_dot  # type: ignore
    except Exception as err:  # noqa: BLE001
        raise ImportError(
            "graphviz and torchviz must be installed to draw the model."
        ) from err

    first_parameter = next(self.module.parameters())
    input_shape = first_parameter.size()
    y_pred = self.module(torch.rand(input_shape))
    return make_dot(y_pred.mean(), params=dict(self.module.named_parameters()))

learn_many

learn_many(X: DataFrame, y: Series) -> None

Batch update: extend window with rows of X and perform a step.

Source code in deep_river/classification/rolling_classifier.py
def learn_many(self, X: pd.DataFrame, y: pd.Series) -> None:
    """Batch update: extend window with rows of X and perform a step."""
    self._update_observed_targets(y)
    self._update_observed_features(X)
    X = X[list(self.observed_features)]
    self._x_window.extend(X.values.tolist())
    X_t = self._deque2rolling_tensor(self._x_window)
    self._learn(x=X_t, y=y)

learn_one

learn_one(x: dict, y: ClfTarget, **kwargs) -> None

Learn from a single (x, y) updating the rolling window.

Source code in deep_river/classification/rolling_classifier.py
def learn_one(self, x: dict, y: ClfTarget, **kwargs) -> None:
    """Learn from a single (x, y) updating the rolling window."""
    self._update_observed_features(x)
    self._update_observed_targets(y)
    self._x_window.append([x.get(feature, 0) for feature in self.observed_features])
    x_t = self._deque2rolling_tensor(self._x_window)
    self._learn(x=x_t, y=y)

load classmethod

load(filepath: Union[str, Path])

Load a previously saved estimator.

The method reconstructs the estimator class, its wrapped module, optimiser state and runtime information (feature names, buffers, etc.).

Source code in deep_river/base.py
@classmethod
def load(cls, filepath: Union[str, Path]):
    """Load a previously saved estimator.

    The method reconstructs the estimator class, its wrapped module, optimiser
    state and runtime information (feature names, buffers, etc.).
    """
    with open(filepath, "rb") as f:
        state = pickle.load(f)

    estimator_cls = cls._import_from_path(state["estimator_class"])
    init_params = state["init_params"]

    # Rebuild module if needed
    if "module" in init_params and isinstance(init_params["module"], dict):
        module_info = init_params.pop("module")
        module_cls = cls._import_from_path(module_info["class"])
        module = module_cls(
            **cls._filter_kwargs(module_cls.__init__, module_info["kwargs"])
        )
        if state.get("model_state_dict"):
            module.load_state_dict(state["model_state_dict"])
        init_params["module"] = module

    estimator = estimator_cls(
        **cls._filter_kwargs(estimator_cls.__init__, init_params)
    )

    if state.get("optimizer_state_dict") and hasattr(estimator, "optimizer"):
        try:
            estimator.optimizer.load_state_dict(
                state["optimizer_state_dict"]  # type: ignore[arg-type]
            )
        except Exception:  # noqa: E722
            pass

    estimator._restore_runtime_state(state.get("runtime_state", {}))
    return estimator

predict_proba_many

predict_proba_many(X: DataFrame) -> DataFrame

Return probability DataFrame for multiple samples with rolling context.

Source code in deep_river/classification/rolling_classifier.py
def predict_proba_many(self, X: pd.DataFrame) -> pd.DataFrame:
    """Return probability DataFrame for multiple samples with rolling context."""
    self._update_observed_features(X)
    X = X[list(self.observed_features)]
    x_win = self._x_window.copy()
    x_win.extend(X.values.tolist())
    if self.append_predict:
        self._x_window = x_win
    self.module.eval()
    with torch.inference_mode():
        x_t = self._deque2rolling_tensor(x_win)
        probas = self.module(x_t).detach().tolist()
    return pd.DataFrame(probas)

predict_proba_one

predict_proba_one(x: dict) -> Dict[ClfTarget, float]

Return class probability mapping for one sample using rolling context.

Source code in deep_river/classification/rolling_classifier.py
def predict_proba_one(self, x: dict) -> Dict[ClfTarget, float]:
    """Return class probability mapping for one sample using rolling context."""
    self._update_observed_features(x)
    x_win = self._x_window.copy()
    x_win.append([x.get(feature, 0) for feature in self.observed_features])
    if self.append_predict:
        self._x_window = x_win
    self.module.eval()
    with torch.inference_mode():
        x_t = self._deque2rolling_tensor(x_win)
        y_pred = self.module(x_t)
        proba = output2proba(y_pred, self.observed_classes, self.output_is_logit)
    return cast(Dict[ClfTarget, float], proba[0])

save

save(filepath: Union[str, Path]) -> None

Persist the estimator (architecture, weights, optimiser & runtime state).

Parameters:

Name Type Description Default
filepath str | Path

Destination file. Parent directories are created automatically.

required
Source code in deep_river/base.py
def save(self, filepath: Union[str, Path]) -> None:
    """Persist the estimator (architecture, weights, optimiser & runtime state).

    Parameters
    ----------
    filepath : str | Path
        Destination file. Parent directories are created automatically.
    """
    filepath = Path(filepath)
    filepath.parent.mkdir(parents=True, exist_ok=True)

    state = {
        "estimator_class": f"{type(self).__module__}.{type(self).__name__}",
        "init_params": self._get_all_init_params(),
        "model_state_dict": getattr(self.module, "state_dict", lambda: {})(),
        "optimizer_state_dict": getattr(self.optimizer, "state_dict", lambda: {})(),
        "runtime_state": self._get_runtime_state(),
    }

    with open(filepath, "wb") as f:
        pickle.dump(state, f)