summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDoug Royal <douglasroyal@gmail.com>2015-07-22 14:29:37 -0500
committerDoug Royal <douglasroyal@gmail.com>2015-07-31 18:04:37 -0500
commitdec09ae5ffbe2de415fede0947ad1cb887574e2e (patch)
tree6fdf09eb93a61002a1c545634ef9ae1c6e7f6211
parentc90525bfead1f495df86c5c5d795d25abad2e1d9 (diff)
downloadoslo-messaging-dec09ae5ffbe2de415fede0947ad1cb887574e2e.tar.gz
Add unit tests for zmq_async
Change option from boolean zmq_native to string zmq_concurrency. This eliminates ambiguity by requiring the user to explicitly name the mechanism they want to use for concurrency. Change-Id: I341a3eee73a0449716d3ee0df690bbe6af39bdf0
-rw-r--r--oslo_messaging/_drivers/impl_zmq.py6
-rw-r--r--oslo_messaging/_drivers/zmq_driver/zmq_async.py64
-rw-r--r--oslo_messaging/tests/drivers/zmq/test_zmq_async.py170
3 files changed, 210 insertions, 30 deletions
diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py
index 064fe7c..18086eb 100644
--- a/oslo_messaging/_drivers/impl_zmq.py
+++ b/oslo_messaging/_drivers/impl_zmq.py
@@ -48,10 +48,8 @@ zmq_opts = [
default=True,
help='Use REQ/REP pattern for all methods CALL/CAST/FANOUT.'),
- cfg.BoolOpt('rpc_zmq_native',
- default=False,
- help='Switches ZeroMQ eventlet/threading way of usage.'
- 'Affects pollers, executors etc.'),
+ cfg.StrOpt('rpc_zmq_concurrency', default='eventlet',
+ help='Type of concurrency used. Either "native" or "eventlet"'),
# The following port is unassigned by IANA as of 2012-05-21
cfg.IntOpt('rpc_zmq_port', default=9501,
diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_async.py b/oslo_messaging/_drivers/zmq_driver/zmq_async.py
index 2617463..7f437fd 100644
--- a/oslo_messaging/_drivers/zmq_driver/zmq_async.py
+++ b/oslo_messaging/_drivers/zmq_driver/zmq_async.py
@@ -14,20 +14,25 @@
import logging
+from oslo_messaging._drivers.zmq_driver.poller import green_poller
+from oslo_messaging._drivers.zmq_driver.poller import threading_poller
+from oslo_messaging._i18n import _, _LE
from oslo_utils import importutils
-from oslo_messaging._i18n import _LE
-
LOG = logging.getLogger(__name__)
-green_zmq = importutils.try_import('eventlet.green.zmq')
+# Map zmq_concurrency config option names to the actual module name.
+ZMQ_MODULES = {
+ 'native': 'zmq',
+ 'eventlet': 'eventlet.green.zmq',
+}
+
+def import_zmq(zmq_concurrency='eventlet'):
+ _raise_error_if_invalid_config_value(zmq_concurrency)
-def import_zmq(native_zmq=False):
- if native_zmq:
- imported_zmq = importutils.try_import('zmq')
- else:
- imported_zmq = green_zmq or importutils.try_import('zmq')
+ imported_zmq = importutils.try_import(ZMQ_MODULES[zmq_concurrency],
+ default='zmq')
if imported_zmq is None:
errmsg = _LE("ZeroMQ not found!")
@@ -36,28 +41,35 @@ def import_zmq(native_zmq=False):
return imported_zmq
-def get_poller(native_zmq=False):
- if native_zmq or green_zmq is None:
- from oslo_messaging._drivers.zmq_driver.poller import threading_poller
- return threading_poller.ThreadingPoller()
- else:
- from oslo_messaging._drivers.zmq_driver.poller import green_poller
+def get_poller(zmq_concurrency='eventlet'):
+ _raise_error_if_invalid_config_value(zmq_concurrency)
+
+ if zmq_concurrency == 'eventlet' and _is_eventlet_zmq_available():
return green_poller.GreenPoller()
+ return threading_poller.ThreadingPoller()
-def get_reply_poller(native_zmq=False):
- if native_zmq or green_zmq is None:
- from oslo_messaging._drivers.zmq_driver.poller import threading_poller
- return threading_poller.ThreadingPoller()
- else:
- from oslo_messaging._drivers.zmq_driver.poller import green_poller
+def get_reply_poller(zmq_concurrency='eventlet'):
+ _raise_error_if_invalid_config_value(zmq_concurrency)
+
+ if zmq_concurrency == 'eventlet' and _is_eventlet_zmq_available():
return green_poller.HoldReplyPoller()
+ return threading_poller.ThreadingPoller()
+
+def get_executor(method, zmq_concurrency='eventlet'):
+ _raise_error_if_invalid_config_value(zmq_concurrency)
-def get_executor(method, native_zmq=False):
- if native_zmq or green_zmq is None:
- from oslo_messaging._drivers.zmq_driver.poller import threading_poller
- return threading_poller.ThreadingExecutor(method)
- else:
- from oslo_messaging._drivers.zmq_driver.poller import green_poller
+ if zmq_concurrency == 'eventlet' and _is_eventlet_zmq_available():
return green_poller.GreenExecutor(method)
+ return threading_poller.ThreadingExecutor(method)
+
+
+def _is_eventlet_zmq_available():
+ return importutils.try_import('eventlet.green.zmq')
+
+
+def _raise_error_if_invalid_config_value(zmq_concurrency):
+ if zmq_concurrency not in ZMQ_MODULES:
+ errmsg = _('Invalid zmq_concurrency value: %s')
+ raise ValueError(errmsg % zmq_concurrency)
diff --git a/oslo_messaging/tests/drivers/zmq/test_zmq_async.py b/oslo_messaging/tests/drivers/zmq/test_zmq_async.py
new file mode 100644
index 0000000..28e091a
--- /dev/null
+++ b/oslo_messaging/tests/drivers/zmq/test_zmq_async.py
@@ -0,0 +1,170 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import mock
+
+from oslo_messaging._drivers.zmq_driver.poller import green_poller
+from oslo_messaging._drivers.zmq_driver.poller import threading_poller
+from oslo_messaging._drivers.zmq_driver import zmq_async
+from oslo_messaging.tests import utils as test_utils
+
+
+class TestImportZmq(test_utils.BaseTestCase):
+
+ def setUp(self):
+ super(TestImportZmq, self).setUp()
+
+ def test_config_short_names_are_converted_to_correct_module_names(self):
+ mock_try_import = mock.Mock()
+ zmq_async.importutils.try_import = mock_try_import
+
+ zmq_async.importutils.try_import.return_value = 'mock zmq module'
+ self.assertEqual('mock zmq module', zmq_async.import_zmq('native'))
+ mock_try_import.assert_called_with('zmq', default='zmq')
+
+ zmq_async.importutils.try_import.return_value = 'mock eventlet module'
+ self.assertEqual('mock eventlet module',
+ zmq_async.import_zmq('eventlet'))
+ mock_try_import.assert_called_with('eventlet.green.zmq', default='zmq')
+
+ def test_when_no_args_then_default_zmq_module_is_loaded(self):
+ mock_try_import = mock.Mock()
+ zmq_async.importutils.try_import = mock_try_import
+
+ zmq_async.import_zmq()
+
+ mock_try_import.assert_called_with('eventlet.green.zmq', default='zmq')
+
+ def test_when_import_fails_then_raise_ImportError(self):
+ zmq_async.importutils.try_import = mock.Mock()
+ zmq_async.importutils.try_import.return_value = None
+
+ with self.assertRaisesRegexp(ImportError, "ZeroMQ not found!"):
+ zmq_async.import_zmq('native')
+
+ def test_invalid_config_value_raise_ValueError(self):
+ invalid_opt = 'x'
+
+ errmsg = 'Invalid zmq_concurrency value: x'
+ with self.assertRaisesRegexp(ValueError, errmsg):
+ zmq_async.import_zmq(invalid_opt)
+
+
+class TestGetPoller(test_utils.BaseTestCase):
+
+ def setUp(self):
+ super(TestGetPoller, self).setUp()
+
+ def test_when_no_arg_to_get_poller_then_return_default_poller(self):
+ zmq_async._is_eventlet_zmq_available = lambda: True
+
+ actual = zmq_async.get_poller()
+
+ self.assertTrue(isinstance(actual, green_poller.GreenPoller))
+
+ def test_when_native_poller_requested_then_return_ThreadingPoller(self):
+ actual = zmq_async.get_poller('native')
+
+ self.assertTrue(isinstance(actual, threading_poller.ThreadingPoller))
+
+ def test_when_eventlet_is_unavailable_then_return_ThreadingPoller(self):
+ zmq_async._is_eventlet_zmq_available = lambda: False
+
+ actual = zmq_async.get_poller('eventlet')
+
+ self.assertTrue(isinstance(actual, threading_poller.ThreadingPoller))
+
+ def test_when_eventlet_is_available_then_return_GreenPoller(self):
+ zmq_async._is_eventlet_zmq_available = lambda: True
+
+ actual = zmq_async.get_poller('eventlet')
+
+ self.assertTrue(isinstance(actual, green_poller.GreenPoller))
+
+ def test_invalid_config_value_raise_ValueError(self):
+ invalid_opt = 'x'
+
+ errmsg = 'Invalid zmq_concurrency value: x'
+ with self.assertRaisesRegexp(ValueError, errmsg):
+ zmq_async.get_poller(invalid_opt)
+
+
+class TestGetReplyPoller(test_utils.BaseTestCase):
+
+ def setUp(self):
+ super(TestGetReplyPoller, self).setUp()
+
+ def test_default_reply_poller_is_HoldReplyPoller(self):
+ zmq_async._is_eventlet_zmq_available = lambda: True
+
+ actual = zmq_async.get_reply_poller()
+
+ self.assertTrue(isinstance(actual, green_poller.HoldReplyPoller))
+
+ def test_when_eventlet_is_available_then_return_HoldReplyPoller(self):
+ zmq_async._is_eventlet_zmq_available = lambda: True
+
+ actual = zmq_async.get_reply_poller('eventlet')
+
+ self.assertTrue(isinstance(actual, green_poller.HoldReplyPoller))
+
+ def test_when_eventlet_is_unavailable_then_return_ThreadingPoller(self):
+ zmq_async._is_eventlet_zmq_available = lambda: False
+
+ actual = zmq_async.get_reply_poller('eventlet')
+
+ self.assertTrue(isinstance(actual, threading_poller.ThreadingPoller))
+
+ def test_invalid_config_value_raise_ValueError(self):
+ invalid_opt = 'x'
+
+ errmsg = 'Invalid zmq_concurrency value: x'
+ with self.assertRaisesRegexp(ValueError, errmsg):
+ zmq_async.get_reply_poller(invalid_opt)
+
+
+class TestGetExecutor(test_utils.BaseTestCase):
+
+ def setUp(self):
+ super(TestGetExecutor, self).setUp()
+
+ def test_default_executor_is_GreenExecutor(self):
+ zmq_async._is_eventlet_zmq_available = lambda: True
+
+ executor = zmq_async.get_executor('any method')
+
+ self.assertTrue(isinstance(executor, green_poller.GreenExecutor))
+ self.assertEqual('any method', executor._method)
+
+ def test_when_eventlet_module_is_available_then_return_GreenExecutor(self):
+ zmq_async._is_eventlet_zmq_available = lambda: True
+
+ executor = zmq_async.get_executor('any method', 'eventlet')
+
+ self.assertTrue(isinstance(executor, green_poller.GreenExecutor))
+ self.assertEqual('any method', executor._method)
+
+ def test_when_eventlet_is_unavailable_then_return_ThreadingExecutor(self):
+ zmq_async._is_eventlet_zmq_available = lambda: False
+
+ executor = zmq_async.get_executor('any method', 'eventlet')
+
+ self.assertTrue(isinstance(executor,
+ threading_poller.ThreadingExecutor))
+ self.assertEqual('any method', executor._method)
+
+ def test_invalid_config_value_raise_ValueError(self):
+ invalid_opt = 'x'
+
+ errmsg = 'Invalid zmq_concurrency value: x'
+ with self.assertRaisesRegexp(ValueError, errmsg):
+ zmq_async.get_executor('any method', invalid_opt)