# -*- coding: utf-8 -*-
from brainpy._src.integrators import constants, utils
from brainpy._src.integrators.sde.base import SDEIntegrator
from .generic import register_sde_integrator
__all__ = [
'SRK1W1',
'SRK2W1',
'KlPl',
]
def _noise_terms(code_lines, variables, triple_integral=True):
# num_vars = len(variables)
# if num_vars > 1:
# code_lines.append(f' all_I1 = math.normal(0.0, dt_sqrt, ({num_vars},)+math.shape({variables[0]}))')
# code_lines.append(f' all_I0 = math.normal(0.0, dt_sqrt, ({num_vars},)+math.shape({variables[0]}))')
# code_lines.append(f' all_I10 = 0.5 * {constants.DT} * (all_I1 + all_I0 / 3.0 ** 0.5)')
# code_lines.append(f' all_I11 = 0.5 * (all_I1 ** 2 - {constants.DT})')
# if triple_integral:
# code_lines.append(f' all_I111 = (all_I1 ** 3 - 3 * {constants.DT} * all_I1) / 6')
# code_lines.append(f' ')
# for i, var in enumerate(variables):
# code_lines.append(f' {var}_I1 = all_I1[{i}]')
# code_lines.append(f' {var}_I0 = all_I0[{i}]')
# code_lines.append(f' {var}_I10 = all_I10[{i}]')
# code_lines.append(f' {var}_I11 = all_I11[{i}]')
# if triple_integral:
# code_lines.append(f' {var}_I111 = all_I111[{i}]')
# code_lines.append(f' ')
# else:
# var = variables[0]
# code_lines.append(f' {var}_I1 = math.normal(0.0, dt_sqrt, math.shape({var}))')
# code_lines.append(f' {var}_I0 = math.normal(0.0, dt_sqrt, math.shape({var}))')
# code_lines.append(f' {var}_I10 = 0.5 * {constants.DT} * ({var}_I1 + {var}_I0 / 3.0 ** 0.5)')
# code_lines.append(f' {var}_I11 = 0.5 * ({var}_I1 ** 2 - {constants.DT})')
# if triple_integral:
# code_lines.append(f' {var}_I111 = ({var}_I1 ** 3 - 3 * {constants.DT} * {var}_I1) / 6')
# code_lines.append(' ')
for var in variables:
code_lines.append(f' {var}_I1 = dt_sqrt * random.randn(*math.shape({var}))')
code_lines.append(f' {var}_I0 = dt_sqrt * random.randn(*math.shape({var}))')
code_lines.append(f' {var}_I10 = 0.5 * {constants.DT} * ({var}_I1 + {var}_I0 / 3.0 ** 0.5)')
code_lines.append(f' {var}_I11 = 0.5 * ({var}_I1 ** 2 - {constants.DT})')
if triple_integral:
code_lines.append(f' {var}_I111 = ({var}_I1 ** 3 - 3 * {constants.DT} * {var}_I1) / 6')
code_lines.append(' ')
def _state1(code_lines, variables, parameters):
f_names = [f'{var}_f_H0s1' for var in variables]
g_names = [f'{var}_g_H1s1' for var in variables]
code_lines.append(f' {", ".join(f_names)} = f({", ".join(variables + parameters)})')
code_lines.append(f' {", ".join(g_names)} = g({", ".join(variables + parameters)})')
code_lines.append(' ')
[docs]
class SRK1W1(SDEIntegrator):
r"""Order 2.0 weak SRK methods for SDEs with scalar Wiener process.
This method has have strong orders :math:`(p_d, p_s) = (2.0,1.5)`.
The Butcher table is:
.. math::
\begin{array}{l|llll|llll|llll}
0 &&&&& &&&& &&&& \\
3/4 &3/4&&&& 3/2&&& &&&& \\
0 &0&0&0&& 0&0&0&& &&&&\\
\hline
0 \\
1/4 & 1/4&&& & 1/2&&&\\
1 & 1&0&&& -1&0&\\
1/4& 0&0&1/4&& -5&3&1/2\\
\hline
& 1/3& 2/3& 0 & 0 & -1 & 4/3 & 2/3&0 & -1 &4/3 &-1/3 &0 \\
\hline
& &&&& 2 &-4/3 & -2/3 & 0 & -2 & 5/3 & -2/3 & 1
\end{array}
References
----------
.. [1] Rößler, Andreas. "Strong and weak approximation methods for stochastic differential
equations—some recent developments." Recent developments in applied probability and
statistics. Physica-Verlag HD, 2010. 127-153.
.. [2] Rößler, Andreas. "Runge–Kutta methods for the strong approximation of solutions of
stochastic differential equations." SIAM Journal on Numerical Analysis 48.3
(2010): 922-952.
"""
def __init__(self, f, g, dt=None, name=None, show_code=False,
var_type=None, intg_type=None, wiener_type=None, state_delays=None):
super(SRK1W1, self).__init__(f=f, g=g, dt=dt, show_code=show_code, name=name,
var_type=var_type, intg_type=intg_type,
wiener_type=wiener_type, state_delays=state_delays)
assert self.wiener_type == constants.SCALAR_WIENER
self.build()
def build(self):
# 2. code lines
self.code_lines.append(f' {constants.DT}_sqrt = {constants.DT} ** 0.5')
# 2.1 noise
_noise_terms(self.code_lines, self.variables, triple_integral=True)
# 2.2 stage 1
_state1(self.code_lines, self.variables, self.parameters)
# 2.3 stage 2
all_H0s2, all_H1s2 = [], []
for var in self.variables:
self.code_lines.append(f' {var}_H0s2 = {var} + {constants.DT} * 0.75 * {var}_f_H0s1 + '
f'1.5 * {var}_g_H1s1 * {var}_I10 / {constants.DT}')
all_H0s2.append(f'{var}_H0s2')
self.code_lines.append(f' {var}_H1s2 = {var} + {constants.DT} * 0.25 * {var}_f_H0s1 + '
f'dt_sqrt * 0.5 * {var}_g_H1s1')
all_H1s2.append(f'{var}_H1s2')
all_H0s2.append(f't + 0.75 * {constants.DT}') # t
all_H1s2.append(f't + 0.25 * {constants.DT}') # t
f_names = [f'{var}_f_H0s2' for var in self.variables]
self.code_lines.append(f' {", ".join(f_names)} = f({", ".join(all_H0s2 + self.parameters[1:])})')
g_names = [f'{var}_g_H1s2' for var in self.variables]
self.code_lines.append(f' {", ".join(g_names)} = g({", ".join(all_H1s2 + self.parameters[1:])})')
self.code_lines.append(' ')
# 2.4 state 3
all_H1s3 = []
for var in self.variables:
self.code_lines.append(f' {var}_H1s3 = {var} + {constants.DT} * {var}_f_H0s1 - dt_sqrt * {var}_g_H1s1')
all_H1s3.append(f'{var}_H1s3')
all_H1s3.append(f't + {constants.DT}') # t
g_names = [f'{var}_g_H1s3' for var in self.variables]
self.code_lines.append(f' {", ".join(g_names)} = g({", ".join(all_H1s3 + self.parameters[1:])})')
self.code_lines.append(' ')
# 2.5 state 4
all_H1s4 = []
for var in self.variables:
self.code_lines.append(f' {var}_H1s4 = {var} + 0.25 * {constants.DT} * {var}_f_H0s1 + dt_sqrt * '
f'(-5 * {var}_g_H1s1 + 3 * {var}_g_H1s2 + 0.5 * {var}_g_H1s3)')
all_H1s4.append(f'{var}_H1s4')
all_H1s4.append(f't + 0.25 * {constants.DT}') # t
g_names = [f'{var}_g_H1s4' for var in self.variables]
self.code_lines.append(f' {", ".join(g_names)} = g({", ".join(all_H1s4 + self.parameters[1:])})')
self.code_lines.append(' ')
# 2.6 final stage
for var in self.variables:
self.code_lines.append(f' {var}_f1 = {var}_f_H0s1/3 + {var}_f_H0s2 * 2/3')
self.code_lines.append(
f' {var}_g1 = -{var}_I1 - {var}_I11/dt_sqrt + 2 * {var}_I10/{constants.DT} - 2 * {var}_I111/{constants.DT}')
self.code_lines.append(f' {var}_g2 = {var}_I1 * 4/3 + {var}_I11 / dt_sqrt * 4/3 - '
f'{var}_I10 / {constants.DT} * 4/3 + {var}_I111 / {constants.DT} * 5/3')
self.code_lines.append(f' {var}_g3 = {var}_I1 * 2/3 - {var}_I11/dt_sqrt/3 - '
f'{var}_I10 / {constants.DT} * 2/3 - {var}_I111 / {constants.DT} * 2/3')
self.code_lines.append(f' {var}_g4 = {var}_I111 / {constants.DT}')
self.code_lines.append(f' {var}_new = {var} + {constants.DT} * {var}_f1 + {var}_g1 * {var}_g_H1s1 + '
f'{var}_g2 * {var}_g_H1s2 + {var}_g3 * {var}_g_H1s3 + {var}_g4 * {var}_g_H1s4')
self.code_lines.append(' ')
# returns
new_vars = [f'{var}_new' for var in self.variables]
self.code_lines.append(f' return {", ".join(new_vars)}')
# return and compile
self.integral = utils.compile_code(
code_scope={k: v for k, v in self.code_scope.items()},
code_lines=self.code_lines,
show_code=self.show_code,
func_name=self.func_name)
register_sde_integrator('srk1w1', SRK1W1)
[docs]
class SRK2W1(SDEIntegrator):
r"""Order 1.5 Strong SRK Methods for SDEs with Scalar Noise.
This method has have strong orders :math:`(p_d, p_s) = (3.0,1.5)`.
The Butcher table is:
.. math::
\begin{array}{c|cccc|cccc|ccc|}
0 & 0 & 0 & 0 & 0 & 0 & 0 & 0 & 0 & & & & \\
1 & 1 & 0 & 0 & 0 & 0 & 0 & 0 & 0 & & & & \\
1 / 2 & 1 / 4 & 1 / 4 & 0 & 0 & 1 & 1 / 2 & 0 & 0 & & & & \\
0 & 0 & 0 & 0 & 0 & 0 & 0 & 0 & 0 & & & & \\
\hline 0 & 0 & 0 & 0 & 0 & 0 & 0 & 0 & 0 & & & & \\
1 / 4 & 1 / 4 & 0 & 0 & 0 & -1 / 2 & 0 & 0 & 0 & & & & \\
1 & 1 & 0 & 0 & 0 & 1 & 0 & 0 & 0 & & & & \\
1 / 4 & 0 & 0 & 1 / 4 & 0 & 2 & -1 & 1 / 2 & 0 & & & & \\
\hline & 1 / 6 & 1 / 6 & 2 / 3 & 0 & -1 & 4 / 3 & 2 / 3 & 0 & -1 & -4 / 3 & 1 / 3 & 0 \\
\hline & & & & &2 & -4 / 3 & -2 / 3 & 0 & -2 & 5 / 3 & -2 / 3 & 1
\end{array}
References
----------
.. [1] Rößler, Andreas. "Strong and weak approximation methods for stochastic differential
equations—some recent developments." Recent developments in applied probability and
statistics. Physica-Verlag HD, 2010. 127-153.
.. [2] Rößler, Andreas. "Runge–Kutta methods for the strong approximation of solutions of
stochastic differential equations." SIAM Journal on Numerical Analysis 48.3
(2010): 922-952.
"""
def __init__(self, f, g, dt=None, name=None, show_code=False,
var_type=None, intg_type=None, wiener_type=None, state_delays=None):
super(SRK2W1, self).__init__(f=f, g=g, dt=dt, show_code=show_code, name=name,
var_type=var_type, intg_type=intg_type,
wiener_type=wiener_type, state_delays=state_delays)
assert self.wiener_type == constants.SCALAR_WIENER
self.build()
def build(self):
self.code_lines.append(f' {constants.DT}_sqrt = {constants.DT} ** 0.5')
# 2.1 noise
_noise_terms(self.code_lines, self.variables, triple_integral=True)
# 2.2 stage 1
_state1(self.code_lines, self.variables, self.parameters)
# 2.3 stage 2
# ----
# H0s2 = x + dt * f_H0s1
# H1s2 = x + dt * 0.25 * f_H0s1 - dt_sqrt * 0.5 * g_H1s1
# f_H0s2 = f(H0s2, t + dt, *args)
# g_H1s2 = g(H1s2, t + 0.25 * dt, *args)
all_H0s2, all_H1s2 = [], []
for var in self.variables:
self.code_lines.append(f' {var}_H0s2 = {var} + {constants.DT} * {var}_f_H0s1')
all_H0s2.append(f'{var}_H0s2')
self.code_lines.append(f' {var}_H1s2 = {var} + {constants.DT} * 0.25 * {var}_f_H0s1 - '
f'dt_sqrt * 0.5 * {var}_g_H1s1')
all_H1s2.append(f'{var}_H1s2')
all_H0s2.append(f't + {constants.DT}') # t
all_H1s2.append(f't + 0.25 * {constants.DT}') # t
f_names = [f'{var}_f_H0s2' for var in self.variables]
self.code_lines.append(f' {", ".join(f_names)} = f({", ".join(all_H0s2 + self.parameters[1:])})')
g_names = [f'{var}_g_H1s2' for var in self.variables]
self.code_lines.append(f' {", ".join(g_names)} = g({", ".join(all_H1s2 + self.parameters[1:])})')
self.code_lines.append(' ')
# 2.4 state 3
# ---
# H0s3 = x + dt * (0.25 * f_H0s1 + 0.25 * f_H0s2) + (g_H1s1 + 0.5 * g_H1s2) * I10 / dt
# H1s3 = x + dt * f_H0s1 + dt_sqrt * g_H1s1
# f_H0s3 = g(H0s3, t + 0.5 * dt, *args)
# g_H1s3 = g(H1s3, t + dt, *args)
all_H0s3, all_H1s3 = [], []
for var in self.variables:
self.code_lines.append(f' {var}_H0s3 = {var} + {constants.DT} * (0.25 * {var}_f_H0s1 + 0.25 * {var}_f_H0s2) + '
f'({var}_g_H1s1 + 0.5 * {var}_g_H1s2) * {var}_I10 / {constants.DT}')
all_H0s3.append(f'{var}_H0s3')
self.code_lines.append(f' {var}_H1s3 = {var} + {constants.DT} * {var}_f_H0s1 + dt_sqrt * {var}_g_H1s1')
all_H1s3.append(f'{var}_H1s3')
all_H0s3.append(f't + 0.5 * {constants.DT}') # t
all_H1s3.append(f't + {constants.DT}') # t
f_names = [f'{var}_f_H0s3' for var in self.variables]
g_names = [f'{var}_g_H1s3' for var in self.variables]
self.code_lines.append(f' {", ".join(f_names)} = f({", ".join(all_H0s3 + self.parameters[1:])})')
self.code_lines.append(f' {", ".join(g_names)} = g({", ".join(all_H1s3 + self.parameters[1:])})')
self.code_lines.append(' ')
# 2.5 state 4
# ----
# H1s4 = x + dt * 0.25 * f_H0s3 + dt_sqrt * (2 * g_H1s1 - g_H1s2 + 0.5 * g_H1s3)
# g_H1s4 = g(H1s4, t + 0.25 * dt, *args)
all_H1s4 = []
for var in self.variables:
self.code_lines.append(f' {var}_H1s4 = {var} + 0.25 * {constants.DT} * {var}_f_H0s1 + dt_sqrt * '
f'(2 * {var}_g_H1s1 - {var}_g_H1s2 + 0.5 * {var}_g_H1s3)')
all_H1s4.append(f'{var}_H1s4')
all_H1s4.append(f't + 0.25 * {constants.DT}') # t
g_names = [f'{var}_g_H1s4' for var in self.variables]
self.code_lines.append(f' {", ".join(g_names)} = g({", ".join(all_H1s4 + self.parameters[1:])})')
self.code_lines.append(' ')
# 2.6 final stage
# ----
# f1 = f_H0s1 / 6 + f_H0s2 / 6 + f_H0s3 * 2 / 3
# g1 = - I1 + I11 / dt_sqrt + 2 * I10 / dt - 2 * I111 / dt
# g2 = I1 * 4 / 3 - I11 / dt_sqrt * 4 / 3 - I10 / dt * 4 / 3 + I111 / dt * 5 / 3
# g3 = I1 * 2 / 3 + I11 / dt_sqrt / 3 - I10 / dt * 2 / 3 - I111 / dt * 2 / 3
# g4 = I111 / dt
# y1 = x + dt * f1 + g1 * g_H1s1 + g2 * g_H1s2 + g3 * g_H1s3 + g4 * g_H1s4
for var in self.variables:
self.code_lines.append(f' {var}_f1 = {var}_f_H0s1/6 + {var}_f_H0s2/6 + {var}_f_H0s3*2/3')
self.code_lines.append(
f' {var}_g1 = -{var}_I1 + {var}_I11/dt_sqrt + 2 * {var}_I10/{constants.DT} - 2 * {var}_I111/{constants.DT}')
self.code_lines.append(f' {var}_g2 = {var}_I1 * 4/3 - {var}_I11 / dt_sqrt * 4/3 - '
f'{var}_I10 / {constants.DT} * 4/3 + {var}_I111 / {constants.DT} * 5/3')
self.code_lines.append(f' {var}_g3 = {var}_I1 * 2/3 + {var}_I11/dt_sqrt/3 - '
f'{var}_I10 / {constants.DT} * 2/3 - {var}_I111 / {constants.DT} * 2/3')
self.code_lines.append(f' {var}_g4 = {var}_I111 / {constants.DT}')
self.code_lines.append(f' {var}_new = {var} + {constants.DT} * {var}_f1 + {var}_g1 * {var}_g_H1s1 + '
f'{var}_g2 * {var}_g_H1s2 + {var}_g3 * {var}_g_H1s3 + {var}_g4 * {var}_g_H1s4')
self.code_lines.append(' ')
# returns
new_vars = [f'{var}_new' for var in self.variables]
self.code_lines.append(f' return {", ".join(new_vars)}')
# return and compile
self.integral = utils.compile_code(
code_scope={k: v for k, v in self.code_scope.items()},
code_lines=self.code_lines,
show_code=self.show_code,
func_name=self.func_name)
register_sde_integrator('srk2w1', SRK2W1)
[docs]
class KlPl(SDEIntegrator):
def __init__(self, f, g, dt=None, name=None, show_code=False,
var_type=None, intg_type=None, wiener_type=None, state_delays=None):
super(KlPl, self).__init__(f=f, g=g, dt=dt, show_code=show_code, name=name,
var_type=var_type, intg_type=intg_type,
wiener_type=wiener_type, state_delays=state_delays)
assert self.wiener_type == constants.SCALAR_WIENER
self.build()
def build(self):
self.code_lines.append(f' {constants.DT}_sqrt = {constants.DT} ** 0.5')
# 2.1 noise
_noise_terms(self.code_lines, self.variables, triple_integral=False)
# 2.2 stage 1
_state1(self.code_lines, self.variables, self.parameters)
# 2.3 stage 2
# ----
# H1s2 = x + dt * f_H0s1 + dt_sqrt * g_H1s1
# g_H1s2 = g(H1s2, t0, *args)
all_H1s2 = []
for var in self.variables:
self.code_lines.append(f' {var}_H1s2 = {var} + {constants.DT} * {var}_f_H0s1 + dt_sqrt * {var}_g_H1s1')
all_H1s2.append(f'{var}_H1s2')
g_names = [f'{var}_g_H1s2' for var in self.variables]
self.code_lines.append(f' {", ".join(g_names)} = g({", ".join(all_H1s2 + self.parameters)})')
self.code_lines.append(' ')
# 2.4 final stage
# ----
# g1 = (I1 - I11 / dt_sqrt + I10 / dt)
# g2 = I11 / dt_sqrt
# y1 = x + dt * f_H0s1 + g1 * g_H1s1 + g2 * g_H1s2
for var in self.variables:
self.code_lines.append(f' {var}_g1 = -{var}_I1 + {var}_I11/dt_sqrt + {var}_I10/{constants.DT}')
self.code_lines.append(f' {var}_g2 = {var}_I11 / dt_sqrt')
self.code_lines.append(f' {var}_new = {var} + {constants.DT} * {var}_f_H0s1 + '
f'{var}_g1 * {var}_g_H1s1 + {var}_g2 * {var}_g_H1s2')
self.code_lines.append(' ')
# returns
new_vars = [f'{var}_new' for var in self.variables]
self.code_lines.append(f' return {", ".join(new_vars)}')
# return and compile
self.integral = utils.compile_code(
code_scope={k: v for k, v in self.code_scope.items()},
code_lines=self.code_lines,
show_code=self.show_code,
func_name=self.func_name)
register_sde_integrator('klpl', KlPl)