summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDoug Hellmann <doug@doughellmann.com>2015-04-28 12:18:34 +0000
committerVictor Sergeyev <vsergeyev@mirantis.com>2015-07-29 11:17:57 +0300
commitc90525bfead1f495df86c5c5d795d25abad2e1d9 (patch)
tree8d81ad551288a3c37a1e6f8c71235820ffac43ec
parent99b24b3888233656779a5baec1011554174604d7 (diff)
downloadoslo-messaging-c90525bfead1f495df86c5c5d795d25abad2e1d9.tar.gz
Remove oslo namespace package
Blueprint remove-namespace-packages Cherry-picked from: 03265410e0294e176d18dd42b57268a8056eb8fc Change-Id: Ibaba19ef10b4902c4f4f9fbdf7078e66b75f2035
-rw-r--r--oslo/__init__.py16
-rw-r--r--oslo/messaging/__init__.py38
-rw-r--r--oslo/messaging/_drivers/__init__.py0
-rw-r--r--oslo/messaging/_drivers/common.py16
-rw-r--r--oslo/messaging/_executors/__init__.py0
-rw-r--r--oslo/messaging/_executors/base.py17
-rw-r--r--oslo/messaging/conffixture.py13
-rw-r--r--oslo/messaging/exceptions.py13
-rw-r--r--oslo/messaging/localcontext.py13
-rw-r--r--oslo/messaging/notify/__init__.py28
-rw-r--r--oslo/messaging/notify/dispatcher.py13
-rw-r--r--oslo/messaging/notify/listener.py13
-rw-r--r--oslo/messaging/notify/log_handler.py13
-rw-r--r--oslo/messaging/notify/logger.py13
-rw-r--r--oslo/messaging/notify/middleware.py13
-rw-r--r--oslo/messaging/notify/notifier.py13
-rw-r--r--oslo/messaging/rpc/__init__.py32
-rw-r--r--oslo/messaging/rpc/client.py13
-rw-r--r--oslo/messaging/rpc/dispatcher.py13
-rw-r--r--oslo/messaging/rpc/server.py13
-rw-r--r--oslo/messaging/serializer.py13
-rw-r--r--oslo/messaging/server.py13
-rw-r--r--oslo/messaging/target.py13
-rw-r--r--oslo/messaging/transport.py13
-rw-r--r--setup.cfg4
-rw-r--r--tests/__init__.py26
-rw-r--r--tests/drivers/__init__.py0
-rw-r--r--tests/drivers/test_impl_qpid.py850
-rw-r--r--tests/drivers/test_impl_rabbit.py758
-rw-r--r--tests/drivers/test_impl_zmq.py228
-rw-r--r--tests/drivers/test_matchmaker.py69
-rw-r--r--tests/drivers/test_matchmaker_redis.py83
-rw-r--r--tests/drivers/test_matchmaker_ring.py73
-rw-r--r--tests/notify/__init__.py0
-rw-r--r--tests/notify/test_dispatcher.py171
-rw-r--r--tests/notify/test_listener.py411
-rw-r--r--tests/notify/test_log_handler.py57
-rw-r--r--tests/notify/test_logger.py157
-rw-r--r--tests/notify/test_middleware.py190
-rw-r--r--tests/notify/test_notifier.py540
-rw-r--r--tests/rpc/__init__.py0
-rw-r--r--tests/rpc/test_client.py519
-rw-r--r--tests/rpc/test_dispatcher.py178
-rw-r--r--tests/rpc/test_server.py503
-rw-r--r--tests/test_amqp_driver.py738
-rw-r--r--tests/test_exception_serialization.py308
-rw-r--r--tests/test_expected_exceptions.py66
-rw-r--r--tests/test_target.py177
-rw-r--r--tests/test_transport.py367
-rw-r--r--tests/test_urls.py236
-rw-r--r--tests/test_warning.py61
51 files changed, 0 insertions, 7125 deletions
diff --git a/oslo/__init__.py b/oslo/__init__.py
deleted file mode 100644
index 8feca65..0000000
--- a/oslo/__init__.py
+++ /dev/null
@@ -1,16 +0,0 @@
-
-# Copyright 2012 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-__import__('pkg_resources').declare_namespace(__name__)
diff --git a/oslo/messaging/__init__.py b/oslo/messaging/__init__.py
deleted file mode 100644
index 125c96a..0000000
--- a/oslo/messaging/__init__.py
+++ /dev/null
@@ -1,38 +0,0 @@
-
-# Copyright 2013 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import warnings
-
-from .exceptions import *
-from .localcontext import *
-from .notify import *
-from .rpc import *
-from .serializer import *
-from .server import *
-from .target import *
-from .transport import *
-
-
-def deprecated():
- new_name = __name__.replace('.', '_')
- warnings.warn(
- ('The oslo namespace package is deprecated. Please use %s instead.' %
- new_name),
- DeprecationWarning,
- stacklevel=3,
- )
-
-
-deprecated()
diff --git a/oslo/messaging/_drivers/__init__.py b/oslo/messaging/_drivers/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/oslo/messaging/_drivers/__init__.py
+++ /dev/null
diff --git a/oslo/messaging/_drivers/common.py b/oslo/messaging/_drivers/common.py
deleted file mode 100644
index 12f8b09..0000000
--- a/oslo/messaging/_drivers/common.py
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-# TODO(dhellmann): This private package and these imports can be
-# removed after heat fixes their tests. See
-# https://bugs.launchpad.net/oslo.messaging/+bug/1410196.
-from oslo_messaging._drivers.common import * # noqa
diff --git a/oslo/messaging/_executors/__init__.py b/oslo/messaging/_executors/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/oslo/messaging/_executors/__init__.py
+++ /dev/null
diff --git a/oslo/messaging/_executors/base.py b/oslo/messaging/_executors/base.py
deleted file mode 100644
index 01dfc06..0000000
--- a/oslo/messaging/_executors/base.py
+++ /dev/null
@@ -1,17 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo_messaging._executors.base import * # noqa
-
-# FIXME(dhellmann): Provide a dummy value so the mock in nova's unit
-# test fixture works. See bug #1412841
-POLL_TIMEOUT = 0.1
diff --git a/oslo/messaging/conffixture.py b/oslo/messaging/conffixture.py
deleted file mode 100644
index 8b4be93..0000000
--- a/oslo/messaging/conffixture.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo_messaging.conffixture import * # noqa
diff --git a/oslo/messaging/exceptions.py b/oslo/messaging/exceptions.py
deleted file mode 100644
index 4708d87..0000000
--- a/oslo/messaging/exceptions.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo_messaging.exceptions import * # noqa
diff --git a/oslo/messaging/localcontext.py b/oslo/messaging/localcontext.py
deleted file mode 100644
index 0b24f7f..0000000
--- a/oslo/messaging/localcontext.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo_messaging.localcontext import * # noqa
diff --git a/oslo/messaging/notify/__init__.py b/oslo/messaging/notify/__init__.py
deleted file mode 100644
index 9de8331..0000000
--- a/oslo/messaging/notify/__init__.py
+++ /dev/null
@@ -1,28 +0,0 @@
-
-# Copyright 2013 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-__all__ = ['Notifier',
- 'LoggingNotificationHandler',
- 'get_notification_listener',
- 'NotificationResult',
- 'PublishErrorsHandler',
- 'LoggingErrorNotificationHandler']
-
-from .notifier import *
-from .listener import *
-from .log_handler import *
-from .logger import *
-from .dispatcher import NotificationResult
-from oslo_messaging.notify import _impl_test
diff --git a/oslo/messaging/notify/dispatcher.py b/oslo/messaging/notify/dispatcher.py
deleted file mode 100644
index d472674..0000000
--- a/oslo/messaging/notify/dispatcher.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo_messaging.notify.dispatcher import * # noqa
diff --git a/oslo/messaging/notify/listener.py b/oslo/messaging/notify/listener.py
deleted file mode 100644
index 0e73924..0000000
--- a/oslo/messaging/notify/listener.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo_messaging.notify.listener import * # noqa
diff --git a/oslo/messaging/notify/log_handler.py b/oslo/messaging/notify/log_handler.py
deleted file mode 100644
index 3ee75a0..0000000
--- a/oslo/messaging/notify/log_handler.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo_messaging.notify.log_handler import * # noqa
diff --git a/oslo/messaging/notify/logger.py b/oslo/messaging/notify/logger.py
deleted file mode 100644
index f32a424..0000000
--- a/oslo/messaging/notify/logger.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo_messaging.notify.logger import * # noqa
diff --git a/oslo/messaging/notify/middleware.py b/oslo/messaging/notify/middleware.py
deleted file mode 100644
index 992b65b..0000000
--- a/oslo/messaging/notify/middleware.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo_messaging.notify.middleware import * # noqa
diff --git a/oslo/messaging/notify/notifier.py b/oslo/messaging/notify/notifier.py
deleted file mode 100644
index 0d23eb0..0000000
--- a/oslo/messaging/notify/notifier.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo_messaging.notify.notifier import * # noqa
diff --git a/oslo/messaging/rpc/__init__.py b/oslo/messaging/rpc/__init__.py
deleted file mode 100644
index f9cc881..0000000
--- a/oslo/messaging/rpc/__init__.py
+++ /dev/null
@@ -1,32 +0,0 @@
-
-# Copyright 2013 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-__all__ = [
- 'ClientSendError',
- 'ExpectedException',
- 'NoSuchMethod',
- 'RPCClient',
- 'RPCDispatcher',
- 'RPCDispatcherError',
- 'RPCVersionCapError',
- 'RemoteError',
- 'UnsupportedVersion',
- 'expected_exceptions',
- 'get_rpc_server',
-]
-
-from .client import *
-from .dispatcher import *
-from .server import *
diff --git a/oslo/messaging/rpc/client.py b/oslo/messaging/rpc/client.py
deleted file mode 100644
index c625ba2..0000000
--- a/oslo/messaging/rpc/client.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo_messaging.rpc.client import * # noqa
diff --git a/oslo/messaging/rpc/dispatcher.py b/oslo/messaging/rpc/dispatcher.py
deleted file mode 100644
index 0cf3871..0000000
--- a/oslo/messaging/rpc/dispatcher.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo_messaging.rpc.dispatcher import * # noqa
diff --git a/oslo/messaging/rpc/server.py b/oslo/messaging/rpc/server.py
deleted file mode 100644
index c297fd1..0000000
--- a/oslo/messaging/rpc/server.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo_messaging.rpc.server import * # noqa
diff --git a/oslo/messaging/serializer.py b/oslo/messaging/serializer.py
deleted file mode 100644
index b7b9b3f..0000000
--- a/oslo/messaging/serializer.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo_messaging.serializer import * # noqa
diff --git a/oslo/messaging/server.py b/oslo/messaging/server.py
deleted file mode 100644
index 517f9ab..0000000
--- a/oslo/messaging/server.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo_messaging.server import * # noqa
diff --git a/oslo/messaging/target.py b/oslo/messaging/target.py
deleted file mode 100644
index 2f521a1..0000000
--- a/oslo/messaging/target.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo_messaging.target import * # noqa
diff --git a/oslo/messaging/transport.py b/oslo/messaging/transport.py
deleted file mode 100644
index a10dfe4..0000000
--- a/oslo/messaging/transport.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo_messaging.transport import * # noqa
diff --git a/setup.cfg b/setup.cfg
index ce73f1a..e6a9352 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -19,11 +19,7 @@ classifier =
[files]
packages =
- oslo
- oslo.messaging
oslo_messaging
-namespace_packages =
- oslo
[entry_points]
console_scripts =
diff --git a/tests/__init__.py b/tests/__init__.py
deleted file mode 100644
index 0222c4e..0000000
--- a/tests/__init__.py
+++ /dev/null
@@ -1,26 +0,0 @@
-# Copyright 2014 eNovance
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-# Import oslotest before importing test submodules to setup six.moves for mock
-import oslotest
-
-try:
- import eventlet
-except ImportError:
- pass
-else:
- # Ensure that eventlet monkey patching is enabled before loading the qpid
- # module, otherwise qpid will hang
- eventlet.monkey_patch()
diff --git a/tests/drivers/__init__.py b/tests/drivers/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/tests/drivers/__init__.py
+++ /dev/null
diff --git a/tests/drivers/test_impl_qpid.py b/tests/drivers/test_impl_qpid.py
deleted file mode 100644
index ae4d806..0000000
--- a/tests/drivers/test_impl_qpid.py
+++ /dev/null
@@ -1,850 +0,0 @@
-# Copyright (C) 2014 eNovance SAS <licensing@enovance.com>
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import operator
-import random
-import threading
-import time
-
-try:
- import qpid
-except ImportError:
- qpid = None
-from six.moves import _thread
-import testscenarios
-import testtools
-
-from oslo import messaging
-from oslo_messaging._drivers import amqp
-from oslo_messaging._drivers import impl_qpid as qpid_driver
-from oslo_messaging.tests import utils as test_utils
-from six.moves import mock
-
-
-load_tests = testscenarios.load_tests_apply_scenarios
-
-QPID_BROKER = 'localhost:5672'
-
-
-class TestQpidDriverLoad(test_utils.BaseTestCase):
-
- def setUp(self):
- super(TestQpidDriverLoad, self).setUp()
- self.messaging_conf.transport_driver = 'qpid'
-
- def test_driver_load(self):
- transport = messaging.get_transport(self.conf)
- self.assertIsInstance(transport._driver, qpid_driver.QpidDriver)
-
-
-def _is_qpidd_service_running():
-
- """this function checks if the qpid service is running or not."""
-
- qpid_running = True
- try:
- broker = QPID_BROKER
- connection = qpid.messaging.Connection(broker)
- connection.open()
- except Exception:
- # qpid service is not running.
- qpid_running = False
- else:
- connection.close()
-
- return qpid_running
-
-
-class _QpidBaseTestCase(test_utils.BaseTestCase):
-
- @testtools.skipIf(qpid is None, "qpid not available")
- def setUp(self):
- super(_QpidBaseTestCase, self).setUp()
- self.messaging_conf.transport_driver = 'qpid'
- self.fake_qpid = not _is_qpidd_service_running()
-
- if self.fake_qpid:
- self.session_receive = get_fake_qpid_session()
- self.session_send = get_fake_qpid_session()
- else:
- self.broker = QPID_BROKER
- # create connection from the qpid.messaging
- # connection for the Consumer.
- self.con_receive = qpid.messaging.Connection(self.broker)
- self.con_receive.open()
- # session to receive the messages
- self.session_receive = self.con_receive.session()
-
- # connection for sending the message
- self.con_send = qpid.messaging.Connection(self.broker)
- self.con_send.open()
- # session to send the messages
- self.session_send = self.con_send.session()
-
- # list to store the expected messages and
- # the actual received messages
- self._expected = []
- self._messages = []
- self.initialized = True
-
- def tearDown(self):
- super(_QpidBaseTestCase, self).tearDown()
-
- if self.initialized:
- if self.fake_qpid:
- _fake_session.flush_exchanges()
- else:
- self.con_receive.close()
- self.con_send.close()
-
-
-class TestQpidTransportURL(_QpidBaseTestCase):
-
- scenarios = [
- ('none', dict(url=None,
- expected=[dict(host='localhost:5672',
- username='',
- password='')])),
- ('empty',
- dict(url='qpid:///',
- expected=[dict(host='localhost:5672',
- username='',
- password='')])),
- ('localhost',
- dict(url='qpid://localhost/',
- expected=[dict(host='localhost',
- username='',
- password='')])),
- ('no_creds',
- dict(url='qpid://host/',
- expected=[dict(host='host',
- username='',
- password='')])),
- ('no_port',
- dict(url='qpid://user:password@host/',
- expected=[dict(host='host',
- username='user',
- password='password')])),
- ('full_url',
- dict(url='qpid://user:password@host:10/',
- expected=[dict(host='host:10',
- username='user',
- password='password')])),
- ('full_two_url',
- dict(url='qpid://user:password@host:10,'
- 'user2:password2@host2:12/',
- expected=[dict(host='host:10',
- username='user',
- password='password'),
- dict(host='host2:12',
- username='user2',
- password='password2')
- ]
- )),
-
- ]
-
- @mock.patch.object(qpid_driver.Connection, 'reconnect')
- def test_transport_url(self, *args):
- transport = messaging.get_transport(self.conf, self.url)
- self.addCleanup(transport.cleanup)
- driver = transport._driver
-
- brokers_params = driver._get_connection().brokers_params
- self.assertEqual(sorted(self.expected,
- key=operator.itemgetter('host')),
- sorted(brokers_params,
- key=operator.itemgetter('host')))
-
-
-class TestQpidInvalidTopologyVersion(_QpidBaseTestCase):
- """Unit test cases to test invalid qpid topology version."""
-
- scenarios = [
- ('direct', dict(consumer_cls=qpid_driver.DirectConsumer,
- consumer_kwargs={},
- publisher_cls=qpid_driver.DirectPublisher,
- publisher_kwargs={})),
- ('topic', dict(consumer_cls=qpid_driver.TopicConsumer,
- consumer_kwargs={'exchange_name': 'openstack'},
- publisher_cls=qpid_driver.TopicPublisher,
- publisher_kwargs={'exchange_name': 'openstack'})),
- ('fanout', dict(consumer_cls=qpid_driver.FanoutConsumer,
- consumer_kwargs={},
- publisher_cls=qpid_driver.FanoutPublisher,
- publisher_kwargs={})),
- ]
-
- def setUp(self):
- super(TestQpidInvalidTopologyVersion, self).setUp()
- self.config(qpid_topology_version=-1,
- group='oslo_messaging_qpid')
-
- def test_invalid_topology_version(self):
- def consumer_callback(msg):
- pass
-
- msgid_or_topic = 'test'
-
- # not using self.assertRaises because
- # 1. qpid driver raises Exception(msg) for invalid topology version
- # 2. flake8 - H202 assertRaises Exception too broad
- exception_msg = ("Invalid value for qpid_topology_version: %d" %
- self.conf.oslo_messaging_qpid.qpid_topology_version)
- recvd_exc_msg = ''
-
- try:
- self.consumer_cls(self.conf.oslo_messaging_qpid,
- self.session_receive,
- msgid_or_topic,
- consumer_callback,
- **self.consumer_kwargs)
- except Exception as e:
- recvd_exc_msg = e.message
-
- self.assertEqual(exception_msg, recvd_exc_msg)
-
- recvd_exc_msg = ''
- try:
- self.publisher_cls(self.conf.oslo_messaging_qpid,
- self.session_send,
- topic=msgid_or_topic,
- **self.publisher_kwargs)
- except Exception as e:
- recvd_exc_msg = e.message
-
- self.assertEqual(exception_msg, recvd_exc_msg)
-
-
-class TestQpidDirectConsumerPublisher(_QpidBaseTestCase):
- """Unit test cases to test DirectConsumer and Direct Publisher."""
-
- _n_qpid_topology = [
- ('v1', dict(qpid_topology=1)),
- ('v2', dict(qpid_topology=2)),
- ]
-
- _n_msgs = [
- ('single', dict(no_msgs=1)),
- ('multiple', dict(no_msgs=10)),
- ]
-
- @classmethod
- def generate_scenarios(cls):
- cls.scenarios = testscenarios.multiply_scenarios(cls._n_qpid_topology,
- cls._n_msgs)
-
- def consumer_callback(self, msg):
- # This function will be called by the DirectConsumer
- # when any message is received.
- # Append the received message into the messages list
- # so that the received messages can be validated
- # with the expected messages
- if isinstance(msg, dict):
- self._messages.append(msg['content'])
- else:
- self._messages.append(msg)
-
- def test_qpid_direct_consumer_producer(self):
- self.msgid = str(random.randint(1, 100))
-
- # create a DirectConsumer and DirectPublisher class objects
- self.dir_cons = qpid_driver.DirectConsumer(
- self.conf.oslo_messaging_qpid,
- self.session_receive,
- self.msgid,
- self.consumer_callback)
- self.dir_pub = qpid_driver.DirectPublisher(
- self.conf.oslo_messaging_qpid,
- self.session_send,
- self.msgid)
-
- def try_send_msg(no_msgs):
- for i in range(no_msgs):
- self._expected.append(str(i))
- snd_msg = {'content_type': 'text/plain', 'content': str(i)}
- self.dir_pub.send(snd_msg)
-
- def try_receive_msg(no_msgs):
- for i in range(no_msgs):
- self.dir_cons.consume()
-
- thread1 = threading.Thread(target=try_receive_msg,
- args=(self.no_msgs,))
- thread2 = threading.Thread(target=try_send_msg,
- args=(self.no_msgs,))
-
- thread1.start()
- thread2.start()
- thread1.join()
- thread2.join()
-
- self.assertEqual(self.no_msgs, len(self._messages))
- self.assertEqual(self._expected, self._messages)
-
-
-TestQpidDirectConsumerPublisher.generate_scenarios()
-
-
-class TestQpidTopicAndFanout(_QpidBaseTestCase):
- """Unit Test cases to test TopicConsumer and
- TopicPublisher classes of the qpid driver
- and FanoutConsumer and FanoutPublisher classes
- of the qpid driver
- """
-
- _n_qpid_topology = [
- ('v1', dict(qpid_topology=1)),
- ('v2', dict(qpid_topology=2)),
- ]
-
- _n_msgs = [
- ('single', dict(no_msgs=1)),
- ('multiple', dict(no_msgs=10)),
- ]
-
- _n_senders = [
- ('single', dict(no_senders=1)),
- ('multiple', dict(no_senders=10)),
- ]
-
- _n_receivers = [
- ('single', dict(no_receivers=1)),
- ]
- _exchange_class = [
- ('topic', dict(consumer_cls=qpid_driver.TopicConsumer,
- consumer_kwargs={'exchange_name': 'openstack'},
- publisher_cls=qpid_driver.TopicPublisher,
- publisher_kwargs={'exchange_name': 'openstack'},
- topic='topictest.test',
- receive_topic='topictest.test')),
- ('fanout', dict(consumer_cls=qpid_driver.FanoutConsumer,
- consumer_kwargs={},
- publisher_cls=qpid_driver.FanoutPublisher,
- publisher_kwargs={},
- topic='fanouttest',
- receive_topic='fanouttest')),
- ]
-
- @classmethod
- def generate_scenarios(cls):
- cls.scenarios = testscenarios.multiply_scenarios(cls._n_qpid_topology,
- cls._n_msgs,
- cls._n_senders,
- cls._n_receivers,
- cls._exchange_class)
-
- def setUp(self):
- super(TestQpidTopicAndFanout, self).setUp()
-
- # to store the expected messages and the
- # actual received messages
- #
- # NOTE(dhellmann): These are dicts, where the base class uses
- # lists.
- self._expected = {}
- self._messages = {}
-
- self._senders = []
- self._receivers = []
-
- self._sender_threads = []
- self._receiver_threads = []
-
- def consumer_callback(self, msg):
- """callback function called by the ConsumerBase class of
- qpid driver.
- Message will be received in the format x-y
- where x is the sender id and y is the msg number of the sender
- extract the sender id 'x' and store the msg 'x-y' with 'x' as
- the key
- """
-
- if isinstance(msg, dict):
- msgcontent = msg['content']
- else:
- msgcontent = msg
-
- splitmsg = msgcontent.split('-')
- key = _thread.get_ident()
-
- if key not in self._messages:
- self._messages[key] = dict()
-
- tdict = self._messages[key]
-
- if splitmsg[0] not in tdict:
- tdict[splitmsg[0]] = []
-
- tdict[splitmsg[0]].append(msgcontent)
-
- def _try_send_msg(self, sender_id, no_msgs):
- for i in range(no_msgs):
- sendmsg = '%s-%s' % (str(sender_id), str(i))
- key = str(sender_id)
- # Store the message in the self._expected for each sender.
- # This will be used later to
- # validate the test by comparing it with the
- # received messages by all the receivers
- if key not in self._expected:
- self._expected[key] = []
- self._expected[key].append(sendmsg)
- send_dict = {'content_type': 'text/plain', 'content': sendmsg}
- self._senders[sender_id].send(send_dict)
-
- def _try_receive_msg(self, receiver_id, no_msgs):
- for i in range(self.no_senders * no_msgs):
- no_of_attempts = 0
-
- # ConsumerBase.consume blocks indefinitely until a message
- # is received.
- # So qpid_receiver.available() is called before calling
- # ConsumerBase.consume() so that we are not
- # blocked indefinitely
- qpid_receiver = self._receivers[receiver_id].get_receiver()
- while no_of_attempts < 50:
- if qpid_receiver.available() > 0:
- self._receivers[receiver_id].consume()
- break
- no_of_attempts += 1
- time.sleep(0.05)
-
- def test_qpid_topic_and_fanout(self):
- for receiver_id in range(self.no_receivers):
- consumer = self.consumer_cls(self.conf.oslo_messaging_qpid,
- self.session_receive,
- self.receive_topic,
- self.consumer_callback,
- **self.consumer_kwargs)
- self._receivers.append(consumer)
-
- # create receivers threads
- thread = threading.Thread(target=self._try_receive_msg,
- args=(receiver_id, self.no_msgs,))
- self._receiver_threads.append(thread)
-
- for sender_id in range(self.no_senders):
- publisher = self.publisher_cls(self.conf.oslo_messaging_qpid,
- self.session_send,
- topic=self.topic,
- **self.publisher_kwargs)
- self._senders.append(publisher)
-
- # create sender threads
- thread = threading.Thread(target=self._try_send_msg,
- args=(sender_id, self.no_msgs,))
- self._sender_threads.append(thread)
-
- for thread in self._receiver_threads:
- thread.start()
-
- for thread in self._sender_threads:
- thread.start()
-
- for thread in self._receiver_threads:
- thread.join()
-
- for thread in self._sender_threads:
- thread.join()
-
- # Each receiver should receive all the messages sent by
- # the sender(s).
- # So, Iterate through each of the receiver items in
- # self._messages and compare with the expected messages
- # messages.
-
- self.assertEqual(self.no_senders, len(self._expected))
- self.assertEqual(self.no_receivers, len(self._messages))
-
- for key, messages in self._messages.iteritems():
- self.assertEqual(self._expected, messages)
-
-TestQpidTopicAndFanout.generate_scenarios()
-
-
-class AddressNodeMatcher(object):
- def __init__(self, node):
- self.node = node
-
- def __eq__(self, address):
- return address.split(';')[0].strip() == self.node
-
-
-class TestDriverInterface(_QpidBaseTestCase):
- """Unit Test cases to test the amqpdriver with qpid
- """
-
- def setUp(self):
- super(TestDriverInterface, self).setUp()
- self.config(qpid_topology_version=2,
- group='oslo_messaging_qpid')
- transport = messaging.get_transport(self.conf)
- self.driver = transport._driver
-
- original_get_connection = self.driver._get_connection
- p = mock.patch.object(self.driver, '_get_connection',
- side_effect=lambda pooled=True:
- original_get_connection(False))
- p.start()
- self.addCleanup(p.stop)
-
- def test_listen_and_direct_send(self):
- target = messaging.Target(exchange="exchange_test",
- topic="topic_test",
- server="server_test")
-
- with mock.patch('qpid.messaging.Connection') as conn_cls:
- conn = conn_cls.return_value
- session = conn.session.return_value
- session.receiver.side_effect = [mock.Mock(), mock.Mock(),
- mock.Mock()]
-
- listener = self.driver.listen(target)
- listener.conn.direct_send("msg_id", {})
-
- self.assertEqual(3, len(listener.conn.consumers))
-
- expected_calls = [
- mock.call(AddressNodeMatcher(
- 'amq.topic/topic/exchange_test/topic_test')),
- mock.call(AddressNodeMatcher(
- 'amq.topic/topic/exchange_test/topic_test.server_test')),
- mock.call(AddressNodeMatcher('amq.topic/fanout/topic_test')),
- ]
- session.receiver.assert_has_calls(expected_calls)
- session.sender.assert_called_with(
- AddressNodeMatcher("amq.direct/msg_id"))
-
- def test_send(self):
- target = messaging.Target(exchange="exchange_test",
- topic="topic_test",
- server="server_test")
- with mock.patch('qpid.messaging.Connection') as conn_cls:
- conn = conn_cls.return_value
- session = conn.session.return_value
-
- self.driver.send(target, {}, {})
- session.sender.assert_called_with(AddressNodeMatcher(
- "amq.topic/topic/exchange_test/topic_test.server_test"))
-
- def test_send_notification(self):
- target = messaging.Target(exchange="exchange_test",
- topic="topic_test.info")
- with mock.patch('qpid.messaging.Connection') as conn_cls:
- conn = conn_cls.return_value
- session = conn.session.return_value
-
- self.driver.send_notification(target, {}, {}, "2.0")
- session.sender.assert_called_with(AddressNodeMatcher(
- "amq.topic/topic/exchange_test/topic_test.info"))
-
-
-class TestQpidReconnectOrder(test_utils.BaseTestCase):
- """Unit Test cases to test reconnection
- """
-
- @testtools.skipIf(qpid is None, "qpid not available")
- def test_reconnect_order(self):
- brokers = ['host1', 'host2', 'host3', 'host4', 'host5']
- brokers_count = len(brokers)
-
- self.config(qpid_hosts=brokers,
- group='oslo_messaging_qpid')
-
- with mock.patch('qpid.messaging.Connection') as conn_mock:
- # starting from the first broker in the list
- url = messaging.TransportURL.parse(self.conf, None)
- connection = qpid_driver.Connection(self.conf, url,
- amqp.PURPOSE_SEND)
-
- # reconnect will advance to the next broker, one broker per
- # attempt, and then wrap to the start of the list once the end is
- # reached
- for _ in range(brokers_count):
- connection.reconnect()
-
- expected = []
- for broker in brokers:
- expected.extend([mock.call("%s:5672" % broker),
- mock.call().open(),
- mock.call().session(),
- mock.call().opened(),
- mock.call().opened().__nonzero__(),
- mock.call().close()])
-
- conn_mock.assert_has_calls(expected, any_order=True)
-
-
-def synchronized(func):
- func.__lock__ = threading.Lock()
-
- def synced_func(*args, **kws):
- with func.__lock__:
- return func(*args, **kws)
-
- return synced_func
-
-
-class FakeQpidMsgManager(object):
- def __init__(self):
- self._exchanges = {}
-
- @synchronized
- def add_exchange(self, exchange):
- if exchange not in self._exchanges:
- self._exchanges[exchange] = {'msgs': [], 'consumers': {}}
-
- @synchronized
- def add_exchange_consumer(self, exchange, consumer_id):
- exchange_info = self._exchanges[exchange]
- cons_dict = exchange_info['consumers']
- cons_dict[consumer_id] = 0
-
- @synchronized
- def add_exchange_msg(self, exchange, msg):
- exchange_info = self._exchanges[exchange]
- exchange_info['msgs'].append(msg)
-
- def get_exchange_msg(self, exchange, index):
- exchange_info = self._exchanges[exchange]
- return exchange_info['msgs'][index]
-
- def get_no_exch_msgs(self, exchange):
- exchange_info = self._exchanges[exchange]
- return len(exchange_info['msgs'])
-
- def get_exch_cons_index(self, exchange, consumer_id):
- exchange_info = self._exchanges[exchange]
- cons_dict = exchange_info['consumers']
- return cons_dict[consumer_id]
-
- @synchronized
- def inc_consumer_index(self, exchange, consumer_id):
- exchange_info = self._exchanges[exchange]
- cons_dict = exchange_info['consumers']
- cons_dict[consumer_id] += 1
-
-_fake_qpid_msg_manager = FakeQpidMsgManager()
-
-
-class FakeQpidSessionSender(object):
- def __init__(self, session, id, target, options):
- self.session = session
- self.id = id
- self.target = target
- self.options = options
-
- @synchronized
- def send(self, object, sync=True, timeout=None):
- _fake_qpid_msg_manager.add_exchange_msg(self.target, object)
-
- def close(self, timeout=None):
- pass
-
-
-class FakeQpidSessionReceiver(object):
-
- def __init__(self, session, id, source, options):
- self.session = session
- self.id = id
- self.source = source
- self.options = options
-
- @synchronized
- def fetch(self, timeout=None):
- if timeout is None:
- # if timeout is not given, take a default time out
- # of 30 seconds to avoid indefinite loop
- _timeout = 30
- else:
- _timeout = timeout
-
- deadline = time.time() + _timeout
- while time.time() <= deadline:
- index = _fake_qpid_msg_manager.get_exch_cons_index(self.source,
- self.id)
- try:
- msg = _fake_qpid_msg_manager.get_exchange_msg(self.source,
- index)
- except IndexError:
- pass
- else:
- _fake_qpid_msg_manager.inc_consumer_index(self.source,
- self.id)
- return qpid.messaging.Message(msg)
- time.sleep(0.050)
-
- if timeout is None:
- raise Exception('timed out waiting for reply')
-
- def close(self, timeout=None):
- pass
-
- @synchronized
- def available(self):
- no_msgs = _fake_qpid_msg_manager.get_no_exch_msgs(self.source)
- index = _fake_qpid_msg_manager.get_exch_cons_index(self.source,
- self.id)
- if no_msgs == 0 or index >= no_msgs:
- return 0
- else:
- return no_msgs - index
-
-
-class FakeQpidSession(object):
-
- def __init__(self, connection=None, name=None, transactional=None):
- self.connection = connection
- self.name = name
- self.transactional = transactional
- self._receivers = {}
- self.conf = None
- self.url = None
- self._senders = {}
- self._sender_id = 0
- self._receiver_id = 0
-
- @synchronized
- def sender(self, target, **options):
- exchange_key = self._extract_exchange_key(target)
- _fake_qpid_msg_manager.add_exchange(exchange_key)
-
- sendobj = FakeQpidSessionSender(self, self._sender_id,
- exchange_key, options)
- self._senders[self._sender_id] = sendobj
- self._sender_id = self._sender_id + 1
- return sendobj
-
- @synchronized
- def receiver(self, source, **options):
- exchange_key = self._extract_exchange_key(source)
- _fake_qpid_msg_manager.add_exchange(exchange_key)
- recvobj = FakeQpidSessionReceiver(self, self._receiver_id,
- exchange_key, options)
- self._receivers[self._receiver_id] = recvobj
- _fake_qpid_msg_manager.add_exchange_consumer(exchange_key,
- self._receiver_id)
- self._receiver_id += 1
- return recvobj
-
- def acknowledge(self, message=None, disposition=None, sync=True):
- pass
-
- @synchronized
- def flush_exchanges(self):
- _fake_qpid_msg_manager._exchanges = {}
-
- def _extract_exchange_key(self, exchange_msg):
- """This function extracts a unique key for the exchange.
- This key is used in the dictionary as a 'key' for
- this exchange.
- Eg. if the exchange_msg (for qpid topology version 1)
- is 33/33 ; {"node": {"x-declare": {"auto-delete": true, ....
- then 33 is returned as the key.
- Eg 2. For topology v2, if the
- exchange_msg is - amq.direct/44 ; {"link": {"x-dec.......
- then 44 is returned
- """
- # first check for ';'
- semicolon_split = exchange_msg.split(';')
-
- # split the first item of semicolon_split with '/'
- slash_split = semicolon_split[0].split('/')
- # return the last element of the list as the key
- key = slash_split[-1]
- return key.strip()
-
- def close(self):
- pass
-
-_fake_session = FakeQpidSession()
-
-
-def get_fake_qpid_session():
- return _fake_session
-
-
-class QPidHATestCase(test_utils.BaseTestCase):
-
- @testtools.skipIf(qpid is None, "qpid not available")
- def setUp(self):
- super(QPidHATestCase, self).setUp()
- self.brokers = ['host1', 'host2', 'host3', 'host4', 'host5']
-
- self.config(qpid_hosts=self.brokers,
- qpid_username=None,
- qpid_password=None,
- group='oslo_messaging_qpid')
-
- hostname_sets = set()
- self.info = {'attempt': 0,
- 'fail': False}
-
- def _connect(myself, broker):
- # do as little work that is enough to pass connection attempt
- myself.connection = mock.Mock()
- hostname = broker['host']
- self.assertNotIn(hostname, hostname_sets)
- hostname_sets.add(hostname)
-
- self.info['attempt'] += 1
- if self.info['fail']:
- raise qpid.messaging.exceptions.ConnectionError
-
- # just make sure connection instantiation does not fail with an
- # exception
- self.stubs.Set(qpid_driver.Connection, '_connect', _connect)
-
- # starting from the first broker in the list
- url = messaging.TransportURL.parse(self.conf, None)
- self.connection = qpid_driver.Connection(self.conf, url,
- amqp.PURPOSE_SEND)
- self.addCleanup(self.connection.close)
-
- self.info.update({'attempt': 0,
- 'fail': True})
- hostname_sets.clear()
-
- def test_reconnect_order(self):
- self.assertRaises(messaging.MessageDeliveryFailure,
- self.connection.reconnect,
- retry=len(self.brokers) - 1)
- self.assertEqual(len(self.brokers), self.info['attempt'])
-
- def test_ensure_four_retries(self):
- mock_callback = mock.Mock(
- side_effect=qpid.messaging.exceptions.ConnectionError)
- self.assertRaises(messaging.MessageDeliveryFailure,
- self.connection.ensure, None, mock_callback,
- retry=4)
- self.assertEqual(5, self.info['attempt'])
- self.assertEqual(1, mock_callback.call_count)
-
- def test_ensure_one_retry(self):
- mock_callback = mock.Mock(
- side_effect=qpid.messaging.exceptions.ConnectionError)
- self.assertRaises(messaging.MessageDeliveryFailure,
- self.connection.ensure, None, mock_callback,
- retry=1)
- self.assertEqual(2, self.info['attempt'])
- self.assertEqual(1, mock_callback.call_count)
-
- def test_ensure_no_retry(self):
- mock_callback = mock.Mock(
- side_effect=qpid.messaging.exceptions.ConnectionError)
- self.assertRaises(messaging.MessageDeliveryFailure,
- self.connection.ensure, None, mock_callback,
- retry=0)
- self.assertEqual(1, self.info['attempt'])
- self.assertEqual(1, mock_callback.call_count)
diff --git a/tests/drivers/test_impl_rabbit.py b/tests/drivers/test_impl_rabbit.py
deleted file mode 100644
index b2da4a8..0000000
--- a/tests/drivers/test_impl_rabbit.py
+++ /dev/null
@@ -1,758 +0,0 @@
-# Copyright 2013 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import datetime
-import sys
-import threading
-import time
-import uuid
-
-import fixtures
-import kombu
-from oslotest import mockpatch
-import testscenarios
-
-from oslo import messaging
-from oslo_config import cfg
-from oslo_messaging._drivers import amqp
-from oslo_messaging._drivers import amqpdriver
-from oslo_messaging._drivers import common as driver_common
-from oslo_messaging._drivers import impl_rabbit as rabbit_driver
-from oslo_messaging.tests import utils as test_utils
-from oslo_serialization import jsonutils
-from six.moves import mock
-
-load_tests = testscenarios.load_tests_apply_scenarios
-
-
-class TestDeprecatedRabbitDriverLoad(test_utils.BaseTestCase):
-
- def setUp(self):
- super(TestDeprecatedRabbitDriverLoad, self).setUp(
- conf=cfg.ConfigOpts())
- self.messaging_conf.transport_driver = 'rabbit'
- self.config(fake_rabbit=True, group="oslo_messaging_rabbit")
-
- def test_driver_load(self):
- self.config(heartbeat_timeout_threshold=0,
- group='oslo_messaging_rabbit')
- transport = messaging.get_transport(self.conf)
- self.addCleanup(transport.cleanup)
- driver = transport._driver
- url = driver._get_connection()._url
-
- self.assertIsInstance(driver, rabbit_driver.RabbitDriver)
- self.assertEqual('memory:////', url)
-
-
-class TestRabbitDriverLoad(test_utils.BaseTestCase):
-
- scenarios = [
- ('rabbit', dict(transport_driver='rabbit',
- url='amqp://guest:guest@localhost:5672//')),
- ('kombu', dict(transport_driver='kombu',
- url='amqp://guest:guest@localhost:5672//')),
- ('rabbit+memory', dict(transport_driver='kombu+memory',
- url='memory:///'))
- ]
-
- @mock.patch('oslo_messaging._drivers.impl_rabbit.Connection.ensure')
- @mock.patch('oslo_messaging._drivers.impl_rabbit.Connection.reset')
- def test_driver_load(self, fake_ensure, fake_reset):
- self.config(heartbeat_timeout_threshold=0,
- group='oslo_messaging_rabbit')
- self.messaging_conf.transport_driver = self.transport_driver
- transport = messaging.get_transport(self.conf)
- self.addCleanup(transport.cleanup)
- driver = transport._driver
- url = driver._get_connection()._url
-
- self.assertIsInstance(driver, rabbit_driver.RabbitDriver)
- self.assertEqual(self.url, url)
-
-
-class TestRabbitConsume(test_utils.BaseTestCase):
-
- def test_consume_timeout(self):
- transport = messaging.get_transport(self.conf, 'kombu+memory:////')
- self.addCleanup(transport.cleanup)
- deadline = time.time() + 6
- with transport._driver._get_connection(amqp.PURPOSE_LISTEN) as conn:
- self.assertRaises(driver_common.Timeout,
- conn.consume, timeout=3)
-
- # kombu memory transport doesn't really raise error
- # so just simulate a real driver behavior
- conn.connection.connection.recoverable_channel_errors = (IOError,)
- conn.declare_fanout_consumer("notif.info", lambda msg: True)
- with mock.patch('kombu.connection.Connection.drain_events',
- side_effect=IOError):
- self.assertRaises(driver_common.Timeout,
- conn.consume, timeout=3)
-
- self.assertEqual(0, int(deadline - time.time()))
-
-
-class TestRabbitTransportURL(test_utils.BaseTestCase):
-
- scenarios = [
- ('none', dict(url=None,
- expected=["amqp://guest:guest@localhost:5672//"])),
- ('memory', dict(url='kombu+memory:////',
- expected=["memory:///"])),
- ('empty',
- dict(url='rabbit:///',
- expected=['amqp://guest:guest@localhost:5672/'])),
- ('localhost',
- dict(url='rabbit://localhost/',
- expected=['amqp://:@localhost:5672/'])),
- ('virtual_host',
- dict(url='rabbit:///vhost',
- expected=['amqp://guest:guest@localhost:5672/vhost'])),
- ('no_creds',
- dict(url='rabbit://host/virtual_host',
- expected=['amqp://:@host:5672/virtual_host'])),
- ('no_port',
- dict(url='rabbit://user:password@host/virtual_host',
- expected=['amqp://user:password@host:5672/virtual_host'])),
- ('full_url',
- dict(url='rabbit://user:password@host:10/virtual_host',
- expected=['amqp://user:password@host:10/virtual_host'])),
- ('full_two_url',
- dict(url='rabbit://user:password@host:10,'
- 'user2:password2@host2:12/virtual_host',
- expected=["amqp://user:password@host:10/virtual_host",
- "amqp://user2:password2@host2:12/virtual_host"]
- )),
- ('qpid',
- dict(url='kombu+qpid://user:password@host:10/virtual_host',
- expected=['qpid://user:password@host:10/virtual_host'])),
- ('rabbit',
- dict(url='kombu+rabbit://user:password@host:10/virtual_host',
- expected=['amqp://user:password@host:10/virtual_host'])),
- ('rabbit_ipv6',
- dict(url='kombu+rabbit://u:p@[fd00:beef:dead:55::133]:10/vhost',
- skip_py26='python 2.6 has broken urlparse for ipv6',
- expected=['amqp://u:p@[fd00:beef:dead:55::133]:10/vhost'])),
- ('rabbit_ipv4',
- dict(url='kombu+rabbit://user:password@10.20.30.40:10/vhost',
- expected=['amqp://user:password@10.20.30.40:10/vhost'])),
- ]
-
- def setUp(self):
- super(TestRabbitTransportURL, self).setUp()
- self.config(heartbeat_timeout_threshold=0,
- group='oslo_messaging_rabbit')
- self.messaging_conf.transport_driver = 'rabbit'
-
- @mock.patch('oslo_messaging._drivers.impl_rabbit.Connection.ensure')
- @mock.patch('oslo_messaging._drivers.impl_rabbit.Connection.reset')
- def test_transport_url(self, fake_ensure_connection, fake_reset):
- if hasattr(self, 'skip_py26') and sys.version_info < (2, 7):
- self.skipTest(self.skip_py26)
-
- transport = messaging.get_transport(self.conf, self.url)
- self.addCleanup(transport.cleanup)
- driver = transport._driver
-
- # NOTE(sileht): some kombu transport can depend on library that
- # we don't want to depend yet, because selecting the transport
- # is experimental, only amqp is supported
- # for example kombu+qpid depends of qpid-tools
- # so, mock the connection.info to skip call to qpid-tools
- with mock.patch('kombu.connection.Connection.info'):
- urls = driver._get_connection()._url.split(";")
- self.assertEqual(sorted(self.expected), sorted(urls))
-
-
-class TestSendReceive(test_utils.BaseTestCase):
-
- _n_senders = [
- ('single_sender', dict(n_senders=1)),
- ('multiple_senders', dict(n_senders=10)),
- ]
-
- _context = [
- ('empty_context', dict(ctxt={})),
- ('with_context', dict(ctxt={'user': 'mark'})),
- ]
-
- _reply = [
- ('rx_id', dict(rx_id=True, reply=None)),
- ('none', dict(rx_id=False, reply=None)),
- ('empty_list', dict(rx_id=False, reply=[])),
- ('empty_dict', dict(rx_id=False, reply={})),
- ('false', dict(rx_id=False, reply=False)),
- ('zero', dict(rx_id=False, reply=0)),
- ]
-
- _failure = [
- ('success', dict(failure=False)),
- ('failure', dict(failure=True, expected=False)),
- ('expected_failure', dict(failure=True, expected=True)),
- ]
-
- _timeout = [
- ('no_timeout', dict(timeout=None)),
- ('timeout', dict(timeout=0.01)), # FIXME(markmc): timeout=0 is broken?
- ]
-
- @classmethod
- def generate_scenarios(cls):
- cls.scenarios = testscenarios.multiply_scenarios(cls._n_senders,
- cls._context,
- cls._reply,
- cls._failure,
- cls._timeout)
-
- def test_send_receive(self):
- self.config(heartbeat_timeout_threshold=0,
- group='oslo_messaging_rabbit')
- transport = messaging.get_transport(self.conf, 'kombu+memory:////')
- self.addCleanup(transport.cleanup)
-
- driver = transport._driver
-
- target = messaging.Target(topic='testtopic')
-
- listener = driver.listen(target)
-
- senders = []
- replies = []
- msgs = []
- errors = []
-
- def stub_error(msg, *a, **kw):
- if (a and len(a) == 1 and isinstance(a[0], dict) and a[0]):
- a = a[0]
- errors.append(str(msg) % a)
-
- self.stubs.Set(driver_common.LOG, 'error', stub_error)
-
- def send_and_wait_for_reply(i):
- try:
- replies.append(driver.send(target,
- self.ctxt,
- {'tx_id': i},
- wait_for_reply=True,
- timeout=self.timeout))
- self.assertFalse(self.failure)
- self.assertIsNone(self.timeout)
- except (ZeroDivisionError, messaging.MessagingTimeout) as e:
- replies.append(e)
- self.assertTrue(self.failure or self.timeout is not None)
-
- while len(senders) < self.n_senders:
- senders.append(threading.Thread(target=send_and_wait_for_reply,
- args=(len(senders), )))
-
- for i in range(len(senders)):
- senders[i].start()
-
- received = listener.poll()
- self.assertIsNotNone(received)
- self.assertEqual(self.ctxt, received.ctxt)
- self.assertEqual({'tx_id': i}, received.message)
- msgs.append(received)
-
- # reply in reverse, except reply to the first guy second from last
- order = list(range(len(senders) - 1, -1, -1))
- if len(order) > 1:
- order[-1], order[-2] = order[-2], order[-1]
-
- for i in order:
- if self.timeout is None:
- if self.failure:
- try:
- raise ZeroDivisionError
- except Exception:
- failure = sys.exc_info()
-
- # NOTE(noelbk) confirm that Publisher exchanges
- # are always declared with passive=True
- outer_self = self
- test_exchange_was_called = [False]
- old_init = kombu.entity.Exchange.__init__
-
- def new_init(self, *args, **kwargs):
- test_exchange_was_called[0] = True
- outer_self.assertTrue(kwargs['passive'])
- old_init(self, *args, **kwargs)
- kombu.entity.Exchange.__init__ = new_init
-
- try:
- msgs[i].reply(failure=failure,
- log_failure=not self.expected)
- finally:
- kombu.entity.Exchange.__init__ = old_init
-
- self.assertTrue(test_exchange_was_called[0])
-
- elif self.rx_id:
- msgs[i].reply({'rx_id': i})
- else:
- msgs[i].reply(self.reply)
- senders[i].join()
-
- self.assertEqual(len(senders), len(replies))
- for i, reply in enumerate(replies):
- if self.timeout is not None:
- self.assertIsInstance(reply, messaging.MessagingTimeout)
- elif self.failure:
- self.assertIsInstance(reply, ZeroDivisionError)
- elif self.rx_id:
- self.assertEqual({'rx_id': order[i]}, reply)
- else:
- self.assertEqual(self.reply, reply)
-
- if not self.timeout and self.failure and not self.expected:
- self.assertTrue(len(errors) > 0, errors)
- else:
- self.assertEqual(0, len(errors), errors)
-
-
-TestSendReceive.generate_scenarios()
-
-
-class TestPollAsync(test_utils.BaseTestCase):
-
- def test_poll_timeout(self):
- transport = messaging.get_transport(self.conf, 'kombu+memory:////')
- self.addCleanup(transport.cleanup)
- driver = transport._driver
- target = messaging.Target(topic='testtopic')
- listener = driver.listen(target)
- received = listener.poll(timeout=0.050)
- self.assertIsNone(received)
-
-
-class TestRacyWaitForReply(test_utils.BaseTestCase):
-
- def test_send_receive(self):
- transport = messaging.get_transport(self.conf, 'kombu+memory:////')
- self.addCleanup(transport.cleanup)
-
- driver = transport._driver
-
- target = messaging.Target(topic='testtopic')
-
- listener = driver.listen(target)
-
- senders = []
- replies = []
- msgs = []
-
- wait_conditions = []
- orig_reply_waiter = amqpdriver.ReplyWaiter.wait
-
- def reply_waiter(self, msg_id, timeout):
- if wait_conditions:
- cond = wait_conditions.pop()
- with cond:
- cond.notify()
- with cond:
- cond.wait()
- return orig_reply_waiter(self, msg_id, timeout)
-
- self.stubs.Set(amqpdriver.ReplyWaiter, 'wait', reply_waiter)
-
- def send_and_wait_for_reply(i, wait_for_reply):
- replies.append(driver.send(target,
- {},
- {'tx_id': i},
- wait_for_reply=wait_for_reply,
- timeout=None))
-
- while len(senders) < 2:
- t = threading.Thread(target=send_and_wait_for_reply,
- args=(len(senders), True))
- t.daemon = True
- senders.append(t)
-
- # test the case then msg_id is not set
- t = threading.Thread(target=send_and_wait_for_reply,
- args=(len(senders), False))
- t.daemon = True
- senders.append(t)
-
- # Start the first guy, receive his message, but delay his polling
- notify_condition = threading.Condition()
- wait_conditions.append(notify_condition)
- with notify_condition:
- senders[0].start()
- notify_condition.wait()
-
- msgs.append(listener.poll())
- self.assertEqual({'tx_id': 0}, msgs[-1].message)
-
- # Start the second guy, receive his message
- senders[1].start()
-
- msgs.append(listener.poll())
- self.assertEqual({'tx_id': 1}, msgs[-1].message)
-
- # Reply to both in order, making the second thread queue
- # the reply meant for the first thread
- msgs[0].reply({'rx_id': 0})
- msgs[1].reply({'rx_id': 1})
-
- # Wait for the second thread to finish
- senders[1].join()
-
- # Start the 3rd guy, receive his message
- senders[2].start()
-
- msgs.append(listener.poll())
- self.assertEqual({'tx_id': 2}, msgs[-1].message)
-
- # Verify the _send_reply was not invoked by driver:
- with mock.patch.object(msgs[2], '_send_reply') as method:
- msgs[2].reply({'rx_id': 2})
- self.assertEqual(method.call_count, 0)
-
- # Wait for the 3rd thread to finish
- senders[2].join()
-
- # Let the first thread continue
- with notify_condition:
- notify_condition.notify()
-
- # Wait for the first thread to finish
- senders[0].join()
-
- # Verify replies were received out of order
- self.assertEqual(len(senders), len(replies))
- self.assertEqual({'rx_id': 1}, replies[0])
- self.assertIsNone(replies[1])
- self.assertEqual({'rx_id': 0}, replies[2])
-
-
-def _declare_queue(target):
- connection = kombu.connection.BrokerConnection(transport='memory')
-
- # Kludge to speed up tests.
- connection.transport.polling_interval = 0.0
-
- connection.connect()
- channel = connection.channel()
-
- # work around 'memory' transport bug in 1.1.3
- channel._new_queue('ae.undeliver')
-
- if target.fanout:
- exchange = kombu.entity.Exchange(name=target.topic + '_fanout',
- type='fanout',
- durable=False,
- auto_delete=True)
- queue = kombu.entity.Queue(name=target.topic + '_fanout_12345',
- channel=channel,
- exchange=exchange,
- routing_key=target.topic)
- if target.server:
- exchange = kombu.entity.Exchange(name='openstack',
- type='topic',
- durable=False,
- auto_delete=False)
- topic = '%s.%s' % (target.topic, target.server)
- queue = kombu.entity.Queue(name=topic,
- channel=channel,
- exchange=exchange,
- routing_key=topic)
- else:
- exchange = kombu.entity.Exchange(name='openstack',
- type='topic',
- durable=False,
- auto_delete=False)
- queue = kombu.entity.Queue(name=target.topic,
- channel=channel,
- exchange=exchange,
- routing_key=target.topic)
-
- queue.declare()
-
- return connection, channel, queue
-
-
-class TestRequestWireFormat(test_utils.BaseTestCase):
-
- _target = [
- ('topic_target',
- dict(topic='testtopic', server=None, fanout=False)),
- ('server_target',
- dict(topic='testtopic', server='testserver', fanout=False)),
- # NOTE(markmc): https://github.com/celery/kombu/issues/195
- ('fanout_target',
- dict(topic='testtopic', server=None, fanout=True,
- skip_msg='Requires kombu>2.5.12 to fix kombu issue #195')),
- ]
-
- _msg = [
- ('empty_msg',
- dict(msg={}, expected={})),
- ('primitive_msg',
- dict(msg={'foo': 'bar'}, expected={'foo': 'bar'})),
- ('complex_msg',
- dict(msg={'a': {'b': datetime.datetime(1920, 2, 3, 4, 5, 6, 7)}},
- expected={'a': {'b': '1920-02-03T04:05:06.000007'}})),
- ]
-
- _context = [
- ('empty_ctxt', dict(ctxt={}, expected_ctxt={})),
- ('user_project_ctxt',
- dict(ctxt={'user': 'mark', 'project': 'snarkybunch'},
- expected_ctxt={'_context_user': 'mark',
- '_context_project': 'snarkybunch'})),
- ]
-
- @classmethod
- def generate_scenarios(cls):
- cls.scenarios = testscenarios.multiply_scenarios(cls._msg,
- cls._context,
- cls._target)
-
- def setUp(self):
- super(TestRequestWireFormat, self).setUp()
- self.uuids = []
- self.orig_uuid4 = uuid.uuid4
- self.useFixture(fixtures.MonkeyPatch('uuid.uuid4', self.mock_uuid4))
-
- def mock_uuid4(self):
- self.uuids.append(self.orig_uuid4())
- return self.uuids[-1]
-
- def test_request_wire_format(self):
- if hasattr(self, 'skip_msg'):
- self.skipTest(self.skip_msg)
-
- transport = messaging.get_transport(self.conf, 'kombu+memory:////')
- self.addCleanup(transport.cleanup)
-
- driver = transport._driver
-
- target = messaging.Target(topic=self.topic,
- server=self.server,
- fanout=self.fanout)
-
- connection, channel, queue = _declare_queue(target)
- self.addCleanup(connection.release)
-
- driver.send(target, self.ctxt, self.msg)
-
- msgs = []
-
- def callback(msg):
- msg = channel.message_to_python(msg)
- msg.ack()
- msgs.append(msg.payload)
-
- queue.consume(callback=callback,
- consumer_tag='1',
- nowait=False)
-
- connection.drain_events()
-
- self.assertEqual(1, len(msgs))
- self.assertIn('oslo.message', msgs[0])
-
- received = msgs[0]
- received['oslo.message'] = jsonutils.loads(received['oslo.message'])
-
- # FIXME(markmc): add _msg_id and _reply_q check
- expected_msg = {
- '_unique_id': self.uuids[0].hex,
- }
- expected_msg.update(self.expected)
- expected_msg.update(self.expected_ctxt)
-
- expected = {
- 'oslo.version': '2.0',
- 'oslo.message': expected_msg,
- }
-
- self.assertEqual(expected, received)
-
-
-TestRequestWireFormat.generate_scenarios()
-
-
-def _create_producer(target):
- connection = kombu.connection.BrokerConnection(transport='memory')
-
- # Kludge to speed up tests.
- connection.transport.polling_interval = 0.0
-
- connection.connect()
- channel = connection.channel()
-
- # work around 'memory' transport bug in 1.1.3
- channel._new_queue('ae.undeliver')
-
- if target.fanout:
- exchange = kombu.entity.Exchange(name=target.topic + '_fanout',
- type='fanout',
- durable=False,
- auto_delete=True)
- producer = kombu.messaging.Producer(exchange=exchange,
- channel=channel,
- routing_key=target.topic)
- elif target.server:
- exchange = kombu.entity.Exchange(name='openstack',
- type='topic',
- durable=False,
- auto_delete=False)
- topic = '%s.%s' % (target.topic, target.server)
- producer = kombu.messaging.Producer(exchange=exchange,
- channel=channel,
- routing_key=topic)
- else:
- exchange = kombu.entity.Exchange(name='openstack',
- type='topic',
- durable=False,
- auto_delete=False)
- producer = kombu.messaging.Producer(exchange=exchange,
- channel=channel,
- routing_key=target.topic)
-
- return connection, producer
-
-
-class TestReplyWireFormat(test_utils.BaseTestCase):
-
- _target = [
- ('topic_target',
- dict(topic='testtopic', server=None, fanout=False)),
- ('server_target',
- dict(topic='testtopic', server='testserver', fanout=False)),
- # NOTE(markmc): https://github.com/celery/kombu/issues/195
- ('fanout_target',
- dict(topic='testtopic', server=None, fanout=True,
- skip_msg='Requires kombu>2.5.12 to fix kombu issue #195')),
- ]
-
- _msg = [
- ('empty_msg',
- dict(msg={}, expected={})),
- ('primitive_msg',
- dict(msg={'foo': 'bar'}, expected={'foo': 'bar'})),
- ('complex_msg',
- dict(msg={'a': {'b': '1920-02-03T04:05:06.000007'}},
- expected={'a': {'b': '1920-02-03T04:05:06.000007'}})),
- ]
-
- _context = [
- ('empty_ctxt', dict(ctxt={}, expected_ctxt={})),
- ('user_project_ctxt',
- dict(ctxt={'_context_user': 'mark',
- '_context_project': 'snarkybunch'},
- expected_ctxt={'user': 'mark', 'project': 'snarkybunch'})),
- ]
-
- @classmethod
- def generate_scenarios(cls):
- cls.scenarios = testscenarios.multiply_scenarios(cls._msg,
- cls._context,
- cls._target)
-
- def test_reply_wire_format(self):
- if hasattr(self, 'skip_msg'):
- self.skipTest(self.skip_msg)
-
- transport = messaging.get_transport(self.conf, 'kombu+memory:////')
- self.addCleanup(transport.cleanup)
-
- driver = transport._driver
-
- target = messaging.Target(topic=self.topic,
- server=self.server,
- fanout=self.fanout)
-
- listener = driver.listen(target)
-
- connection, producer = _create_producer(target)
- self.addCleanup(connection.release)
-
- msg = {
- 'oslo.version': '2.0',
- 'oslo.message': {}
- }
-
- msg['oslo.message'].update(self.msg)
- msg['oslo.message'].update(self.ctxt)
-
- msg['oslo.message'].update({
- '_msg_id': uuid.uuid4().hex,
- '_unique_id': uuid.uuid4().hex,
- '_reply_q': 'reply_' + uuid.uuid4().hex,
- })
-
- msg['oslo.message'] = jsonutils.dumps(msg['oslo.message'])
-
- producer.publish(msg)
-
- received = listener.poll()
- self.assertIsNotNone(received)
- self.assertEqual(self.expected_ctxt, received.ctxt)
- self.assertEqual(self.expected, received.message)
-
-
-TestReplyWireFormat.generate_scenarios()
-
-
-class RpcKombuHATestCase(test_utils.BaseTestCase):
-
- def setUp(self):
- super(RpcKombuHATestCase, self).setUp()
- self.brokers = ['host1', 'host2', 'host3', 'host4', 'host5']
- self.config(rabbit_hosts=self.brokers,
- rabbit_retry_interval=0.01,
- rabbit_retry_backoff=0.01,
- kombu_reconnect_delay=0,
- group="oslo_messaging_rabbit")
-
- self.kombu_connect = mock.Mock()
- self.useFixture(mockpatch.Patch(
- 'kombu.connection.Connection.connect',
- side_effect=self.kombu_connect))
- self.useFixture(mockpatch.Patch(
- 'kombu.connection.Connection.channel'))
-
- # starting from the first broker in the list
- url = messaging.TransportURL.parse(self.conf, None)
- self.connection = rabbit_driver.Connection(self.conf, url,
- amqp.PURPOSE_SEND)
- self.addCleanup(self.connection.close)
-
- def test_ensure_four_retry(self):
- mock_callback = mock.Mock(side_effect=IOError)
- self.assertRaises(messaging.MessageDeliveryFailure,
- self.connection.ensure, mock_callback,
- retry=4)
- self.assertEqual(5, self.kombu_connect.call_count)
- self.assertEqual(6, mock_callback.call_count)
-
- def test_ensure_one_retry(self):
- mock_callback = mock.Mock(side_effect=IOError)
- self.assertRaises(messaging.MessageDeliveryFailure,
- self.connection.ensure, mock_callback,
- retry=1)
- self.assertEqual(2, self.kombu_connect.call_count)
- self.assertEqual(3, mock_callback.call_count)
-
- def test_ensure_no_retry(self):
- mock_callback = mock.Mock(side_effect=IOError)
- self.assertRaises(messaging.MessageDeliveryFailure,
- self.connection.ensure, mock_callback,
- retry=0)
- self.assertEqual(1, self.kombu_connect.call_count)
- self.assertEqual(2, mock_callback.call_count)
diff --git a/tests/drivers/test_impl_zmq.py b/tests/drivers/test_impl_zmq.py
deleted file mode 100644
index d191ae6..0000000
--- a/tests/drivers/test_impl_zmq.py
+++ /dev/null
@@ -1,228 +0,0 @@
-# Copyright 2015 Mirantis, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import logging
-import threading
-
-import fixtures
-import testtools
-
-import oslo_messaging
-from oslo_messaging._drivers import impl_zmq
-from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging._i18n import _
-from oslo_messaging.tests import utils as test_utils
-
-LOG = logging.getLogger(__name__)
-
-zmq = zmq_async.import_zmq()
-
-
-class TestRPCServerListener(object):
-
- def __init__(self, driver):
- self.driver = driver
- self.target = None
- self.listener = None
- self.executor = zmq_async.get_executor(self._run)
- self._stop = threading.Event()
- self._received = threading.Event()
- self.message = None
-
- def listen(self, target):
- self.target = target
- self.listener = self.driver.listen(self.target)
- self.executor.execute()
-
- def _run(self):
- try:
- message = self.listener.poll()
- if message is not None:
- self._received.set()
- self.message = message
- message.reply(reply=True)
- except Exception:
- LOG.exception(_("Unexpected exception occurred."))
-
- def stop(self):
- self.executor.stop()
-
-
-class ZmqBaseTestCase(test_utils.BaseTestCase):
- """Base test case for all ZMQ tests """
-
- @testtools.skipIf(zmq is None, "zmq not available")
- def setUp(self):
- super(ZmqBaseTestCase, self).setUp()
- self.messaging_conf.transport_driver = 'zmq'
-
- # Set config values
- self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
- kwargs = {'rpc_zmq_bind_address': '127.0.0.1',
- 'rpc_zmq_host': '127.0.0.1',
- 'rpc_response_timeout': 5,
- 'rpc_zmq_ipc_dir': self.internal_ipc_dir,
- 'rpc_zmq_matchmaker': 'dummy'}
- self.config(**kwargs)
-
- # Get driver
- transport = oslo_messaging.get_transport(self.conf)
- self.driver = transport._driver
-
- self.listener = TestRPCServerListener(self.driver)
-
- self.addCleanup(stopRpc(self.__dict__))
-
-
-class TestConfZmqDriverLoad(test_utils.BaseTestCase):
-
- @testtools.skipIf(zmq is None, "zmq not available")
- def setUp(self):
- super(TestConfZmqDriverLoad, self).setUp()
- self.messaging_conf.transport_driver = 'zmq'
-
- def test_driver_load(self):
- transport = oslo_messaging.get_transport(self.conf)
- self.assertIsInstance(transport._driver, impl_zmq.ZmqDriver)
-
-
-class stopRpc(object):
- def __init__(self, attrs):
- self.attrs = attrs
-
- def __call__(self):
- if self.attrs['driver']:
- self.attrs['driver'].cleanup()
- if self.attrs['listener']:
- self.attrs['listener'].stop()
-
-
-class TestZmqBasics(ZmqBaseTestCase):
-
- def test_send_receive_raises(self):
- """Call() without method."""
- target = oslo_messaging.Target(topic='testtopic')
- self.listener.listen(target)
- self.assertRaises(
- KeyError,
- self.driver.send,
- target, {}, {'tx_id': 1}, wait_for_reply=True)
-
- def test_send_receive_topic(self):
- """Call() with topic."""
-
- target = oslo_messaging.Target(topic='testtopic')
- self.listener.listen(target)
- result = self.driver.send(
- target, {},
- {'method': 'hello-world', 'tx_id': 1},
- wait_for_reply=True)
- self.assertTrue(result)
-
- def test_send_noreply(self):
- """Cast() with topic."""
-
- target = oslo_messaging.Target(topic='testtopic', server="my@server")
- self.listener.listen(target)
- result = self.driver.send(
- target, {},
- {'method': 'hello-world', 'tx_id': 1},
- wait_for_reply=False)
-
- self.listener._received.wait()
-
- self.assertIsNone(result)
- self.assertEqual(True, self.listener._received.isSet())
- method = self.listener.message.message[u'method']
- self.assertEqual(u'hello-world', method)
-
- def test_send_fanout(self):
- target = oslo_messaging.Target(topic='testtopic', fanout=True)
- self.listener.listen(target)
-
- result = self.driver.send(
- target, {},
- {'method': 'hello-world', 'tx_id': 1},
- wait_for_reply=False)
-
- self.listener._received.wait()
-
- self.assertIsNone(result)
- self.assertEqual(True, self.listener._received.isSet())
- method = self.listener.message.message[u'method']
- self.assertEqual(u'hello-world', method)
-
- def test_send_receive_direct(self):
- """Call() without topic."""
-
- target = oslo_messaging.Target(server='127.0.0.1')
- self.listener.listen(target)
- message = {'method': 'hello-world', 'tx_id': 1}
- context = {}
- result = self.driver.send(target, context, message,
- wait_for_reply=True)
- self.assertTrue(result)
-
-
-class TestPoller(test_utils.BaseTestCase):
-
- def setUp(self):
- super(TestPoller, self).setUp()
- self.poller = zmq_async.get_poller()
- self.ctx = zmq.Context()
- self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
- self.ADDR_REQ = "ipc://%s/request1" % self.internal_ipc_dir
-
- def test_poll_blocking(self):
-
- rep = self.ctx.socket(zmq.REP)
- rep.bind(self.ADDR_REQ)
-
- reply_poller = zmq_async.get_reply_poller()
- reply_poller.register(rep)
-
- def listener():
- incoming, socket = reply_poller.poll()
- self.assertEqual(b'Hello', incoming[0])
- socket.send_string('Reply')
- reply_poller.resume_polling(socket)
-
- executor = zmq_async.get_executor(listener)
- executor.execute()
-
- req1 = self.ctx.socket(zmq.REQ)
- req1.connect(self.ADDR_REQ)
-
- req2 = self.ctx.socket(zmq.REQ)
- req2.connect(self.ADDR_REQ)
-
- req1.send_string('Hello')
- req2.send_string('Hello')
-
- reply = req1.recv_string()
- self.assertEqual('Reply', reply)
-
- reply = req2.recv_string()
- self.assertEqual('Reply', reply)
-
- def test_poll_timeout(self):
- rep = self.ctx.socket(zmq.REP)
- rep.bind(self.ADDR_REQ)
-
- reply_poller = zmq_async.get_reply_poller()
- reply_poller.register(rep)
-
- incoming, socket = reply_poller.poll(1)
- self.assertIsNone(incoming)
- self.assertIsNone(socket)
diff --git a/tests/drivers/test_matchmaker.py b/tests/drivers/test_matchmaker.py
deleted file mode 100644
index fe59fef..0000000
--- a/tests/drivers/test_matchmaker.py
+++ /dev/null
@@ -1,69 +0,0 @@
-# Copyright 2014 Canonical, Ltd.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import testtools
-
-from oslo_messaging.tests import utils as test_utils
-from oslo_utils import importutils
-
-# NOTE(jamespage) matchmaker tied directly to eventlet
-# which is not yet py3 compatible - skip if import fails
-matchmaker = (
- importutils.try_import('oslo.messaging._drivers.matchmaker'))
-
-
-@testtools.skipIf(not matchmaker, "matchmaker/eventlet unavailable")
-class MatchmakerTest(test_utils.BaseTestCase):
-
- def test_fanout_binding(self):
- matcher = matchmaker.MatchMakerBase()
- matcher.add_binding(
- matchmaker.FanoutBinding(), matchmaker.DirectExchange())
- self.assertEqual(matcher.queues('hello.world'), [])
- self.assertEqual(
- matcher.queues('fanout~fantasy.unicorn'),
- [('fanout~fantasy.unicorn', 'unicorn')])
- self.assertEqual(
- matcher.queues('fanout~fantasy.pony'),
- [('fanout~fantasy.pony', 'pony')])
-
- def test_topic_binding(self):
- matcher = matchmaker.MatchMakerBase()
- matcher.add_binding(
- matchmaker.TopicBinding(), matchmaker.StubExchange())
- self.assertEqual(
- matcher.queues('hello-world'), [('hello-world', None)])
-
- def test_direct_binding(self):
- matcher = matchmaker.MatchMakerBase()
- matcher.add_binding(
- matchmaker.DirectBinding(), matchmaker.StubExchange())
- self.assertEqual(
- matcher.queues('hello.server'), [('hello.server', None)])
- self.assertEqual(matcher.queues('hello-world'), [])
-
- def test_localhost_match(self):
- matcher = matchmaker.MatchMakerLocalhost()
- self.assertEqual(
- matcher.queues('hello.server'), [('hello.server', 'server')])
-
- # Gets remapped due to localhost exchange
- # all bindings default to first match.
- self.assertEqual(
- matcher.queues('fanout~testing.server'),
- [('fanout~testing.localhost', 'localhost')])
-
- self.assertEqual(
- matcher.queues('hello-world'),
- [('hello-world.localhost', 'localhost')])
diff --git a/tests/drivers/test_matchmaker_redis.py b/tests/drivers/test_matchmaker_redis.py
deleted file mode 100644
index 35a8c14..0000000
--- a/tests/drivers/test_matchmaker_redis.py
+++ /dev/null
@@ -1,83 +0,0 @@
-# Copyright 2014 Canonical, Ltd.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import testtools
-
-from oslo_messaging.tests import utils as test_utils
-from oslo_utils import importutils
-
-redis = importutils.try_import('redis')
-matchmaker_redis = (
- importutils.try_import('oslo.messaging._drivers.matchmaker_redis'))
-
-
-def redis_available():
- '''Helper to see if local redis server is running'''
- if not redis:
- return False
- try:
- c = redis.StrictRedis(socket_timeout=1)
- c.ping()
- return True
- except redis.exceptions.ConnectionError:
- return False
-
-
-@testtools.skipIf(not matchmaker_redis, "matchmaker/eventlet unavailable")
-@testtools.skipIf(not redis_available(), "redis unavailable")
-class RedisMatchMakerTest(test_utils.BaseTestCase):
-
- def setUp(self):
- super(RedisMatchMakerTest, self).setUp()
- self.ring_data = {
- "conductor": ["controller1", "node1", "node2", "node3"],
- "scheduler": ["controller1", "node1", "node2", "node3"],
- "network": ["controller1", "node1", "node2", "node3"],
- "cert": ["controller1"],
- "console": ["controller1"],
- "l3_agent.node1": ["node1"],
- "consoleauth": ["controller1"]}
- self.matcher = matchmaker_redis.MatchMakerRedis()
- self.populate()
-
- def tearDown(self):
- super(RedisMatchMakerTest, self).tearDown()
- c = redis.StrictRedis()
- c.flushdb()
-
- def populate(self):
- for k, hosts in self.ring_data.items():
- for h in hosts:
- self.matcher.register(k, h)
-
- def test_direct(self):
- self.assertEqual(
- self.matcher.queues('cert.controller1'),
- [('cert.controller1', 'controller1')])
-
- def test_register(self):
- self.matcher.register('cert', 'keymaster')
- self.assertEqual(
- sorted(self.matcher.redis.smembers('cert')),
- ['cert.controller1', 'cert.keymaster'])
- self.matcher.register('l3_agent.node1', 'node1')
- self.assertEqual(
- sorted(self.matcher.redis.smembers('l3_agent.node1')),
- ['l3_agent.node1.node1'])
-
- def test_unregister(self):
- self.matcher.unregister('conductor', 'controller1')
- self.assertEqual(
- sorted(self.matcher.redis.smembers('conductor')),
- ['conductor.node1', 'conductor.node2', 'conductor.node3'])
diff --git a/tests/drivers/test_matchmaker_ring.py b/tests/drivers/test_matchmaker_ring.py
deleted file mode 100644
index 0107464..0000000
--- a/tests/drivers/test_matchmaker_ring.py
+++ /dev/null
@@ -1,73 +0,0 @@
-# Copyright 2014 Canonical, Ltd.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import testtools
-
-from oslo_messaging.tests import utils as test_utils
-from oslo_utils import importutils
-
-# NOTE(jamespage) matchmaker tied directly to eventlet
-# which is not yet py3 compatible - skip if import fails
-matchmaker_ring = (
- importutils.try_import('oslo.messaging._drivers.matchmaker_ring'))
-
-
-@testtools.skipIf(not matchmaker_ring, "matchmaker/eventlet unavailable")
-class MatchmakerRingTest(test_utils.BaseTestCase):
-
- def setUp(self):
- super(MatchmakerRingTest, self).setUp()
- self.ring_data = {
- "conductor": ["controller1", "node1", "node2", "node3"],
- "scheduler": ["controller1", "node1", "node2", "node3"],
- "network": ["controller1", "node1", "node2", "node3"],
- "cert": ["controller1"],
- "console": ["controller1"],
- "consoleauth": ["controller1"]}
- self.matcher = matchmaker_ring.MatchMakerRing(self.ring_data)
-
- def test_direct(self):
- self.assertEqual(
- self.matcher.queues('cert.controller1'),
- [('cert.controller1', 'controller1')])
- self.assertEqual(
- self.matcher.queues('conductor.node1'),
- [('conductor.node1', 'node1')])
-
- def test_fanout(self):
- self.assertEqual(
- self.matcher.queues('fanout~conductor'),
- [('fanout~conductor.controller1', 'controller1'),
- ('fanout~conductor.node1', 'node1'),
- ('fanout~conductor.node2', 'node2'),
- ('fanout~conductor.node3', 'node3')])
-
- def test_bare_topic(self):
- # Round robins through the hosts on the topic
- self.assertEqual(
- self.matcher.queues('scheduler'),
- [('scheduler.controller1', 'controller1')])
- self.assertEqual(
- self.matcher.queues('scheduler'),
- [('scheduler.node1', 'node1')])
- self.assertEqual(
- self.matcher.queues('scheduler'),
- [('scheduler.node2', 'node2')])
- self.assertEqual(
- self.matcher.queues('scheduler'),
- [('scheduler.node3', 'node3')])
- # Cycles loop
- self.assertEqual(
- self.matcher.queues('scheduler'),
- [('scheduler.controller1', 'controller1')])
diff --git a/tests/notify/__init__.py b/tests/notify/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/tests/notify/__init__.py
+++ /dev/null
diff --git a/tests/notify/test_dispatcher.py b/tests/notify/test_dispatcher.py
deleted file mode 100644
index 5c61840..0000000
--- a/tests/notify/test_dispatcher.py
+++ /dev/null
@@ -1,171 +0,0 @@
-
-# Copyright 2013 eNovance
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import itertools
-
-from oslo_utils import timeutils
-import testscenarios
-
-from oslo import messaging
-from oslo.messaging.notify import dispatcher as notify_dispatcher
-from oslo_messaging.tests import utils as test_utils
-from six.moves import mock
-
-load_tests = testscenarios.load_tests_apply_scenarios
-
-
-notification_msg = dict(
- publisher_id="publisher_id",
- event_type="compute.start",
- payload={"info": "fuu"},
- message_id="uuid",
- timestamp=str(timeutils.utcnow())
-)
-
-
-class TestDispatcherScenario(test_utils.BaseTestCase):
-
- scenarios = [
- ('no_endpoints',
- dict(endpoints=[],
- endpoints_expect_calls=[],
- priority='info',
- ex=None,
- return_value=messaging.NotificationResult.HANDLED)),
- ('one_endpoints',
- dict(endpoints=[['warn']],
- endpoints_expect_calls=['warn'],
- priority='warn',
- ex=None,
- return_value=messaging.NotificationResult.HANDLED)),
- ('two_endpoints_only_one_match',
- dict(endpoints=[['warn'], ['info']],
- endpoints_expect_calls=[None, 'info'],
- priority='info',
- ex=None,
- return_value=messaging.NotificationResult.HANDLED)),
- ('two_endpoints_both_match',
- dict(endpoints=[['debug', 'info'], ['info', 'debug']],
- endpoints_expect_calls=['debug', 'debug'],
- priority='debug',
- ex=None,
- return_value=messaging.NotificationResult.HANDLED)),
- ('no_return_value',
- dict(endpoints=[['warn']],
- endpoints_expect_calls=['warn'],
- priority='warn',
- ex=None, return_value=None)),
- ('requeue',
- dict(endpoints=[['debug', 'warn']],
- endpoints_expect_calls=['debug'],
- priority='debug', msg=notification_msg,
- ex=None,
- return_value=messaging.NotificationResult.REQUEUE)),
- ('exception',
- dict(endpoints=[['debug', 'warn']],
- endpoints_expect_calls=['debug'],
- priority='debug', msg=notification_msg,
- ex=Exception,
- return_value=messaging.NotificationResult.HANDLED)),
- ]
-
- def test_dispatcher(self):
- endpoints = []
- for endpoint_methods in self.endpoints:
- e = mock.Mock(spec=endpoint_methods)
- endpoints.append(e)
- for m in endpoint_methods:
- method = getattr(e, m)
- if self.ex:
- method.side_effect = self.ex()
- else:
- method.return_value = self.return_value
-
- msg = notification_msg.copy()
- msg['priority'] = self.priority
-
- targets = [messaging.Target(topic='notifications')]
- dispatcher = notify_dispatcher.NotificationDispatcher(
- targets, endpoints, None, allow_requeue=True, pool=None)
-
- # check it listen on wanted topics
- self.assertEqual(sorted(set((targets[0], prio)
- for prio in itertools.chain.from_iterable(
- self.endpoints))),
- sorted(dispatcher._targets_priorities))
-
- incoming = mock.Mock(ctxt={}, message=msg)
- with dispatcher(incoming) as callback:
- callback()
-
- # check endpoint callbacks are called or not
- for i, endpoint_methods in enumerate(self.endpoints):
- for m in endpoint_methods:
- if m == self.endpoints_expect_calls[i]:
- method = getattr(endpoints[i], m)
- method.assert_called_once_with(
- {},
- msg['publisher_id'],
- msg['event_type'],
- msg['payload'], {
- 'timestamp': mock.ANY,
- 'message_id': mock.ANY
- })
- else:
- self.assertEqual(0, endpoints[i].call_count)
-
- if self.ex:
- self.assertEqual(1, incoming.acknowledge.call_count)
- self.assertEqual(0, incoming.requeue.call_count)
- elif self.return_value == messaging.NotificationResult.HANDLED \
- or self.return_value is None:
- self.assertEqual(1, incoming.acknowledge.call_count)
- self.assertEqual(0, incoming.requeue.call_count)
- elif self.return_value == messaging.NotificationResult.REQUEUE:
- self.assertEqual(0, incoming.acknowledge.call_count)
- self.assertEqual(1, incoming.requeue.call_count)
-
-
-class TestDispatcher(test_utils.BaseTestCase):
-
- @mock.patch('oslo_messaging.notify.dispatcher.LOG')
- def test_dispatcher_unknown_prio(self, mylog):
- msg = notification_msg.copy()
- msg['priority'] = 'what???'
- dispatcher = notify_dispatcher.NotificationDispatcher(
- [mock.Mock()], [mock.Mock()], None, allow_requeue=True, pool=None)
- with dispatcher(mock.Mock(ctxt={}, message=msg)) as callback:
- callback()
- mylog.warning.assert_called_once_with('Unknown priority "%s"',
- 'what???')
-
- def test_dispatcher_executor_callback(self):
- endpoint = mock.Mock(spec=['warn'])
- endpoint_method = endpoint.warn
- endpoint_method.return_value = messaging.NotificationResult.HANDLED
-
- targets = [messaging.Target(topic='notifications')]
- dispatcher = notify_dispatcher.NotificationDispatcher(
- targets, [endpoint], None, allow_requeue=True)
-
- msg = notification_msg.copy()
- msg['priority'] = 'warn'
-
- incoming = mock.Mock(ctxt={}, message=msg)
- executor_callback = mock.Mock()
- with dispatcher(incoming, executor_callback) as callback:
- callback()
- self.assertTrue(executor_callback.called)
- self.assertEqual(executor_callback.call_args[0][0], endpoint_method)
diff --git a/tests/notify/test_listener.py b/tests/notify/test_listener.py
deleted file mode 100644
index 84e257d..0000000
--- a/tests/notify/test_listener.py
+++ /dev/null
@@ -1,411 +0,0 @@
-
-# Copyright 2013 eNovance
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import threading
-import time
-
-import testscenarios
-
-from oslo import messaging
-from oslo.messaging.notify import dispatcher
-from oslo_config import cfg
-from oslo_messaging.tests import utils as test_utils
-from six.moves import mock
-
-load_tests = testscenarios.load_tests_apply_scenarios
-
-
-class RestartableServerThread(object):
- def __init__(self, server):
- self.server = server
- self.thread = None
-
- def start(self):
- if self.thread is None:
- self.thread = threading.Thread(target=self.server.start)
- self.thread.daemon = True
- self.thread.start()
-
- def stop(self):
- if self.thread is not None:
- # Check start() does nothing with a running listener
- self.server.start()
- self.server.stop()
- self.server.wait()
- self.thread.join(timeout=15)
- ret = self.thread.isAlive()
- self.thread = None
- return ret
- return True
-
-
-class ListenerSetupMixin(object):
-
- class ThreadTracker(object):
- def __init__(self):
- self._received_msgs = 0
- self.threads = []
- self.lock = threading.Lock()
-
- def info(self, ctxt, publisher_id, event_type, payload, metadata):
- # NOTE(sileht): this run into an other thread
- with self.lock:
- self._received_msgs += 1
-
- def wait_for_messages(self, expect_messages):
- while self._received_msgs < expect_messages:
- time.sleep(0.01)
-
- def stop(self):
- for thread in self.threads:
- thread.stop()
- self.threads = []
-
- def start(self, thread):
- self.threads.append(thread)
- thread.start()
-
- def setUp(self):
- self.trackers = {}
- self.addCleanup(self._stop_trackers)
-
- def _stop_trackers(self):
- for pool in self.trackers:
- self.trackers[pool].stop()
- self.trackers = {}
-
- def _setup_listener(self, transport, endpoints,
- targets=None, pool=None):
-
- if pool is None:
- tracker_name = '__default__'
- else:
- tracker_name = pool
-
- if targets is None:
- targets = [messaging.Target(topic='testtopic')]
-
- tracker = self.trackers.setdefault(
- tracker_name, self.ThreadTracker())
- listener = messaging.get_notification_listener(
- transport, targets=targets, endpoints=[tracker] + endpoints,
- allow_requeue=True, pool=pool)
-
- thread = RestartableServerThread(listener)
- tracker.start(thread)
- return thread
-
- def wait_for_messages(self, expect_messages, tracker_name='__default__'):
- self.trackers[tracker_name].wait_for_messages(expect_messages)
-
- def _setup_notifier(self, transport, topic='testtopic',
- publisher_id='testpublisher'):
- return messaging.Notifier(transport, topic=topic,
- driver='messaging',
- publisher_id=publisher_id)
-
-
-class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
-
- def __init__(self, *args):
- super(TestNotifyListener, self).__init__(*args)
- ListenerSetupMixin.__init__(self)
-
- def setUp(self):
- super(TestNotifyListener, self).setUp(conf=cfg.ConfigOpts())
- ListenerSetupMixin.setUp(self)
-
- def test_constructor(self):
- transport = messaging.get_transport(self.conf, url='fake:')
- target = messaging.Target(topic='foo')
- endpoints = [object()]
-
- listener = messaging.get_notification_listener(transport, [target],
- endpoints)
-
- self.assertIs(listener.conf, self.conf)
- self.assertIs(listener.transport, transport)
- self.assertIsInstance(listener.dispatcher,
- dispatcher.NotificationDispatcher)
- self.assertIs(listener.dispatcher.endpoints, endpoints)
- self.assertEqual('blocking', listener.executor)
-
- def test_no_target_topic(self):
- transport = messaging.get_transport(self.conf, url='fake:')
-
- listener = messaging.get_notification_listener(transport,
- [messaging.Target()],
- [mock.Mock()])
- try:
- listener.start()
- except Exception as ex:
- self.assertIsInstance(ex, messaging.InvalidTarget, ex)
- else:
- self.assertTrue(False)
-
- def test_unknown_executor(self):
- transport = messaging.get_transport(self.conf, url='fake:')
-
- try:
- messaging.get_notification_listener(transport, [], [],
- executor='foo')
- except Exception as ex:
- self.assertIsInstance(ex, messaging.ExecutorLoadFailure)
- self.assertEqual('foo', ex.executor)
- else:
- self.assertTrue(False)
-
- def test_one_topic(self):
- transport = messaging.get_transport(self.conf, url='fake:')
-
- endpoint = mock.Mock()
- endpoint.info.return_value = None
- listener_thread = self._setup_listener(transport, [endpoint])
-
- notifier = self._setup_notifier(transport)
- notifier.info({}, 'an_event.start', 'test message')
-
- self.wait_for_messages(1)
- self.assertFalse(listener_thread.stop())
-
- endpoint.info.assert_called_once_with(
- {}, 'testpublisher', 'an_event.start', 'test message',
- {'message_id': mock.ANY, 'timestamp': mock.ANY})
-
- def test_two_topics(self):
- transport = messaging.get_transport(self.conf, url='fake:')
-
- endpoint = mock.Mock()
- endpoint.info.return_value = None
- targets = [messaging.Target(topic="topic1"),
- messaging.Target(topic="topic2")]
- listener_thread = self._setup_listener(transport, [endpoint],
- targets=targets)
- notifier = self._setup_notifier(transport, topic='topic1')
- notifier.info({'ctxt': '1'}, 'an_event.start1', 'test')
- notifier = self._setup_notifier(transport, topic='topic2')
- notifier.info({'ctxt': '2'}, 'an_event.start2', 'test')
-
- self.wait_for_messages(2)
- self.assertFalse(listener_thread.stop())
-
- endpoint.info.assert_has_calls([
- mock.call({'ctxt': '1'}, 'testpublisher',
- 'an_event.start1', 'test',
- {'timestamp': mock.ANY, 'message_id': mock.ANY}),
- mock.call({'ctxt': '2'}, 'testpublisher',
- 'an_event.start2', 'test',
- {'timestamp': mock.ANY, 'message_id': mock.ANY})],
- any_order=True)
-
- def test_two_exchanges(self):
- transport = messaging.get_transport(self.conf, url='fake:')
-
- endpoint = mock.Mock()
- endpoint.info.return_value = None
- targets = [messaging.Target(topic="topic",
- exchange="exchange1"),
- messaging.Target(topic="topic",
- exchange="exchange2")]
- listener_thread = self._setup_listener(transport, [endpoint],
- targets=targets)
-
- notifier = self._setup_notifier(transport, topic="topic")
-
- def mock_notifier_exchange(name):
- def side_effect(target, ctxt, message, version, retry):
- target.exchange = name
- return transport._driver.send_notification(target, ctxt,
- message, version,
- retry=retry)
- transport._send_notification = mock.MagicMock(
- side_effect=side_effect)
-
- notifier.info({'ctxt': '0'},
- 'an_event.start', 'test message default exchange')
- mock_notifier_exchange('exchange1')
- notifier.info({'ctxt': '1'},
- 'an_event.start', 'test message exchange1')
- mock_notifier_exchange('exchange2')
- notifier.info({'ctxt': '2'},
- 'an_event.start', 'test message exchange2')
-
- self.wait_for_messages(2)
- self.assertFalse(listener_thread.stop())
-
- endpoint.info.assert_has_calls([
- mock.call({'ctxt': '1'}, 'testpublisher', 'an_event.start',
- 'test message exchange1',
- {'timestamp': mock.ANY, 'message_id': mock.ANY}),
- mock.call({'ctxt': '2'}, 'testpublisher', 'an_event.start',
- 'test message exchange2',
- {'timestamp': mock.ANY, 'message_id': mock.ANY})],
- any_order=True)
-
- def test_two_endpoints(self):
- transport = messaging.get_transport(self.conf, url='fake:')
-
- endpoint1 = mock.Mock()
- endpoint1.info.return_value = None
- endpoint2 = mock.Mock()
- endpoint2.info.return_value = messaging.NotificationResult.HANDLED
- listener_thread = self._setup_listener(transport,
- [endpoint1, endpoint2])
- notifier = self._setup_notifier(transport)
- notifier.info({}, 'an_event.start', 'test')
-
- self.wait_for_messages(1)
- self.assertFalse(listener_thread.stop())
-
- endpoint1.info.assert_called_once_with(
- {}, 'testpublisher', 'an_event.start', 'test', {
- 'timestamp': mock.ANY,
- 'message_id': mock.ANY})
-
- endpoint2.info.assert_called_once_with(
- {}, 'testpublisher', 'an_event.start', 'test', {
- 'timestamp': mock.ANY,
- 'message_id': mock.ANY})
-
- def test_requeue(self):
- transport = messaging.get_transport(self.conf, url='fake:')
- endpoint = mock.Mock()
- endpoint.info = mock.Mock()
-
- def side_effect_requeue(*args, **kwargs):
- if endpoint.info.call_count == 1:
- return messaging.NotificationResult.REQUEUE
- return messaging.NotificationResult.HANDLED
-
- endpoint.info.side_effect = side_effect_requeue
- listener_thread = self._setup_listener(transport, [endpoint])
- notifier = self._setup_notifier(transport)
- notifier.info({}, 'an_event.start', 'test')
-
- self.wait_for_messages(2)
- self.assertFalse(listener_thread.stop())
-
- endpoint.info.assert_has_calls([
- mock.call({}, 'testpublisher', 'an_event.start', 'test',
- {'timestamp': mock.ANY, 'message_id': mock.ANY}),
- mock.call({}, 'testpublisher', 'an_event.start', 'test',
- {'timestamp': mock.ANY, 'message_id': mock.ANY})])
-
- def test_two_pools(self):
- transport = messaging.get_transport(self.conf, url='fake:')
-
- endpoint1 = mock.Mock()
- endpoint1.info.return_value = None
- endpoint2 = mock.Mock()
- endpoint2.info.return_value = None
-
- targets = [messaging.Target(topic="topic")]
- listener1_thread = self._setup_listener(transport, [endpoint1],
- targets=targets, pool="pool1")
- listener2_thread = self._setup_listener(transport, [endpoint2],
- targets=targets, pool="pool2")
-
- notifier = self._setup_notifier(transport, topic="topic")
- notifier.info({'ctxt': '0'}, 'an_event.start', 'test message0')
- notifier.info({'ctxt': '1'}, 'an_event.start', 'test message1')
-
- self.wait_for_messages(2, "pool1")
- self.wait_for_messages(2, "pool2")
- self.assertFalse(listener2_thread.stop())
- self.assertFalse(listener1_thread.stop())
-
- def mocked_endpoint_call(i):
- return mock.call({'ctxt': '%d' % i}, 'testpublisher',
- 'an_event.start', 'test message%d' % i,
- {'timestamp': mock.ANY, 'message_id': mock.ANY})
-
- endpoint1.info.assert_has_calls([mocked_endpoint_call(0),
- mocked_endpoint_call(1)])
- endpoint2.info.assert_has_calls([mocked_endpoint_call(0),
- mocked_endpoint_call(1)])
-
- def test_two_pools_three_listener(self):
- transport = messaging.get_transport(self.conf, url='fake:')
-
- endpoint1 = mock.Mock()
- endpoint1.info.return_value = None
- endpoint2 = mock.Mock()
- endpoint2.info.return_value = None
- endpoint3 = mock.Mock()
- endpoint3.info.return_value = None
-
- targets = [messaging.Target(topic="topic")]
- listener1_thread = self._setup_listener(transport, [endpoint1],
- targets=targets, pool="pool1")
- listener2_thread = self._setup_listener(transport, [endpoint2],
- targets=targets, pool="pool2")
- listener3_thread = self._setup_listener(transport, [endpoint3],
- targets=targets, pool="pool2")
-
- def mocked_endpoint_call(i):
- return mock.call({'ctxt': '%d' % i}, 'testpublisher',
- 'an_event.start', 'test message%d' % i,
- {'timestamp': mock.ANY, 'message_id': mock.ANY})
-
- notifier = self._setup_notifier(transport, topic="topic")
- mocked_endpoint1_calls = []
- for i in range(0, 25):
- notifier.info({'ctxt': '%d' % i}, 'an_event.start',
- 'test message%d' % i)
- mocked_endpoint1_calls.append(mocked_endpoint_call(i))
-
- self.wait_for_messages(25, 'pool2')
- listener2_thread.stop()
-
- for i in range(0, 25):
- notifier.info({'ctxt': '%d' % i}, 'an_event.start',
- 'test message%d' % i)
- mocked_endpoint1_calls.append(mocked_endpoint_call(i))
-
- self.wait_for_messages(50, 'pool2')
- listener2_thread.start()
- listener3_thread.stop()
-
- for i in range(0, 25):
- notifier.info({'ctxt': '%d' % i}, 'an_event.start',
- 'test message%d' % i)
- mocked_endpoint1_calls.append(mocked_endpoint_call(i))
-
- self.wait_for_messages(75, 'pool2')
- listener3_thread.start()
-
- for i in range(0, 25):
- notifier.info({'ctxt': '%d' % i}, 'an_event.start',
- 'test message%d' % i)
- mocked_endpoint1_calls.append(mocked_endpoint_call(i))
-
- self.wait_for_messages(100, 'pool1')
- self.wait_for_messages(100, 'pool2')
-
- self.assertFalse(listener3_thread.stop())
- self.assertFalse(listener2_thread.stop())
- self.assertFalse(listener1_thread.stop())
-
- self.assertEqual(100, endpoint1.info.call_count)
- endpoint1.info.assert_has_calls(mocked_endpoint1_calls)
-
- self.assertLessEqual(25, endpoint2.info.call_count)
- self.assertLessEqual(25, endpoint3.info.call_count)
-
- self.assertEqual(100, endpoint2.info.call_count +
- endpoint3.info.call_count)
- for call in mocked_endpoint1_calls:
- self.assertIn(call, endpoint2.info.mock_calls +
- endpoint3.info.mock_calls)
diff --git a/tests/notify/test_log_handler.py b/tests/notify/test_log_handler.py
deleted file mode 100644
index 3adc572..0000000
--- a/tests/notify/test_log_handler.py
+++ /dev/null
@@ -1,57 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import logging
-
-from oslo import messaging
-from oslo.messaging.notify import log_handler
-from oslo_messaging.tests.notify import test_notifier
-from oslo_messaging.tests import utils as test_utils
-from six.moves import mock
-
-
-class PublishErrorsHandlerTestCase(test_utils.BaseTestCase):
- """Tests for log.PublishErrorsHandler"""
- def setUp(self):
- super(PublishErrorsHandlerTestCase, self).setUp()
- self.publisherrorshandler = (log_handler.
- PublishErrorsHandler(logging.ERROR))
-
- def test_emit_cfg_log_notifier_in_notifier_drivers(self):
- drivers = ['messaging', 'log']
- self.config(notification_driver=drivers)
- self.stub_flg = True
-
- transport = test_notifier._FakeTransport(self.conf)
- notifier = messaging.Notifier(transport)
-
- def fake_notifier(*args, **kwargs):
- self.stub_flg = False
-
- self.stubs.Set(notifier, 'error', fake_notifier)
-
- logrecord = logging.LogRecord(name='name', level='WARN',
- pathname='/tmp', lineno=1, msg='Message',
- args=None, exc_info=None)
- self.publisherrorshandler.emit(logrecord)
- self.assertTrue(self.stub_flg)
-
- @mock.patch('oslo_messaging.notify.notifier.Notifier._notify')
- def test_emit_notification(self, mock_notify):
- logrecord = logging.LogRecord(name='name', level='ERROR',
- pathname='/tmp', lineno=1, msg='Message',
- args=None, exc_info=None)
- self.publisherrorshandler.emit(logrecord)
- self.assertEqual('error.publisher',
- self.publisherrorshandler._notifier.publisher_id)
- mock_notify.assert_called_with({}, 'error_notification',
- {'error': 'Message'}, 'ERROR')
diff --git a/tests/notify/test_logger.py b/tests/notify/test_logger.py
deleted file mode 100644
index 06ad820..0000000
--- a/tests/notify/test_logger.py
+++ /dev/null
@@ -1,157 +0,0 @@
-# Copyright 2013 eNovance
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import datetime
-import logging
-import logging.config
-import os
-import sys
-
-from oslo_utils import timeutils
-import testscenarios
-import testtools
-
-from oslo import messaging
-import oslo_messaging
-from oslo_messaging.tests.notify import test_notifier
-from oslo_messaging.tests import utils as test_utils
-from six.moves import mock
-
-
-load_tests = testscenarios.load_tests_apply_scenarios
-
-# Stolen from openstack.common.logging
-logging.AUDIT = logging.INFO + 1
-logging.addLevelName(logging.AUDIT, 'AUDIT')
-
-
-class TestLogNotifier(test_utils.BaseTestCase):
-
- scenarios = [
- ('debug', dict(priority='debug')),
- ('info', dict(priority='info')),
- ('warning', dict(priority='warning', queue='WARN')),
- ('warn', dict(priority='warn')),
- ('error', dict(priority='error')),
- ('critical', dict(priority='critical')),
- ('audit', dict(priority='audit')),
- ]
-
- def setUp(self):
- super(TestLogNotifier, self).setUp()
- self.addCleanup(oslo_messaging.notify._impl_test.reset)
- self.config(notification_driver=['test'])
- # NOTE(jamespage) disable thread information logging for testing
- # as this causes test failures when zmq tests monkey_patch via
- # eventlet
- logging.logThreads = 0
-
- @mock.patch('oslo_utils.timeutils.utcnow')
- def test_logger(self, mock_utcnow):
- with mock.patch('oslo_messaging.transport.get_transport',
- return_value=test_notifier._FakeTransport(self.conf)):
- self.logger = messaging.LoggingNotificationHandler('test://')
-
- mock_utcnow.return_value = datetime.datetime.utcnow()
-
- levelno = getattr(logging, self.priority.upper(), 42)
-
- record = logging.LogRecord('foo',
- levelno,
- '/foo/bar',
- 42,
- 'Something happened',
- None,
- None)
-
- self.logger.emit(record)
-
- context = oslo_messaging.notify._impl_test.NOTIFICATIONS[0][0]
- self.assertEqual({}, context)
-
- n = oslo_messaging.notify._impl_test.NOTIFICATIONS[0][1]
- self.assertEqual(getattr(self, 'queue', self.priority.upper()),
- n['priority'])
- self.assertEqual('logrecord', n['event_type'])
- self.assertEqual(str(timeutils.utcnow()), n['timestamp'])
- self.assertEqual(None, n['publisher_id'])
- self.assertEqual(
- {'process': os.getpid(),
- 'funcName': None,
- 'name': 'foo',
- 'thread': None,
- 'levelno': levelno,
- 'processName': 'MainProcess',
- 'pathname': '/foo/bar',
- 'lineno': 42,
- 'msg': 'Something happened',
- 'exc_info': None,
- 'levelname': logging.getLevelName(levelno),
- 'extra': None},
- n['payload'])
-
- @testtools.skipUnless(hasattr(logging.config, 'dictConfig'),
- "Need logging.config.dictConfig (Python >= 2.7)")
- @mock.patch('oslo_utils.timeutils.utcnow')
- def test_logging_conf(self, mock_utcnow):
- with mock.patch('oslo_messaging.transport.get_transport',
- return_value=test_notifier._FakeTransport(self.conf)):
- logging.config.dictConfig({
- 'version': 1,
- 'handlers': {
- 'notification': {
- 'class': 'oslo.messaging.LoggingNotificationHandler',
- 'level': self.priority.upper(),
- 'url': 'test://',
- },
- },
- 'loggers': {
- 'default': {
- 'handlers': ['notification'],
- 'level': self.priority.upper(),
- },
- },
- })
-
- mock_utcnow.return_value = datetime.datetime.utcnow()
-
- levelno = getattr(logging, self.priority.upper())
-
- logger = logging.getLogger('default')
- lineno = sys._getframe().f_lineno + 1
- logger.log(levelno, 'foobar')
-
- n = oslo_messaging.notify._impl_test.NOTIFICATIONS[0][1]
- self.assertEqual(getattr(self, 'queue', self.priority.upper()),
- n['priority'])
- self.assertEqual('logrecord', n['event_type'])
- self.assertEqual(str(timeutils.utcnow()), n['timestamp'])
- self.assertEqual(None, n['publisher_id'])
- pathname = __file__
- if pathname.endswith(('.pyc', '.pyo')):
- pathname = pathname[:-1]
- self.assertDictEqual(
- n['payload'],
- {'process': os.getpid(),
- 'funcName': 'test_logging_conf',
- 'name': 'default',
- 'thread': None,
- 'levelno': levelno,
- 'processName': 'MainProcess',
- 'pathname': pathname,
- 'lineno': lineno,
- 'msg': 'foobar',
- 'exc_info': None,
- 'levelname': logging.getLevelName(levelno),
- 'extra': None})
diff --git a/tests/notify/test_middleware.py b/tests/notify/test_middleware.py
deleted file mode 100644
index ed81cb0..0000000
--- a/tests/notify/test_middleware.py
+++ /dev/null
@@ -1,190 +0,0 @@
-# Copyright 2013-2014 eNovance
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import uuid
-
-import webob
-
-from oslo.messaging.notify import middleware
-from oslo_messaging.tests import utils
-from six.moves import mock
-
-
-class FakeApp(object):
- def __call__(self, env, start_response):
- body = 'Some response'
- start_response('200 OK', [
- ('Content-Type', 'text/plain'),
- ('Content-Length', str(sum(map(len, body))))
- ])
- return [body]
-
-
-class FakeFailingApp(object):
- def __call__(self, env, start_response):
- raise Exception("It happens!")
-
-
-class NotifierMiddlewareTest(utils.BaseTestCase):
-
- def test_notification(self):
- m = middleware.RequestNotifier(FakeApp())
- req = webob.Request.blank('/foo/bar',
- environ={'REQUEST_METHOD': 'GET',
- 'HTTP_X_AUTH_TOKEN': uuid.uuid4()})
- with mock.patch(
- 'oslo.messaging.notify.notifier.Notifier._notify') as notify:
- m(req)
- # Check first notification with only 'request'
- call_args = notify.call_args_list[0][0]
- self.assertEqual(call_args[1], 'http.request')
- self.assertEqual(call_args[3], 'INFO')
- self.assertEqual(set(call_args[2].keys()),
- set(['request']))
-
- request = call_args[2]['request']
- self.assertEqual(request['PATH_INFO'], '/foo/bar')
- self.assertEqual(request['REQUEST_METHOD'], 'GET')
- self.assertIn('HTTP_X_SERVICE_NAME', request)
- self.assertNotIn('HTTP_X_AUTH_TOKEN', request)
- self.assertFalse(any(map(lambda s: s.startswith('wsgi.'),
- request.keys())),
- "WSGI fields are filtered out")
-
- # Check second notification with request + response
- call_args = notify.call_args_list[1][0]
- self.assertEqual(call_args[1], 'http.response')
- self.assertEqual(call_args[3], 'INFO')
- self.assertEqual(set(call_args[2].keys()),
- set(['request', 'response']))
-
- request = call_args[2]['request']
- self.assertEqual(request['PATH_INFO'], '/foo/bar')
- self.assertEqual(request['REQUEST_METHOD'], 'GET')
- self.assertIn('HTTP_X_SERVICE_NAME', request)
- self.assertNotIn('HTTP_X_AUTH_TOKEN', request)
- self.assertFalse(any(map(lambda s: s.startswith('wsgi.'),
- request.keys())),
- "WSGI fields are filtered out")
-
- response = call_args[2]['response']
- self.assertEqual(response['status'], '200 OK')
- self.assertEqual(response['headers']['content-length'], '13')
-
- def test_notification_response_failure(self):
- m = middleware.RequestNotifier(FakeFailingApp())
- req = webob.Request.blank('/foo/bar',
- environ={'REQUEST_METHOD': 'GET',
- 'HTTP_X_AUTH_TOKEN': uuid.uuid4()})
- with mock.patch(
- 'oslo.messaging.notify.notifier.Notifier._notify') as notify:
- try:
- m(req)
- self.fail("Application exception has not been re-raised")
- except Exception:
- pass
- # Check first notification with only 'request'
- call_args = notify.call_args_list[0][0]
- self.assertEqual(call_args[1], 'http.request')
- self.assertEqual(call_args[3], 'INFO')
- self.assertEqual(set(call_args[2].keys()),
- set(['request']))
-
- request = call_args[2]['request']
- self.assertEqual(request['PATH_INFO'], '/foo/bar')
- self.assertEqual(request['REQUEST_METHOD'], 'GET')
- self.assertIn('HTTP_X_SERVICE_NAME', request)
- self.assertNotIn('HTTP_X_AUTH_TOKEN', request)
- self.assertFalse(any(map(lambda s: s.startswith('wsgi.'),
- request.keys())),
- "WSGI fields are filtered out")
-
- # Check second notification with 'request' and 'exception'
- call_args = notify.call_args_list[1][0]
- self.assertEqual(call_args[1], 'http.response')
- self.assertEqual(call_args[3], 'INFO')
- self.assertEqual(set(call_args[2].keys()),
- set(['request', 'exception']))
-
- request = call_args[2]['request']
- self.assertEqual(request['PATH_INFO'], '/foo/bar')
- self.assertEqual(request['REQUEST_METHOD'], 'GET')
- self.assertIn('HTTP_X_SERVICE_NAME', request)
- self.assertNotIn('HTTP_X_AUTH_TOKEN', request)
- self.assertFalse(any(map(lambda s: s.startswith('wsgi.'),
- request.keys())),
- "WSGI fields are filtered out")
-
- exception = call_args[2]['exception']
- self.assertIn('middleware.py', exception['traceback'][0])
- self.assertIn('It happens!', exception['traceback'][-1])
- self.assertEqual(exception['value'], "Exception('It happens!',)")
-
- def test_process_request_fail(self):
- def notify_error(context, publisher_id, event_type,
- priority, payload):
- raise Exception('error')
- with mock.patch('oslo.messaging.notify.notifier.Notifier._notify',
- notify_error):
- m = middleware.RequestNotifier(FakeApp())
- req = webob.Request.blank('/foo/bar',
- environ={'REQUEST_METHOD': 'GET'})
- m.process_request(req)
-
- def test_process_response_fail(self):
- def notify_error(context, publisher_id, event_type,
- priority, payload):
- raise Exception('error')
- with mock.patch('oslo.messaging.notify.notifier.Notifier._notify',
- notify_error):
- m = middleware.RequestNotifier(FakeApp())
- req = webob.Request.blank('/foo/bar',
- environ={'REQUEST_METHOD': 'GET'})
- m.process_response(req, webob.response.Response())
-
- def test_ignore_req_opt(self):
- m = middleware.RequestNotifier(FakeApp(),
- ignore_req_list='get, PUT')
- req = webob.Request.blank('/skip/foo',
- environ={'REQUEST_METHOD': 'GET'})
- req1 = webob.Request.blank('/skip/foo',
- environ={'REQUEST_METHOD': 'PUT'})
- req2 = webob.Request.blank('/accept/foo',
- environ={'REQUEST_METHOD': 'POST'})
- with mock.patch(
- 'oslo.messaging.notify.notifier.Notifier._notify') as notify:
- # Check GET request does not send notification
- m(req)
- m(req1)
- self.assertEqual(len(notify.call_args_list), 0)
-
- # Check non-GET request does send notification
- m(req2)
- self.assertEqual(len(notify.call_args_list), 2)
- call_args = notify.call_args_list[0][0]
- self.assertEqual(call_args[1], 'http.request')
- self.assertEqual(call_args[3], 'INFO')
- self.assertEqual(set(call_args[2].keys()),
- set(['request']))
-
- request = call_args[2]['request']
- self.assertEqual(request['PATH_INFO'], '/accept/foo')
- self.assertEqual(request['REQUEST_METHOD'], 'POST')
-
- call_args = notify.call_args_list[1][0]
- self.assertEqual(call_args[1], 'http.response')
- self.assertEqual(call_args[3], 'INFO')
- self.assertEqual(set(call_args[2].keys()),
- set(['request', 'response']))
diff --git a/tests/notify/test_notifier.py b/tests/notify/test_notifier.py
deleted file mode 100644
index 9cc8ec0..0000000
--- a/tests/notify/test_notifier.py
+++ /dev/null
@@ -1,540 +0,0 @@
-
-# Copyright 2013 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import datetime
-import logging
-import sys
-import uuid
-
-import fixtures
-from oslo_serialization import jsonutils
-from oslo_utils import timeutils
-from stevedore import dispatch
-from stevedore import extension
-import testscenarios
-import yaml
-
-from oslo import messaging
-from oslo.messaging.notify import notifier as msg_notifier
-from oslo.messaging import serializer as msg_serializer
-from oslo_messaging.notify import _impl_log
-from oslo_messaging.notify import _impl_messaging
-from oslo_messaging.notify import _impl_test
-from oslo_messaging.tests import utils as test_utils
-from six.moves import mock
-
-load_tests = testscenarios.load_tests_apply_scenarios
-
-
-class _FakeTransport(object):
-
- def __init__(self, conf):
- self.conf = conf
-
- def _send_notification(self, target, ctxt, message, version, retry=None):
- pass
-
-
-class _ReRaiseLoggedExceptionsFixture(fixtures.Fixture):
-
- """Record logged exceptions and re-raise in cleanup.
-
- The notifier just logs notification send errors so, for the sake of
- debugging test failures, we record any exceptions logged and re-raise them
- during cleanup.
- """
-
- class FakeLogger(object):
-
- def __init__(self):
- self.exceptions = []
-
- def exception(self, msg, *args, **kwargs):
- self.exceptions.append(sys.exc_info()[1])
-
- def setUp(self):
- super(_ReRaiseLoggedExceptionsFixture, self).setUp()
-
- self.logger = self.FakeLogger()
-
- def reraise_exceptions():
- for ex in self.logger.exceptions:
- raise ex
-
- self.addCleanup(reraise_exceptions)
-
-
-class TestMessagingNotifier(test_utils.BaseTestCase):
-
- _v1 = [
- ('v1', dict(v1=True)),
- ('not_v1', dict(v1=False)),
- ]
-
- _v2 = [
- ('v2', dict(v2=True)),
- ('not_v2', dict(v2=False)),
- ]
-
- _publisher_id = [
- ('ctor_pub_id', dict(ctor_pub_id='test',
- expected_pub_id='test')),
- ('prep_pub_id', dict(prep_pub_id='test.localhost',
- expected_pub_id='test.localhost')),
- ('override', dict(ctor_pub_id='test',
- prep_pub_id='test.localhost',
- expected_pub_id='test.localhost')),
- ]
-
- _topics = [
- ('no_topics', dict(topics=[])),
- ('single_topic', dict(topics=['notifications'])),
- ('multiple_topic2', dict(topics=['foo', 'bar'])),
- ]
-
- _priority = [
- ('audit', dict(priority='audit')),
- ('debug', dict(priority='debug')),
- ('info', dict(priority='info')),
- ('warn', dict(priority='warn')),
- ('error', dict(priority='error')),
- ('sample', dict(priority='sample')),
- ('critical', dict(priority='critical')),
- ]
-
- _payload = [
- ('payload', dict(payload={'foo': 'bar'})),
- ]
-
- _context = [
- ('ctxt', dict(ctxt={'user': 'bob'})),
- ]
-
- _retry = [
- ('unconfigured', dict()),
- ('None', dict(retry=None)),
- ('0', dict(retry=0)),
- ('5', dict(retry=5)),
- ]
-
- @classmethod
- def generate_scenarios(cls):
- cls.scenarios = testscenarios.multiply_scenarios(cls._v1,
- cls._v2,
- cls._publisher_id,
- cls._topics,
- cls._priority,
- cls._payload,
- cls._context,
- cls._retry)
-
- def setUp(self):
- super(TestMessagingNotifier, self).setUp()
-
- self.logger = self.useFixture(_ReRaiseLoggedExceptionsFixture()).logger
- self.stubs.Set(_impl_messaging, 'LOG', self.logger)
- self.stubs.Set(msg_notifier, '_LOG', self.logger)
-
- @mock.patch('oslo_utils.timeutils.utcnow')
- def test_notifier(self, mock_utcnow):
- drivers = []
- if self.v1:
- drivers.append('messaging')
- if self.v2:
- drivers.append('messagingv2')
-
- self.config(notification_driver=drivers,
- notification_topics=self.topics)
-
- transport = _FakeTransport(self.conf)
-
- if hasattr(self, 'ctor_pub_id'):
- notifier = messaging.Notifier(transport,
- publisher_id=self.ctor_pub_id)
- else:
- notifier = messaging.Notifier(transport)
-
- prepare_kwds = {}
- if hasattr(self, 'retry'):
- prepare_kwds['retry'] = self.retry
- if hasattr(self, 'prep_pub_id'):
- prepare_kwds['publisher_id'] = self.prep_pub_id
- if prepare_kwds:
- notifier = notifier.prepare(**prepare_kwds)
-
- self.mox.StubOutWithMock(transport, '_send_notification')
-
- message_id = uuid.uuid4()
- self.mox.StubOutWithMock(uuid, 'uuid4')
- uuid.uuid4().AndReturn(message_id)
-
- mock_utcnow.return_value = datetime.datetime.utcnow()
-
- message = {
- 'message_id': str(message_id),
- 'publisher_id': self.expected_pub_id,
- 'event_type': 'test.notify',
- 'priority': self.priority.upper(),
- 'payload': self.payload,
- 'timestamp': str(timeutils.utcnow()),
- }
-
- sends = []
- if self.v1:
- sends.append(dict(version=1.0))
- if self.v2:
- sends.append(dict(version=2.0))
-
- for send_kwargs in sends:
- for topic in self.topics:
- if hasattr(self, 'retry'):
- send_kwargs['retry'] = self.retry
- else:
- send_kwargs['retry'] = None
- target = messaging.Target(topic='%s.%s' % (topic,
- self.priority))
- transport._send_notification(target, self.ctxt, message,
- **send_kwargs).InAnyOrder()
-
- self.mox.ReplayAll()
-
- method = getattr(notifier, self.priority)
- method(self.ctxt, 'test.notify', self.payload)
-
-
-TestMessagingNotifier.generate_scenarios()
-
-
-class TestSerializer(test_utils.BaseTestCase):
-
- def setUp(self):
- super(TestSerializer, self).setUp()
- self.addCleanup(_impl_test.reset)
-
- @mock.patch('oslo_utils.timeutils.utcnow')
- def test_serializer(self, mock_utcnow):
- transport = _FakeTransport(self.conf)
-
- serializer = msg_serializer.NoOpSerializer()
-
- notifier = messaging.Notifier(transport,
- 'test.localhost',
- driver='test',
- topic='test',
- serializer=serializer)
-
- message_id = uuid.uuid4()
- self.mox.StubOutWithMock(uuid, 'uuid4')
- uuid.uuid4().AndReturn(message_id)
-
- mock_utcnow.return_value = datetime.datetime.utcnow()
-
- self.mox.StubOutWithMock(serializer, 'serialize_context')
- self.mox.StubOutWithMock(serializer, 'serialize_entity')
- serializer.serialize_context(dict(user='bob')).\
- AndReturn(dict(user='alice'))
- serializer.serialize_entity(dict(user='bob'), 'bar').AndReturn('sbar')
-
- self.mox.ReplayAll()
-
- notifier.info(dict(user='bob'), 'test.notify', 'bar')
-
- message = {
- 'message_id': str(message_id),
- 'publisher_id': 'test.localhost',
- 'event_type': 'test.notify',
- 'priority': 'INFO',
- 'payload': 'sbar',
- 'timestamp': str(timeutils.utcnow()),
- }
-
- self.assertEqual([(dict(user='alice'), message, 'INFO', None)],
- _impl_test.NOTIFICATIONS)
-
-
-class TestLogNotifier(test_utils.BaseTestCase):
-
- @mock.patch('oslo_utils.timeutils.utcnow')
- def test_notifier(self, mock_utcnow):
- self.config(notification_driver=['log'])
-
- transport = _FakeTransport(self.conf)
-
- notifier = messaging.Notifier(transport, 'test.localhost')
-
- message_id = uuid.uuid4()
- self.mox.StubOutWithMock(uuid, 'uuid4')
- uuid.uuid4().AndReturn(message_id)
-
- mock_utcnow.return_value = datetime.datetime.utcnow()
-
- message = {
- 'message_id': str(message_id),
- 'publisher_id': 'test.localhost',
- 'event_type': 'test.notify',
- 'priority': 'INFO',
- 'payload': 'bar',
- 'timestamp': str(timeutils.utcnow()),
- }
-
- logger = self.mox.CreateMockAnything()
-
- self.mox.StubOutWithMock(logging, 'getLogger')
- logging.getLogger('oslo.messaging.notification.test.notify').\
- AndReturn(logger)
-
- logger.info(jsonutils.dumps(message))
-
- self.mox.ReplayAll()
-
- notifier.info({}, 'test.notify', 'bar')
-
- def test_sample_priority(self):
- # Ensure logger drops sample-level notifications.
- driver = _impl_log.LogDriver(None, None, None)
-
- logger = self.mox.CreateMock(
- logging.getLogger('oslo.messaging.notification.foo'))
- logger.sample = None
- self.mox.StubOutWithMock(logging, 'getLogger')
- logging.getLogger('oslo.messaging.notification.foo').\
- AndReturn(logger)
-
- self.mox.ReplayAll()
-
- msg = {'event_type': 'foo'}
- driver.notify(None, msg, "sample", None)
-
-
-class TestRoutingNotifier(test_utils.BaseTestCase):
- def setUp(self):
- super(TestRoutingNotifier, self).setUp()
- self.config(notification_driver=['routing'])
-
- transport = _FakeTransport(self.conf)
- self.notifier = messaging.Notifier(transport)
- self.router = self.notifier._driver_mgr['routing'].obj
-
- def _fake_extension_manager(self, ext):
- return extension.ExtensionManager.make_test_instance(
- [extension.Extension('test', None, None, ext), ])
-
- def _empty_extension_manager(self):
- return extension.ExtensionManager.make_test_instance([])
-
- def test_should_load_plugin(self):
- self.router.used_drivers = set(["zoo", "blah"])
- ext = mock.MagicMock()
- ext.name = "foo"
- self.assertFalse(self.router._should_load_plugin(ext))
- ext.name = "zoo"
- self.assertTrue(self.router._should_load_plugin(ext))
-
- def test_load_notifiers_no_config(self):
- # default routing_notifier_config=""
- self.router._load_notifiers()
- self.assertEqual({}, self.router.routing_groups)
- self.assertEqual(0, len(self.router.used_drivers))
-
- def test_load_notifiers_no_extensions(self):
- self.config(routing_notifier_config="routing_notifier.yaml")
- routing_config = r""
- config_file = mock.MagicMock()
- config_file.return_value = routing_config
-
- with mock.patch.object(self.router, '_get_notifier_config_file',
- config_file):
- with mock.patch('stevedore.dispatch.DispatchExtensionManager',
- return_value=self._empty_extension_manager()):
- with mock.patch('oslo_messaging.notify.'
- '_impl_routing.LOG') as mylog:
- self.router._load_notifiers()
- self.assertFalse(mylog.debug.called)
- self.assertEqual({}, self.router.routing_groups)
-
- def test_load_notifiers_config(self):
- self.config(routing_notifier_config="routing_notifier.yaml")
- routing_config = r"""
-group_1:
- rpc : foo
-group_2:
- rpc : blah
- """
-
- config_file = mock.MagicMock()
- config_file.return_value = routing_config
-
- with mock.patch.object(self.router, '_get_notifier_config_file',
- config_file):
- with mock.patch('stevedore.dispatch.DispatchExtensionManager',
- return_value=self._fake_extension_manager(
- mock.MagicMock())):
- self.router._load_notifiers()
- groups = list(self.router.routing_groups.keys())
- groups.sort()
- self.assertEqual(['group_1', 'group_2'], groups)
-
- def test_get_drivers_for_message_accepted_events(self):
- config = r"""
-group_1:
- rpc:
- accepted_events:
- - foo.*
- - blah.zoo.*
- - zip
- """
- groups = yaml.load(config)
- group = groups['group_1']
-
- # No matching event ...
- self.assertEqual([],
- self.router._get_drivers_for_message(
- group, "unknown", "info"))
-
- # Child of foo ...
- self.assertEqual(['rpc'],
- self.router._get_drivers_for_message(
- group, "foo.1", "info"))
-
- # Foo itself ...
- self.assertEqual([],
- self.router._get_drivers_for_message(
- group, "foo", "info"))
-
- # Child of blah.zoo
- self.assertEqual(['rpc'],
- self.router._get_drivers_for_message(
- group, "blah.zoo.zing", "info"))
-
- def test_get_drivers_for_message_accepted_priorities(self):
- config = r"""
-group_1:
- rpc:
- accepted_priorities:
- - info
- - error
- """
- groups = yaml.load(config)
- group = groups['group_1']
-
- # No matching priority
- self.assertEqual([],
- self.router._get_drivers_for_message(
- group, None, "unknown"))
-
- # Info ...
- self.assertEqual(['rpc'],
- self.router._get_drivers_for_message(
- group, None, "info"))
-
- # Error (to make sure the list is getting processed) ...
- self.assertEqual(['rpc'],
- self.router._get_drivers_for_message(
- group, None, "error"))
-
- def test_get_drivers_for_message_both(self):
- config = r"""
-group_1:
- rpc:
- accepted_priorities:
- - info
- accepted_events:
- - foo.*
- driver_1:
- accepted_priorities:
- - info
- driver_2:
- accepted_events:
- - foo.*
- """
- groups = yaml.load(config)
- group = groups['group_1']
-
- # Valid event, but no matching priority
- self.assertEqual(['driver_2'],
- self.router._get_drivers_for_message(
- group, 'foo.blah', "unknown"))
-
- # Valid priority, but no matching event
- self.assertEqual(['driver_1'],
- self.router._get_drivers_for_message(
- group, 'unknown', "info"))
-
- # Happy day ...
- x = self.router._get_drivers_for_message(group, 'foo.blah', "info")
- x.sort()
- self.assertEqual(['driver_1', 'driver_2', 'rpc'], x)
-
- def test_filter_func(self):
- ext = mock.MagicMock()
- ext.name = "rpc"
-
- # Good ...
- self.assertTrue(self.router._filter_func(ext, {}, {}, 'info',
- None, ['foo', 'rpc']))
-
- # Bad
- self.assertFalse(self.router._filter_func(ext, {}, {}, 'info',
- None, ['foo']))
-
- def test_notify(self):
- self.router.routing_groups = {'group_1': None, 'group_2': None}
- drivers_mock = mock.MagicMock()
- drivers_mock.side_effect = [['rpc'], ['foo']]
-
- with mock.patch.object(self.router, 'plugin_manager') as pm:
- with mock.patch.object(self.router, '_get_drivers_for_message',
- drivers_mock):
- self.notifier.info({}, 'my_event', {})
- self.assertEqual(sorted(['rpc', 'foo']),
- sorted(pm.map.call_args[0][6]))
-
- def test_notify_filtered(self):
- self.config(routing_notifier_config="routing_notifier.yaml")
- routing_config = r"""
-group_1:
- rpc:
- accepted_events:
- - my_event
- rpc2:
- accepted_priorities:
- - info
- bar:
- accepted_events:
- - nothing
- """
- config_file = mock.MagicMock()
- config_file.return_value = routing_config
-
- rpc_driver = mock.Mock()
- rpc2_driver = mock.Mock()
- bar_driver = mock.Mock()
-
- pm = dispatch.DispatchExtensionManager.make_test_instance(
- [extension.Extension('rpc', None, None, rpc_driver),
- extension.Extension('rpc2', None, None, rpc2_driver),
- extension.Extension('bar', None, None, bar_driver)],
- )
-
- with mock.patch.object(self.router, '_get_notifier_config_file',
- config_file):
- with mock.patch('stevedore.dispatch.DispatchExtensionManager',
- return_value=pm):
- self.notifier.info({}, 'my_event', {})
- self.assertFalse(bar_driver.info.called)
- rpc_driver.notify.assert_called_once_with(
- {}, mock.ANY, 'INFO', None)
- rpc2_driver.notify.assert_called_once_with(
- {}, mock.ANY, 'INFO', None)
diff --git a/tests/rpc/__init__.py b/tests/rpc/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/tests/rpc/__init__.py
+++ /dev/null
diff --git a/tests/rpc/test_client.py b/tests/rpc/test_client.py
deleted file mode 100644
index 65c4f67..0000000
--- a/tests/rpc/test_client.py
+++ /dev/null
@@ -1,519 +0,0 @@
-
-# Copyright 2013 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import testscenarios
-
-from oslo import messaging
-from oslo.messaging import exceptions
-from oslo.messaging import serializer as msg_serializer
-from oslo_config import cfg
-from oslo_messaging.tests import utils as test_utils
-
-load_tests = testscenarios.load_tests_apply_scenarios
-
-
-class _FakeTransport(object):
-
- def __init__(self, conf):
- self.conf = conf
-
- def _send(self, *args, **kwargs):
- pass
-
-
-class TestCastCall(test_utils.BaseTestCase):
-
- scenarios = [
- ('cast_no_ctxt_no_args', dict(call=False, ctxt={}, args={})),
- ('call_no_ctxt_no_args', dict(call=True, ctxt={}, args={})),
- ('cast_ctxt_and_args',
- dict(call=False,
- ctxt=dict(user='testuser', project='testtenant'),
- args=dict(bar='blaa', foobar=11.01))),
- ('call_ctxt_and_args',
- dict(call=True,
- ctxt=dict(user='testuser', project='testtenant'),
- args=dict(bar='blaa', foobar=11.01))),
- ]
-
- def test_cast_call(self):
- self.config(rpc_response_timeout=None)
-
- transport = _FakeTransport(self.conf)
- client = messaging.RPCClient(transport, messaging.Target())
-
- self.mox.StubOutWithMock(transport, '_send')
-
- msg = dict(method='foo', args=self.args)
- kwargs = {'retry': None}
- if self.call:
- kwargs['wait_for_reply'] = True
- kwargs['timeout'] = None
-
- transport._send(messaging.Target(), self.ctxt, msg, **kwargs)
- self.mox.ReplayAll()
-
- method = client.call if self.call else client.cast
- method(self.ctxt, 'foo', **self.args)
-
-
-class TestCastToTarget(test_utils.BaseTestCase):
-
- _base = [
- ('all_none', dict(ctor={}, prepare={}, expect={})),
- ('ctor_exchange',
- dict(ctor=dict(exchange='testexchange'),
- prepare={},
- expect=dict(exchange='testexchange'))),
- ('prepare_exchange',
- dict(ctor={},
- prepare=dict(exchange='testexchange'),
- expect=dict(exchange='testexchange'))),
- ('prepare_exchange_none',
- dict(ctor=dict(exchange='testexchange'),
- prepare=dict(exchange=None),
- expect={})),
- ('both_exchange',
- dict(ctor=dict(exchange='ctorexchange'),
- prepare=dict(exchange='testexchange'),
- expect=dict(exchange='testexchange'))),
- ('ctor_topic',
- dict(ctor=dict(topic='testtopic'),
- prepare={},
- expect=dict(topic='testtopic'))),
- ('prepare_topic',
- dict(ctor={},
- prepare=dict(topic='testtopic'),
- expect=dict(topic='testtopic'))),
- ('prepare_topic_none',
- dict(ctor=dict(topic='testtopic'),
- prepare=dict(topic=None),
- expect={})),
- ('both_topic',
- dict(ctor=dict(topic='ctortopic'),
- prepare=dict(topic='testtopic'),
- expect=dict(topic='testtopic'))),
- ('ctor_namespace',
- dict(ctor=dict(namespace='testnamespace'),
- prepare={},
- expect=dict(namespace='testnamespace'))),
- ('prepare_namespace',
- dict(ctor={},
- prepare=dict(namespace='testnamespace'),
- expect=dict(namespace='testnamespace'))),
- ('prepare_namespace_none',
- dict(ctor=dict(namespace='testnamespace'),
- prepare=dict(namespace=None),
- expect={})),
- ('both_namespace',
- dict(ctor=dict(namespace='ctornamespace'),
- prepare=dict(namespace='testnamespace'),
- expect=dict(namespace='testnamespace'))),
- ('ctor_version',
- dict(ctor=dict(version='testversion'),
- prepare={},
- expect=dict(version='testversion'))),
- ('prepare_version',
- dict(ctor={},
- prepare=dict(version='testversion'),
- expect=dict(version='testversion'))),
- ('prepare_version_none',
- dict(ctor=dict(version='testversion'),
- prepare=dict(version=None),
- expect={})),
- ('both_version',
- dict(ctor=dict(version='ctorversion'),
- prepare=dict(version='testversion'),
- expect=dict(version='testversion'))),
- ('ctor_server',
- dict(ctor=dict(server='testserver'),
- prepare={},
- expect=dict(server='testserver'))),
- ('prepare_server',
- dict(ctor={},
- prepare=dict(server='testserver'),
- expect=dict(server='testserver'))),
- ('prepare_server_none',
- dict(ctor=dict(server='testserver'),
- prepare=dict(server=None),
- expect={})),
- ('both_server',
- dict(ctor=dict(server='ctorserver'),
- prepare=dict(server='testserver'),
- expect=dict(server='testserver'))),
- ('ctor_fanout',
- dict(ctor=dict(fanout=True),
- prepare={},
- expect=dict(fanout=True))),
- ('prepare_fanout',
- dict(ctor={},
- prepare=dict(fanout=True),
- expect=dict(fanout=True))),
- ('prepare_fanout_none',
- dict(ctor=dict(fanout=True),
- prepare=dict(fanout=None),
- expect={})),
- ('both_fanout',
- dict(ctor=dict(fanout=True),
- prepare=dict(fanout=False),
- expect=dict(fanout=False))),
- ]
-
- _prepare = [
- ('single_prepare', dict(double_prepare=False)),
- ('double_prepare', dict(double_prepare=True)),
- ]
-
- @classmethod
- def generate_scenarios(cls):
- cls.scenarios = testscenarios.multiply_scenarios(cls._base,
- cls._prepare)
-
- def setUp(self):
- super(TestCastToTarget, self).setUp(conf=cfg.ConfigOpts())
-
- def test_cast_to_target(self):
- target = messaging.Target(**self.ctor)
- expect_target = messaging.Target(**self.expect)
-
- transport = _FakeTransport(self.conf)
- client = messaging.RPCClient(transport, target)
-
- self.mox.StubOutWithMock(transport, '_send')
-
- msg = dict(method='foo', args={})
- if 'namespace' in self.expect:
- msg['namespace'] = self.expect['namespace']
- if 'version' in self.expect:
- msg['version'] = self.expect['version']
- transport._send(expect_target, {}, msg, retry=None)
-
- self.mox.ReplayAll()
-
- if self.prepare:
- client = client.prepare(**self.prepare)
- if self.double_prepare:
- client = client.prepare(**self.prepare)
- client.cast({}, 'foo')
-
-
-TestCastToTarget.generate_scenarios()
-
-
-_notset = object()
-
-
-class TestCallTimeout(test_utils.BaseTestCase):
-
- scenarios = [
- ('all_none',
- dict(confval=None, ctor=None, prepare=_notset, expect=None)),
- ('confval',
- dict(confval=21.1, ctor=None, prepare=_notset, expect=21.1)),
- ('ctor',
- dict(confval=None, ctor=21.1, prepare=_notset, expect=21.1)),
- ('ctor_zero',
- dict(confval=None, ctor=0, prepare=_notset, expect=0)),
- ('prepare',
- dict(confval=None, ctor=None, prepare=21.1, expect=21.1)),
- ('prepare_override',
- dict(confval=None, ctor=10.1, prepare=21.1, expect=21.1)),
- ('prepare_zero',
- dict(confval=None, ctor=None, prepare=0, expect=0)),
- ]
-
- def test_call_timeout(self):
- self.config(rpc_response_timeout=self.confval)
-
- transport = _FakeTransport(self.conf)
- client = messaging.RPCClient(transport, messaging.Target(),
- timeout=self.ctor)
-
- self.mox.StubOutWithMock(transport, '_send')
-
- msg = dict(method='foo', args={})
- kwargs = dict(wait_for_reply=True, timeout=self.expect, retry=None)
- transport._send(messaging.Target(), {}, msg, **kwargs)
-
- self.mox.ReplayAll()
-
- if self.prepare is not _notset:
- client = client.prepare(timeout=self.prepare)
- client.call({}, 'foo')
-
-
-class TestCallRetry(test_utils.BaseTestCase):
-
- scenarios = [
- ('all_none', dict(ctor=None, prepare=_notset, expect=None)),
- ('ctor', dict(ctor=21, prepare=_notset, expect=21)),
- ('ctor_zero', dict(ctor=0, prepare=_notset, expect=0)),
- ('prepare', dict(ctor=None, prepare=21, expect=21)),
- ('prepare_override', dict(ctor=10, prepare=21, expect=21)),
- ('prepare_zero', dict(ctor=None, prepare=0, expect=0)),
- ]
-
- def test_call_retry(self):
- transport = _FakeTransport(self.conf)
- client = messaging.RPCClient(transport, messaging.Target(),
- retry=self.ctor)
-
- self.mox.StubOutWithMock(transport, '_send')
-
- msg = dict(method='foo', args={})
- kwargs = dict(wait_for_reply=True, timeout=60,
- retry=self.expect)
- transport._send(messaging.Target(), {}, msg, **kwargs)
-
- self.mox.ReplayAll()
-
- if self.prepare is not _notset:
- client = client.prepare(retry=self.prepare)
- client.call({}, 'foo')
-
-
-class TestCallFanout(test_utils.BaseTestCase):
-
- scenarios = [
- ('target', dict(prepare=_notset, target={'fanout': True})),
- ('prepare', dict(prepare={'fanout': True}, target={})),
- ('both', dict(prepare={'fanout': True}, target={'fanout': True})),
- ]
-
- def test_call_fanout(self):
- transport = _FakeTransport(self.conf)
- client = messaging.RPCClient(transport,
- messaging.Target(**self.target))
-
- if self.prepare is not _notset:
- client = client.prepare(**self.prepare)
-
- self.assertRaises(exceptions.InvalidTarget,
- client.call, {}, 'foo')
-
-
-class TestSerializer(test_utils.BaseTestCase):
-
- scenarios = [
- ('cast',
- dict(call=False,
- ctxt=dict(user='bob'),
- args=dict(a='a', b='b', c='c'),
- retval=None)),
- ('call',
- dict(call=True,
- ctxt=dict(user='bob'),
- args=dict(a='a', b='b', c='c'),
- retval='d')),
- ]
-
- def test_call_serializer(self):
- self.config(rpc_response_timeout=None)
-
- transport = _FakeTransport(self.conf)
- serializer = msg_serializer.NoOpSerializer()
-
- client = messaging.RPCClient(transport, messaging.Target(),
- serializer=serializer)
-
- self.mox.StubOutWithMock(transport, '_send')
-
- msg = dict(method='foo',
- args=dict([(k, 's' + v) for k, v in self.args.items()]))
- kwargs = dict(wait_for_reply=True, timeout=None) if self.call else {}
- kwargs['retry'] = None
- transport._send(messaging.Target(),
- dict(user='alice'),
- msg,
- **kwargs).AndReturn(self.retval)
-
- self.mox.StubOutWithMock(serializer, 'serialize_entity')
- self.mox.StubOutWithMock(serializer, 'deserialize_entity')
- self.mox.StubOutWithMock(serializer, 'serialize_context')
-
- for arg in self.args:
- serializer.serialize_entity(self.ctxt, arg).AndReturn('s' + arg)
-
- if self.call:
- serializer.deserialize_entity(self.ctxt, self.retval).\
- AndReturn('d' + self.retval)
-
- serializer.serialize_context(self.ctxt).AndReturn(dict(user='alice'))
-
- self.mox.ReplayAll()
-
- method = client.call if self.call else client.cast
- retval = method(self.ctxt, 'foo', **self.args)
- if self.retval is not None:
- self.assertEqual('d' + self.retval, retval)
-
-
-class TestVersionCap(test_utils.BaseTestCase):
-
- _call_vs_cast = [
- ('call', dict(call=True)),
- ('cast', dict(call=False)),
- ]
-
- _cap_scenarios = [
- ('all_none',
- dict(cap=None, prepare_cap=_notset,
- version=None, prepare_version=_notset,
- success=True)),
- ('ctor_cap_ok',
- dict(cap='1.1', prepare_cap=_notset,
- version='1.0', prepare_version=_notset,
- success=True)),
- ('ctor_cap_override_ok',
- dict(cap='2.0', prepare_cap='1.1',
- version='1.0', prepare_version='1.0',
- success=True)),
- ('ctor_cap_override_none_ok',
- dict(cap='1.1', prepare_cap=None,
- version='1.0', prepare_version=_notset,
- success=True)),
- ('ctor_cap_minor_fail',
- dict(cap='1.0', prepare_cap=_notset,
- version='1.1', prepare_version=_notset,
- success=False)),
- ('ctor_cap_major_fail',
- dict(cap='2.0', prepare_cap=_notset,
- version=None, prepare_version='1.0',
- success=False)),
- ]
-
- @classmethod
- def generate_scenarios(cls):
- cls.scenarios = (
- testscenarios.multiply_scenarios(cls._call_vs_cast,
- cls._cap_scenarios))
-
- def test_version_cap(self):
- self.config(rpc_response_timeout=None)
-
- transport = _FakeTransport(self.conf)
-
- target = messaging.Target(version=self.version)
- client = messaging.RPCClient(transport, target,
- version_cap=self.cap)
-
- if self.success:
- self.mox.StubOutWithMock(transport, '_send')
-
- if self.prepare_version is not _notset:
- target = target(version=self.prepare_version)
-
- msg = dict(method='foo', args={})
- if target.version is not None:
- msg['version'] = target.version
-
- kwargs = {'retry': None}
- if self.call:
- kwargs['wait_for_reply'] = True
- kwargs['timeout'] = None
-
- transport._send(target, {}, msg, **kwargs)
-
- self.mox.ReplayAll()
-
- prep_kwargs = {}
- if self.prepare_cap is not _notset:
- prep_kwargs['version_cap'] = self.prepare_cap
- if self.prepare_version is not _notset:
- prep_kwargs['version'] = self.prepare_version
- if prep_kwargs:
- client = client.prepare(**prep_kwargs)
-
- method = client.call if self.call else client.cast
- try:
- method({}, 'foo')
- except Exception as ex:
- self.assertIsInstance(ex, messaging.RPCVersionCapError, ex)
- self.assertFalse(self.success)
- else:
- self.assertTrue(self.success)
-
-
-TestVersionCap.generate_scenarios()
-
-
-class TestCanSendVersion(test_utils.BaseTestCase):
-
- scenarios = [
- ('all_none',
- dict(cap=None, prepare_cap=_notset,
- version=None, prepare_version=_notset,
- can_send_version=_notset,
- can_send=True)),
- ('ctor_cap_ok',
- dict(cap='1.1', prepare_cap=_notset,
- version='1.0', prepare_version=_notset,
- can_send_version=_notset,
- can_send=True)),
- ('ctor_cap_override_ok',
- dict(cap='2.0', prepare_cap='1.1',
- version='1.0', prepare_version='1.0',
- can_send_version=_notset,
- can_send=True)),
- ('ctor_cap_override_none_ok',
- dict(cap='1.1', prepare_cap=None,
- version='1.0', prepare_version=_notset,
- can_send_version=_notset,
- can_send=True)),
- ('ctor_cap_can_send_ok',
- dict(cap='1.1', prepare_cap=None,
- version='1.0', prepare_version=_notset,
- can_send_version='1.1',
- can_send=True)),
- ('ctor_cap_can_send_none_ok',
- dict(cap='1.1', prepare_cap=None,
- version='1.0', prepare_version=_notset,
- can_send_version=None,
- can_send=True)),
- ('ctor_cap_minor_fail',
- dict(cap='1.0', prepare_cap=_notset,
- version='1.1', prepare_version=_notset,
- can_send_version=_notset,
- can_send=False)),
- ('ctor_cap_major_fail',
- dict(cap='2.0', prepare_cap=_notset,
- version=None, prepare_version='1.0',
- can_send_version=_notset,
- can_send=False)),
- ]
-
- def test_version_cap(self):
- self.config(rpc_response_timeout=None)
-
- transport = _FakeTransport(self.conf)
-
- target = messaging.Target(version=self.version)
- client = messaging.RPCClient(transport, target,
- version_cap=self.cap)
-
- prep_kwargs = {}
- if self.prepare_cap is not _notset:
- prep_kwargs['version_cap'] = self.prepare_cap
- if self.prepare_version is not _notset:
- prep_kwargs['version'] = self.prepare_version
- if prep_kwargs:
- client = client.prepare(**prep_kwargs)
-
- if self.can_send_version is not _notset:
- can_send = client.can_send_version(version=self.can_send_version)
- else:
- can_send = client.can_send_version()
-
- self.assertEqual(self.can_send, can_send)
diff --git a/tests/rpc/test_dispatcher.py b/tests/rpc/test_dispatcher.py
deleted file mode 100644
index 64181f0..0000000
--- a/tests/rpc/test_dispatcher.py
+++ /dev/null
@@ -1,178 +0,0 @@
-
-# Copyright 2013 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import testscenarios
-
-from oslo import messaging
-from oslo.messaging import serializer as msg_serializer
-from oslo_messaging.tests import utils as test_utils
-from six.moves import mock
-
-load_tests = testscenarios.load_tests_apply_scenarios
-
-
-class _FakeEndpoint(object):
-
- def __init__(self, target=None):
- self.target = target
-
- def foo(self, ctxt, **kwargs):
- pass
-
- def bar(self, ctxt, **kwargs):
- pass
-
-
-class TestDispatcher(test_utils.BaseTestCase):
-
- scenarios = [
- ('no_endpoints',
- dict(endpoints=[],
- dispatch_to=None,
- ctxt={}, msg=dict(method='foo'),
- success=False, ex=messaging.UnsupportedVersion)),
- ('default_target',
- dict(endpoints=[{}],
- dispatch_to=dict(endpoint=0, method='foo'),
- ctxt={}, msg=dict(method='foo'),
- success=True, ex=None)),
- ('default_target_ctxt_and_args',
- dict(endpoints=[{}],
- dispatch_to=dict(endpoint=0, method='bar'),
- ctxt=dict(user='bob'), msg=dict(method='bar',
- args=dict(blaa=True)),
- success=True, ex=None)),
- ('default_target_namespace',
- dict(endpoints=[{}],
- dispatch_to=dict(endpoint=0, method='foo'),
- ctxt={}, msg=dict(method='foo', namespace=None),
- success=True, ex=None)),
- ('default_target_version',
- dict(endpoints=[{}],
- dispatch_to=dict(endpoint=0, method='foo'),
- ctxt={}, msg=dict(method='foo', version='1.0'),
- success=True, ex=None)),
- ('default_target_no_such_method',
- dict(endpoints=[{}],
- dispatch_to=None,
- ctxt={}, msg=dict(method='foobar'),
- success=False, ex=messaging.NoSuchMethod)),
- ('namespace',
- dict(endpoints=[{}, dict(namespace='testns')],
- dispatch_to=dict(endpoint=1, method='foo'),
- ctxt={}, msg=dict(method='foo', namespace='testns'),
- success=True, ex=None)),
- ('namespace_mismatch',
- dict(endpoints=[{}, dict(namespace='testns')],
- dispatch_to=None,
- ctxt={}, msg=dict(method='foo', namespace='nstest'),
- success=False, ex=messaging.UnsupportedVersion)),
- ('version',
- dict(endpoints=[dict(version='1.5'), dict(version='3.4')],
- dispatch_to=dict(endpoint=1, method='foo'),
- ctxt={}, msg=dict(method='foo', version='3.2'),
- success=True, ex=None)),
- ('version_mismatch',
- dict(endpoints=[dict(version='1.5'), dict(version='3.0')],
- dispatch_to=None,
- ctxt={}, msg=dict(method='foo', version='3.2'),
- success=False, ex=messaging.UnsupportedVersion)),
- ]
-
- def test_dispatcher(self):
- endpoints = [mock.Mock(spec=_FakeEndpoint,
- target=messaging.Target(**e))
- for e in self.endpoints]
-
- serializer = None
- target = messaging.Target()
- dispatcher = messaging.RPCDispatcher(target, endpoints, serializer)
-
- def check_reply(reply=None, failure=None, log_failure=True):
- if self.ex and failure is not None:
- ex = failure[1]
- self.assertFalse(self.success, ex)
- self.assertIsNotNone(self.ex, ex)
- self.assertIsInstance(ex, self.ex, ex)
- if isinstance(ex, messaging.NoSuchMethod):
- self.assertEqual(self.msg.get('method'), ex.method)
- elif isinstance(ex, messaging.UnsupportedVersion):
- self.assertEqual(self.msg.get('version', '1.0'),
- ex.version)
- if ex.method:
- self.assertEqual(self.msg.get('method'), ex.method)
- else:
- self.assertTrue(self.success, failure)
- self.assertIsNone(failure)
-
- incoming = mock.Mock(ctxt=self.ctxt, message=self.msg)
- incoming.reply.side_effect = check_reply
-
- with dispatcher(incoming) as callback:
- callback()
-
- for n, endpoint in enumerate(endpoints):
- for method_name in ['foo', 'bar']:
- method = getattr(endpoint, method_name)
- if self.dispatch_to and n == self.dispatch_to['endpoint'] and \
- method_name == self.dispatch_to['method']:
- method.assert_called_once_with(
- self.ctxt, **self.msg.get('args', {}))
- else:
- self.assertEqual(0, method.call_count)
-
- self.assertEqual(1, incoming.reply.call_count)
-
-
-class TestSerializer(test_utils.BaseTestCase):
-
- scenarios = [
- ('no_args_or_retval',
- dict(ctxt={}, dctxt={}, args={}, retval=None)),
- ('args_and_retval',
- dict(ctxt=dict(user='bob'),
- dctxt=dict(user='alice'),
- args=dict(a='a', b='b', c='c'),
- retval='d')),
- ]
-
- def test_serializer(self):
- endpoint = _FakeEndpoint()
- serializer = msg_serializer.NoOpSerializer()
- target = messaging.Target()
- dispatcher = messaging.RPCDispatcher(target, [endpoint], serializer)
-
- self.mox.StubOutWithMock(endpoint, 'foo')
- args = dict([(k, 'd' + v) for k, v in self.args.items()])
- endpoint.foo(self.dctxt, **args).AndReturn(self.retval)
-
- self.mox.StubOutWithMock(serializer, 'serialize_entity')
- self.mox.StubOutWithMock(serializer, 'deserialize_entity')
- self.mox.StubOutWithMock(serializer, 'deserialize_context')
-
- serializer.deserialize_context(self.ctxt).AndReturn(self.dctxt)
-
- for arg in self.args:
- serializer.deserialize_entity(self.dctxt, arg).AndReturn('d' + arg)
-
- serializer.serialize_entity(self.dctxt, self.retval).\
- AndReturn('s' + self.retval if self.retval else None)
-
- self.mox.ReplayAll()
-
- retval = dispatcher._dispatch(self.ctxt, dict(method='foo',
- args=self.args))
- if self.retval is not None:
- self.assertEqual('s' + self.retval, retval)
diff --git a/tests/rpc/test_server.py b/tests/rpc/test_server.py
deleted file mode 100644
index 6e1ae16..0000000
--- a/tests/rpc/test_server.py
+++ /dev/null
@@ -1,503 +0,0 @@
-
-# Copyright 2013 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import threading
-
-import testscenarios
-
-from oslo import messaging
-from oslo_config import cfg
-from oslo_messaging.tests import utils as test_utils
-from six.moves import mock
-
-load_tests = testscenarios.load_tests_apply_scenarios
-
-
-class ServerSetupMixin(object):
-
- class Server(object):
- def __init__(self, transport, topic, server, endpoint, serializer):
- target = messaging.Target(topic=topic, server=server)
- self._server = messaging.get_rpc_server(transport,
- target,
- [endpoint, self],
- serializer=serializer)
-
- def stop(self, ctxt):
- # Check start() does nothing with a running server
- self._server.start()
- self._server.stop()
- self._server.wait()
-
- def start(self):
- self._server.start()
-
- class TestSerializer(object):
-
- def serialize_entity(self, ctxt, entity):
- return ('s' + entity) if entity else entity
-
- def deserialize_entity(self, ctxt, entity):
- return ('d' + entity) if entity else entity
-
- def serialize_context(self, ctxt):
- return dict([(k, 's' + v) for k, v in ctxt.items()])
-
- def deserialize_context(self, ctxt):
- return dict([(k, 'd' + v) for k, v in ctxt.items()])
-
- def __init__(self):
- self.serializer = self.TestSerializer()
-
- def _setup_server(self, transport, endpoint, topic=None, server=None):
- server = self.Server(transport,
- topic=topic or 'testtopic',
- server=server or 'testserver',
- endpoint=endpoint,
- serializer=self.serializer)
-
- thread = threading.Thread(target=server.start)
- thread.daemon = True
- thread.start()
-
- return thread
-
- def _stop_server(self, client, server_thread, topic=None):
- if topic is not None:
- client = client.prepare(topic=topic)
- client.cast({}, 'stop')
- server_thread.join(timeout=30)
-
- def _setup_client(self, transport, topic='testtopic'):
- return messaging.RPCClient(transport,
- messaging.Target(topic=topic),
- serializer=self.serializer)
-
-
-class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
-
- def __init__(self, *args):
- super(TestRPCServer, self).__init__(*args)
- ServerSetupMixin.__init__(self)
-
- def setUp(self):
- super(TestRPCServer, self).setUp(conf=cfg.ConfigOpts())
-
- def test_constructor(self):
- transport = messaging.get_transport(self.conf, url='fake:')
- target = messaging.Target(topic='foo', server='bar')
- endpoints = [object()]
- serializer = object()
-
- server = messaging.get_rpc_server(transport, target, endpoints,
- serializer=serializer)
-
- self.assertIs(server.conf, self.conf)
- self.assertIs(server.transport, transport)
- self.assertIsInstance(server.dispatcher, messaging.RPCDispatcher)
- self.assertIs(server.dispatcher.endpoints, endpoints)
- self.assertIs(server.dispatcher.serializer, serializer)
- self.assertEqual('blocking', server.executor)
-
- def test_server_wait_method(self):
- transport = messaging.get_transport(self.conf, url='fake:')
- target = messaging.Target(topic='foo', server='bar')
- endpoints = [object()]
- serializer = object()
-
- server = messaging.get_rpc_server(transport, target, endpoints,
- serializer=serializer)
- # Mocking executor
- server._executor = mock.Mock()
- # Here assigning executor's listener object to listener variable
- # before calling wait method, beacuse in wait method we are
- # setting executor to None.
- listener = server._executor.listener
- # call server wait method
- server.wait()
- self.assertIsNone(server._executor)
- self.assertEqual(1, listener.cleanup.call_count)
-
- def test_no_target_server(self):
- transport = messaging.get_transport(self.conf, url='fake:')
-
- server = messaging.get_rpc_server(transport,
- messaging.Target(topic='testtopic'),
- [])
- try:
- server.start()
- except Exception as ex:
- self.assertIsInstance(ex, messaging.InvalidTarget, ex)
- self.assertEqual('testtopic', ex.target.topic)
- else:
- self.assertTrue(False)
-
- def test_no_server_topic(self):
- transport = messaging.get_transport(self.conf, url='fake:')
- target = messaging.Target(server='testserver')
- server = messaging.get_rpc_server(transport, target, [])
- try:
- server.start()
- except Exception as ex:
- self.assertIsInstance(ex, messaging.InvalidTarget, ex)
- self.assertEqual('testserver', ex.target.server)
- else:
- self.assertTrue(False)
-
- def _test_no_client_topic(self, call=True):
- transport = messaging.get_transport(self.conf, url='fake:')
-
- client = self._setup_client(transport, topic=None)
-
- method = client.call if call else client.cast
-
- try:
- method({}, 'ping', arg='foo')
- except Exception as ex:
- self.assertIsInstance(ex, messaging.InvalidTarget, ex)
- self.assertIsNotNone(ex.target)
- else:
- self.assertTrue(False)
-
- def test_no_client_topic_call(self):
- self._test_no_client_topic(call=True)
-
- def test_no_client_topic_cast(self):
- self._test_no_client_topic(call=False)
-
- def test_client_call_timeout(self):
- transport = messaging.get_transport(self.conf, url='fake:')
-
- finished = False
- wait = threading.Condition()
-
- class TestEndpoint(object):
- def ping(self, ctxt, arg):
- with wait:
- if not finished:
- wait.wait()
-
- server_thread = self._setup_server(transport, TestEndpoint())
- client = self._setup_client(transport)
-
- try:
- client.prepare(timeout=0).call({}, 'ping', arg='foo')
- except Exception as ex:
- self.assertIsInstance(ex, messaging.MessagingTimeout, ex)
- else:
- self.assertTrue(False)
-
- with wait:
- finished = True
- wait.notify()
-
- self._stop_server(client, server_thread)
-
- def test_unknown_executor(self):
- transport = messaging.get_transport(self.conf, url='fake:')
-
- try:
- messaging.get_rpc_server(transport, None, [], executor='foo')
- except Exception as ex:
- self.assertIsInstance(ex, messaging.ExecutorLoadFailure)
- self.assertEqual('foo', ex.executor)
- else:
- self.assertTrue(False)
-
- def test_cast(self):
- transport = messaging.get_transport(self.conf, url='fake:')
-
- class TestEndpoint(object):
- def __init__(self):
- self.pings = []
-
- def ping(self, ctxt, arg):
- self.pings.append(arg)
-
- endpoint = TestEndpoint()
- server_thread = self._setup_server(transport, endpoint)
- client = self._setup_client(transport)
-
- client.cast({}, 'ping', arg='foo')
- client.cast({}, 'ping', arg='bar')
-
- self._stop_server(client, server_thread)
-
- self.assertEqual(['dsfoo', 'dsbar'], endpoint.pings)
-
- def test_call(self):
- transport = messaging.get_transport(self.conf, url='fake:')
-
- class TestEndpoint(object):
- def ping(self, ctxt, arg):
- return arg
-
- server_thread = self._setup_server(transport, TestEndpoint())
- client = self._setup_client(transport)
-
- self.assertIsNone(client.call({}, 'ping', arg=None))
- self.assertEqual(0, client.call({}, 'ping', arg=0))
- self.assertEqual(False, client.call({}, 'ping', arg=False))
- self.assertEqual([], client.call({}, 'ping', arg=[]))
- self.assertEqual({}, client.call({}, 'ping', arg={}))
- self.assertEqual('dsdsfoo', client.call({}, 'ping', arg='foo'))
-
- self._stop_server(client, server_thread)
-
- def test_direct_call(self):
- transport = messaging.get_transport(self.conf, url='fake:')
-
- class TestEndpoint(object):
- def ping(self, ctxt, arg):
- return arg
-
- server_thread = self._setup_server(transport, TestEndpoint())
- client = self._setup_client(transport)
-
- direct = client.prepare(server='testserver')
- self.assertIsNone(direct.call({}, 'ping', arg=None))
- self.assertEqual(0, client.call({}, 'ping', arg=0))
- self.assertEqual(False, client.call({}, 'ping', arg=False))
- self.assertEqual([], client.call({}, 'ping', arg=[]))
- self.assertEqual({}, client.call({}, 'ping', arg={}))
- self.assertEqual('dsdsfoo', direct.call({}, 'ping', arg='foo'))
-
- self._stop_server(client, server_thread)
-
- def test_context(self):
- transport = messaging.get_transport(self.conf, url='fake:')
-
- class TestEndpoint(object):
- def ctxt_check(self, ctxt, key):
- return ctxt[key]
-
- server_thread = self._setup_server(transport, TestEndpoint())
- client = self._setup_client(transport)
-
- self.assertEqual('dsdsb',
- client.call({'dsa': 'b'},
- 'ctxt_check',
- key='a'))
-
- self._stop_server(client, server_thread)
-
- def test_failure(self):
- transport = messaging.get_transport(self.conf, url='fake:')
-
- class TestEndpoint(object):
- def ping(self, ctxt, arg):
- raise ValueError(arg)
-
- server_thread = self._setup_server(transport, TestEndpoint())
- client = self._setup_client(transport)
-
- try:
- client.call({}, 'ping', arg='foo')
- except Exception as ex:
- self.assertIsInstance(ex, ValueError)
- self.assertEqual('dsfoo', str(ex))
- else:
- self.assertTrue(False)
-
- self._stop_server(client, server_thread)
-
- def test_expected_failure(self):
- transport = messaging.get_transport(self.conf, url='fake:')
-
- class TestEndpoint(object):
- @messaging.expected_exceptions(ValueError)
- def ping(self, ctxt, arg):
- raise ValueError(arg)
-
- server_thread = self._setup_server(transport, TestEndpoint())
- client = self._setup_client(transport)
-
- try:
- client.call({}, 'ping', arg='foo')
- except Exception as ex:
- self.assertIsInstance(ex, ValueError)
- self.assertEqual('dsfoo', str(ex))
- else:
- self.assertTrue(False)
-
- self._stop_server(client, server_thread)
-
-
-class TestMultipleServers(test_utils.BaseTestCase, ServerSetupMixin):
-
- _exchanges = [
- ('same_exchange', dict(exchange1=None, exchange2=None)),
- ('diff_exchange', dict(exchange1='x1', exchange2='x2')),
- ]
-
- _topics = [
- ('same_topic', dict(topic1='t', topic2='t')),
- ('diff_topic', dict(topic1='t1', topic2='t2')),
- ]
-
- _server = [
- ('same_server', dict(server1=None, server2=None)),
- ('diff_server', dict(server1='s1', server2='s2')),
- ]
-
- _fanout = [
- ('not_fanout', dict(fanout1=None, fanout2=None)),
- ('fanout', dict(fanout1=True, fanout2=True)),
- ]
-
- _method = [
- ('call', dict(call1=True, call2=True)),
- ('cast', dict(call1=False, call2=False)),
- ]
-
- _endpoints = [
- ('one_endpoint',
- dict(multi_endpoints=False,
- expect1=['ds1', 'ds2'],
- expect2=['ds1', 'ds2'])),
- ('two_endpoints',
- dict(multi_endpoints=True,
- expect1=['ds1'],
- expect2=['ds2'])),
- ]
-
- @classmethod
- def generate_scenarios(cls):
- cls.scenarios = testscenarios.multiply_scenarios(cls._exchanges,
- cls._topics,
- cls._server,
- cls._fanout,
- cls._method,
- cls._endpoints)
-
- # fanout call not supported
- def filter_fanout_call(scenario):
- params = scenario[1]
- fanout = params['fanout1'] or params['fanout2']
- call = params['call1'] or params['call2']
- return not (call and fanout)
-
- # listening multiple times on same topic/server pair not supported
- def filter_same_topic_and_server(scenario):
- params = scenario[1]
- single_topic = params['topic1'] == params['topic2']
- single_server = params['server1'] == params['server2']
- return not (single_topic and single_server)
-
- # fanout to multiple servers on same topic and exchange
- # each endpoint will receive both messages
- def fanout_to_servers(scenario):
- params = scenario[1]
- fanout = params['fanout1'] or params['fanout2']
- single_exchange = params['exchange1'] == params['exchange2']
- single_topic = params['topic1'] == params['topic2']
- multi_servers = params['server1'] != params['server2']
- if fanout and single_exchange and single_topic and multi_servers:
- params['expect1'] = params['expect1'][:] + params['expect1']
- params['expect2'] = params['expect2'][:] + params['expect2']
- return scenario
-
- # multiple endpoints on same topic and exchange
- # either endpoint can get either message
- def single_topic_multi_endpoints(scenario):
- params = scenario[1]
- single_exchange = params['exchange1'] == params['exchange2']
- single_topic = params['topic1'] == params['topic2']
- if single_topic and single_exchange and params['multi_endpoints']:
- params['expect_either'] = (params['expect1'] +
- params['expect2'])
- params['expect1'] = params['expect2'] = []
- else:
- params['expect_either'] = []
- return scenario
-
- for f in [filter_fanout_call, filter_same_topic_and_server]:
- cls.scenarios = filter(f, cls.scenarios)
- for m in [fanout_to_servers, single_topic_multi_endpoints]:
- cls.scenarios = map(m, cls.scenarios)
-
- def __init__(self, *args):
- super(TestMultipleServers, self).__init__(*args)
- ServerSetupMixin.__init__(self)
-
- def setUp(self):
- super(TestMultipleServers, self).setUp(conf=cfg.ConfigOpts())
-
- def test_multiple_servers(self):
- url1 = 'fake:///' + (self.exchange1 or '')
- url2 = 'fake:///' + (self.exchange2 or '')
-
- transport1 = messaging.get_transport(self.conf, url=url1)
- if url1 != url2:
- transport2 = messaging.get_transport(self.conf, url=url1)
- else:
- transport2 = transport1
-
- class TestEndpoint(object):
- def __init__(self):
- self.pings = []
-
- def ping(self, ctxt, arg):
- self.pings.append(arg)
-
- def alive(self, ctxt):
- return 'alive'
-
- if self.multi_endpoints:
- endpoint1, endpoint2 = TestEndpoint(), TestEndpoint()
- else:
- endpoint1 = endpoint2 = TestEndpoint()
-
- thread1 = self._setup_server(transport1, endpoint1,
- topic=self.topic1, server=self.server1)
- thread2 = self._setup_server(transport2, endpoint2,
- topic=self.topic2, server=self.server2)
-
- client1 = self._setup_client(transport1, topic=self.topic1)
- client2 = self._setup_client(transport2, topic=self.topic2)
-
- client1 = client1.prepare(server=self.server1)
- client2 = client2.prepare(server=self.server2)
-
- if self.fanout1:
- client1.call({}, 'alive')
- client1 = client1.prepare(fanout=True)
- if self.fanout2:
- client2.call({}, 'alive')
- client2 = client2.prepare(fanout=True)
-
- (client1.call if self.call1 else client1.cast)({}, 'ping', arg='1')
- (client2.call if self.call2 else client2.cast)({}, 'ping', arg='2')
-
- self.assertTrue(thread1.isAlive())
- self._stop_server(client1.prepare(fanout=None),
- thread1, topic=self.topic1)
- self.assertTrue(thread2.isAlive())
- self._stop_server(client2.prepare(fanout=None),
- thread2, topic=self.topic2)
-
- def check(pings, expect):
- self.assertEqual(len(expect), len(pings))
- for a in expect:
- self.assertIn(a, pings)
-
- if self.expect_either:
- check(endpoint1.pings + endpoint2.pings, self.expect_either)
- else:
- check(endpoint1.pings, self.expect1)
- check(endpoint2.pings, self.expect2)
-
-
-TestMultipleServers.generate_scenarios()
diff --git a/tests/test_amqp_driver.py b/tests/test_amqp_driver.py
deleted file mode 100644
index df0a769..0000000
--- a/tests/test_amqp_driver.py
+++ /dev/null
@@ -1,738 +0,0 @@
-# Copyright (C) 2014 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import logging
-import os
-import select
-import socket
-import threading
-import time
-import uuid
-
-import six
-from six import moves
-import testtools
-
-from oslo import messaging
-from oslo_messaging.tests import utils as test_utils
-
-if six.PY2:
- # NOTE(flaper87): pyngus currently doesn't support py34. It's
- # on the works, though.
- from oslo_messaging._drivers.protocols.amqp import driver as amqp_driver
- import pyngus
-
-
-LOG = logging.getLogger(__name__)
-
-
-class _ListenerThread(threading.Thread):
- """Run a blocking listener in a thread."""
- def __init__(self, listener, msg_count):
- super(_ListenerThread, self).__init__()
- self.listener = listener
- self.msg_count = msg_count
- self.messages = moves.queue.Queue()
- self.daemon = True
- self.start()
-
- def run(self):
- LOG.debug("Listener started")
- while self.msg_count > 0:
- in_msg = self.listener.poll()
- self.messages.put(in_msg)
- self.msg_count -= 1
- if in_msg.message.get('method') == 'echo':
- in_msg.reply(reply={'correlation-id':
- in_msg.message.get('id')})
- LOG.debug("Listener stopped")
-
- def get_messages(self):
- """Returns a list of all received messages."""
- msgs = []
- try:
- while True:
- m = self.messages.get(False)
- msgs.append(m)
- except moves.queue.Empty:
- pass
- return msgs
-
-
-@testtools.skipUnless(six.PY2, "No Py3K support yet")
-class TestProtonDriverLoad(test_utils.BaseTestCase):
-
- def setUp(self):
- super(TestProtonDriverLoad, self).setUp()
- self.messaging_conf.transport_driver = 'amqp'
-
- def test_driver_load(self):
- transport = messaging.get_transport(self.conf)
- self.assertIsInstance(transport._driver,
- amqp_driver.ProtonDriver)
-
-
-class _AmqpBrokerTestCase(test_utils.BaseTestCase):
-
- @testtools.skipUnless(six.PY2, "No Py3K support yet")
- def setUp(self):
- super(_AmqpBrokerTestCase, self).setUp()
- self._broker = FakeBroker()
- self._broker_addr = "amqp://%s:%d" % (self._broker.host,
- self._broker.port)
- self._broker_url = messaging.TransportURL.parse(self.conf,
- self._broker_addr)
- self._broker.start()
-
- def tearDown(self):
- super(_AmqpBrokerTestCase, self).tearDown()
- self._broker.stop()
-
-
-class TestAmqpSend(_AmqpBrokerTestCase):
- """Test sending and receiving messages."""
-
- def test_driver_unconnected_cleanup(self):
- """Verify the driver can cleanly shutdown even if never connected."""
- driver = amqp_driver.ProtonDriver(self.conf, self._broker_url)
- driver.cleanup()
-
- def test_listener_cleanup(self):
- """Verify unused listener can cleanly shutdown."""
- driver = amqp_driver.ProtonDriver(self.conf, self._broker_url)
- target = messaging.Target(topic="test-topic")
- listener = driver.listen(target)
- self.assertIsInstance(listener, amqp_driver.ProtonListener)
- driver.cleanup()
-
- def test_send_no_reply(self):
- driver = amqp_driver.ProtonDriver(self.conf, self._broker_url)
- target = messaging.Target(topic="test-topic")
- listener = _ListenerThread(driver.listen(target), 1)
- rc = driver.send(target, {"context": True},
- {"msg": "value"}, wait_for_reply=False)
- self.assertIsNone(rc)
- listener.join(timeout=30)
- self.assertFalse(listener.isAlive())
- self.assertEqual(listener.messages.get().message, {"msg": "value"})
- driver.cleanup()
-
- def test_send_exchange_with_reply(self):
- driver = amqp_driver.ProtonDriver(self.conf, self._broker_url)
- target1 = messaging.Target(topic="test-topic", exchange="e1")
- listener1 = _ListenerThread(driver.listen(target1), 1)
- target2 = messaging.Target(topic="test-topic", exchange="e2")
- listener2 = _ListenerThread(driver.listen(target2), 1)
-
- rc = driver.send(target1, {"context": "whatever"},
- {"method": "echo", "id": "e1"},
- wait_for_reply=True,
- timeout=30)
- self.assertIsNotNone(rc)
- self.assertEqual(rc.get('correlation-id'), 'e1')
-
- rc = driver.send(target2, {"context": "whatever"},
- {"method": "echo", "id": "e2"},
- wait_for_reply=True,
- timeout=30)
- self.assertIsNotNone(rc)
- self.assertEqual(rc.get('correlation-id'), 'e2')
-
- listener1.join(timeout=30)
- self.assertFalse(listener1.isAlive())
- listener2.join(timeout=30)
- self.assertFalse(listener2.isAlive())
- driver.cleanup()
-
- def test_messaging_patterns(self):
- """Verify the direct, shared, and fanout message patterns work."""
- driver = amqp_driver.ProtonDriver(self.conf, self._broker_url)
- target1 = messaging.Target(topic="test-topic", server="server1")
- listener1 = _ListenerThread(driver.listen(target1), 4)
- target2 = messaging.Target(topic="test-topic", server="server2")
- listener2 = _ListenerThread(driver.listen(target2), 3)
-
- shared_target = messaging.Target(topic="test-topic")
- fanout_target = messaging.Target(topic="test-topic",
- fanout=True)
- # this should go to only one server:
- driver.send(shared_target, {"context": "whatever"},
- {"method": "echo", "id": "either-1"},
- wait_for_reply=True)
- self.assertEqual(self._broker.topic_count, 1)
- self.assertEqual(self._broker.direct_count, 1) # reply
-
- # this should go to the other server:
- driver.send(shared_target, {"context": "whatever"},
- {"method": "echo", "id": "either-2"},
- wait_for_reply=True)
- self.assertEqual(self._broker.topic_count, 2)
- self.assertEqual(self._broker.direct_count, 2) # reply
-
- # these should only go to listener1:
- driver.send(target1, {"context": "whatever"},
- {"method": "echo", "id": "server1-1"},
- wait_for_reply=True)
-
- driver.send(target1, {"context": "whatever"},
- {"method": "echo", "id": "server1-2"},
- wait_for_reply=True)
- self.assertEqual(self._broker.direct_count, 6) # 2X(send+reply)
-
- # this should only go to listener2:
- driver.send(target2, {"context": "whatever"},
- {"method": "echo", "id": "server2"},
- wait_for_reply=True)
- self.assertEqual(self._broker.direct_count, 8)
-
- # both listeners should get a copy:
- driver.send(fanout_target, {"context": "whatever"},
- {"method": "echo", "id": "fanout"})
-
- listener1.join(timeout=30)
- self.assertFalse(listener1.isAlive())
- listener2.join(timeout=30)
- self.assertFalse(listener2.isAlive())
- self.assertEqual(self._broker.fanout_count, 1)
-
- listener1_ids = [x.message.get('id') for x in listener1.get_messages()]
- listener2_ids = [x.message.get('id') for x in listener2.get_messages()]
-
- self.assertTrue('fanout' in listener1_ids and
- 'fanout' in listener2_ids)
- self.assertTrue('server1-1' in listener1_ids and
- 'server1-1' not in listener2_ids)
- self.assertTrue('server1-2' in listener1_ids and
- 'server1-2' not in listener2_ids)
- self.assertTrue('server2' in listener2_ids and
- 'server2' not in listener1_ids)
- if 'either-1' in listener1_ids:
- self.assertTrue('either-2' in listener2_ids and
- 'either-2' not in listener1_ids and
- 'either-1' not in listener2_ids)
- else:
- self.assertTrue('either-2' in listener1_ids and
- 'either-2' not in listener2_ids and
- 'either-1' in listener2_ids)
- driver.cleanup()
-
- def test_send_timeout(self):
- """Verify send timeout."""
- driver = amqp_driver.ProtonDriver(self.conf, self._broker_url)
- target = messaging.Target(topic="test-topic")
- listener = _ListenerThread(driver.listen(target), 1)
-
- # the listener will drop this message:
- try:
- driver.send(target,
- {"context": "whatever"},
- {"method": "drop"},
- wait_for_reply=True,
- timeout=1.0)
- except Exception as ex:
- self.assertIsInstance(ex, messaging.MessagingTimeout, ex)
- else:
- self.assertTrue(False, "No Exception raised!")
- listener.join(timeout=30)
- self.assertFalse(listener.isAlive())
- driver.cleanup()
-
-
-class TestAmqpNotification(_AmqpBrokerTestCase):
- """Test sending and receiving notifications."""
-
- def test_notification(self):
- driver = amqp_driver.ProtonDriver(self.conf, self._broker_url)
- notifications = [(messaging.Target(topic="topic-1"), 'info'),
- (messaging.Target(topic="topic-1"), 'error'),
- (messaging.Target(topic="topic-2"), 'debug')]
- nl = driver.listen_for_notifications(notifications, None)
-
- # send one for each support version:
- msg_count = len(notifications) * 2
- listener = _ListenerThread(nl, msg_count)
- targets = ['topic-1.info',
- 'topic-1.bad', # will raise MessagingDeliveryFailure
- 'bad-topic.debug', # will raise MessagingDeliveryFailure
- 'topic-1.error',
- 'topic-2.debug']
-
- excepted_targets = []
- exception_count = 0
- for version in (1.0, 2.0):
- for t in targets:
- try:
- driver.send_notification(messaging.Target(topic=t),
- "context", {'target': t},
- version)
- except messaging.MessageDeliveryFailure:
- exception_count += 1
- excepted_targets.append(t)
-
- listener.join(timeout=30)
- self.assertFalse(listener.isAlive())
- topics = [x.message.get('target') for x in listener.get_messages()]
- self.assertEqual(len(topics), msg_count)
- self.assertEqual(topics.count('topic-1.info'), 2)
- self.assertEqual(topics.count('topic-1.error'), 2)
- self.assertEqual(topics.count('topic-2.debug'), 2)
- self.assertEqual(self._broker.dropped_count, 4)
- self.assertEqual(exception_count, 4)
- self.assertEqual(excepted_targets.count('topic-1.bad'), 2)
- self.assertEqual(excepted_targets.count('bad-topic.debug'), 2)
- driver.cleanup()
-
-
-@testtools.skipUnless(six.PY2, "No Py3K support yet")
-class TestAuthentication(test_utils.BaseTestCase):
-
- def setUp(self):
- super(TestAuthentication, self).setUp()
- # for simplicity, encode the credentials as they would appear 'on the
- # wire' in a SASL frame - username and password prefixed by zero.
- user_credentials = ["\0joe\0secret"]
- self._broker = FakeBroker(sasl_mechanisms="PLAIN",
- user_credentials=user_credentials)
- self._broker.start()
-
- def tearDown(self):
- super(TestAuthentication, self).tearDown()
- self._broker.stop()
-
- def test_authentication_ok(self):
- """Verify that username and password given in TransportHost are
- accepted by the broker.
- """
-
- addr = "amqp://joe:secret@%s:%d" % (self._broker.host,
- self._broker.port)
- url = messaging.TransportURL.parse(self.conf, addr)
- driver = amqp_driver.ProtonDriver(self.conf, url)
- target = messaging.Target(topic="test-topic")
- listener = _ListenerThread(driver.listen(target), 1)
- rc = driver.send(target, {"context": True},
- {"method": "echo"}, wait_for_reply=True)
- self.assertIsNotNone(rc)
- listener.join(timeout=30)
- self.assertFalse(listener.isAlive())
- driver.cleanup()
-
- def test_authentication_failure(self):
- """Verify that a bad password given in TransportHost is
- rejected by the broker.
- """
-
- addr = "amqp://joe:badpass@%s:%d" % (self._broker.host,
- self._broker.port)
- url = messaging.TransportURL.parse(self.conf, addr)
- driver = amqp_driver.ProtonDriver(self.conf, url)
- target = messaging.Target(topic="test-topic")
- _ListenerThread(driver.listen(target), 1)
- self.assertRaises(messaging.MessagingTimeout,
- driver.send,
- target, {"context": True},
- {"method": "echo"},
- wait_for_reply=True,
- timeout=2.0)
- driver.cleanup()
-
-
-@testtools.skipUnless(six.PY2, "No Py3K support yet")
-class TestFailover(test_utils.BaseTestCase):
-
- def setUp(self):
- super(TestFailover, self).setUp()
- self._brokers = [FakeBroker(), FakeBroker()]
- hosts = []
- for broker in self._brokers:
- hosts.append(messaging.TransportHost(hostname=broker.host,
- port=broker.port))
- self._broker_url = messaging.TransportURL(self.conf,
- transport="amqp",
- hosts=hosts)
-
- def tearDown(self):
- super(TestFailover, self).tearDown()
- for broker in self._brokers:
- if broker.isAlive():
- broker.stop()
-
- def test_broker_failover(self):
- """Simulate failover of one broker to another."""
- self._brokers[0].start()
- driver = amqp_driver.ProtonDriver(self.conf, self._broker_url)
-
- target = messaging.Target(topic="my-topic")
- listener = _ListenerThread(driver.listen(target), 2)
-
- rc = driver.send(target, {"context": "whatever"},
- {"method": "echo", "id": "echo-1"},
- wait_for_reply=True,
- timeout=30)
- self.assertIsNotNone(rc)
- self.assertEqual(rc.get('correlation-id'), 'echo-1')
- # 1 request msg, 1 response:
- self.assertEqual(self._brokers[0].topic_count, 1)
- self.assertEqual(self._brokers[0].direct_count, 1)
-
- # fail broker 0 and start broker 1:
- self._brokers[0].stop()
- self._brokers[1].start()
- deadline = time.time() + 30
- responded = False
- sequence = 2
- while deadline > time.time() and not responded:
- if not listener.isAlive():
- # listener may have exited after replying to an old correlation
- # id: restart new listener
- listener = _ListenerThread(driver.listen(target), 1)
- try:
- rc = driver.send(target, {"context": "whatever"},
- {"method": "echo",
- "id": "echo-%d" % sequence},
- wait_for_reply=True,
- timeout=2)
- self.assertIsNotNone(rc)
- self.assertEqual(rc.get('correlation-id'),
- 'echo-%d' % sequence)
- responded = True
- except messaging.MessagingTimeout:
- sequence += 1
-
- self.assertTrue(responded)
- listener.join(timeout=30)
- self.assertFalse(listener.isAlive())
-
- # note: stopping the broker first tests cleaning up driver without a
- # connection active
- self._brokers[1].stop()
- driver.cleanup()
-
-
-class FakeBroker(threading.Thread):
- """A test AMQP message 'broker'."""
-
- if six.PY2:
- class Connection(pyngus.ConnectionEventHandler):
- """A single AMQP connection."""
-
- def __init__(self, server, socket_, name,
- sasl_mechanisms, user_credentials):
- """Create a Connection using socket_."""
- self.socket = socket_
- self.name = name
- self.server = server
- self.connection = server.container.create_connection(name,
- self)
- self.connection.user_context = self
- self.sasl_mechanisms = sasl_mechanisms
- self.user_credentials = user_credentials
- if sasl_mechanisms:
- self.connection.pn_sasl.mechanisms(sasl_mechanisms)
- self.connection.pn_sasl.server()
- self.connection.open()
- self.sender_links = set()
- self.closed = False
-
- def destroy(self):
- """Destroy the test connection."""
- while self.sender_links:
- link = self.sender_links.pop()
- link.destroy()
- self.connection.destroy()
- self.connection = None
- self.socket.close()
-
- def fileno(self):
- """Allows use of this in a select() call."""
- return self.socket.fileno()
-
- def process_input(self):
- """Called when socket is read-ready."""
- try:
- pyngus.read_socket_input(self.connection, self.socket)
- except socket.error:
- pass
- self.connection.process(time.time())
-
- def send_output(self):
- """Called when socket is write-ready."""
- try:
- pyngus.write_socket_output(self.connection,
- self.socket)
- except socket.error:
- pass
- self.connection.process(time.time())
-
- # Pyngus ConnectionEventHandler callbacks:
-
- def connection_remote_closed(self, connection, reason):
- """Peer has closed the connection."""
- self.connection.close()
-
- def connection_closed(self, connection):
- """Connection close completed."""
- self.closed = True # main loop will destroy
-
- def connection_failed(self, connection, error):
- """Connection failure detected."""
- self.connection_closed(connection)
-
- def sender_requested(self, connection, link_handle,
- name, requested_source, properties):
- """Create a new message source."""
- addr = requested_source or "source-" + uuid.uuid4().hex
- link = FakeBroker.SenderLink(self.server, self,
- link_handle, addr)
- self.sender_links.add(link)
-
- def receiver_requested(self, connection, link_handle,
- name, requested_target, properties):
- """Create a new message consumer."""
- addr = requested_target or "target-" + uuid.uuid4().hex
- FakeBroker.ReceiverLink(self.server, self,
- link_handle, addr)
-
- def sasl_step(self, connection, pn_sasl):
- if self.sasl_mechanisms == 'PLAIN':
- credentials = pn_sasl.recv()
- if not credentials:
- return # wait until some arrives
- if credentials not in self.user_credentials:
- # failed
- return pn_sasl.done(pn_sasl.AUTH)
- pn_sasl.done(pn_sasl.OK)
-
- class SenderLink(pyngus.SenderEventHandler):
- """An AMQP sending link."""
- def __init__(self, server, conn, handle, src_addr=None):
- self.server = server
- cnn = conn.connection
- self.link = cnn.accept_sender(handle,
- source_override=src_addr,
- event_handler=self)
- self.link.open()
- self.routed = False
-
- def destroy(self):
- """Destroy the link."""
- self._cleanup()
- if self.link:
- self.link.destroy()
- self.link = None
-
- def send_message(self, message):
- """Send a message over this link."""
- self.link.send(message)
-
- def _cleanup(self):
- if self.routed:
- self.server.remove_route(self.link.source_address,
- self)
- self.routed = False
-
- # Pyngus SenderEventHandler callbacks:
-
- def sender_active(self, sender_link):
- self.server.add_route(self.link.source_address, self)
- self.routed = True
-
- def sender_remote_closed(self, sender_link, error):
- self._cleanup()
- self.link.close()
-
- def sender_closed(self, sender_link):
- self.destroy()
-
- class ReceiverLink(pyngus.ReceiverEventHandler):
- """An AMQP Receiving link."""
- def __init__(self, server, conn, handle, addr=None):
- self.server = server
- cnn = conn.connection
- self.link = cnn.accept_receiver(handle,
- target_override=addr,
- event_handler=self)
- self.link.open()
- self.link.add_capacity(10)
-
- # ReceiverEventHandler callbacks:
-
- def receiver_remote_closed(self, receiver_link, error):
- self.link.close()
-
- def receiver_closed(self, receiver_link):
- self.link.destroy()
- self.link = None
-
- def message_received(self, receiver_link, message, handle):
- """Forward this message out the proper sending link."""
- if self.server.forward_message(message):
- self.link.message_accepted(handle)
- else:
- self.link.message_rejected(handle)
-
- if self.link.capacity < 1:
- self.link.add_capacity(10)
-
- def __init__(self, server_prefix="exclusive",
- broadcast_prefix="broadcast",
- group_prefix="unicast",
- address_separator=".",
- sock_addr="", sock_port=0,
- sasl_mechanisms="ANONYMOUS",
- user_credentials=None):
- """Create a fake broker listening on sock_addr:sock_port."""
- if not pyngus:
- raise AssertionError("pyngus module not present")
- threading.Thread.__init__(self)
- self._server_prefix = server_prefix + address_separator
- self._broadcast_prefix = broadcast_prefix + address_separator
- self._group_prefix = group_prefix + address_separator
- self._address_separator = address_separator
- self._sasl_mechanisms = sasl_mechanisms
- self._user_credentials = user_credentials
- self._wakeup_pipe = os.pipe()
- self._my_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self._my_socket.bind((sock_addr, sock_port))
- self.host, self.port = self._my_socket.getsockname()
- self.container = pyngus.Container("test_server_%s:%d"
- % (self.host, self.port))
- self._connections = {}
- self._sources = {}
- # count of messages forwarded, by messaging pattern
- self.direct_count = 0
- self.topic_count = 0
- self.fanout_count = 0
- self.dropped_count = 0
-
- def start(self):
- """Start the server."""
- LOG.debug("Starting Test Broker on %s:%d", self.host, self.port)
- self._shutdown = False
- self.daemon = True
- self._my_socket.listen(10)
- super(FakeBroker, self).start()
-
- def stop(self):
- """Shutdown the server."""
- LOG.debug("Stopping test Broker %s:%d", self.host, self.port)
- self._shutdown = True
- os.write(self._wakeup_pipe[1], "!")
- self.join()
- LOG.debug("Test Broker %s:%d stopped", self.host, self.port)
-
- def run(self):
- """Process I/O and timer events until the broker is stopped."""
- LOG.debug("Test Broker on %s:%d started", self.host, self.port)
- while not self._shutdown:
- readers, writers, timers = self.container.need_processing()
-
- # map pyngus Connections back to _TestConnections:
- readfd = [c.user_context for c in readers]
- readfd.extend([self._my_socket, self._wakeup_pipe[0]])
- writefd = [c.user_context for c in writers]
-
- timeout = None
- if timers:
- # [0] == next expiring timer
- deadline = timers[0].next_tick
- now = time.time()
- timeout = 0 if deadline <= now else deadline - now
-
- readable, writable, ignore = select.select(readfd,
- writefd,
- [],
- timeout)
- worked = set()
- for r in readable:
- if r is self._my_socket:
- # new inbound connection request received,
- # create a new Connection for it:
- client_socket, client_address = self._my_socket.accept()
- name = str(client_address)
- conn = FakeBroker.Connection(self, client_socket, name,
- self._sasl_mechanisms,
- self._user_credentials)
- self._connections[conn.name] = conn
- elif r is self._wakeup_pipe[0]:
- os.read(self._wakeup_pipe[0], 512)
- else:
- r.process_input()
- worked.add(r)
-
- for t in timers:
- now = time.time()
- if t.next_tick > now:
- break
- t.process(now)
- conn = t.user_context
- worked.add(conn)
-
- for w in writable:
- w.send_output()
- worked.add(w)
-
- # clean up any closed connections:
- while worked:
- conn = worked.pop()
- if conn.closed:
- del self._connections[conn.name]
- conn.destroy()
-
- # Shutting down
- self._my_socket.close()
- for conn in self._connections.itervalues():
- conn.destroy()
- return 0
-
- def add_route(self, address, link):
- # route from address -> link[, link ...]
- if address not in self._sources:
- self._sources[address] = [link]
- elif link not in self._sources[address]:
- self._sources[address].append(link)
-
- def remove_route(self, address, link):
- if address in self._sources:
- if link in self._sources[address]:
- self._sources[address].remove(link)
- if not self._sources[address]:
- del self._sources[address]
-
- def forward_message(self, message):
- # returns True if message was routed
- dest = message.address
- if dest not in self._sources:
- self.dropped_count += 1
- return False
- LOG.debug("Forwarding [%s]", dest)
- # route "behavior" determined by prefix:
- if dest.startswith(self._broadcast_prefix):
- self.fanout_count += 1
- for link in self._sources[dest]:
- LOG.debug("Broadcast to %s", dest)
- link.send_message(message)
- elif dest.startswith(self._group_prefix):
- # round-robin:
- self.topic_count += 1
- link = self._sources[dest].pop(0)
- link.send_message(message)
- LOG.debug("Send to %s", dest)
- self._sources[dest].append(link)
- else:
- # unicast:
- self.direct_count += 1
- LOG.debug("Unicast to %s", dest)
- self._sources[dest][0].send_message(message)
- return True
diff --git a/tests/test_exception_serialization.py b/tests/test_exception_serialization.py
deleted file mode 100644
index baa2b79..0000000
--- a/tests/test_exception_serialization.py
+++ /dev/null
@@ -1,308 +0,0 @@
-
-# Copyright 2013 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import sys
-
-import six
-import testscenarios
-
-from oslo import messaging
-
-from oslo_messaging._drivers import common as exceptions
-from oslo_messaging.tests import utils as test_utils
-from oslo_serialization import jsonutils
-
-load_tests = testscenarios.load_tests_apply_scenarios
-
-EXCEPTIONS_MODULE = 'exceptions' if six.PY2 else 'builtins'
-
-
-class NovaStyleException(Exception):
-
- format = 'I am Nova'
-
- def __init__(self, message=None, **kwargs):
- self.kwargs = kwargs
- if not message:
- message = self.format % kwargs
- super(NovaStyleException, self).__init__(message)
-
-
-class KwargsStyleException(NovaStyleException):
-
- format = 'I am %(who)s'
-
-
-def add_remote_postfix(ex):
- ex_type = type(ex)
- message = str(ex)
- str_override = lambda self: message
- new_ex_type = type(ex_type.__name__ + "_Remote", (ex_type,),
- {'__str__': str_override,
- '__unicode__': str_override})
- new_ex_type.__module__ = '%s_Remote' % ex.__class__.__module__
- try:
- ex.__class__ = new_ex_type
- except TypeError:
- ex.args = (message,) + ex.args[1:]
- return ex
-
-
-class SerializeRemoteExceptionTestCase(test_utils.BaseTestCase):
-
- _log_failure = [
- ('log_failure', dict(log_failure=True)),
- ('do_not_log_failure', dict(log_failure=False)),
- ]
-
- _add_remote = [
- ('add_remote', dict(add_remote=True)),
- ('do_not_add_remote', dict(add_remote=False)),
- ]
-
- _exception_types = [
- ('bog_standard', dict(cls=Exception,
- args=['test'],
- kwargs={},
- clsname='Exception',
- modname=EXCEPTIONS_MODULE,
- msg='test')),
- ('nova_style', dict(cls=NovaStyleException,
- args=[],
- kwargs={},
- clsname='NovaStyleException',
- modname=__name__,
- msg='I am Nova')),
- ('nova_style_with_msg', dict(cls=NovaStyleException,
- args=['testing'],
- kwargs={},
- clsname='NovaStyleException',
- modname=__name__,
- msg='testing')),
- ('kwargs_style', dict(cls=KwargsStyleException,
- args=[],
- kwargs={'who': 'Oslo'},
- clsname='KwargsStyleException',
- modname=__name__,
- msg='I am Oslo')),
- ]
-
- @classmethod
- def generate_scenarios(cls):
- cls.scenarios = testscenarios.multiply_scenarios(cls._log_failure,
- cls._add_remote,
- cls._exception_types)
-
- def setUp(self):
- super(SerializeRemoteExceptionTestCase, self).setUp()
-
- def test_serialize_remote_exception(self):
- errors = []
-
- def stub_error(msg, *a, **kw):
- if (a and len(a) == 1 and isinstance(a[0], dict) and a[0]):
- a = a[0]
- errors.append(str(msg) % a)
-
- self.stubs.Set(exceptions.LOG, 'error', stub_error)
-
- try:
- try:
- raise self.cls(*self.args, **self.kwargs)
- except Exception as ex:
- cls_error = ex
- if self.add_remote:
- ex = add_remote_postfix(ex)
- raise ex
- except Exception:
- exc_info = sys.exc_info()
-
- serialized = exceptions.serialize_remote_exception(
- exc_info, log_failure=self.log_failure)
-
- failure = jsonutils.loads(serialized)
-
- self.assertEqual(self.clsname, failure['class'], failure)
- self.assertEqual(self.modname, failure['module'])
- self.assertEqual(self.msg, failure['message'])
- self.assertEqual([self.msg], failure['args'])
- self.assertEqual(self.kwargs, failure['kwargs'])
-
- # Note: _Remote prefix not stripped from tracebacks
- tb = cls_error.__class__.__name__ + ': ' + self.msg
- self.assertIn(tb, ''.join(failure['tb']))
-
- if self.log_failure:
- self.assertTrue(len(errors) > 0, errors)
- else:
- self.assertEqual(0, len(errors), errors)
-
-
-SerializeRemoteExceptionTestCase.generate_scenarios()
-
-
-class DeserializeRemoteExceptionTestCase(test_utils.BaseTestCase):
-
- _standard_allowed = [__name__]
-
- scenarios = [
- ('bog_standard',
- dict(allowed=_standard_allowed,
- clsname='Exception',
- modname=EXCEPTIONS_MODULE,
- cls=Exception,
- args=['test'],
- kwargs={},
- str='test\ntraceback\ntraceback\n',
- remote_name='Exception',
- remote_args=('test\ntraceback\ntraceback\n', ),
- remote_kwargs={})),
- ('nova_style',
- dict(allowed=_standard_allowed,
- clsname='NovaStyleException',
- modname=__name__,
- cls=NovaStyleException,
- args=[],
- kwargs={},
- str='test\ntraceback\ntraceback\n',
- remote_name='NovaStyleException_Remote',
- remote_args=('I am Nova', ),
- remote_kwargs={})),
- ('nova_style_with_msg',
- dict(allowed=_standard_allowed,
- clsname='NovaStyleException',
- modname=__name__,
- cls=NovaStyleException,
- args=['testing'],
- kwargs={},
- str='test\ntraceback\ntraceback\n',
- remote_name='NovaStyleException_Remote',
- remote_args=('testing', ),
- remote_kwargs={})),
- ('kwargs_style',
- dict(allowed=_standard_allowed,
- clsname='KwargsStyleException',
- modname=__name__,
- cls=KwargsStyleException,
- args=[],
- kwargs={'who': 'Oslo'},
- str='test\ntraceback\ntraceback\n',
- remote_name='KwargsStyleException_Remote',
- remote_args=('I am Oslo', ),
- remote_kwargs={})),
- ('not_allowed',
- dict(allowed=[],
- clsname='NovaStyleException',
- modname=__name__,
- cls=messaging.RemoteError,
- args=[],
- kwargs={},
- str=("Remote error: NovaStyleException test\n"
- "[%r]." % u'traceback\ntraceback\n'),
- msg=("Remote error: NovaStyleException test\n"
- "[%r]." % u'traceback\ntraceback\n'),
- remote_name='RemoteError',
- remote_args=(),
- remote_kwargs={'exc_type': 'NovaStyleException',
- 'value': 'test',
- 'traceback': 'traceback\ntraceback\n'})),
- ('unknown_module',
- dict(allowed=['notexist'],
- clsname='Exception',
- modname='notexist',
- cls=messaging.RemoteError,
- args=[],
- kwargs={},
- str=("Remote error: Exception test\n"
- "[%r]." % u'traceback\ntraceback\n'),
- msg=("Remote error: Exception test\n"
- "[%r]." % u'traceback\ntraceback\n'),
- remote_name='RemoteError',
- remote_args=(),
- remote_kwargs={'exc_type': 'Exception',
- 'value': 'test',
- 'traceback': 'traceback\ntraceback\n'})),
- ('unknown_exception',
- dict(allowed=[],
- clsname='FarcicalError',
- modname=EXCEPTIONS_MODULE,
- cls=messaging.RemoteError,
- args=[],
- kwargs={},
- str=("Remote error: FarcicalError test\n"
- "[%r]." % u'traceback\ntraceback\n'),
- msg=("Remote error: FarcicalError test\n"
- "[%r]." % u'traceback\ntraceback\n'),
- remote_name='RemoteError',
- remote_args=(),
- remote_kwargs={'exc_type': 'FarcicalError',
- 'value': 'test',
- 'traceback': 'traceback\ntraceback\n'})),
- ('unknown_kwarg',
- dict(allowed=[],
- clsname='Exception',
- modname=EXCEPTIONS_MODULE,
- cls=messaging.RemoteError,
- args=[],
- kwargs={'foobar': 'blaa'},
- str=("Remote error: Exception test\n"
- "[%r]." % u'traceback\ntraceback\n'),
- msg=("Remote error: Exception test\n"
- "[%r]." % u'traceback\ntraceback\n'),
- remote_name='RemoteError',
- remote_args=(),
- remote_kwargs={'exc_type': 'Exception',
- 'value': 'test',
- 'traceback': 'traceback\ntraceback\n'})),
- ('system_exit',
- dict(allowed=[],
- clsname='SystemExit',
- modname=EXCEPTIONS_MODULE,
- cls=messaging.RemoteError,
- args=[],
- kwargs={},
- str=("Remote error: SystemExit test\n"
- "[%r]." % u'traceback\ntraceback\n'),
- msg=("Remote error: SystemExit test\n"
- "[%r]." % u'traceback\ntraceback\n'),
- remote_name='RemoteError',
- remote_args=(),
- remote_kwargs={'exc_type': 'SystemExit',
- 'value': 'test',
- 'traceback': 'traceback\ntraceback\n'})),
- ]
-
- def test_deserialize_remote_exception(self):
- failure = {
- 'class': self.clsname,
- 'module': self.modname,
- 'message': 'test',
- 'tb': ['traceback\ntraceback\n'],
- 'args': self.args,
- 'kwargs': self.kwargs,
- }
-
- serialized = jsonutils.dumps(failure)
-
- ex = exceptions.deserialize_remote_exception(serialized, self.allowed)
-
- self.assertIsInstance(ex, self.cls)
- self.assertEqual(self.remote_name, ex.__class__.__name__)
- self.assertEqual(self.str, six.text_type(ex))
- if hasattr(self, 'msg'):
- self.assertEqual(self.msg, six.text_type(ex))
- self.assertEqual((self.msg,) + self.remote_args, ex.args)
- else:
- self.assertEqual(self.remote_args, ex.args)
diff --git a/tests/test_expected_exceptions.py b/tests/test_expected_exceptions.py
deleted file mode 100644
index 702f3a2..0000000
--- a/tests/test_expected_exceptions.py
+++ /dev/null
@@ -1,66 +0,0 @@
-
-# Copyright 2012 OpenStack Foundation
-# Copyright 2013 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo import messaging
-from oslo_messaging.tests import utils as test_utils
-
-
-class TestExpectedExceptions(test_utils.BaseTestCase):
-
- def test_exception(self):
- e = None
- try:
- try:
- raise ValueError()
- except Exception:
- raise messaging.ExpectedException()
- except messaging.ExpectedException as e:
- self.assertIsInstance(e, messaging.ExpectedException)
- self.assertTrue(hasattr(e, 'exc_info'))
- self.assertIsInstance(e.exc_info[1], ValueError)
-
- def test_decorator_expected(self):
- class FooException(Exception):
- pass
-
- @messaging.expected_exceptions(FooException)
- def naughty():
- raise FooException()
-
- self.assertRaises(messaging.ExpectedException, naughty)
-
- def test_decorator_expected_subclass(self):
- class FooException(Exception):
- pass
-
- class BarException(FooException):
- pass
-
- @messaging.expected_exceptions(FooException)
- def naughty():
- raise BarException()
-
- self.assertRaises(messaging.ExpectedException, naughty)
-
- def test_decorator_unexpected(self):
- class FooException(Exception):
- pass
-
- @messaging.expected_exceptions(FooException)
- def really_naughty():
- raise ValueError()
-
- self.assertRaises(ValueError, really_naughty)
diff --git a/tests/test_target.py b/tests/test_target.py
deleted file mode 100644
index 68f98f4..0000000
--- a/tests/test_target.py
+++ /dev/null
@@ -1,177 +0,0 @@
-
-# Copyright 2013 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import testscenarios
-
-from oslo import messaging
-from oslo_messaging.tests import utils as test_utils
-
-load_tests = testscenarios.load_tests_apply_scenarios
-
-
-class TargetConstructorTestCase(test_utils.BaseTestCase):
-
- scenarios = [
- ('all_none', dict(kwargs=dict())),
- ('exchange', dict(kwargs=dict(exchange='testexchange'))),
- ('topic', dict(kwargs=dict(topic='testtopic'))),
- ('namespace', dict(kwargs=dict(namespace='testnamespace'))),
- ('version', dict(kwargs=dict(version='3.4'))),
- ('server', dict(kwargs=dict(server='testserver'))),
- ('fanout', dict(kwargs=dict(fanout=True))),
- ]
-
- def test_constructor(self):
- target = messaging.Target(**self.kwargs)
- for k in self.kwargs:
- self.assertEqual(self.kwargs[k], getattr(target, k))
- for k in ['exchange', 'topic', 'namespace',
- 'version', 'server', 'fanout']:
- if k in self.kwargs:
- continue
- self.assertIsNone(getattr(target, k))
-
-
-class TargetCallableTestCase(test_utils.BaseTestCase):
-
- scenarios = [
- ('all_none', dict(attrs=dict(), kwargs=dict(), vals=dict())),
- ('exchange_attr', dict(attrs=dict(exchange='testexchange'),
- kwargs=dict(),
- vals=dict(exchange='testexchange'))),
- ('exchange_arg', dict(attrs=dict(),
- kwargs=dict(exchange='testexchange'),
- vals=dict(exchange='testexchange'))),
- ('topic_attr', dict(attrs=dict(topic='testtopic'),
- kwargs=dict(),
- vals=dict(topic='testtopic'))),
- ('topic_arg', dict(attrs=dict(),
- kwargs=dict(topic='testtopic'),
- vals=dict(topic='testtopic'))),
- ('namespace_attr', dict(attrs=dict(namespace='testnamespace'),
- kwargs=dict(),
- vals=dict(namespace='testnamespace'))),
- ('namespace_arg', dict(attrs=dict(),
- kwargs=dict(namespace='testnamespace'),
- vals=dict(namespace='testnamespace'))),
- ('version_attr', dict(attrs=dict(version='3.4'),
- kwargs=dict(),
- vals=dict(version='3.4'))),
- ('version_arg', dict(attrs=dict(),
- kwargs=dict(version='3.4'),
- vals=dict(version='3.4'))),
- ('server_attr', dict(attrs=dict(server='testserver'),
- kwargs=dict(),
- vals=dict(server='testserver'))),
- ('server_arg', dict(attrs=dict(),
- kwargs=dict(server='testserver'),
- vals=dict(server='testserver'))),
- ('fanout_attr', dict(attrs=dict(fanout=True),
- kwargs=dict(),
- vals=dict(fanout=True))),
- ('fanout_arg', dict(attrs=dict(),
- kwargs=dict(fanout=True),
- vals=dict(fanout=True))),
- ]
-
- def test_callable(self):
- target = messaging.Target(**self.attrs)
- target = target(**self.kwargs)
- for k in self.vals:
- self.assertEqual(self.vals[k], getattr(target, k))
- for k in ['exchange', 'topic', 'namespace',
- 'version', 'server', 'fanout']:
- if k in self.vals:
- continue
- self.assertIsNone(getattr(target, k))
-
-
-class TargetReprTestCase(test_utils.BaseTestCase):
-
- scenarios = [
- ('all_none', dict(kwargs=dict(), repr='')),
- ('exchange', dict(kwargs=dict(exchange='testexchange'),
- repr='exchange=testexchange')),
- ('topic', dict(kwargs=dict(topic='testtopic'),
- repr='topic=testtopic')),
- ('namespace', dict(kwargs=dict(namespace='testnamespace'),
- repr='namespace=testnamespace')),
- ('version', dict(kwargs=dict(version='3.4'),
- repr='version=3.4')),
- ('server', dict(kwargs=dict(server='testserver'),
- repr='server=testserver')),
- ('fanout', dict(kwargs=dict(fanout=True),
- repr='fanout=True')),
- ('exchange_and_fanout', dict(kwargs=dict(exchange='testexchange',
- fanout=True),
- repr='exchange=testexchange, '
- 'fanout=True')),
- ]
-
- def test_repr(self):
- target = messaging.Target(**self.kwargs)
- self.assertEqual('<Target ' + self.repr + '>', str(target))
-
-
-_notset = object()
-
-
-class EqualityTestCase(test_utils.BaseTestCase):
-
- @classmethod
- def generate_scenarios(cls):
- attr = [
- ('exchange', dict(attr='exchange')),
- ('topic', dict(attr='topic')),
- ('namespace', dict(attr='namespace')),
- ('version', dict(attr='version')),
- ('server', dict(attr='server')),
- ('fanout', dict(attr='fanout')),
- ]
- a = [
- ('a_notset', dict(a_value=_notset)),
- ('a_none', dict(a_value=None)),
- ('a_empty', dict(a_value='')),
- ('a_foo', dict(a_value='foo')),
- ('a_bar', dict(a_value='bar')),
- ]
- b = [
- ('b_notset', dict(b_value=_notset)),
- ('b_none', dict(b_value=None)),
- ('b_empty', dict(b_value='')),
- ('b_foo', dict(b_value='foo')),
- ('b_bar', dict(b_value='bar')),
- ]
-
- cls.scenarios = testscenarios.multiply_scenarios(attr, a, b)
- for s in cls.scenarios:
- s[1]['equals'] = (s[1]['a_value'] == s[1]['b_value'])
-
- def test_equality(self):
- a_kwargs = {self.attr: self.a_value}
- b_kwargs = {self.attr: self.b_value}
-
- a = messaging.Target(**a_kwargs)
- b = messaging.Target(**b_kwargs)
-
- if self.equals:
- self.assertEqual(a, b)
- self.assertFalse(a != b)
- else:
- self.assertNotEqual(a, b)
- self.assertFalse(a == b)
-
-
-EqualityTestCase.generate_scenarios()
diff --git a/tests/test_transport.py b/tests/test_transport.py
deleted file mode 100644
index a3b5b91..0000000
--- a/tests/test_transport.py
+++ /dev/null
@@ -1,367 +0,0 @@
-
-# Copyright 2013 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import fixtures
-from mox3 import mox
-import six
-from stevedore import driver
-import testscenarios
-
-from oslo import messaging
-from oslo.messaging import transport
-from oslo_config import cfg
-from oslo_messaging.tests import utils as test_utils
-from oslo_messaging import transport as private_transport
-
-load_tests = testscenarios.load_tests_apply_scenarios
-
-
-class _FakeDriver(object):
-
- def __init__(self, conf):
- self.conf = conf
-
- def send(self, *args, **kwargs):
- pass
-
- def send_notification(self, *args, **kwargs):
- pass
-
- def listen(self, target):
- pass
-
-
-class _FakeManager(object):
-
- def __init__(self, driver):
- self.driver = driver
-
-
-class GetTransportTestCase(test_utils.BaseTestCase):
-
- scenarios = [
- ('rpc_backend',
- dict(url=None, transport_url=None, rpc_backend='testbackend',
- control_exchange=None, allowed=None, aliases=None,
- expect=dict(backend='testbackend',
- exchange=None,
- url='testbackend:',
- allowed=[]))),
- ('transport_url',
- dict(url=None, transport_url='testtransport:', rpc_backend=None,
- control_exchange=None, allowed=None, aliases=None,
- expect=dict(backend='testtransport',
- exchange=None,
- url='testtransport:',
- allowed=[]))),
- ('url_param',
- dict(url='testtransport:', transport_url=None, rpc_backend=None,
- control_exchange=None, allowed=None, aliases=None,
- expect=dict(backend='testtransport',
- exchange=None,
- url='testtransport:',
- allowed=[]))),
- ('control_exchange',
- dict(url=None, transport_url=None, rpc_backend='testbackend',
- control_exchange='testexchange', allowed=None, aliases=None,
- expect=dict(backend='testbackend',
- exchange='testexchange',
- url='testbackend:',
- allowed=[]))),
- ('allowed_remote_exmods',
- dict(url=None, transport_url=None, rpc_backend='testbackend',
- control_exchange=None, allowed=['foo', 'bar'], aliases=None,
- expect=dict(backend='testbackend',
- exchange=None,
- url='testbackend:',
- allowed=['foo', 'bar']))),
- ('rpc_backend_aliased',
- dict(url=None, transport_url=None, rpc_backend='testfoo',
- control_exchange=None, allowed=None,
- aliases=dict(testfoo='testbackend'),
- expect=dict(backend='testbackend',
- exchange=None,
- url='testbackend:',
- allowed=[]))),
- ('transport_url_aliased',
- dict(url=None, transport_url='testfoo:', rpc_backend=None,
- control_exchange=None, allowed=None,
- aliases=dict(testfoo='testtransport'),
- expect=dict(backend='testtransport',
- exchange=None,
- url='testtransport:',
- allowed=[]))),
- ('url_param_aliased',
- dict(url='testfoo:', transport_url=None, rpc_backend=None,
- control_exchange=None, allowed=None,
- aliases=dict(testfoo='testtransport'),
- expect=dict(backend='testtransport',
- exchange=None,
- url='testtransport:',
- allowed=[]))),
- ]
-
- def test_get_transport(self):
- self.config(rpc_backend=self.rpc_backend,
- control_exchange=self.control_exchange,
- transport_url=self.transport_url)
-
- self.mox.StubOutWithMock(driver, 'DriverManager')
-
- invoke_args = [self.conf,
- messaging.TransportURL.parse(self.conf,
- self.expect['url'])]
- invoke_kwds = dict(default_exchange=self.expect['exchange'],
- allowed_remote_exmods=self.expect['allowed'])
-
- drvr = _FakeDriver(self.conf)
- driver.DriverManager('oslo.messaging.drivers',
- self.expect['backend'],
- invoke_on_load=True,
- invoke_args=invoke_args,
- invoke_kwds=invoke_kwds).\
- AndReturn(_FakeManager(drvr))
-
- self.mox.ReplayAll()
-
- kwargs = dict(url=self.url)
- if self.allowed is not None:
- kwargs['allowed_remote_exmods'] = self.allowed
- if self.aliases is not None:
- kwargs['aliases'] = self.aliases
- transport_ = messaging.get_transport(self.conf, **kwargs)
-
- self.assertIsNotNone(transport_)
- self.assertIs(transport_.conf, self.conf)
- self.assertIs(transport_._driver, drvr)
-
-
-class GetTransportSadPathTestCase(test_utils.BaseTestCase):
-
- scenarios = [
- ('invalid_transport_url',
- dict(url=None, transport_url='invalid', rpc_backend=None,
- ex=dict(cls=messaging.InvalidTransportURL,
- msg_contains='No scheme specified',
- url='invalid'))),
- ('invalid_url_param',
- dict(url='invalid', transport_url=None, rpc_backend=None,
- ex=dict(cls=messaging.InvalidTransportURL,
- msg_contains='No scheme specified',
- url='invalid'))),
- ('driver_load_failure',
- dict(url=None, transport_url=None, rpc_backend='testbackend',
- ex=dict(cls=messaging.DriverLoadFailure,
- msg_contains='Failed to load',
- driver='testbackend'))),
- ]
-
- def test_get_transport_sad(self):
- self.config(rpc_backend=self.rpc_backend,
- transport_url=self.transport_url)
-
- if self.rpc_backend:
- self.mox.StubOutWithMock(driver, 'DriverManager')
-
- invoke_args = [self.conf,
- messaging.TransportURL.parse(self.conf,
- self.url)]
- invoke_kwds = dict(default_exchange='openstack',
- allowed_remote_exmods=[])
-
- driver.DriverManager('oslo.messaging.drivers',
- self.rpc_backend,
- invoke_on_load=True,
- invoke_args=invoke_args,
- invoke_kwds=invoke_kwds).\
- AndRaise(RuntimeError())
-
- self.mox.ReplayAll()
-
- try:
- messaging.get_transport(self.conf, url=self.url)
- self.assertFalse(True)
- except Exception as ex:
- ex_cls = self.ex.pop('cls')
- ex_msg_contains = self.ex.pop('msg_contains')
-
- self.assertIsInstance(ex, messaging.MessagingException)
- self.assertIsInstance(ex, ex_cls)
- self.assertIn(ex_msg_contains, six.text_type(ex))
-
- for k, v in self.ex.items():
- self.assertTrue(hasattr(ex, k))
- self.assertEqual(v, str(getattr(ex, k)))
-
-
-# FIXME(markmc): this could be used elsewhere
-class _SetDefaultsFixture(fixtures.Fixture):
-
- def __init__(self, set_defaults, opts, *names):
- super(_SetDefaultsFixture, self).__init__()
- self.set_defaults = set_defaults
- self.opts = opts
- self.names = names
-
- def setUp(self):
- super(_SetDefaultsFixture, self).setUp()
-
- # FIXME(markmc): this comes from Id5c1f3ba
- def first(seq, default=None, key=None):
- if key is None:
- key = bool
- return next(six.moves.filter(key, seq), default)
-
- def default(opts, name):
- return first(opts, key=lambda o: o.name == name).default
-
- orig_defaults = {}
- for n in self.names:
- orig_defaults[n] = default(self.opts, n)
-
- def restore_defaults():
- self.set_defaults(**orig_defaults)
-
- self.addCleanup(restore_defaults)
-
-
-class TestSetDefaults(test_utils.BaseTestCase):
-
- def setUp(self):
- super(TestSetDefaults, self).setUp(conf=cfg.ConfigOpts())
- self.useFixture(_SetDefaultsFixture(messaging.set_transport_defaults,
- private_transport._transport_opts,
- 'control_exchange'))
-
- def test_set_default_control_exchange(self):
- messaging.set_transport_defaults(control_exchange='foo')
-
- self.mox.StubOutWithMock(driver, 'DriverManager')
- invoke_kwds = mox.ContainsKeyValue('default_exchange', 'foo')
- driver.DriverManager(mox.IgnoreArg(),
- mox.IgnoreArg(),
- invoke_on_load=mox.IgnoreArg(),
- invoke_args=mox.IgnoreArg(),
- invoke_kwds=invoke_kwds).\
- AndReturn(_FakeManager(_FakeDriver(self.conf)))
- self.mox.ReplayAll()
-
- messaging.get_transport(self.conf)
-
-
-class TestTransportMethodArgs(test_utils.BaseTestCase):
-
- _target = messaging.Target(topic='topic', server='server')
-
- def test_send_defaults(self):
- t = transport.Transport(_FakeDriver(cfg.CONF))
-
- self.mox.StubOutWithMock(t._driver, 'send')
- t._driver.send(self._target, 'ctxt', 'message',
- wait_for_reply=None,
- timeout=None, retry=None)
- self.mox.ReplayAll()
-
- t._send(self._target, 'ctxt', 'message')
-
- def test_send_all_args(self):
- t = transport.Transport(_FakeDriver(cfg.CONF))
-
- self.mox.StubOutWithMock(t._driver, 'send')
- t._driver.send(self._target, 'ctxt', 'message',
- wait_for_reply='wait_for_reply',
- timeout='timeout', retry='retry')
- self.mox.ReplayAll()
-
- t._send(self._target, 'ctxt', 'message',
- wait_for_reply='wait_for_reply',
- timeout='timeout', retry='retry')
-
- def test_send_notification(self):
- t = transport.Transport(_FakeDriver(cfg.CONF))
-
- self.mox.StubOutWithMock(t._driver, 'send_notification')
- t._driver.send_notification(self._target, 'ctxt', 'message', 1.0,
- retry=None)
- self.mox.ReplayAll()
-
- t._send_notification(self._target, 'ctxt', 'message', version=1.0)
-
- def test_send_notification_all_args(self):
- t = transport.Transport(_FakeDriver(cfg.CONF))
-
- self.mox.StubOutWithMock(t._driver, 'send_notification')
- t._driver.send_notification(self._target, 'ctxt', 'message', 1.0,
- retry=5)
- self.mox.ReplayAll()
-
- t._send_notification(self._target, 'ctxt', 'message', version=1.0,
- retry=5)
-
- def test_listen(self):
- t = transport.Transport(_FakeDriver(cfg.CONF))
-
- self.mox.StubOutWithMock(t._driver, 'listen')
- t._driver.listen(self._target)
- self.mox.ReplayAll()
-
- t._listen(self._target)
-
-
-class TestTransportUrlCustomisation(test_utils.BaseTestCase):
- def setUp(self):
- super(TestTransportUrlCustomisation, self).setUp()
- self.url1 = transport.TransportURL.parse(self.conf, "fake://vhost1")
- self.url2 = transport.TransportURL.parse(self.conf, "fake://vhost2")
- self.url3 = transport.TransportURL.parse(self.conf, "fake://vhost1")
-
- def test_hash(self):
- urls = {}
- urls[self.url1] = self.url1
- urls[self.url2] = self.url2
- urls[self.url3] = self.url3
- self.assertEqual(2, len(urls))
-
- def test_eq(self):
- self.assertEqual(self.url1, self.url3)
- self.assertNotEqual(self.url1, self.url2)
-
-
-class TestTransportHostCustomisation(test_utils.BaseTestCase):
- def setUp(self):
- super(TestTransportHostCustomisation, self).setUp()
- self.host1 = transport.TransportHost("host1", 5662, "user", "pass")
- self.host2 = transport.TransportHost("host1", 5662, "user", "pass")
- self.host3 = transport.TransportHost("host1", 5663, "user", "pass")
- self.host4 = transport.TransportHost("host1", 5662, "user2", "pass")
- self.host5 = transport.TransportHost("host1", 5662, "user", "pass2")
- self.host6 = transport.TransportHost("host2", 5662, "user", "pass")
-
- def test_hash(self):
- hosts = {}
- hosts[self.host1] = self.host1
- hosts[self.host2] = self.host2
- hosts[self.host3] = self.host3
- hosts[self.host4] = self.host4
- hosts[self.host5] = self.host5
- hosts[self.host6] = self.host6
- self.assertEqual(5, len(hosts))
-
- def test_eq(self):
- self.assertEqual(self.host1, self.host2)
- self.assertNotEqual(self.host1, self.host3)
- self.assertNotEqual(self.host1, self.host4)
- self.assertNotEqual(self.host1, self.host5)
- self.assertNotEqual(self.host1, self.host6)
diff --git a/tests/test_urls.py b/tests/test_urls.py
deleted file mode 100644
index 9562742..0000000
--- a/tests/test_urls.py
+++ /dev/null
@@ -1,236 +0,0 @@
-
-# Copyright 2013 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import testscenarios
-
-from oslo import messaging
-from oslo_messaging.tests import utils as test_utils
-
-load_tests = testscenarios.load_tests_apply_scenarios
-
-
-class TestParseURL(test_utils.BaseTestCase):
-
- scenarios = [
- ('transport',
- dict(url='foo:', aliases=None,
- expect=dict(transport='foo'))),
- ('transport_aliased',
- dict(url='bar:', aliases=dict(bar='foo'),
- expect=dict(transport='foo'))),
- ('virtual_host_slash',
- dict(url='foo:////', aliases=None,
- expect=dict(transport='foo', virtual_host='/'))),
- ('virtual_host',
- dict(url='foo:///bar', aliases=None,
- expect=dict(transport='foo', virtual_host='bar'))),
- ('host',
- dict(url='foo://host/bar', aliases=None,
- expect=dict(transport='foo',
- virtual_host='bar',
- hosts=[
- dict(host='host'),
- ]))),
- ('ipv6_host',
- dict(url='foo://[ffff::1]/bar', aliases=None,
- expect=dict(transport='foo',
- virtual_host='bar',
- hosts=[
- dict(host='ffff::1'),
- ]))),
- ('port',
- dict(url='foo://host:1234/bar', aliases=None,
- expect=dict(transport='foo',
- virtual_host='bar',
- hosts=[
- dict(host='host', port=1234),
- ]))),
- ('ipv6_port',
- dict(url='foo://[ffff::1]:1234/bar', aliases=None,
- expect=dict(transport='foo',
- virtual_host='bar',
- hosts=[
- dict(host='ffff::1', port=1234),
- ]))),
- ('username',
- dict(url='foo://u@host:1234/bar', aliases=None,
- expect=dict(transport='foo',
- virtual_host='bar',
- hosts=[
- dict(host='host', port=1234, username='u'),
- ]))),
- ('password',
- dict(url='foo://u:p@host:1234/bar', aliases=None,
- expect=dict(transport='foo',
- virtual_host='bar',
- hosts=[
- dict(host='host', port=1234,
- username='u', password='p'),
- ]))),
- ('creds_no_host',
- dict(url='foo://u:p@/bar', aliases=None,
- expect=dict(transport='foo',
- virtual_host='bar',
- hosts=[
- dict(username='u', password='p'),
- ]))),
- ('multi_host',
- dict(url='foo://u:p@host1:1234,host2:4321/bar', aliases=None,
- expect=dict(transport='foo',
- virtual_host='bar',
- hosts=[
- dict(host='host1', port=1234,
- username='u', password='p'),
- dict(host='host2', port=4321),
- ]))),
- ('multi_creds',
- dict(url='foo://u1:p1@host1:1234,u2:p2@host2:4321/bar', aliases=None,
- expect=dict(transport='foo',
- virtual_host='bar',
- hosts=[
- dict(host='host1', port=1234,
- username='u1', password='p1'),
- dict(host='host2', port=4321,
- username='u2', password='p2'),
- ]))),
- ('multi_creds_ipv6',
- dict(url='foo://u1:p1@[ffff::1]:1234,u2:p2@[ffff::2]:4321/bar',
- aliases=None,
- expect=dict(transport='foo',
- virtual_host='bar',
- hosts=[
- dict(host='ffff::1', port=1234,
- username='u1', password='p1'),
- dict(host='ffff::2', port=4321,
- username='u2', password='p2'),
- ]))),
- ]
-
- def test_parse_url(self):
- self.config(rpc_backend=None)
-
- url = messaging.TransportURL.parse(self.conf, self.url, self.aliases)
-
- hosts = []
- for host in self.expect.get('hosts', []):
- hosts.append(messaging.TransportHost(host.get('host'),
- host.get('port'),
- host.get('username'),
- host.get('password')))
- expected = messaging.TransportURL(self.conf,
- self.expect.get('transport'),
- self.expect.get('virtual_host'),
- hosts)
-
- self.assertEqual(expected, url)
-
-
-class TestFormatURL(test_utils.BaseTestCase):
-
- scenarios = [
- ('rpc_backend',
- dict(rpc_backend='testbackend',
- transport=None,
- virtual_host=None,
- hosts=[],
- aliases=None,
- expected='testbackend:///')),
- ('rpc_backend_aliased',
- dict(rpc_backend='testfoo',
- transport=None,
- virtual_host=None,
- hosts=[],
- aliases=dict(testfoo='testbackend'),
- expected='testbackend:///')),
- ('transport',
- dict(rpc_backend=None,
- transport='testtransport',
- virtual_host=None,
- hosts=[],
- aliases=None,
- expected='testtransport:///')),
- ('transport_aliased',
- dict(rpc_backend=None,
- transport='testfoo',
- virtual_host=None,
- hosts=[],
- aliases=dict(testfoo='testtransport'),
- expected='testtransport:///')),
- ('virtual_host',
- dict(rpc_backend=None,
- transport='testtransport',
- virtual_host='/vhost',
- hosts=[],
- aliases=None,
- expected='testtransport:////vhost')),
- ('host',
- dict(rpc_backend=None,
- transport='testtransport',
- virtual_host='/',
- hosts=[
- dict(hostname='host',
- port=10,
- username='bob',
- password='secret'),
- ],
- aliases=None,
- expected='testtransport://bob:secret@host:10//')),
- ('multi_host',
- dict(rpc_backend=None,
- transport='testtransport',
- virtual_host='',
- hosts=[
- dict(hostname='h1',
- port=1000,
- username='b1',
- password='s1'),
- dict(hostname='h2',
- port=2000,
- username='b2',
- password='s2'),
- ],
- aliases=None,
- expected='testtransport://b1:s1@h1:1000,b2:s2@h2:2000/')),
- ('quoting',
- dict(rpc_backend=None,
- transport='testtransport',
- virtual_host='/$',
- hosts=[
- dict(hostname='host',
- port=10,
- username='b$',
- password='s&'),
- ],
- aliases=None,
- expected='testtransport://b%24:s%26@host:10//%24')),
- ]
-
- def test_parse_url(self):
- self.config(rpc_backend=self.rpc_backend)
-
- hosts = []
- for host in self.hosts:
- hosts.append(messaging.TransportHost(host.get('hostname'),
- host.get('port'),
- host.get('username'),
- host.get('password')))
-
- url = messaging.TransportURL(self.conf,
- self.transport,
- self.virtual_host,
- hosts,
- self.aliases)
-
- self.assertEqual(self.expected, str(url))
diff --git a/tests/test_warning.py b/tests/test_warning.py
deleted file mode 100644
index 589ed88..0000000
--- a/tests/test_warning.py
+++ /dev/null
@@ -1,61 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import imp
-import os
-import warnings
-
-from oslotest import base as test_base
-import six
-from six.moves import mock
-
-
-class DeprecationWarningTest(test_base.BaseTestCase):
-
- @mock.patch('warnings.warn')
- def test_warning(self, mock_warn):
- import oslo.messaging
- imp.reload(oslo.messaging)
- self.assertTrue(mock_warn.called)
- args = mock_warn.call_args
- self.assertIn('oslo_messaging', args[0][0])
- self.assertIn('deprecated', args[0][0])
- self.assertTrue(issubclass(args[0][1], DeprecationWarning))
-
- def test_real_warning(self):
- with warnings.catch_warnings(record=True) as warning_msgs:
- warnings.resetwarnings()
- warnings.simplefilter('always', DeprecationWarning)
- import oslo.messaging
-
- # Use a separate function to get the stack level correct
- # so we know the message points back to this file. This
- # corresponds to an import or reload, which isn't working
- # inside the test under Python 3.3. That may be due to a
- # difference in the import implementation not triggering
- # warnings properly when the module is reloaded, or
- # because the warnings module is mostly implemented in C
- # and something isn't cleanly resetting the global state
- # used to track whether a warning needs to be
- # emitted. Whatever the cause, we definitely see the
- # warnings.warn() being invoked on a reload (see the test
- # above) and warnings are reported on the console when we
- # run the tests. A simpler test script run outside of
- # testr does correctly report the warnings.
- def foo():
- oslo.messaging.deprecated()
-
- foo()
- self.assertEqual(1, len(warning_msgs))
- msg = warning_msgs[0]
- self.assertIn('oslo_messaging', six.text_type(msg.message))
- self.assertEqual('test_warning.py', os.path.basename(msg.filename))