tests Package

tests Package

worq.tests.get_redis_url()[source]
worq.tests.setup()[source]
worq.tests.test_redis_should_be_installed()[source]

test_core Module

worq.tests.test_core.Broker_duplicate_task_id(url, identifier)[source]
worq.tests.test_core.test_Broker_duplicate_task_id_function()[source]
worq.tests.test_core.test_Broker_duplicate_task_id_string()[source]
worq.tests.test_core.test_Broker_task_failed()[source]

test_examples Module

worq.tests.test_examples.example(func)[source]
worq.tests.test_examples.test_examples()[source]

test_task Module

worq.tests.test_task.test_Queue_default_options()[source]
worq.tests.test_task.test_Queue_len()[source]
worq.tests.test_task.test_clear_Queue()[source]
worq.tests.test_task.test_completed_Deferred_as_argument()[source]
worq.tests.test_task.test_deferred_task_fail_on_error()[source]
worq.tests.test_task.test_worker_interrupted()[source]

util Module

class worq.tests.util.TimeoutLock(locked=False)[source]

Bases: object

A lock with acquisition timeout

acquire(timeout=10)[source]
release()[source]
worq.tests.util.assert_raises(*args, **kwds)[source]
worq.tests.util.eq_(value, other)[source]
worq.tests.util.eventually(get_value, expect, timeout=10, poll_interval=0)[source]
worq.tests.util.tempdir(*args, **kwds)[source]

Create a temporary directory to be used in a test

If (optional keyword argument) ‘delete’ evaluates to True (the default value), the temporary directory and all files in it will be removed on context manager exit.

worq.tests.util.thread_worker(*args, **kwds)[source]
worq.tests.util.with_urls(test=None, exclude=None)[source]