summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGeorge Kraft <george.kraft@calxeda.com>2012-12-04 15:21:44 -0600
committerGeorge Kraft <george.kraft@calxeda.com>2012-12-04 15:21:44 -0600
commit156532b3371731224ab36a4eac75e79a7e6f7e1e (patch)
tree9edef67e8e3332ac443e5305605de5ab4d615445
parentc62a9791592d5af23bd590159a93d5222c9ee3af (diff)
downloadcxmanage-156532b3371731224ab36a4eac75e79a7e6f7e1e.tar.gz
CXMAN-150: TaskQueue: Only spawn threads as needed
This has a couple nice advantages. We can reduce the overhead of an idle TaskQueue, change their thread count, etc. Also using a deque internally, instead of a Queue. Seems to give better performance with large task counts.
-rw-r--r--cxmanage_api/tasks.py67
1 files changed, 48 insertions, 19 deletions
diff --git a/cxmanage_api/tasks.py b/cxmanage_api/tasks.py
index 50fdfa0..3138d51 100644
--- a/cxmanage_api/tasks.py
+++ b/cxmanage_api/tasks.py
@@ -28,10 +28,12 @@
# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
# DAMAGE.
-from Queue import Queue
-from threading import Thread
+
+from collections import deque
+from threading import Thread, Lock
from time import sleep
+
class Task(object):
""" A task object representing some unit of work to be done. """
def __init__(self, method, *args):
@@ -49,17 +51,21 @@ class Task(object):
""" Return true if this task hasn't been finished """
return not self.status in ["Completed", "Failed"]
+
class TaskQueue(object):
""" A task queue, consisting of a queue and a number of workers. """
def __init__(self, threads=48, delay=0):
- self._queue = Queue()
- self._workers = [TaskWorker(task_queue=self, delay=delay)
- for x in xrange(threads)]
+ self.threads = threads
+ self.delay = delay
+
+ self._lock = Lock()
+ self._queue = deque()
+ self._workers = 0
def put(self, method, *args):
"""
- Add a task to the task queue.
+ Add a task to the task queue, and spawn a worker if we're not full.
method: a method to call
args: args to pass to the method
@@ -67,8 +73,16 @@ class TaskQueue(object):
returns: a Task object that will be executed by a worker at a later
time.
"""
+ self._lock.acquire()
+
task = Task(method, *args)
- self._queue.put(task)
+ self._queue.append(task)
+
+ if self._workers < self.threads:
+ TaskWorker(task_queue=self, delay=self.delay)
+ self._workers += 1
+
+ self._lock.release()
return task
def get(self):
@@ -76,11 +90,23 @@ class TaskQueue(object):
Get a task from the task queue. Mainly used by workers.
returns: a Task object that hasn't been executed yet.
+ raises: IndexError if there are no tasks in the queue.
"""
- return self._queue.get()
+ self._lock.acquire()
+ try:
+ return self._queue.popleft()
+ finally:
+ self._lock.release()
+
+ def _remove_worker(self):
+ """ Decrement the worker count. Should only be used by TaskWorker. """
+ self._lock.acquire()
+ self._workers -= 1
+ self._lock.release()
+
class TaskWorker(Thread):
- """ A TaskWorker that executes tasks from a TaskQueue. """
+ """ A worker thread that runs tasks from a TaskQueue. """
def __init__(self, task_queue, delay=0):
super(TaskWorker, self).__init__()
self.daemon = True
@@ -92,15 +118,18 @@ class TaskWorker(Thread):
def run(self):
""" Repeatedly get tasks from the TaskQueue and execute them. """
- while True:
- sleep(self._delay)
- task = self._task_queue.get()
- task.status = "In Progress"
- try:
- task.result = task._method(*task._args)
- task.status = "Completed"
- except Exception as e:
- task.error = e
- task.status = "Failed"
+ try:
+ while True:
+ sleep(self._delay)
+ task = self._task_queue.get()
+ task.status = "In Progress"
+ try:
+ task.result = task._method(*task._args)
+ task.status = "Completed"
+ except Exception as e:
+ task.error = e
+ task.status = "Failed"
+ except:
+ self._task_queue._remove_worker()
DEFAULT_TASK_QUEUE = TaskQueue()