# Source code for florin.pipelines.mpi

"""MPI-based multiprocessing pipeline.

Classes
-------
MPIPipeline
    MPI-based multiprocessing pipeline.
"""

import dill
from mpi4py import MPI
from mpi4py.futures import MPIPoolExecutor

from florin.pipelines.pipeline import Pipeline


# Re-initialize mpi4py's module-level pickle object at import time so that it
# serializes with ``dill.dumps`` and deserializes with ``dill.loads``.
# NOTE(review): presumably done so that operations the standard pickler
# rejects (e.g. lambdas or closures) can be sent to MPI workers -- confirm.
MPI.pickle.__init__(dill.dumps, dill.loads)


class MPIPipeline(Pipeline):
    """Pipeline that hands its work to an MPI-backed executor pool.

    Parameters
    ----------
    operations : callables
        Sequence of operations to run in the pipeline.

    Notes
    -----
    MPI is configured by wrapping Python in an ``mpiexec`` or ``mpirun`` call
    at runtime.
    """

    def run(self, data):
        """Map the pipeline's operations over ``data`` using an MPI pool.

        Parameters
        ----------
        data : iterable
            Items passed to ``MPIPoolExecutor.map`` as the iterable argument.

        Returns
        -------
        object
            The value returned by ``MPIPoolExecutor.map``. The executor's
            context manager has already exited by the time the caller
            receives it.

        Notes
        -----
        ``self.operations`` is passed to ``map`` as the callable.
        NOTE(review): this assumes the base ``Pipeline`` exposes
        ``operations`` as something callable -- confirm against ``Pipeline``.
        """
        # Returning from inside the ``with`` still runs the executor's exit
        # handler before the value reaches the caller.
        with MPIPoolExecutor() as pool:
            return pool.map(self.operations, data)