Conveyor - Intuitive Python pipelines¶
Conveyor is a multiprocessing framework for creating intuitive data pipelines. With Conveyor, you can easily create stream-based pipelines to efficiently perform a series of operations on data, a task especially useful in the fields of machine learning and scientific computing. Creating a pipelined job is as easy as writing a function.
Why use Conveyor?¶
The answer is simple: throughput. It’s like putting a second load of laundry in the washer while the previous load is in the dryer. By breaking a problem down into a series of smaller tasks, we can overlap those tasks across different pieces of data and increase the efficiency of whatever problem we’re trying to solve.
Installation¶
Install Conveyor with pip install parallel-conveyor.
A quick and trivial example¶
Let’s say we wanted to build a short pipeline that computes the fourth root of a number (done in two steps) and the cube of the same number (in one step). In this case, we would describe the pipeline visually as follows:
Schematic
To express it with Conveyor, we simply build the pipeline as follows:
from conveyor.pipeline import Pipeline
from conveyor.stages import Processor, Pipe, ReplicatingFork, Join
from math import sqrt

def square_root(arg):
    return sqrt(arg)

def cube(arg):
    return arg ** 3

with Pipeline() as pl:
    # Duplicate the input
    pl.add(ReplicatingFork(2))
    # On first copy, compute the sqrt, on the second, the cube
    pl.add(Processor(square_root), Processor(cube))
    # On first copy, compute the sqrt, on the second, do nothing
    pl.add(Processor(square_root), Pipe())
    # Join the two data streams
    pl.add(Join(2))
    # Print the results
    pl.add(Processor(print))
    # Run the pipeline with three different inputs
    pl.run([16, 3, 81])
$ python3 sample.py
2.0
1.3160740129524924
3.0
4096
27
531441
Quickstart guide¶
The entire idea behind Conveyor is to process data as if it were moving through a stream (in Conveyor, this is called a pipeline). Each pipeline contains a series of stages in which data is manipulated in some way before being passed out of the other end of the pipeline. Each stage runs as its own process, meaning that when processing large batches of data, each transformation of the data can happen in parallel.
Pipelines¶
Pipelines form the core of Conveyor. They are the objects through which data flows and is transformed. Initialize a pipeline with the following:
from conveyor.pipeline import Pipeline
pl = Pipeline()
We can add stages to a pipeline using the .add() function and run it using .run(). Additionally, we can define a Pipeline with the with keyword so that the resources it uses are automatically freed once we’re done using the pipeline. You will often see this pattern used in our documentation.
from conveyor.pipeline import Pipeline

with Pipeline() as pl:
    # Code involving the pipeline goes here
    ...
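As a minimal sketch that puts .add(), .run() and the with form together (it uses the Processor stage, which is introduced in the next section, to wrap Python’s built-in print):

from conveyor.pipeline import Pipeline
from conveyor.stages import Processor

with Pipeline() as pl:
    # A single stage that prints each data block it receives
    pl.add(Processor(print))
    # Feed three data blocks through the pipeline
    pl.run(["a", "b", "c"])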
Stages¶
We now need to add stages to our pipeline. Stages are where our data is processed, and they are what allow us to achieve the throughput that enhances the performance of our program. Stages are added using the Pipeline object’s .add() function, and they come in four main types:
Processors¶
Processors are Conveyor’s logical compute cores. Each processor accepts some data as input, transforms that input and optionally returns an output. Processors can maintain an internal state. Each processor is user-defined and wrapped in its own Python process, allowing multiple processors to execute in parallel. A user can specify sets of processors that should execute serially, joined by pipes, or sets of processors that should act in parallel, as defined by forks.
We can create a Processor with Processor(job), where the argument job serves as a callback function that will run in parallel. Each callback should be written so that it takes a single argument representing a single block of data to be processed. This argument can be an integer, string, tuple, object or any other data type. At the end of the function, we must return another single block of data to be handled by the next stage in the pipeline.
Writing jobs as functions is relatively straightforward: when the pipeline runs, each stage receives a data block as its function argument, performs its processing, then emits a result as its return value. Additionally, single-argument functions from other libraries or from Python’s standard library can be used (e.g., print()).
from conveyor.pipeline import Pipeline
from conveyor.stages import Processor

def job(arg):
    return arg + 1

with Pipeline() as pl:
    pl.add(Processor(job))
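As noted above, processors can maintain an internal state. One way to sketch this, assuming Processor accepts any Python callable rather than only plain functions, is to pass an instance of a class that defines __call__ and keeps its state on the instance. Because each stage runs as its own process, this state is local to that stage’s process:

from conveyor.pipeline import Pipeline
from conveyor.stages import Processor

class RunningTotal:
    """A stateful job: adds each incoming value to a running total."""

    def __init__(self):
        self.total = 0

    def __call__(self, arg):
        # State persists across data blocks within this stage's process.
        # Assumes Processor accepts any callable, not just a plain function.
        self.total += arg
        return self.total

with Pipeline() as pl:
    pl.add(Processor(RunningTotal()))
    pl.add(Processor(print))
    pl.run([1, 2, 3])  # expected to print 1, 3, 6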
Pipes¶
Pipes serve as links between processors. They act as a stream, transporting data from one point in the pipeline to another. Pipes are implemented as a producer-consumer buffer where a leading processor acts as the producer and a trailing processor acts as a consumer. They don’t use any processing power on their own and can be implicitly added by Conveyor to fill in gaps between processors.
from conveyor.pipeline import Pipeline
from conveyor.stages import Processor, Pipe

def job(arg):
    return arg + 1

with Pipeline() as pl:
    pl.add(Processor(job))
    pl.add(Pipe())
    pl.add(Processor(job))
Equivalently, we could write the same code without the Pipe, as Conveyor will add it implicitly.
from conveyor.pipeline import Pipeline
from conveyor.stages import Processor, Pipe

def job(arg):
    return arg + 1

with Pipeline() as pl:
    pl.add(Processor(job))
    pl.add(Processor(job))
Forks¶
Forks serve as a way of splitting the data travelling through the pipeline, and they can do so in two different ways. A fork can act in a replicating fashion, where it duplicates all incoming data and pushes a copy to each of its output processors, or in a balancing fashion, where it performs a load-balancing operation, dividing incoming data blocks over a number of output processors.
Both types of fork take a single argument: the number of output pipes to which they will attach. The number of stages added in parallel with the next call to .add() must match the number of output pipes at the previous stage. Processors, forks and pipes can all exist in parallel at the same stage in a pipeline.
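For example, the matching rule means a three-way fork must be followed by exactly three stages in the next call to .add(). Here is a small sketch of that shape, using the hypothetical job functions increment and double:

from conveyor.pipeline import Pipeline
from conveyor.stages import Processor, Pipe, ReplicatingFork

def increment(arg):
    return arg + 1

def double(arg):
    return arg * 2

with Pipeline() as pl:
    # Three output pipes...
    pl.add(ReplicatingFork(3))
    # ...so the next .add() must supply exactly three stages
    pl.add(Processor(increment), Processor(double), Pipe())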
Replicating Forks¶
Replicating Forks allow one processor to split its output data into multiple copies so that multiple downstream processors can each perform operations on the entire output. For example, this would allow several different ML models to be trained and tested in parallel. How many copies a replicating fork produces, and therefore how many parallel branches follow it, is defined by the user.
from conveyor.pipeline import Pipeline
from conveyor.stages import Processor, Pipe, ReplicatingFork, Join

def job(arg):
    return arg + 1

with Pipeline() as pl:
    pl.add(ReplicatingFork(2))
    pl.add(Processor(job), Pipe())
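To illustrate the use case above, here is a sketch in the same style where the replicated stream feeds two different jobs (evaluate_model_a and evaluate_model_b are hypothetical stand-ins for real model code), with a Join collecting both result streams:

from conveyor.pipeline import Pipeline
from conveyor.stages import Processor, ReplicatingFork, Join

def evaluate_model_a(sample):
    # Hypothetical stand-in for scoring a sample with one model
    return ("model_a", sample * 2)

def evaluate_model_b(sample):
    # Hypothetical stand-in for scoring the same sample with another model
    return ("model_b", sample + 100)

with Pipeline() as pl:
    pl.add(ReplicatingFork(2))      # copy each sample to both branches
    pl.add(Processor(evaluate_model_a), Processor(evaluate_model_b))
    pl.add(Join(2))                 # merge both result streams
    pl.add(Processor(print))
    pl.run([1, 2, 3])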
Balancing Forks¶
Balancing Forks allow one processor to balance a stream of data over multiple consumer processors. This serves to minimize the effect of pipe stalling on larger data sets. How incoming data blocks are distributed among the outputs is determined primarily by pipe stalling detected at runtime and by the number of physical cores available.
from conveyor.pipeline import Pipeline
from conveyor.stages import Processor, Pipe, BalancingFork, Join

def job(arg):
    return arg + 1

with Pipeline() as pl:
    pl.add(BalancingFork(2))
    pl.add(Processor(job), Pipe())
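The snippet above sends the balanced stream to one worker and one empty Pipe. To make the load balancing more visible, here is a sketch in the same style where both branches run the same job and a Join (covered in the next section) merges the results; which branch handles a given block is decided by Conveyor at runtime:

from conveyor.pipeline import Pipeline
from conveyor.stages import Processor, BalancingFork, Join

def job(arg):
    return arg + 1

with Pipeline() as pl:
    # Spread incoming blocks across two identical workers
    pl.add(BalancingFork(2))
    pl.add(Processor(job), Processor(job))
    # Merge the two result streams back into one
    pl.add(Join(2))
    pl.add(Processor(print))
    pl.run([10, 20, 30, 40])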
Joins¶
Joins allow multiple processors to combine their data streams into one logical pipe. The combined stream can then be forked again for the next step of the pipeline, processed by a single processor, or serve as output at the end of the pipeline.
This means that the outputs from the previous parallel stages may be interleaved arbitrarily. Joins are useful when output from multiple different processors needs to be serialized; they work in a first-come-first-served manner.
from conveyor.pipeline import Pipeline
from conveyor.stages import Processor, Pipe, ReplicatingFork, Join

def job(arg):
    return arg + 1

with Pipeline() as pl:
    pl.add(ReplicatingFork(2))
    pl.add(Processor(job), Pipe())
    pl.add(Join(2))
    pl.add(Processor(print))
Running a pipeline¶
The final step is to run the pipeline we just created. To do so, we call Pipeline.run() on an array of input data objects, where each item is passed through the pipeline in the same order as it appears in the array. At any given time, multiple items from the array may be in the pipeline, albeit at different stages. Running the following trivial example:
from conveyor.pipeline import Pipeline
from conveyor.stages import Processor, Pipe, ReplicatingFork, Join

def job(arg):
    return arg + 1

with Pipeline() as pl:
    pl.add(ReplicatingFork(2))
    pl.add(Processor(job), Pipe())
    pl.add(Join(2))
    pl.add(Processor(print))
    pl.run([2, 9, 11])
yields this output:
3
10
12
2
9
11
Efficiency guide¶
If you’re using Conveyor, you’re using it because you want to squeeze every possible drop of performance out of your application. This should serve as a short guide of performance-breaking scenarios and how to avoid them.
Run pipelines using with statements¶
Pipelines can be run in two different ways, as demonstrated with the code snippets below:
pl = Pipeline()
pl.add(Processor(job))
pl.run([data, data2, data3])
pl.run([data4, data5, data6])
or…
with Pipeline() as pl:
    pl.add(Processor(job))
    pl.run([data, data2, data3])
    pl.run([data4, data5, data6])
When running a pipeline multiple times, Conveyor encourages users to use the second option described above. In the first case, heavyweight processes are created and killed (called ‘opening’ and ‘closing’ the pipeline, respectively) at the start and end of each invocation of .run(). This is disadvantageous because creating and killing processes takes a significant amount of time. It would be much better to create the processes in the pipeline, use them on the first invocation of .run(), keep them running, and then use them again on the second invocation of .run(). This is what the second case does: the pipeline is implicitly opened and closed at the start and end of the with statement.