diff options
author | Jenkins <jenkins@review.openstack.org> | 2016-02-24 03:36:12 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2016-02-24 03:36:12 +0000 |
commit | f848a5672725d604ae0bbfa084de92ad537ae921 (patch) | |
tree | e8aa25939fa826385ee273861c6be1c73eec1b6f | |
parent | 629632bfff840cb6f65d56f63b99ec6684900d13 (diff) | |
parent | 7347e1445345b4e2a8bc9f93dd1ec53d7e32b516 (diff) | |
download | oslo-messaging-f848a5672725d604ae0bbfa084de92ad537ae921.tar.gz |
Merge "Reduce number of rabbitmq consumer tag used"
-rw-r--r-- | oslo_messaging/_drivers/impl_rabbit.py | 34 |
1 files changed, 23 insertions, 11 deletions
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 455767f..4a1ce66 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -15,6 +15,7 @@ import collections import contextlib import functools +import itertools import os import random import socket @@ -487,8 +488,11 @@ class Connection(object): self._initial_pid = os.getpid() - self._consumers = [] - self._new_consumers = [] + self._consumers = {} + self._new_tags = set() + self._active_tags = {} + self._tags = itertools.count(1) + self._consume_loop_stopped = False self.channel = None self.purpose = purpose @@ -795,11 +799,14 @@ class Connection(object): with self._connection_lock: try: - for tag, consumer in enumerate(self._consumers): + for consumer, tag in self._consumers.items(): consumer.cancel(tag=tag) except recoverable_errors: self.ensure_connection() - self._consumers = [] + self._consumers.clear() + self._active_tags.clear() + self._new_tags.clear() + self._tags = itertools.count(1) def _heartbeat_supported_and_enabled(self): if self.heartbeat_timeout_threshold <= 0: @@ -909,8 +916,13 @@ class Connection(object): def _declare_consumer(): consumer.declare(self) - self._consumers.append(consumer) - self._new_consumers.append(consumer) + tag = self._active_tags.get(consumer.queue_name) + if tag is None: + tag = next(self._tags) + self._active_tags[consumer.queue_name] = tag + self._new_tags.add(tag) + + self._consumers[consumer] = tag return consumer with self._connection_lock: @@ -929,7 +941,7 @@ class Connection(object): def _recoverable_error_callback(exc): if not isinstance(exc, rpc_common.Timeout): - self._new_consumers = self._consumers + self._new_tags = set(self._consumers.values()) timer.check_return(_raise_timeout, exc) def _error_callback(exc): @@ -945,11 +957,11 @@ class Connection(object): if not self.connection.connected: raise self.connection.recoverable_connection_errors[0] - if self._new_consumers: - for tag, consumer in enumerate(self._consumers): - if consumer in self._new_consumers: + if self._new_tags: + for consumer, tag in self._consumers.items(): + if tag in self._new_tags: consumer.consume(tag=tag) - self._new_consumers = [] + self._new_tags.remove(tag) poll_timeout = (self._poll_timeout if timeout is None else min(timeout, self._poll_timeout)) |