diff options
author | George Kraft <george.kraft@calxeda.com> | 2012-12-04 15:21:44 -0600 |
---|---|---|
committer | George Kraft <george.kraft@calxeda.com> | 2012-12-04 15:21:44 -0600 |
commit | 156532b3371731224ab36a4eac75e79a7e6f7e1e (patch) | |
tree | 9edef67e8e3332ac443e5305605de5ab4d615445 | |
parent | c62a9791592d5af23bd590159a93d5222c9ee3af (diff) | |
download | cxmanage-156532b3371731224ab36a4eac75e79a7e6f7e1e.tar.gz |
CXMAN-150: TaskQueue: Only spawn threads as needed
This has a couple of nice advantages: we can reduce the overhead of an
idle TaskQueue, change its thread count, etc.
Also use a deque internally instead of a Queue; this seems to give
better performance with large task counts.
-rw-r--r-- | cxmanage_api/tasks.py | 67 |
1 files changed, 48 insertions, 19 deletions
diff --git a/cxmanage_api/tasks.py b/cxmanage_api/tasks.py index 50fdfa0..3138d51 100644 --- a/cxmanage_api/tasks.py +++ b/cxmanage_api/tasks.py @@ -28,10 +28,12 @@ # THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH # DAMAGE. -from Queue import Queue -from threading import Thread + +from collections import deque +from threading import Thread, Lock from time import sleep + class Task(object): """ A task object representing some unit of work to be done. """ def __init__(self, method, *args): @@ -49,17 +51,21 @@ class Task(object): """ Return true if this task hasn't been finished """ return not self.status in ["Completed", "Failed"] + class TaskQueue(object): """ A task queue, consisting of a queue and a number of workers. """ def __init__(self, threads=48, delay=0): - self._queue = Queue() - self._workers = [TaskWorker(task_queue=self, delay=delay) - for x in xrange(threads)] + self.threads = threads + self.delay = delay + + self._lock = Lock() + self._queue = deque() + self._workers = 0 def put(self, method, *args): """ - Add a task to the task queue. + Add a task to the task queue, and spawn a worker if we're not full. method: a method to call args: args to pass to the method @@ -67,8 +73,16 @@ class TaskQueue(object): returns: a Task object that will be executed by a worker at a later time. """ + self._lock.acquire() + task = Task(method, *args) - self._queue.put(task) + self._queue.append(task) + + if self._workers < self.threads: + TaskWorker(task_queue=self, delay=self.delay) + self._workers += 1 + + self._lock.release() return task def get(self): @@ -76,11 +90,23 @@ class TaskQueue(object): Get a task from the task queue. Mainly used by workers. returns: a Task object that hasn't been executed yet. + raises: IndexError if there are no tasks in the queue. 
""" - return self._queue.get() + self._lock.acquire() + try: + return self._queue.popleft() + finally: + self._lock.release() + + def _remove_worker(self): + """ Decrement the worker count. Should only be used by TaskWorker. """ + self._lock.acquire() + self._workers -= 1 + self._lock.release() + class TaskWorker(Thread): - """ A TaskWorker that executes tasks from a TaskQueue. """ + """ A worker thread that runs tasks from a TaskQueue. """ def __init__(self, task_queue, delay=0): super(TaskWorker, self).__init__() self.daemon = True @@ -92,15 +118,18 @@ class TaskWorker(Thread): def run(self): """ Repeatedly get tasks from the TaskQueue and execute them. """ - while True: - sleep(self._delay) - task = self._task_queue.get() - task.status = "In Progress" - try: - task.result = task._method(*task._args) - task.status = "Completed" - except Exception as e: - task.error = e - task.status = "Failed" + try: + while True: + sleep(self._delay) + task = self._task_queue.get() + task.status = "In Progress" + try: + task.result = task._method(*task._args) + task.status = "Completed" + except Exception as e: + task.error = e + task.status = "Failed" + except: + self._task_queue._remove_worker() DEFAULT_TASK_QUEUE = TaskQueue() |