Initialize Process Pool Worker with Individual Value

There are situations where you use multiprocessing.pool.Pool and want to perform some initialization in each worker before tasks are scheduled via Pool.map() or the like. For example, you create a pool of 4 workers, one per GPU, and expect tasks scheduled on Worker-i to use exactly GPU-i. In this case, Worker-i should be initialized with the environment variable CUDA_VISIBLE_DEVICES=<i> set.

To initialize spawned workers, the constructor of Pool provides two arguments for this job: initializer and initargs. initializer is expected to be a callable, and if specified, each worker process will call initializer(*initargs) when it starts.

import multiprocessing as mp
import multiprocessing.pool as mpp

def worker(arg1):
    # Runs once in every worker process as it starts
    print(arg1)

mpp.Pool(processes=2, initializer=worker, initargs=(42,))
# 42
# 42

This is, however, not quite what we want. The initializer is called with the same arguments in every worker, while in our case the arguments should differ, like value 0 for Worker-0 and value 1 for Worker-1. There are two approaches that do the trick.

Use a Queue

The Queue and SimpleQueue types in module multiprocessing implement multi-producer, multi-consumer FIFO queues for the multi-processing scenario. We can create a queue shared by the parent and the worker processes, send individual values from the parent process, and read them in the workers. Since the sending and receiving operations are synchronized, we won’t run into any race conditions.

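def worker(q):
    # Blocks until the parent puts a value for this worker
    print(q.get())

q = mp.SimpleQueue()
p = mpp.Pool(processes=2, initializer=worker, initargs=(q,))
for i in range(2):
    q.put(i)
p.close()
p.join()  # wait for the workers, so both values get printed
# 0
# 1
# (the two lines may appear in either order)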
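Since the print order is not fixed, it can be handy to have each worker report its value back to the parent instead of printing it. The following is a minimal sketch of that idea, not part of the original recipe: the names init_worker, assign_ids and the second queue report are made up for illustration, and the start_method parameter is only there to make the start method explicit (None means the platform default).

```python
import multiprocessing as mp
import os


def init_worker(ids, report):
    # Block until the parent hands this worker its own value.
    worker_id = ids.get()
    # Send the assignment back so the parent can inspect it.
    report.put((os.getpid(), worker_id))


def assign_ids(n_workers=2, start_method=None):
    # start_method=None uses the platform default ("fork", "spawn", ...).
    ctx = mp.get_context(start_method)
    ids, report = ctx.SimpleQueue(), ctx.SimpleQueue()
    pool = ctx.Pool(processes=n_workers, initializer=init_worker,
                    initargs=(ids, report))
    try:
        for i in range(n_workers):
            ids.put(i)
        # One report per worker; get() blocks until it arrives.
        assignments = [report.get() for _ in range(n_workers)]
    finally:
        pool.close()
        pool.join()
    return dict(assignments)  # maps worker pid -> assigned value
```

Called from under an `if __name__ == "__main__":` guard, assign_ids() should return a dict with one entry per worker process, each holding a distinct value from range(n_workers).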

Use a Value

Alternatively, we may use a shared object lighter than a queue. The Value type in module multiprocessing allows sharing simple values across multiple processes. It can also synchronize accesses to the value to avoid race conditions if necessary. We can use a Value object to allocate an individual id for each worker process.

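import ctypes

def worker(v):
    # Take the next id under the lock so no two workers get the same one
    with v.get_lock():
        val = v.value
        v.value += 1
    print(val)

v = mp.Value(ctypes.c_int32, 0, lock=True)
p = mpp.Pool(processes=2, initializer=worker, initargs=(v,))
p.close()
p.join()  # wait for the workers, so both values get printed
# 0
# 1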
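Tying this back to the GPU example from the beginning, the id taken from the Value can be written to CUDA_VISIBLE_DEVICES inside the initializer. Below is a rough sketch under a few assumptions: the function names init_worker and visible_devices_per_task are made up, os.getenv stands in for a real GPU task, and the worker id is assumed to map directly to a GPU index.

```python
import multiprocessing as mp
import os


def init_worker(counter):
    # Claim the next free index under the lock, so no two workers share one.
    with counter.get_lock():
        worker_id = counter.value
        counter.value += 1
    # Assumption: worker i should see GPU i only. This has to be set
    # before any CUDA library is initialized in this process.
    os.environ["CUDA_VISIBLE_DEVICES"] = str(worker_id)


def visible_devices_per_task(n_workers=2, n_tasks=8, start_method=None):
    # start_method=None uses the platform default ("fork", "spawn", ...).
    ctx = mp.get_context(start_method)
    counter = ctx.Value("i", 0)  # shared C int, guarded by a lock by default
    pool = ctx.Pool(processes=n_workers, initializer=init_worker,
                    initargs=(counter,))
    try:
        # Placeholder task: each call reports what its worker sees.
        seen = pool.map(os.getenv, ["CUDA_VISIBLE_DEVICES"] * n_tasks)
    finally:
        pool.close()
        pool.join()
    # After join(), every worker has run its initializer exactly once.
    return seen, counter.value
```

Every entry of the returned list is the id of whichever worker ran that task, so with two workers only "0" and "1" can show up. How the tasks are split between the workers is up to the pool, so one id may appear more often than the other.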

Author: hsfzxjy.
Link: .
License: CC BY-NC-ND 4.0.
All rights reserved by the author.
Commercial use of this post in any form is NOT permitted.
Non-commercial use of this post should be attributed with this block of text.
