# -*- coding: utf-8 -*-
from functools import partial
from typing import Optional
from jax import vmap, jit, numpy as jnp
import numpy as np
import brainpy.math as bm
from brainpy.errors import ConnectorError
from brainpy.tools import numba_seed, numba_jit, numba_range, format_seed
from brainpy._src.tools.package import SUPPORT_NUMBA
from .base import *
__all__ = [
'FixedProb',
'FixedPreNum',
'FixedPostNum',
'FixedTotalNum',
'GaussianProb',
'ProbDist',
'SmallWorld',
'ScaleFreeBA',
'ScaleFreeBADual',
'PowerLaw',
]
[docs]
class FixedProb(TwoEndConnector):
"""Connect the post-synaptic neurons with fixed probability.
Parameters
----------
prob: float
The conn probability.
pre_ratio: float
The ratio of pre-synaptic neurons to connect.
include_self : bool
Whether create (i, i) conn?
allow_multi_conn: bool
Allow one pre-synaptic neuron connects to multiple post-synaptic neurons?
.. versionadded:: 2.2.3.2
seed : optional, int
Seed the random generator.
"""
def __init__(self,
prob,
pre_ratio=1.,
include_self=True,
allow_multi_conn=False,
seed=None,
**kwargs):
super(FixedProb, self).__init__(**kwargs)
assert 0. <= prob <= 1.
assert 0. <= pre_ratio <= 1.
self.prob = prob
self.pre_ratio = pre_ratio
self.include_self = include_self
self.seed = format_seed(seed)
self.allow_multi_conn = allow_multi_conn
self._jaxrand = bm.random.default_rng(self.seed)
self._nprand = np.random.RandomState(self.seed)
def __repr__(self):
return (f'{self.__class__.__name__}(prob={self.prob}, pre_ratio={self.pre_ratio}, '
f'include_self={self.include_self}, allow_multi_conn={self.allow_multi_conn}, '
f'seed={self.seed})')
def _iii(self):
if (not self.include_self) and (self.pre_num != self.post_num):
raise ConnectorError(f'We found pre_num != post_num ({self.pre_num} != {self.post_num}). '
f'But `include_self` is set to True.')
if self.pre_ratio < 1.:
pre_num_to_select = int(self.pre_num * self.pre_ratio)
pre_ids = self._jaxrand.choice(self.pre_num, size=(pre_num_to_select,), replace=False)
else:
pre_num_to_select = self.pre_num
pre_ids = jnp.arange(self.pre_num)
post_num_total = self.post_num
post_num_to_select = int(self.post_num * self.prob)
if self.allow_multi_conn:
selected_post_ids = self._jaxrand.randint(0, post_num_total, (pre_num_to_select, post_num_to_select))
else:
if SUPPORT_NUMBA:
rng = np.random
numba_seed(self._nprand.randint(0, int(1e8)))
else:
rng = self._nprand
@numba_jit # (parallel=True, nogil=True)
def single_conn():
posts = np.zeros((pre_num_to_select, post_num_to_select), dtype=IDX_DTYPE)
for i in numba_range(pre_num_to_select):
posts[i] = rng.choice(post_num_total, post_num_to_select, replace=False)
return posts
selected_post_ids = jnp.asarray(single_conn())
return pre_num_to_select, post_num_to_select, bm.as_jax(selected_post_ids), bm.as_jax(pre_ids)
[docs]
def build_coo(self):
_, post_num_to_select, selected_post_ids, pre_ids = self._iii()
selected_post_ids = selected_post_ids.flatten()
selected_pre_ids = jnp.repeat(pre_ids, post_num_to_select)
if not self.include_self:
true_ids = selected_pre_ids != selected_post_ids
selected_pre_ids = selected_pre_ids[true_ids]
selected_post_ids = selected_post_ids[true_ids]
return selected_pre_ids.astype(get_idx_type()), selected_post_ids.astype(get_idx_type())
[docs]
def build_csr(self):
pre_num_to_select, post_num_to_select, selected_post_ids, pre_ids = self._iii()
pre_nums = jnp.ones(pre_num_to_select) * post_num_to_select
if not self.include_self:
true_ids = selected_post_ids == jnp.reshape(pre_ids, (-1, 1))
pre_nums -= jnp.sum(true_ids, axis=1)
selected_post_ids = selected_post_ids.flatten()[jnp.logical_not(true_ids).flatten()]
else:
selected_post_ids = selected_post_ids.flatten()
selected_pre_inptr = jnp.cumsum(jnp.concatenate([jnp.zeros(1), pre_nums]))
return selected_post_ids.astype(get_idx_type()), selected_pre_inptr.astype(get_idx_type())
[docs]
def build_mat(self):
if self.pre_ratio < 1.:
pre_state = self._jaxrand.uniform(size=(self.pre_num, 1)) < self.pre_ratio
mat = (self._jaxrand.uniform(size=(self.pre_num, self.post_num)) < self.prob) * pre_state
else:
mat = (self._jaxrand.uniform(size=(self.pre_num, self.post_num)) < self.prob)
mat = bm.asarray(mat)
if not self.include_self:
bm.fill_diagonal(mat, False)
return mat.astype(MAT_DTYPE)
[docs]
class FixedTotalNum(TwoEndConnector):
"""Connect the synaptic neurons with fixed total number.
Parameters
----------
num : float,int
The conn total number.
allow_multi_conn : bool, optional
Whether allow one pre-synaptic neuron connects to multiple post-synaptic neurons.
seed: int, optional
The random number seed.
"""
def __init__(self,
num,
allow_multi_conn=False,
seed=None, **kwargs):
super().__init__(**kwargs)
if isinstance(num, int):
assert num >= 0, '"num" must be a non-negative integer.'
elif isinstance(num, float):
assert 0. <= num <= 1., '"num" must be in [0., 1.).'
else:
raise ConnectorError(f'Unknown type: {type(num)}')
self.num = num
self.seed = format_seed(seed)
self.allow_multi_conn = allow_multi_conn
self.rng = bm.random.RandomState(self.seed)
[docs]
def build_coo(self):
mat_element_num = self.pre_num * self.post_num
if self.num > mat_element_num:
raise ConnectorError(f'"num" must be smaller than "all2all num", '
f'but got {self.num} > {mat_element_num}')
if self.allow_multi_conn:
selected_pre_ids = self.rng.randint(0, self.pre_num, (self.num,))
selected_post_ids = self.rng.randint(0, self.post_num, (self.num,))
else:
index = self.rng.choice(mat_element_num, size=(self.num,), replace=False)
selected_pre_ids = index // self.post_num
selected_post_ids = index % self.post_num
return selected_pre_ids.astype(get_idx_type()), selected_post_ids.astype(get_idx_type())
def __repr__(self):
return f'{self.__class__.__name__}(num={self.num}, seed={self.seed})'
class FixedNum(TwoEndConnector):
def __init__(self,
num,
include_self=True,
allow_multi_conn=False,
seed=None,
**kwargs):
super(FixedNum, self).__init__(**kwargs)
if isinstance(num, int):
assert num >= 0, '"num" must be a non-negative integer.'
elif isinstance(num, float):
assert 0. <= num <= 1., '"num" must be in [0., 1.).'
else:
raise ConnectorError(f'Unknown type: {type(num)}')
self.num = num
self.seed = format_seed(seed)
self.include_self = include_self
self.allow_multi_conn = allow_multi_conn
self.rng = bm.random.RandomState(self.seed) if allow_multi_conn else np.random.RandomState(self.seed)
def __repr__(self):
return f'{self.__class__.__name__}(num={self.num}, include_self={self.include_self}, seed={self.seed})'
[docs]
class FixedPreNum(FixedNum):
"""Connect a fixed number pf pre-synaptic neurons for each post-synaptic neuron.
Parameters
----------
num : float, int
The conn probability (if "num" is float) or the fixed number of
connectivity (if "num" is int).
include_self : bool
Whether create (i, i) conn ?
seed : None, int
Seed the random generator.
allow_multi_conn: bool
Allow one pre-synaptic neuron connects to multiple post-synaptic neurons?
.. versionadded:: 2.2.3.2
"""
[docs]
def build_coo(self):
if isinstance(self.num, int) and self.num > self.pre_num:
raise ConnectorError(f'"num" must be smaller than "pre_num", '
f'but got {self.num} > {self.pre_num}')
if (not self.include_self) and (self.pre_num != self.post_num):
raise ConnectorError(f'We found pre_num != post_num ({self.pre_num} != {self.post_num}). '
f'But `include_self` is set to True.')
pre_num_to_select = int(self.pre_num * self.num) if isinstance(self.num, float) else self.num
pre_num_total = self.pre_num
post_num_total = self.post_num
if self.allow_multi_conn:
selected_pre_ids = self.rng.randint(0, pre_num_total, (post_num_total, pre_num_to_select,))
else:
if SUPPORT_NUMBA:
rng = np.random
numba_seed(self.rng.randint(0, int(1e8)))
else:
rng = self.rng
@numba_jit # (parallel=True, nogil=True)
def single_conn():
posts = np.zeros((post_num_total, pre_num_to_select), dtype=IDX_DTYPE)
for i in numba_range(post_num_total):
posts[i] = rng.choice(pre_num_total, pre_num_to_select, replace=False)
return posts
selected_pre_ids = jnp.asarray(single_conn())
post_nums = jnp.ones((post_num_total,), dtype=get_idx_type()) * pre_num_to_select
if not self.include_self:
true_ids = selected_pre_ids == jnp.reshape(jnp.arange(pre_num_total), (-1, 1))
post_nums -= jnp.sum(true_ids, axis=1)
selected_pre_ids = selected_pre_ids.flatten()[jnp.logical_not(true_ids).flatten()]
else:
selected_pre_ids = selected_pre_ids.flatten()
selected_post_ids = jnp.repeat(jnp.arange(post_num_total), post_nums)
return selected_pre_ids.astype(get_idx_type()), selected_post_ids.astype(get_idx_type())
[docs]
class FixedPostNum(FixedNum):
"""Connect the fixed number of post-synaptic neurons for each pre-synaptic neuron.
Parameters
----------
num : float, int
The conn probability (if "num" is float) or the fixed number of
connectivity (if "num" is int).
include_self : bool
Whether create (i, i) conn ?
seed : None, int
Seed the random generator.
allow_multi_conn: bool
Allow one pre-synaptic neuron connects to multiple post-synaptic neurons?
.. versionadded:: 2.2.3.2
"""
def _ii(self):
if isinstance(self.num, int) and self.num > self.post_num:
raise ConnectorError(f'"num" must be smaller than "post_num", '
f'but got {self.num} > {self.post_num}')
if (not self.include_self) and (self.pre_num != self.post_num):
raise ConnectorError(f'We found pre_num != post_num ({self.pre_num} != {self.post_num}). '
f'But `include_self` is set to True.')
post_num_to_select = int(self.post_num * self.num) if isinstance(self.num, float) else self.num
pre_num_to_select = self.pre_num
pre_ids = jnp.arange(self.pre_num)
post_num_total = self.post_num
if self.allow_multi_conn:
selected_post_ids = self.rng.randint(0, post_num_total, (pre_num_to_select, post_num_to_select,))
else:
if SUPPORT_NUMBA:
rng = np.random
numba_seed(self.rng.randint(0, int(1e8)))
else:
rng = self.rng
@numba_jit # (parallel=True, nogil=True)
def single_conn():
posts = np.zeros((pre_num_to_select, post_num_to_select), dtype=IDX_DTYPE)
for i in numba_range(pre_num_to_select):
posts[i] = rng.choice(post_num_total, post_num_to_select, replace=False)
return posts
selected_post_ids = jnp.asarray(single_conn())
return pre_num_to_select, post_num_to_select, bm.as_jax(selected_post_ids), bm.as_jax(pre_ids)
[docs]
def build_coo(self):
_, post_num_to_select, selected_post_ids, pre_ids = self._ii()
selected_post_ids = selected_post_ids.flatten()
selected_pre_ids = jnp.repeat(pre_ids, post_num_to_select)
if not self.include_self:
true_ids = selected_pre_ids != selected_post_ids
selected_pre_ids = selected_pre_ids[true_ids]
selected_post_ids = selected_post_ids[true_ids]
return selected_pre_ids.astype(get_idx_type()), selected_post_ids.astype(get_idx_type())
[docs]
def build_csr(self):
pre_num_to_select, post_num_to_select, selected_post_ids, pre_ids = self._ii()
pre_nums = jnp.ones(pre_num_to_select) * post_num_to_select
if not self.include_self:
true_ids = selected_post_ids == jnp.reshape(pre_ids, (-1, 1))
pre_nums -= jnp.sum(true_ids, axis=1)
selected_post_ids = selected_post_ids.flatten()[jnp.logical_not(true_ids).flatten()]
else:
selected_post_ids = selected_post_ids.flatten()
selected_pre_inptr = jnp.cumsum(jnp.concatenate([jnp.zeros(1), pre_nums]))
return selected_post_ids.astype(get_idx_type()), selected_pre_inptr.astype(get_idx_type())
@jit
@partial(vmap, in_axes=(0, None, None))
def gaussian_prob_dist_cal1(i_value, post_values, sigma):
dists = jnp.abs(i_value - post_values)
exp_dists = jnp.exp(-(jnp.sqrt(jnp.sum(dists ** 2, axis=0)) / sigma) ** 2 / 2)
return bm.asarray(exp_dists)
@jit
@partial(vmap, in_axes=(0, None, None, None))
def gaussian_prob_dist_cal2(i_value, post_values, value_sizes, sigma):
dists = jnp.abs(i_value - post_values)
dists = jnp.where(dists > (value_sizes / 2), value_sizes - dists, dists)
exp_dists = jnp.exp(-(jnp.sqrt(jnp.sum(dists ** 2, axis=0)) / sigma) ** 2 / 2)
return bm.asarray(exp_dists)
[docs]
class GaussianProb(OneEndConnector):
r"""Builds a Gaussian connectivity pattern within a population of neurons,
where the connection probability decay according to the gaussian function.
Specifically, for any pair of neurons :math:`(i, j)`,
.. math::
p(i, j)=\exp(-\frac{\sum_{k=1}^n |v_k^i - v_k^j|^2 }{2\sigma^2})
where :math:`v_k^i` is the :math:`i`-th neuron's encoded value at dimension :math:`k`.
Parameters
----------
sigma : float
Width of the Gaussian function.
encoding_values : optional, list, tuple, int, float
The value ranges to encode for neurons at each axis.
- If `values` is not provided, the neuron only encodes each positional
information, i.e., :math:`(i, j, k, ...)`, where :math:`i, j, k` is
the index in the high-dimensional space.
- If `values` is a single tuple/list of int/float, neurons at each dimension
will encode the same range of values. For example, ``values=(0, np.pi)``,
neurons at each dimension will encode a continuous value space ``[0, np.pi]``.
- If `values` is a tuple/list of list/tuple, it means the value space will be
different for each dimension. For example, ``values=((-np.pi, np.pi), (10, 20), (0, 2 * np.pi))``.
periodic_boundary : bool
Whether the neuron encode the value space with the periodic boundary.
normalize : bool
Whether normalize the connection probability .
include_self : bool
Whether create the connection at the same position.
seed : int
The random seed.
"""
def __init__(
self,
sigma: float,
encoding_values: Optional[np.ndarray] = None,
normalize: bool = True,
include_self: bool = True,
periodic_boundary: bool = False,
seed: int = None,
**kwargs
):
super(GaussianProb, self).__init__(**kwargs)
self.sigma = sigma
self.encoding_values = encoding_values
self.normalize = normalize
self.include_self = include_self
self.periodic_boundary = periodic_boundary
self.seed = format_seed(seed)
self.rng = np.random.RandomState(self.seed)
def __repr__(self):
return (f'{self.__class__.__name__}(sigma={self.sigma}, '
f'normalize={self.normalize}, '
f'periodic_boundary={self.periodic_boundary}, '
f'include_self={self.include_self}, '
f'seed={self.seed})')
[docs]
def build_mat(self, isOptimized=True):
self.rng = np.random.RandomState(self.seed)
# value range to encode
if self.encoding_values is None:
value_ranges = tuple([(0, s) for s in self.pre_size])
elif isinstance(self.encoding_values, (tuple, list)):
if len(self.encoding_values) == 0:
raise ConnectorError(f'encoding_values has a length of 0.')
elif isinstance(self.encoding_values[0], (int, float)):
assert len(self.encoding_values) == 2
assert self.encoding_values[0] < self.encoding_values[1]
value_ranges = tuple([self.encoding_values for _ in self.pre_size])
elif isinstance(self.encoding_values[0], (tuple, list)):
if len(self.encoding_values) != len(self.pre_size):
raise ConnectorError(f'The network size has {len(self.pre_size)} dimensions, while '
f'the encoded values provided only has {len(self.encoding_values)}-D. '
f'Error in {str(self)}.')
for v in self.encoding_values:
assert isinstance(v[0], (int, float))
assert len(v) == 2
value_ranges = tuple(self.encoding_values)
else:
raise ConnectorError(f'Unsupported encoding values: {self.encoding_values}')
else:
raise ConnectorError(f'Unsupported encoding values: {self.encoding_values}')
# values
values = [np.linspace(vs[0], vs[1], n + 1)[:n] for vs, n in zip(value_ranges, self.pre_size)]
# post_values = np.stack([v.flatten() for v in np.meshgrid(*values, indexing='ij')])
post_values = np.stack([v.flatten() for v in np.meshgrid(*values)])
value_sizes = np.array([v[1] - v[0] for v in value_ranges])
if value_sizes.ndim < post_values.ndim:
value_sizes = np.expand_dims(value_sizes, axis=tuple([i + 1 for i in range(post_values.ndim - 1)]))
# probability of connections
if isOptimized:
i_value_list = np.zeros(shape=(self.pre_num, len(self.pre_size), 1))
for i in range(self.pre_num):
list_index = i
# values for node i
i_coordinate = tuple()
for s in self.pre_size[:-1]:
i, pos = divmod(i, s)
i_coordinate += (pos,)
i_coordinate += (i,)
i_value = np.array([values[i][c] for i, c in enumerate(i_coordinate)])
if i_value.ndim < post_values.ndim:
i_value = np.expand_dims(i_value, axis=tuple([i + 1 for i in range(post_values.ndim - 1)]))
i_value_list[list_index] = i_value
if self.periodic_boundary:
prob_mat = gaussian_prob_dist_cal2(i_value_list, post_values, value_sizes, self.sigma)
else:
prob_mat = gaussian_prob_dist_cal1(i_value_list, post_values, self.sigma)
else:
prob_mat = []
for i in range(self.pre_num):
# values for node i
i_coordinate = tuple()
for s in self.pre_size[:-1]:
i, pos = divmod(i, s)
i_coordinate += (pos,)
i_coordinate += (i,)
i_value = np.array([values[i][c] for i, c in enumerate(i_coordinate)])
if i_value.ndim < post_values.ndim:
i_value = np.expand_dims(i_value, axis=tuple([i + 1 for i in range(post_values.ndim - 1)]))
# distances
dists = np.abs(i_value - post_values)
if self.periodic_boundary:
dists = np.where(dists > value_sizes / 2, value_sizes - dists, dists)
exp_dists = np.exp(-(np.linalg.norm(dists, axis=0) / self.sigma) ** 2 / 2)
prob_mat.append(exp_dists)
prob_mat = np.stack(prob_mat)
if self.normalize:
prob_mat /= prob_mat.max()
# connectivity
conn_mat = np.asarray(prob_mat) >= self.rng.random(prob_mat.shape)
if not self.include_self:
np.fill_diagonal(conn_mat, False)
return conn_mat
[docs]
class SmallWorld(TwoEndConnector):
"""Build a Watts–Strogatz small-world graph.
Parameters
----------
num_neighbor : int
Each node is joined with its `k` nearest neighbors in a ring
topology.
prob : float
The probability of rewiring each edge
directed : bool
Whether the graph is a directed graph.
include_self : bool
Whether include the node self.
Notes
-----
First create a ring over :math:`num\_node` nodes [1]_. Then each node in the ring is
joined to its :math:`num\_neighbor` nearest neighbors (or :math:`num\_neighbor - 1` neighbors
if :math:`num\_neighbor` is odd). Then shortcuts are created by replacing some edges as
follows: for each edge :math:`(u, v)` in the underlying ":math:`num\_node`-ring with
:math:`num\_neighbor` nearest neighbors" with probability :math:`prob` replace it with a new
edge :math:`(u, w)` with uniformly random choice of existing node :math:`w`.
References
----------
.. [1] Duncan J. Watts and Steven H. Strogatz,
Collective dynamics of small-world networks,
Nature, 393, pp. 440--442, 1998.
"""
def __init__(
self,
num_neighbor,
prob,
directed=False,
include_self=False,
seed=None,
**kwargs
):
super(SmallWorld, self).__init__(**kwargs)
self.prob = prob
self.directed = directed
self.num_neighbor = num_neighbor
self.include_self = include_self
self.seed = format_seed(seed)
self.rng = np.random.RandomState(seed=self.seed)
rng = np.random if SUPPORT_NUMBA else self.rng
def _smallworld_rewire(i, all_j):
if rng.random(1) < prob:
non_connected = np.where(np.logical_not(all_j))[0]
if len(non_connected) <= 1:
return -1
# Enforce no self-loops or multiple edges
w = rng.choice(non_connected)
while (not include_self) and w == i:
# non_connected.remove(w)
w = rng.choice(non_connected)
return w
else:
return -1
self._connect = numba_jit(_smallworld_rewire)
def __repr__(self):
return (f'{self.__class__.__name__}(prob={self.prob}, '
f'directed={self.directed}, '
f'num_neighbor={self.num_neighbor}, '
f'include_self={self.include_self}, '
f'seed={self.seed})')
[docs]
def build_conn(self):
assert self.pre_size == self.post_size
# seed
self.seed = self.rng.randint(1, int(1e7))
numba_seed(self.seed)
if isinstance(self.pre_size, int) or (isinstance(self.pre_size, (tuple, list)) and len(self.pre_size) == 1):
num_node = self.pre_num
if self.num_neighbor > num_node:
raise ConnectorError("num_neighbor > num_node, choose smaller num_neighbor or larger num_node")
# If k == n, the graph is complete not Watts-Strogatz
if self.num_neighbor == num_node:
conn = np.ones((num_node, num_node), dtype=MAT_DTYPE)
else:
conn = np.zeros((num_node, num_node), dtype=MAT_DTYPE)
nodes = np.array(list(range(num_node))) # nodes are labeled 0 to n-1
# connect each node to k/2 neighbors
for j in range(1, self.num_neighbor // 2 + 1):
targets = np.concatenate([nodes[j:], nodes[0:j]]) # first j nodes are now last in list
conn[nodes, targets] = True
conn[targets, nodes] = True
# rewire edges from each node
# loop over all nodes in order (label) and neighbors in order (distance)
# no self loops or multiple edges allowed
for j in range(1, self.num_neighbor // 2 + 1): # outer loop is neighbors
targets = np.concatenate([nodes[j:], nodes[0:j]]) # first j nodes are now last in list
if self.directed:
# inner loop in node order
for u, v in zip(nodes, targets):
w = self._connect(prob=self.prob, i=u, all_j=conn[u])
if w != -1:
conn[u, v] = False
conn[u, w] = True
w = self._connect(prob=self.prob, i=u, all_j=conn[:, u])
if w != -1:
conn[v, u] = False
conn[w, u] = True
else:
# inner loop in node order
for u, v in zip(nodes, targets):
w = self._connect(i=u, all_j=conn[u])
if w != -1:
conn[u, v] = False
conn[v, u] = False
conn[u, w] = True
conn[w, u] = True
# conn = np.asarray(conn, dtype=MAT_DTYPE)
else:
raise ConnectorError('Currently only support 1D ring connection.')
return 'mat', conn
# def _random_subset(seq, m, rng):
# """Return m unique elements from seq.
#
# This differs from random.sample which can return repeated
# elements if seq holds repeated elements.
#
# Note: rng is a random.Random or numpy.random.RandomState instance.
# """
# targets = set()
# while len(targets) < m:
# x = rng.choice(seq)
# targets.add(x)
# return targets
[docs]
class ScaleFreeBA(TwoEndConnector):
"""Build a random graph according to the Barabási–Albert preferential
attachment model.
A graph of :math:`num\_node` nodes is grown by attaching new nodes each with
:math:`m` edges that are preferentially attached to existing nodes
with high degree.
Parameters
----------
m : int
Number of edges to attach from a new node to existing nodes
seed : integer, random_state, or None (default)
Indicator of random number generation state.
Raises
------
ConnectorError
If `m` does not satisfy ``1 <= m < n``.
References
----------
.. [1] A. L. Barabási and R. Albert "Emergence of scaling in
random networks", Science 286, pp 509-512, 1999.
"""
def __init__(self, m, directed=False, seed=None, **kwargs):
super(ScaleFreeBA, self).__init__(**kwargs)
self.m = m
self.directed = directed
self.seed = format_seed(seed)
self.rng = np.random.RandomState(self.seed)
rng = np.random if SUPPORT_NUMBA else self.rng
def _random_subset(seq, m):
targets = set()
while len(targets) < m:
x = rng.choice(seq)
targets.add(x)
return targets
self._connect = numba_jit(_random_subset)
def __repr__(self):
return (f'{self.__class__.__name__}(m={self.m}, '
f'directed={self.directed}, '
f'seed={self.seed})')
[docs]
def build_mat(self, isOptimized=True):
assert self.pre_num == self.post_num
# seed
self.rng = np.random.RandomState(self.seed)
numba_seed(self.seed)
num_node = self.pre_num
if self.m < 1 or self.m >= num_node:
raise ConnectorError(f"Barabási–Albert network must have m >= 1 and "
f"m < n, while m = {self.m} and n = {num_node}")
# Add m initial nodes (m0 in barabasi-speak)
conn = np.zeros((num_node, num_node), dtype=MAT_DTYPE)
# Target nodes for new edges
targets = list(range(self.m))
# List of existing nodes, with nodes repeated once for each adjacent edge
if not isOptimized:
repeated_nodes = []
# Start adding the other n-m nodes. The first node is m.
source = self.m
while source < num_node:
# Add edges to m nodes from the source.
origins = [source] * self.m
conn[origins, targets] = True
if not self.directed:
conn[targets, origins] = True
# Add one node to the list for each new edge just created.
repeated_nodes.extend(targets)
# And the new node "source" has m edges to add to the list.
repeated_nodes.extend([source] * self.m)
# Now choose m unique nodes from the existing nodes
# Pick uniformly from repeated_nodes (preferential attachment)
targets = list(self._connect(np.asarray(repeated_nodes), self.m))
source += 1
return conn
# List of existing nodes, with nodes repeated once for each adjacent edge
# Preallocate repeated_nodes as a numpy array
repeated_nodes = np.empty(2 * num_node * self.m, dtype=int)
size_repeated_nodes = 0
# Start adding the other n-m nodes. The first node is m.
source = self.m
while source < num_node:
# Add edges to m nodes from the source.
origins = [source] * self.m
conn[origins, targets] = True
if not self.directed:
conn[targets, origins] = True
# Add one node to the list for each new edge just created.
repeated_nodes[size_repeated_nodes:size_repeated_nodes + self.m] = targets
size_repeated_nodes += self.m
# And the new node "source" has m edges to add to the list.
repeated_nodes[size_repeated_nodes:size_repeated_nodes + self.m] = source
size_repeated_nodes += self.m
# Now choose m unique nodes from the existing nodes
# Pick uniformly from repeated_nodes (preferential attachment)
targets = list(self._connect(repeated_nodes[:size_repeated_nodes], self.m))
source += 1
return conn
[docs]
class ScaleFreeBADual(TwoEndConnector):
r"""Build a random graph according to the dual Barabási–Albert preferential
attachment model.
A graph of :math::`num\_node` nodes is grown by attaching new nodes each with either $m_1$
edges (with probability :math:`p`) or :math:`m_2` edges (with probability :math:`1-p`) that
are preferentially attached to existing nodes with high degree.
Parameters
----------
m1 : int
Number of edges to attach from a new node to existing nodes with probability :math:`p`
m2 : int
Number of edges to attach from a new node to existing nodes with probability :math:`1-p`
p : float
The probability of attaching :math:`m\_1` edges (as opposed to :math:`m\_2` edges)
seed : integer, random_state, or None (default)
Indicator of random number generation state.
Raises
------
ConnectorError
If `m1` and `m2` do not satisfy ``1 <= m1,m2 < n`` or `p` does not satisfy ``0 <= p <= 1``.
References
----------
.. [1] N. Moshiri "The dual-Barabasi-Albert model", arXiv:1810.10538.
"""
def __init__(self, m1, m2, p, directed=False, seed=None, **kwargs):
super(ScaleFreeBADual, self).__init__(**kwargs)
self.m1 = m1
self.m2 = m2
self.p = p
self.directed = directed
self.seed = format_seed(seed)
self.rng = np.random.RandomState(self.seed)
rng = np.random if SUPPORT_NUMBA else self.rng
def _random_subset(seq, m):
targets = set()
while len(targets) < m:
x = rng.choice(seq)
targets.add(x)
return targets
self._connect = numba_jit(_random_subset)
def __repr__(self):
return (f'{self.__class__.__name__}(m1={self.m1}, m2={self.m2}, '
f'p={self.p}, directed={self.directed}, seed={self.seed})')
[docs]
def build_mat(self, isOptimized=True):
assert self.pre_num == self.post_num
# seed
self.rng = np.random.RandomState(self.seed)
numba_seed(self.seed)
num_node = self.pre_num
if self.m1 < 1 or self.m1 >= num_node:
raise ConnectorError(f"Dual Barabási–Albert network must have m1 >= 1 and m1 < num_node, "
f"while m1 = {self.m1} and num_node = {num_node}.")
if self.m2 < 1 or self.m2 >= num_node:
raise ConnectorError(f"Dual Barabási–Albert network must have m2 >= 1 and m2 < num_node, "
f"while m2 = {self.m2} and num_node = {num_node}.")
if self.p < 0 or self.p > 1:
raise ConnectorError(f"Dual Barabási–Albert network must have 0 <= p <= 1, while p = {self.p}")
# Add max(m1,m2) initial nodes (m0 in barabasi-speak)
conn = np.zeros((num_node, num_node), dtype=MAT_DTYPE)
if not isOptimized:
# List of existing nodes, with nodes repeated once for each adjacent edge
repeated_nodes = []
# Start adding the remaining nodes.
source = max(self.m1, self.m2)
# Pick which m to use first time (m1 or m2)
m = self.m1 if self.rng.random() < self.p else self.m2
# Target nodes for new edges
targets = list(range(m))
while source < num_node:
# Add edges to m nodes from the source.
origins = [source] * m
conn[origins, targets] = True
if not self.directed:
conn[targets, origins] = True
# Add one node to the list for each new edge just created.
repeated_nodes.extend(targets)
# And the new node "source" has m edges to add to the list.
repeated_nodes.extend([source] * m)
# Pick which m to use next time (m1 or m2)
m = self.m1 if self.rng.random() < self.p else self.m2
# Now choose m unique nodes from the existing nodes
# Pick uniformly from repeated_nodes (preferential attachment)
targets = list(self._connect(np.asarray(repeated_nodes), m))
source += 1
return conn
# List of existing nodes, with nodes repeated once for each adjacent edge
# Preallocate repeated_nodes as a numpy array
repeated_nodes = np.empty(2 * num_node * max(self.m1, self.m2), dtype=int)
size_repeated_nodes = 0
# Start adding the remaining nodes.
source = max(self.m1, self.m2)
# Pick which m to use first time (m1 or m2)
m = self.m1 if self.rng.random() < self.p else self.m2
# Target nodes for new edges
targets = list(range(m))
while source < num_node:
# Add edges to m nodes from the source.
origins = [source] * m
conn[origins, targets] = True
if not self.directed:
conn[targets, origins] = True
# Add one node to the list for each new edge just created.
repeated_nodes[size_repeated_nodes:size_repeated_nodes + m] = targets
size_repeated_nodes += m
# And the new node "source" has m edges to add to the list.
repeated_nodes[size_repeated_nodes:size_repeated_nodes + m] = source
size_repeated_nodes += m
# Pick which m to use next time (m1 or m2)
m = self.m1 if self.rng.random() < self.p else self.m2
# Now choose m unique nodes from the existing nodes
# Pick uniformly from repeated_nodes (preferential attachment)
targets = list(self._connect(repeated_nodes[:size_repeated_nodes], m))
source += 1
return conn
[docs]
class PowerLaw(TwoEndConnector):
"""Holme and Kim algorithm for growing graphs with powerlaw
degree distribution and approximate average clustering.
Parameters
----------
m : int
the number of random edges to add for each new node
p : float,
Probability of adding a triangle after adding a random edge
seed : integer, random_state, or None (default)
Indicator of random number generation state.
Notes
-----
The average clustering has a hard time getting above a certain
cutoff that depends on :math:`m`. This cutoff is often quite low. The
transitivity (fraction of triangles to possible triangles) seems to
decrease with network size.
It is essentially the Barabási–Albert (BA) growth model with an
extra step that each random edge is followed by a chance of
making an edge to one of its neighbors too (and thus a triangle).
This algorithm improves on BA in the sense that it enables a
higher average clustering to be attained if desired.
It seems possible to have a disconnected graph with this algorithm
since the initial :math:`m` nodes may not be all linked to a new node
on the first iteration like the BA model.
Raises
------
ConnectorError
If :math:`m` does not satisfy :math:`1 <= m <= n` or :math:`p` does not
satisfy :math:`0 <= p <= 1`.
References
----------
.. [1] P. Holme and B. J. Kim,
"Growing scale-free networks with tunable clustering",
Phys. Rev. E, 65, 026107, 2002.
"""
def __init__(self, m: int, p: float, directed=False, seed=None, **kwargs):
super(PowerLaw, self).__init__(**kwargs)
self.m = m
self.p = p
if self.p > 1 or self.p < 0:
raise ConnectorError(f"p must be in [0,1], while p={self.p}")
self.directed = directed
self.seed = format_seed(seed)
self.rng = np.random.RandomState(self.seed)
rng = np.random if SUPPORT_NUMBA else self.rng
def _random_subset(seq, m):
targets = set()
while len(targets) < m:
x = rng.choice(seq)
targets.add(x)
return targets
self._connect = numba_jit(_random_subset)
def __repr__(self):
return (f'{self.__class__.__name__}(m={self.m}, p={self.p}, directed={self.directed}, seed={self.seed})')
[docs]
def build_mat(self, isOptimized=True):
assert self.pre_num == self.post_num
# seed
self.rng = np.random.RandomState(self.seed)
numba_seed(self.seed)
num_node = self.pre_num
if self.m < 1 or num_node < self.m:
raise ConnectorError(f"Must have m>1 and m<n, while m={self.m} and n={num_node}")
# add m initial nodes (m0 in barabasi-speak)
conn = np.zeros((num_node, num_node), dtype=MAT_DTYPE)
if not isOptimized:
repeated_nodes = list(range(self.m)) # list of existing nodes to sample from
# with nodes repeated once for each adjacent edge
source = self.m # next node is m
while source < num_node: # Now add the other n-1 nodes
possible_targets = self._connect(np.asarray(repeated_nodes), self.m)
# do one preferential attachment for new node
target = possible_targets.pop()
conn[source, target] = True
if not self.directed:
conn[target, source] = True
repeated_nodes.append(target) # add one node to list for each new link
count = 1
while count < self.m: # add m-1 more new links
if self.rng.random() < self.p: # clustering step: add triangle
neighbors = np.where(conn[target])[0]
neighborhood = [nbr for nbr in neighbors if not conn[source, nbr] and not nbr == source]
if neighborhood: # if there is a neighbor without a link
nbr = self.rng.choice(neighborhood)
conn[source, nbr] = True # add triangle
if not self.directed:
conn[nbr, source] = True
repeated_nodes.append(nbr)
count = count + 1
continue # go to top of while loop
# else do preferential attachment step if above fails
target = possible_targets.pop()
conn[source, target] = True
if not self.directed:
conn[target, source] = True
repeated_nodes.append(target)
count = count + 1
repeated_nodes.extend([source] * self.m) # add source node to list m times
source += 1
return conn
# Preallocate repeated_nodes as a numpy array
repeated_nodes = np.empty(2 * num_node * self.m, dtype=int)
repeated_nodes[:self.m] = np.arange(self.m)
size_repeated_nodes = self.m
source = self.m # next node is m
while source < num_node: # Now add the other n-1 nodes
possible_targets = list(self._connect(repeated_nodes[:size_repeated_nodes], self.m))
possible_targets.reverse()
# do one preferential attachment for new node
target = possible_targets.pop()
conn[source, target] = True
if not self.directed:
conn[target, source] = True
repeated_nodes[size_repeated_nodes] = target
size_repeated_nodes += 1
count = 1
while count < self.m: # add m-1 more new links
if self.rng.random() < self.p: # clustering step: add triangle
neighbors = np.where(conn[target])[0]
neighborhood = [nbr for nbr in neighbors if not conn[source, nbr] and nbr != source]
if neighborhood: # if there is a neighbor without a link
nbr = self.rng.choice(neighborhood)
conn[source, nbr] = True # add triangle
if not self.directed:
conn[nbr, source] = True
repeated_nodes[size_repeated_nodes] = nbr
size_repeated_nodes += 1
count += 1
continue # go to top of while loop
# else do preferential attachment step if above fails
target = possible_targets.pop()
conn[source, target] = True
if not self.directed:
conn[target, source] = True
repeated_nodes[size_repeated_nodes] = target
size_repeated_nodes += 1
count += 1
repeated_nodes[size_repeated_nodes:size_repeated_nodes + self.m] = source
size_repeated_nodes += self.m
source += 1
return conn
@numba_jit
def pos2ind(pos, size):
idx = 0
for i, p in enumerate(pos):
idx += p * np.prod(size[i + 1:])
return idx
[docs]
class ProbDist(TwoEndConnector):
"""Connection with a maximum distance under a probability `p`.
.. versionadded:: 2.1.13
Parameters
----------
dist: float, int
The maximum distance between two points.
prob: float
The connection probability, within 0. and 1.
pre_ratio: float
The ratio of pre-synaptic neurons to connect.
seed: optional, int
The random seed.
include_self: bool
Whether include the point at the same position.
"""
def __init__(self, dist=1, prob=1., pre_ratio=1., seed=None, include_self=True, **kwargs):
super(ProbDist, self).__init__(**kwargs)
self.prob = prob
self.pre_ratio = pre_ratio
self.dist = dist
self.seed = format_seed(seed)
self.rng = np.random.RandomState(self.seed)
self.include_self = include_self
rng = np.random if SUPPORT_NUMBA else self.rng
@numba_jit
def _connect_1d_jit(pre_pos, pre_size, post_size, n_dim):
all_post_ids = np.zeros(post_size[0], dtype=IDX_DTYPE)
all_pre_ids = np.zeros(post_size[0], dtype=IDX_DTYPE)
size = 0
if rng.random() < pre_ratio:
normalized_pos = np.zeros(n_dim)
for i in range(n_dim):
pre_len = pre_size[i]
post_len = post_size[i]
normalized_pos[i] = pre_pos[i] * post_len / pre_len
for i in range(post_size[0]):
post_pos = np.asarray((i,))
d = np.abs(pre_pos[0] - post_pos[0])
if d <= dist:
if d == 0. and not include_self:
continue
if rng.random() <= prob:
all_post_ids[size] = pos2ind(post_pos, post_size)
all_pre_ids[size] = pos2ind(pre_pos, pre_size)
size += 1
return all_pre_ids[:size], all_post_ids[:size]
@numba_jit
def _connect_2d_jit(pre_pos, pre_size, post_size, n_dim):
max_size = post_size[0] * post_size[1]
all_post_ids = np.zeros(max_size, dtype=IDX_DTYPE)
all_pre_ids = np.zeros(max_size, dtype=IDX_DTYPE)
size = 0
if rng.random() < pre_ratio:
normalized_pos = np.zeros(n_dim)
for i in range(n_dim):
pre_len = pre_size[i]
post_len = post_size[i]
normalized_pos[i] = pre_pos[i] * post_len / pre_len
for i in range(post_size[0]):
for j in range(post_size[1]):
post_pos = np.asarray((i, j))
d = np.sqrt(np.sum(np.square(pre_pos - post_pos)))
if d <= dist:
if d == 0. and not include_self:
continue
if rng.random() <= prob:
all_post_ids[size] = pos2ind(post_pos, post_size)
all_pre_ids[size] = pos2ind(pre_pos, pre_size)
size += 1
return all_pre_ids[:size], all_post_ids[:size] # Return filled part of the arrays
@numba_jit
def _connect_3d_jit(pre_pos, pre_size, post_size, n_dim):
max_size = post_size[0] * post_size[1] * post_size[2]
all_post_ids = np.zeros(max_size, dtype=IDX_DTYPE)
all_pre_ids = np.zeros(max_size, dtype=IDX_DTYPE)
size = 0
if rng.random() < pre_ratio:
normalized_pos = np.zeros(n_dim)
for i in range(n_dim):
pre_len = pre_size[i]
post_len = post_size[i]
normalized_pos[i] = pre_pos[i] * post_len / pre_len
for i in range(post_size[0]):
for j in range(post_size[1]):
for k in range(post_size[2]):
post_pos = np.asarray((i, j, k))
d = np.sqrt(np.sum(np.square(pre_pos - post_pos)))
if d <= dist:
if d == 0. and not include_self:
continue
if rng.random() <= prob:
all_post_ids[size] = pos2ind(post_pos, post_size)
all_pre_ids[size] = pos2ind(pre_pos, pre_size)
size += 1
return all_pre_ids[:size], all_post_ids[:size]
@numba_jit
def _connect_4d_jit(pre_pos, pre_size, post_size, n_dim):
max_size = post_size[0] * post_size[1] * post_size[2] * post_size[3]
all_post_ids = np.zeros(max_size, dtype=IDX_DTYPE)
all_pre_ids = np.zeros(max_size, dtype=IDX_DTYPE)
size = 0
if rng.random() < pre_ratio:
normalized_pos = np.zeros(n_dim)
for i in range(n_dim):
pre_len = pre_size[i]
post_len = post_size[i]
normalized_pos[i] = pre_pos[i] * post_len / pre_len
for i in range(post_size[0]):
for j in range(post_size[1]):
for k in range(post_size[2]):
for l in range(post_size[3]):
post_pos = np.asarray((i, j, k, l))
d = np.sqrt(np.sum(np.square(pre_pos - post_pos)))
if d <= dist:
if d == 0. and not include_self:
continue
if rng.random() <= prob:
all_post_ids[size] = pos2ind(post_pos, post_size)
all_pre_ids[size] = pos2ind(pre_pos, pre_size)
size += 1
return all_pre_ids[:size], all_post_ids[:size]
self._connect_1d_jit = _connect_1d_jit
self._connect_2d_jit = _connect_2d_jit
self._connect_3d_jit = _connect_3d_jit
self._connect_4d_jit = _connect_4d_jit
[docs]
def build_coo(self, isOptimized=True):
if len(self.pre_size) != len(self.post_size):
raise ValueError('The dimensions of shapes of two objects to establish connections should '
f'be the same. But we got dimension {len(self.pre_size)} != {len(self.post_size)}. '
f'Specifically, pre size = {self.pre_size}, post size = {self.post_size}')
self.rng = np.random.RandomState(self.seed)
numba_seed(self.seed)
# connections
n_dim = len(self.pre_size)
if n_dim == 1:
f = self._connect_1d_jit
elif n_dim == 2:
f = self._connect_2d_jit
elif n_dim == 3:
f = self._connect_3d_jit
elif n_dim == 4:
f = self._connect_4d_jit
else:
raise NotImplementedError('Does not support the network dimension bigger than 4.')
pre_size = np.asarray(self.pre_size)
post_size = np.asarray(self.post_size)
connected_pres = []
connected_posts = []
pre_ids = np.meshgrid(*(np.arange(p) for p in self.pre_size), indexing='ij')
pre_ids = tuple([(np.moveaxis(p, 0, 1).flatten()) if p.ndim > 1 else p.flatten() for p in pre_ids])
size = np.prod(pre_size)
for i in range(size):
pre_pos = np.asarray([p[i] for p in pre_ids])
pres, posts = f(pre_pos, pre_size=pre_size, post_size=post_size, n_dim=n_dim)
connected_pres.extend(pres)
connected_posts.extend(posts)
return np.asarray(connected_pres), np.asarray(connected_posts)