Source code for mppi.Utilities.Parallel

"""
This module contains some tools to perform parallel procedures using the python
multiprocessing package

"""
import multiprocessing as mp, time as tm
import numpy as np
from datetime import timedelta

[docs]def loop(func, pars, *args, ntasks = 4, verbose = True, **kwargs): """ Perform a parallel loop over the values of the pars array and compute the values of the function func, using ntasks parallel processes Args: func (function) : a function that returns a value for each element of pars pars (:py:class:`array`) : array with the values iterate by the loop ntask (:py:class:`int`) : number of parallel tasks verbose (:py:class:`bool`) : determine the amount of information provided on terminal args, kwargs : arguments and keyword arguments passed to func """ def func_loop(func,pars_subset,task,output,*args,**kwargs): """ Evaluate the function func for all the values inside a single task. Add the dictionary with the results of the task to the queue of the multiprocess """ results = [] for p in pars_subset: results.append(func(p,*args,**kwargs)) output.put({task:np.array(results)}) pars_split = np.array_split(pars,ntasks) if verbose : print('Run a parallel loop with %s tasks...'%ntasks) t0 = tm.time() output = mp.Queue() tasks = [mp.Process(target=func_loop, args=(func,pars_split[task],task,output,*args,), kwargs=kwargs) for task in range(ntasks)] for p in tasks: p.start() results_dict = {} for p in tasks: results_dict.update(output.get()) results = np.concatenate([results_dict[i] for i in range(ntasks)]) if verbose : deltaTime = int(tm.time()-t0) dT_str = "{:0>8}".format(str(timedelta(seconds=deltaTime))) print('Loop executed in',dT_str) return results