summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2015-10-15 08:02:32 +0000
committerGerrit Code Review <review@openstack.org>2015-10-15 08:02:32 +0000
commit947ccd7794a909e006c4d9e05715df543f7cb8c8 (patch)
tree40575dcb08a280cc2175c51b7581ae10f412ac29
parent822217e42736a6e5cdb3730241b90548e91ec393 (diff)
parent123a0371c4cd9133ea31d05808f6ef07548ee484 (diff)
downloadoslo-messaging-947ccd7794a909e006c4d9e05715df543f7cb8c8.tar.gz
Merge "Provide the executor 'wait' function a timeout and use it"
-rw-r--r--oslo_messaging/_executors/base.py14
-rw-r--r--oslo_messaging/_executors/impl_blocking.py6
-rw-r--r--oslo_messaging/_executors/impl_pooledexecutor.py43
-rw-r--r--oslo_messaging/tests/executors/test_executor.py52
4 files changed, 85 insertions, 30 deletions
diff --git a/oslo_messaging/_executors/base.py b/oslo_messaging/_executors/base.py
index 6fbc153..7749c00 100644
--- a/oslo_messaging/_executors/base.py
+++ b/oslo_messaging/_executors/base.py
@@ -30,12 +30,18 @@ class ExecutorBase(object):
@abc.abstractmethod
def start(self):
- "Start polling for incoming messages."
+ """Start polling for incoming messages."""
@abc.abstractmethod
def stop(self):
- "Stop polling for messages."
+ """Stop polling for messages."""
@abc.abstractmethod
- def wait(self):
- "Wait until the executor has stopped polling."
+ def wait(self, timeout=None):
+ """Wait until the executor has stopped polling.
+
+ If a timeout is provided, and it is not ``None`` then this method will
+ wait up to that amount of time for its components to finish, if not
+ all components finish in the alloted time, then false will be returned
+ otherwise true will be returned.
+ """
diff --git a/oslo_messaging/_executors/impl_blocking.py b/oslo_messaging/_executors/impl_blocking.py
index cc40edd..b59818f 100644
--- a/oslo_messaging/_executors/impl_blocking.py
+++ b/oslo_messaging/_executors/impl_blocking.py
@@ -26,13 +26,17 @@ class FakeBlockingThread(object):
self._target()
@staticmethod
- def join():
+ def join(timeout=None):
pass
@staticmethod
def stop():
pass
+ @staticmethod
+ def is_alive():
+ return False
+
class BlockingExecutor(impl_pooledexecutor.PooledExecutor):
"""A message executor which blocks the current thread.
diff --git a/oslo_messaging/_executors/impl_pooledexecutor.py b/oslo_messaging/_executors/impl_pooledexecutor.py
index 598229c..c083770 100644
--- a/oslo_messaging/_executors/impl_pooledexecutor.py
+++ b/oslo_messaging/_executors/impl_pooledexecutor.py
@@ -20,6 +20,7 @@ import threading
from futurist import waiters
from oslo_config import cfg
from oslo_utils import excutils
+from oslo_utils import timeutils
from oslo_messaging._executors import base
@@ -116,16 +117,32 @@ class PooledExecutor(base.ExecutorBase):
self._tombstone.set()
self.listener.stop()
- def wait(self):
- # TODO(harlowja): this method really needs a timeout.
- if self._poller is not None:
- self._tombstone.wait()
- self._poller.join()
- self._poller = None
- if self._executor is not None:
- with self._mutator:
- incomplete_fs = list(self._incomplete)
- self._incomplete.clear()
- if incomplete_fs:
- self._wait_for_all(incomplete_fs)
- self._executor = None
+ def wait(self, timeout=None):
+ with timeutils.StopWatch(duration=timeout) as w:
+ poller = self._poller
+ if poller is not None:
+ self._tombstone.wait(w.leftover(return_none=True))
+ if not self._tombstone.is_set():
+ return False
+ poller.join(w.leftover(return_none=True))
+ if poller.is_alive():
+ return False
+ self._poller = None
+ executor = self._executor
+ if executor is not None:
+ with self._mutator:
+ incomplete_fs = list(self._incomplete)
+ if incomplete_fs:
+ (done, not_done) = self._wait_for_all(
+ incomplete_fs,
+ timeout=w.leftover(return_none=True))
+ with self._mutator:
+ for fut in done:
+ try:
+ self._incomplete.remove(fut)
+ except ValueError:
+ pass
+ if not_done:
+ return False
+ self._executor = None
+ return True
diff --git a/oslo_messaging/tests/executors/test_executor.py b/oslo_messaging/tests/executors/test_executor.py
index 3a6b00d..007d3ac 100644
--- a/oslo_messaging/tests/executors/test_executor.py
+++ b/oslo_messaging/tests/executors/test_executor.py
@@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+import time
import threading
# eventlet 0.16 with monkey patching does not work yet on Python 3,
@@ -71,9 +72,9 @@ class TestExecutor(test_utils.BaseTestCase):
thread = threading.Thread(target=target, args=(executor,))
thread.daemon = True
thread.start()
- thread.join(timeout=30)
+ return thread
- def test_executor_dispatch(self):
+ def _create_dispatcher(self):
if impl_aioeventlet is not None:
aioeventlet_class = impl_aioeventlet.AsyncioEventletExecutor
else:
@@ -110,11 +111,13 @@ class TestExecutor(test_utils.BaseTestCase):
endpoint = mock.MagicMock(return_value=simple_coroutine('result'))
event = eventlet.event.Event()
else:
+
def run_executor(executor):
executor.start()
executor.wait()
endpoint = mock.MagicMock(return_value='result')
+ event = None
class Dispatcher(object):
def __init__(self, endpoint):
@@ -139,27 +142,52 @@ class TestExecutor(test_utils.BaseTestCase):
self.callback,
executor_callback)
+ return Dispatcher(endpoint), endpoint, event, run_executor
+
+ def test_slow_wait(self):
+ dispatcher, endpoint, event, run_executor = self._create_dispatcher()
listener = mock.Mock(spec=['poll', 'stop'])
- dispatcher = Dispatcher(endpoint)
executor = self.executor(self.conf, listener, dispatcher)
-
incoming_message = mock.MagicMock(ctxt={}, message={'payload': 'data'})
def fake_poll(timeout=None):
- if is_aioeventlet:
- if listener.poll.call_count == 1:
- return incoming_message
- event.wait()
+ time.sleep(0.1)
+ if listener.poll.call_count == 10:
+ if event is not None:
+ event.wait()
executor.stop()
else:
- if listener.poll.call_count == 1:
- return incoming_message
- executor.stop()
+ return incoming_message
listener.poll.side_effect = fake_poll
+ thread = self._run_in_thread(run_executor, executor)
+ self.assertFalse(executor.wait(timeout=0.1))
+ thread.join()
+ self.assertTrue(executor.wait())
- self._run_in_thread(run_executor, executor)
+ def test_dead_wait(self):
+ dispatcher, _endpoint, _event, _run_executor = self._create_dispatcher()
+ listener = mock.Mock(spec=['poll', 'stop'])
+ executor = self.executor(self.conf, listener, dispatcher)
+ executor.stop()
+ self.assertTrue(executor.wait())
+ def test_executor_dispatch(self):
+ dispatcher, endpoint, event, run_executor = self._create_dispatcher()
+ listener = mock.Mock(spec=['poll', 'stop'])
+ executor = self.executor(self.conf, listener, dispatcher)
+ incoming_message = mock.MagicMock(ctxt={}, message={'payload': 'data'})
+
+ def fake_poll(timeout=None):
+ if listener.poll.call_count == 1:
+ return incoming_message
+ if event is not None:
+ event.wait()
+ executor.stop()
+
+ listener.poll.side_effect = fake_poll
+ thread = self._run_in_thread(run_executor, executor)
+ thread.join()
endpoint.assert_called_once_with({}, {'payload': 'data'})
self.assertEqual(dispatcher.result, 'result')