There are situations where you are using multiprocessing.pool.Pool and want to perform some initialization in each worker before tasks are scheduled via Pool.map() or a similar method. For example, you create a pool of 4 workers, one per GPU, and expect tasks scheduled on Worker-i to use exactly GPU-i. In this case, Worker-i should be initialized with the environment variable CUDA_VISIBLE_DEVICES=<i> set.
To initialize spawned workers, the constructor of Pool provides two arguments for the job [1]: initializer and initargs. initializer is expected to be a callable, and if specified, each worker process will call initializer(*initargs) when it starts.
import multiprocessing as mp
import multiprocessing.pool as mpp

def worker(arg1):
    # Runs once in every worker process when it starts.
    print(arg1)

mpp.Pool(processes=2, initializer=worker, initargs=(42,))
This, however, is not quite what we want. The initializer is called with the same arguments in every worker, while in our case the arguments are expected to differ, such as value 0 for Worker-0 and value 1 for Worker-1. There are two ways to work around this.
Use a Queue
The Queue and SimpleQueue types in module multiprocessing [2] implement multi-producer, multi-consumer FIFO queues for use across processes. We can create a queue shared between the parent and worker processes, send individual values from the parent process, and read them in the workers. Since the sending and receiving operations are synchronized, we won’t run into any race conditions.
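def worker(q):
    # Block until the parent sends a value; each worker receives exactly one.
    print(q.get())

q = mp.SimpleQueue()
p = mpp.Pool(processes=2, initializer=worker, initargs=(q,))
# Send one value per worker.
for i in range(2):
    q.put(i)
p.close()
# Wait for the workers to finish so their output is not cut off.
p.join()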
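Going back to the GPU example from the introduction, the queue approach could be sketched as follows. This is a minimal illustration, not a definitive implementation: the names init_worker, visible_device and run_demo are hypothetical, the "fork" start method assumes a Unix-like system, and it assumes that setting CUDA_VISIBLE_DEVICES in the initializer happens before any GPU library initializes CUDA in that worker.

```python
import multiprocessing as mp
import os


def init_worker(id_queue):
    # Each worker takes exactly one id off the shared queue at startup.
    gpu_id = id_queue.get()
    # Assumption: no GPU library has initialized CUDA in this process yet.
    os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id)


def visible_device(_task):
    # Hypothetical task: report which device this worker is bound to.
    return os.environ["CUDA_VISIBLE_DEVICES"]


def run_demo(num_workers=4, num_tasks=16, start_method="fork"):
    # Assumption: "fork" is available (Unix-like systems).
    ctx = mp.get_context(start_method)
    id_queue = ctx.SimpleQueue()
    # Fill the queue up front so that no worker waits on an empty queue.
    for gpu_id in range(num_workers):
        id_queue.put(gpu_id)
    with ctx.Pool(
        processes=num_workers,
        initializer=init_worker,
        initargs=(id_queue,),
    ) as pool:
        return pool.map(visible_device, range(num_tasks))
```

Calling run_demo() returns one device string per task, each drawn from the ids that were put on the queue. One caveat of this design: if the pool ever replaces a worker (for example with maxtasksperchild set), the replacement would block in init_worker because the queue is already empty.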
Use a Value
Alternatively, we can use a shared object that is lighter than a queue. The Value type in module multiprocessing [3] allows sharing simple values across multiple processes. It can also synchronize access to the value to avoid race conditions if necessary. We can use a Value object to allocate an individual id for each worker process.
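import ctypes

def worker(v):
    # Read and increment under the lock so every worker gets a unique id.
    with v.get_lock():
        val = v.value
        v.value += 1
    print(val)

v = mp.Value(ctypes.c_int32, 0, lock=True)
p = mpp.Pool(processes=2, initializer=worker, initargs=(v,))
p.close()
# Wait for the workers to finish so their output is not cut off.
p.join()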
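Printing the id is only a demonstration; in practice the worker usually needs to remember it for later tasks. One way to do that, sketched here under assumptions, is to store the id in a module-level variable that tasks read afterwards. The names _worker_id, init_worker, tag_task and run_demo are hypothetical, and the "fork" start method assumes a Unix-like system.

```python
import ctypes
import multiprocessing as mp

# Set once per worker process by the initializer; stays None in the parent.
_worker_id = None


def init_worker(counter):
    global _worker_id
    # Read and increment under the lock so ids are unique across workers.
    with counter.get_lock():
        _worker_id = counter.value
        counter.value += 1


def tag_task(task):
    # Hypothetical task: pair the input with the id of the executing worker.
    return (_worker_id, task)


def run_demo(num_workers=2, num_tasks=6, start_method="fork"):
    # Assumption: "fork" is available (Unix-like systems).
    ctx = mp.get_context(start_method)
    counter = ctx.Value(ctypes.c_int32, 0)
    with ctx.Pool(
        processes=num_workers,
        initializer=init_worker,
        initargs=(counter,),
    ) as pool:
        return pool.map(tag_task, range(num_tasks))
```

run_demo() returns a list of (worker id, task) pairs in task order. Which worker handles which task is up to the pool, so the ids in the result may vary from run to run. Unlike the queue approach, a replacement worker would not block here; it would simply receive the next unused id.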