Source code for brainpy._src.encoding.stateless_encoding

# -*- coding: utf-8 -*-

from typing import Optional

import brainpy.math as bm
from brainpy import check
from .base import Encoder

__all__ = [
  'PoissonEncoder',
  'DiffEncoder',
]


[docs] class PoissonEncoder(Encoder): r"""Encode the rate input as the Poisson spike train. Expected inputs should be between 0 and 1. If not, the input :math:`x` will be normalized to :math:`x_{\text{normalize}}` within ``[0, 1]`` according to :math:`x_{\text{normalize}} = \frac{x-\text{min_val}}{\text{max_val} - \text{min_val}}`. Given the input :math:`x`, the poisson encoder will output spikes whose firing probability is :math:`x_{\text{normalize}}`. Examples:: import brainpy as bp import brainpy.math as bm img = bm.random.random((10, 2)) # image to encode (normalized to [0., 1.]) encoder = bp.encoding.PoissonEncoder() # the encoder # encode the image at each time for run_index in range(100): spike = encoder.single_step(img) # do something # or, encode the image at multiple times once spikes = encoder.multi_steps(img, n_time=10.) Args: min_val: float. The minimal value in the given data `x`, used to the data normalization. max_val: float. The maximum value in the given data `x`, used to the data normalization. gain: float. Scale input features by the gain, defaults to ``1``. offset: float. Shift input features by the offset, defaults to ``0``. first_spk_time: float. The time to first spike, defaults to ``0``. """ def __init__( self, min_val: Optional[float] = None, max_val: Optional[float] = None, gain: float = 1.0, offset: float = 0.0, first_spk_time: float = 0., ): super().__init__() self.min_val = check.is_float(min_val, 'min_val', allow_none=True) self.max_val = check.is_float(max_val, 'max_val', allow_none=True) self.gain = check.is_float(gain, allow_none=False) self.offset = check.is_float(offset, allow_none=False) self.first_spk_time = check.is_float(first_spk_time) self.first_spk_step = int(self.first_spk_time / bm.get_dt())
[docs] def single_step(self, x, i_step: int = None): """Generate spikes at the single step according to the inputs. Args: x: Array. The rate input. i_step: int. The time step to generate spikes. Returns: out: Array. The encoded spike train. """ if i_step is None: return self.multi_steps(x, n_time=None) else: return bm.cond(bm.as_jax(i_step < self.first_spk_step), self._zero_out, self.multi_steps, x)
[docs] def multi_steps(self, x, n_time: Optional[float]): """Generate spikes at multiple steps according to the inputs. Args: x: Array. The rate input. n_time: float. Encode rate values as spike trains in the given time length. ``n_time`` is converted into the ``n_step`` according to `n_step = int(n_time / brainpy.math.dt)`. - If ``n_time=None``, encode the rate values at the current time step. Users should repeatedly call it to encode `x` as a spike train. - Else, given the ``x`` with shape ``(S, ...)``, the encoded spike train is the array with shape ``(n_step, S, ...)``. Returns: out: Array. The encoded spike train. """ n_time = int(n_time / bm.get_dt()) if (self.min_val is not None) and (self.max_val is not None): x = (x - self.min_val) / (self.max_val - self.min_val) x = x * self.gain + self.offset if n_time is not None and self.first_spk_step > 0: pre = bm.zeros((self.first_spk_step,) + x.shape, dtype=x.dtype) shape = ((n_time - self.first_spk_step,) + x.shape) post = bm.asarray(bm.random.rand(*shape) < x, dtype=x.dtype) return bm.cat([pre, post], axis=0) else: shape = x.shape if (n_time is None) else ((n_time - self.first_spk_step,) + x.shape) return bm.asarray(bm.random.rand(*shape) < x, dtype=x.dtype)
def _zero_out(self, x): return bm.zeros_like(x)
[docs] class DiffEncoder(Encoder): """Generate spike only when the difference between two subsequent time steps meets a threshold. Optionally include `off_spikes` for negative changes. Example:: >>> a = bm.array([1, 2, 2.9, 3, 3.9]) >>> encoder = DiffEncoder(threshold=1) >>> encoder.multi_steps(a) Array([1., 0., 0., 0.]) >>> encoder = DiffEncoder(threshold=1, padding=True) >>> encoder.multi_steps(a) Array([0., 1., 0., 0., 0.]) >>> b = bm.array([1, 2, 0, 2, 2.9]) >>> encoder = DiffEncoder(threshold=1, off_spike=True) >>> encoder.multi_steps(b) Array([ 1., 1., -1., 1., 0.]) >>> encoder = DiffEncoder(threshold=1, padding=True, off_spike=True) >>> encoder.multi_steps(b) Array([ 0., 1., -1., 1., 0.]) Args: threshold: float. Input features with a change greater than the thresold across one timestep will generate a spike, defaults to ``0.1``. padding: bool. Used to change how the first time step of spikes are measured. If ``True``, the first time step will be repeated with itself resulting in ``0``'s for the output spikes. If ``False``, the first time step will be padded with ``0``'s, defaults to ``False``. off_spike: bool. If ``True``, negative spikes for changes less than ``-threshold``, defaults to ``False``. """ def __init__( self, threshold: float = 0.1, padding: bool = False, off_spike: bool = False, ): super().__init__() self.threshold = threshold self.padding = padding self.off_spike = off_spike def single_step(self, *args, **kwargs): raise NotImplementedError(f'{DiffEncoder.__class__.__name__} does not support single-step encoding.')
[docs] def multi_steps(self, x): """Encoding multistep inputs with the spiking trains. Args: x: Array. The array with the shape of `(num_step, ....)`. Returns: out: Array. The spike train. """ if self.padding: diff = bm.diff(x, axis=0, prepend=x[:1]) else: diff = bm.diff(x, axis=0, prepend=bm.zeros((1,) + x.shape[1:], dtype=x.dtype)) if self.off_spike: on_spk = bm.asarray(diff >= self.threshold, dtype=x.dtype) off_spk = -bm.asarray(diff <= -self.threshold, dtype=x.dtype) return on_spk + off_spk else: return bm.asarray(diff >= self.threshold, dtype=x.dtype)