Adding New Pipeline Types to FLoRIN
FLoRIN offers a number of pipeline options (Serial, Multithread, Multiprocess, etc.) out of the box, but what if you need a different model? This example will show how to create a custom pipeline class with a different style of execution.
SLURMPipeline
Suppose you work on a cluster that uses SLURM and want to submit a job to a queue. This requires a pipeline that
- Accepts parameters to configure sbatch
- Sets up a job script
- Submits the job script for processing
- Blocks until all jobs are finished
Such a pipeline may look like this:
import re
import subprocess
import time

import dill  # dill is installed with florin

from florin.pipelines import Pipeline


class SLURMPipeline(Pipeline):
    """Pipeline that sets up and runs a SLURM job.

    Parameters
    ----------
    operations : callables
        The functions of the pipeline.

    Other Parameters
    ----------------
    Keyword arguments corresponding to SLURM directives, e.g. qos='debug',
    time=60, etc. These are dynamically added to the jobscript before
    submission.
    """

    def __init__(self, *operations, **kwargs):
        super(SLURMPipeline, self).__init__(*operations)
        self.slurm_directives = kwargs

    def run(self, data):
        """Submit and run a pipeline on SLURM.

        Parameters
        ----------
        data : list
            The input to the first function in the pipeline, e.g. a
            filepath for florin.load().
        """
        # Serialize this current pipeline
        pipeline_path = 'my_pipeline.pkl'
        self.dump(pipeline_path)

        # Set up the job script. This sets up the shebang header, then
        # iterates over the provided #SBATCH directives and sets each one
        # up on its own line, then finally invokes srun to deserialize the
        # pipeline and run it on the data.
        jobscript_path = 'my_jobscript.job'
        jobscript = '\n'.join(
            ['#!/usr/bin/env bash'] +
            ['#SBATCH --{}={}'.format(key, val)
             for key, val in self.slurm_directives.items()] +
            ['srun python -m florin.run {} $1'.format(pipeline_path)])

        # Dump the jobscript to file
        with open(jobscript_path, 'w') as f:
            f.write(jobscript)

        jobids = []
        # Submit one job for each data item. check_output returns bytes,
        # so decode before searching for the job ID.
        for item in data:
            out = subprocess.check_output(['sbatch', jobscript_path, item])
            jobids.append(re.search(r'(\d+)', out.decode()).group())

        # Wait until all jobs have completed to exit.
        while len(jobids) > 0:
            time.sleep(10)
            completed = []
            for jid in jobids:
                out = subprocess.check_output(['sacct', '-j', jid])
                if re.search(r'(COMPLETE)', out.decode()):
                    completed.append(jid)
            for jid in completed:
                jobids.remove(jid)
Note that this code is untested and by no means guaranteed to work; it is only meant to be a non-trivial example of what a custom pipeline may look like.
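The parts of the pipeline that do not touch the cluster can be tried out locally. The following is a minimal sketch of the job script rendering and job ID parsing steps, pulled out into standalone functions. The helper names build_jobscript and parse_job_id are hypothetical and not part of FLoRIN, and the sample sbatch output assumes the default "Submitted batch job <id>" message.

```python
import re


def build_jobscript(directives, pipeline_path):
    """Render a SLURM batch script as a single string.

    ``directives`` maps sbatch long-option names to values, for example
    {'qos': 'debug', 'time': 60}.
    """
    lines = ['#!/usr/bin/env bash']
    # One #SBATCH line per directive, in the order the dict provides them.
    lines += ['#SBATCH --{}={}'.format(key, val)
              for key, val in directives.items()]
    # $1 is the first argument given to sbatch after the script path,
    # i.e. the data item for this job.
    lines.append('srun python -m florin.run {} $1'.format(pipeline_path))
    return '\n'.join(lines) + '\n'


def parse_job_id(sbatch_output):
    """Extract the numeric job ID from the text printed by sbatch."""
    # subprocess.check_output returns bytes on Python 3.
    if isinstance(sbatch_output, bytes):
        sbatch_output = sbatch_output.decode()
    match = re.search(r'\d+', sbatch_output)
    if match is None:
        raise ValueError('no job ID found in: {!r}'.format(sbatch_output))
    return match.group()


# Assumed example values, for illustration only.
script = build_jobscript({'qos': 'debug', 'time': 60}, 'my_pipeline.pkl')
print(script)
print(parse_job_id(b'Submitted batch job 12345\n'))
```

Keeping these two steps as plain functions makes it possible to inspect the generated script before anything is submitted to the queue.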
Other Examples
Another great source of examples for setting up custom pipelines is the florin.pipelines module, which contains the source code for the officially supported pipelines.