worq Package

worq Package

worq.get_broker(url, name='default', *args, **kw)[source]

Create a new broker

Parameters:
  • url – Task queue URL.
  • name – The name of the queue on which to expose or invoke tasks.
Returns:

An instance of worq.core.Broker.

worq.get_queue(url, name='default', target='', **options)[source]

Get a queue for invoking remote tasks

Parameters:
  • url – Task queue URL.
  • name – The name of the queue on which tasks should be invoked. Queued tasks will be invoked iff there is a worker listening on the named queue.
  • target – Task namespace (similar to a python module) or name (similar to a python function). Defaults to the root namespace.
  • **options

    Default task options for tasks created with the queue. These can be overridden with worq.task.Task.

Returns:

An instance of worq.task.Queue.

task Module

class worq.task.Deferred(broker, task)[source]

Bases: object

Deferred result object

Not thread-safe.

has_value()[source]

Check for value without touching the broker

status[source]

Get task status

value[source]

Get the value returned by the task (if completed)

Returns:The value returned by the task if it completed successfully.
Raises :AttributeError if the task has not yet completed. TaskFailure if the task failed for any reason.
wait(timeout)[source]

Wait for the task result.

Use this method wisely. In general a task should never wait on the result of another task because it may cause deadlock.

Parameters:timeout – Number of seconds to wait. A value of None will wait indefinitely, but this is dangerous since the worker may go away without notice (due to loss of power, etc.) causing this method to deadlock.
Returns:True if the result is available, otherwise False.
class worq.task.Queue(broker, target='', **options)[source]

Bases: object

Queue for invoking remote tasks

New Queue instances are generated through attribute access. For example:

>>> q = Queue(broker)
>>> q.foo
<Queue foo [default]>
>>> q.foo.bar
<Queue foo.bar [default]>

A Queue instance can be called like a function, which invokes a remote task identified by the target of the Queue instance. Example:

# Enqueue task 'func' in namespace 'foo' to be invoked
# by a worker listening on the 'default' queue.
>>> q = Queue(broker)
>>> q.foo.func(1, key=None)

The arrangement of queue tasks in TaskSpaces is similar to Python’s package/module/function hierarchy.

NOTE two Queue objects are considered equal if they refer to the same broker (their targets may be different).

Parameters:
  • broker – A Broker instance.
  • target – The task (space) path.
  • **options

    Default task options.

class worq.task.Task(queue, id=None, on_error='fail', ignore_result=False, result_timeout=3600, heartrate=30)[source]

Bases: object

Remote task handle

This class can be used to construct a task with custom options. A task is invoked by calling the task object.

Parameters:
  • queue – The Queue object identifying the task to be executed.
  • id – A unique identifier string for this task, or a function that returns a unique identifier string when called with the task’s arguments. If not specified, a global unique identifier is generated for each call. Only one task with a given id may exist in the queue at any given time. Note that a task with ignore_result=True will be removed from the queue before it is invoked.
  • on_error – What should happen when a deferred argument’s task fails. The TaskFailure exception will be passed as an argument if this value is Task.PASS, otherwise this will fail before it is invoked (the default action).
  • ignore_result – Create a fire-and-forget task if true. Task invocation will return None rather than a Deferred object.
  • result_timeout – Number of seconds to retain the result after the task has completed. The default is one hour. This is ignored by some TaskQueue implementations.
  • heartrate – Number of seconds between task heartbeats, which are maintained by some WorkerPool implementations to prevent result timeout while the task is running. The default is 30 seconds.
with_options(options)[source]

Clone this task with a new set of options

exception worq.task.TaskFailure[source]

Bases: exceptions.Exception

Task failure exception class

Initialize with the following positional arguments:
  1. Task name
  2. Queue name
  3. Task id
  4. Error text
class worq.task.TaskSpace(name='')[source]

Bases: object

Task namespace container

task(callable, name=None)[source]

Add a task to the namespace

This can be used as a decorator:

ts = TaskSpace(__name__)

@ts.task
def frob(value):
    db.update(value)
Parameters:
  • callable – A callable object, usually a function or method.
  • name – Task name. Defaults to callable.__name__.

core Module

class worq.core.Broker(taskqueue)[source]

Bases: object

A Broker controlls all interaction with the queue backend

deserialize(message, task_id=None)[source]

Deserialize an object

Parameters:
  • message – A serialized object (string).
  • deferred – When true load deferreds. When false raise an error if the message contains deferreds.
discard_pending_tasks()[source]

Discard pending tasks from queue

expose(obj, replace=False)[source]

Expose a TaskSpace or task callable.

Parameters:
  • obj – A TaskSpace or task callable.
  • replace – Replace existing task if True. Otherwise (by default), raise ValueError if this would replace an existing task.
heartbeat(task)[source]

Extend task result timeout

invoke(task, **kw)[source]

Invoke the given task (normally only called by a worker)

next_task(timeout=None)[source]

Get the next task from the queue.

Parameters:timeout – See AbstractTaskQueue.get.
Returns:A task object. None on timeout expiration or if the task could not be deserialized.
pop_result(task, timeout=0)[source]

Pop and deserialize a task’s result object

Parameters:
  • task – An object with id and name attributes representing the task.
  • timeout – Length of time to wait for the result. The default behavior is to return immediately (no wait). Wait indefinitely if None.
Returns:

The deserialized result object.

Raises :

KeyError if the result was not available.

Raises :

TaskExpired if the task expired before a result was returned. A task normally only expires if the pool loses its ability to communicate with the worker performing the task.

queue(target='', **options)[source]

Get a Queue from the broker

serialize(obj, deferred=False)[source]

Serialize an object

Parameters:
  • obj – The object to serialize.
  • deferred – When this is true Deferred objects are serialized and their values are loaded on deserialization. When this is false Deferred objects are not serializable.
set_result(task, result)[source]

Persist result object.

Parameters:
  • task – Task object for which to set the result.
  • result – Result object.
status(result)[source]

Get the status of a deferred result

task_failed(task)[source]

Signal that the given task has failed.

class worq.core.AbstractTaskQueue(url, name='default')[source]

Bases: object

Message queue abstract base class

Task/result lifecycle

  1. Atomically store non-expiring result placeholder and enqueue task.
  2. Atomically pop task from queue and set timeout on result placeholder.
  3. Task heartbeats extend result expiration as needed.
  4. Task finishes and result value is saved.

All methods must be thread-safe.

Parameters:
  • url – URL used to identify the queue.
  • name – Queue name.
defer_task(result, message, args)[source]

Defer a task until its arguments become available

Parameters:
  • result – A Deferred result for the task.
  • message – The serialized task message.
  • args – A list of task identifiers whose results will be included in the arguments to the task.
discard_pending()[source]

Discard pending tasks from queue

discard_result(task_id, task_expired_token)[source]

Discard the result for the given task.

A call to pop_result after this is invoked should return a task expired response.

Parameters:
  • task_id – The task identifier.
  • task_expired_token – A message that can be sent to blocking actors to signify that the task has expired.
enqueue_task(result, message)[source]

Enqueue task

Parameters:
  • result – A Deferred result for the task.
  • message – Serialized task message.
Returns:

True if the task was enqueued, otherwise False (duplicate task id).

get(timeout=None)[source]

Atomically get a serialized task message from the queue

Task processing has started when this method returns, which means that the task heartbeat must be maintained if there could be someone waiting on the result. The result status is set to worq.const.PROCESSING if a result is being maintained for the task.

Parameters:timeout – Number of seconds to wait before returning None if no task is available in the queue. Wait forever if timeout is None.
Returns:A two-tuple (<task_id>, <serialized task message>) or None if timeout was reached before a task arrived.
get_arguments(task_id)[source]

Get a dict of deferred arguments

Parameters:task_id – The identifier of the task to which the arguments will be passed.
Returns:A dict of serialized arguments keyed by argument id.
get_status(task_id)[source]

Get the status of a task

Parameters:task_id – Unique task identifier string.
Returns:A task status value or None.
pop_result(task_id, timeout)[source]

Pop serialized result message from persistent storage.

Parameters:
  • task_id – Unique task identifier string.
  • timeout – Number of seconds to wait for the result. Wait indefinitely if None. Return immediately if timeout is zero (0).
Returns:

One of the following:

  • The result message.
  • worq.const.RESERVED if another task depends on the result.
  • worq.const.TASK_EXPIRED if the task expired before a result was available.
  • None on timeout.

reserve_argument(argument_id, deferred_id)[source]

Reserve the result of a task as an argument of a deferred task

Parameters:
  • argument_id – Identifier of a task whose result will be reserved for another task.
  • deferred_id – Identifier of a deferred task who will get the reserved result as an argument.
Returns:

A two-tuple: (<bool>, <str>). The first item is a flag denoting if the argument was reserved, and the second is the serialized result if it was available else None.

set_argument(task_id, argument_id, message)[source]

Set deferred argument for task

Parameters:
  • task_id – The identifier of the task to which the argument will be passed.
  • argument_id – The argument identifier.
  • message – The serialized argument value.
Returns:

True if all arguments have been set for the task.

set_result(task_id, message, timeout)[source]

Persist serialized result message.

This also sets the result status to worq.const.COMPLETED.

Parameters:
  • task_id – Unique task identifier string.
  • message – Serialized result message.
  • timeout – Number of seconds to persist the result before discarding it.
Returns:

A deferred task identifier if the result has been reserved. Otherwise None.

set_task_timeout(task_id, timeout)[source]

Set a timeout on the task result

Recursively set the timeout on the given task and all deferred tasks depending on this task’s result.

size()[source]

Return the approximate number of tasks in the queue

undefer_task(task_id)[source]

Enqueue a deferred task

All deferred arguments must be available immediately.

const Module