Source code for brainpy.running.pathos_multiprocessing

# -*- coding: utf-8 -*-
# Copyright 2025 BrainX Ecosystem Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""The parallel execution of a BrainPy func on multiple CPU cores.

Specifically, these batch running functions include:

- ``cpu_ordered_parallel``: Performs a parallel ordered map.
- ``cpu_unordered_parallel``: Performs a parallel unordered map.
"""

import sys
from collections.abc import Sized
from typing import (Any, Callable, Generator, Iterable, List,
                    Union, Optional, Sequence, Dict)

from tqdm.auto import tqdm

from brainpy._errors import PackageMissingError

try:
    from pathos.helpers import cpu_count  # noqa
    from pathos.multiprocessing import ProcessPool  # noqa
    import multiprocess.context as ctx  # noqa

    ctx._force_start_method('spawn')
except ModuleNotFoundError:
    cpu_count = None
    ProcessPool = None

__all__ = [
    'cpu_ordered_parallel',
    'cpu_unordered_parallel',
]


def _parallel(
    ordered: bool,
    function: Callable,
    arguments: Union[Sequence[Iterable], Dict[str, Iterable]],
    num_process: Union[int, float] = None,
    num_task: int = None,
    **tqdm_kwargs: Any
) -> Generator:
    """Perform a parallel map with a progress bar.

    Parameters::

    ordered: bool
      True for an ordered map, false for an unordered map.
    function: callable, function
      The function to apply to each element of the given Iterables.
    arguments: sequence of Iterable, dict
      One or more Iterables containing the data to be mapped.
    num_process: int, float
      Number of threads used for parallel running. If `int`, it is
      the number of threads to be used; if `float`, it is the fraction
      of total threads to be used for running.
    num_task: int
      The total number of tasks in this parallel running.
    tqdm_kwargs: Any
      The setting for the progress bar.

    Returns::

    results: Iterable
        A generator which will apply the function to each element of the given Iterables
        in parallel in order with a progress bar.
    """
    if sys.platform == 'win32' and sys.version_info.minor >= 11:
        raise NotImplementedError('Multiprocessing is not available in Python >=3.11 on Windows. '
                                  'Please use Linux or MacOS, or Windows with Python <= 3.10.')

    if ProcessPool is None or cpu_count is None:
        raise PackageMissingError(
            '''
          Please install "pathos" package first. 
          
          >>>  pip install pathos
            '''
        )

    # Determine num_process
    if num_process is None:
        num_process = cpu_count()
    elif isinstance(num_process, int):
        pass
    elif isinstance(num_process, float):
        num_process = int(round(num_process * cpu_count()))
    else:
        raise ValueError('"num_process" must be an int or a float.')

    # arguments
    if isinstance(arguments, dict):
        keys = list(arguments.keys())
        arguments = list(arguments.values())
        run_f = lambda *args: function(**{key: arg for key, arg in zip(keys, args)})
    else:
        if not isinstance(arguments, (tuple, list)):
            raise TypeError('"arguments" must be a sequence of Iterable or a dict of Iterable. '
                            f'But we got {type(arguments)}')
        run_f = function

    # Determine length of tqdm
    lengths = [len(iterable) for iterable in arguments if isinstance(iterable, Sized)]
    num_task = num_task or (min(lengths) if lengths else None)

    # Create parallel generator
    pool = ProcessPool(nodes=num_process)
    if ordered:
        map_func = pool.imap
    else:
        map_func = pool.uimap

    # Choose tqdm variant
    for item in tqdm(map_func(run_f, *arguments), total=num_task, **tqdm_kwargs):
        yield item

    pool.clear()


[docs] def cpu_ordered_parallel( func: Callable, arguments: Union[Sequence[Iterable], Dict[str, Iterable]], num_process: Optional[Union[int, float]] = None, num_task: Optional[int] = None, **tqdm_kwargs: Any ) -> List[Any]: """Performs a parallel ordered map with a progress bar. Examples:: >>> import brainpy as bp >>> import brainpy.math as bm >>> import numpy as np >>> >>> def simulate(inp): >>> inp = bm.as_jax(inp) >>> hh = bp.dyn.HH(1) >>> runner = bp.DSRunner(hh, inputs=['input', inp], >>> monitors=['V', 'spike'], >>> progress_bar=False) >>> runner.run(100) >>> bm.clear_buffer_memory() # clear all cached data and functions >>> return runner.mon.spike.sum() >>> >>> if __name__ == '__main__': # This is important! >>> results = bp.running.cpu_unordered_parallel(simulate, [np.arange(1, 10, 100)], num_process=10) >>> print(results) Parameters:: func: callable, function The function to apply to each element of the given Iterables. arguments: sequence of Iterable, dict One or more Iterables containing the data to be mapped. num_process: int, float Number of threads used for parallel running. If `int`, it is the number of threads to be used; if `float`, it is the fraction of total threads to be used for running. num_task: int The total number of tasks in this parallel running. tqdm_kwargs: Any The setting for the progress bar. Returns:: results: list A list which will apply the function to each element of the given tasks. """ generator = _parallel(True, func, arguments, num_process=num_process, num_task=num_task, **tqdm_kwargs) return list(generator)
[docs] def cpu_unordered_parallel( func: Callable, arguments: Union[Sequence[Iterable], Dict[str, Iterable]], num_process: Optional[Union[int, float]] = None, num_task: Optional[int] = None, **tqdm_kwargs: Any ) -> List[Any]: """Performs a parallel unordered map with a progress bar. Examples:: >>> import brainpy as bp >>> import brainpy.math as bm >>> import numpy as np >>> >>> def simulate(inp): >>> inp = bm.as_jax(inp) >>> hh = bp.dyn.HH(1) >>> runner = bp.DSRunner(hh, inputs=['input', inp], >>> monitors=['V', 'spike'], >>> progress_bar=False) >>> runner.run(100) >>> bm.clear_buffer_memory() # clear all cached data and functions >>> return runner.mon.spike.sum() >>> >>> if __name__ == '__main__': # This is important! >>> results = bp.running.cpu_unordered_parallel(simulate, [np.arange(1, 10, 100)], num_process=10) >>> print(results) Parameters:: func: callable, function The function to apply to each element of the given Iterables. arguments: sequence of Iterable, dict One or more Iterables containing the data to be mapped. num_process: int, float Number of threads used for parallel running. If `int`, it is the number of threads to be used; if `float`, it is the fraction of total threads to be used for running. num_task: int The total number of tasks in this parallel running. tqdm_kwargs: Any The setting for the progress bar. Returns:: results: list A list which will apply the function to each element of the given tasks. """ generator = _parallel(False, func, arguments, num_process=num_process, num_task=num_task, **tqdm_kwargs) return list(generator)