diff options
author | Mehdi Abaakouk <sileht@redhat.com> | 2016-02-15 12:27:21 +0100 |
---|---|---|
committer | Mehdi Abaakouk <sileht@redhat.com> | 2016-02-19 08:28:56 +0100 |
commit | 7347e1445345b4e2a8bc9f93dd1ec53d7e32b516 (patch) | |
tree | 50a6511e64fb7a0d218fc138d3b4dda323732c30 | |
parent | 03888224a4e01b0c9af11a6db52417104b0f0991 (diff) | |
download | oslo-messaging-7347e1445345b4e2a8bc9f93dd1ec53d7e32b516.tar.gz |
Reduce number of rabbitmq consumer tag used
The rabbit driver creates a rabbit consumer tag per
AMQP consumer (in oslo.messsaging meaning) while we
should create only one consumer tag per queue name.
For RPC, this doesn't change anything because each AMQP
consumer (in oslo.messaging meaning) have its own queue.
But for notification API, this is a huge improvement.
For ceilometer this reduces the number of rabbit consumer to
declare from 15 to 2 on rabbit side.
Change-Id: I181df12439485950e944c280c8920476976447d7
-rw-r--r-- | oslo_messaging/_drivers/impl_rabbit.py | 34 |
1 files changed, 23 insertions, 11 deletions
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 455767f..4a1ce66 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -15,6 +15,7 @@ import collections import contextlib import functools +import itertools import os import random import socket @@ -487,8 +488,11 @@ class Connection(object): self._initial_pid = os.getpid() - self._consumers = [] - self._new_consumers = [] + self._consumers = {} + self._new_tags = set() + self._active_tags = {} + self._tags = itertools.count(1) + self._consume_loop_stopped = False self.channel = None self.purpose = purpose @@ -795,11 +799,14 @@ class Connection(object): with self._connection_lock: try: - for tag, consumer in enumerate(self._consumers): + for consumer, tag in self._consumers.items(): consumer.cancel(tag=tag) except recoverable_errors: self.ensure_connection() - self._consumers = [] + self._consumers.clear() + self._active_tags.clear() + self._new_tags.clear() + self._tags = itertools.count(1) def _heartbeat_supported_and_enabled(self): if self.heartbeat_timeout_threshold <= 0: @@ -909,8 +916,13 @@ class Connection(object): def _declare_consumer(): consumer.declare(self) - self._consumers.append(consumer) - self._new_consumers.append(consumer) + tag = self._active_tags.get(consumer.queue_name) + if tag is None: + tag = next(self._tags) + self._active_tags[consumer.queue_name] = tag + self._new_tags.add(tag) + + self._consumers[consumer] = tag return consumer with self._connection_lock: @@ -929,7 +941,7 @@ class Connection(object): def _recoverable_error_callback(exc): if not isinstance(exc, rpc_common.Timeout): - self._new_consumers = self._consumers + self._new_tags = set(self._consumers.values()) timer.check_return(_raise_timeout, exc) def _error_callback(exc): @@ -945,11 +957,11 @@ class Connection(object): if not self.connection.connected: raise self.connection.recoverable_connection_errors[0] - if self._new_consumers: - for tag, consumer in enumerate(self._consumers): - if consumer in self._new_consumers: + if self._new_tags: + for consumer, tag in self._consumers.items(): + if tag in self._new_tags: consumer.consume(tag=tag) - self._new_consumers = [] + self._new_tags.remove(tag) poll_timeout = (self._poll_timeout if timeout is None else min(timeout, self._poll_timeout)) |