diff options
author | Jenkins <jenkins@review.openstack.org> | 2016-02-24 20:05:10 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2016-02-24 20:05:10 +0000 |
commit | a17e42d5cf11ac4e176925cc5fb30db5e2034d0c (patch) | |
tree | ecdec4f0337db28af5476fdf794a2888e7942909 | |
parent | 7a537b2c6025449ad94d70440c4707e8ffd80b7c (diff) | |
parent | 4e1b813fe391dcec6b7fb8819933181fea591a86 (diff) | |
download | oslo-messaging-a17e42d5cf11ac4e176925cc5fb30db5e2034d0c.tar.gz |
Merge "Improves poller's stop logic"
-rw-r--r-- | oslo_messaging/_drivers/pika_driver/pika_poller.py | 162 | ||||
-rw-r--r-- | oslo_messaging/tests/drivers/pika/test_poller.py | 14 |
2 files changed, 110 insertions, 66 deletions
diff --git a/oslo_messaging/_drivers/pika_driver/pika_poller.py b/oslo_messaging/_drivers/pika_driver/pika_poller.py index 9bace76..ce2b1c3 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_poller.py +++ b/oslo_messaging/_drivers/pika_driver/pika_poller.py @@ -13,9 +13,9 @@ # under the License. import threading -import time from oslo_log import log as logging +from oslo_utils import timeutils import pika_pool import six @@ -69,8 +69,7 @@ class PikaPoller(base.Listener): if self._queues_to_consume is None: self._queues_to_consume = self._declare_queue_binding() - for queue, no_ack in six.iteritems(self._queues_to_consume): - self._start_consuming(queue, no_ack) + self._start_consuming() def _declare_queue_binding(self): """Is called by recovering connection logic if target RabbitMQ @@ -83,27 +82,43 @@ class PikaPoller(base.Listener): "It is base class. Please declare exchanges and queues here" ) - def _start_consuming(self, queue, no_ack): + def _start_consuming(self): """Is called by recovering connection logic for starting consumption - of the RabbitMQ queue - - :param queue: String, RabbitMQ queue name for consuming - :param no_ack: Boolean, Choose consuming acknowledgement mode. If True, - acknowledges are not needed. RabbitMQ considers message consumed - after sending it to consumer immediately + of configured RabbitMQ queues """ - on_message_no_ack_callback = ( - self._on_message_no_ack_callback if no_ack - else self._on_message_with_ack_callback - ) + + assert self._queues_to_consume is not None try: - self._channel.basic_consume(on_message_no_ack_callback, queue, - no_ack=no_ack) + for queue_info in self._queues_to_consume: + no_ack = queue_info["no_ack"] + + on_message_no_ack_callback = ( + self._on_message_no_ack_callback if no_ack + else self._on_message_with_ack_callback + ) + + queue_info["consumer_tag"] = self._channel.basic_consume( + on_message_no_ack_callback, queue_info["queue_name"], + no_ack=no_ack + ) except Exception: self._queues_to_consume = None raise + def _stop_consuming(self): + """Is called by poller's stop logic for stopping consumption + of configured RabbitMQ queues + """ + + assert self._queues_to_consume is not None + + for queue_info in self._queues_to_consume: + consumer_tag = queue_info["consumer_tag"] + if consumer_tag is not None: + self._channel.basic_cancel(consumer_tag) + queue_info["consumer_tag"] = None + def _on_message_no_ack_callback(self, unused, method, properties, body): """Is called by Pika when message was received from queue listened with no_ack=True mode @@ -159,54 +174,54 @@ class PikaPoller(base.Listener): timeout gets expired :return: list of PikaIncomingMessage, RabbitMQ messages """ - expiration_time = time.time() + timeout if timeout else None - - while True: - with self._lock: - if timeout is not None: - timeout = expiration_time - time.time() - if (len(self._message_queue) < prefetch_size and - self._started and ((timeout is None) or timeout > 0)): + + with timeutils.StopWatch(timeout) as stop_watch: + while True: + with self._lock: + last_queue_size = len(self._message_queue) + + if (last_queue_size >= prefetch_size + or stop_watch.expired()): + result = self._message_queue[:prefetch_size] + del self._message_queue[:prefetch_size] + return result + try: - if self._channel is None: - self._reconnect() - # we need some time_limit here, not too small to avoid - # a lot of not needed iterations but not too large to - # release lock time to time and give a chance to - # perform another method waiting this lock - self._connection.process_data_events( - time_limit=0.25 - ) + if self._started: + if self._channel is None: + self._reconnect() + # we need some time_limit here, not too small to + # avoid a lot of not needed iterations but not too + # large to release lock time to time and give a + # chance to perform another method waiting this + # lock + self._connection.process_data_events( + time_limit=0.25 + ) + else: + # consumer is stopped so we don't expect new + # messages, just process already sent events + self._connection.process_data_events( + time_limit=0 + ) + # and return result if we don't see new messages + if last_queue_size == len(self._message_queue): + result = self._message_queue[:prefetch_size] + del self._message_queue[:prefetch_size] + return result except pika_pool.Connection.connectivity_errors: self._cleanup() raise - else: - result = self._message_queue[:prefetch_size] - del self._message_queue[:prefetch_size] - return result def start(self): """Starts poller. Should be called before polling to allow message consuming """ - self._started = True - - self.reconnect() - - def stop(self): - """Stops poller. Should be called when polling is not needed anymore to - stop new message consuming. After that it is necessary to poll already - prefetched messages - """ with self._lock: - if not self._started: + if self._started: return + self._started = True - self._started = False - - def reconnect(self): - """Safe version of _reconnect. Performs reconnection to the broker.""" - with self._lock: self._cleanup() try: self._reconnect() @@ -219,6 +234,29 @@ class PikaPoller(base.Listener): else: raise exc + def stop(self): + """Stops poller. Should be called when polling is not needed anymore to + stop new message consuming. After that it is necessary to poll already + prefetched messages + """ + with self._lock: + if not self._started: + return + + if self._queues_to_consume and self._channel: + try: + self._stop_consuming() + except Exception as exc: + self._cleanup() + if isinstance(exc, + pika_pool.Connection.connectivity_errors): + raise pika_drv_exc.ConnectionException( + "Connectivity problem detected during " + "consumer canceling. " + str(exc)) + else: + raise exc + self._started = False + def cleanup(self): """Safe version of _cleanup. Cleans up allocated resources (channel, connection, etc). @@ -257,7 +295,7 @@ class RpcServicePikaPoller(PikaPoller): """ queue_expiration = self._pika_engine.rpc_queue_expiration - queues_to_consume = {} + queues_to_consume = [] for no_ack in [True, False]: exchange = self._pika_engine.get_rpc_exchange_name( @@ -272,7 +310,9 @@ class RpcServicePikaPoller(PikaPoller): routing_key=queue, exchange_type='direct', durable=False, queue_expiration=queue_expiration ) - queues_to_consume[queue] = no_ack + queues_to_consume.append( + {"queue_name": queue, "no_ack": no_ack, "consumer_tag": None} + ) if self._target.server: server_queue = self._pika_engine.get_rpc_queue_name( @@ -283,7 +323,10 @@ class RpcServicePikaPoller(PikaPoller): queue=server_queue, routing_key=server_queue, exchange_type='direct', queue_expiration=queue_expiration ) - queues_to_consume[server_queue] = no_ack + queues_to_consume.append( + {"queue_name": server_queue, "no_ack": no_ack, + "consumer_tag": None} + ) fanout_exchange = self._pika_engine.get_rpc_exchange_name( self._target.exchange, self._target.topic, True, no_ack @@ -335,7 +378,8 @@ class RpcReplyPikaPoller(PikaPoller): durable=False ) - return {self._queue: False} + return [{"queue_name": self._queue, "no_ack": False, + "consumer_tag": None}] class NotificationPikaPoller(PikaPoller): @@ -370,7 +414,7 @@ class NotificationPikaPoller(PikaPoller): :return Dictionary, declared_queue_name -> no_ack_mode """ - queues_to_consume = {} + queues_to_consume = [] for target, priority in self._targets_and_priorities: routing_key = '%s.%s' % (target.topic, priority) queue = self._queue_name or routing_key @@ -386,6 +430,8 @@ class NotificationPikaPoller(PikaPoller): queue_expiration=None, durable=self._pika_engine.notification_persistence, ) - queues_to_consume[queue] = False + queues_to_consume.append( + {"queue_name": queue, "no_ack": False, "consumer_tag": None} + ) return queues_to_consume diff --git a/oslo_messaging/tests/drivers/pika/test_poller.py b/oslo_messaging/tests/drivers/pika/test_poller.py index 7bfd86f..abb0804 100644 --- a/oslo_messaging/tests/drivers/pika/test_poller.py +++ b/oslo_messaging/tests/drivers/pika/test_poller.py @@ -83,14 +83,12 @@ class PikaPollerTestCase(unittest.TestCase): for i in range(n): params.append((object(), object(), object(), object())) - index = [0] - def f(time_limit): - for i in range(10): - poller._on_message_no_ack_callback( - *params[index[0]] - ) - index[0] += 1 + if poller._started: + for k in range(n): + poller._on_message_no_ack_callback( + *params[k] + ) self._poller_connection_mock.process_data_events.side_effect = f @@ -111,7 +109,7 @@ class PikaPollerTestCase(unittest.TestCase): self.assertEqual(incoming_message_class_mock.call_count, n) self.assertEqual( - self._poller_connection_mock.process_data_events.call_count, 1) + self._poller_connection_mock.process_data_events.call_count, 2) for i in range(n - 1): self.assertEqual(res2[i], incoming_message_class_mock.return_value) |