diff options
author | Joshua Harlow <harlowja@yahoo-inc.com> | 2016-02-05 10:55:39 -0800 |
---|---|---|
committer | Joshua Harlow <harlowja@gmail.com> | 2016-02-11 22:00:03 -0800 |
commit | 8c1172b8b4e543107f6d3aea4ce02fbb40c1e4db (patch) | |
tree | 7398a658a3b0ea30cab899042239dcdea91cad72 /taskflow/engines | |
parent | cea71f27998cfa911044103fcb8fca79b6989717 (diff) | |
download | taskflow-8c1172b8b4e543107f6d3aea4ce02fbb40c1e4db.tar.gz |
Don't bother scanning for workers if no new messages arrived
If the worker finder has not gotten any new notification messages
letting it know about new (or updated) workers we can just skip
trying to match existing waiting work to workers as without messages
being processed the match will still not work (as the worker data
doesn't change without those messages being processed).
Change-Id: I41d50c676f04f85c49a03d9d503da1955af45f7d
Diffstat (limited to 'taskflow/engines')
-rw-r--r-- | taskflow/engines/worker_based/executor.py | 27 | ||||
-rw-r--r-- | taskflow/engines/worker_based/types.py | 6 |
2 files changed, 26 insertions, 7 deletions
diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py index 6ba29ad..d274953 100644 --- a/taskflow/engines/worker_based/executor.py +++ b/taskflow/engines/worker_based/executor.py @@ -74,6 +74,9 @@ class WorkerTaskExecutor(executor.TaskExecutor): before_join=lambda t: p_worker.stop(), after_join=lambda t: p_worker.reset(), before_start=lambda t: p_worker.reset()) + self._messages_processed = { + 'finder': self._finder.messages_processed, + } def _process_response(self, response, message): """Process response from remote side.""" @@ -151,9 +154,7 @@ class WorkerTaskExecutor(executor.TaskExecutor): if request.expired: expired_requests[request_uuid] = request elif request.state == pr.WAITING: - worker = self._finder.get_worker_for_task(request.task) - if worker is not None: - waiting_requests[request_uuid] = (request, worker) + waiting_requests[request_uuid] = request if expired_requests: with self._ongoing_requests_lock: while expired_requests: @@ -161,10 +162,21 @@ class WorkerTaskExecutor(executor.TaskExecutor): if self._handle_expired_request(request): del self._ongoing_requests[request_uuid] if waiting_requests: - while waiting_requests: - request_uuid, (request, worker) = waiting_requests.popitem() - if request.transition_and_log_error(pr.PENDING, logger=LOG): - self._publish_request(request, worker) + finder = self._finder + new_messages_processed = finder.messages_processed + last_messages_processed = self._messages_processed['finder'] + if new_messages_processed > last_messages_processed: + # Some new message got to the finder, so we can see + # if any new workers match (if no new messages have been + # processed we might as well not do anything). + while waiting_requests: + _request_uuid, request = waiting_requests.popitem() + worker = finder.get_worker_for_task(request.task) + if (worker is not None and + request.transition_and_log_error(pr.PENDING, + logger=LOG)): + self._publish_request(request, worker) + self._messages_processed['finder'] = new_messages_processed def _submit_task(self, task, task_uuid, action, arguments, progress_callback=None, **kwargs): @@ -248,3 +260,4 @@ class WorkerTaskExecutor(executor.TaskExecutor): _request_uuid, request = self._ongoing_requests.popitem() self._handle_expired_request(request) self._finder.clear() + self._messages_processed['finder'] = self._finder.messages_processed diff --git a/taskflow/engines/worker_based/types.py b/taskflow/engines/worker_based/types.py index 2cbe298..c8f11e2 100644 --- a/taskflow/engines/worker_based/types.py +++ b/taskflow/engines/worker_based/types.py @@ -152,6 +152,11 @@ class ProxyWorkerFinder(WorkerFinder): response=True)), }) self._counter = itertools.count() + self._messages_processed = 0 + + @property + def messages_processed(self): + return self._messages_processed def _next_worker(self, topic, tasks, temporary=False): if not temporary: @@ -199,6 +204,7 @@ class ProxyWorkerFinder(WorkerFinder): LOG.debug("Updated worker '%s' (%s total workers are" " currently known)", worker, self._total_workers()) self._cond.notify_all() + self._messages_processed += 1 def clear(self): with self._cond: |