Source code for worq.tests

import os
from nose.plugins.skip import SkipTest

TEST_URLS = ['memory://']

[docs]def setup(): for get_test_url in [get_redis_url]: url = get_test_url() if url is not None: TEST_URLS.append(url)
[docs]def get_redis_url(): redis_url = os.environ.get( 'WORQ_TEST_REDIS_URL', 'redis://localhost:16379/0') # non-standard port if redis_url != 'disabled': try: import redis except ImportError: pass else: from worq.queue.redis import TaskQueue as RedisQueue queue = RedisQueue(redis_url) if queue.ping(): return redis_url return None
[docs]def test_redis_should_be_installed(): try: import redis except ImportError: if 'WORQ_TEST_REDIS_URL' in os.environ: assert 0, 'WORQ_TEST_REDIS_URL is set but redis is not installed' else: raise SkipTest( 'cannot test redis task queue because redis is not installed')