summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2016-02-24 03:36:12 +0000
committerGerrit Code Review <review@openstack.org>2016-02-24 03:36:12 +0000
commitf848a5672725d604ae0bbfa084de92ad537ae921 (patch)
treee8aa25939fa826385ee273861c6be1c73eec1b6f
parent629632bfff840cb6f65d56f63b99ec6684900d13 (diff)
parent7347e1445345b4e2a8bc9f93dd1ec53d7e32b516 (diff)
downloadoslo-messaging-f848a5672725d604ae0bbfa084de92ad537ae921.tar.gz
Merge "Reduce number of rabbitmq consumer tag used"
-rw-r--r--oslo_messaging/_drivers/impl_rabbit.py34
1 files changed, 23 insertions, 11 deletions
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index 455767f..4a1ce66 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -15,6 +15,7 @@
import collections
import contextlib
import functools
+import itertools
import os
import random
import socket
@@ -487,8 +488,11 @@ class Connection(object):
self._initial_pid = os.getpid()
- self._consumers = []
- self._new_consumers = []
+ self._consumers = {}
+ self._new_tags = set()
+ self._active_tags = {}
+ self._tags = itertools.count(1)
+
self._consume_loop_stopped = False
self.channel = None
self.purpose = purpose
@@ -795,11 +799,14 @@ class Connection(object):
with self._connection_lock:
try:
- for tag, consumer in enumerate(self._consumers):
+ for consumer, tag in self._consumers.items():
consumer.cancel(tag=tag)
except recoverable_errors:
self.ensure_connection()
- self._consumers = []
+ self._consumers.clear()
+ self._active_tags.clear()
+ self._new_tags.clear()
+ self._tags = itertools.count(1)
def _heartbeat_supported_and_enabled(self):
if self.heartbeat_timeout_threshold <= 0:
@@ -909,8 +916,13 @@ class Connection(object):
def _declare_consumer():
consumer.declare(self)
- self._consumers.append(consumer)
- self._new_consumers.append(consumer)
+ tag = self._active_tags.get(consumer.queue_name)
+ if tag is None:
+ tag = next(self._tags)
+ self._active_tags[consumer.queue_name] = tag
+ self._new_tags.add(tag)
+
+ self._consumers[consumer] = tag
return consumer
with self._connection_lock:
@@ -929,7 +941,7 @@ class Connection(object):
def _recoverable_error_callback(exc):
if not isinstance(exc, rpc_common.Timeout):
- self._new_consumers = self._consumers
+ self._new_tags = set(self._consumers.values())
timer.check_return(_raise_timeout, exc)
def _error_callback(exc):
@@ -945,11 +957,11 @@ class Connection(object):
if not self.connection.connected:
raise self.connection.recoverable_connection_errors[0]
- if self._new_consumers:
- for tag, consumer in enumerate(self._consumers):
- if consumer in self._new_consumers:
+ if self._new_tags:
+ for consumer, tag in self._consumers.items():
+ if tag in self._new_tags:
consumer.consume(tag=tag)
- self._new_consumers = []
+ self._new_tags.remove(tag)
poll_timeout = (self._poll_timeout if timeout is None
else min(timeout, self._poll_timeout))