Source code for worq.pool.process

# WorQ - Python task queue
#
# Copyright (c) 2012 Daniel Miller
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

"""Multi-process worker pool

Processes in the ``worq.pool.process`` stack:

* Queue - enqueues tasks to be executed
* Broker - task queue and results backend (redis)
* WorkerPool - worker pool manager process
* Worker - worker process, which does the real work
"""

import errno
import logging
import os
import sys
import time
import subprocess
from functools import partial
from multiprocessing import Pipe, cpu_count, current_process
from multiprocessing.process import AuthenticationString
from multiprocessing.reduction import reduce_connection, rebuild_connection
try:
    from cPickle import dump, load, HIGHEST_PROTOCOL, PicklingError
    from Queue import Empty, Queue as ThreadQueue
except ImportError:
    from pickle import dump, load, HIGHEST_PROTOCOL, PicklingError
    from queue import Empty, Queue as ThreadQueue
from threading import Thread
from worq import get_broker
from worq.core import DAY, DEFAULT

log = logging.getLogger(__name__)

STOP = 'STOP'
PYTHON_EXE = sys.executable
WORKER_POLL_INTERVAL = 30

class Error(Exception): pass


class WorkerPool(object):
    """Multi-process worker pool

    :param broker: Broker object.
    :param init_func: Worker initializer. This is called to initialize each
        worker on startup. It can be used to set up logging or do any other
        global initialization. The first argument will be a broker url, and
        the remaining will be ``*init_args, **init_kwargs``. It must return
        a broker instance, and it must be pickleable.
    :param init_args: Additional arguments to pass to init_func.
    :param init_kwargs: Additional keyword arguments to pass to init_func.
    :param workers: Number of workers to maintain in the pool. The default
        value is the number returned by ``multiprocessing.cpu_count``.
    :param max_worker_tasks: Maximum number of tasks to execute on each
        worker before retiring the worker and spawning a new one in its
        place.
    :param name: A name for this pool to distinguish its log output from
        that of other pools running in the same process.
    """

    def __init__(self, broker,
            init_func,
            init_args=(),
            init_kwargs=None,
            workers=None,
            max_worker_tasks=None,
            name=None,
            ):
        self.broker = broker
        self.init_func = init_func
        self.init_args = (broker.url,) + init_args
        self.init_kwargs = {} if init_kwargs is None else init_kwargs
        if workers is None:
            try:
                workers = cpu_count()
            except NotImplementedError:
                workers = 1
        self.workers = workers
        self.max_worker_tasks = max_worker_tasks
        if name is None and broker.name != DEFAULT:
            name = broker.name
        self.name = name
        self._workers = []
        self._worker_queue = ThreadQueue()
        self._running = False

    def __str__(self):
        parts = (type(self).__name__, os.getpid(), self.name)
        return '-'.join(str(p) for p in parts if p)

    def start(self, timeout=10, handle_sigterm=True):
        """Start the worker pool

        :param timeout: Number of seconds to block while waiting for a new
            task to arrive on the queue. This timeout value affects pool
            stop time: a larger value means shutdown may take longer because
            it may need to wait longer for the consumer thread to complete.
        :param handle_sigterm: If true (the default) set up a signal handler
            and block until the process is signalled. This should only be
            called in the main thread in that case. If false, start workers
            and a pool manager thread and return.
        """
        if handle_sigterm:
            return start_pools(self, timeout=timeout)
        if self._running:
            raise Error('cannot start already running WorkerPool')
        self._running = True
        for n in range(self.workers):
            worker = WorkerProxy(
                self.broker,
                self.init_func,
                self.init_args,
                self.init_kwargs,
                self.max_worker_tasks,
            )
            self._workers.append(worker)
            self._worker_queue.put(worker)
        args = (timeout,)
        self._consumer_thread = Thread(target=self._consume_tasks, args=args)
        self._consumer_thread.start()
        log.info('%s started', str(self))

    def _consume_tasks(self, timeout):
        get_task = self.broker.next_task
        get_worker = self._worker_queue.get
        put_worker = self._worker_queue.put
        try:
            while True:
                worker = get_worker()
                if worker == STOP:
                    break
                task = get_task(timeout)
                if task is None:
                    put_worker(worker)
                    continue
                worker.execute(task, put_worker)
        except Exception:
            log.error('%s task consumer crashed', str(self), exc_info=True)
        log.debug('%s consumer stopped', str(self))

    def stop(self, join=True):
        """Shut down the pool

        This is probably only useful when the pool was started with
        ``handle_sigterm=False``.
        """
        if not self._running:
            return False
        self._running = False
        log.info('shutting down %s...', str(self))
        # drain idle workers from the queue, then signal the consumer thread
        while True:
            try:
                self._worker_queue.get_nowait()
            except Empty:
                break
        self._worker_queue.put(STOP)
        for worker in self._workers:
            worker.stop()
        if join:
            self.join()
        return True

    def join(self):
        """Wait for pool to stop (call after ``.stop(join=False)``)"""
        assert not self._running, 'call stop() first'
        self._consumer_thread.join()
        self._worker_queue = ThreadQueue()
        for worker in self._workers:
            worker.join()
        log.info('%s stopped', str(self))
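

# Embedding sketch (illustrative, not part of the original module): running
# a pool inside a larger program without installing signal handlers. The
# ``broker`` and ``init`` names are assumed to be set up as in the example
# near the top of this file.
#
#   pool = WorkerPool(broker, init, workers=2)
#   pool.start(timeout=1, handle_sigterm=False)  # returns immediately
#   ...  # the pool consumes tasks in the background
#   pool.stop()  # drains and joins workers by default
#   # or: pool.stop(join=False); <other cleanup>; pool.join()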


def start_pools(*pools, **start_kwargs):
    """Start one or more pools and wait indefinitely for SIGTERM or SIGINT

    This is a blocking call, and should be run in the main thread.
    """
    if not pools:
        raise ValueError('start_pools requires at least 1 argument, got 0')
    for pool in pools:
        pool.start(handle_sigterm=False, **start_kwargs)
    setup_exit_handler()
    log.info('Press CTRL+C or send SIGTERM to stop')
    try:
        # Sleep indefinitely waiting for a signal to initiate pool shutdown.
        while True:
            time.sleep(DAY)
    finally:
        for pool in pools:
            pool.stop(join=False)
        for pool in pools:
            pool.join()
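

# Example (``video_pool`` and ``audio_pool`` are hypothetical): one process
# can supervise several pools; keyword arguments such as ``timeout`` are
# forwarded to each pool's ``start``.
#
#   start_pools(video_pool, audio_pool, timeout=5)  # blocks until signalled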


class WorkerProxy(object):

    def __init__(self, *args):
        self.pid = 'not-started'
        self.queue = ThreadQueue()
        self.thread = Thread(target=self._proxy_loop, args=args)
        self.thread.start()

    def execute(self, task, return_to_pool):
        self.queue.put((task, return_to_pool))

    def stop(self):
        self.queue.put((STOP, None))

    def join(self):
        self.queue = None
        self.thread.join()

    def _proxy_loop(self, broker, *args):
        is_debug = partial(log.isEnabledFor, logging.DEBUG)
        pid = os.getpid()
        proc = None
        queue = self.queue

        def stop():
            try:
                if proc is not None:
                    child.send(STOP)
                    child.close()
                    proc.join()
            except Exception:
                log.error('%s failed to stop cleanly',
                    str(self), exc_info=True)
                raise
            else:
                log.debug('terminated %s', str(self))
            finally:
                self.pid = '%s-terminated' % self.pid

        while True:
            if proc is None or not proc.is_alive():
                # start new worker process
                child, parent = Pipe()
                cx = _reduce_connection(parent)  # HACK reduce for pickle
                proc = run_in_subprocess(worker_process, pid, cx, *args)
                self.pid = proc.pid

            task, return_to_pool = queue.get()
            if task == STOP:
                stop()
                break

            try:
                child.send(task)
                while not child.poll(task.heartrate):
                    if not proc.is_alive():
                        broker.task_failed(task)
                        raise Error('unknown cause of death')
                    broker.heartbeat(task)
                (result, status) = child.recv()
                broker.set_result(task, result)
            except Exception:
                log.error('%s died unexpectedly', str(self), exc_info=True)
                child.close()
                proc.stdin.close()
                proc = None
            else:
                if is_debug():
                    log.debug('%s completed task', str(self))
                if status == STOP:
                    child.close()
                    proc.stdin.close()
                    proc = None
            finally:
                return_to_pool(self)

    def __str__(self):
        return 'WorkerProxy-%s' % self.pid


def worker_process(parent_pid, reduced_cn, init, init_args, init_kw,
        max_worker_tasks):
    broker = init(*init_args, **init_kw)
    log.info('Worker-%s started', os.getpid())
    task_count = 1
    parent = reduced_cn[0](*reduced_cn[1])  # HACK un-reduce connection
    while True:
        while not parent.poll(WORKER_POLL_INTERVAL):
            if os.getppid() != parent_pid:
                log.error('abort: parent process changed')
                return
        try:
            task = parent.recv()
        except EOFError:
            log.error('abort: parent fd closed')
            break
        if task == STOP:
            break
        result = broker.invoke(task, return_result=True)
        if max_worker_tasks is None:
            parent.send((result, None))  # send result
        elif task_count < max_worker_tasks:
            task_count += 1
            parent.send((result, None))  # send result
        else:
            parent.send((result, STOP))  # send result, worker stopping
            break
    log.info('Worker-%s stopped', os.getpid())


def run_in_subprocess(_func, *args, **kw):
    """Call function with arguments in subprocess

    All arguments to this function must be able to be pickled.

    Use ``subprocess.Popen`` rather than ``multiprocessing.Process`` because
    we use threads, which do not play nicely with ``fork``. This was
    originally written with ``multiprocessing.Process``, which caused
    intermittent deadlocks. See http://bugs.python.org/issue6721

    :returns: A ``PopenProcess`` object.
    """
    prog = 'from worq.pool.process import main; main()'
    # close_fds=True prevents intermittent deadlock in Popen
    # See http://bugs.python.org/issue2320
    proc = subprocess.Popen([PYTHON_EXE, '-c', prog],
        stdin=subprocess.PIPE, close_fds=True,
        universal_newlines=False,
        preexec_fn=disable_signal_propagation)
    assert proc.stdout is None
    assert proc.stderr is None
    try:
        dump((_func, args, kw), proc.stdin, HIGHEST_PROTOCOL)
        proc.stdin.flush()
    except IOError as e:
        # copied from subprocess.Popen.communicate
        if e.errno != errno.EPIPE and e.errno != errno.EINVAL:
            proc.terminate()
            raise
    except PicklingError:
        proc.terminate()
        raise
    return PopenProcess(proc)
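

# Example (a sketch; ``mymodule.greet`` is hypothetical): the function is
# pickled by reference, so it must be defined in a module the child process
# can import -- a function defined in the ``__main__`` of a script will not
# unpickle in the ``-c`` subprocess.
#
#   from mymodule import greet   # e.g. def greet(name): print('hi %s' % name)
#   proc = run_in_subprocess(greet, 'world')
#   proc.join()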


def main():
    func, args, kw = load(get_stdin(sys))
    try:
        func(*args, **kw)
    except Exception:
        log.critical('subprocess crashed', exc_info=True)
        raise


def _reduce_connection(conn):
    """Reduce a connection object so it can be pickled.

    WARNING this puts the current process' authentication key in the data
    to be pickled. Connections pickled with this function should not be
    sent over an untrusted network.

    HACK work around ``multiprocessing`` connection authentication because
    we are using ``subprocess.Popen`` instead of ``multiprocessing.Process``
    to spawn new child processes. This will not be necessary when
    ``multiprocessing.Connection`` objects can be pickled. See
    http://bugs.python.org/issue4892
    """
    obj = reduce_connection(conn)
    assert obj[0] is rebuild_connection, obj
    assert len(obj) == 2, obj
    args = (bytes(current_process().authkey),) + obj[1]
    return (_rebuild_connection, args)


def _rebuild_connection(authkey, *args):
    current_process().authkey = AuthenticationString(authkey)
    return rebuild_connection(*args)
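

# Sketch of the hand-off (both halves appear above, in
# WorkerProxy._proxy_loop and worker_process):
#
#   child, parent = Pipe()
#   reduced = _reduce_connection(parent)  # (rebuild_func, args), picklable
#   # ...pickle ``reduced`` across the process boundary, then rebuild:
#   parent = reduced[0](*reduced[1])      # a usable Connection again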


class PopenProcess(object):
    """Make a ``subprocess.Popen`` object more like
    ``multiprocessing.Process``
    """

    def __init__(self, proc):
        self._proc = proc

    def is_alive(self):
        return self._proc.poll() is None

    def join(self, timeout=None):
        if timeout is None:
            self._proc.communicate()
            return
        end = time.time() + timeout
        while time.time() < end:
            if not self.is_alive():
                return
            time.sleep(0.01)

    def __getattr__(self, name):
        return getattr(self._proc, name)
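

# Usage note (sketch): ``run_in_subprocess`` returns one of these, giving a
# ``multiprocessing``-style interface plus everything ``Popen`` offers via
# attribute delegation (``some_func`` is hypothetical).
#
#   proc = run_in_subprocess(some_func)
#   proc.join(timeout=5)      # poll for up to 5 seconds
#   if proc.is_alive():
#       proc.terminate()      # delegated to the underlying Popen object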


def disable_signal_propagation():
    # put the child in its own process group so signals sent to the
    # parent's group do not reach it; see http://stackoverflow.com/a/5446983/10840
    os.setpgrp()


def setup_exit_handler():
    # see http://danielkaes.wordpress.com/2009/06/04/how-to-catch-kill-events-with-python/
    def on_exit(sig, func=None):
        sys.exit()
    if os.name == "nt":
        try:
            import win32api
            win32api.SetConsoleCtrlHandler(on_exit, True)
        except ImportError:
            version = ".".join(map(str, sys.version_info[:2]))
            raise Exception("pywin32 not installed for Python " + version)
    else:
        import signal
        signal.signal(signal.SIGINT, on_exit)
        signal.signal(signal.SIGTERM, on_exit)


if sys.version_info.major < 3:
    def get_stdin(obj):
        return obj.stdin
else:
    def get_stdin(obj):
        # pickle needs the binary buffer underlying text-mode stdin
        return obj.stdin.buffer