# WorQ - Python task queue
#
# Copyright (c) 2012 Daniel Miller
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""Redis message queue and result store."""
from __future__ import absolute_import
import logging
import redis
import sys
import time
import worq.const as const
try:
from urlparse import urlparse
except ImportError:
from urllib.parse import urlparse
from worq.core import AbstractTaskQueue
log = logging.getLogger(__name__)

# Python 2/3 compatibility shims used throughout this module: ``integer`` is
# the type (or tuple of types) accepted by isinstance() for integer replies,
# ``utf8`` turns a queue/task name into the bytes used in Redis keys, and
# (Python 3 only) ``unicode`` turns a bytes reply back into text.
if sys.version_info[0] >= 3:
    integer = int

    def utf8(value):
        """Encode text ``value`` as UTF-8 bytes."""
        return value.encode('utf-8')

    def unicode(value):
        """Decode UTF-8 bytes ``value`` into text."""
        return value.decode('utf-8')
else:
    integer = (int, long)  # noqa: F821 -- ``long`` exists only on Python 2

    def utf8(value):
        """Return ``value`` unchanged (assumed to already be a byte str)."""
        return value
class TaskQueue(AbstractTaskQueue):
    """Redis task queue.

    Keys used (``<name>`` is the UTF-8 queue name, ``<id>`` a task id):

    - ``worq:queue:<name>`` -- list of enqueued task ids, pushed on the
      left (head) and consumed from the right (tail).
    - ``worq:task:<name>:<id>`` -- hash with ``task`` (the message),
      ``status`` and, for deferred tasks, ``total_args``/``args_ready``.
    - ``worq:args:<name>:<id>`` -- hash of argument id -> argument value.
    - ``worq:result:<name>:<id>`` -- list holding the task result.
    - ``worq:reserved:<name>:<id>`` -- id of the deferred task that
      reserved this task's result.
    """

    def __init__(self, url, name=const.DEFAULT, initial_result_timeout=60,
                 redis_factory=redis.StrictRedis):
        """Create a queue backed by the Redis server identified by ``url``.

        :param url: URL of the form ``redis://host[:port][/db]``. The port
            defaults to 6379 and the database number to 0 when omitted.
        :param name: Queue name; namespaces every key used by this queue.
        :param initial_result_timeout: Expiry (seconds, minimum 1) applied
            to a task's keys when a worker takes it in :meth:`get`.
        :param redis_factory: Called as ``redis_factory(host, port, db=db)``
            to build the Redis client.
        """
        super(TaskQueue, self).__init__(url, name)
        urlobj = urlparse(url)
        if ':' in urlobj.netloc:
            host, port = urlobj.netloc.rsplit(':', 1)
        else:
            host, port = urlobj.netloc, 6379
        # "redis://host" and "redis://host/" have no db number; use db 0
        # instead of failing on int('').
        db_path = urlobj.path.lstrip('/')
        db = int(db_path) if db_path else 0
        self.redis = redis_factory(host, int(port), db=db)
        self.queue_key = b'worq:queue:' + utf8(self.name)
        self._name = utf8(self.name)
        self.initial_result_timeout = max(int(initial_result_timeout), 1)

    def _task_pattern(self, task_id):
        """Key of the task hash for ``task_id`` (bytes)."""
        return b":".join([b'worq:task', self._name, task_id])

    def _result_pattern(self, task_id):
        """Key of the result list for ``task_id`` (bytes)."""
        return b":".join([b'worq:result', self._name, task_id])

    def _args_pattern(self, task_id):
        """Key of the arguments hash for ``task_id`` (bytes)."""
        # args (for task_id) is a hash: result_id -> value
        # each deferred and enqueued task may have an args key
        return b":".join([b'worq:args', self._name, task_id])

    def _reserve_pattern(self, result_id):
        """Key holding the deferred task id that reserved ``result_id``."""
        # reserve result_id (key) for deferred task_id (string value)
        # each deferred and enqueued task may have a reserved key
        return b":".join([b'worq:reserved', self._name, result_id])

    @staticmethod
    def _ttl_or_negative(ttl):
        """Normalize a TTL reply to an int.

        redis-py < 3 returns None (instead of -1/-2) for keys that have no
        expiry or do not exist; comparing None with ints fails on Python 3.
        """
        return -1 if ttl is None else ttl

    def ping(self):
        """Return the Redis PING reply."""
        return self.redis.ping()

    def log_all_worq(self, show_expiring=False):
        """Debugging helper: log ``worq:*`` keys at DEBUG level.

        Keys that have an expiry are skipped unless ``show_expiring`` is
        true. Task and args hashes are logged together with their contents.
        Uses the Redis KEYS command, which scans the whole keyspace.
        """
        for bkey in self.redis.keys(b'worq:*'):
            ttl = self._ttl_or_negative(self.redis.ttl(bkey))
            key = bkey.decode('utf-8')
            if ttl < 0 or show_expiring:
                if key.startswith(('worq:task:', 'worq:args:')):
                    log.debug('%s %s %s', key, ttl, self.redis.hgetall(bkey))
                else:
                    log.debug('%s %s', key, ttl)

    def _create_task(self, result, fields, enqueue):
        """Atomically create the task hash for ``result.id``.

        Under WATCH on the task key: bail out if the task already exists,
        otherwise write ``fields`` into the task hash, delete any stale
        args/result/reserve keys for the id and, when ``enqueue`` is true,
        push the id onto the head of the queue.

        :returns: True on success; False if the task already exists or the
            task key was modified concurrently (WATCH failure).
        """
        b_result_id = utf8(result.id)
        task_key = self._task_pattern(b_result_id)
        with self.redis.pipeline() as pipe:
            try:
                pipe.watch(task_key)
                if pipe.exists(task_key):
                    return False
                pipe.multi()
                pipe.hmset(task_key, fields)
                pipe.delete(
                    self._args_pattern(b_result_id),
                    self._result_pattern(b_result_id),
                    self._reserve_pattern(b_result_id),
                )
                if enqueue:
                    pipe.lpush(self.queue_key, b_result_id)  # left push (head)
                pipe.execute()
                return True
            except redis.WatchError:
                return False

    def enqueue_task(self, result, message):
        """Create task ``result.id`` with status ENQUEUED and queue it.

        :param result: Object whose ``id`` is the task id.
        :param message: Value stored in the task hash's ``task`` field.
        :returns: True if the task was created and queued, False otherwise.
        """
        return self._create_task(result, {
            b'task': message,
            b'status': utf8(const.ENQUEUED),
        }, enqueue=True)

    def defer_task(self, result, message, args):
        """Create task ``result.id`` with status PENDING without queueing it.

        :param result: Object whose ``id`` is the task id.
        :param message: Value stored in the task hash's ``task`` field.
        :param args: Collection of arguments the task waits for; only its
            length is stored (as ``total_args``).
        :returns: True if the task was created, False otherwise.
        """
        return self._create_task(result, {
            b'task': message,
            b'status': utf8(const.PENDING),
            b'total_args': len(args),
            # args_ready is created by HINCRBY in set_argument (from zero)
        }, enqueue=False)

    def undefer_task(self, task_id):
        """Push ``task_id`` onto the head of the queue."""
        self.redis.lpush(self.queue_key, utf8(task_id))

    def get(self, timeout=0):
        """Take the next task from the queue.

        The id at the tail is rotated back to the head with BRPOPLPUSH, then
        a pipeline reads the message, sets status PROCESSING, applies
        ``initial_result_timeout`` to the task/args/reserve keys and removes
        the id with LREM. If LREM removed nothing (e.g. another worker took
        the id first) the loop tries again.

        :param timeout: Seconds to wait; 0 waits indefinitely.
        :returns: ``(task_id, message)`` with ``task_id`` as text, or None
            when the timeout elapses.
        """
        result_timeout = self.initial_result_timeout
        queue = self.queue_key
        end = time.time() + timeout
        while True:
            task_id = self.redis.brpoplpush(queue, queue, timeout=timeout)
            if task_id is None:
                return None  # timeout
            task_key = self._task_pattern(task_id)
            args_key = self._args_pattern(task_id)
            reserve_key = self._reserve_pattern(task_id)
            with self.redis.pipeline() as pipe:
                pipe.hget(task_key, b'task')
                pipe.hset(task_key, b'status', utf8(const.PROCESSING))
                pipe.expire(task_key, result_timeout)
                pipe.expire(args_key, result_timeout)
                pipe.expire(reserve_key, result_timeout)
                pipe.lrem(queue, 1, task_id)  # traverse head to tail
                replies = pipe.execute()
            message, removed = replies[0], replies[-1]
            if removed:
                return (unicode(task_id), message)
            if timeout != 0:
                # Shrink the remaining wait; give up once it is used up.
                timeout = int(end - time.time())
                if timeout <= 0:
                    return None

    def size(self):
        """Return the number of pending tasks.

        Counts every id in the queue list plus, following reserve keys
        transitively, each distinct deferred task waiting on them. A
        deferred task that reserved several arguments is counted once.
        """
        task_ids = self.redis.lrange(self.queue_key, 0, -1)
        num = len(task_ids)
        seen = set(task_ids)
        while task_ids:
            keys = [self._reserve_pattern(t) for t in task_ids]
            deferred = set(d for d in self.redis.mget(keys) if d is not None)
            task_ids = list(deferred - seen)
            seen.update(task_ids)
            num += len(task_ids)
        return num

    def discard_pending(self):
        """Delete the queue and the keys of every queued task.

        Deferred tasks reserved by the discarded tasks (found through their
        reserve keys) are discarded too, level by level.
        """
        with self.redis.pipeline() as pipe:
            pipe.lrange(self.queue_key, 0, -1)
            pipe.delete(self.queue_key)
            tasks = pipe.execute()[0]
        while tasks:
            reserved = []
            all_keys = []
            for task_id in tasks:
                all_keys.append(self._task_pattern(task_id))
                all_keys.append(self._args_pattern(task_id))
                all_keys.append(self._result_pattern(task_id))
                reserve_key = self._reserve_pattern(task_id)
                all_keys.append(reserve_key)
                reserved.append(reserve_key)
            with self.redis.pipeline() as pipe:
                pipe.mget(reserved)
                pipe.delete(*all_keys)
                tasks = [v for v in pipe.execute()[0] if v is not None]

    def reserve_argument(self, argument_id, deferred_id):
        """Reserve the result of ``argument_id`` for task ``deferred_id``.

        :returns: ``(False, None)`` if the result is already reserved.
            Otherwise ``(True, value)``, where ``value`` is the result if it
            was already stored (the argument's keys are then deleted) or
            None if it is not available yet.
        """
        b_argument_id = utf8(argument_id)
        reserve_key = self._reserve_pattern(b_argument_id)
        result_key = self._result_pattern(b_argument_id)
        if self.redis.setnx(reserve_key, utf8(deferred_id)):
            value = self.redis.lpop(result_key)
            if value is not None:
                self.redis.delete(
                    self._task_pattern(b_argument_id),
                    self._args_pattern(b_argument_id),
                    result_key,
                    reserve_key,
                )
            return (True, value)
        return (False, None)

    def set_argument(self, task_id, argument_id, message):
        """Store argument ``argument_id`` of deferred task ``task_id``.

        Increments the task's ``args_ready`` counter and stores ``message``
        in its args hash.

        :returns: True when ``args_ready`` has reached ``total_args``.
        """
        b_task_id = utf8(task_id)
        task_key = self._task_pattern(b_task_id)
        args_key = self._args_pattern(b_task_id)
        with self.redis.pipeline() as pipe:
            pipe.hincrby(task_key, b'args_ready', 1)
            pipe.hget(task_key, b'total_args')
            pipe.hset(args_key, utf8(argument_id), message)
            ready, total, _ = pipe.execute()
        total = int(total)
        assert isinstance(ready, integer), repr(ready)
        assert ready > 0, ready
        assert total > 0, total
        return ready == total

    def get_arguments(self, task_id):
        """Return and delete the stored arguments of ``task_id``.

        :returns: dict mapping argument id (text) to its stored value; an
            empty dict if none are stored.
        """
        args_key = self._args_pattern(utf8(task_id))
        with self.redis.pipeline() as pipe:
            pipe.hgetall(args_key)
            pipe.delete(args_key)
            args = pipe.execute()[0]
        if args:
            return {unicode(k): v for k, v in args.items()}
        return {}

    def set_task_timeout(self, task_id, timeout):
        """Set an expiry on ``task_id`` and on the tasks deferred behind it.

        Follows the reserve-key chain (task -> deferred task -> ...) and
        applies ``timeout`` seconds (minimum 1) to each task hash and reserve
        key, and to the args hash of every deferred task in the chain.
        """
        expiry = max(int(timeout), 1)

        def set_timeout(b_task_id, args=True):
            # Expire one task's keys; WATCH on the reserve key makes the
            # transaction retry if the reserving task id changes before EXEC.
            # Returns the reserving (deferred) task id as bytes, or None.
            task_key = self._task_pattern(b_task_id)
            reserve_key = self._reserve_pattern(b_task_id)
            with self.redis.pipeline() as pipe:
                while True:
                    try:
                        pipe.watch(reserve_key)
                        deferred_id = pipe.get(reserve_key)
                        pipe.multi()
                        pipe.expire(task_key, expiry)
                        if args:
                            pipe.expire(self._args_pattern(b_task_id), expiry)
                        pipe.expire(reserve_key, expiry)
                        pipe.execute()
                        return deferred_id
                    except redis.WatchError:
                        continue

        # Do not expire arguments of the first task since they have
        # already been deleted. However, all subsequent deferred tasks'
        # argument timeouts must be set.
        deferred_id = set_timeout(utf8(task_id), False)
        while deferred_id is not None:
            deferred_id = set_timeout(deferred_id)

    def get_status(self, task_id):
        """Return the ``status`` field of ``task_id`` as text, or None."""
        key = self._task_pattern(utf8(task_id))
        value = self.redis.hget(key, b'status')
        return value if value is None else unicode(value)

    def set_result(self, task_id, message, timeout):
        """Record completion of ``task_id``.

        If no deferred task reserved this result, the task is marked
        COMPLETED and ``message`` is pushed onto its result list; both keys
        expire after ``timeout`` seconds (minimum 1). If a deferred task did
        reserve it, the task and result keys are deleted and ``message`` is
        not stored here (presumably the caller hands it to the deferred
        task -- confirm against caller). The args and reserve keys are
        always deleted.

        :returns: The reserving deferred task id as text, or None.
        """
        timeout = max(int(timeout), 1)
        b_task_id = utf8(task_id)
        task_key = self._task_pattern(b_task_id)
        args_key = self._args_pattern(b_task_id)
        result_key = self._result_pattern(b_task_id)
        reserve_key = self._reserve_pattern(b_task_id)
        with self.redis.pipeline() as pipe:
            while True:
                try:
                    pipe.watch(reserve_key)
                    deferred_id = pipe.get(reserve_key)
                    pipe.multi()
                    if deferred_id is None:
                        pipe.hset(task_key, b'status', utf8(const.COMPLETED))
                        pipe.expire(task_key, timeout)
                        pipe.rpush(result_key, message)
                        pipe.expire(result_key, timeout)
                    else:
                        pipe.delete(task_key, result_key)
                        deferred_id = unicode(deferred_id)
                    pipe.delete(args_key, reserve_key)
                    pipe.execute()
                    return deferred_id
                except redis.WatchError:
                    continue

    def pop_result(self, task_id, timeout):
        """Pop the result of ``task_id``.

        :param timeout: 0 for a non-blocking pop, None to wait without a
            deadline, otherwise the number of seconds to wait.
        :returns: The result value. ``const.TASK_EXPIRED`` if the task key
            no longer exists and no result is stored. None if ``timeout``
            elapsed.
        """
        b_task_id = utf8(task_id)
        result_key = self._result_pattern(b_task_id)
        if timeout == 0:
            return self.redis.lpop(result_key)
        task_key = self._task_pattern(b_task_id)
        end = None if timeout is None else time.time() + timeout
        while True:
            with self.redis.pipeline() as pipe:
                pipe.ttl(task_key)
                pipe.exists(task_key)
                ttl, exists = pipe.execute()
            # Block for at most the task's remaining lifetime, capped by the
            # caller's deadline.
            wait = self._ttl_or_negative(ttl)
            if end is not None:
                wait = min(wait, int(end - time.time()))
            if wait <= 0:
                if not exists:
                    # task has expired (heartbeat stopped)
                    result = self.redis.lpop(result_key)
                    return const.TASK_EXPIRED if result is None else result
                # task is still enqueued (heartbeat not started yet)
                wait = 1 if end is not None else 5
            result = self.redis.blpop([result_key], timeout=wait)
            if result is None:
                if end is not None and time.time() > end:
                    return None  # timeout
                continue
            self.redis.delete(task_key)
            return result[1]

    def discard_result(self, task_id, task_expired_token):
        """Discard ``task_id`` and every deferred task reserved behind it.

        For each task in the reserve-key chain, the task, args and reserve
        keys are deleted and ``task_expired_token`` is appended to its
        result list, which then expires after 5 seconds.
        """
        b_task_id = utf8(task_id)
        while b_task_id is not None:
            reserve_key = self._reserve_pattern(b_task_id)
            result_key = self._result_pattern(b_task_id)
            args_key = self._args_pattern(b_task_id)
            task_key = self._task_pattern(b_task_id)
            with self.redis.pipeline() as pipe:
                pipe.get(reserve_key)  # reserved for deferred task
                pipe.delete(task_key, args_key, reserve_key)
                pipe.rpush(result_key, task_expired_token)
                pipe.expire(result_key, 5)
                b_task_id = pipe.execute()[0]