summaryrefslogtreecommitdiff
path: root/taskflow/engines
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@yahoo-inc.com>2016-02-05 10:55:39 -0800
committerJoshua Harlow <harlowja@gmail.com>2016-02-11 22:00:03 -0800
commit8c1172b8b4e543107f6d3aea4ce02fbb40c1e4db (patch)
tree7398a658a3b0ea30cab899042239dcdea91cad72 /taskflow/engines
parentcea71f27998cfa911044103fcb8fca79b6989717 (diff)
downloadtaskflow-8c1172b8b4e543107f6d3aea4ce02fbb40c1e4db.tar.gz
Don't bother scanning for workers if no new messages arrived
If the worker finder has not gotten any new notification messages letting it know about new (or updated) workers we can just skip trying to match existing waiting work to workers as without messages being processed the match will still not work (as the worker data doesn't change without those messages being processed). Change-Id: I41d50c676f04f85c49a03d9d503da1955af45f7d
Diffstat (limited to 'taskflow/engines')
-rw-r--r--taskflow/engines/worker_based/executor.py27
-rw-r--r--taskflow/engines/worker_based/types.py6
2 files changed, 26 insertions, 7 deletions
diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py
index 6ba29ad..d274953 100644
--- a/taskflow/engines/worker_based/executor.py
+++ b/taskflow/engines/worker_based/executor.py
@@ -74,6 +74,9 @@ class WorkerTaskExecutor(executor.TaskExecutor):
before_join=lambda t: p_worker.stop(),
after_join=lambda t: p_worker.reset(),
before_start=lambda t: p_worker.reset())
+ self._messages_processed = {
+ 'finder': self._finder.messages_processed,
+ }
def _process_response(self, response, message):
"""Process response from remote side."""
@@ -151,9 +154,7 @@ class WorkerTaskExecutor(executor.TaskExecutor):
if request.expired:
expired_requests[request_uuid] = request
elif request.state == pr.WAITING:
- worker = self._finder.get_worker_for_task(request.task)
- if worker is not None:
- waiting_requests[request_uuid] = (request, worker)
+ waiting_requests[request_uuid] = request
if expired_requests:
with self._ongoing_requests_lock:
while expired_requests:
@@ -161,10 +162,21 @@ class WorkerTaskExecutor(executor.TaskExecutor):
if self._handle_expired_request(request):
del self._ongoing_requests[request_uuid]
if waiting_requests:
- while waiting_requests:
- request_uuid, (request, worker) = waiting_requests.popitem()
- if request.transition_and_log_error(pr.PENDING, logger=LOG):
- self._publish_request(request, worker)
+ finder = self._finder
+ new_messages_processed = finder.messages_processed
+ last_messages_processed = self._messages_processed['finder']
+ if new_messages_processed > last_messages_processed:
+ # Some new message got to the finder, so we can see
+ # if any new workers match (if no new messages have been
+ # processed we might as well not do anything).
+ while waiting_requests:
+ _request_uuid, request = waiting_requests.popitem()
+ worker = finder.get_worker_for_task(request.task)
+ if (worker is not None and
+ request.transition_and_log_error(pr.PENDING,
+ logger=LOG)):
+ self._publish_request(request, worker)
+ self._messages_processed['finder'] = new_messages_processed
def _submit_task(self, task, task_uuid, action, arguments,
progress_callback=None, **kwargs):
@@ -248,3 +260,4 @@ class WorkerTaskExecutor(executor.TaskExecutor):
_request_uuid, request = self._ongoing_requests.popitem()
self._handle_expired_request(request)
self._finder.clear()
+ self._messages_processed['finder'] = self._finder.messages_processed
diff --git a/taskflow/engines/worker_based/types.py b/taskflow/engines/worker_based/types.py
index 2cbe298..c8f11e2 100644
--- a/taskflow/engines/worker_based/types.py
+++ b/taskflow/engines/worker_based/types.py
@@ -152,6 +152,11 @@ class ProxyWorkerFinder(WorkerFinder):
response=True)),
})
self._counter = itertools.count()
+ self._messages_processed = 0
+
+ @property
+ def messages_processed(self):
+ return self._messages_processed
def _next_worker(self, topic, tasks, temporary=False):
if not temporary:
@@ -199,6 +204,7 @@ class ProxyWorkerFinder(WorkerFinder):
LOG.debug("Updated worker '%s' (%s total workers are"
" currently known)", worker, self._total_workers())
self._cond.notify_all()
+ self._messages_processed += 1
def clear(self):
with self._cond: