summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMehdi Abaakouk <sileht@redhat.com>2016-02-15 12:27:21 +0100
committerMehdi Abaakouk <sileht@redhat.com>2016-02-19 08:28:56 +0100
commit7347e1445345b4e2a8bc9f93dd1ec53d7e32b516 (patch)
tree50a6511e64fb7a0d218fc138d3b4dda323732c30
parent03888224a4e01b0c9af11a6db52417104b0f0991 (diff)
downloadoslo-messaging-7347e1445345b4e2a8bc9f93dd1ec53d7e32b516.tar.gz
Reduce number of rabbitmq consumer tag used
The rabbit driver creates a rabbit consumer tag per AMQP consumer (in oslo.messsaging meaning) while we should create only one consumer tag per queue name. For RPC, this doesn't change anything because each AMQP consumer (in oslo.messaging meaning) have its own queue. But for notification API, this is a huge improvement. For ceilometer this reduces the number of rabbit consumer to declare from 15 to 2 on rabbit side. Change-Id: I181df12439485950e944c280c8920476976447d7
-rw-r--r--oslo_messaging/_drivers/impl_rabbit.py34
1 files changed, 23 insertions, 11 deletions
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index 455767f..4a1ce66 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -15,6 +15,7 @@
import collections
import contextlib
import functools
+import itertools
import os
import random
import socket
@@ -487,8 +488,11 @@ class Connection(object):
self._initial_pid = os.getpid()
- self._consumers = []
- self._new_consumers = []
+ self._consumers = {}
+ self._new_tags = set()
+ self._active_tags = {}
+ self._tags = itertools.count(1)
+
self._consume_loop_stopped = False
self.channel = None
self.purpose = purpose
@@ -795,11 +799,14 @@ class Connection(object):
with self._connection_lock:
try:
- for tag, consumer in enumerate(self._consumers):
+ for consumer, tag in self._consumers.items():
consumer.cancel(tag=tag)
except recoverable_errors:
self.ensure_connection()
- self._consumers = []
+ self._consumers.clear()
+ self._active_tags.clear()
+ self._new_tags.clear()
+ self._tags = itertools.count(1)
def _heartbeat_supported_and_enabled(self):
if self.heartbeat_timeout_threshold <= 0:
@@ -909,8 +916,13 @@ class Connection(object):
def _declare_consumer():
consumer.declare(self)
- self._consumers.append(consumer)
- self._new_consumers.append(consumer)
+ tag = self._active_tags.get(consumer.queue_name)
+ if tag is None:
+ tag = next(self._tags)
+ self._active_tags[consumer.queue_name] = tag
+ self._new_tags.add(tag)
+
+ self._consumers[consumer] = tag
return consumer
with self._connection_lock:
@@ -929,7 +941,7 @@ class Connection(object):
def _recoverable_error_callback(exc):
if not isinstance(exc, rpc_common.Timeout):
- self._new_consumers = self._consumers
+ self._new_tags = set(self._consumers.values())
timer.check_return(_raise_timeout, exc)
def _error_callback(exc):
@@ -945,11 +957,11 @@ class Connection(object):
if not self.connection.connected:
raise self.connection.recoverable_connection_errors[0]
- if self._new_consumers:
- for tag, consumer in enumerate(self._consumers):
- if consumer in self._new_consumers:
+ if self._new_tags:
+ for consumer, tag in self._consumers.items():
+ if tag in self._new_tags:
consumer.consume(tag=tag)
- self._new_consumers = []
+ self._new_tags.remove(tag)
poll_timeout = (self._poll_timeout if timeout is None
else min(timeout, self._poll_timeout))