# WorQ - Python task queue
#
# Copyright (c) 2012 Daniel Miller
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""In-memory message queue and result store."""
import logging

try:
    # Python 2 module name
    from Queue import Queue, Empty
except ImportError:
    # Python 3 module name
    from queue import Queue, Empty
from threading import Lock
from weakref import WeakValueDictionary

import worq.const as const
from worq.core import AbstractTaskQueue

# Module-level logger.
log = logging.getLogger(__name__)

# Weak cache of TaskQueue instances keyed by (url, name); consulted and
# populated by TaskQueue.factory. Entries vanish once the queue object is no
# longer referenced anywhere else.
_REFS = WeakValueDictionary()
class TaskQueue(AbstractTaskQueue):
    """Simple in-memory task queue implementation.

    Results whose tasks are ready to run are held in ``self.queue``; every
    registered result object is tracked in ``self.results``, a
    ``WeakValueDictionary`` keyed by ``result.id``. Per-task bookkeeping is
    stored directly on the result objects in private attributes written as
    ``result.__name`` inside this class (name-mangled by Python to
    ``_TaskQueue__name``):

    - ``__status``: current task status constant
    - ``__value``: ``Queue`` that receives the task's result message
    - ``__task``: the task message
    - ``__args``: dict of argument id -> argument message
    - ``__lock``: per-result ``Lock``
    - ``__for``: id of the deferred task that reserved this result, or
      ``None``
    - ``__refs``: (deferred tasks only) references to argument results
    """

    @classmethod
    def factory(cls, url, name=const.DEFAULT, *args, **kw):
        """Return the queue for ``(url, name)``, creating it if necessary.

        Instances are cached in the module-level ``_REFS`` weak dictionary,
        so the same object is returned for a given key for as long as a
        previously created queue is still alive.
        """
        key = (url, name)
        obj = _REFS.get(key)
        if obj is None:
            obj = _REFS[key] = cls(url, name, *args, **kw)
        return obj

    def __init__(self, *args, **kw):
        super(TaskQueue, self).__init__(*args, **kw)
        self.queue = Queue()  # results whose tasks are ready to run
        self.results = WeakValueDictionary()  # result.id -> result object
        self.results_lock = Lock()  # held while mutating self.results

    def _init_result(self, result, status, message):
        """Register ``result`` and attach fresh bookkeeping state to it.

        :param result: result object; must expose an ``id`` attribute.
        :param status: initial status constant stored on the result.
        :param message: task message stored on the result.
        :returns: ``False`` (leaving ``result`` untouched) if a result with
            the same id is already registered, otherwise ``True``.
        """
        with self.results_lock:
            if result.id in self.results:
                return False
            # Finish initialising before the result becomes visible through
            # self.results, so other threads never see a partial object.
            result.__status = status
            result.__value = Queue()
            result.__task = message
            result.__args = {}
            result.__lock = Lock()
            result.__for = None
            self.results[result.id] = result
        return True

    def enqueue_task(self, result, message):
        """Register ``result`` as ENQUEUED and put it on the run queue.

        :returns: ``True`` if the task was enqueued, ``False`` if a result
            with the same id was already registered.
        """
        if self._init_result(result, const.ENQUEUED, message):
            self.queue.put(result)
            return True
        return False

    def defer_task(self, result, message, args):
        """Register ``result`` as PENDING without putting it on the queue.

        :param args: iterable of argument (result) ids the task waits on.
        :returns: ``True`` if the task was registered, ``False`` if a result
            with the same id was already registered.
        """
        if self._init_result(result, const.PENDING, message):
            results = self.results
            # keep references to results to prevent GC
            result.__refs = [results.get(arg) for arg in args]
            return True
        return False

    def undefer_task(self, task_id):
        """Put a previously registered task on the run queue.

        :raises KeyError: if ``task_id`` is not a registered result.
        """
        result = self.results[task_id]
        self.queue.put(result)

    def get(self, timeout=None):
        """Take the next task off the run queue and mark it PROCESSING.

        :param timeout: passed to ``Queue.get``; ``None`` blocks until a
            task is available.
        :returns: ``(task_id, task_message)``, or ``None`` if the queue was
            still empty when the timeout elapsed.
        """
        try:
            result = self.queue.get(timeout=timeout)
        except Empty:
            return None
        result.__status = const.PROCESSING
        return result.id, result.__task

    def size(self):
        """Return the number of results currently tracked."""
        return len(self.results)

    def discard_pending(self):
        """Drain the run queue and forget every tracked result."""
        with self.results_lock:
            while True:
                try:
                    self.queue.get_nowait()
                except Empty:
                    break
            self.results.clear()

    def reserve_argument(self, argument_id, deferred_id):
        """Reserve the result ``argument_id`` for task ``deferred_id``.

        :returns: ``(False, None)`` if the result is unknown or already
            reserved. Otherwise ``(True, message)`` where ``message`` is the
            result's value if one is already available, else ``None``. When
            a value is returned the result is removed from the store.
        """
        result = self.results.get(argument_id)
        if result is None:
            return (False, None)
        # The per-result lock makes "mark reserved + check for a value"
        # atomic with respect to set_result().
        with result.__lock:
            if result.__for is not None:
                return (False, None)
            result.__for = deferred_id
            try:
                message = result.__value.get_nowait()
            except Empty:
                message = None
            if message is not None:
                with self.results_lock:
                    self.results.pop(argument_id, None)
            return (True, message)

    def set_argument(self, task_id, argument_id, message):
        """Store an argument value on the deferred task ``task_id``.

        The argument's own result entry is removed from the store.

        :returns: ``True`` when the number of stored arguments equals the
            number of arguments the task was deferred with.
        :raises KeyError: if ``task_id`` is not a registered result.
        """
        result = self.results[task_id]
        with self.results_lock:
            self.results.pop(argument_id, None)
        with result.__lock:
            result.__args[argument_id] = message
            return len(result.__args) == len(result.__refs)

    def get_arguments(self, task_id):
        """Return the argument dict stored for ``task_id``.

        An empty dict is returned if the task is not registered.
        """
        try:
            return self.results[task_id].__args
        except KeyError:
            return {}

    def set_task_timeout(self, task_id, timeout):
        """Do nothing; this implementation does not store task timeouts."""
        pass

    def get_status(self, task_id):
        """Return the status stored for ``task_id``, or ``None`` if unknown."""
        result = self.results.get(task_id)
        return None if result is None else result.__status

    def set_result(self, task_id, message, timeout):
        """Publish ``message`` as the result value of ``task_id``.

        ``timeout`` is accepted but not used by this implementation.

        :returns: the id of the deferred task that reserved this result (or
            ``None`` if it is not reserved); ``None`` if ``task_id`` is not
            registered.
        """
        result = self.results.get(task_id)
        if result is not None:
            with result.__lock:
                result.__value.put(message)
                return result.__for

    def pop_result(self, task_id, timeout):
        """Wait for and return the result value of ``task_id``.

        :param timeout: ``0`` polls without blocking; any other value is
            passed to ``Queue.get`` as its timeout.
        :returns: ``const.TASK_EXPIRED`` if the task is not registered,
            ``None`` if no value arrived in time, otherwise the value (the
            result is then removed from the store).
        """
        result = self.results.get(task_id)
        if result is None:
            return const.TASK_EXPIRED
        # NOTE: a result that has been reserved through reserve_argument()
        # is not rejected here; its reservation marker is not consulted.
        try:
            if timeout == 0:
                value = result.__value.get_nowait()
            else:
                value = result.__value.get(timeout=timeout)
        except Empty:
            value = None
        else:
            with self.results_lock:
                self.results.pop(task_id, None)
        return value

    def discard_result(self, task_id, task_expired_token):
        """Forget ``task_id`` and publish ``task_expired_token`` as its value.

        Does nothing if the task is not registered.
        """
        with self.results_lock:
            # Default of None: an unknown id must not raise KeyError.
            result = self.results.pop(task_id, None)
        if result is not None:
            result.__value.put(task_expired_token)