summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2015-06-09 12:55:20 +0000
committerGerrit Code Review <review@openstack.org>2015-06-09 12:55:20 +0000
commit33152a1f98020400cfe5e9fb4a11cccc7a358235 (patch)
tree46c7f95916480b94b10312b311645186ec6eadfd
parent41d0d875a10b48eea89f46ae3c426a023dfae27c (diff)
parent2daf4dccc323c48ec2bc32ec59523fd3c0ec589f (diff)
downloadoslo-messaging-33152a1f98020400cfe5e9fb4a11cccc7a358235.tar.gz
Merge "rabbit: Set timeout on the underlying socket" into stable/kilo
-rw-r--r--oslo_messaging/_drivers/impl_rabbit.py329
1 files changed, 176 insertions, 153 deletions
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index bba9008..6cbf462 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -365,111 +365,122 @@ class FanoutConsumer(ConsumerBase):
class Publisher(object):
- """Base Publisher class."""
+ """Publisher that silently creates exchange but no queues."""
- def __init__(self, channel, exchange_name, routing_key, **kwargs):
+ passive = False
+
+ def __init__(self, conf, exchange_name, routing_key, type, durable,
+ auto_delete):
"""Init the Publisher class with the exchange_name, routing_key,
- and other options
+       type, durable, auto_delete
"""
+ self.queue_arguments = _get_queue_arguments(conf)
self.exchange_name = exchange_name
self.routing_key = routing_key
- self.kwargs = kwargs
- self.reconnect(channel)
-
- def reconnect(self, channel):
- """Re-establish the Producer after a rabbit reconnection."""
+ self.auto_delete = auto_delete
+ self.durable = durable
self.exchange = kombu.entity.Exchange(name=self.exchange_name,
- **self.kwargs)
- self.producer = kombu.messaging.Producer(exchange=self.exchange,
- channel=channel,
- routing_key=self.routing_key)
-
- def send(self, msg, timeout=None):
- """Send a message."""
+ type=type,
+ exclusive=False,
+ durable=durable,
+ auto_delete=auto_delete,
+ passive=self.passive)
+
+ def send(self, conn, msg, timeout=None):
+        """Send a message on a channel."""
+ producer = kombu.messaging.Producer(exchange=self.exchange,
+ channel=conn.channel,
+ routing_key=self.routing_key)
+
+ headers = {}
if timeout:
- #
- # AMQP TTL is in milliseconds when set in the header.
- #
- self.producer.publish(msg, headers={'ttl': (timeout * 1000)})
- else:
- self.producer.publish(msg)
-
-
-class DirectPublisher(Publisher):
- """Publisher class for 'direct'."""
- def __init__(self, conf, channel, topic, **kwargs):
- """Init a 'direct' publisher.
-
- Kombu options may be passed as keyword args to override defaults
- """
-
- options = {'durable': False,
- 'auto_delete': True,
- 'exclusive': False,
- 'passive': True}
- options.update(kwargs)
- super(DirectPublisher, self).__init__(channel, topic, topic,
- type='direct', **options)
-
-
-class TopicPublisher(Publisher):
- """Publisher class for 'topic'."""
- def __init__(self, conf, channel, exchange_name, topic, **kwargs):
- """Init a 'topic' publisher.
-
- Kombu options may be passed as keyword args to override defaults
- """
- options = {'durable': conf.amqp_durable_queues,
- 'auto_delete': conf.amqp_auto_delete,
- 'exclusive': False}
-
- options.update(kwargs)
- super(TopicPublisher, self).__init__(channel,
- exchange_name,
- topic,
- type='topic',
- **options)
+ # AMQP TTL is in milliseconds when set in the property.
+ # Details: http://www.rabbitmq.com/ttl.html#per-message-ttl
+ # NOTE(sileht): this amqp header doesn't exists ... LP#1444854
+ headers['ttl'] = timeout * 1000
+
+ # NOTE(sileht): no need to wait more, caller expects
+        # an answer before the timeout is reached
+ transport_timeout = timeout
+
+ heartbeat_timeout = conn.driver_conf.heartbeat_timeout_threshold
+ if (conn._heartbeat_supported_and_enabled() and (
+ transport_timeout is None or
+ transport_timeout > heartbeat_timeout)):
+ # NOTE(sileht): we are supposed to send heartbeat every
+            # heartbeat_timeout; no need to wait longer, otherwise the
+            # broker will disconnect us, so raise the timeout earlier ourselves
+ transport_timeout = heartbeat_timeout
+
+ with conn._transport_socket_timeout(transport_timeout):
+ producer.publish(msg, headers=headers)
+
+
+class DeclareQueuePublisher(Publisher):
+ """Publisher that declares a default queue
+
+ When the exchange is missing instead of silently creating an exchange
+    not bound to a queue, this publisher creates a default queue
+ named with the routing_key.
+
+ This is mainly used to not miss notifications in case of nobody consumes
+ them yet. If the future consumer binds the default queue it can retrieve
+ missing messages.
+ """
+ # FIXME(sileht): The side effect of this is that we declare again and
+ # again the same queue, and generate a lot of useless rabbit traffic.
+ # https://bugs.launchpad.net/oslo.messaging/+bug/1437902
+
+ def send(self, conn, msg, timeout=None):
+ queue = kombu.entity.Queue(
+ channel=conn.channel,
+ exchange=self.exchange,
+ durable=self.durable,
+ auto_delete=self.auto_delete,
+ name=self.routing_key,
+ routing_key=self.routing_key,
+ queue_arguments=self.queue_arguments)
+ queue.declare()
+ super(DeclareQueuePublisher, self).send(
+ conn, msg, timeout)
-class FanoutPublisher(Publisher):
- """Publisher class for 'fanout'."""
- def __init__(self, conf, channel, topic, **kwargs):
- """Init a 'fanout' publisher.
+class RetryOnMissingExchangePublisher(Publisher):
+    """Publisher that retries for 60 seconds if the exchange is missing."""
- Kombu options may be passed as keyword args to override defaults
- """
- options = {'durable': False,
- 'auto_delete': True,
- 'exclusive': False}
- options.update(kwargs)
- super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic,
- None, type='fanout', **options)
+ passive = True
+ def send(self, conn, msg, timeout=None):
+ # TODO(sileht):
+ # * use timeout parameter when available
+ # * use rpc_timeout if not instead of hardcoded 60
+ # * use @retrying
+ timer = rpc_common.DecayingTimer(duration=60)
+ timer.start()
-class NotifyPublisher(TopicPublisher):
- """Publisher class for 'notify'."""
-
- def __init__(self, conf, channel, exchange_name, topic, **kwargs):
- self.durable = kwargs.pop('durable', conf.amqp_durable_queues)
- self.auto_delete = kwargs.pop('auto_delete', conf.amqp_auto_delete)
- self.queue_arguments = _get_queue_arguments(conf)
- super(NotifyPublisher, self).__init__(conf, channel, exchange_name,
- topic, **kwargs)
-
- def reconnect(self, channel):
- super(NotifyPublisher, self).reconnect(channel)
-
- # NOTE(jerdfelt): Normally the consumer would create the queue, but
- # we do this to ensure that messages don't get dropped if the
- # consumer is started after we do
- queue = kombu.entity.Queue(channel=channel,
- exchange=self.exchange,
- durable=self.durable,
- auto_delete=self.auto_delete,
- name=self.routing_key,
- routing_key=self.routing_key,
- queue_arguments=self.queue_arguments)
- queue.declare()
+ while True:
+ try:
+ super(RetryOnMissingExchangePublisher, self).send(conn, msg,
+ timeout)
+ return
+ except conn.connection.channel_errors as exc:
+ # NOTE(noelbk/sileht):
+ # If rabbit dies, the consumer can be disconnected before the
+ # publisher sends, and if the consumer hasn't declared the
+            # queue, the publisher will send a message to an exchange
+            # that's not bound to a queue, and the message will be lost.
+ # So we set passive=True to the publisher exchange and catch
+ # the 404 kombu ChannelError and retry until the exchange
+ # appears
+ if exc.code == 404 and timer.check_return() > 0:
+ LOG.info(_LI("The exchange %(exchange)s to send to "
+ "%(routing_key)s doesn't exist yet, "
+ "retrying...") % {
+ 'exchange': self.exchange,
+ 'routing_key': self.routing_key})
+ time.sleep(1)
+ continue
+ raise
class DummyConnectionLock(object):
@@ -692,10 +703,14 @@ class Connection(object):
LOG.info(_LI('Connected to AMQP server on %(hostname)s:%(port)d'),
self.connection.info())
- # NOTE(sileht):
- # value choosen according the best practice from kombu:
+    # NOTE(sileht): value chosen according to the best practice from kombu
# http://kombu.readthedocs.org/en/latest/reference/kombu.common.html#kombu.common.eventloop
- self._poll_timeout = 1
+    # For heartbeat, we can set a bigger timeout, and check we receive the
+    # heartbeat packets regularly
+ if self._heartbeat_supported_and_enabled():
+ self._poll_timeout = self._heartbeat_wait_timeout
+ else:
+ self._poll_timeout = 1
if self._url.startswith('memory://'):
# Kludge to speed up tests.
@@ -917,6 +932,28 @@ class Connection(object):
self._heartbeat_support_log_emitted = True
return False
+ @contextlib.contextmanager
+ def _transport_socket_timeout(self, timeout):
+        # NOTE(sileht): there are some cases where the heartbeat check
+        # or producer.send returns only when the system socket
+        # timeout is reached. kombu doesn't allow us to customise this
+        # timeout, so for py-amqp we tweak it ourselves
+ sock = getattr(self.connection.transport, 'sock', None)
+ if sock:
+ orig_timeout = sock.gettimeout()
+ sock.settimeout(timeout)
+ yield
+ if sock:
+ sock.settimeout(orig_timeout)
+
+ def _heartbeat_check(self):
+        # NOTE(sileht): we are supposed to send at least one heartbeat
+        # every heartbeat_timeout_threshold, so no need to wait longer
+ with self._transport_socket_timeout(
+ self.driver_conf.heartbeat_timeout_threshold):
+ self.connection.heartbeat_check(
+ rate=self.driver_conf.heartbeat_rate)
+
def _heartbeat_start(self):
if self._heartbeat_supported_and_enabled():
self._heartbeat_exit_event = threading.Event()
@@ -945,8 +982,7 @@ class Connection(object):
try:
try:
- self.connection.heartbeat_check(
- rate=self.driver_conf.heartbeat_rate)
+ self._heartbeat_check()
# NOTE(sileht): We need to drain event to receive
# heartbeat from the broker but don't hold the
# connection too much times. In amqpdriver a connection
@@ -1029,8 +1065,8 @@ class Connection(object):
raise StopIteration
if self._heartbeat_supported_and_enabled():
- self.connection.heartbeat_check(
- rate=self.driver_conf.heartbeat_rate)
+ self._heartbeat_check()
+
try:
return self.connection.drain_events(timeout=poll_timeout)
except socket.timeout as exc:
@@ -1045,32 +1081,20 @@ class Connection(object):
recoverable_error_callback=_recoverable_error_callback,
error_callback=_error_callback)
- @staticmethod
- def _log_publisher_send_error(topic, exc):
- log_info = {'topic': topic, 'err_str': exc}
- LOG.error(_("Failed to publish message to topic "
- "'%(topic)s': %(err_str)s"), log_info)
- LOG.debug('Exception', exc_info=exc)
-
- default_marker = object()
-
- def publisher_send(self, cls, topic, msg, timeout=None, retry=None,
- error_callback=default_marker, **kwargs):
+ def publisher_send(self, publisher, msg, timeout=None, retry=None):
"""Send to a publisher based on the publisher class."""
- def _default_error_callback(exc):
- self._log_publisher_send_error(topic, exc)
-
- if error_callback is self.default_marker:
- error_callback = _default_error_callback
+ def _error_callback(exc):
+ log_info = {'topic': publisher.exchange_name, 'err_str': exc}
+ LOG.error(_("Failed to publish message to topic "
+ "'%(topic)s': %(err_str)s"), log_info)
+ LOG.debug('Exception', exc_info=exc)
def _publish():
- publisher = cls(self.driver_conf, self.channel, topic=topic,
- **kwargs)
- publisher.send(msg, timeout)
+ publisher.send(self, msg, timeout)
with self._connection_lock:
- self.ensure(_publish, retry=retry, error_callback=error_callback)
+ self.ensure(_publish, retry=retry, error_callback=_error_callback)
def declare_direct_consumer(self, topic, callback):
"""Create a 'direct' queue.
@@ -1095,49 +1119,48 @@ class Connection(object):
def direct_send(self, msg_id, msg):
"""Send a 'direct' message."""
- timer = rpc_common.DecayingTimer(duration=60)
- timer.start()
- # NOTE(sileht): retry at least 60sec, after we have a good change
- # that the caller is really dead too...
+ p = RetryOnMissingExchangePublisher(self.driver_conf,
+ exchange_name=msg_id,
+ routing_key=msg_id,
+ type='direct',
+ durable=False,
+ auto_delete=True)
- while True:
- try:
- self.publisher_send(DirectPublisher, msg_id, msg,
- error_callback=None)
- return
- except self.connection.channel_errors as exc:
- # NOTE(noelbk/sileht):
- # If rabbit dies, the consumer can be disconnected before the
- # publisher sends, and if the consumer hasn't declared the
- # queue, the publisher's will send a message to an exchange
- # that's not bound to a queue, and the message wll be lost.
- # So we set passive=True to the publisher exchange and catch
- # the 404 kombu ChannelError and retry until the exchange
- # appears
- if exc.code == 404 and timer.check_return() > 0:
- LOG.info(_LI("The exchange to reply to %s doesn't "
- "exist yet, retrying...") % msg_id)
- time.sleep(1)
- continue
- self._log_publisher_send_error(msg_id, exc)
- raise
- except Exception as exc:
- self._log_publisher_send_error(msg_id, exc)
- raise
+ self.publisher_send(p, msg)
def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None):
"""Send a 'topic' message."""
- self.publisher_send(TopicPublisher, topic, msg, timeout,
- exchange_name=exchange_name, retry=retry)
+ p = Publisher(self.driver_conf,
+ exchange_name=exchange_name,
+ routing_key=topic,
+ type='topic',
+ durable=self.driver_conf.amqp_durable_queues,
+ auto_delete=self.driver_conf.amqp_auto_delete)
+ self.publisher_send(p, msg, timeout, retry=retry)
def fanout_send(self, topic, msg, retry=None):
"""Send a 'fanout' message."""
- self.publisher_send(FanoutPublisher, topic, msg, retry=retry)
+
+ p = Publisher(self.driver_conf,
+ exchange_name='%s_fanout' % topic,
+ routing_key=None,
+ type='fanout',
+ durable=False,
+ auto_delete=True)
+
+ self.publisher_send(p, msg, retry=retry)
def notify_send(self, exchange_name, topic, msg, retry=None, **kwargs):
"""Send a notify message on a topic."""
- self.publisher_send(NotifyPublisher, topic, msg, timeout=None,
- exchange_name=exchange_name, retry=retry, **kwargs)
+ p = DeclareQueuePublisher(
+ self.driver_conf,
+ exchange_name=exchange_name,
+ routing_key=topic,
+ type='topic',
+ durable=self.driver_conf.amqp_durable_queues,
+ auto_delete=self.driver_conf.amqp_auto_delete)
+
+ self.publisher_send(p, msg, timeout=None, retry=retry)
def consume(self, limit=None, timeout=None):
"""Consume from all queues/consumers."""