# Source code for brainpy._src.math.ndarray

# -*- coding: utf-8 -*-

import operator
from typing import Union, Optional, Sequence, Any

import jax
import numpy as np
from jax import numpy as jnp
from jax.dtypes import canonicalize_dtype
from jax.tree_util import register_pytree_node_class

from brainpy.errors import MathError
from . import defaults

bm = None

__all__ = [
'Array', 'ndarray', 'JaxArray',  # alias of Array
'ShardedArray',
]

# Ways to change values in a zero-dimensional array
# -----
# Reference: https://stackoverflow.com/questions/56954714/how-do-i-assign-to-a-zero-dimensional-numpy-array
#
#   >>> x = np.array(10)
# 1. index the original array with ellipsis or an empty tuple
#    >>> x[...] = 2
#    >>> x[()] = 2

_all_slice = slice(None, None, None)

def _check_input_array(array):
if isinstance(array, Array):
return array.value
elif isinstance(array, np.ndarray):
return jnp.asarray(array)
else:
return array

def _return(a):
if defaults.numpy_func_return == 'bp_array' and isinstance(a, jax.Array) and a.ndim > 0:
return Array(a)
return a

def _as_jax_array_(obj):
return obj.value if isinstance(obj, Array) else obj

def _check_out(out):
if not isinstance(out, Array):
raise TypeError(f'out must be an instance of brainpy Array. But got {type(out)}')

def _get_dtype(v):
if hasattr(v, 'dtype'):
dtype = v.dtype
else:
dtype = canonicalize_dtype(type(v))
return dtype

[docs]
@register_pytree_node_class
class Array(object):
"""Multiple-dimensional array in BrainPy.

Compared to jax.Array, :py:class:~.Array has the following advantages:

- In-place updating is supported.

>>> import brainpy.math as bm
>>> a = bm.asarray([1, 2, 3.])
>>> a[0] = 10.

- Keep sharding constraints during computation.

- More dense array operations with PyTorch syntax.

"""

__slots__ = ('_value', )

def __init__(self, value, dtype: Any = None):
    """Wrap ``value`` as a brainpy Array.

    Parameters
    ----------
    value: Any
        The data. ``Array`` instances are unwrapped; tuples, lists and
        numpy arrays are converted to jax arrays.
    dtype: Any, optional
        When given, the value is cast to this dtype.
    """
    if isinstance(value, Array):
        value = value._value
    elif isinstance(value, (tuple, list, np.ndarray)):
        value = jnp.asarray(value)
    if dtype is not None:
        value = jnp.asarray(value, dtype=dtype)
    self._value = value

def _check_tracer(self):
    """Return the wrapped value, raising if it is an escaped jax tracer."""
    self_value = self.value
    if hasattr(self_value, '_trace') and hasattr(self_value._trace.main, 'jaxpr_stack'):
        if len(self_value._trace.main.jaxpr_stack) == 0:
            raise RuntimeError('This Array is modified during the transformation. '
                               'BrainPy only supports transformations for Variable. '
                               'Please declare it as a Variable.') from jax.core.escaped_tracer_error(self_value, None)
    return self_value

@property
def sharding(self):
    """Sharding of the underlying jax array."""
    return self._value.sharding

# Bug fix: the original stacked two ``@property`` decorators on the getter,
# which would wrap the property object itself and break attribute access.
@property
def value(self):
    """The wrapped jax array."""
    return self._value

@value.setter
def value(self, value):
    # Reject writes while inside a jax transformation trace.
    self_value = self._check_tracer()

    if isinstance(value, Array):
        value = value.value
    elif not isinstance(value, jax.Array):
        # Covers numpy arrays, sequences and scalars alike.
        value = jnp.asarray(value)
    # In-place updates must preserve shape and dtype.
    if value.shape != self_value.shape:
        raise MathError(f"The shape of the original data is {self_value.shape}, "
                        f"while we got {value.shape}.")
    if value.dtype != self_value.dtype:
        raise MathError(f"The dtype of the original data is {self_value.dtype}, "
                        f"while we got {value.dtype}.")
    self._value = value

def update(self, value):
    """Update the value of this Array in place (shape/dtype checked by the setter)."""
    self.value = value

@property
def dtype(self):
    """Data type of the array."""
    return _get_dtype(self._value)

@property
def shape(self):
    """Shape of the array."""
    return self.value.shape

@property
def ndim(self):
    """Number of dimensions."""
    return self.value.ndim

@property
def imag(self):
    """Imaginary part of the array."""
    # Bug fix: the original accessed the non-existent ``.image`` attribute.
    return _return(self.value.imag)

@property
def real(self):
    """Real part of the array."""
    return _return(self.value.real)

@property
def size(self):
    """Total number of elements."""
    return self.value.size

@property
def T(self):
    """Transposed array."""
    return _return(self.value.T)

# ----------------------- #
# Python inherent methods #
# ----------------------- #

def __repr__(self) -> str:
    """Render as ``ClassName(value=..., dtype=...)``, aligning multi-line values."""
    print_code = repr(self.value)
    if ', dtype' in print_code:
        print_code = print_code.split(', dtype')[0] + ')'
    prefix = f'{self.__class__.__name__}'
    prefix2 = f'{self.__class__.__name__}(value='
    if '\n' in print_code:
        lines = print_code.split("\n")
        blank1 = " " * len(prefix2)
        lines[0] = prefix2 + lines[0]
        for i in range(1, len(lines)):
            lines[i] = blank1 + lines[i]
        lines[-1] += ","
        blank2 = " " * (len(prefix) + 1)
        lines.append(f'{blank2}dtype={self.dtype})')
        print_code = "\n".join(lines)
    else:
        print_code = prefix2 + print_code + f', dtype={self.dtype})'
    return print_code

def __format__(self, format_spec: str) -> str:
    # Bug fix: forward the format spec (the original silently dropped it).
    return format(self.value, format_spec)

def __iter__(self):
    """Iterate over the leading axis, yielding jax arrays.

    Works around the behavior of DeviceArray.__iter__.
    """
    for i in range(self.value.shape[0]):
        yield self.value[i]

def __getitem__(self, index):
    # Fast path: a full slice returns the raw jax array.
    if isinstance(index, slice) and (index == _all_slice):
        return self.value
    elif isinstance(index, tuple):
        index = tuple((x.value if isinstance(x, Array) else x) for x in index)
    elif isinstance(index, Array):
        index = index.value
    return self.value[index]

def __setitem__(self, index, value):
    # Unwrap/convert value and index so jax's functional update accepts them.
    if isinstance(value, Array):
        value = value.value
    elif isinstance(value, np.ndarray):
        value = jnp.asarray(value)

    if isinstance(index, tuple):
        index = tuple(_check_input_array(x) for x in index)
    elif isinstance(index, Array):
        index = index.value
    elif isinstance(index, np.ndarray):
        index = jnp.asarray(index)

    # jax arrays are immutable: build an updated copy and store it back.
    self_value = self._check_tracer()
    self.value = self_value.at[index].set(value)

# ---------- #
# operations #
# ---------- #

def __len__(self) -> int:
    return len(self.value)

def __neg__(self):
    return _return(self.value.__neg__())

def __pos__(self):
    return _return(self.value.__pos__())

def __abs__(self):
    return _return(self.value.__abs__())

def __invert__(self):
    return _return(self.value.__invert__())

def __eq__(self, oc):
    return _return(self.value == _check_input_array(oc))

def __ne__(self, oc):
    return _return(self.value != _check_input_array(oc))

def __lt__(self, oc):
    return _return(self.value < _check_input_array(oc))

def __le__(self, oc):
    return _return(self.value <= _check_input_array(oc))

def __gt__(self, oc):
    return _return(self.value > _check_input_array(oc))

def __ge__(self, oc):
    return _return(self.value >= _check_input_array(oc))

# Reconstructed: the extraction lost the ``def`` headers of the three
# addition operators, leaving only their bodies.
def __add__(self, oc):
    return _return(self.value + _check_input_array(oc))

def __radd__(self, oc):
    return _return(_check_input_array(oc) + self.value)

def __iadd__(self, oc):
    # a += b
    self.value = self.value + _check_input_array(oc)
    return self

def __sub__(self, oc):
    return _return(self.value - _check_input_array(oc))

def __rsub__(self, oc):
    return _return(_check_input_array(oc) - self.value)

def __isub__(self, oc):
    # a -= b
    self.value = self.value - _check_input_array(oc)
    return self

def __mul__(self, oc):
    return _return(self.value * _check_input_array(oc))

def __rmul__(self, oc):
    return _return(_check_input_array(oc) * self.value)

def __imul__(self, oc):
    # a *= b
    self.value = self.value * _check_input_array(oc)
    return self

def __rdiv__(self, oc):
    # Python-2 legacy name, kept for backward compatibility.
    return _return(_check_input_array(oc) / self.value)

def __truediv__(self, oc):
    return _return(self.value / _check_input_array(oc))

def __rtruediv__(self, oc):
    return _return(_check_input_array(oc) / self.value)

def __itruediv__(self, oc):
    # a /= b
    self.value = self.value / _check_input_array(oc)
    return self

def __floordiv__(self, oc):
    return _return(self.value // _check_input_array(oc))

def __rfloordiv__(self, oc):
    return _return(_check_input_array(oc) // self.value)

def __ifloordiv__(self, oc):
    # a //= b
    self.value = self.value // _check_input_array(oc)
    return self

def __divmod__(self, oc):
    return _return(self.value.__divmod__(_check_input_array(oc)))

def __rdivmod__(self, oc):
    return _return(self.value.__rdivmod__(_check_input_array(oc)))

def __mod__(self, oc):
    return _return(self.value % _check_input_array(oc))

def __rmod__(self, oc):
    return _return(_check_input_array(oc) % self.value)

def __imod__(self, oc):
    # a %= b
    self.value = self.value % _check_input_array(oc)
    return self

def __pow__(self, oc):
    return _return(self.value ** _check_input_array(oc))

def __rpow__(self, oc):
    return _return(_check_input_array(oc) ** self.value)

def __ipow__(self, oc):
    # a **= b
    self.value = self.value ** _check_input_array(oc)
    return self

def __matmul__(self, oc):
    return _return(self.value @ _check_input_array(oc))

def __rmatmul__(self, oc):
    return _return(_check_input_array(oc) @ self.value)

def __imatmul__(self, oc):
    # a @= b
    self.value = self.value @ _check_input_array(oc)
    return self

def __and__(self, oc):
    return _return(self.value & _check_input_array(oc))

def __rand__(self, oc):
    return _return(_check_input_array(oc) & self.value)

def __iand__(self, oc):
    # a &= b
    self.value = self.value & _check_input_array(oc)
    return self

def __or__(self, oc):
    return _return(self.value | _check_input_array(oc))

def __ror__(self, oc):
    return _return(_check_input_array(oc) | self.value)

def __ior__(self, oc):
    # a |= b
    self.value = self.value | _check_input_array(oc)
    return self

def __xor__(self, oc):
    return _return(self.value ^ _check_input_array(oc))

def __rxor__(self, oc):
    return _return(_check_input_array(oc) ^ self.value)

def __ixor__(self, oc):
    # a ^= b
    self.value = self.value ^ _check_input_array(oc)
    return self

def __lshift__(self, oc):
    return _return(self.value << _check_input_array(oc))

def __rlshift__(self, oc):
    return _return(_check_input_array(oc) << self.value)

def __ilshift__(self, oc):
    # a <<= b
    self.value = self.value << _check_input_array(oc)
    return self

def __rshift__(self, oc):
    return _return(self.value >> _check_input_array(oc))

def __rrshift__(self, oc):
    return _return(_check_input_array(oc) >> self.value)

def __irshift__(self, oc):
    # a >>= b
    self.value = self.value >> _check_input_array(oc)
    return self

def __round__(self, ndigits=None):
    return _return(self.value.__round__(ndigits))

# ----------------------- #
#       JAX methods       #
# ----------------------- #

@property
def at(self):
    """jax's functional index-update helper for the underlying array."""
    return self.value.at

def device(self):
    # NOTE(review): recent jax versions expose ``.device`` as a property, not
    # a method; this mirrors the historical callable API — confirm against
    # the jax version this file is pinned to.
    return self.value.device()

@property
def device_buffer(self):
    # NOTE(review): ``device_buffer`` is a legacy DeviceArray attribute;
    # verify it still exists on the supported jax version.
    return self.value.device_buffer

# ----------------------- #
#      NumPy methods      #
# ----------------------- #

def all(self, axis=None, keepdims=False):
    """Returns True if all elements evaluate to True."""
    return _return(self.value.all(axis=axis, keepdims=keepdims))

def any(self, axis=None, keepdims=False):
    """Returns True if any of the elements evaluate to True."""
    return _return(self.value.any(axis=axis, keepdims=keepdims))

def argmax(self, axis=None):
    """Return indices of the maximum values along the given axis."""
    return _return(self.value.argmax(axis=axis))

def argmin(self, axis=None):
    """Return indices of the minimum values along the given axis."""
    return _return(self.value.argmin(axis=axis))

def argpartition(self, kth, axis=-1, kind='introselect', order=None):
    """Returns the indices that would partition this array."""
    return _return(self.value.argpartition(kth=kth, axis=axis, kind=kind, order=order))

def argsort(self, axis=-1, kind=None, order=None):
    """Returns the indices that would sort this array."""
    return _return(self.value.argsort(axis=axis, kind=kind, order=order))

def astype(self, dtype):
    """Copy of the array cast to ``dtype`` (returned unchanged when None).

    Parameters
    ----------
    dtype: str, dtype
        Typecode or data-type to which the array is cast.
    """
    if dtype is None:
        return _return(self.value)
    return _return(self.value.astype(dtype))

def byteswap(self, inplace=False):
    """Swap the bytes of the array elements.

    Toggles between little-endian and big-endian representation. Real and
    imaginary parts of complex numbers are swapped individually.
    """
    return _return(self.value.byteswap(inplace=inplace))

def choose(self, choices, mode='raise'):
    """Use an index array to construct a new array from a set of choices."""
    return _return(self.value.choose(choices=_as_jax_array_(choices), mode=mode))

def clip(self, min=None, max=None, out=None):
    """Limit the values to ``[min, max]``; at least one bound must be given."""
    r = self.value.clip(min=_as_jax_array_(min), max=_as_jax_array_(max))
    if out is None:
        return _return(r)
    _check_out(out)
    out.value = r

def compress(self, condition, axis=None):
    """Return selected slices of this array along the given axis."""
    return _return(self.value.compress(condition=_as_jax_array_(condition), axis=axis))

def conj(self):
    """Complex-conjugate all elements."""
    return _return(self.value.conj())

def conjugate(self):
    """Return the complex conjugate, element-wise."""
    return _return(self.value.conjugate())

def copy(self):
    """Return a copy of the array."""
    return _return(self.value.copy())

def cumprod(self, axis=None, dtype=None):
    """Cumulative product of the elements along the given axis."""
    return _return(self.value.cumprod(axis=axis, dtype=dtype))

def cumsum(self, axis=None, dtype=None):
    """Cumulative sum of the elements along the given axis."""
    return _return(self.value.cumsum(axis=axis, dtype=dtype))

def diagonal(self, offset=0, axis1=0, axis2=1):
    """Return the specified diagonals."""
    return _return(self.value.diagonal(offset=offset, axis1=axis1, axis2=axis2))

def dot(self, b):
    """Dot product with ``b``."""
    return _return(self.value.dot(_as_jax_array_(b)))

def fill(self, value):
    """Fill the array with a scalar value.

    jax arrays are immutable, so a filled copy replaces the stored value.
    """
    self.value = jnp.ones_like(self.value) * value

def flatten(self):
    """Return a copy collapsed into one dimension."""
    return _return(self.value.flatten())

def item(self, *args):
    """Copy an element of the array to a standard Python scalar and return it."""
    return self.value.item(*args)

def max(self, axis=None, keepdims=False, *args, **kwargs):
    """Return the maximum along a given axis."""
    return _return(self.value.max(axis=axis, keepdims=keepdims, *args, **kwargs))

def mean(self, axis=None, dtype=None, keepdims=False, *args, **kwargs):
    """Average of the array elements along the given axis."""
    return _return(self.value.mean(axis=axis, dtype=dtype, keepdims=keepdims, *args, **kwargs))

def min(self, axis=None, keepdims=False, *args, **kwargs):
    """Return the minimum along a given axis."""
    return _return(self.value.min(axis=axis, keepdims=keepdims, *args, **kwargs))

def nonzero(self):
    """Indices of the non-zero elements, one index array per dimension."""
    return tuple(_return(a) for a in self.value.nonzero())

def prod(self, axis=None, dtype=None, keepdims=False, initial=1, where=True):
    """Product of the array elements over the given axis."""
    return _return(self.value.prod(axis=axis, dtype=dtype, keepdims=keepdims,
                                   initial=initial, where=where))

def ptp(self, axis=None, keepdims=False):
    """Peak-to-peak (maximum - minimum) value along a given axis."""
    return _return(self.value.ptp(axis=axis, keepdims=keepdims))

def put(self, indices, values):
    """Replace the elements at ``indices`` with ``values``.

    Parameters
    ----------
    indices: array_like
        Target indices, interpreted as integers.
    values: array_like
        Values to place in the array at the target indices.
    """
    # Delegates to __setitem__, which handles Array/numpy unwrapping.
    self.__setitem__(indices, values)

def ravel(self, order=None):
    """Return a flattened array."""
    return _return(self.value.ravel(order=order))

def repeat(self, repeats, axis=None):
    """Repeat elements of the array."""
    return _return(self.value.repeat(repeats=repeats, axis=axis))

def reshape(self, *shape, order='C'):
    """Return the same data with a new shape."""
    return _return(self.value.reshape(*shape, order=order))

def resize(self, new_shape):
    """Change the shape of the array in place (implemented via reshape)."""
    self.value = self.value.reshape(new_shape)

def round(self, decimals=0):
    """Round each element to the given number of decimals."""
    return _return(self.value.round(decimals=decimals))

def searchsorted(self, v, side='left', sorter=None):
    """Find indices where elements should be inserted to maintain order.

    Assuming the array is sorted, the returned index ``i`` satisfies
    ``a[i-1] < v <= a[i]`` for ``side='left'`` and ``a[i-1] <= v < a[i]``
    for ``side='right'``.

    Parameters
    ----------
    v : array_like
        Values to insert into the array.
    side : {'left', 'right'}, optional
        If 'left', the first suitable location is returned; if 'right',
        the last. With no suitable index, returns 0 or N.
    sorter : 1-D array_like, optional
        Integer indices that sort the array into ascending order,
        typically the result of argsort.

    Returns
    -------
    indices : array of ints
        Insertion points with the same shape as ``v``.
    """
    return _return(self.value.searchsorted(v=_as_jax_array_(v), side=side, sorter=sorter))

def sort(self, axis=-1, kind='quicksort', order=None):
    """Sort the array in place.

    Parameters
    ----------
    axis : int, optional
        Axis along which to sort (default: last axis).
    kind : {'quicksort', 'mergesort', 'heapsort', 'stable'}
        Sorting algorithm; 'stable' and 'mergesort' both map to timsort.
    order : str or list of str, optional
        Field comparison order for structured arrays.
    """
    self.value = self.value.sort(axis=axis, kind=kind, order=order)

def squeeze(self, axis=None):
    """Remove axes of length one."""
    return _return(self.value.squeeze(axis=axis))

def std(self, axis=None, dtype=None, ddof=0, keepdims=False):
    """Standard deviation over the specified axis.

    Parameters
    ----------
    axis : None, int, or tuple of ints, optional
        Axis or axes to reduce over; the default reduces the flattened array.
    dtype : dtype, optional
        Accumulator dtype; defaults to float64 for integer inputs,
        otherwise the array's own dtype.
    ddof : int, optional
        Delta degrees of freedom; the divisor used is ``N - ddof``.
    keepdims : bool, optional
        Keep reduced axes as size-one dimensions so the result broadcasts
        correctly against the input.

    Returns
    -------
    standard_deviation : ndarray
        A new array containing the standard deviation.
    """
    return _return(self.value.std(axis=axis, dtype=dtype, ddof=ddof, keepdims=keepdims))

def sum(self, axis=None, dtype=None, keepdims=False, initial=0, where=True):
    """Sum of the array elements over the given axis."""
    return _return(self.value.sum(axis=axis, dtype=dtype, keepdims=keepdims,
                                  initial=initial, where=where))

def swapaxes(self, axis1, axis2):
    """Return a view of the array with ``axis1`` and ``axis2`` interchanged."""
    return _return(self.value.swapaxes(axis1, axis2))

def split(self, indices_or_sections, axis=0):
    """Split the array into multiple sub-arrays.

    Parameters
    ----------
    indices_or_sections : int or 1-D array
        An integer N divides the array into N equal sections along
        ``axis`` (raising if impossible). A sorted 1-D array of integers
        gives the split points; e.g. ``[2, 3]`` with ``axis=0`` yields
        ``ary[:2]``, ``ary[2:3]``, ``ary[3:]``. Indices beyond the axis
        length produce empty sub-arrays.
    axis : int, optional
        The axis along which to split (default 0).

    Returns
    -------
    sub-arrays : list of ndarrays
        The sub-arrays.
    """
    return [_return(a) for a in jnp.split(self.value, indices_or_sections, axis=axis)]

def take(self, indices, axis=None, mode=None):
    """Elements of the array at the given indices."""
    return _return(self.value.take(indices=_as_jax_array_(indices), axis=axis, mode=mode))

def tobytes(self):
    """Python bytes containing a copy of the raw array data (C-order by default)."""
    return self.value.tobytes()

def tolist(self):
    """Return the array as a nested Python list of scalars.

    Items are converted to the nearest compatible builtin Python type.
    A zero-dimensional array yields a bare Python scalar, not a list.
    """
    return self.value.tolist()

def trace(self, offset=0, axis1=0, axis2=1, dtype=None):
    """Sum along the diagonals of the array."""
    return _return(self.value.trace(offset=offset, axis1=axis1, axis2=axis2, dtype=dtype))

def transpose(self, *axes):
    """Return a view of the array with axes permuted.

    With no arguments the axis order is reversed. A tuple of ints (or the
    same ints as separate arguments) gives the new axis order: the int in
    the j-th place means the original i-th axis becomes the j-th axis of
    the result. A 1-D array is returned unchanged.

    Parameters
    ----------
    axes : None, tuple of ints, or n ints
        The desired axis permutation.

    Returns
    -------
    out : ndarray
        View of the array with axes suitably permuted.
    """
    return _return(self.value.transpose(*axes))

def tile(self, reps):
    """Construct an array by repeating this array ``reps`` times per axis.

    If ``reps`` has length d, the result has ``max(d, ndim)`` dimensions:
    the array is promoted with leading size-one axes when ``ndim < d``,
    and ``reps`` is left-padded with 1s when ``ndim > d``.

    Note: for broadcasting purposes, prefer numpy-style broadcasting over
    tiling.

    Parameters
    ----------
    reps : array_like
        The number of repetitions along each axis.

    Returns
    -------
    c : ndarray
        The tiled output array.
    """
    return _return(self.value.tile(_as_jax_array_(reps)))

def var(self, axis=None, dtype=None, ddof=0, keepdims=False):
    """Variance of the array elements over the given axis."""
    return _return(self.value.var(axis=axis, dtype=dtype, ddof=ddof, keepdims=keepdims))

def view(self, *args, dtype=None):
    """New view of the array with the same data, PyTorch style.

    Two mutually exclusive call forms are supported:

    * ``view(*shape)`` — reinterpret the array with a new shape
      (implemented via ``reshape``; a size of ``-1`` is inferred from the
      remaining dimensions).
    * ``view(dtype)`` — reinterpret the raw data with a different dtype;
      when the element sizes differ, the size of the last dimension is
      scaled by the ratio of element sizes.

    Examples::

        >>> x = brainpy.math.random.randn(4, 4)
        >>> x.view(16).shape
        (16,)
        >>> x.view(-1, 8).shape   # -1 inferred from the other dimension
        (2, 8)
        >>> x.view(brainpy.math.int32).dtype
        brainpy.math.int32

    Args:
      args: either the desired shape (ints) or a single dtype.
      dtype: the desired dtype (keyword form).

    Raises:
      ValueError: when neither or both of shape and dtype are given.
    """
    if len(args) == 0:
        if dtype is None:
            raise ValueError('Provide dtype or shape.')
        else:
            return _return(self.value.view(dtype))
    else:
        if isinstance(args[0], int):  # shape form
            if dtype is not None:
                raise ValueError('Provide one of dtype or shape. Not both.')
            return _return(self.value.reshape(*args))
        else:  # dtype passed positionally
            assert not isinstance(args[0], int)
            assert dtype is None
            return _return(self.value.view(args[0]))

# ------------------
# NumPy support
# ------------------

def numpy(self, dtype=None):
    """Convert to a ``numpy.ndarray``."""
    return np.asarray(self.value, dtype=dtype)

def to_numpy(self, dtype=None):
    """Convert to a ``numpy.ndarray``."""
    return np.asarray(self.value, dtype=dtype)

def to_jax(self, dtype=None):
    """Convert to a jax array (returned as-is when ``dtype`` is None)."""
    if dtype is None:
        return self.value
    return jnp.asarray(self.value, dtype=dtype)

def __array__(self, dtype=None):
    """Support ``numpy.array()`` and ``numpy.asarray()``."""
    return np.asarray(self.value, dtype=dtype)

def __jax_array__(self):
    """Let jax functions consume this object directly."""
    return self.value

def as_variable(self):
    """Return this Array wrapped as a :class:`brainpy.math.Variable`."""
    global bm
    # Imported lazily to avoid a circular import at module load time.
    if bm is None:
        from brainpy import math as bm
    return bm.Variable(self)

def __format__(self, specification):
    return self.value.__format__(specification)

def __bool__(self) -> bool:
    return self.value.__bool__()

def __float__(self):
    return self.value.__float__()

def __int__(self):
    return self.value.__int__()

def __complex__(self):
    return self.value.__complex__()

def __hex__(self):
    assert self.ndim == 0, 'hex only works on scalar values'
    return hex(self.value)  # type: ignore

def __oct__(self):
    assert self.ndim == 0, 'oct only works on scalar values'
    return oct(self.value)  # type: ignore

def __index__(self):
    return operator.index(self.value)

def __dlpack__(self):
    """Export the underlying buffer via the DLPack protocol."""
    from jax.dlpack import to_dlpack  # pylint: disable=g-import-not-at-top
    # Bug fix: the original imported ``to_dlpack`` but never returned the capsule.
    return to_dlpack(self.value)

# ----------------------
# PyTorch compatibility
# ----------------------

def unsqueeze(self, dim: int) -> 'Array':
    """Insert a size-one dimension at position ``dim``.

    PyTorch-style alias of :meth:`expand_dims`; see
    :func:`brainpy.math.unsqueeze`.
    """
    return _return(jnp.expand_dims(self.value, dim))

def expand_dims(self, axis: Union[int, Sequence[int]]) -> 'Array':
    """Insert new size-one dimensions before the given position(s).

    1. If ``axis`` is an int, a new dimension is inserted before position
       ``axis``. A negative ``axis`` counts from the end; with
       ``n = self.ndim`` the valid range is ``[-(n + 1), n]``.
    2. If ``axis`` is a sequence of ints, the dimensions are inserted one
       after another, i.e.
       ``self.expand_dims(axis) ==
       self.expand_dims(axis[0]).expand_dims(axis[1])...expand_dims(axis[-1])``.
    """
    return _return(jnp.expand_dims(self.value, axis))

def expand_as(self, array: Union['Array', jax.Array, np.ndarray]) -> 'Array':
    """Expand this array to the shape of another array.

    Parameters
    ----------
    array : Array, jax.Array, np.ndarray
        The array whose shape to broadcast to.

    Returns
    -------
    expanded : Array
        A read-only broadcast view of the original array with the shape of
        ``array``; more than one element may refer to a single memory
        location.
    """
    # Reconstructed body: the extraction dropped it. Broadcasting to the
    # target's shape matches the documented contract.
    array = _as_jax_array_(array)
    return _return(jnp.broadcast_to(self.value, array.shape))

def pow(self, index: int):
return _return(self.value ** index)

def addr(
    self,
    vec1: Union['Array', jax.Array, np.ndarray],
    vec2: Union['Array', jax.Array, np.ndarray],
    *,
    beta: float = 1.0,
    alpha: float = 1.0,
    out: Optional[Union['Array', jax.Array, np.ndarray]] = None
) -> Optional['Array']:
    r"""Performs the outer-product of vectors vec1 and vec2 and adds it to the matrix input.

    Optional values beta and alpha are scaling factors on the outer product
    between vec1 and vec2 and the added matrix input respectively.

    .. math::

       out = \beta \mathrm{input} + \alpha (\text{vec1} \otimes \text{vec2})

    Args:
      vec1: the first vector of the outer product
      vec2: the second vector of the outer product
      beta: multiplier for input
      alpha: multiplier for the outer product
      out: the output tensor; written in place when given.

    Returns:
      A new array when ``out`` is None, otherwise None.
    """
    # Fix: the ``def addr(`` header line was lost in the original chunk,
    # leaving the signature and body dangling at class scope.
    vec1 = _as_jax_array_(vec1)
    vec2 = _as_jax_array_(vec2)
    r = alpha * jnp.outer(vec1, vec2) + beta * self.value
    if out is None:
        return _return(r)
    else:
        _check_out(out)
        out.value = r

def addr_(
    self,
    vec1: Union['Array', jax.Array, np.ndarray],
    vec2: Union['Array', jax.Array, np.ndarray],
    *,
    beta: float = 1.0,
    alpha: float = 1.0
):
    """In-place version of :meth:`addr`: ``self = beta * self + alpha * outer(vec1, vec2)``.

    Returns ``self`` to allow call chaining.
    """
    # Fix: the ``def addr_(`` header line was lost in the original chunk.
    vec1 = _as_jax_array_(vec1)
    vec2 = _as_jax_array_(vec2)
    r = alpha * jnp.outer(vec1, vec2) + beta * self.value
    self.value = r
    return self

def outer(self, other: Union['Array', jax.Array, np.ndarray]) -> 'Array':
    """Outer product of this array (flattened) with ``other`` (flattened)."""
    other = _as_jax_array_(other)
    # Fix: ``other`` has already been unwrapped to a raw jax/numpy array here,
    # so the original ``other.value`` raised AttributeError for any
    # non-Array input (and double-unwrapped Array inputs).
    return _return(jnp.outer(self.value, other))

def abs(self, *, out: Optional[Union['Array', jax.Array, np.ndarray]] = None) -> Optional['Array']:
    """Element-wise absolute value; writes into ``out`` when given, else returns a new array."""
    r = jnp.abs(self.value)
    if out is None:
        return _return(r)
    else:
        _check_out(out)
        out.value = r

def abs_(self):
    """
    in-place version of Array.abs()
    """
    self.value = jnp.abs(self.value)
    return self

def add_(self, value):
    """In-place element-wise addition; returns ``self``."""
    # Fix: the ``def add_(self, value):`` header line was lost in the original
    # chunk, leaving ``self.value += value`` dangling at class scope.
    self.value += value
    return self

def absolute(self, *, out: Optional[Union['Array', jax.Array, np.ndarray]] = None) -> Optional['Array']:
    """
    alias of Array.abs
    """
    return self.abs(out=out)

def absolute_(self):
    """
    alias of Array.abs_()
    """
    return self.abs_()

def mul(self, value):
    """Element-wise multiplication: ``self * value``."""
    return _return(self.value * value)

def mul_(self, value):
    """
    In-place version of :meth:`~Array.mul`; returns ``self``.
    """
    self.value *= value
    return self

def multiply(self, value):  # real signature unknown; restored from __doc__
    """
    multiply(value) -> Tensor

    See :func:`torch.multiply`. Alias of :meth:`mul`.
    """
    # Consistency fix: wrap the result like ``mul`` does, so both aliases
    # return the same container type instead of a raw jax array.
    return _return(self.value * value)

def multiply_(self, value):  # real signature unknown; restored from __doc__
    """
    multiply_(value) -> Tensor

    In-place version of :meth:`~Tensor.multiply`; returns ``self``.
    """
    self.value *= value
    return self

def sin(self, *, out: Optional[Union['Array', jax.Array, np.ndarray]] = None) -> Optional['Array']:
    """Element-wise sine; writes into ``out`` when given, else returns a new array."""
    result = jnp.sin(self.value)
    if out is not None:
        _check_out(out)
        out.value = result
        return None
    return _return(result)

def sin_(self):
    """In-place sine; returns ``self``."""
    self.value = jnp.sin(self.value)
    return self

def cos_(self):
    """In-place cosine; returns ``self``."""
    self.value = jnp.cos(self.value)
    return self

def cos(self, *, out: Optional[Union['Array', jax.Array, np.ndarray]] = None) -> Optional['Array']:
    """Element-wise cosine; writes into ``out`` when given, else returns a new array."""
    result = jnp.cos(self.value)
    if out is not None:
        _check_out(out)
        out.value = result
        return None
    return _return(result)

def tan_(self):
    """In-place tangent; returns ``self``."""
    self.value = jnp.tan(self.value)
    return self

def tan(self, *, out: Optional[Union['Array', jax.Array, np.ndarray]] = None) -> Optional['Array']:
    """Element-wise tangent; writes into ``out`` when given, else returns a new array."""
    result = jnp.tan(self.value)
    if out is not None:
        _check_out(out)
        out.value = result
        return None
    return _return(result)

def sinh_(self):
    """In-place hyperbolic sine; returns ``self``."""
    # Fix: the original applied jnp.tanh here, so sinh_ silently computed tanh.
    self.value = jnp.sinh(self.value)
    return self

def sinh(self, *, out: Optional[Union['Array', jax.Array, np.ndarray]] = None) -> Optional['Array']:
    """Element-wise hyperbolic sine; writes into ``out`` when given, else returns a new array."""
    # Fix: the original applied jnp.tanh here, so sinh silently computed tanh.
    r = jnp.sinh(self.value)
    if out is None:
        return _return(r)
    else:
        _check_out(out)
        out.value = r

def cosh_(self):
    """In-place hyperbolic cosine; returns ``self``."""
    self.value = jnp.cosh(self.value)
    return self

def cosh(self, *, out: Optional[Union['Array', jax.Array, np.ndarray]] = None) -> Optional['Array']:
    """Element-wise hyperbolic cosine; writes into ``out`` when given, else returns a new array."""
    r = jnp.cosh(self.value)
    if out is None:
        return _return(r)
    else:
        _check_out(out)
        out.value = r

def tanh_(self):
    """In-place hyperbolic tangent; returns ``self``."""
    self.value = jnp.tanh(self.value)
    return self

def tanh(self, *, out: Optional[Union['Array', jax.Array, np.ndarray]] = None) -> Optional['Array']:
    """Element-wise hyperbolic tangent; writes into ``out`` when given, else returns a new array."""
    r = jnp.tanh(self.value)
    if out is None:
        return _return(r)
    else:
        _check_out(out)
        out.value = r

def arcsin_(self):
    """In-place arcsine; returns ``self``."""
    self.value = jnp.arcsin(self.value)
    return self

def arcsin(self, *, out: Optional[Union['Array', jax.Array, np.ndarray]] = None) -> Optional['Array']:
    """Element-wise arcsine; writes into ``out`` when given, else returns a new array."""
    result = jnp.arcsin(self.value)
    if out is not None:
        _check_out(out)
        out.value = result
        return None
    return _return(result)

def arccos_(self):
    """In-place arccosine; returns ``self``."""
    self.value = jnp.arccos(self.value)
    return self

def arccos(self, *, out: Optional[Union['Array', jax.Array, np.ndarray]] = None) -> Optional['Array']:
    """Element-wise arccosine; writes into ``out`` when given, else returns a new array."""
    result = jnp.arccos(self.value)
    if out is not None:
        _check_out(out)
        out.value = result
        return None
    return _return(result)

def arctan_(self):
    """In-place arctangent; returns ``self``."""
    self.value = jnp.arctan(self.value)
    return self

def arctan(self, *, out: Optional[Union['Array', jax.Array, np.ndarray]] = None) -> Optional['Array']:
    """Element-wise arctangent; writes into ``out`` when given, else returns a new array."""
    result = jnp.arctan(self.value)
    if out is not None:
        _check_out(out)
        out.value = result
        return None
    return _return(result)

def clamp(
    self,
    min_value: Optional[Union['Array', jax.Array, np.ndarray]] = None,
    max_value: Optional[Union['Array', jax.Array, np.ndarray]] = None,
    *,
    out: Optional[Union['Array', jax.Array, np.ndarray]] = None
) -> Optional['Array']:
    """
    return the value between min_value and max_value,
    if min_value is None, then no lower bound,
    if max_value is None, then no upper bound.
    Writes into ``out`` when given, else returns a new array.
    """
    min_value = _as_jax_array_(min_value)
    max_value = _as_jax_array_(max_value)
    # Fix: the original passed ``max_value`` twice to jnp.clip, so the lower
    # bound was silently ignored.
    r = jnp.clip(self.value, min_value, max_value)
    if out is None:
        return _return(r)
    else:
        _check_out(out)
        out.value = r

def clamp_(self,
           min_value: Optional[Union['Array', jax.Array, np.ndarray]] = None,
           max_value: Optional[Union['Array', jax.Array, np.ndarray]] = None):
    """In-place clip of values to ``[min_value, max_value]``.

    A ``None`` bound is open (no lower/upper limit). Returns ``self`` to
    allow call chaining.
    """
    self.clamp(min_value, max_value, out=self)
    return self

def clip_(self,
          min_value: Optional[Union['Array', jax.Array, np.ndarray]] = None,
          max_value: Optional[Union['Array', jax.Array, np.ndarray]] = None):
    """
    alias for clamp_
    """
    # Fix: the original did ``self.value = self.clip(..., out=self)``; an
    # ``out=`` call returns None, so the freshly clipped data was immediately
    # clobbered with None. Implement exactly like ``clamp_`` (this method is
    # documented as its alias).
    self.clamp(min_value, max_value, out=self)
    return self

def clone(self) -> 'Array':
    """Return a deep copy of this array's data as a new array."""
    return _return(jnp.copy(self.value))

def copy_(self, src: Union['Array', jax.Array, np.ndarray]) -> 'Array':
    """Copy ``src``'s data into this array in place; returns ``self``."""
    self.value = jnp.copy(_as_jax_array_(src))
    return self

def cov_with(
    self,
    y: Optional[Union['Array', jax.Array, np.ndarray]] = None,
    rowvar: bool = True,
    bias: bool = False,
    ddof: Optional[int] = None,
    fweights: Union['Array', jax.Array, np.ndarray] = None,
    aweights: Union['Array', jax.Array, np.ndarray] = None
) -> 'Array':
    """Estimate the covariance matrix of this array (and optionally ``y``).

    Mirrors :func:`jax.numpy.cov`; see that function for the parameter
    semantics (``rowvar``, ``bias``, ``ddof``, ``fweights``, ``aweights``).
    """
    y = _as_jax_array_(y)
    fweights = _as_jax_array_(fweights)
    aweights = _as_jax_array_(aweights)
    # Fix: the original passed the arguments positionally, so ``fweights``
    # landed in jnp.cov's ``ddof`` slot, ``aweights`` in ``fweights``, and the
    # user-supplied ``ddof`` was silently dropped.
    r = jnp.cov(self.value, y, rowvar=rowvar, bias=bias, ddof=ddof,
                fweights=fweights, aweights=aweights)
    return _return(r)

def expand(self, *sizes) -> 'Array':
    """
    Expand an array to a new shape.

    Parameters
    ----------
    sizes : tuple or int
        The shape of the desired array. A single integer i is interpreted
        as (i,). Passing -1 for an existing dimension keeps its size.

    Returns
    -------
    expanded : Array
        A readonly view on the original array with the given shape. It is
        typically not contiguous. Furthermore, more than one element of a
        expanded array may refer to a single memory location.

    Raises
    ------
    ValueError
        If fewer sizes than existing dimensions are given, if a new leading
        dimension is negative, or if a requested size conflicts with an
        existing non-singleton dimension.
    """
    l_ori = len(self.shape)
    l_tar = len(sizes)
    base = l_tar - l_ori  # number of newly added leading dimensions
    sizes_list = list(sizes)
    if base < 0:
        raise ValueError(f'the number of sizes provided ({len(sizes)}) must be greater or equal to the number of '
                         f'dimensions in the tensor ({len(self.shape)})')
    # New leading dimensions must be explicit (no -1 placeholders).
    for i, v in enumerate(sizes[:base]):
        if v < 0:
            raise ValueError(
                f'The expanded size of the tensor ({v}) isn\'t allowed in a leading, non-existing dimension {i + 1}')
    # Resolve -1 placeholders against existing dims and validate compatibility.
    for i, v in enumerate(self.shape):
        sizes_list[base + i] = v if sizes_list[base + i] == -1 else sizes_list[base + i]
        if v != 1 and sizes_list[base + i] != v:
            raise ValueError(
                f'The expanded size of the tensor ({sizes_list[base + i]}) must match the existing size ({v}) at non-singleton '
                f'dimension {i}.  Target sizes: {sizes}.  Tensor sizes: {self.shape}')
    # Fix: the original validated the target sizes but never returned the
    # expanded array (the method always returned None).
    return _return(jnp.broadcast_to(self.value, sizes_list))

def tree_flatten(self):
    """Flatten for jax pytree registration: a single leaf (the data), no aux data."""
    leaves = (self.value,)
    return leaves, None

@classmethod
def tree_unflatten(cls, aux_data, flat_contents):
    """Rebuild an instance from the leaves produced by :meth:`tree_flatten`."""
    return cls(*flat_contents)

def zero_(self):
    """Fill this array with zeros in place; returns ``self``."""
    self.value = jnp.zeros_like(self.value)
    return self

def fill_(self, value):
    """Fill this array with ``value`` in place; returns ``self``."""
    self.fill(value)
    return self

def uniform_(self, low=0., high=1.):
    """Fill self, in place, with samples from Uniform(low, high); returns self."""
    global bm
    # Lazy import: module-level ``bm`` placeholder is filled on first use.
    if bm is None: from brainpy import math as bm
    self.value = bm.random.uniform(low, high, self.shape)
    return self

[docs]
def log_normal_(self, mean=1, std=2):
    r"""Fills self tensor with numbers sampled from the log-normal distribution
    parameterized by the given mean :math:`\mu` and standard deviation
    :math:`\sigma`. Note that mean and std are the mean and standard
    deviation of the underlying normal distribution, and not of the returned
    distribution:

    .. math::

       f(x)=\frac{1}{x \sigma \sqrt{2 \pi}} e^{-\frac{(\ln x-\mu)^2}{2 \sigma^2}}

    Args:
      mean: the mean value.
      std: the standard deviation.
    """
    global bm
    # Lazy import: module-level ``bm`` placeholder is filled on first use.
    if bm is None: from brainpy import math as bm
    self.value = bm.random.lognormal(mean, std, self.shape)
    return self

[docs]
def normal_(self, ):
    """
    Fills self tensor, in place, with samples from the standard normal
    distribution N(0, 1).

    Note: ``bm.random.randn`` draws from the standard normal; this method
    takes no mean/std parameters.
    """
    global bm
    # Lazy import: module-level ``bm`` placeholder is filled on first use.
    if bm is None: from brainpy import math as bm
    self.value = bm.random.randn(*self.shape)
    return self

def cuda(self):
    """Move the underlying data onto the first CUDA device; returns ``self``."""
    target = jax.devices('cuda')[0]
    self.value = jax.device_put(self.value, target)
    return self

def cpu(self):
    """Move the underlying data onto the first CPU device; returns ``self``."""
    target = jax.devices('cpu')[0]
    self.value = jax.device_put(self.value, target)
    return self

# dtype exchanging #
# ---------------- #

def bool(self):
    """Return the data cast to boolean (a raw jax array, not wrapped)."""
    return jnp.asarray(self.value, dtype=jnp.bool_)

def int(self):
    """Return the data cast to int32 (a raw jax array, not wrapped)."""
    return jnp.asarray(self.value, dtype=jnp.int32)

def long(self):
    """Return the data cast to int64 (a raw jax array, not wrapped)."""
    return jnp.asarray(self.value, dtype=jnp.int64)

def half(self):
    """Return the data cast to float16 (a raw jax array, not wrapped)."""
    return jnp.asarray(self.value, dtype=jnp.float16)

def float(self):
    """Return the data cast to float32 (a raw jax array, not wrapped)."""
    return jnp.asarray(self.value, dtype=jnp.float32)

def double(self):
    """Return the data cast to float64 (a raw jax array, not wrapped)."""
    return jnp.asarray(self.value, dtype=jnp.float64)

# Give Array precedence over numpy arrays in mixed binary operations, so
# e.g. ``np.ndarray + Array`` dispatches to Array's reflected operator.
setattr(Array, "__array_priority__", 100)

# Backward-compatible aliases of the Array class (see ``__all__``).
JaxArray = Array
ndarray = Array

[docs]
@register_pytree_node_class
class ShardedArray(Array):
    """The sharded array, which stores data across multiple devices.

    A drawback of sharding is that the data may not be evenly distributed on shards.

    Args:
      value: the array value.
      dtype: the array type.
      keep_sharding: keep the array sharding information using jax.lax.with_sharding_constraint. Default True.
    """

    __slots__ = ('_value', '_keep_sharding')

    def __init__(self, value, dtype: Any = None, *, keep_sharding: bool = True):
        super().__init__(value, dtype)
        # Whether reads of ``.value`` re-apply the stored sharding constraint.
        self._keep_sharding = keep_sharding

    @property
    def value(self):
        """The value stored in this array.

        Returns:
          The stored data.
        """
        v = self._value
        # keep sharding constraints: re-assert the array's own sharding on
        # every read so downstream traced computation keeps the partitioning.
        if self._keep_sharding and hasattr(v, 'sharding') and (v.sharding is not None):
            return jax.lax.with_sharding_constraint(v, v.sharding)
        # return the value
        return v

    @value.setter
    def value(self, value):
        # _check_tracer() is defined on the parent Array class (outside this
        # chunk); presumably it returns the current raw value while guarding
        # against leaked JAX tracers — confirm in the base class.
        self_value = self._check_tracer()

        # Normalize the incoming data to a jax array.
        if isinstance(value, Array):
            value = value.value
        elif isinstance(value, np.ndarray):
            value = jnp.asarray(value)
        elif isinstance(value, jax.Array):
            pass
        else:
            value = jnp.asarray(value)
        # check: in-place semantics require the replacement to match the
        # existing shape and dtype exactly.
        if value.shape != self_value.shape:
            raise MathError(f"The shape of the original data is {self_value.shape}, "
                            f"while we got {value.shape}.")
        if value.dtype != self_value.dtype:
            raise MathError(f"The dtype of the original data is {self_value.dtype}, "
                            f"while we got {value.dtype}.")
        self._value = value