diff options
-rw-r--r-- | .pre-commit-config.yaml | 2 | ||||
-rw-r--r-- | oslo_messaging/_drivers/impl_rabbit.py | 69 | ||||
-rw-r--r-- | oslo_messaging/rpc/__init__.py | 1 | ||||
-rw-r--r-- | oslo_messaging/rpc/client.py | 39 | ||||
-rw-r--r-- | oslo_messaging/tests/functional/utils.py | 6 | ||||
-rw-r--r-- | oslo_messaging/tests/rpc/test_client.py | 40 | ||||
-rw-r--r-- | oslo_messaging/tests/rpc/test_server.py | 4 | ||||
-rw-r--r-- | oslo_messaging/tests/test_transport.py | 2 | ||||
-rw-r--r-- | releasenotes/notes/declare_fallback_durable_exchange-0db677de4fdf1e78.yaml | 5 | ||||
-rw-r--r-- | releasenotes/notes/do-not-run-heartbeat-in-pthread-by-default-42e1299f59b841f8.yaml | 9 | ||||
-rw-r--r-- | releasenotes/notes/get-rpc-client-0b4aa62160864b29.yaml | 11 | ||||
-rw-r--r-- | releasenotes/source/index.rst | 1 | ||||
-rw-r--r-- | releasenotes/source/zed.rst | 6 | ||||
-rw-r--r-- | test-requirements.txt | 2 | ||||
-rwxr-xr-x | tools/simulator.py | 2 |
15 files changed, 162 insertions, 37 deletions
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 08aef91..50d8dea 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -32,7 +32,7 @@ repos: - id: flake8 name: flake8 additional_dependencies: - - hacking>=3.0.1,<3.1.0 + - hacking>=3.0.1,<=4.1.0 language: python entry: flake8 files: '^.*\.py$' diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index ed2642c..c61393c 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -92,7 +92,7 @@ rabbit_opts = [ 'executable used does not support OpenSSL FIPS mode, ' 'an exception will be raised.'), cfg.BoolOpt('heartbeat_in_pthread', - default=True, + default=False, help="Run the health check heartbeat thread " "through a native python thread by default. If this " "option is equal to False then the health check " @@ -100,7 +100,9 @@ rabbit_opts = [ "from the parent process. For " "example if the parent process has monkey patched the " "stdlib by using eventlet/greenlet then the heartbeat " - "will be run through a green thread.", + "will be run through a green thread. " + "This option should be set to True only for the " + "wsgi services.", ), cfg.FloatOpt('kombu_reconnect_delay', default=1.0, @@ -347,11 +349,44 @@ class Consumer(object): self._declared_on = None self.exchange = kombu.entity.Exchange( name=exchange_name, - type=type, + type=self.type, durable=self.durable, auto_delete=self.exchange_auto_delete) self.enable_cancel_on_failover = enable_cancel_on_failover + def _declare_fallback(self, err, conn, consumer_arguments): + """Fallback by declaring a non durable queue. + + When a control exchange is shared between services it is possible + that some service created first a non durable control exchange and + then after that an other service can try to create the same control + exchange but as a durable control exchange. In this case RabbitMQ + will raise an exception (PreconditionFailed), and then it will stop + our execution and our service will fail entirly. In this case we want + to fallback by creating a non durable queue to match the default + config. + """ + if "PRECONDITION_FAILED - inequivalent arg 'durable'" in str(err): + LOG.info( + "[%s] Retrying to declare the exchange (%s) as " + "non durable", conn.connection_id, self.exchange_name) + self.exchange = kombu.entity.Exchange( + name=self.exchange_name, + type=self.type, + durable=False, + auto_delete=self.queue_auto_delete) + self.queue = kombu.entity.Queue( + name=self.queue_name, + channel=conn.channel, + exchange=self.exchange, + durable=False, + auto_delete=self.queue_auto_delete, + routing_key=self.routing_key, + queue_arguments=self.queue_arguments, + consumer_arguments=consumer_arguments + ) + self.queue.declare() + def declare(self, conn): """Re-declare the queue after a rabbit (re)connect.""" @@ -374,7 +409,18 @@ class Consumer(object): try: LOG.debug('[%s] Queue.declare: %s', conn.connection_id, self.queue_name) - self.queue.declare() + try: + self.queue.declare() + except amqp_exec.PreconditionFailed as err: + # NOTE(hberaud): This kind of exception may be triggered + # when a control exchange is shared between services and + # when services try to create it with configs that differ + # from each others. RabbitMQ will reject the services + # that try to create it with a configuration that differ + # from the one used first. + LOG.warning(err) + self._declare_fallback(err, conn, consumer_arguments) + except conn.connection.channel_errors as exc: # NOTE(jrosenboom): This exception may be triggered by a race # condition. Simply retrying will solve the error most of the time @@ -1352,7 +1398,20 @@ class Connection(object): """Publish a message.""" if not (exchange.passive or exchange.name in self._declared_exchanges): - exchange(self.channel).declare() + try: + exchange(self.channel).declare() + except amqp_exec.PreconditionFailed as err: + # NOTE(hberaud): This kind of exception may be triggered + # when a control exchange is shared between services and + # when services try to create it with configs that differ + # from each others. RabbitMQ will reject the services + # that try to create it with a configuration that differ + # from the one used first. + if "PRECONDITION_FAILED - inequivalent arg 'durable'" \ + in str(err): + LOG.warning("Force creating a non durable exchange.") + exchange.durable = False + exchange(self.channel).declare() self._declared_exchanges.add(exchange.name) log_info = {'msg': msg, diff --git a/oslo_messaging/rpc/__init__.py b/oslo_messaging/rpc/__init__.py index 9a320a8..135428e 100644 --- a/oslo_messaging/rpc/__init__.py +++ b/oslo_messaging/rpc/__init__.py @@ -30,6 +30,7 @@ __all__ = [ 'expected_exceptions', 'get_rpc_transport', 'get_rpc_server', + 'get_rpc_client', 'expose' ] diff --git a/oslo_messaging/rpc/client.py b/oslo_messaging/rpc/client.py index cbec525..8e997e9 100644 --- a/oslo_messaging/rpc/client.py +++ b/oslo_messaging/rpc/client.py @@ -32,6 +32,7 @@ __all__ = [ 'RPCClient', 'RPCVersionCapError', 'RemoteError', + 'get_rpc_client', ] LOG = logging.getLogger(__name__) @@ -263,6 +264,9 @@ class RPCClient(_BaseCallContext): The RPCClient class is responsible for sending method invocations to and receiving return values from remote RPC servers via a messaging transport. + The class should always be instantiated by using the get_rpc_client + function and not constructing the class directly. + Two RPC patterns are supported: RPC calls and RPC casts. An RPC cast is used when an RPC method does *not* return a value to @@ -295,7 +299,7 @@ class RPCClient(_BaseCallContext): def __init__(self, transport): target = messaging.Target(topic='test', version='2.0') - self._client = messaging.RPCClient(transport, target) + self._client = messaging.get_rpc_client(transport, target) def test(self, ctxt, arg): return self._client.call(ctxt, 'test', arg=arg) @@ -320,7 +324,7 @@ class RPCClient(_BaseCallContext): transport = messaging.get_rpc_transport(cfg.CONF) target = messaging.Target(topic='test', version='2.0') - client = messaging.RPCClient(transport, target) + client = messaging.get_rpc_client(transport, target) client.call(ctxt, 'test', arg=arg) but this is probably only useful in limited circumstances as a wrapper @@ -334,7 +338,7 @@ class RPCClient(_BaseCallContext): have the RPC request fail with a MessageDeliveryFailure after the given number of retries. For example:: - client = messaging.RPCClient(transport, target, retry=None) + client = messaging.get_rpc_client(transport, target, retry=None) client.call(ctxt, 'sync') try: client.prepare(retry=0).cast(ctxt, 'ping') @@ -346,9 +350,13 @@ class RPCClient(_BaseCallContext): def __init__(self, transport, target, timeout=None, version_cap=None, serializer=None, retry=None, - call_monitor_timeout=None, transport_options=None): + call_monitor_timeout=None, transport_options=None, + _manual_load=True): """Construct an RPC client. + This should not be called directly, use the get_rpc_client function + to instantiate this class. + :param transport: a messaging transport handle :type transport: Transport :param target: the default target for invocations @@ -371,7 +379,17 @@ class RPCClient(_BaseCallContext): (less than the overall timeout parameter). :type call_monitor_timeout: int + :param transport_options: Transport options passed to client. + :type transport_options: TransportOptions + :param _manual_load: Internal use only to check if class was + manually instantiated or not. + :type _manual_load: bool """ + if _manual_load: + LOG.warning("Using RPCClient manually to instantiate client. " + "Please use get_rpc_client to obtain an RPC client " + "instance.") + if serializer is None: serializer = msg_serializer.NoOpSerializer() @@ -530,3 +548,16 @@ class RPCClient(_BaseCallContext): def can_send_version(self, version=_marker): """Check to see if a version is compatible with the version cap.""" return self.prepare(version=version).can_send_version() + + +def get_rpc_client(transport, target, **kwargs): + """Construct an RPC client. + + :param transport: the messaging transport + :type transport: Transport + :param target: the exchange, topic and server to listen on + :type target: Target + :param **kwargs: The kwargs will be passed down to the + RPCClient constructor + """ + return RPCClient(transport, target, _manual_load=False, **kwargs) diff --git a/oslo_messaging/tests/functional/utils.py b/oslo_messaging/tests/functional/utils.py index 0d007b2..6a49f49 100644 --- a/oslo_messaging/tests/functional/utils.py +++ b/oslo_messaging/tests/functional/utils.py @@ -114,8 +114,8 @@ class RpcServerFixture(fixtures.Fixture): target=self.target, endpoints=endpoints, executor=self.executor) - self._ctrl = oslo_messaging.RPCClient(transport.transport, - self.ctrl_target) + self._ctrl = oslo_messaging.get_rpc_client(transport.transport, + self.ctrl_target) self._start() transport.wait() @@ -230,7 +230,7 @@ class ClientStub(object): transport_options=None, **kwargs): self.name = name or "functional-tests" self.cast = cast - self.client = oslo_messaging.RPCClient( + self.client = oslo_messaging.get_rpc_client( transport=transport, target=target, transport_options=transport_options, diff --git a/oslo_messaging/tests/rpc/test_client.py b/oslo_messaging/tests/rpc/test_client.py index af1282a..1358c98 100644 --- a/oslo_messaging/tests/rpc/test_client.py +++ b/oslo_messaging/tests/rpc/test_client.py @@ -44,8 +44,9 @@ class TestCastCall(test_utils.BaseTestCase): self.config(rpc_response_timeout=None) transport_options = oslo_messaging.TransportOptions() transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') - client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(), - transport_options=transport_options) + client = oslo_messaging.get_rpc_client( + transport, oslo_messaging.Target(), + transport_options=transport_options) transport._send = mock.Mock() @@ -70,7 +71,7 @@ class TestCastCall(test_utils.BaseTestCase): transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') transport_options = oslo_messaging.TransportOptions(at_least_once=True) - client = oslo_messaging.RPCClient( + client = oslo_messaging.get_rpc_client( transport, oslo_messaging.Target(), transport_options=transport_options) @@ -215,7 +216,7 @@ class TestCastToTarget(test_utils.BaseTestCase): expect_target = oslo_messaging.Target(**self.expect) transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') - client = oslo_messaging.RPCClient(transport, target) + client = oslo_messaging.get_rpc_client(transport, target) transport._send = mock.Mock() @@ -269,9 +270,9 @@ class TestCallTimeout(test_utils.BaseTestCase): self.config(rpc_response_timeout=self.confval) transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') - client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(), - timeout=self.ctor, - call_monitor_timeout=self.cm) + client = oslo_messaging.get_rpc_client( + transport, oslo_messaging.Target(), timeout=self.ctor, + call_monitor_timeout=self.cm) transport._send = mock.Mock() @@ -302,8 +303,9 @@ class TestCallRetry(test_utils.BaseTestCase): def test_call_retry(self): transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') - client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(), - retry=self.ctor) + client = oslo_messaging.get_rpc_client( + transport, oslo_messaging.Target(), + retry=self.ctor) transport._send = mock.Mock() @@ -332,8 +334,8 @@ class TestCallFanout(test_utils.BaseTestCase): def test_call_fanout(self): transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') - client = oslo_messaging.RPCClient(transport, - oslo_messaging.Target(**self.target)) + client = oslo_messaging.get_rpc_client( + transport, oslo_messaging.Target(**self.target)) if self.prepare is not _notset: client = client.prepare(**self.prepare) @@ -363,8 +365,8 @@ class TestSerializer(test_utils.BaseTestCase): transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') serializer = msg_serializer.NoOpSerializer() - client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(), - serializer=serializer) + client = oslo_messaging.get_rpc_client( + transport, oslo_messaging.Target(), serializer=serializer) transport._send = mock.Mock() kwargs = dict(wait_for_reply=True, @@ -465,8 +467,8 @@ class TestVersionCap(test_utils.BaseTestCase): transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') target = oslo_messaging.Target(version=self.version) - client = oslo_messaging.RPCClient(transport, target, - version_cap=self.cap) + client = oslo_messaging.get_rpc_client(transport, target, + version_cap=self.cap) if self.success: transport._send = mock.Mock() @@ -574,8 +576,8 @@ class TestCanSendVersion(test_utils.BaseTestCase): transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') target = oslo_messaging.Target(version=self.version) - client = oslo_messaging.RPCClient(transport, target, - version_cap=self.cap) + client = oslo_messaging.get_rpc_client(transport, target, + version_cap=self.cap) prep_kwargs = {} if self.prepare_cap is not _notset: @@ -598,7 +600,7 @@ class TestCanSendVersion(test_utils.BaseTestCase): def test_invalid_version_type(self): target = oslo_messaging.Target(topic='sometopic') transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') - client = oslo_messaging.RPCClient(transport, target) + client = oslo_messaging.get_rpc_client(transport, target) self.assertRaises(exceptions.MessagingException, client.prepare, version='5') self.assertRaises(exceptions.MessagingException, @@ -612,7 +614,7 @@ class TestTransportWarning(test_utils.BaseTestCase): @mock.patch('oslo_messaging.rpc.client.LOG') def test_warning_when_notifier_transport(self, log): transport = oslo_messaging.get_notification_transport(self.conf) - oslo_messaging.RPCClient(transport, oslo_messaging.Target()) + oslo_messaging.get_rpc_client(transport, oslo_messaging.Target()) log.warning.assert_called_once_with( "Using notification transport for RPC. Please use " "get_rpc_transport to obtain an RPC transport " diff --git a/oslo_messaging/tests/rpc/test_server.py b/oslo_messaging/tests/rpc/test_server.py index 1fc6be8..06cf1c7 100644 --- a/oslo_messaging/tests/rpc/test_server.py +++ b/oslo_messaging/tests/rpc/test_server.py @@ -102,8 +102,8 @@ class ServerSetupMixin(object): def _setup_client(self, transport, topic='testtopic', exchange=None): target = oslo_messaging.Target(topic=topic, exchange=exchange) - return oslo_messaging.RPCClient(transport, target=target, - serializer=self.serializer) + return oslo_messaging.get_rpc_client(transport, target=target, + serializer=self.serializer) class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin): diff --git a/oslo_messaging/tests/test_transport.py b/oslo_messaging/tests/test_transport.py index 31fec16..cb12c16 100644 --- a/oslo_messaging/tests/test_transport.py +++ b/oslo_messaging/tests/test_transport.py @@ -115,7 +115,7 @@ class GetTransportTestCase(test_utils.BaseTestCase): self.assertIsNotNone(transport_) self.assertIs(transport_.conf, self.conf) self.assertIs(transport_._driver, drvr) - self.assertTrue(isinstance(transport_, transport.RPCTransport)) + self.assertIsInstance(transport_, transport.RPCTransport) driver.DriverManager.assert_called_once_with('oslo.messaging.drivers', self.expect['backend'], diff --git a/releasenotes/notes/declare_fallback_durable_exchange-0db677de4fdf1e78.yaml b/releasenotes/notes/declare_fallback_durable_exchange-0db677de4fdf1e78.yaml new file mode 100644 index 0000000..985fc64 --- /dev/null +++ b/releasenotes/notes/declare_fallback_durable_exchange-0db677de4fdf1e78.yaml @@ -0,0 +1,5 @@ +--- +fixes: + - | + Force creating non durable control exchange when a precondition failed + related to config that differ occuring. diff --git a/releasenotes/notes/do-not-run-heartbeat-in-pthread-by-default-42e1299f59b841f8.yaml b/releasenotes/notes/do-not-run-heartbeat-in-pthread-by-default-42e1299f59b841f8.yaml new file mode 100644 index 0000000..265e709 --- /dev/null +++ b/releasenotes/notes/do-not-run-heartbeat-in-pthread-by-default-42e1299f59b841f8.yaml @@ -0,0 +1,9 @@ +--- +upgrade: + - | + The ``[oslo_messaging_rabbit] heartbeat_in_pthread`` config option + defaults to ``False`` again. + For wsgi applications it is recommended to set this value to ``True`` + but enabling it for non-wsgi services may break such service. + Please check https://bugs.launchpad.net/oslo.messaging/+bug/1934937 + for more details. diff --git a/releasenotes/notes/get-rpc-client-0b4aa62160864b29.yaml b/releasenotes/notes/get-rpc-client-0b4aa62160864b29.yaml new file mode 100644 index 0000000..3375cfc --- /dev/null +++ b/releasenotes/notes/get-rpc-client-0b4aa62160864b29.yaml @@ -0,0 +1,11 @@ +--- +features: + - | + Added new ``get_rpc_client`` function to instantiate the RPCClient + class +deprecations: + - | + Instantiating the RPCClient class directly is deprecated in favor + of using the new ``get_rpc_client`` function to expose a more + common API similar to existing functions such as ``get_rpc_server`` + and ``get_rpc_transport`` diff --git a/releasenotes/source/index.rst b/releasenotes/source/index.rst index 57b9270..0654d71 100644 --- a/releasenotes/source/index.rst +++ b/releasenotes/source/index.rst @@ -6,6 +6,7 @@ :maxdepth: 1 unreleased + zed yoga xena wallaby diff --git a/releasenotes/source/zed.rst b/releasenotes/source/zed.rst new file mode 100644 index 0000000..9608c05 --- /dev/null +++ b/releasenotes/source/zed.rst @@ -0,0 +1,6 @@ +======================== +Zed Series Release Notes +======================== + +.. release-notes:: + :branch: stable/zed diff --git a/test-requirements.txt b/test-requirements.txt index 983c1c9..3a7c44f 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -3,7 +3,7 @@ # process, which may cause wedges in the gate later. # Hacking already pins down pep8, pyflakes and flake8 -hacking>=3.0.1,<3.1.0 # Apache-2.0 +hacking>=3.0.1,<=4.1.0 # Apache-2.0 fixtures>=3.0.0 # Apache-2.0/BSD stestr>=2.0.0 # Apache-2.0 diff --git a/tools/simulator.py b/tools/simulator.py index 8b37f50..da9d05e 100755 --- a/tools/simulator.py +++ b/tools/simulator.py @@ -410,7 +410,7 @@ class RPCClient(Client): def __init__(self, client_id, transport, target, timeout, is_cast, wait_after_msg, sync_mode=False): - client = rpc.RPCClient(transport, target) + client = rpc.get_rpc_client(transport, target) method = _rpc_cast if is_cast else _rpc_call super(RPCClient, self).__init__(client_id, |