pool Package

process Module

Multi-process worker pool

Processes in the worq.pool.process stack:

  • Queue - enqueues tasks to be executed
  • Broker - task queue and results backend (redis)
  • WorkerPool - worker pool manager process
  • Worker - worker process, which does the real work
class worq.pool.process.PopenProcess(proc)

Bases: object

Make a subprocess.Popen object more like multiprocessing.Process
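As an illustration of what such an adapter can look like, here is a minimal sketch. It is not the worq implementation: the class name PopenProcessSketch and the particular set of methods (pid, exitcode, is_alive, join, terminate) are assumptions chosen to mirror the multiprocessing.Process interface.

```python
import subprocess
import sys


class PopenProcessSketch:
    """Hypothetical adapter: expose a Popen object through a Process-like API."""

    def __init__(self, proc):
        self._proc = proc

    @property
    def pid(self):
        return self._proc.pid

    @property
    def exitcode(self):
        # None while the child is still running, like Process.exitcode.
        return self._proc.poll()

    def is_alive(self):
        return self._proc.poll() is None

    def join(self, timeout=None):
        # Process.join() returns quietly on timeout, so swallow the
        # TimeoutExpired that Popen.wait() raises.
        try:
            self._proc.wait(timeout)
        except subprocess.TimeoutExpired:
            pass

    def terminate(self):
        self._proc.terminate()


if __name__ == "__main__":
    child = subprocess.Popen([sys.executable, "-c", "pass"])
    wrapped = PopenProcessSketch(child)
    wrapped.join()
    print(wrapped.exitcode)
```

With a wrapper of this shape, pool management code written against multiprocessing.Process can handle Popen-based children without special cases.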

class worq.pool.process.WorkerPool(broker, init_func, init_args=(), init_kwargs=None, workers=None, max_worker_tasks=None, name=None)

Bases: object

Multi-process worker pool

Parameters:
  • broker – Broker object.
  • init_func – Worker initializer. This is called once in each worker on startup. It can be used to set up logging or do any other global initialization. The first argument will be a broker URL, and the remaining arguments will be *init_args, **init_kwargs. It must return a broker instance, and the function itself must be pickleable.
  • init_args – Additional arguments to pass to init_func.
  • init_kwargs – Additional keyword arguments to pass to init_func.
  • workers – Number of workers to maintain in the pool. The default value is the number returned by multiprocessing.cpu_count.
  • max_worker_tasks – Maximum number of tasks to execute on each worker before retiring the worker and spawning a new one in its place.
  • name – A name for this pool to distinguish its log output from that of other pools running in the same process.
join()

Wait for pool to stop (call after .stop(join=False))

start(timeout=10, handle_sigterm=True)

Start the worker pool

Parameters:
  • timeout – Number of seconds to block while waiting for a new task to arrive on the queue. This timeout value affects pool stop time: a larger value means shutdown may take longer because it may need to wait longer for the consumer thread to complete.
  • handle_sigterm – If true (the default), set up a signal handler and block until the process is signalled; in that case start() should only be called from the main thread. If false, start the workers and a pool manager thread, then return.
stop(join=True)

Shutdown the pool

This is probably only useful when the pool was started with handle_sigterm=False.

worq.pool.process.run_in_subprocess(_func, *args, **kw)

Call function with arguments in subprocess

All arguments to this function must be able to be pickled.

Use subprocess.Popen rather than multiprocessing.Process because we use threads, which do not play nicely with fork. This was originally written with multiprocessing.Process, which caused intermittent deadlocks. See http://bugs.python.org/issue6721

Returns: A PopenProcess object.
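The general technique described above (pickle the callable and its arguments, then hand them to a fresh interpreter started with subprocess.Popen instead of forking) can be sketched as follows. This is an illustration under stated assumptions, not worq's code: the name run_in_subprocess_sketch, the stdin-based hand-off, and the bare Popen return value are all hypothetical.

```python
import pickle
import subprocess
import sys
import time

# Code run by the child interpreter: read the pickled call from stdin
# and execute it. Nothing is inherited through fork().
_CHILD_SOURCE = (
    "import pickle, sys\n"
    "func, args, kw = pickle.load(sys.stdin.buffer)\n"
    "func(*args, **kw)\n"
)


def run_in_subprocess_sketch(func, *args, **kw):
    """Hypothetical helper: call func(*args, **kw) in a new Python process.

    func is pickled by reference, so it must be importable in the child
    (a module-level function, not a lambda or nested function).
    """
    payload = pickle.dumps((func, args, kw))
    proc = subprocess.Popen(
        [sys.executable, "-c", _CHILD_SOURCE],
        stdin=subprocess.PIPE,
    )
    proc.stdin.write(payload)
    proc.stdin.close()
    return proc


if __name__ == "__main__":
    proc = run_in_subprocess_sketch(time.sleep, 0.1)
    print(proc.wait())
```

Because the child is a freshly started interpreter, it does not inherit locks held by other threads in the parent, which is the failure mode behind the fork-related deadlocks mentioned above.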
worq.pool.process.start_pools(*pools, **start_kwargs)

Start one or more pools and wait indefinitely for SIGTERM or SIGINT

This is a blocking call, and should be run in the main thread.

thread Module

class worq.pool.thread.WorkerPool(broker, workers=1, thread_factory=threading.Thread)

Bases: object

Multi-thread worker pool

Parameters:
  • broker – Queue broker instance.
  • workers – Number of workers in the pool.
  • thread_factory – A factory function that creates a new thread object. This should have the same signature as threading.Thread and should return a thread object.
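For example, a thread factory that produces daemon threads might look like the sketch below. The function name daemon_thread_factory is hypothetical; the only requirement taken from the parameter description is that it accepts the same arguments as threading.Thread and returns a thread object.

```python
import threading


def daemon_thread_factory(*args, **kwargs):
    """Hypothetical factory: same signature as threading.Thread, daemon threads."""
    thread = threading.Thread(*args, **kwargs)
    # Daemon threads do not keep the interpreter alive at exit.
    thread.daemon = True
    return thread
```

A function like this would be passed as thread_factory=daemon_thread_factory when constructing the pool.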
join()

Wait for all threads to stop (call stop(join=False) first)

start(timeout=1)

Start worker threads.

Parameters:
  • timeout – The number of seconds to wait for a task before checking if the pool has been asked to stop.
stop(use_sentinel=False, join=True)

Stop the worker pool

Parameters:
  • use_sentinel – Enqueue a no-op task for each worker if true. This will result in a more responsive shutdown if there are no other worker pools consuming tasks from the broker.
  • join – Join each thread after sending the stop signal.
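To see why a sentinel makes shutdown more responsive, consider the toy pool below. It is a sketch of the general pattern only, built on a local queue.Queue rather than a worq broker; the MiniPool class and the _SENTINEL object (standing in for a no-op task) are hypothetical. Each worker blocks for up to timeout seconds waiting for a task, so without a sentinel a stop request may go unnoticed until that wait expires.

```python
import queue
import threading

_SENTINEL = object()  # hypothetical stand-in for a no-op task


class MiniPool:
    """Toy thread pool illustrating timeout polling and sentinel shutdown."""

    def __init__(self, tasks, workers=1):
        self.tasks = tasks
        self.workers = workers
        self._stop_requested = threading.Event()
        self._threads = []

    def start(self, timeout=1):
        for _ in range(self.workers):
            thread = threading.Thread(target=self._run, args=(timeout,))
            thread.start()
            self._threads.append(thread)

    def _run(self, timeout):
        while not self._stop_requested.is_set():
            try:
                # Block for at most `timeout` seconds, then re-check the flag.
                task = self.tasks.get(timeout=timeout)
            except queue.Empty:
                continue
            if task is _SENTINEL:
                continue  # woken up only to re-check the stop flag
            task()

    def stop(self, use_sentinel=False, join=True):
        self._stop_requested.set()
        if use_sentinel:
            # One wake-up item per worker, so none waits out its timeout.
            for _ in self._threads:
                self.tasks.put(_SENTINEL)
        if join:
            self.join()

    def join(self):
        for thread in self._threads:
            thread.join()
```

In this sketch, stop(use_sentinel=True) returns almost immediately even with a long timeout, whereas stop() alone can take up to timeout seconds. If another consumer were reading from the same queue it could swallow the sentinels, which matches the caveat in the use_sentinel description.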