Source code for worq.tests.util

# WorQ - Python task queue
#
# Copyright (c) 2012 Daniel Miller
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

import functools
import time
import logging
import shutil
from contextlib import contextmanager
from tempfile import mkdtemp
from threading import Lock

from worq.pool.thread import WorkerPool
from worq.tests import TEST_URLS

try:
    basestring
except NameError:
    # Python 3 compat
    basestring = str

log = logging.getLogger(__name__)

WAIT = 30 # default test timeout (seconds)
DEFAULT_TIMEOUT = 10 # default utility timeout

[docs]def with_urls(test=None, exclude=None): if test is not None: @functools.wraps(test) def wrapper(): for url in TEST_URLS: if exclude: if not url.startswith(exclude): yield test, url else: yield test, url return wrapper return functools.partial(with_urls, exclude=exclude)
@contextmanager
[docs]def thread_worker(broker, lock=None, timeout=1): if lock is not None: real_next_task = broker.next_task def next_task(*args, **kw): log.debug('acquiring lock before getting next task') lock.acquire() return real_next_task(*args, **kw) broker.next_task = next_task pool = WorkerPool(broker) pool.start(timeout=timeout) try: yield finally: pool.stop(use_sentinel=True, join=False) try: if lock is not None: lock.release() except Exception: log.error('lock release failed', exc_info=True) finally: pool.join() broker.discard_pending_tasks()
[docs]class TimeoutLock(object): """A lock with acquisition timeout""" def __init__(self, locked=False): self.lock = Lock() self.mutex = Lock() with self.mutex: if locked: self.lock.acquire() self.locked = locked
[docs] def acquire(self, timeout=DEFAULT_TIMEOUT): end = time.time() + timeout while time.time() < end: with self.mutex: if self.lock.acquire(False): self.locked = True return raise Exception('lock timeout')
[docs] def release(self): with self.mutex: self.lock.release() self.locked = False
[docs]def eventually(get_value, expect, timeout=DEFAULT_TIMEOUT, poll_interval=0): end = time.time() + timeout while time.time() < end: actual = get_value() if actual == expect: return time.sleep(poll_interval) raise AssertionError('eventually timeout: %r != %r' % (actual, expect))
@contextmanager
[docs]def tempdir(*args, **kw): """Create a temporary directory to be used in a test If (optional keyword argument) 'delete' evaluates to True (the default value), the temporary directory and all files in it will be removed on context manager exit. """ delete = kw.pop("delete", True) path = mkdtemp(*args, **kw) try: yield path finally: if delete: shutil.rmtree(path)
@contextmanager
[docs]def assert_raises(exc_class, msg=None): try: yield except exc_class as exc: if isinstance(msg, basestring): eq_(str(exc), msg) elif msg is not None: msg.search(str(exc)) else: raise AssertionError('%s not raised' % exc_class.__name__)
[docs]def eq_(value, other): assert value == other, '%r != %r' % (value, other)