Source code for brainpy.algorithms.offline

# -*- coding: utf-8 -*-

import warnings

import numpy as np
from jax.lax import while_loop

import brainpy.math as bm
from brainpy.base import Base
from brainpy.types import Array
from .utils import (Sigmoid,
                    Regularization, L1Regularization, L1L2Regularization, L2Regularization,
                    polynomial_features, normalize)

__all__ = [
  # base class for offline training algorithm
  'OfflineAlgorithm',

  # training methods
  'LinearRegression',
  'RidgeRegression',
  'LassoRegression',
  'LogisticRegression',
  'PolynomialRegression',
  'PolynomialRidgeRegression',
  'ElasticNetRegression',

  # general supports
  'get_supported_offline_methods',
  'register_offline_method',
]

name2func = dict()


class OfflineAlgorithm(Base):
  """Base class for offline training algorithm."""

  def __init__(self, name=None):
    super(OfflineAlgorithm, self).__init__(name=name)

  def __call__(self, identifier, target, input, output):
    """The training procedure.

    Parameters
    ----------
    identifier: str
      The variable name.
    target: JaxArray, ndarray
      The 2d target data with the shape of `(num_batch, num_output)`.
    input: JaxArray, ndarray
      The 2d input data with the shape of `(num_batch, num_input)`.
    output: JaxArray, ndarray
      The 2d output data with the shape of `(num_batch, num_output)`.

    Returns
    -------
    weight: JaxArray
      The weights after fit.
    """
    return self.call(identifier, target, input, output)

  def call(self, identifier, targets, inputs, outputs) -> Array:
    """The training procedure.

    Parameters
    ----------
    identifier: str
      The identifier.
    inputs: JaxArray, jax.numpy.ndarray, numpy.ndarray
      The 3d input data with the shape of `(num_batch, num_time, num_input)`,
      or the 2d input data with the shape of `(num_time, num_input)`.
    targets: JaxArray, jax.numpy.ndarray, numpy.ndarray
      The 3d target data with the shape of `(num_batch, num_time, num_output)`,
      or the 2d target data with the shape of `(num_time, num_output)`.
    outputs: JaxArray, jax.numpy.ndarray, numpy.ndarray
      The 3d output data with the shape of `(num_batch, num_time, num_output)`,
      or the 2d output data with the shape of `(num_time, num_output)`.

    Returns
    -------
    weight: JaxArray
      The weights after fit.
    """
    raise NotImplementedError('Must implement the __call__ function by the subclass itself.')

  def __repr__(self):
    return self.__class__.__name__

  def initialize(self, identifier, *args, **kwargs):
    pass

def _check_data_2d_atls(x):
  if x.ndim < 2:
    raise ValueError(f'Data must be a 2d tensor. But we got {x.ndim}d: {x.shape}.')
  if x.ndim != 2:
    return x.reshape((-1, x.shape[-1]))
  else:
    return x


class RegressionAlgorithm(OfflineAlgorithm):
  """Base regression model.

  Models the relationship between a scalar dependent variable y and the
  independent variables X.

  Parameters
  ----------
  max_iter: int
    The number of training iterations the algorithm will tune the weights for.
  learning_rate: float
    The step length that will be used when updating the weights.
  """

  def __init__(
      self,
      max_iter: int = None,
      learning_rate: float = None,
      regularizer: Regularization = None,
      name: str = None
  ):
    super(RegressionAlgorithm, self).__init__(name=name)
    self.max_iter = max_iter
    self.learning_rate = learning_rate
    self.regularizer = regularizer

  def initialize(self, identifier, *args, **kwargs):
    pass

  def init_weights(self, n_features, n_out):
    """Initialize weights randomly in [-1/N, 1/N]."""
    limit = 1 / np.sqrt(n_features)
    return bm.random.uniform(-limit, limit, (n_features, n_out))

  def gradient_descent_solve(self, targets, inputs, outputs=None):
    # checking
    inputs = _check_data_2d_atls(bm.asarray(inputs))
    targets = _check_data_2d_atls(bm.asarray(targets))

    # initialize weights
    w = self.init_weights(inputs.shape[1], targets.shape[1])

    def cond_fun(a):
      i, par_old, par_new = a
      return bm.logical_and(bm.logical_not(bm.allclose(par_old, par_new)),
                            i < self.max_iter).value

    def body_fun(a):
      i, _, par_new = a
      # gradient of the regularized loss w.r.t. w
      y_pred = inputs.dot(par_new)
      grad_w = bm.dot(inputs.T, -(targets - y_pred)) + self.regularizer.grad(par_new)
      # update the weights
      par_new2 = par_new - self.learning_rate * grad_w
      return i + 1, par_new, par_new2

    # tune parameters for n iterations
    r = while_loop(cond_fun, body_fun, (0, w - 1e-8, w))
    return r[-1]

  def predict(self, W, X):
    return bm.dot(X, W)

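# The update performed by `body_fun` above, written out for a single step.
# This is only an illustrative sketch; the shapes and learning rate below are
# hypothetical, and the regularizer term is noted in the comment:
#
#     X = bm.random.rand(50, 4)      # inputs,  (num_time, num_input)
#     Y = bm.random.rand(50, 2)      # targets, (num_time, num_output)
#     W = bm.zeros((4, 2))
#     grad = X.T @ (X @ W - Y)       # gradient of 0.5 * ||X @ W - Y||^2 w.r.t. W
#     W_new = W - 0.001 * grad       # plus `regularizer.grad(W)` when a penalty is set

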
class LinearRegression(RegressionAlgorithm):
  """Training algorithm of least-square regression.

  Parameters
  ----------
  name: str
    The name of the algorithm.
  max_iter: int
    The maximum number of iterations when using gradient descent.
  learning_rate: float
    The step length used when updating the weights with gradient descent.
  gradient_descent: bool
    Whether to solve the weights by gradient descent (True) or by least
    squares (False, the default).
  """
  def __init__(
      self,
      name: str = None,
      # parameters for using gradient descent
      max_iter: int = 1000,
      learning_rate: float = 0.001,
      gradient_descent: bool = False,
  ):
    super(LinearRegression, self).__init__(name=name,
                                           max_iter=max_iter,
                                           learning_rate=learning_rate,
                                           regularizer=Regularization(0.))
    self.gradient_descent = gradient_descent

  def call(self, identifier, targets, inputs, outputs=None):
    # checking
    inputs = _check_data_2d_atls(bm.asarray(inputs))
    targets = _check_data_2d_atls(bm.asarray(targets))
    # solving
    if self.gradient_descent:
      return self.gradient_descent_solve(targets, inputs)
    else:
      weights = bm.linalg.lstsq(inputs, targets)
      return weights[0]

name2func['linear'] = LinearRegression
name2func['lstsq'] = LinearRegression
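
# A minimal usage sketch (the identifier string and data shapes are
# hypothetical; by default the weights are solved with `bm.linalg.lstsq`
# rather than gradient descent):
#
#     X = bm.random.rand(200, 10)    # (num_time, num_input)
#     Y = bm.random.rand(200, 3)     # (num_time, num_output)
#     W = LinearRegression()('readout.W', Y, X, None)   # least-squares weights, shape (10, 3)
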
class RidgeRegression(RegressionAlgorithm):
  """Training algorithm of ridge regression.

  Parameters
  ----------
  alpha: float
    The regularization coefficient.

    .. versionadded:: 2.2.0

  beta: float
    The regularization coefficient.

    .. deprecated:: 2.2.0
       Please use `alpha` to set regularization factor.

  name: str
    The name of the algorithm.
  """

  def __init__(
      self,
      alpha: float = 1e-7,
      beta: float = None,
      name: str = None,
      # parameters for using gradient descent
      max_iter: int = 1000,
      learning_rate: float = 0.001,
      gradient_descent: bool = False,
  ):
    if beta is not None:
      warnings.warn(f"Please use 'alpha' to set regularization factor. "
                    f"'beta' has been deprecated since version 2.2.0.",
                    UserWarning)
      alpha = beta
    super(RidgeRegression, self).__init__(name=name,
                                          max_iter=max_iter,
                                          learning_rate=learning_rate,
                                          regularizer=L2Regularization(alpha=alpha))
    self.gradient_descent = gradient_descent
  def call(self, identifier, targets, inputs, outputs=None):
    # checking
    inputs = _check_data_2d_atls(bm.asarray(inputs))
    targets = _check_data_2d_atls(bm.asarray(targets))
    # solving
    if self.gradient_descent:
      return self.gradient_descent_solve(targets, inputs)
    else:
      temp = inputs.T @ inputs
      if self.regularizer.alpha > 0.:
        temp += self.regularizer.alpha * bm.eye(inputs.shape[-1])
      weights = bm.linalg.pinv(temp) @ (inputs.T @ targets)
      return weights

  def __repr__(self):
    return f'{self.__class__.__name__}(alpha={self.regularizer.alpha})'

name2func['ridge'] = RidgeRegression
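
# When `gradient_descent` is False, the branch above computes the closed-form
# solution W = pinv(X.T @ X + alpha * I) @ (X.T @ Y).  A minimal usage sketch
# (identifier and shapes are hypothetical):
#
#     X = bm.random.rand(200, 10)    # (num_time, num_input)
#     Y = bm.random.rand(200, 3)     # (num_time, num_output)
#     W = RidgeRegression(alpha=1e-4)('readout.W', Y, X, None)   # shape (10, 3)
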
class LassoRegression(RegressionAlgorithm):
  """Lasso regression method for offline training.

  Parameters
  ----------
  alpha: float
    Constant that multiplies the L1 term. Defaults to 1.0.
    `alpha = 0` is equivalent to an ordinary least square.
  max_iter: int
    The maximum number of iterations.
  degree: int
    The degree of the polynomial that the independent variable X will be
    transformed to.
  name: str
    The name of the algorithm.
  """
  def __init__(
      self,
      alpha: float = 1.0,
      degree: int = 2,
      add_bias: bool = False,
      name: str = None,
      # parameters for using gradient descent
      max_iter: int = 1000,
      learning_rate: float = 0.001,
      gradient_descent: bool = True,
  ):
    super(LassoRegression, self).__init__(name=name,
                                          max_iter=max_iter,
                                          learning_rate=learning_rate,
                                          regularizer=L1Regularization(alpha=alpha))
    self.gradient_descent = gradient_descent
    self.add_bias = add_bias
    assert gradient_descent
    self.degree = degree
  def call(self, identifier, targets, inputs, outputs=None):
    # checking
    inputs = _check_data_2d_atls(bm.asarray(inputs))
    targets = _check_data_2d_atls(bm.asarray(targets))
    # solving
    inputs = normalize(polynomial_features(inputs, degree=self.degree, add_bias=self.add_bias))
    return super(LassoRegression, self).gradient_descent_solve(targets, inputs)

  def predict(self, W, X):
    X = _check_data_2d_atls(bm.asarray(X))
    X = normalize(polynomial_features(X, degree=self.degree, add_bias=self.add_bias))
    return super(LassoRegression, self).predict(W, X)

name2func['lasso'] = LassoRegression
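
# A minimal usage sketch (identifier and shapes are hypothetical):
#
#     X = bm.random.rand(200, 3)
#     Y = bm.random.rand(200, 1)
#     lasso = LassoRegression(alpha=0.1, degree=2)
#     W = lasso('readout.W', Y, X, None)   # gradient descent on normalized polynomial features
#     Y_hat = lasso.predict(W, X)          # re-applies the same feature expansion
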
class LogisticRegression(RegressionAlgorithm):
  """Logistic regression method for offline training.

  Parameters
  ----------
  learning_rate: float
    The step length that will be taken when following the negative gradient
    during training.
  gradient_descent: bool
    True or false depending on whether gradient descent should be used when
    training. If false, we use batch optimization by least squares.
  max_iter: int
    The number of iterations to optimize the parameters.
  name: str
    The name of the algorithm.
  """
  def __init__(
      self,
      learning_rate: float = .1,
      gradient_descent: bool = True,
      max_iter: int = 4000,
      name: str = None,
  ):
    super(LogisticRegression, self).__init__(name=name,
                                             max_iter=max_iter,
                                             learning_rate=learning_rate)
    self.gradient_descent = gradient_descent
    self.sigmoid = Sigmoid()
  def call(self, identifier, targets, inputs, outputs=None) -> Array:
    # prepare data
    inputs = _check_data_2d_atls(bm.asarray(inputs))
    targets = _check_data_2d_atls(bm.asarray(targets))
    if targets.shape[-1] != 1:
      raise ValueError(f'Target must be a scalar, but got multiple variables: {targets.shape}. ')
    targets = targets.flatten()

    # initialize parameters as a 1d vector, matching the flattened targets
    # (`targets` is 1d after `flatten()`, so the output dimension is 1)
    param = self.init_weights(inputs.shape[1], 1).flatten()

    def cond_fun(a):
      i, par_old, par_new = a
      return bm.logical_and(bm.logical_not(bm.allclose(par_old, par_new)),
                            i < self.max_iter).value

    def body_fun(a):
      i, par_old, par_new = a
      # make a new prediction
      y_pred = self.sigmoid(inputs.dot(par_new))
      if self.gradient_descent:
        # move against the gradient of the loss function with
        # respect to the parameters to minimize the loss
        par_new2 = par_new - self.learning_rate * (y_pred - targets).dot(inputs)
      else:
        # batch optimization by least squares
        gradient = self.sigmoid.grad(inputs.dot(par_new))
        diag_grad = bm.zeros((gradient.size, gradient.size))
        diag = bm.arange(gradient.size)
        diag_grad[diag, diag] = gradient
        par_new2 = bm.linalg.pinv(inputs.T.dot(diag_grad).dot(inputs)).dot(inputs.T).dot(
          diag_grad.dot(inputs).dot(par_new) + targets - y_pred)
      return i + 1, par_new, par_new2

    # tune parameters for n iterations
    r = while_loop(cond_fun, body_fun, (0, param + 1., param))
    return r[-1]

  def predict(self, W, X):
    return self.sigmoid(X @ W)

name2func['logistic'] = LogisticRegression
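
# A minimal usage sketch for binary classification (identifier and data are
# hypothetical; `bm.random.randint` is assumed to be available):
#
#     X = bm.random.rand(100, 4)
#     Y = bm.random.randint(0, 2, (100, 1))    # binary labels in a single column
#     logit = LogisticRegression(learning_rate=0.1, max_iter=1000)
#     W = logit('readout.W', Y, X, None)
#     p = logit.predict(W, X)                  # sigmoid(X @ W), values in (0, 1)
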
class PolynomialRegression(LinearRegression):
  """Polynomial regression method for offline training."""
  def __init__(
      self,
      degree: int = 2,
      name: str = None,
      add_bias: bool = False,
      # parameters for using gradient descent
      max_iter: int = 1000,
      learning_rate: float = 0.001,
      gradient_descent: bool = True,
  ):
    super(PolynomialRegression, self).__init__(name=name,
                                               max_iter=max_iter,
                                               learning_rate=learning_rate,
                                               gradient_descent=gradient_descent)
    self.degree = degree
    self.add_bias = add_bias
  def call(self, identifier, targets, inputs, outputs=None):
    inputs = _check_data_2d_atls(bm.asarray(inputs))
    targets = _check_data_2d_atls(bm.asarray(targets))
    inputs = polynomial_features(inputs, degree=self.degree, add_bias=self.add_bias)
    return super(PolynomialRegression, self).call(identifier, targets, inputs)

  def predict(self, W, X):
    X = _check_data_2d_atls(bm.asarray(X))
    X = polynomial_features(X, degree=self.degree, add_bias=self.add_bias)
    return super(PolynomialRegression, self).predict(W, X)

name2func['polynomial'] = PolynomialRegression
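
# A minimal usage sketch (identifier and shapes are hypothetical):
#
#     X = bm.random.rand(200, 2)
#     Y = bm.random.rand(200, 1)
#     poly = PolynomialRegression(degree=2)
#     W = poly('readout.W', Y, X, None)    # fits on the expanded polynomial features
#     Y_hat = poly.predict(W, X)
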
class PolynomialRidgeRegression(RidgeRegression):
  """Polynomial ridge regression method for offline training."""
  def __init__(
      self,
      alpha: float = 1.0,
      degree: int = 2,
      name: str = None,
      add_bias: bool = False,
      # parameters for using gradient descent
      max_iter: int = 1000,
      learning_rate: float = 0.001,
      gradient_descent: bool = True,
  ):
    super(PolynomialRidgeRegression, self).__init__(alpha=alpha,
                                                    name=name,
                                                    max_iter=max_iter,
                                                    learning_rate=learning_rate,
                                                    gradient_descent=gradient_descent)
    self.degree = degree
    self.add_bias = add_bias
  def call(self, identifier, targets, inputs, outputs=None):
    # checking
    inputs = _check_data_2d_atls(bm.asarray(inputs))
    targets = _check_data_2d_atls(bm.asarray(targets))
    inputs = polynomial_features(inputs, degree=self.degree, add_bias=self.add_bias)
    return super(PolynomialRidgeRegression, self).call(identifier, targets, inputs)

  def predict(self, W, X):
    X = _check_data_2d_atls(bm.asarray(X))
    X = polynomial_features(X, degree=self.degree, add_bias=self.add_bias)
    return super(PolynomialRidgeRegression, self).predict(W, X)

name2func['polynomial_ridge'] = PolynomialRidgeRegression
class ElasticNetRegression(RegressionAlgorithm):
  """Elastic-net regression method for offline training.

  Parameters
  ----------
  alpha: float
    The factor that will determine the amount of regularization and feature
    shrinkage.
  l1_ratio: float
    Weighs the contribution of the L1 and L2 regularization.
  degree: int
    The degree of the polynomial that the independent variable X will be
    transformed to.
  max_iter: int
    The number of training iterations the algorithm will tune the weights for.
  learning_rate: float
    The step length that will be used when updating the weights.
  name: str
    The name of the algorithm.
  """
  def __init__(
      self,
      alpha: float = 1.0,
      degree: int = 2,
      l1_ratio: float = 0.5,
      name: str = None,
      add_bias: bool = False,
      # parameters for using gradient descent
      max_iter: int = 1000,
      learning_rate: float = 0.001,
      gradient_descent: bool = True,
  ):
    super(ElasticNetRegression, self).__init__(
      name=name,
      max_iter=max_iter,
      learning_rate=learning_rate,
      regularizer=L1L2Regularization(alpha=alpha, l1_ratio=l1_ratio)
    )
    self.degree = degree
    self.add_bias = add_bias
    self.gradient_descent = gradient_descent
    assert gradient_descent
  def call(self, identifier, targets, inputs, outputs=None):
    # checking
    inputs = _check_data_2d_atls(bm.asarray(inputs))
    targets = _check_data_2d_atls(bm.asarray(targets))
    # solving
    # (pass `add_bias` here as well, so that training and prediction
    #  use the same feature expansion)
    inputs = normalize(polynomial_features(inputs, degree=self.degree, add_bias=self.add_bias))
    return super(ElasticNetRegression, self).gradient_descent_solve(targets, inputs)

  def predict(self, W, X):
    X = _check_data_2d_atls(bm.asarray(X))
    X = normalize(polynomial_features(X, degree=self.degree, add_bias=self.add_bias))
    return super(ElasticNetRegression, self).predict(W, X)

name2func['elastic_net'] = ElasticNetRegression
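
# A minimal usage sketch, where `l1_ratio` weighs the L1 term against the L2
# term of the penalty (identifier and shapes are hypothetical):
#
#     X = bm.random.rand(200, 3)
#     Y = bm.random.rand(200, 1)
#     enet = ElasticNetRegression(alpha=0.1, l1_ratio=0.5, degree=2)
#     W = enet('readout.W', Y, X, None)    # gradient descent on normalized polynomial features
#     Y_hat = enet.predict(W, X)
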
def get_supported_offline_methods():
  """Get all supported offline training methods."""
  return tuple(name2func.keys())

def register_offline_method(name: str, method: OfflineAlgorithm):
  """Register a new offline learning method.

  Parameters
  ----------
  name: str
    The method name.
  method: OfflineAlgorithm
    The offline training method.
  """
  if name in name2func:
    raise ValueError(f'"{name}" has been registered in offline training methods.')
  if not isinstance(method, OfflineAlgorithm):
    raise ValueError(f'"method" must be an instance of {OfflineAlgorithm.__name__}, '
                     f'but we got {type(method)}')
  name2func[name] = method

def get(name: str) -> OfflineAlgorithm:
  """Get the training function according to the training method name."""
  if name not in name2func:
    raise ValueError(f'All offline methods are: {get_supported_offline_methods()}.\n'
                     f'But we got {name}.')
  return name2func[name]
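
# Sketch of registering and retrieving a custom method (the class and method
# name below are hypothetical examples, not part of the library):
#
#     class ZeroWeights(OfflineAlgorithm):
#       def call(self, identifier, targets, inputs, outputs=None):
#         # a trivial "fit" that returns an all-zero weight matrix
#         return bm.zeros((inputs.shape[-1], targets.shape[-1]))
#
#     register_offline_method('zero', ZeroWeights())
#     get('zero')   # -> the registered ZeroWeights instance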