
base

Classes:

DeepEstimator
    Incremental wrapper around a PyTorch module with dynamic feature adaptation.

RollingDeepEstimator
    Extension of DeepEstimator with a fixed-size rolling window.

DeepEstimator

DeepEstimator(
    module: Module,
    loss_fn: Union[str, Callable] = "mse",
    optimizer_fn: Union[str, Callable] = "sgd",
    lr: float = 0.001,
    device: str = "cpu",
    seed: int = 42,
    is_feature_incremental: bool = False,
    gradient_clip_value: float | None = None,
    **kwargs
)

Bases: Estimator

Incremental wrapper around a PyTorch module with dynamic feature adaptation.

This class augments a regular torch.nn.Module with utilities that make it compatible with the river incremental learning API. Beyond standard online optimisation it optionally supports feature-incremental learning: whenever previously unseen input feature names appear, the first trainable layer (the input layer) can be expanded on‑the‑fly so that the model seamlessly accepts the enlarged feature space without re‑initialisation.

The class also provides a persistence protocol (save/load/clone) that captures both the module weights and the runtime state (observed feature names, rolling buffers, etc.), allowing exact round‑trips across Python sessions. Optimisers are transparently rebuilt after structural changes so any newly created parameters participate in subsequent optimisation steps.

Typical workflow
  1. Instantiate with a vanilla PyTorch module (e.g. an nn.Sequential or a custom subclass).
  2. Feed samples via higher-level, task-specific subclasses (e.g. Classifier) that call _learn internally.
  3. (Optional) Enable is_feature_incremental=True for dynamic input growth.
  4. Persist with save and later restore with load.
Example

>>> import torch
>>> from torch import nn
>>> from deep_river.base import DeepEstimator

>>> class TinyNet(nn.Module):
...     def __init__(self, n_features=3):
...         super().__init__()
...         self.fc = nn.Linear(n_features, 2)
...     def forward(self, x):
...         return self.fc(x)

>>> est = DeepEstimator(
...     module=TinyNet(3),
...     loss_fn='mse',
...     optimizer_fn='sgd',
...     is_feature_incremental=True,
... )
>>> est._update_observed_features({'a': 1.0, 'b': 2.0, 'c': 3.0})  # internal bookkeeping
True

Notes
  • The class itself is task‑agnostic. Task-specific behaviour (e.g. converting labels to one‑hot encodings) lives in subclasses such as Classifier or Regressor.
  • Only the first and last trainable leaf modules are treated as input and output layers. Layers without trainable parameters (e.g. ReLU) are skipped, as shown in the sketch after these notes.
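
A minimal sketch of this layer detection (illustrative; it assumes leaf submodules are inspected in definition order, so the parameter-free ReLU in the middle is skipped):

>>> from torch import nn
>>> from deep_river.base import DeepEstimator
>>> net = nn.Sequential(nn.Linear(3, 16), nn.ReLU(), nn.Linear(16, 2))
>>> est = DeepEstimator(module=net)
>>> est.input_layer is net[0], est.output_layer is net[2]
(True, True)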

Parameters:

module : Module
    The PyTorch model whose parameters are to be updated incrementally. (required)
loss_fn : str | Callable, default='mse'
    Loss identifier or callable passed to get_loss_fn.
optimizer_fn : str | Callable, default='sgd'
    Optimiser identifier or optimiser class / factory.
lr : float, default=1e-3
    Learning rate.
device : str, default='cpu'
    Device on which the module is run.
seed : int, default=42
    Random seed (sets torch.manual_seed).
is_feature_incremental : bool, default=False
    If True, expands the input layer when new feature names are encountered.
gradient_clip_value : float | None, default=None
    If provided, the gradient norm is clipped to this value at each optimisation step.
**kwargs : dict, default={}
    Additional custom arguments retained for reconstruction on clone / load.

Attributes:

module : Module
    The wrapped PyTorch module.
loss_func : Callable
    Resolved loss function callable.
optimizer : Optimizer
    Optimiser instance (rebuilt after structural changes).
input_layer : Module | None
    First trainable leaf module (may be None if the module has no parameters).
output_layer : Module | None
    Last trainable leaf module.
observed_features : SortedSet[str]
    Ordered set of feature names seen so far.
module_input_len : int | None
    Cached original input size of the input layer (if identifiable).

Methods:

clone
    Return a fresh estimator instance with (optionally) copied state.
draw
    Render a (partial) computational graph of the wrapped model.
load
    Load a previously saved estimator.
save
    Persist the estimator (architecture, weights, optimiser & runtime state).
Source code in deep_river/base.py
def __init__(
    self,
    module: torch.nn.Module,
    loss_fn: Union[str, Callable] = "mse",
    optimizer_fn: Union[str, Callable] = "sgd",
    lr: float = 1e-3,
    device: str = "cpu",
    seed: int = 42,
    is_feature_incremental: bool = False,
    gradient_clip_value: float | None = None,
    **kwargs,
):
    super().__init__()
    self.module = module
    self.lr = lr
    self.loss_func = get_loss_fn(loss_fn)
    self.loss_fn = loss_fn
    self.optimizer = get_optim_fn(optimizer_fn)(
        self.module.parameters(), lr=self.lr
    )
    self.optimizer_fn = optimizer_fn
    self.device = device
    self.seed = seed
    self.is_feature_incremental = is_feature_incremental
    self.gradient_clip_value = gradient_clip_value

    self.kwargs = kwargs

    # Explicit Optional annotations to satisfy mypy when assigning None
    self.input_layer: Optional[torch.nn.Module] = None
    self.output_layer: Optional[torch.nn.Module] = None

    candidates = self._extract_candidate_layers(self.module)
    # Pick the first parameterised layer as input_layer
    for cand in candidates:
        if any(p.requires_grad for p in cand.parameters()):
            self.input_layer = cand
            break
    else:
        self.input_layer = candidates[0] if candidates else None
    # Pick the last parameterised layer as output_layer
    for cand in reversed(candidates):
        if any(p.requires_grad for p in cand.parameters()):
            self.output_layer = cand
            break
    if self.output_layer is None and candidates:
        self.output_layer = candidates[-1]

    # Store initial expected input length
    self.module_input_len = self._get_input_size() if self.input_layer else None
    self.observed_features: SortedSet = SortedSet()
    self.module.to(self.device)
    torch.manual_seed(seed)

clone

clone(
    new_params=None,
    include_attributes: bool = False,
    copy_weights: bool = False,
)

Return a fresh estimator instance with (optionally) copied state.

Parameters:

new_params : dict | None, default=None
    Parameter overrides for the cloned instance.
include_attributes : bool, default=False
    If True, runtime state (observed features, buffers) is also copied.
copy_weights : bool, default=False
    If True, model weights are copied (otherwise the module is re‑initialised).
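
A usage sketch (TinyNet is the illustrative module from the class-level example above):

>>> est = DeepEstimator(module=TinyNet(3), lr=1e-3)
>>> tweaked = est.clone(new_params={'lr': 1e-2}, copy_weights=True)  # same weights, new learning rate
>>> fresh = est.clone()  # same constructor arguments, but the module is re-initialised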
Source code in deep_river/base.py
def clone(
    self,
    new_params=None,
    include_attributes: bool = False,
    copy_weights: bool = False,
):
    """Return a fresh estimator instance with (optionally) copied state.

    Parameters
    ----------
    new_params : dict | None
        Parameter overrides for the cloned instance.
    include_attributes : bool, default=False
        If True, runtime state (observed features, buffers) is also copied.
    copy_weights : bool, default=False
        If True, model weights are copied (otherwise the module is re‑initialised).
    """
    new_params = new_params or {}
    copy_weights = new_params.pop("copy_weights", copy_weights)

    params = {**self._get_all_init_params(), **new_params}

    if "module" not in new_params:
        params["module"] = self._rebuild_module()

    new_est = self.__class__(**self._filter_kwargs(self.__class__.__init__, params))

    if copy_weights and hasattr(self.module, "state_dict"):
        new_est.module.load_state_dict(self.module.state_dict())

    if include_attributes:
        new_est._restore_runtime_state(self._get_runtime_state())

    return new_est

draw

draw()

Render a (partial) computational graph of the wrapped model.

Imports graphviz and torchviz lazily. Raises an informative ImportError if the optional dependencies are not installed.
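
A usage sketch (assumes the optional graphviz and torchviz dependencies are installed; the output file name is illustrative):

>>> dot = est.draw()                            # torchviz Digraph of the wrapped module
>>> dot.render('tinynet_graph', format='png')   # writes tinynet_graph and tinynet_graph.png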

Source code in deep_river/base.py
def draw(self):  # type: ignore[override]
    """Render a (partial) computational graph of the wrapped model.

    Imports ``graphviz`` and ``torchviz`` lazily. Raises an informative
    ImportError if the optional dependencies are not installed.
    """
    try:  # pragma: no cover
        from torchviz import make_dot  # type: ignore
    except Exception as err:  # noqa: BLE001
        raise ImportError(
            "graphviz and torchviz must be installed to draw the model."
        ) from err

    first_parameter = next(self.module.parameters())
    input_shape = first_parameter.size()
    y_pred = self.module(torch.rand(input_shape))
    return make_dot(y_pred.mean(), params=dict(self.module.named_parameters()))

load classmethod

load(filepath: Union[str, Path])

Load a previously saved estimator.

The method reconstructs the estimator class, its wrapped module, optimiser state and runtime information (feature names, buffers, etc.).
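
A restore sketch (the path is illustrative and assumes an estimator was previously persisted there with save):

>>> est = DeepEstimator.load('models/tiny_net.pkl')
>>> est.observed_features  # runtime state such as observed feature names comes back with the model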

Source code in deep_river/base.py
@classmethod
def load(cls, filepath: Union[str, Path]):
    """Load a previously saved estimator.

    The method reconstructs the estimator class, its wrapped module, optimiser
    state and runtime information (feature names, buffers, etc.).
    """
    with open(filepath, "rb") as f:
        state = pickle.load(f)

    estimator_cls = cls._import_from_path(state["estimator_class"])
    init_params = state["init_params"]

    # Rebuild module if needed
    if "module" in init_params and isinstance(init_params["module"], dict):
        module_info = init_params.pop("module")
        module_cls = cls._import_from_path(module_info["class"])
        module = module_cls(
            **cls._filter_kwargs(module_cls.__init__, module_info["kwargs"])
        )
        if state.get("model_state_dict"):
            module.load_state_dict(state["model_state_dict"])
        init_params["module"] = module

    estimator = estimator_cls(
        **cls._filter_kwargs(estimator_cls.__init__, init_params)
    )

    if state.get("optimizer_state_dict") and hasattr(estimator, "optimizer"):
        try:
            estimator.optimizer.load_state_dict(
                state["optimizer_state_dict"]  # type: ignore[arg-type]
            )
        except Exception:  # noqa: E722
            pass

    estimator._restore_runtime_state(state.get("runtime_state", {}))
    return estimator

save

save(filepath: Union[str, Path]) -> None

Persist the estimator (architecture, weights, optimiser & runtime state).

Parameters:

filepath : str | Path
    Destination file. Parent directories are created automatically. (required)
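
A persistence sketch (illustrative path; the counterpart load documented above rebuilds the estimator from this file):

>>> est.save('models/tiny_net.pkl')  # the 'models/' directory is created automatically if missing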
Source code in deep_river/base.py
def save(self, filepath: Union[str, Path]) -> None:
    """Persist the estimator (architecture, weights, optimiser & runtime state).

    Parameters
    ----------
    filepath : str | Path
        Destination file. Parent directories are created automatically.
    """
    filepath = Path(filepath)
    filepath.parent.mkdir(parents=True, exist_ok=True)

    state = {
        "estimator_class": f"{type(self).__module__}.{type(self).__name__}",
        "init_params": self._get_all_init_params(),
        "model_state_dict": getattr(self.module, "state_dict", lambda: {})(),
        "optimizer_state_dict": getattr(self.optimizer, "state_dict", lambda: {})(),
        "runtime_state": self._get_runtime_state(),
    }

    with open(filepath, "wb") as f:
        pickle.dump(state, f)

RollingDeepEstimator

RollingDeepEstimator(
    module: Module,
    loss_fn: Union[str, Callable] = "mse",
    optimizer_fn: Union[str, Callable] = "sgd",
    lr: float = 0.001,
    device: str = "cpu",
    seed: int = 42,
    window_size: int = 10,
    append_predict: bool = False,
    **kwargs
)

Bases: DeepEstimator

Extension of DeepEstimator with a fixed-size rolling window.

Maintains a collections.deque of the most recent window_size inputs, enabling models (e.g. sequence learners) to condition on a short history. Optionally, the model's own predictions can be appended to the window (via append_predict) to facilitate iterative forecasting.
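
A construction sketch (TinyLSTM is an illustrative module; how the windowed history is batched and fed to forward depends on the task-specific subclass):

>>> import torch
>>> from torch import nn
>>> from deep_river.base import RollingDeepEstimator

>>> class TinyLSTM(nn.Module):
...     def __init__(self, n_features=3, hidden_size=8):
...         super().__init__()
...         self.lstm = nn.LSTM(n_features, hidden_size)
...         self.head = nn.Linear(hidden_size, 1)
...     def forward(self, x):          # x: (window_size, batch, n_features)
...         out, _ = self.lstm(x)
...         return self.head(out[-1])  # predict from the last time step

>>> est = RollingDeepEstimator(
...     module=TinyLSTM(3),
...     loss_fn='mse',
...     optimizer_fn='sgd',
...     window_size=10,
...     append_predict=False,
... )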

Methods:

clone
    Return a fresh estimator instance with (optionally) copied state.
draw
    Render a (partial) computational graph of the wrapped model.
load
    Load a previously saved estimator.
save
    Persist the estimator (architecture, weights, optimiser & runtime state).
Source code in deep_river/base.py
def __init__(
    self,
    module: torch.nn.Module,
    loss_fn: Union[str, Callable] = "mse",
    optimizer_fn: Union[str, Callable] = "sgd",
    lr: float = 1e-3,
    device: str = "cpu",
    seed: int = 42,
    window_size: int = 10,
    append_predict: bool = False,
    **kwargs,
):
    self.window_size = window_size
    self.append_predict = append_predict
    self._x_window: Deque = collections.deque(maxlen=window_size)
    self._batch_i = 0
    super().__init__(
        module=module,
        loss_fn=loss_fn,
        optimizer_fn=optimizer_fn,
        lr=lr,
        device=device,
        seed=seed,
        **kwargs,
    )

clone

clone(
    new_params=None,
    include_attributes: bool = False,
    copy_weights: bool = False,
)

Return a fresh estimator instance with (optionally) copied state.

Parameters:

new_params : dict | None, default=None
    Parameter overrides for the cloned instance.
include_attributes : bool, default=False
    If True, runtime state (observed features, buffers) is also copied.
copy_weights : bool, default=False
    If True, model weights are copied (otherwise the module is re‑initialised).
Source code in deep_river/base.py
def clone(
    self,
    new_params=None,
    include_attributes: bool = False,
    copy_weights: bool = False,
):
    """Return a fresh estimator instance with (optionally) copied state.

    Parameters
    ----------
    new_params : dict | None
        Parameter overrides for the cloned instance.
    include_attributes : bool, default=False
        If True, runtime state (observed features, buffers) is also copied.
    copy_weights : bool, default=False
        If True, model weights are copied (otherwise the module is re‑initialised).
    """
    new_params = new_params or {}
    copy_weights = new_params.pop("copy_weights", copy_weights)

    params = {**self._get_all_init_params(), **new_params}

    if "module" not in new_params:
        params["module"] = self._rebuild_module()

    new_est = self.__class__(**self._filter_kwargs(self.__class__.__init__, params))

    if copy_weights and hasattr(self.module, "state_dict"):
        new_est.module.load_state_dict(self.module.state_dict())

    if include_attributes:
        new_est._restore_runtime_state(self._get_runtime_state())

    return new_est

draw

draw()

Render a (partial) computational graph of the wrapped model.

Imports graphviz and torchviz lazily. Raises an informative ImportError if the optional dependencies are not installed.

Source code in deep_river/base.py
def draw(self):  # type: ignore[override]
    """Render a (partial) computational graph of the wrapped model.

    Imports ``graphviz`` and ``torchviz`` lazily. Raises an informative
    ImportError if the optional dependencies are not installed.
    """
    try:  # pragma: no cover
        from torchviz import make_dot  # type: ignore
    except Exception as err:  # noqa: BLE001
        raise ImportError(
            "graphviz and torchviz must be installed to draw the model."
        ) from err

    first_parameter = next(self.module.parameters())
    input_shape = first_parameter.size()
    y_pred = self.module(torch.rand(input_shape))
    return make_dot(y_pred.mean(), params=dict(self.module.named_parameters()))

load classmethod

load(filepath: Union[str, Path])

Load a previously saved estimator.

The method reconstructs the estimator class, its wrapped module, optimiser state and runtime information (feature names, buffers, etc.).

Source code in deep_river/base.py
@classmethod
def load(cls, filepath: Union[str, Path]):
    """Load a previously saved estimator.

    The method reconstructs the estimator class, its wrapped module, optimiser
    state and runtime information (feature names, buffers, etc.).
    """
    with open(filepath, "rb") as f:
        state = pickle.load(f)

    estimator_cls = cls._import_from_path(state["estimator_class"])
    init_params = state["init_params"]

    # Rebuild module if needed
    if "module" in init_params and isinstance(init_params["module"], dict):
        module_info = init_params.pop("module")
        module_cls = cls._import_from_path(module_info["class"])
        module = module_cls(
            **cls._filter_kwargs(module_cls.__init__, module_info["kwargs"])
        )
        if state.get("model_state_dict"):
            module.load_state_dict(state["model_state_dict"])
        init_params["module"] = module

    estimator = estimator_cls(
        **cls._filter_kwargs(estimator_cls.__init__, init_params)
    )

    if state.get("optimizer_state_dict") and hasattr(estimator, "optimizer"):
        try:
            estimator.optimizer.load_state_dict(
                state["optimizer_state_dict"]  # type: ignore[arg-type]
            )
        except Exception:  # noqa: E722
            pass

    estimator._restore_runtime_state(state.get("runtime_state", {}))
    return estimator

save

save(filepath: Union[str, Path]) -> None

Persist the estimator (architecture, weights, optimiser & runtime state).

Parameters:

filepath : str | Path
    Destination file. Parent directories are created automatically. (required)
Source code in deep_river/base.py
def save(self, filepath: Union[str, Path]) -> None:
    """Persist the estimator (architecture, weights, optimiser & runtime state).

    Parameters
    ----------
    filepath : str | Path
        Destination file. Parent directories are created automatically.
    """
    filepath = Path(filepath)
    filepath.parent.mkdir(parents=True, exist_ok=True)

    state = {
        "estimator_class": f"{type(self).__module__}.{type(self).__name__}",
        "init_params": self._get_all_init_params(),
        "model_state_dict": getattr(self.module, "state_dict", lambda: {})(),
        "optimizer_state_dict": getattr(self.optimizer, "state_dict", lambda: {})(),
        "runtime_state": self._get_runtime_state(),
    }

    with open(filepath, "wb") as f:
        pickle.dump(state, f)