Shared memory¶
Note: the common_memory feature should be considered “very alpha”: its specification and functionality are likely to change without warning. Additionally, common_memory requires Python 3.8, which introduced the multiprocessing.shared_memory module.
Overview¶
In very specific instances, it is important to share memory across pipeline jobs and the main process without having to pass it through the pipeline. This is especially useful when one needs to access variables or data “out of band”, meaning they are not part of the data stream on which the pipeline operates. Some use cases for this might be:
Collecting metrics¶
When measuring processor metrics, it makes more sense to maintain a global metrics store than to have each processor return a metrics object describing its own operation. Keeping control and data variables separate is best both for the programmer’s sanity and for saving inter-processor bus bandwidth.
Maintaining inter-processor state¶
In certain situations, multiple processors must be made stateful and must refer to a state variable stored elsewhere, such that all processors can read it.
Large, static blocks of data (e.g. a pandas DataFrame)¶
This is especially true for machine learning training datasets. While it is possible to send a large DataFrame object to multiple processors via a replicating fork, a better approach is a global read-only store, which prevents duplicating large quantities of redundant data.
Common pitfalls¶
It can be tempting to declare a global variable in your main application and then access it from within job function definitions. While this won’t throw an error, remember that although Conveyor processors are written as functions, they are ultimately run as separate processes, each with its own view of a virtual memory space. This means that the processors all share the global variable’s name, but each holds its own independent copy of its value.
Additionally, race conditions can occur when modifying memory shared between processors. If concurrent reads and writes occur on shared memory, be sure to surround variable accesses with appropriate locks. Needless to say, these locks must themselves live in memory shared by all of the processes involved.
Conveyor’s common_memory functionality¶
When creating a pipeline, you can pass an optional argument, shared_memory_amt: the number of bytes of memory to allocate for global access. You can access this data from within a processor with shared_memory.SharedMemory(name=common_memory).
from conveyor.pipeline import Pipeline
from conveyor.stages import Processor
from conveyor import common_memory
from multiprocessing import shared_memory
import array


def worker1_task(args):
    # Attach to the pipeline's shared block and write four bytes.
    shmem = shared_memory.SharedMemory(name=common_memory)
    buffer = shmem.buf
    buffer[:4] = bytearray([0, 11, 22, 33])
    shmem.close()
    return args


def worker2_task(args):
    # Overwrite the first byte written by worker1_task.
    shmem = shared_memory.SharedMemory(name=common_memory)
    buffer = shmem.buf
    buffer[0] = 44
    shmem.close()
    return args


def cleanup_task(args):
    # Verify the writes from the previous stages, then release the block.
    shmem = shared_memory.SharedMemory(name=common_memory)
    print(array.array('b', shmem.buf[:4]))
    assert shmem.buf[0] == 44
    assert shmem.buf[1] == 11
    assert shmem.buf[2] == 22
    assert shmem.buf[3] == 33
    shmem.close()
    shmem.unlink()
    return args


pipeline = Pipeline(shared_memory_amt=10)
pipeline.add(Processor(worker1_task))
pipeline.add(Processor(worker2_task))
pipeline.add(Processor(cleanup_task))

with pipeline as pl:
    pl.run(['abc'])
Yields this output: array('b', [44, 11, 22, 33])