base

Classes:

| Name | Description |
| --- | --- |
| DeepEstimator | Abstract base class that implements basic functionality of River-compatible PyTorch wrappers. |
| DeepEstimatorInitialized | Enhances PyTorch modules with dynamic adaptability to evolving features. |
| RollingDeepEstimator | Abstract base class that implements basic functionality of River-compatible PyTorch wrappers, including a rolling window of previous examples. |
| RollingDeepEstimatorInitialized | Rolling window-based deep learning model estimation on top of DeepEstimatorInitialized. |

DeepEstimator

DeepEstimator(
    module: Type[Module],
    loss_fn: Union[str, Callable] = "mse",
    optimizer_fn: Union[str, Callable] = "sgd",
    lr: float = 0.001,
    is_feature_incremental: bool = False,
    device: str = "cpu",
    seed: int = 42,
    **kwargs
)

Bases: Estimator

Abstract base class that implements basic functionality of River-compatible PyTorch wrappers.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| module | Type[Module] | Torch module class that builds the model to be wrapped. The class should accept a parameter n_features so that the returned model's input shape can be determined based on the number of features in the initial training example. | required |
| loss_fn | Union[str, Callable] | Loss function to be used for training the wrapped model. Can be a loss function provided by torch.nn.functional or one of the following: 'mse', 'l1', 'cross_entropy', 'binary_crossentropy', 'smooth_l1', 'kl_div'. | 'mse' |
| optimizer_fn | Union[str, Callable] | Optimizer to be used for training the wrapped model. Can be an optimizer class provided by torch.optim or one of the following: "adam", "adam_w", "sgd", "rmsprop", "lbfgs". | 'sgd' |
| lr | float | Learning rate of the optimizer. | 0.001 |
| is_feature_incremental | bool | Whether the model should automatically expand its input layer when previously unseen features appear. | False |
| device | str | Device to run the wrapped model on. Can be "cpu" or "cuda". | 'cpu' |
| seed | int | Random seed to be used for training the wrapped model. | 42 |
| **kwargs | | Parameters to be passed to the Module or the optimizer. | {} |
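Because module must accept n_features, a compatible module can be sketched as follows (the class name and layer sizes are illustrative, not part of the library):

```python
import torch

class MyModule(torch.nn.Module):
    """Minimal module usable with DeepEstimator: the constructor takes
    n_features, so the input shape can be fixed once the first training
    example arrives."""

    def __init__(self, n_features: int):
        super().__init__()
        self.dense = torch.nn.Linear(n_features, 1)

    def forward(self, x):
        return self.dense(x)
```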

Methods:

| Name | Description |
| --- | --- |
| clone | Clones the estimator. |
| draw | Draws the wrapped model. |
| initialize_module | Initializes the wrapped module based on a first training example. |

Source code in deep_river/base.py
def __init__(
    self,
    module: Type[torch.nn.Module],
    loss_fn: Union[str, Callable] = "mse",
    optimizer_fn: Union[str, Callable] = "sgd",
    lr: float = 1e-3,
    is_feature_incremental: bool = False,
    device: str = "cpu",
    seed: int = 42,
    **kwargs,
):
    super().__init__()
    self.module_cls = module
    self.module: torch.nn.Module = cast(torch.nn.Module, None)
    self.loss_func = get_loss_fn(loss_fn)
    self.loss_fn = loss_fn
    self.optimizer_func = get_optim_fn(optimizer_fn)
    self.optimizer_fn = optimizer_fn
    self.is_feature_incremental = is_feature_incremental
    self.is_class_incremental: bool = False
    self.observed_features: SortedSet[str] = SortedSet([])
    self.lr = lr
    self.device = device
    self.kwargs = kwargs
    self.seed = seed
    self.input_layer = cast(torch.nn.Module, None)
    self.input_expansion_instructions = cast(Dict, None)
    self.output_layer = cast(torch.nn.Module, None)
    self.output_expansion_instructions = cast(Dict, None)
    self.module_initialized = False

    torch.manual_seed(seed)

clone

clone(
    new_params: dict[Any, Any] | None = None,
    include_attributes=False,
)

Clones the estimator.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| new_params | dict[Any, Any] \| None | New parameters to be passed to the cloned estimator. | None |
| include_attributes | bool | If True, the attributes of the estimator will be copied to the cloned estimator. This is useful when the estimator is a transformer and the attributes are the learned parameters. | False |

Returns:

| Type | Description |
| --- | --- |
| DeepEstimator | The cloned estimator. |

Source code in deep_river/base.py
def clone(
    self,
    new_params: dict[Any, Any] | None = None,
    include_attributes=False,
):
    """Clones the estimator.

    Parameters
    ----------
    new_params
        New parameters to be passed to the cloned estimator.
    include_attributes
        If True, the attributes of the estimator will be copied to the
        cloned estimator. This is useful when the estimator is a
        transformer and the attributes are the learned parameters.

    Returns
    -------
    DeepEstimator
        The cloned estimator.
    """
    new_params = new_params or {}
    new_params.update(self.kwargs)
    new_params.update(self._get_params())
    new_params.update({"module": self.module_cls})

    clone = self.__class__(**new_params)
    if include_attributes:
        clone.__dict__.update(self.__dict__)
    return clone
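A hedged usage sketch, assuming est is an instance of a concrete DeepEstimator subclass:

```python
# Clone with a changed hyperparameter; the clone starts untrained.
fresh = est.clone(new_params={"lr": 0.01})

# Copy learned attributes (e.g. weights) into the clone as well.
stateful = est.clone(include_attributes=True)
```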

draw

draw() -> Digraph

Draws the wrapped model.

Source code in deep_river/base.py
def draw(self) -> Digraph:
    """Draws the wrapped model."""
    first_parameter = next(self.module.parameters())
    input_shape = first_parameter.size()
    y_pred = self.module(torch.rand(input_shape))
    return make_dot(y_pred.mean(), params=dict(self.module.named_parameters()))
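Since draw returns a graphviz Digraph, the standard Digraph API can be used to render it. A sketch, assuming the module has already been initialized and graphviz is installed:

```python
graph = est.draw()
graph.render("model_graph", format="png")  # writes model_graph.png
```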

initialize_module

initialize_module(x: dict | DataFrame, **kwargs)

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| x | dict \| DataFrame | The initial training example (or batch of examples), used to determine the number of input features. | required |
| **kwargs | dict | The keyword arguments used to initialize the module class. Can be an empty dict. | {} |

Returns:

| Type | Description |
| --- | --- |
| None | The module is initialized in place; nothing is returned. |

Source code in deep_river/base.py
def initialize_module(self, x: dict | pd.DataFrame, **kwargs):
    """Initializes the wrapped module based on a first training example.

    Parameters
    ----------
    x
        The initial training example (or batch of examples), used to
        determine the number of input features.
    kwargs : dict
        The keyword arguments used to initialize the module class. Can
        be an empty dict.

    Returns
    -------
    None
        The module is initialized in place.
    """
    torch.manual_seed(self.seed)
    if isinstance(x, Dict):
        n_features = len(x)
    elif isinstance(x, pd.DataFrame):
        n_features = len(x.columns)
    else:
        raise TypeError("x must be a dict or a pandas DataFrame")

    # Only build the module if a class (rather than an instance) was given.
    if not isinstance(self.module_cls, torch.nn.Module):
        self.module = self.module_cls(
            n_features=n_features,
            **self._filter_kwargs(self.module_cls, kwargs),
        )

    self.module.to(self.device)
    self.optimizer = self.optimizer_func(self.module.parameters(), lr=self.lr)
    self.module_initialized = True

    self._get_input_output_layers(n_features=n_features)
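A usage sketch, assuming est is an instance of a concrete subclass; the number of keys in the first example determines n_features:

```python
x = {"x1": 0.5, "x2": 1.2}
est.initialize_module(x)  # builds the module with n_features=2
```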

DeepEstimatorInitialized

DeepEstimatorInitialized(
    module: Module,
    loss_fn: Union[str, Callable] = "mse",
    optimizer_fn: Union[str, Callable] = "sgd",
    lr: float = 0.001,
    device: str = "cpu",
    seed: int = 42,
    is_feature_incremental: bool = False,
    **kwargs
)

Bases: Estimator

Enhances PyTorch modules with dynamic adaptability to evolving features.

The class extends the functionality of a base estimator by dynamically updating and expanding neural network layers to handle incremental changes in feature space. It supports feature set discovery, input size adjustments, weight expansion, and varied learning procedures. This makes it suitable for evolving input spaces while maintaining neural network integrity.
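A hedged construction sketch (the module architecture and hyperparameters are illustrative); note that, unlike DeepEstimator, this class receives an already-instantiated module rather than a module class:

```python
import torch
from deep_river.base import DeepEstimatorInitialized

module = torch.nn.Sequential(
    torch.nn.Linear(4, 8),
    torch.nn.ReLU(),
    torch.nn.Linear(8, 1),
)
est = DeepEstimatorInitialized(
    module=module,
    lr=0.01,
    is_feature_incremental=True,  # expand the input layer on new features
)
```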

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| module | Module | The PyTorch model that serves as the backbone of this class's functionality. |
| lr | float | Learning rate for model optimization. |
| loss_fn | Union[str, Callable] | The loss function used for computing training error. |
| loss_func | Callable | The compiled loss function produced via get_loss_fn. |
| optimizer | Optimizer | The compiled optimizer used for updating model weights. |
| optimizer_fn | Union[str, Callable] | The optimizer function or class used for training. |
| device | str | The computational device (e.g., "cpu", "cuda") used for training. |
| seed | int | The random seed for ensuring reproducible operations. |
| is_feature_incremental | bool | Indicates whether the model should automatically expand based on new features. |
| kwargs | dict | Additional arguments passed to the model and utilities. |
| input_layer | Module | The input layer of the PyTorch model, determined dynamically. |
| output_layer | Module | The output layer of the PyTorch model, determined dynamically. |
| observed_features | SortedSet | Tracks all observed input features dynamically, allowing for feature incrementation. |
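Feature-incremental behavior can be sketched as follows; learn_one is the River-style method assumed to be provided by concrete subclasses, not by this base class itself:

```python
est.learn_one({"a": 1.0, "b": 2.0})
# "c" has not been observed before, so with is_feature_incremental=True
# the input layer is expanded to accommodate it.
est.learn_one({"a": 1.0, "b": 2.0, "c": 3.0})
```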

Source code in deep_river/base.py
def __init__(
    self,
    module: torch.nn.Module,
    loss_fn: Union[str, Callable] = "mse",
    optimizer_fn: Union[str, Callable] = "sgd",
    lr: float = 1e-3,
    device: str = "cpu",
    seed: int = 42,
    is_feature_incremental: bool = False,
    **kwargs,
):
    super().__init__()
    self.module = module
    self.lr = lr
    self.loss_func = get_loss_fn(loss_fn)
    self.loss_fn = loss_fn
    self.optimizer = get_optim_fn(optimizer_fn)(
        self.module.parameters(), lr=self.lr
    )
    self.optimizer_fn = optimizer_fn
    self.device = device
    self.seed = seed
    self.is_feature_incremental = is_feature_incremental

    self.kwargs = kwargs

    candidates = self._extract_candidate_layers(self.module)
    self.input_layer = candidates[0]
    self.output_layer = candidates[-1]

    # Set the expected input length based on the extracted input layer.
    self.module_input_len = self._get_input_size() if self.input_layer else None
    self.observed_features: SortedSet = SortedSet()
    self.module.to(self.device)
    torch.manual_seed(seed)
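The weight expansion mentioned above can be pictured in plain PyTorch. This is an illustrative sketch of the general technique, not the library's actual implementation:

```python
import torch

# Grow a Linear input layer from 3 to 4 features: keep the learned
# weights for existing features, zero-initialize the new column.
old = torch.nn.Linear(3, 8)
new = torch.nn.Linear(4, 8)
with torch.no_grad():
    new.weight[:, :3] = old.weight
    new.weight[:, 3:].zero_()
    new.bias.copy_(old.bias)
```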

RollingDeepEstimator

RollingDeepEstimator(
    module: Type[Module],
    loss_fn: Union[str, Callable] = "mse",
    optimizer_fn: Union[str, Callable] = "sgd",
    lr: float = 0.001,
    device: str = "cpu",
    seed: int = 42,
    window_size: int = 10,
    append_predict: bool = False,
    **kwargs
)

Bases: DeepEstimator

Abstract base class that implements basic functionality of River-compatible PyTorch wrappers, including a rolling window that allows the model to make predictions based on multiple previous examples.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| module | Type[Module] | Torch module class that builds the model to be wrapped. The class should accept a parameter n_features so that the returned model's input shape can be determined based on the number of features in the initial training example. | required |
| loss_fn | Union[str, Callable] | Loss function to be used for training the wrapped model. Can be a loss function provided by torch.nn.functional or one of the following: 'mse', 'l1', 'cross_entropy', 'binary_crossentropy', 'smooth_l1', 'kl_div'. | 'mse' |
| optimizer_fn | Union[str, Callable] | Optimizer to be used for training the wrapped model. Can be an optimizer class provided by torch.optim or one of the following: "adam", "adam_w", "sgd", "rmsprop", "lbfgs". | 'sgd' |
| lr | float | Learning rate of the optimizer. | 0.001 |
| device | str | Device to run the wrapped model on. Can be "cpu" or "cuda". | 'cpu' |
| seed | int | Random seed to be used for training the wrapped model. | 42 |
| window_size | int | Size of the rolling window used for storing previous examples. | 10 |
| append_predict | bool | Whether to append inputs passed for prediction to the rolling window. | False |
| **kwargs | | Parameters to be passed to the Module or the optimizer. | {} |
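The rolling window itself is a fixed-size collections.deque: once window_size examples have been stored, each new example evicts the oldest one.

```python
import collections

window = collections.deque(maxlen=3)  # window_size = 3
for i in range(5):
    window.append({"x": i})
print(list(window))  # [{'x': 2}, {'x': 3}, {'x': 4}]
```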

Methods:

| Name | Description |
| --- | --- |
| clone | Clones the estimator. |
| draw | Draws the wrapped model. |
| initialize_module | Initializes the wrapped module based on a first training example. |

Source code in deep_river/base.py
def __init__(
    self,
    module: Type[torch.nn.Module],
    loss_fn: Union[str, Callable] = "mse",
    optimizer_fn: Union[str, Callable] = "sgd",
    lr: float = 1e-3,
    device: str = "cpu",
    seed: int = 42,
    window_size: int = 10,
    append_predict: bool = False,
    **kwargs,
):
    super().__init__(
        module=module,
        loss_fn=loss_fn,
        optimizer_fn=optimizer_fn,
        lr=lr,
        device=device,
        seed=seed,
        **kwargs,
    )

    self.window_size = window_size
    self.append_predict = append_predict
    self._x_window: Deque = collections.deque(maxlen=window_size)
    self._batch_i = 0

clone

clone(
    new_params: dict[Any, Any] | None = None,
    include_attributes=False,
)

Clones the estimator.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| new_params | dict[Any, Any] \| None | New parameters to be passed to the cloned estimator. | None |
| include_attributes | bool | If True, the attributes of the estimator will be copied to the cloned estimator. This is useful when the estimator is a transformer and the attributes are the learned parameters. | False |

Returns:

| Type | Description |
| --- | --- |
| DeepEstimator | The cloned estimator. |

Source code in deep_river/base.py
def clone(
    self,
    new_params: dict[Any, Any] | None = None,
    include_attributes=False,
):
    """Clones the estimator.

    Parameters
    ----------
    new_params
        New parameters to be passed to the cloned estimator.
    include_attributes
        If True, the attributes of the estimator will be copied to the
        cloned estimator. This is useful when the estimator is a
        transformer and the attributes are the learned parameters.

    Returns
    -------
    DeepEstimator
        The cloned estimator.
    """
    new_params = new_params or {}
    new_params.update(self.kwargs)
    new_params.update(self._get_params())
    new_params.update({"module": self.module_cls})

    clone = self.__class__(**new_params)
    if include_attributes:
        clone.__dict__.update(self.__dict__)
    return clone

draw

draw() -> Digraph

Draws the wrapped model.

Source code in deep_river/base.py
def draw(self) -> Digraph:
    """Draws the wrapped model."""
    first_parameter = next(self.module.parameters())
    input_shape = first_parameter.size()
    y_pred = self.module(torch.rand(input_shape))
    return make_dot(y_pred.mean(), params=dict(self.module.named_parameters()))

initialize_module

initialize_module(x: dict | DataFrame, **kwargs)

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| x | dict \| DataFrame | The initial training example (or batch of examples), used to determine the number of input features. | required |
| **kwargs | dict | The keyword arguments used to initialize the module class. Can be an empty dict. | {} |

Returns:

| Type | Description |
| --- | --- |
| None | The module is initialized in place; nothing is returned. |

Source code in deep_river/base.py
def initialize_module(self, x: dict | pd.DataFrame, **kwargs):
    """Initializes the wrapped module based on a first training example.

    Parameters
    ----------
    x
        The initial training example (or batch of examples), used to
        determine the number of input features.
    kwargs : dict
        The keyword arguments used to initialize the module class. Can
        be an empty dict.

    Returns
    -------
    None
        The module is initialized in place.
    """
    torch.manual_seed(self.seed)
    if isinstance(x, Dict):
        n_features = len(x)
    elif isinstance(x, pd.DataFrame):
        n_features = len(x.columns)
    else:
        raise TypeError("x must be a dict or a pandas DataFrame")

    # Only build the module if a class (rather than an instance) was given.
    if not isinstance(self.module_cls, torch.nn.Module):
        self.module = self.module_cls(
            n_features=n_features,
            **self._filter_kwargs(self.module_cls, kwargs),
        )

    self.module.to(self.device)
    self.optimizer = self.optimizer_func(self.module.parameters(), lr=self.lr)
    self.module_initialized = True

    self._get_input_output_layers(n_features=n_features)

RollingDeepEstimatorInitialized

RollingDeepEstimatorInitialized(
    module: Module,
    loss_fn: Union[str, Callable] = "mse",
    optimizer_fn: Union[str, Callable] = "sgd",
    lr: float = 0.001,
    device: str = "cpu",
    seed: int = 42,
    window_size: int = 10,
    append_predict: bool = False,
    **kwargs
)

Bases: DeepEstimatorInitialized

RollingDeepEstimatorInitialized class for rolling window-based deep learning model estimation.

This class extends the functionality of the DeepEstimatorInitialized class to support training and prediction using a rolling window. It maintains a fixed-size deque to store a rolling window of input data. It can optionally append predictions to the input window to facilitate iterative prediction workflows. This class is designed for advanced users who need rolling window functionality in their deep learning estimation pipelines.
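A hedged construction sketch (the module and hyperparameters are illustrative):

```python
import torch
from deep_river.base import RollingDeepEstimatorInitialized

est = RollingDeepEstimatorInitialized(
    module=torch.nn.Linear(3, 1),
    window_size=20,       # keep the 20 most recent examples
    append_predict=True,  # feed predictions back into the window
)
```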

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| window_size | int | The size of the rolling window used for training and prediction. |
| append_predict | bool | Flag indicating whether to append predictions into the rolling window. |
| _x_window | Deque | A fixed-size deque storing the most recent window of input data. |
| _batch_i | int | Internal counter for batch index tracking during training or prediction. |

Source code in deep_river/base.py
def __init__(
    self,
    module: torch.nn.Module,
    loss_fn: Union[str, Callable] = "mse",
    optimizer_fn: Union[str, Callable] = "sgd",
    lr: float = 1e-3,
    device: str = "cpu",
    seed: int = 42,
    window_size: int = 10,
    append_predict: bool = False,
    **kwargs,
):
    self.window_size = window_size
    self.append_predict = append_predict
    self._x_window: Deque = collections.deque(maxlen=window_size)
    self._batch_i = 0
    super().__init__(
        module=module,
        loss_fn=loss_fn,
        optimizer_fn=optimizer_fn,
        lr=lr,
        device=device,
        seed=seed,
        **kwargs,
    )