summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZuul <zuul@review.openstack.org>2018-03-27 19:35:04 +0000
committerGerrit Code Review <review@openstack.org>2018-03-27 19:35:04 +0000
commit415e9b991b8775325d6a996ce405ae2be191d40c (patch)
treedd830fb827d1958730af5d6224e67a19d2705710
parentba9630707c4051c0adac47e9e43f4255fbcd14dc (diff)
parentfca263c43c85c29ee9c1b57b767552c0c613a72e (diff)
downloadoslo-messaging-415e9b991b8775325d6a996ce405ae2be191d40c.tar.gz
Merge "remove zmq tests"
-rw-r--r--.zuul.yaml84
-rw-r--r--oslo_messaging/tests/drivers/zmq/__init__.py0
-rw-r--r--oslo_messaging/tests/drivers/zmq/matchmaker/__init__.py0
-rwxr-xr-xoslo_messaging/tests/drivers/zmq/matchmaker/test_impl_matchmaker.py140
-rw-r--r--oslo_messaging/tests/drivers/zmq/test_impl_zmq.py129
-rwxr-xr-xoslo_messaging/tests/drivers/zmq/test_pub_sub.py150
-rw-r--r--oslo_messaging/tests/drivers/zmq/test_routing_table.py80
-rw-r--r--oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py226
-rw-r--r--oslo_messaging/tests/drivers/zmq/test_zmq_address.py67
-rw-r--r--oslo_messaging/tests/drivers/zmq/test_zmq_async.py93
-rw-r--r--oslo_messaging/tests/drivers/zmq/test_zmq_transport_url.py127
-rw-r--r--oslo_messaging/tests/drivers/zmq/test_zmq_ttl_cache.py132
-rw-r--r--oslo_messaging/tests/drivers/zmq/test_zmq_version.py63
-rw-r--r--oslo_messaging/tests/drivers/zmq/zmq_common.py111
-rw-r--r--oslo_messaging/tests/functional/zmq/__init__.py0
-rw-r--r--oslo_messaging/tests/functional/zmq/multiproc_utils.py232
-rw-r--r--oslo_messaging/tests/functional/zmq/test_startup.py49
17 files changed, 0 insertions, 1683 deletions
diff --git a/.zuul.yaml b/.zuul.yaml
index da20f4a..e7f95b7 100644
--- a/.zuul.yaml
+++ b/.zuul.yaml
@@ -19,27 +19,6 @@
bindep_profile: rabbit
- job:
- name: oslo.messaging-tox-py27-func-zmq
- parent: openstack-tox-py27
- vars:
- tox_envlist: py27-func-zmq
- bindep_profile: zmq
-
-- job:
- name: oslo.messaging-tox-py27-func-zmq-proxy
- parent: openstack-tox-py27
- vars:
- tox_envlist: py27-func-zmq-proxy
- bindep_profile: zmq
-
-- job:
- name: oslo.messaging-tox-py27-func-zmq-pubsub
- parent: openstack-tox-py27
- vars:
- tox_envlist: py27-func-zmq-pubsub
- bindep_profile: zmq
-
-- job:
name: oslo.messaging-tox-py35-func-amqp1
parent: openstack-tox-py35
vars:
@@ -53,13 +32,6 @@
bindep_profile: rabbit
- job:
- name: oslo.messaging-tox-py35-func-zmq
- parent: openstack-tox-py35
- vars:
- tox_envlist: py35-func-zmq
- bindep_profile: zmq
-
-- job:
name: oslo.messaging-src-dsvm-full-rabbit-default
parent: legacy-dsvm-base
run: playbooks/oslo.messaging-src-dsvm-full-rabbit-default/run.yaml
@@ -116,17 +88,6 @@
- openstack/oslo.messaging
- job:
- name: oslo.messaging-src-dsvm-full-zmq-default
- parent: legacy-dsvm-base
- run: playbooks/oslo.messaging-src-dsvm-full-zmq-default/run.yaml
- post-run: playbooks/oslo.messaging-src-dsvm-full-zmq-default/post.yaml
- timeout: 10800
- required-projects:
- - openstack-infra/devstack-gate
- - openstack/devstack-plugin-zmq
- - openstack/oslo.messaging
-
-- job:
name: oslo.messaging-src-grenade-dsvm
parent: legacy-dsvm-base
run: playbooks/oslo.messaging-src-grenade-dsvm/run.yaml
@@ -195,24 +156,6 @@
- openstack/diskimage-builder
- job:
- name: oslo.messaging-telemetry-dsvm-integration-zmq
- parent: legacy-dsvm-base
- run: playbooks/oslo.messaging-telemetry-dsvm-integration-zmq/run.yaml
- post-run: playbooks/oslo.messaging-telemetry-dsvm-integration-zmq/post.yaml
- timeout: 4200
- required-projects:
- - openstack-infra/devstack-gate
- - openstack/aodh
- - openstack/ceilometer
- - openstack/devstack-plugin-zmq
- - openstack/oslo.messaging
- - openstack/panko
- # following are required when DEVSTACK_GATE_HEAT, which this
- # job turns on
- - openstack/dib-utils
- - openstack/diskimage-builder
-
-- job:
name: oslo.messaging-telemetry-dsvm-integration-rabbit
parent: legacy-dsvm-base
run: playbooks/oslo.messaging-telemetry-dsvm-integration-rabbit/run.yaml
@@ -270,19 +213,6 @@
- openstack/oslo.messaging
- openstack/tempest
-- job:
- name: oslo.messaging-tempest-neutron-dsvm-src-zmq-default
- parent: legacy-dsvm-base
- run: playbooks/oslo.messaging-tempest-neutron-dsvm-src-zmq-default/run.yaml
- post-run: playbooks/oslo.messaging-tempest-neutron-dsvm-src-zmq-default/post.yaml
- timeout: 7800
- required-projects:
- - openstack-infra/devstack-gate
- - openstack/devstack-plugin-zmq
- - openstack/neutron
- - openstack/oslo.messaging
- - openstack/tempest
-
- project:
check:
@@ -292,18 +222,10 @@
- oslo.messaging-tox-py27-func-kafka:
voting: false
- oslo.messaging-tox-py27-func-rabbit
- - oslo.messaging-tox-py27-func-zmq-proxy:
- voting: false
- - oslo.messaging-tox-py27-func-zmq-pubsub:
- voting: false
- - oslo.messaging-tox-py27-func-zmq:
- voting: false
- oslo.messaging-tox-py35-func-amqp1:
voting: false
- oslo.messaging-tox-py35-func-rabbit:
voting: false
- - oslo.messaging-tox-py35-func-zmq:
- voting: false
- oslo.messaging-src-dsvm-full-rabbit-default
- oslo.messaging-src-dsvm-full-amqp1-hybrid:
@@ -316,8 +238,6 @@
voting: false
- oslo.messaging-src-dsvm-full-kafka-default:
voting: false
- - oslo.messaging-src-dsvm-full-zmq-default:
- voting: false
- oslo.messaging-src-grenade-dsvm:
voting: false
@@ -329,8 +249,6 @@
voting: false
- oslo.messaging-telemetry-dsvm-integration-kafka:
voting: false
- - oslo.messaging-telemetry-dsvm-integration-zmq:
- voting: false
- oslo.messaging-tempest-neutron-dsvm-src-rabbit-default
- oslo.messaging-tempest-neutron-dsvm-src-amqp1-hybrid:
@@ -338,8 +256,6 @@
branches: ^(?!stable/ocata).*$
- oslo.messaging-tempest-neutron-dsvm-src-kafka-default:
voting: false
- - oslo.messaging-tempest-neutron-dsvm-src-zmq-default:
- voting: false
gate:
jobs:
diff --git a/oslo_messaging/tests/drivers/zmq/__init__.py b/oslo_messaging/tests/drivers/zmq/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/oslo_messaging/tests/drivers/zmq/__init__.py
+++ /dev/null
diff --git a/oslo_messaging/tests/drivers/zmq/matchmaker/__init__.py b/oslo_messaging/tests/drivers/zmq/matchmaker/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/oslo_messaging/tests/drivers/zmq/matchmaker/__init__.py
+++ /dev/null
diff --git a/oslo_messaging/tests/drivers/zmq/matchmaker/test_impl_matchmaker.py b/oslo_messaging/tests/drivers/zmq/matchmaker/test_impl_matchmaker.py
deleted file mode 100755
index a20cdb4..0000000
--- a/oslo_messaging/tests/drivers/zmq/matchmaker/test_impl_matchmaker.py
+++ /dev/null
@@ -1,140 +0,0 @@
-# Copyright 2014 Canonical, Ltd.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import inspect
-from stevedore import driver
-import testscenarios
-
-import oslo_messaging
-from oslo_messaging.tests import utils as test_utils
-from oslo_utils import importutils
-
-redis = importutils.try_import('redis')
-
-
-def redis_available():
- '''Helper to see if local redis server is running'''
- if not redis:
- return False
- try:
- redis.StrictRedis(socket_timeout=1).ping()
- return True
- except redis.exceptions.ConnectionError:
- return False
-
-
-load_tests = testscenarios.load_tests_apply_scenarios
-
-
-class TestImplMatchmaker(test_utils.BaseTestCase):
-
- scenarios = [
- ("dummy", {"rpc_zmq_matchmaker": "dummy"}),
- ("redis", {"rpc_zmq_matchmaker": "redis"}),
- ]
-
- def setUp(self):
- super(TestImplMatchmaker, self).setUp()
-
- if self.rpc_zmq_matchmaker == "redis":
- if not redis_available():
- self.skipTest("redis unavailable")
-
- self.test_matcher = driver.DriverManager(
- 'oslo.messaging.zmq.matchmaker',
- self.rpc_zmq_matchmaker,
- ).driver(self.conf)
-
- if self.rpc_zmq_matchmaker == "redis":
- for redis_instance in self.test_matcher._redis_instances:
- self.addCleanup(redis_instance.flushdb)
-
- self.target = oslo_messaging.Target(topic="test_topic")
- self.host1 = b"test_host1"
- self.host2 = b"test_host2"
-
- def test_register(self):
- self.test_matcher.register(
- self.target,
- self.host1,
- "test",
- expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
-
- self.assertEqual([self.host1],
- self.test_matcher.get_hosts(self.target, "test"))
-
- def test_register_two_hosts(self):
- self.test_matcher.register(
- self.target,
- self.host1,
- "test",
- expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
- self.test_matcher.register(
- self.target,
- self.host2,
- "test",
- expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
-
- self.assertItemsEqual(self.test_matcher.get_hosts(self.target, "test"),
- [self.host1, self.host2])
-
- def test_register_unregister(self):
- self.test_matcher.register(
- self.target,
- self.host1,
- "test",
- expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
- self.test_matcher.register(
- self.target,
- self.host2,
- "test",
- expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
-
- self.test_matcher.unregister(self.target, self.host2, "test")
-
- self.assertItemsEqual(self.test_matcher.get_hosts(self.target, "test"),
- [self.host1])
-
- def test_register_two_same_hosts(self):
- self.test_matcher.register(
- self.target,
- self.host1,
- "test",
- expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
- self.test_matcher.register(
- self.target,
- self.host1,
- "test",
- expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
-
- self.assertEqual([self.host1],
- self.test_matcher.get_hosts(self.target, "test"))
-
- def test_get_hosts_wrong_topic(self):
- target = oslo_messaging.Target(topic="no_such_topic")
- self.assertEqual([], self.test_matcher.get_hosts(target, "test"))
-
- def test_handle_redis_package_error(self):
- if self.rpc_zmq_matchmaker == "redis":
- # move 'redis' variable to prevent this case affect others
- module = inspect.getmodule(self.test_matcher)
- redis_package = module.redis
-
- # 'redis' variable is set to None, when package importing is failed
- module.redis = None
- self.assertRaises(ImportError, self.test_matcher.__init__,
- self.conf)
-
- # retrieve 'redis' variable which is set originally
- module.redis = redis_package
diff --git a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py b/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py
deleted file mode 100644
index 5c2d7e4..0000000
--- a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py
+++ /dev/null
@@ -1,129 +0,0 @@
-# Copyright 2015 Mirantis, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import testtools
-
-import oslo_messaging
-from oslo_messaging._drivers import impl_zmq
-from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging._drivers.zmq_driver import zmq_socket
-from oslo_messaging.tests.drivers.zmq import zmq_common
-from oslo_messaging.tests import utils as test_utils
-
-
-zmq = zmq_async.import_zmq()
-
-
-class ZmqTestPortsRange(zmq_common.ZmqBaseTestCase):
-
- @testtools.skipIf(zmq is None, "zmq not available")
- def setUp(self):
- super(ZmqTestPortsRange, self).setUp()
-
- # Set config values
- kwargs = {'rpc_zmq_min_port': 5555,
- 'rpc_zmq_max_port': 5560}
- self.config(group='oslo_messaging_zmq', **kwargs)
-
- def test_ports_range(self):
- listeners = []
-
- for i in range(10):
- try:
- target = oslo_messaging.Target(topic='testtopic_' + str(i))
- new_listener = self.driver.listen(target, None, None)
- listeners.append(new_listener)
- except zmq_socket.ZmqPortBusy:
- pass
-
- self.assertLessEqual(len(listeners), 5)
-
- for l in listeners:
- l.cleanup()
-
-
-class TestConfZmqDriverLoad(test_utils.BaseTestCase):
-
- @testtools.skipIf(zmq is None, "zmq not available")
- def setUp(self):
- super(TestConfZmqDriverLoad, self).setUp()
- self.messaging_conf.transport_driver = 'zmq'
-
- def test_driver_load(self):
- transport = oslo_messaging.get_transport(self.conf)
- self.assertIsInstance(transport._driver, impl_zmq.ZmqDriver)
-
-
-class TestZmqBasics(zmq_common.ZmqBaseTestCase):
-
- @testtools.skipIf(zmq is None, "zmq not available")
- def setUp(self):
- super(TestZmqBasics, self).setUp()
- self.target = oslo_messaging.Target(topic='topic')
- self.ctxt = {'key': 'value'}
- self.message = {'method': 'qwerty', 'args': {'int': 1, 'bool': True}}
-
- def test_send_call_without_method_failure(self):
- self.message.pop('method')
- self.listener.listen(self.target)
- self.assertRaises(KeyError, self.driver.send,
- self.target, self.ctxt, self.message,
- wait_for_reply=True, timeout=10)
-
- def _check_listener_received(self):
- self.assertTrue(self.listener._received.isSet())
- self.assertEqual(self.ctxt, self.listener.message.ctxt)
- self.assertEqual(self.message, self.listener.message.message)
-
- def test_send_call_success(self):
- self.listener.listen(self.target)
- result = self.driver.send(self.target, self.ctxt, self.message,
- wait_for_reply=True, timeout=10)
- self.assertTrue(result)
- self._check_listener_received()
-
- def test_send_call_direct_success(self):
- self.target.server = 'server'
- self.listener.listen(self.target)
- result = self.driver.send(self.target, self.ctxt, self.message,
- wait_for_reply=True, timeout=10)
- self.assertTrue(result)
- self._check_listener_received()
-
- def test_send_cast_direct_success(self):
- self.target.server = 'server'
- self.listener.listen(self.target)
- result = self.driver.send(self.target, self.ctxt, self.message,
- wait_for_reply=False)
- self.listener._received.wait(5)
- self.assertIsNone(result)
- self._check_listener_received()
-
- def test_send_fanout_success(self):
- self.target.fanout = True
- self.listener.listen(self.target)
- result = self.driver.send(self.target, self.ctxt, self.message,
- wait_for_reply=False)
- self.listener._received.wait(5)
- self.assertIsNone(result)
- self._check_listener_received()
-
- def test_send_notify_success(self):
- self.listener.listen_notifications([(self.target, 'info')])
- self.target.topic += '.info'
- result = self.driver.send_notification(self.target, self.ctxt,
- self.message, '3.0')
- self.listener._received.wait(5)
- self.assertIsNone(result)
- self._check_listener_received()
diff --git a/oslo_messaging/tests/drivers/zmq/test_pub_sub.py b/oslo_messaging/tests/drivers/zmq/test_pub_sub.py
deleted file mode 100755
index cc72608..0000000
--- a/oslo_messaging/tests/drivers/zmq/test_pub_sub.py
+++ /dev/null
@@ -1,150 +0,0 @@
-# Copyright 2015-2016 Mirantis, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import json
-import time
-
-import msgpack
-import six
-import testscenarios
-
-from oslo_config import cfg
-
-import oslo_messaging
-from oslo_messaging._drivers.zmq_driver.proxy.central \
- import zmq_publisher_proxy
-from oslo_messaging._drivers.zmq_driver.proxy import zmq_proxy
-from oslo_messaging._drivers.zmq_driver import zmq_address
-from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging._drivers.zmq_driver import zmq_names
-from oslo_messaging._drivers.zmq_driver import zmq_version
-from oslo_messaging.tests.drivers.zmq import zmq_common
-
-load_tests = testscenarios.load_tests_apply_scenarios
-
-zmq = zmq_async.import_zmq()
-
-opt_group = cfg.OptGroup(name='zmq_proxy_opts',
- title='ZeroMQ proxy options')
-cfg.CONF.register_opts(zmq_proxy.zmq_proxy_opts, group=opt_group)
-
-
-class TestPubSub(zmq_common.ZmqBaseTestCase):
-
- LISTENERS_COUNT = 3
-
- scenarios = [
- ('json', {'serialization': 'json',
- 'dumps': lambda obj: six.b(json.dumps(obj))}),
- ('msgpack', {'serialization': 'msgpack',
- 'dumps': msgpack.dumps})
- ]
-
- def setUp(self):
- super(TestPubSub, self).setUp()
-
- kwargs = {'use_pub_sub': True,
- 'rpc_zmq_serialization': self.serialization}
- self.config(group='oslo_messaging_zmq', **kwargs)
-
- self.config(host="127.0.0.1", group="zmq_proxy_opts")
- self.config(publisher_port=0, group="zmq_proxy_opts")
-
- self.publisher = zmq_publisher_proxy.PublisherProxy(
- self.conf, self.driver.matchmaker)
- self.driver.matchmaker.register_publisher(
- (self.publisher.host, ''),
- expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
-
- self.listeners = []
- for _ in range(self.LISTENERS_COUNT):
- self.listeners.append(zmq_common.TestServerListener(self.driver))
-
- def tearDown(self):
- super(TestPubSub, self).tearDown()
- self.publisher.cleanup()
- for listener in self.listeners:
- listener.stop()
-
- def _send_request(self, target):
- # Needed only in test env to give listener a chance to connect
- # before request fires
- time.sleep(1)
- context = {}
- message = {'method': 'hello-world'}
-
- self.publisher.send_request(
- [b"reply_id",
- b'',
- six.b(zmq_version.MESSAGE_VERSION),
- six.b(str(zmq_names.CAST_FANOUT_TYPE)),
- zmq_address.target_to_subscribe_filter(target),
- b"message_id",
- self.dumps([context, message])]
- )
-
- def _check_listener(self, listener):
- listener._received.wait(timeout=5)
- self.assertTrue(listener._received.isSet())
- method = listener.message.message[u'method']
- self.assertEqual(u'hello-world', method)
-
- def _check_listener_negative(self, listener):
- listener._received.wait(timeout=1)
- self.assertFalse(listener._received.isSet())
-
- def test_single_listener(self):
- target = oslo_messaging.Target(topic='testtopic', fanout=True)
- self.listener.listen(target)
-
- self._send_request(target)
-
- self._check_listener(self.listener)
-
- def test_all_listeners(self):
- target = oslo_messaging.Target(topic='testtopic', fanout=True)
-
- for listener in self.listeners:
- listener.listen(target)
-
- self._send_request(target)
-
- for listener in self.listeners:
- self._check_listener(listener)
-
- def test_filtered(self):
- target = oslo_messaging.Target(topic='testtopic', fanout=True)
- target_wrong = oslo_messaging.Target(topic='wrong', fanout=True)
-
- self.listeners[0].listen(target)
- self.listeners[1].listen(target)
- self.listeners[2].listen(target_wrong)
-
- self._send_request(target)
-
- self._check_listener(self.listeners[0])
- self._check_listener(self.listeners[1])
- self._check_listener_negative(self.listeners[2])
-
- def test_topic_part_matching(self):
- target = oslo_messaging.Target(topic='testtopic', server='server')
- target_part = oslo_messaging.Target(topic='testtopic', fanout=True)
-
- self.listeners[0].listen(target)
- self.listeners[1].listen(target)
-
- self._send_request(target_part)
-
- self._check_listener(self.listeners[0])
- self._check_listener(self.listeners[1])
diff --git a/oslo_messaging/tests/drivers/zmq/test_routing_table.py b/oslo_messaging/tests/drivers/zmq/test_routing_table.py
deleted file mode 100644
index 508a161..0000000
--- a/oslo_messaging/tests/drivers/zmq/test_routing_table.py
+++ /dev/null
@@ -1,80 +0,0 @@
-# Copyright 2016 Mirantis, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo_messaging._drivers.zmq_driver.client import zmq_routing_table
-from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging.tests import utils as test_utils
-
-
-zmq = zmq_async.import_zmq()
-
-
-class TestRoutingTable(test_utils.BaseTestCase):
-
- def setUp(self):
- super(TestRoutingTable, self).setUp()
-
- def test_get_next_while_origin_changed(self):
- table = zmq_routing_table.RoutingTable(self.conf)
- table.register("topic1.server1", "1")
- table.register("topic1.server1", "2")
- table.register("topic1.server1", "3")
-
- rr_gen = table.get_hosts_round_robin("topic1.server1")
-
- result = []
- for i in range(3):
- result.append(next(rr_gen))
-
- self.assertEqual(3, len(result))
- self.assertIn("1", result)
- self.assertIn("2", result)
- self.assertIn("3", result)
-
- table.register("topic1.server1", "4")
- table.register("topic1.server1", "5")
- table.register("topic1.server1", "6")
-
- result = []
- for i in range(6):
- result.append(next(rr_gen))
-
- self.assertEqual(6, len(result))
- self.assertIn("1", result)
- self.assertIn("2", result)
- self.assertIn("3", result)
- self.assertIn("4", result)
- self.assertIn("5", result)
- self.assertIn("6", result)
-
- def test_no_targets(self):
- table = zmq_routing_table.RoutingTable(self.conf)
- rr_gen = table.get_hosts_round_robin("topic1.server1")
-
- result = []
- for t in rr_gen:
- result.append(t)
- self.assertEqual(0, len(result))
-
- def test_target_unchanged(self):
- table = zmq_routing_table.RoutingTable(self.conf)
- table.register("topic1.server1", "1")
-
- rr_gen = table.get_hosts_round_robin("topic1.server1")
-
- result = []
- for i in range(3):
- result.append(next(rr_gen))
-
- self.assertEqual(["1", "1", "1"], result)
diff --git a/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py b/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py
deleted file mode 100644
index a0264cf..0000000
--- a/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py
+++ /dev/null
@@ -1,226 +0,0 @@
-# Copyright 2016 Mirantis, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from six.moves import mock
-import testtools
-import time
-
-import oslo_messaging
-from oslo_messaging._drivers.zmq_driver.client import zmq_receivers
-from oslo_messaging._drivers.zmq_driver.client import zmq_senders
-from oslo_messaging._drivers.zmq_driver.proxy import zmq_proxy
-from oslo_messaging._drivers.zmq_driver.server.consumers.zmq_dealer_consumer \
- import DealerConsumerWithAcks
-from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging._drivers.zmq_driver import zmq_options
-from oslo_messaging.tests.drivers.zmq import zmq_common
-from oslo_messaging.tests import utils as test_utils
-
-zmq = zmq_async.import_zmq()
-
-
-class TestZmqAckManager(test_utils.BaseTestCase):
-
- @testtools.skipIf(zmq is None, "zmq not available")
- def setUp(self):
- super(TestZmqAckManager, self).setUp()
-
- # register and set necessary config opts
- self.messaging_conf.transport_driver = 'zmq'
- zmq_options.register_opts(self.conf, mock.MagicMock())
- kwargs = {'rpc_zmq_matchmaker': 'dummy',
- 'use_pub_sub': False,
- 'use_router_proxy': True,
- 'rpc_thread_pool_size': 1,
- 'rpc_use_acks': True,
- 'rpc_ack_timeout_base': 5,
- 'rpc_ack_timeout_multiplier': 1,
- 'rpc_retry_attempts': 2}
- self.config(group='oslo_messaging_zmq', **kwargs)
- self.conf.register_opts(zmq_proxy.zmq_proxy_opts,
- group='zmq_proxy_opts')
-
- # mock set_result method of futures
- self.set_result_patcher = mock.patch.object(
- zmq_receivers.futurist.Future, 'set_result',
- side_effect=zmq_receivers.futurist.Future.set_result, autospec=True
- )
- self.set_result = self.set_result_patcher.start()
-
- # mock send method of senders
- self.send_patcher = mock.patch.object(
- zmq_senders.RequestSenderProxy, 'send',
- side_effect=zmq_senders.RequestSenderProxy.send, autospec=True
- )
- self.send = self.send_patcher.start()
-
- # get driver
- transport = oslo_messaging.get_transport(self.conf)
- self.driver = transport._driver
-
- # prepare and launch proxy
- self.proxy = zmq_proxy.ZmqProxy(self.conf)
- vars(self.driver.matchmaker).update(vars(self.proxy.matchmaker))
- self.executor = zmq_async.get_executor(self.proxy.run)
- self.executor.execute()
-
- # create listener
- self.listener = zmq_common.TestServerListener(self.driver)
-
- # create target and message
- self.target = oslo_messaging.Target(topic='topic', server='server')
- self.message = {'method': 'xyz', 'args': {'x': 1, 'y': 2, 'z': 3}}
-
- # start listening to target
- self.listener.listen(self.target)
-
- # get ack manager
- self.ack_manager = self.driver.client.get().publishers['default']
-
- self.addCleanup(
- zmq_common.StopRpc(
- self, [('listener', 'stop'), ('executor', 'stop'),
- ('proxy', 'close'), ('driver', 'cleanup'),
- ('send_patcher', 'stop'),
- ('set_result_patcher', 'stop')]
- )
- )
-
- # wait for all connections to be established
- # and all parties to be ready for messaging
- time.sleep(1)
-
- @mock.patch.object(DealerConsumerWithAcks, '_acknowledge',
- side_effect=DealerConsumerWithAcks._acknowledge,
- autospec=True)
- def test_cast_success_without_retries(self, received_ack_mock):
- result = self.driver.send(
- self.target, {}, self.message, wait_for_reply=False
- )
- self.assertIsNone(result)
- self.ack_manager.pool.shutdown(wait=True)
- self.assertTrue(self.listener._received.isSet())
- self.assertEqual(self.message, self.listener.message.message)
- self.assertEqual(1, self.send.call_count)
- self.assertEqual(1, received_ack_mock.call_count)
- self.assertEqual(2, self.set_result.call_count)
-
- def test_cast_success_with_one_retry(self):
- with mock.patch.object(DealerConsumerWithAcks,
- '_acknowledge') as lost_ack_mock:
- result = self.driver.send(
- self.target, {}, self.message, wait_for_reply=False
- )
- self.assertIsNone(result)
- self.listener._received.wait(5)
- self.assertTrue(self.listener._received.isSet())
- self.assertEqual(self.message, self.listener.message.message)
- self.assertEqual(1, self.send.call_count)
- self.assertEqual(1, lost_ack_mock.call_count)
- self.assertEqual(0, self.set_result.call_count)
- self.listener._received.clear()
- with mock.patch.object(DealerConsumerWithAcks, '_acknowledge',
- side_effect=DealerConsumerWithAcks._acknowledge,
- autospec=True) as received_ack_mock:
- self.ack_manager.pool.shutdown(wait=True)
- self.assertFalse(self.listener._received.isSet())
- self.assertEqual(2, self.send.call_count)
- self.assertEqual(1, received_ack_mock.call_count)
- self.assertEqual(2, self.set_result.call_count)
-
- def test_cast_success_with_two_retries(self):
- with mock.patch.object(DealerConsumerWithAcks,
- '_acknowledge') as lost_ack_mock:
- result = self.driver.send(
- self.target, {}, self.message, wait_for_reply=False
- )
- self.assertIsNone(result)
- self.listener._received.wait(5)
- self.assertTrue(self.listener._received.isSet())
- self.assertEqual(self.message, self.listener.message.message)
- self.assertEqual(1, self.send.call_count)
- self.assertEqual(1, lost_ack_mock.call_count)
- self.assertEqual(0, self.set_result.call_count)
- self.listener._received.clear()
- self.listener._received.wait(7.5)
- self.assertFalse(self.listener._received.isSet())
- self.assertEqual(2, self.send.call_count)
- self.assertEqual(2, lost_ack_mock.call_count)
- self.assertEqual(0, self.set_result.call_count)
- with mock.patch.object(DealerConsumerWithAcks, '_acknowledge',
- side_effect=DealerConsumerWithAcks._acknowledge,
- autospec=True) as received_ack_mock:
- self.ack_manager.pool.shutdown(wait=True)
- self.assertFalse(self.listener._received.isSet())
- self.assertEqual(3, self.send.call_count)
- self.assertEqual(1, received_ack_mock.call_count)
- self.assertEqual(2, self.set_result.call_count)
-
- @mock.patch.object(DealerConsumerWithAcks, '_acknowledge')
- def test_cast_failure_exhausted_retries(self, lost_ack_mock):
- result = self.driver.send(
- self.target, {}, self.message, wait_for_reply=False
- )
- self.assertIsNone(result)
- self.ack_manager.pool.shutdown(wait=True)
- self.assertTrue(self.listener._received.isSet())
- self.assertEqual(self.message, self.listener.message.message)
- self.assertEqual(3, self.send.call_count)
- self.assertEqual(3, lost_ack_mock.call_count)
- self.assertEqual(1, self.set_result.call_count)
-
- @mock.patch.object(DealerConsumerWithAcks, '_acknowledge',
- side_effect=DealerConsumerWithAcks._acknowledge,
- autospec=True)
- @mock.patch.object(DealerConsumerWithAcks, '_reply',
- side_effect=DealerConsumerWithAcks._reply,
- autospec=True)
- @mock.patch.object(DealerConsumerWithAcks, '_reply_from_cache',
- side_effect=DealerConsumerWithAcks._reply_from_cache,
- autospec=True)
- def test_call_success_without_retries(self, unused_reply_from_cache_mock,
- received_reply_mock,
- received_ack_mock):
- result = self.driver.send(
- self.target, {}, self.message, wait_for_reply=True, timeout=10
- )
- self.assertIsNotNone(result)
- self.ack_manager.pool.shutdown(wait=True)
- self.assertTrue(self.listener._received.isSet())
- self.assertEqual(self.message, self.listener.message.message)
- self.assertEqual(1, self.send.call_count)
- self.assertEqual(1, received_ack_mock.call_count)
- self.assertEqual(3, self.set_result.call_count)
- received_reply_mock.assert_called_once_with(mock.ANY, mock.ANY,
- reply=True, failure=None)
- self.assertEqual(0, unused_reply_from_cache_mock.call_count)
-
- @mock.patch.object(DealerConsumerWithAcks, '_acknowledge')
- @mock.patch.object(DealerConsumerWithAcks, '_reply')
- @mock.patch.object(DealerConsumerWithAcks, '_reply_from_cache')
- def test_call_failure_exhausted_retries(self, lost_reply_from_cache_mock,
- lost_reply_mock, lost_ack_mock):
- self.assertRaises(oslo_messaging.MessagingTimeout,
- self.driver.send,
- self.target, {}, self.message,
- wait_for_reply=True, timeout=20)
- self.ack_manager.pool.shutdown(wait=True)
- self.assertTrue(self.listener._received.isSet())
- self.assertEqual(self.message, self.listener.message.message)
- self.assertEqual(3, self.send.call_count)
- self.assertEqual(3, lost_ack_mock.call_count)
- self.assertEqual(2, self.set_result.call_count)
- lost_reply_mock.assert_called_once_with(mock.ANY,
- reply=True, failure=None)
- self.assertEqual(2, lost_reply_from_cache_mock.call_count)
diff --git a/oslo_messaging/tests/drivers/zmq/test_zmq_address.py b/oslo_messaging/tests/drivers/zmq/test_zmq_address.py
deleted file mode 100644
index 519c294..0000000
--- a/oslo_messaging/tests/drivers/zmq/test_zmq_address.py
+++ /dev/null
@@ -1,67 +0,0 @@
-# Copyright 2016 Mirantis, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import testscenarios
-import testtools
-
-import oslo_messaging
-from oslo_messaging._drivers.zmq_driver import zmq_address
-from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging._drivers.zmq_driver import zmq_names
-from oslo_messaging.tests import utils as test_utils
-
-
-zmq = zmq_async.import_zmq()
-
-load_tests = testscenarios.load_tests_apply_scenarios
-
-
-class TestZmqAddress(test_utils.BaseTestCase):
-
- scenarios = [
- ('router', {'listener_type': zmq_names.socket_type_str(zmq.ROUTER)}),
- ('dealer', {'listener_type': zmq_names.socket_type_str(zmq.DEALER)})
- ]
-
- @testtools.skipIf(zmq is None, "zmq not available")
- def test_target_to_key_topic_only(self):
- target = oslo_messaging.Target(topic='topic')
- key = zmq_address.target_to_key(target, self.listener_type)
- self.assertEqual(self.listener_type + '/topic', key)
-
- @testtools.skipIf(zmq is None, "zmq not available")
- def test_target_to_key_topic_server_round_robin(self):
- target = oslo_messaging.Target(topic='topic', server='server')
- key = zmq_address.target_to_key(target, self.listener_type)
- self.assertEqual(self.listener_type + '/topic/server', key)
-
- @testtools.skipIf(zmq is None, "zmq not available")
- def test_target_to_key_topic_fanout(self):
- target = oslo_messaging.Target(topic='topic', fanout=True)
- key = zmq_address.target_to_key(target, self.listener_type)
- self.assertEqual(self.listener_type + '/topic', key)
-
- @testtools.skipIf(zmq is None, "zmq not available")
- def test_target_to_key_topic_server_fanout(self):
- target = oslo_messaging.Target(topic='topic', server='server',
- fanout=True)
- key = zmq_address.target_to_key(target, self.listener_type)
- self.assertEqual(self.listener_type + '/topic', key)
-
- @testtools.skipIf(zmq is None, "zmq not available")
- def test_target_to_key_topic_server_fanout_no_prefix(self):
- target = oslo_messaging.Target(topic='topic', server='server',
- fanout=True)
- key = zmq_address.target_to_key(target)
- self.assertEqual('topic', key)
diff --git a/oslo_messaging/tests/drivers/zmq/test_zmq_async.py b/oslo_messaging/tests/drivers/zmq/test_zmq_async.py
deleted file mode 100644
index a4dccd9..0000000
--- a/oslo_messaging/tests/drivers/zmq/test_zmq_async.py
+++ /dev/null
@@ -1,93 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from six.moves import mock
-import testtools
-
-from oslo_messaging._drivers.zmq_driver.poller import green_poller
-from oslo_messaging._drivers.zmq_driver.poller import threading_poller
-from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging.tests import utils as test_utils
-
-zmq = zmq_async.import_zmq()
-
-
-class TestImportZmq(test_utils.BaseTestCase):
-
- @testtools.skipIf(zmq is None, "zmq not available")
- def setUp(self):
- super(TestImportZmq, self).setUp()
-
- def test_when_eventlet_is_available_then_load_eventlet_green_zmq(self):
- zmq_async.eventletutils.is_monkey_patched = lambda _: True
-
- mock_try_import = mock.Mock()
- zmq_async.importutils.try_import = mock_try_import
-
- zmq_async.import_zmq()
-
- mock_try_import.assert_called_with('eventlet.green.zmq', default=None)
-
- def test_when_evetlet_is_unavailable_then_load_zmq(self):
- zmq_async.eventletutils.is_monkey_patched = lambda _: False
-
- mock_try_import = mock.Mock()
- zmq_async.importutils.try_import = mock_try_import
-
- zmq_async.import_zmq()
-
- mock_try_import.assert_called_with('zmq', default=None)
-
-
-class TestGetPoller(test_utils.BaseTestCase):
-
- @testtools.skipIf(zmq is None, "zmq not available")
- def setUp(self):
- super(TestGetPoller, self).setUp()
-
- def test_when_eventlet_is_available_then_return_GreenPoller(self):
- zmq_async.eventletutils.is_monkey_patched = lambda _: True
-
- poller = zmq_async.get_poller()
-
- self.assertIsInstance(poller, green_poller.GreenPoller)
-
- def test_when_eventlet_is_unavailable_then_return_ThreadingPoller(self):
- zmq_async.eventletutils.is_monkey_patched = lambda _: False
-
- poller = zmq_async.get_poller()
-
- self.assertIsInstance(poller, threading_poller.ThreadingPoller)
-
-
-class TestGetExecutor(test_utils.BaseTestCase):
-
- @testtools.skipIf(zmq is None, "zmq not available")
- def setUp(self):
- super(TestGetExecutor, self).setUp()
-
- def test_when_eventlet_module_is_available_then_return_GreenExecutor(self):
- zmq_async.eventletutils.is_monkey_patched = lambda _: True
-
- executor = zmq_async.get_executor('any method')
-
- self.assertIsInstance(executor, green_poller.GreenExecutor)
- self.assertEqual('any method', executor._method)
-
- def test_when_eventlet_is_unavailable_then_return_ThreadingExecutor(self):
- zmq_async.eventletutils.is_monkey_patched = lambda _: False
-
- executor = zmq_async.get_executor('any method')
-
- self.assertIsInstance(executor,
- threading_poller.ThreadingExecutor)
- self.assertEqual('any method', executor._method)
diff --git a/oslo_messaging/tests/drivers/zmq/test_zmq_transport_url.py b/oslo_messaging/tests/drivers/zmq/test_zmq_transport_url.py
deleted file mode 100644
index 45df796..0000000
--- a/oslo_messaging/tests/drivers/zmq/test_zmq_transport_url.py
+++ /dev/null
@@ -1,127 +0,0 @@
-# Copyright 2016 Mirantis, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from six.moves import mock
-import testtools
-
-import oslo_messaging
-from oslo_messaging._drivers import common
-from oslo_messaging._drivers.zmq_driver.matchmaker.zmq_matchmaker_base \
- import MatchmakerDummy
-from oslo_messaging._drivers.zmq_driver.matchmaker import zmq_matchmaker_redis
-from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging.tests import utils as test_utils
-
-zmq = zmq_async.import_zmq()
-
-redis = zmq_matchmaker_redis.redis
-sentinel = zmq_matchmaker_redis.redis_sentinel
-
-
-class TestZmqTransportUrl(test_utils.BaseTestCase):
-
- @testtools.skipIf(zmq is None, "zmq not available")
- def setUp(self):
- super(TestZmqTransportUrl, self).setUp()
-
- def setup_url(self, url):
- transport = oslo_messaging.get_transport(self.conf, url)
- self.addCleanup(transport.cleanup)
- driver = transport._driver
- return driver, url
-
- def mock_redis(self):
- if redis is None:
- self.skipTest("redis not available")
- else:
- redis_patcher = mock.patch.object(redis, 'StrictRedis')
- self.addCleanup(redis_patcher.stop)
- return redis_patcher.start()
-
- def mock_sentinel(self):
- if sentinel is None:
- self.skipTest("sentinel not available")
- else:
- sentinel_patcher = mock.patch.object(sentinel, 'Sentinel')
- self.addCleanup(sentinel_patcher.stop)
- return sentinel_patcher.start()
-
- def test_empty_url(self):
- self.mock_redis()
- driver, url = self.setup_url("zmq:///")
- self.assertIs(zmq_matchmaker_redis.MatchmakerRedis,
- driver.matchmaker.__class__)
- self.assertEqual('zmq', driver.matchmaker.url.transport)
-
- def test_error_url(self):
- self.assertRaises(common.RPCException, self.setup_url, "zmq+error:///")
-
- def test_dummy_url(self):
- driver, url = self.setup_url("zmq+dummy:///")
- self.assertIs(MatchmakerDummy,
- driver.matchmaker.__class__)
- self.assertEqual('zmq+dummy', driver.matchmaker.url.transport)
-
- def test_redis_url(self):
- self.mock_redis()
- driver, url = self.setup_url("zmq+redis:///")
- self.assertIs(zmq_matchmaker_redis.MatchmakerRedis,
- driver.matchmaker.__class__)
- self.assertEqual('zmq+redis', driver.matchmaker.url.transport)
-
- def test_sentinel_url(self):
- self.mock_sentinel()
- driver, url = self.setup_url("zmq+sentinel:///")
- self.assertIs(zmq_matchmaker_redis.MatchmakerSentinel,
- driver.matchmaker.__class__)
- self.assertEqual('zmq+sentinel', driver.matchmaker.url.transport)
-
- def test_host_with_credentials_url(self):
- self.mock_redis()
- driver, url = self.setup_url("zmq://:password@host:60000/")
- self.assertIs(zmq_matchmaker_redis.MatchmakerRedis,
- driver.matchmaker.__class__)
- self.assertEqual('zmq', driver.matchmaker.url.transport)
- self.assertEqual(
- [{"host": "host", "port": 60000, "password": "password"}],
- driver.matchmaker._redis_hosts
- )
-
- def test_redis_multiple_hosts_url(self):
- self.mock_redis()
- driver, url = self.setup_url(
- "zmq+redis://host1:60001,host2:60002,host3:60003/"
- )
- self.assertIs(zmq_matchmaker_redis.MatchmakerRedis,
- driver.matchmaker.__class__)
- self.assertEqual('zmq+redis', driver.matchmaker.url.transport)
- self.assertEqual(
- [{"host": "host1", "port": 60001, "password": None},
- {"host": "host2", "port": 60002, "password": None},
- {"host": "host3", "port": 60003, "password": None}],
- driver.matchmaker._redis_hosts
- )
-
- def test_sentinel_multiple_hosts_url(self):
- self.mock_sentinel()
- driver, url = self.setup_url(
- "zmq+sentinel://host1:20001,host2:20002,host3:20003/"
- )
- self.assertIs(zmq_matchmaker_redis.MatchmakerSentinel,
- driver.matchmaker.__class__)
- self.assertEqual('zmq+sentinel', driver.matchmaker.url.transport)
- self.assertEqual(
- [("host1", 20001), ("host2", 20002), ("host3", 20003)],
- driver.matchmaker._sentinel_hosts
- )
diff --git a/oslo_messaging/tests/drivers/zmq/test_zmq_ttl_cache.py b/oslo_messaging/tests/drivers/zmq/test_zmq_ttl_cache.py
deleted file mode 100644
index 772a97d..0000000
--- a/oslo_messaging/tests/drivers/zmq/test_zmq_ttl_cache.py
+++ /dev/null
@@ -1,132 +0,0 @@
-# Copyright 2016 Mirantis, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import time
-
-from oslo_messaging._drivers.zmq_driver.server import zmq_ttl_cache
-from oslo_messaging.tests import utils as test_utils
-
-
-class TestZmqTTLCache(test_utils.BaseTestCase):
-
- def setUp(self):
- super(TestZmqTTLCache, self).setUp()
-
- def call_count_decorator(unbound_method):
- def wrapper(self, *args, **kwargs):
- wrapper.call_count += 1
- return unbound_method(self, *args, **kwargs)
- wrapper.call_count = 0
- return wrapper
-
- zmq_ttl_cache.TTLCache._update_cache = \
- call_count_decorator(zmq_ttl_cache.TTLCache._update_cache)
-
- self.cache = zmq_ttl_cache.TTLCache(ttl=1)
-
- self.addCleanup(lambda: self.cache.cleanup())
-
- def _test_add_get(self):
- self.cache.add('x', 'a')
-
- self.assertEqual(self.cache.get('x'), 'a')
- self.assertEqual(self.cache.get('x', 'b'), 'a')
- self.assertIsNone(self.cache.get('y'))
- self.assertEqual(self.cache.get('y', 'b'), 'b')
-
- time.sleep(1)
-
- self.assertIsNone(self.cache.get('x'))
- self.assertEqual(self.cache.get('x', 'b'), 'b')
-
- def test_add_get_with_executor(self):
- self._test_add_get()
-
- def test_add_get_without_executor(self):
- self.cache._executor.stop()
- self._test_add_get()
-
- def _test_in_operator(self):
- self.cache.add(1)
-
- self.assertIn(1, self.cache)
-
- time.sleep(0.5)
-
- self.cache.add(2)
-
- self.assertIn(1, self.cache)
- self.assertIn(2, self.cache)
-
- time.sleep(0.75)
-
- self.cache.add(3)
-
- self.assertNotIn(1, self.cache)
- self.assertIn(2, self.cache)
- self.assertIn(3, self.cache)
-
- time.sleep(0.5)
-
- self.assertNotIn(2, self.cache)
- self.assertIn(3, self.cache)
-
- def test_in_operator_with_executor(self):
- self._test_in_operator()
-
- def test_in_operator_without_executor(self):
- self.cache._executor.stop()
- self._test_in_operator()
-
- def _is_expired(self, key):
- with self.cache._lock:
- _, expiration_time = self.cache._cache[key]
- return self.cache._is_expired(expiration_time, time.time())
-
- def test_executor(self):
- self.cache.add(1)
-
- self.assertEqual([1], sorted(self.cache._cache.keys()))
- self.assertFalse(self._is_expired(1))
-
- time.sleep(0.75)
-
- self.assertEqual(1, self.cache._update_cache.call_count)
-
- self.cache.add(2)
-
- self.assertEqual([1, 2], sorted(self.cache._cache.keys()))
- self.assertFalse(self._is_expired(1))
- self.assertFalse(self._is_expired(2))
-
- time.sleep(0.75)
-
- self.assertEqual(2, self.cache._update_cache.call_count)
-
- self.cache.add(3)
-
- if 1 in self.cache:
- self.assertEqual([1, 2, 3], sorted(self.cache._cache.keys()))
- self.assertTrue(self._is_expired(1))
- else:
- self.assertEqual([2, 3], sorted(self.cache._cache.keys()))
- self.assertFalse(self._is_expired(2))
- self.assertFalse(self._is_expired(3))
-
- time.sleep(0.75)
-
- self.assertEqual(3, self.cache._update_cache.call_count)
-
- self.assertEqual([3], sorted(self.cache._cache.keys()))
- self.assertFalse(self._is_expired(3))
diff --git a/oslo_messaging/tests/drivers/zmq/test_zmq_version.py b/oslo_messaging/tests/drivers/zmq/test_zmq_version.py
deleted file mode 100644
index 9b01894..0000000
--- a/oslo_messaging/tests/drivers/zmq/test_zmq_version.py
+++ /dev/null
@@ -1,63 +0,0 @@
-# Copyright 2016 Mirantis, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo_messaging._drivers.zmq_driver import zmq_version
-from oslo_messaging.tests import utils as test_utils
-
-
-class Doer(object):
-
- def __init__(self):
- self.x = 1
- self.y = 2
- self.z = 3
-
- def _sudo(self):
- pass
-
- def do(self):
- pass
-
- def _do_v_1_1(self):
- pass
-
- def _do_v_2_2(self):
- pass
-
- def _do_v_3_3(self):
- pass
-
-
-class TestZmqVersion(test_utils.BaseTestCase):
-
- def setUp(self):
- super(TestZmqVersion, self).setUp()
- self.doer = Doer()
-
- def test_get_unknown_attr_versions(self):
- self.assertRaises(AssertionError, zmq_version.get_method_versions,
- self.doer, 'qwerty')
-
- def test_get_non_method_attr_versions(self):
- for attr_name in vars(self.doer):
- self.assertRaises(AssertionError, zmq_version.get_method_versions,
- self.doer, attr_name)
-
- def test_get_private_method_versions(self):
- self.assertRaises(AssertionError, zmq_version.get_method_versions,
- self.doer, '_sudo')
-
- def test_get_public_method_versions(self):
- do_versions = zmq_version.get_method_versions(self.doer, 'do')
- self.assertEqual(['1.1', '2.2', '3.3'], sorted(do_versions.keys()))
diff --git a/oslo_messaging/tests/drivers/zmq/zmq_common.py b/oslo_messaging/tests/drivers/zmq/zmq_common.py
deleted file mode 100644
index 2e36999..0000000
--- a/oslo_messaging/tests/drivers/zmq/zmq_common.py
+++ /dev/null
@@ -1,111 +0,0 @@
-# Copyright 2015 Mirantis, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import logging
-import threading
-
-import fixtures
-from six.moves import mock
-import testtools
-
-import oslo_messaging
-from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging._drivers.zmq_driver import zmq_options
-from oslo_messaging._i18n import _LE
-from oslo_messaging.tests import utils as test_utils
-
-LOG = logging.getLogger(__name__)
-
-zmq = zmq_async.import_zmq()
-
-
-class TestServerListener(object):
-
- def __init__(self, driver):
- self.driver = driver
- self.listener = None
- self.executor = zmq_async.get_executor(self._run)
- self._stop = threading.Event()
- self._received = threading.Event()
- self.message = None
-
- def listen(self, target):
- self.listener = self.driver.listen(target, None,
- None)._poll_style_listener
- self.executor.execute()
-
- def listen_notifications(self, targets_and_priorities):
- self.listener = self.driver.listen_for_notifications(
- targets_and_priorities, None, None, None)._poll_style_listener
- self.executor.execute()
-
- def _run(self):
- try:
- messages = self.listener.poll()
- if messages:
- message = messages[0]
- message.acknowledge()
- self._received.set()
- self.message = message
- message.reply(reply=True)
- except Exception:
- LOG.exception(_LE("Unexpected exception occurred."))
-
- def stop(self):
- self.executor.stop()
-
-
-class ZmqBaseTestCase(test_utils.BaseTestCase):
- """Base test case for all ZMQ tests """
-
- @testtools.skipIf(zmq is None, "zmq not available")
- def setUp(self):
- super(ZmqBaseTestCase, self).setUp()
- self.messaging_conf.transport_driver = 'zmq'
- zmq_options.register_opts(self.conf, mock.MagicMock())
-
- # Set config values
- self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
- kwargs = {'rpc_zmq_bind_address': '127.0.0.1',
- 'rpc_zmq_host': '127.0.0.1',
- 'rpc_zmq_ipc_dir': self.internal_ipc_dir,
- 'use_pub_sub': False,
- 'use_router_proxy': False,
- 'rpc_zmq_matchmaker': 'dummy'}
- self.config(group='oslo_messaging_zmq', **kwargs)
- self.config(rpc_response_timeout=5)
-
- # Get driver
- transport = oslo_messaging.get_transport(self.conf)
- self.driver = transport._driver
-
- self.listener = TestServerListener(self.driver)
-
- self.addCleanup(
- StopRpc(self, [('listener', 'stop'), ('driver', 'cleanup')])
- )
-
-
-class StopRpc(object):
- def __init__(self, obj, attrs_and_stops):
- self.obj = obj
- self.attrs_and_stops = attrs_and_stops
-
- def __call__(self):
- for attr, stop in self.attrs_and_stops:
- if hasattr(self.obj, attr):
- obj_attr = getattr(self.obj, attr)
- if hasattr(obj_attr, stop):
- obj_attr_stop = getattr(obj_attr, stop)
- obj_attr_stop()
diff --git a/oslo_messaging/tests/functional/zmq/__init__.py b/oslo_messaging/tests/functional/zmq/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/oslo_messaging/tests/functional/zmq/__init__.py
+++ /dev/null
diff --git a/oslo_messaging/tests/functional/zmq/multiproc_utils.py b/oslo_messaging/tests/functional/zmq/multiproc_utils.py
deleted file mode 100644
index cf3b6e3..0000000
--- a/oslo_messaging/tests/functional/zmq/multiproc_utils.py
+++ /dev/null
@@ -1,232 +0,0 @@
-# Copyright 2016 Mirantis, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import logging
-import logging.handlers
-import multiprocessing
-import os
-import sys
-import threading
-import time
-import uuid
-
-from oslo_config import cfg
-
-import oslo_messaging
-from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging.tests.functional import utils
-
-LOG = logging.getLogger(__name__)
-
-zmq = zmq_async.import_zmq()
-
-
-class QueueHandler(logging.Handler):
- """This is a logging handler which sends events to a multiprocessing queue.
-
- The plan is to add it to Python 3.2, but this can be copy pasted into
- user code for use with earlier Python versions.
- """
-
- def __init__(self, queue):
- """Initialise an instance, using the passed queue."""
- logging.Handler.__init__(self)
- self.queue = queue
-
- def emit(self, record):
- """Emit a record.
-
- Writes the LogRecord to the queue.
- """
- try:
- ei = record.exc_info
- if ei:
- # just to get traceback text into record.exc_text
- dummy = self.format(record) # noqa
- record.exc_info = None # not needed any more
- self.queue.put_nowait(record)
- except (KeyboardInterrupt, SystemExit):
- raise
- except Exception:
- self.handleError(record)
-
-
-def listener_configurer(conf):
- root = logging.getLogger()
- h = logging.StreamHandler(sys.stdout)
- f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s '
- '%(levelname)-8s %(message)s')
- h.setFormatter(f)
- root.addHandler(h)
- log_path = conf.oslo_messaging_zmq.rpc_zmq_ipc_dir + \
- "/" + "zmq_multiproc.log"
- file_handler = logging.StreamHandler(open(log_path, 'w'))
- file_handler.setFormatter(f)
- root.addHandler(file_handler)
-
-
-def server_configurer(queue):
- h = QueueHandler(queue)
- root = logging.getLogger()
- root.addHandler(h)
- root.setLevel(logging.DEBUG)
-
-
-def listener_thread(queue, configurer, conf):
- configurer(conf)
- while True:
- time.sleep(0.3)
- try:
- record = queue.get()
- if record is None:
- break
- logger = logging.getLogger(record.name)
- logger.handle(record)
- except (KeyboardInterrupt, SystemExit):
- raise
-
-
-class Client(oslo_messaging.RPCClient):
-
- def __init__(self, transport, topic):
- super(Client, self).__init__(
- transport=transport, target=oslo_messaging.Target(topic=topic))
- self.replies = []
-
- def call_a(self):
- LOG.warning("call_a - client side")
- rep = self.call({}, 'call_a')
- LOG.warning("after call_a - client side")
- self.replies.append(rep)
- return rep
-
-
-class ReplyServerEndpoint(object):
-
- def call_a(self, *args, **kwargs):
- LOG.warning("call_a - Server endpoint reached!")
- return "OK"
-
-
-class Server(object):
-
- def __init__(self, conf, log_queue, transport_url, name, topic=None):
- self.conf = conf
- self.log_queue = log_queue
- self.transport_url = transport_url
- self.name = name
- self.topic = topic or str(uuid.uuid4())
- self.ready = multiprocessing.Value('b', False)
- self._stop = multiprocessing.Event()
-
- def start(self):
- self.process = multiprocessing.Process(target=self._run_server,
- name=self.name,
- args=(self.conf,
- self.transport_url,
- self.log_queue,
- self.ready))
- self.process.start()
- LOG.debug("Server process started: pid: %d", self.process.pid)
-
- def _run_server(self, conf, url, log_queue, ready):
- server_configurer(log_queue)
- LOG.debug("Starting RPC server")
-
- transport = oslo_messaging.get_transport(conf, url=url)
- target = oslo_messaging.Target(topic=self.topic, server=self.name)
- self.rpc_server = oslo_messaging.get_rpc_server(
- transport=transport, target=target,
- endpoints=[ReplyServerEndpoint()],
- executor='eventlet')
- self.rpc_server.start()
- ready.value = True
- LOG.debug("RPC server being started")
- while not self._stop.is_set():
- LOG.debug("Waiting for the stop signal ...")
- time.sleep(1)
- self.rpc_server.stop()
- self.rpc_server.wait()
- LOG.debug("Leaving process T:%s Pid:%d", str(target), os.getpid())
-
- def cleanup(self):
- LOG.debug("Stopping server")
- self.shutdown()
-
- def shutdown(self):
- self._stop.set()
-
- def restart(self, time_for_restart=1):
- pass
-
- def hang(self):
- pass
-
- def crash(self):
- pass
-
- def ping(self):
- pass
-
-
-class MultiprocTestCase(utils.SkipIfNoTransportURL):
-
- def setUp(self):
- super(MultiprocTestCase, self).setUp(conf=cfg.ConfigOpts())
-
- if not self.url.startswith("zmq"):
- self.skipTest("ZeroMQ specific skipped...")
-
- self.transport = oslo_messaging.get_transport(self.conf, url=self.url)
-
- LOG.debug("Start log queue")
-
- self.log_queue = multiprocessing.Queue()
- self.log_listener = threading.Thread(target=listener_thread,
- args=(self.log_queue,
- listener_configurer,
- self.conf))
- self.log_listener.start()
- self.spawned = []
-
- self.conf.prog = "test_prog"
- self.conf.project = "test_project"
-
- def tearDown(self):
- for process in self.spawned:
- process.cleanup()
- super(MultiprocTestCase, self).tearDown()
-
- def get_client(self, topic):
- return Client(self.transport, topic)
-
- def spawn_server(self, wait_for_server=False, topic=None):
- name = "server_%d_%s" % (len(self.spawned), str(uuid.uuid4())[:8])
- server = Server(self.conf, self.log_queue, self.url, name, topic)
- LOG.debug("[SPAWN] %s (starting)...", server.name)
- server.start()
- if wait_for_server:
- while not server.ready.value:
- LOG.debug("[SPAWN] %s (waiting for server ready)...",
- server.name)
- time.sleep(1)
- LOG.debug("[SPAWN] Server %s:%d started.",
- server.name, server.process.pid)
- self.spawned.append(server)
- return server
-
- def spawn_servers(self, number, wait_for_server=False, common_topic=True):
- topic = str(uuid.uuid4()) if common_topic else None
- for _ in range(number):
- self.spawn_server(wait_for_server, topic)
diff --git a/oslo_messaging/tests/functional/zmq/test_startup.py b/oslo_messaging/tests/functional/zmq/test_startup.py
deleted file mode 100644
index ebea76e..0000000
--- a/oslo_messaging/tests/functional/zmq/test_startup.py
+++ /dev/null
@@ -1,49 +0,0 @@
-# Copyright 2016 Mirantis, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import os
-import sys
-
-from oslo_messaging.tests.functional.zmq import multiproc_utils
-
-
-class StartupOrderTestCase(multiproc_utils.MultiprocTestCase):
-
- def setUp(self):
- super(StartupOrderTestCase, self).setUp()
-
- self.conf.prog = "test_prog"
- self.conf.project = "test_project"
-
- self.config(rpc_response_timeout=10)
-
- log_path = os.path.join(self.conf.oslo_messaging_zmq.rpc_zmq_ipc_dir,
- str(os.getpid()) + ".log")
- sys.stdout = open(log_path, "wb", buffering=0)
-
- def test_call_client_wait_for_server(self):
- server = self.spawn_server(wait_for_server=True)
- client = self.get_client(server.topic)
- for _ in range(3):
- reply = client.call_a()
- self.assertIsNotNone(reply)
- self.assertEqual(3, len(client.replies))
-
- def test_call_client_dont_wait_for_server(self):
- server = self.spawn_server(wait_for_server=False)
- client = self.get_client(server.topic)
- for _ in range(3):
- reply = client.call_a()
- self.assertIsNotNone(reply)
- self.assertEqual(3, len(client.replies))