Quickstart guide

The entire idea behind Conveyor is to process data as if it were moving through a stream (in Conveyor, this is called a pipeline). Each pipeline contains a series of stages in which data is manipulated in some way before being passed out of the other end of the pipeline. Each stage runs as its own process, so when processing large batches of data, the different transformations can happen in parallel.

Installation

Install Conveyor with

pip install parallel-conveyor

Pipelines

Pipelines form the core of Conveyor. They are the objects through which data flows and is transformed. Initialize a pipeline with the following:

from conveyor.pipeline import Pipeline

pl = Pipeline()

We can add stages to a pipeline using the .add() method and run them using .run(). Additionally, we can use the with keyword when creating a Pipeline so that the resources it uses are automatically freed once we're done with it. You will often see this pattern used in our documentation.

from conveyor.pipeline import Pipeline

with Pipeline() as pl:
    # Code involving the pipeline goes here
    pass

Stages

We now need to add stages to our pipeline. Stages are where our data gets processed, and running them in parallel increases throughput and enhances the performance of our program. Stages are added using the Pipeline object's .add() method. Stages come in four main types:

Processors

Processors are Conveyor's logical compute cores. Each processor accepts some data as input, transforms it, and optionally returns an output. Processors can maintain an internal state. Each processor is user-defined and wrapped in its own Python process, allowing multiple processors to execute in parallel. A user can specify sets of processors that should execute serially, joined by pipes, or sets of processors that should act in parallel, as defined by forks.

We can create a Processor with Processor(job), where the argument job is a callback to the function that will run in parallel. Each callback should be written so that it takes a single argument representing a single block of data to be processed. This argument can be an integer, string, tuple, object or any other data type. At the end of the function, we return a single block of data to be handled by the next stage in the pipeline.

Writing jobs as functions is relatively straightforward: when the pipeline runs, each stage receives a data block as a function argument, performs its processing, then emits a result as its return value. Additionally, single-argument functions from other libraries or Python's standard library can be used (e.g., print()).

from conveyor.pipeline import Pipeline
from conveyor.stages import Processor

def job(arg):
    return arg + 1

with Pipeline() as pl:
    pl.add(Processor(job))
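
As noted above, processors can maintain an internal state. Since a job is just a single-argument callable, one way to sketch this is with a callable object; this assumes Processor accepts arbitrary callables (the RunningTotal class below is purely illustrative), and because each processor runs in its own process, the state is local to that processor:

from conveyor.pipeline import Pipeline
from conveyor.stages import Processor

class RunningTotal:
    # A stateful job: keeps a running total of every value it has seen
    def __init__(self):
        self.total = 0

    def __call__(self, arg):
        self.total += arg
        return self.total

with Pipeline() as pl:
    pl.add(Processor(RunningTotal()))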

Pipes

Pipes serve as links between processors. They act as a stream, transporting data from one point in the pipeline to another. Pipes are implemented as a producer-consumer buffer where a leading processor acts as the producer and a trailing processor acts as a consumer. They don’t use any processing power on their own and can be implicitly added by Conveyor to fill in gaps between processors.

from conveyor.pipeline import Pipeline
from conveyor.stages import Processor, Pipe

def job(arg):
    return arg + 1

with Pipeline() as pl:
    pl.add(Processor(job))
    pl.add(Pipe())
    pl.add(Processor(job))

Equivalently, we could write the same code without the Pipe, as Conveyor will add it implicitly.

from conveyor.pipeline import Pipeline
from conveyor.stages import Processor

def job(arg):
    return arg + 1

with Pipeline() as pl:
    pl.add(Processor(job))
    pl.add(Processor(job))

Forks

Forks split the data travelling through the pipeline, and they can do so in two different ways. A fork can act in a replicating fashion, duplicating all incoming data and pushing a copy to each of its output processors, or in a balancing fashion, performing a load-balancing operation that divides incoming data blocks over a number of output processors.

Both types of forks take in a single argument: the number of output pipes to which they will attach. The number of stages added in parallel with the next call to .add() must match the number of output pipes at the previous stage. Processors, forks and pipes can all exist in parallel at the same stage in a pipeline.
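
For instance, a fork with three outputs must be followed by exactly three parallel stages, which may mix processors and pipes. A minimal sketch of this rule (the increment job is just illustrative):

from conveyor.pipeline import Pipeline
from conveyor.stages import Processor, Pipe, ReplicatingFork

def job(arg):
    return arg + 1

with Pipeline() as pl:
    pl.add(ReplicatingFork(3))                      # three output pipes
    pl.add(Processor(job), Processor(job), Pipe())  # three parallel stages to match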

Replicating Forks

Replicating Forks allow one processor to split its output into multiple copies so that multiple processors can then operate on the entire output. For example, this would allow multiple different ML models to be trained and tested in parallel. The one-to-many mapping from a fork's input to its outputs is primarily user defined.

from conveyor.pipeline import Pipeline
from conveyor.stages import Processor, Pipe, ReplicatingFork, Join

def job(arg):
    return arg + 1

with Pipeline() as pl:
    pl.add(ReplicatingFork(2))
    pl.add(Processor(job), Pipe())
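
For a fuller picture, here is a sketch that replicates each input to two different processors, joins the results, and prints them. The increment and double jobs are illustrative stand-ins for heavier work such as the ML models mentioned above:

from conveyor.pipeline import Pipeline
from conveyor.stages import Processor, ReplicatingFork, Join

def increment(arg):
    return arg + 1

def double(arg):
    return arg * 2

with Pipeline() as pl:
    pl.add(ReplicatingFork(2))                       # copy each input to both branches
    pl.add(Processor(increment), Processor(double))
    pl.add(Join(2))                                  # merge the two branches
    pl.add(Processor(print))                         # for input 5, prints 6 and 10
    pl.run([5])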

Balancing Forks

Balancing Forks allow one processor to balance a stream of data over multiple consumer processors. This serves to minimize the effect of pipe stalling for larger data sets. The one-to-many mapping from a fork's input to its outputs is primarily determined by pipe stalling detected at runtime and the number of physical cores available.

from conveyor.pipeline import Pipeline
from conveyor.stages import Processor, Pipe, BalancingFork, Join

def job(arg):
    return arg + 1

with Pipeline() as pl:
    pl.add(BalancingFork(2))
    pl.add(Processor(job), Pipe())
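
In contrast to replication, a balancing fork sends each input block to only one of its outputs. A sketch of the difference (which branch a given item takes is decided at runtime):

from conveyor.pipeline import Pipeline
from conveyor.stages import Processor, BalancingFork, Join

def job(arg):
    return arg + 1

with Pipeline() as pl:
    pl.add(BalancingFork(2))                  # each input goes to exactly one branch
    pl.add(Processor(job), Processor(job))
    pl.add(Join(2))
    pl.add(Processor(print))                  # each of 2, 9, 11 is printed once, incremented
    pl.run([2, 9, 11])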

Joins

Joins allow multiple processors to combine their data streams into one logical pipe. The combined stream can then be forked again for the next step of the pipeline, processed by a single processor, or serve as output at the end of the pipeline.

This means that the outputs from the previous parallel stages can be interleaved arbitrarily. Joins are useful when output from multiple different processors needs to be serialized; they work in a first-come-first-served manner.

from conveyor.pipeline import Pipeline
from conveyor.stages import Processor, Pipe, ReplicatingFork, Join

def job(arg):
    return arg + 1

with Pipeline() as pl:
    pl.add(ReplicatingFork(2))
    pl.add(Processor(job), Pipe())
    pl.add(Join(2))
    pl.add(Processor(print))

Running a pipeline

The final step is to run the pipeline we just created. To do so, we call Pipeline.run() on an array of input data objects; each item is passed through the pipeline in the order it appears in the array. At any given time, multiple items from the array will be in the pipeline, albeit at different stages. Running the following trivial example:

from conveyor.pipeline import Pipeline
from conveyor.stages import Processor, Pipe, ReplicatingFork, Join

def job(arg):
    return arg + 1

with Pipeline() as pl:
    pl.add(ReplicatingFork(2))
    pl.add(Processor(job), Pipe())
    pl.add(Join(2))
    pl.add(Processor(print))
    pl.run([2, 9, 11])

Yields output like the following (one value per line): 3 10 12 2 9 11. Because the Join is first-come-first-served, the interleaving of the two branches may vary between runs.