diff options
author | Jenkins <jenkins@review.openstack.org> | 2015-10-15 08:02:32 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2015-10-15 08:02:32 +0000 |
commit | 947ccd7794a909e006c4d9e05715df543f7cb8c8 (patch) | |
tree | 40575dcb08a280cc2175c51b7581ae10f412ac29 | |
parent | 822217e42736a6e5cdb3730241b90548e91ec393 (diff) | |
parent | 123a0371c4cd9133ea31d05808f6ef07548ee484 (diff) | |
download | oslo-messaging-947ccd7794a909e006c4d9e05715df543f7cb8c8.tar.gz |
Merge "Provide the executor 'wait' function a timeout and use it"
-rw-r--r-- | oslo_messaging/_executors/base.py | 14 | ||||
-rw-r--r-- | oslo_messaging/_executors/impl_blocking.py | 6 | ||||
-rw-r--r-- | oslo_messaging/_executors/impl_pooledexecutor.py | 43 | ||||
-rw-r--r-- | oslo_messaging/tests/executors/test_executor.py | 52 |
4 files changed, 85 insertions, 30 deletions
diff --git a/oslo_messaging/_executors/base.py b/oslo_messaging/_executors/base.py index 6fbc153..7749c00 100644 --- a/oslo_messaging/_executors/base.py +++ b/oslo_messaging/_executors/base.py @@ -30,12 +30,18 @@ class ExecutorBase(object): @abc.abstractmethod def start(self): - "Start polling for incoming messages." + """Start polling for incoming messages.""" @abc.abstractmethod def stop(self): - "Stop polling for messages." + """Stop polling for messages.""" @abc.abstractmethod - def wait(self): - "Wait until the executor has stopped polling." + def wait(self, timeout=None): + """Wait until the executor has stopped polling. + + If a timeout is provided, and it is not ``None`` then this method will + wait up to that amount of time for its components to finish, if not + all components finish in the alloted time, then false will be returned + otherwise true will be returned. + """ diff --git a/oslo_messaging/_executors/impl_blocking.py b/oslo_messaging/_executors/impl_blocking.py index cc40edd..b59818f 100644 --- a/oslo_messaging/_executors/impl_blocking.py +++ b/oslo_messaging/_executors/impl_blocking.py @@ -26,13 +26,17 @@ class FakeBlockingThread(object): self._target() @staticmethod - def join(): + def join(timeout=None): pass @staticmethod def stop(): pass + @staticmethod + def is_alive(): + return False + class BlockingExecutor(impl_pooledexecutor.PooledExecutor): """A message executor which blocks the current thread. diff --git a/oslo_messaging/_executors/impl_pooledexecutor.py b/oslo_messaging/_executors/impl_pooledexecutor.py index 598229c..c083770 100644 --- a/oslo_messaging/_executors/impl_pooledexecutor.py +++ b/oslo_messaging/_executors/impl_pooledexecutor.py @@ -20,6 +20,7 @@ import threading from futurist import waiters from oslo_config import cfg from oslo_utils import excutils +from oslo_utils import timeutils from oslo_messaging._executors import base @@ -116,16 +117,32 @@ class PooledExecutor(base.ExecutorBase): self._tombstone.set() self.listener.stop() - def wait(self): - # TODO(harlowja): this method really needs a timeout. - if self._poller is not None: - self._tombstone.wait() - self._poller.join() - self._poller = None - if self._executor is not None: - with self._mutator: - incomplete_fs = list(self._incomplete) - self._incomplete.clear() - if incomplete_fs: - self._wait_for_all(incomplete_fs) - self._executor = None + def wait(self, timeout=None): + with timeutils.StopWatch(duration=timeout) as w: + poller = self._poller + if poller is not None: + self._tombstone.wait(w.leftover(return_none=True)) + if not self._tombstone.is_set(): + return False + poller.join(w.leftover(return_none=True)) + if poller.is_alive(): + return False + self._poller = None + executor = self._executor + if executor is not None: + with self._mutator: + incomplete_fs = list(self._incomplete) + if incomplete_fs: + (done, not_done) = self._wait_for_all( + incomplete_fs, + timeout=w.leftover(return_none=True)) + with self._mutator: + for fut in done: + try: + self._incomplete.remove(fut) + except ValueError: + pass + if not_done: + return False + self._executor = None + return True diff --git a/oslo_messaging/tests/executors/test_executor.py b/oslo_messaging/tests/executors/test_executor.py index 3a6b00d..007d3ac 100644 --- a/oslo_messaging/tests/executors/test_executor.py +++ b/oslo_messaging/tests/executors/test_executor.py @@ -14,6 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. +import time import threading # eventlet 0.16 with monkey patching does not work yet on Python 3, @@ -71,9 +72,9 @@ class TestExecutor(test_utils.BaseTestCase): thread = threading.Thread(target=target, args=(executor,)) thread.daemon = True thread.start() - thread.join(timeout=30) + return thread - def test_executor_dispatch(self): + def _create_dispatcher(self): if impl_aioeventlet is not None: aioeventlet_class = impl_aioeventlet.AsyncioEventletExecutor else: @@ -110,11 +111,13 @@ class TestExecutor(test_utils.BaseTestCase): endpoint = mock.MagicMock(return_value=simple_coroutine('result')) event = eventlet.event.Event() else: + def run_executor(executor): executor.start() executor.wait() endpoint = mock.MagicMock(return_value='result') + event = None class Dispatcher(object): def __init__(self, endpoint): @@ -139,27 +142,52 @@ class TestExecutor(test_utils.BaseTestCase): self.callback, executor_callback) + return Dispatcher(endpoint), endpoint, event, run_executor + + def test_slow_wait(self): + dispatcher, endpoint, event, run_executor = self._create_dispatcher() listener = mock.Mock(spec=['poll', 'stop']) - dispatcher = Dispatcher(endpoint) executor = self.executor(self.conf, listener, dispatcher) - incoming_message = mock.MagicMock(ctxt={}, message={'payload': 'data'}) def fake_poll(timeout=None): - if is_aioeventlet: - if listener.poll.call_count == 1: - return incoming_message - event.wait() + time.sleep(0.1) + if listener.poll.call_count == 10: + if event is not None: + event.wait() executor.stop() else: - if listener.poll.call_count == 1: - return incoming_message - executor.stop() + return incoming_message listener.poll.side_effect = fake_poll + thread = self._run_in_thread(run_executor, executor) + self.assertFalse(executor.wait(timeout=0.1)) + thread.join() + self.assertTrue(executor.wait()) - self._run_in_thread(run_executor, executor) + def test_dead_wait(self): + dispatcher, _endpoint, _event, _run_executor = self._create_dispatcher() + listener = mock.Mock(spec=['poll', 'stop']) + executor = self.executor(self.conf, listener, dispatcher) + executor.stop() + self.assertTrue(executor.wait()) + def test_executor_dispatch(self): + dispatcher, endpoint, event, run_executor = self._create_dispatcher() + listener = mock.Mock(spec=['poll', 'stop']) + executor = self.executor(self.conf, listener, dispatcher) + incoming_message = mock.MagicMock(ctxt={}, message={'payload': 'data'}) + + def fake_poll(timeout=None): + if listener.poll.call_count == 1: + return incoming_message + if event is not None: + event.wait() + executor.stop() + + listener.poll.side_effect = fake_poll + thread = self._run_in_thread(run_executor, executor) + thread.join() endpoint.assert_called_once_with({}, {'payload': 'data'}) self.assertEqual(dispatcher.result, 'result') |