diff options
author | Jenkins <jenkins@review.openstack.org> | 2015-06-09 12:55:20 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2015-06-09 12:55:20 +0000 |
commit | 33152a1f98020400cfe5e9fb4a11cccc7a358235 (patch) | |
tree | 46c7f95916480b94b10312b311645186ec6eadfd | |
parent | 41d0d875a10b48eea89f46ae3c426a023dfae27c (diff) | |
parent | 2daf4dccc323c48ec2bc32ec59523fd3c0ec589f (diff) | |
download | oslo-messaging-33152a1f98020400cfe5e9fb4a11cccc7a358235.tar.gz |
Merge "rabbit: Set timeout on the underlying socket" into stable/kilo
-rw-r--r-- | oslo_messaging/_drivers/impl_rabbit.py | 329 |
1 files changed, 176 insertions, 153 deletions
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index bba9008..6cbf462 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -365,111 +365,122 @@ class FanoutConsumer(ConsumerBase): class Publisher(object): - """Base Publisher class.""" + """Publisher that silently creates exchange but no queues.""" - def __init__(self, channel, exchange_name, routing_key, **kwargs): + passive = False + + def __init__(self, conf, exchange_name, routing_key, type, durable, + auto_delete): """Init the Publisher class with the exchange_name, routing_key, - and other options + type, durable auto_delete """ + self.queue_arguments = _get_queue_arguments(conf) self.exchange_name = exchange_name self.routing_key = routing_key - self.kwargs = kwargs - self.reconnect(channel) - - def reconnect(self, channel): - """Re-establish the Producer after a rabbit reconnection.""" + self.auto_delete = auto_delete + self.durable = durable self.exchange = kombu.entity.Exchange(name=self.exchange_name, - **self.kwargs) - self.producer = kombu.messaging.Producer(exchange=self.exchange, - channel=channel, - routing_key=self.routing_key) - - def send(self, msg, timeout=None): - """Send a message.""" + type=type, + exclusive=False, + durable=durable, + auto_delete=auto_delete, + passive=self.passive) + + def send(self, conn, msg, timeout=None): + """Send a message on an channel.""" + producer = kombu.messaging.Producer(exchange=self.exchange, + channel=conn.channel, + routing_key=self.routing_key) + + headers = {} if timeout: - # - # AMQP TTL is in milliseconds when set in the header. - # - self.producer.publish(msg, headers={'ttl': (timeout * 1000)}) - else: - self.producer.publish(msg) - - -class DirectPublisher(Publisher): - """Publisher class for 'direct'.""" - def __init__(self, conf, channel, topic, **kwargs): - """Init a 'direct' publisher. - - Kombu options may be passed as keyword args to override defaults - """ - - options = {'durable': False, - 'auto_delete': True, - 'exclusive': False, - 'passive': True} - options.update(kwargs) - super(DirectPublisher, self).__init__(channel, topic, topic, - type='direct', **options) - - -class TopicPublisher(Publisher): - """Publisher class for 'topic'.""" - def __init__(self, conf, channel, exchange_name, topic, **kwargs): - """Init a 'topic' publisher. - - Kombu options may be passed as keyword args to override defaults - """ - options = {'durable': conf.amqp_durable_queues, - 'auto_delete': conf.amqp_auto_delete, - 'exclusive': False} - - options.update(kwargs) - super(TopicPublisher, self).__init__(channel, - exchange_name, - topic, - type='topic', - **options) + # AMQP TTL is in milliseconds when set in the property. + # Details: http://www.rabbitmq.com/ttl.html#per-message-ttl + # NOTE(sileht): this amqp header doesn't exists ... LP#1444854 + headers['ttl'] = timeout * 1000 + + # NOTE(sileht): no need to wait more, caller expects + # a answer before timeout is reached + transport_timeout = timeout + + heartbeat_timeout = conn.driver_conf.heartbeat_timeout_threshold + if (conn._heartbeat_supported_and_enabled() and ( + transport_timeout is None or + transport_timeout > heartbeat_timeout)): + # NOTE(sileht): we are supposed to send heartbeat every + # heartbeat_timeout, no need to wait more otherwise will + # disconnect us, so raise timeout earlier ourself + transport_timeout = heartbeat_timeout + + with conn._transport_socket_timeout(transport_timeout): + producer.publish(msg, headers=headers) + + +class DeclareQueuePublisher(Publisher): + """Publisher that declares a default queue + + When the exchange is missing instead of silently creating an exchange + not binded to a queue, this publisher creates a default queue + named with the routing_key. + + This is mainly used to not miss notifications in case of nobody consumes + them yet. If the future consumer binds the default queue it can retrieve + missing messages. + """ + # FIXME(sileht): The side effect of this is that we declare again and + # again the same queue, and generate a lot of useless rabbit traffic. + # https://bugs.launchpad.net/oslo.messaging/+bug/1437902 + + def send(self, conn, msg, timeout=None): + queue = kombu.entity.Queue( + channel=conn.channel, + exchange=self.exchange, + durable=self.durable, + auto_delete=self.auto_delete, + name=self.routing_key, + routing_key=self.routing_key, + queue_arguments=self.queue_arguments) + queue.declare() + super(DeclareQueuePublisher, self).send( + conn, msg, timeout) -class FanoutPublisher(Publisher): - """Publisher class for 'fanout'.""" - def __init__(self, conf, channel, topic, **kwargs): - """Init a 'fanout' publisher. +class RetryOnMissingExchangePublisher(Publisher): + """Publisher that retry during 60 seconds if the exchange is missing.""" - Kombu options may be passed as keyword args to override defaults - """ - options = {'durable': False, - 'auto_delete': True, - 'exclusive': False} - options.update(kwargs) - super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic, - None, type='fanout', **options) + passive = True + def send(self, conn, msg, timeout=None): + # TODO(sileht): + # * use timeout parameter when available + # * use rpc_timeout if not instead of hardcoded 60 + # * use @retrying + timer = rpc_common.DecayingTimer(duration=60) + timer.start() -class NotifyPublisher(TopicPublisher): - """Publisher class for 'notify'.""" - - def __init__(self, conf, channel, exchange_name, topic, **kwargs): - self.durable = kwargs.pop('durable', conf.amqp_durable_queues) - self.auto_delete = kwargs.pop('auto_delete', conf.amqp_auto_delete) - self.queue_arguments = _get_queue_arguments(conf) - super(NotifyPublisher, self).__init__(conf, channel, exchange_name, - topic, **kwargs) - - def reconnect(self, channel): - super(NotifyPublisher, self).reconnect(channel) - - # NOTE(jerdfelt): Normally the consumer would create the queue, but - # we do this to ensure that messages don't get dropped if the - # consumer is started after we do - queue = kombu.entity.Queue(channel=channel, - exchange=self.exchange, - durable=self.durable, - auto_delete=self.auto_delete, - name=self.routing_key, - routing_key=self.routing_key, - queue_arguments=self.queue_arguments) - queue.declare() + while True: + try: + super(RetryOnMissingExchangePublisher, self).send(conn, msg, + timeout) + return + except conn.connection.channel_errors as exc: + # NOTE(noelbk/sileht): + # If rabbit dies, the consumer can be disconnected before the + # publisher sends, and if the consumer hasn't declared the + # queue, the publisher's will send a message to an exchange + # that's not bound to a queue, and the message wll be lost. + # So we set passive=True to the publisher exchange and catch + # the 404 kombu ChannelError and retry until the exchange + # appears + if exc.code == 404 and timer.check_return() > 0: + LOG.info(_LI("The exchange %(exchange)s to send to " + "%(routing_key)s doesn't exist yet, " + "retrying...") % { + 'exchange': self.exchange, + 'routing_key': self.routing_key}) + time.sleep(1) + continue + raise class DummyConnectionLock(object): @@ -692,10 +703,14 @@ class Connection(object): LOG.info(_LI('Connected to AMQP server on %(hostname)s:%(port)d'), self.connection.info()) - # NOTE(sileht): - # value choosen according the best practice from kombu: + # NOTE(sileht): value choosen according the best practice from kombu # http://kombu.readthedocs.org/en/latest/reference/kombu.common.html#kombu.common.eventloop - self._poll_timeout = 1 + # For heatbeat, we can set a bigger timeout, and check we receive the + # heartbeat packets regulary + if self._heartbeat_supported_and_enabled(): + self._poll_timeout = self._heartbeat_wait_timeout + else: + self._poll_timeout = 1 if self._url.startswith('memory://'): # Kludge to speed up tests. @@ -917,6 +932,28 @@ class Connection(object): self._heartbeat_support_log_emitted = True return False + @contextlib.contextmanager + def _transport_socket_timeout(self, timeout): + # NOTE(sileht): they are some case where the heartbeat check + # or the producer.send return only when the system socket + # timeout if reach. kombu doesn't allow use to customise this + # timeout so for py-amqp we tweak ourself + sock = getattr(self.connection.transport, 'sock', None) + if sock: + orig_timeout = sock.gettimeout() + sock.settimeout(timeout) + yield + if sock: + sock.settimeout(orig_timeout) + + def _heartbeat_check(self): + # NOTE(sileht): we are suposed to send at least one heartbeat + # every heartbeat_timeout_threshold, so no need to way more + with self._transport_socket_timeout( + self.driver_conf.heartbeat_timeout_threshold): + self.connection.heartbeat_check( + rate=self.driver_conf.heartbeat_rate) + def _heartbeat_start(self): if self._heartbeat_supported_and_enabled(): self._heartbeat_exit_event = threading.Event() @@ -945,8 +982,7 @@ class Connection(object): try: try: - self.connection.heartbeat_check( - rate=self.driver_conf.heartbeat_rate) + self._heartbeat_check() # NOTE(sileht): We need to drain event to receive # heartbeat from the broker but don't hold the # connection too much times. In amqpdriver a connection @@ -1029,8 +1065,8 @@ class Connection(object): raise StopIteration if self._heartbeat_supported_and_enabled(): - self.connection.heartbeat_check( - rate=self.driver_conf.heartbeat_rate) + self._heartbeat_check() + try: return self.connection.drain_events(timeout=poll_timeout) except socket.timeout as exc: @@ -1045,32 +1081,20 @@ class Connection(object): recoverable_error_callback=_recoverable_error_callback, error_callback=_error_callback) - @staticmethod - def _log_publisher_send_error(topic, exc): - log_info = {'topic': topic, 'err_str': exc} - LOG.error(_("Failed to publish message to topic " - "'%(topic)s': %(err_str)s"), log_info) - LOG.debug('Exception', exc_info=exc) - - default_marker = object() - - def publisher_send(self, cls, topic, msg, timeout=None, retry=None, - error_callback=default_marker, **kwargs): + def publisher_send(self, publisher, msg, timeout=None, retry=None): """Send to a publisher based on the publisher class.""" - def _default_error_callback(exc): - self._log_publisher_send_error(topic, exc) - - if error_callback is self.default_marker: - error_callback = _default_error_callback + def _error_callback(exc): + log_info = {'topic': publisher.exchange_name, 'err_str': exc} + LOG.error(_("Failed to publish message to topic " + "'%(topic)s': %(err_str)s"), log_info) + LOG.debug('Exception', exc_info=exc) def _publish(): - publisher = cls(self.driver_conf, self.channel, topic=topic, - **kwargs) - publisher.send(msg, timeout) + publisher.send(self, msg, timeout) with self._connection_lock: - self.ensure(_publish, retry=retry, error_callback=error_callback) + self.ensure(_publish, retry=retry, error_callback=_error_callback) def declare_direct_consumer(self, topic, callback): """Create a 'direct' queue. @@ -1095,49 +1119,48 @@ class Connection(object): def direct_send(self, msg_id, msg): """Send a 'direct' message.""" - timer = rpc_common.DecayingTimer(duration=60) - timer.start() - # NOTE(sileht): retry at least 60sec, after we have a good change - # that the caller is really dead too... + p = RetryOnMissingExchangePublisher(self.driver_conf, + exchange_name=msg_id, + routing_key=msg_id, + type='direct', + durable=False, + auto_delete=True) - while True: - try: - self.publisher_send(DirectPublisher, msg_id, msg, - error_callback=None) - return - except self.connection.channel_errors as exc: - # NOTE(noelbk/sileht): - # If rabbit dies, the consumer can be disconnected before the - # publisher sends, and if the consumer hasn't declared the - # queue, the publisher's will send a message to an exchange - # that's not bound to a queue, and the message wll be lost. - # So we set passive=True to the publisher exchange and catch - # the 404 kombu ChannelError and retry until the exchange - # appears - if exc.code == 404 and timer.check_return() > 0: - LOG.info(_LI("The exchange to reply to %s doesn't " - "exist yet, retrying...") % msg_id) - time.sleep(1) - continue - self._log_publisher_send_error(msg_id, exc) - raise - except Exception as exc: - self._log_publisher_send_error(msg_id, exc) - raise + self.publisher_send(p, msg) def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None): """Send a 'topic' message.""" - self.publisher_send(TopicPublisher, topic, msg, timeout, - exchange_name=exchange_name, retry=retry) + p = Publisher(self.driver_conf, + exchange_name=exchange_name, + routing_key=topic, + type='topic', + durable=self.driver_conf.amqp_durable_queues, + auto_delete=self.driver_conf.amqp_auto_delete) + self.publisher_send(p, msg, timeout, retry=retry) def fanout_send(self, topic, msg, retry=None): """Send a 'fanout' message.""" - self.publisher_send(FanoutPublisher, topic, msg, retry=retry) + + p = Publisher(self.driver_conf, + exchange_name='%s_fanout' % topic, + routing_key=None, + type='fanout', + durable=False, + auto_delete=True) + + self.publisher_send(p, msg, retry=retry) def notify_send(self, exchange_name, topic, msg, retry=None, **kwargs): """Send a notify message on a topic.""" - self.publisher_send(NotifyPublisher, topic, msg, timeout=None, - exchange_name=exchange_name, retry=retry, **kwargs) + p = DeclareQueuePublisher( + self.driver_conf, + exchange_name=exchange_name, + routing_key=topic, + type='topic', + durable=self.driver_conf.amqp_durable_queues, + auto_delete=self.driver_conf.amqp_auto_delete) + + self.publisher_send(p, msg, timeout=None, retry=retry) def consume(self, limit=None, timeout=None): """Consume from all queues/consumers.""" |