summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMehdi Abaakouk <sileht@redhat.com>2015-05-27 08:33:25 +0200
committerMehdi Abaakouk <sileht@redhat.com>2015-06-09 10:35:24 +0200
commit1f8ccd3ac50aa941f042b02069b7185c3d5b5ae7 (patch)
treee02de77ae759e8651e756cd81f1fb1b86c2ce126
parent85c069e15435342fddd3da2f00d7871986826216 (diff)
downloadoslo-messaging-1f8ccd3ac50aa941f042b02069b7185c3d5b5ae7.tar.gz
rabbit: Add logging on blocked connection
When the broker will block the connection for a server-side issue like disk full, it notifies the client. This change adds the callback methods when this occurs to inform the deployer about the reason of this blocking. Change-Id: I5164b9e1b720f022b45a5718258df036ba8808ed Closes-bug: #1454449
-rw-r--r--oslo_messaging/_drivers/impl_rabbit.py17
-rw-r--r--oslo_messaging/tests/drivers/test_impl_rabbit.py4
2 files changed, 18 insertions, 3 deletions
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index 7bc7c43..d026779 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -477,7 +477,12 @@ class Connection(object):
login_method=self.login_method,
failover_strategy="shuffle",
heartbeat=self.heartbeat_timeout_threshold,
- transport_options={'confirm_publish': True})
+ transport_options={
+ 'confirm_publish': True,
+ 'on_blocked': self._on_connection_blocked,
+ 'on_unblocked': self._on_connection_unblocked,
+ },
+ )
LOG.info(_LI('Connecting to AMQP server on %(hostname)s:%(port)s'),
self.connection.info())
@@ -581,6 +586,14 @@ class Connection(object):
return ssl_params or True
return False
+ @staticmethod
+ def _on_connection_blocked(reason):
+ LOG.error(_LE("The broker has blocked the connection: %s"), reason)
+
+ @staticmethod
+ def _on_connection_unblocked():
+ LOG.info(_LI("The broker has unblocked the connection"))
+
def ensure_connection(self):
self.ensure(method=lambda: True)
@@ -829,7 +842,7 @@ class Connection(object):
def _connect_error(exc):
log_info = {'topic': consumer.routing_key, 'err_str': exc}
LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
- "%(err_str)s"), log_info)
+ "%(err_str)s"), log_info)
def _declare_consumer():
consumer.declare(self)
diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py
index 660d861..9b22a1a 100644
--- a/oslo_messaging/tests/drivers/test_impl_rabbit.py
+++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py
@@ -169,7 +169,9 @@ class TestRabbitDriverLoadSSL(test_utils.BaseTestCase):
transport._driver._get_connection()
connection_klass.assert_called_once_with(
- 'memory:///', transport_options={'confirm_publish': True},
+ 'memory:///', transport_options={'confirm_publish': True,
+ 'on_blocked': mock.ANY,
+ 'on_unblocked': mock.ANY},
ssl=self.expected, login_method='AMQPLAIN',
heartbeat=0, failover_strategy="shuffle")