summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2016-02-24 20:05:10 +0000
committerGerrit Code Review <review@openstack.org>2016-02-24 20:05:10 +0000
commita17e42d5cf11ac4e176925cc5fb30db5e2034d0c (patch)
treeecdec4f0337db28af5476fdf794a2888e7942909
parent7a537b2c6025449ad94d70440c4707e8ffd80b7c (diff)
parent4e1b813fe391dcec6b7fb8819933181fea591a86 (diff)
downloadoslo-messaging-a17e42d5cf11ac4e176925cc5fb30db5e2034d0c.tar.gz
Merge "Improves poller's stop logic"
-rw-r--r--oslo_messaging/_drivers/pika_driver/pika_poller.py162
-rw-r--r--oslo_messaging/tests/drivers/pika/test_poller.py14
2 files changed, 110 insertions, 66 deletions
diff --git a/oslo_messaging/_drivers/pika_driver/pika_poller.py b/oslo_messaging/_drivers/pika_driver/pika_poller.py
index 9bace76..ce2b1c3 100644
--- a/oslo_messaging/_drivers/pika_driver/pika_poller.py
+++ b/oslo_messaging/_drivers/pika_driver/pika_poller.py
@@ -13,9 +13,9 @@
# under the License.
import threading
-import time
from oslo_log import log as logging
+from oslo_utils import timeutils
import pika_pool
import six
@@ -69,8 +69,7 @@ class PikaPoller(base.Listener):
if self._queues_to_consume is None:
self._queues_to_consume = self._declare_queue_binding()
- for queue, no_ack in six.iteritems(self._queues_to_consume):
- self._start_consuming(queue, no_ack)
+ self._start_consuming()
def _declare_queue_binding(self):
"""Is called by recovering connection logic if target RabbitMQ
@@ -83,27 +82,43 @@ class PikaPoller(base.Listener):
"It is base class. Please declare exchanges and queues here"
)
- def _start_consuming(self, queue, no_ack):
+ def _start_consuming(self):
"""Is called by recovering connection logic for starting consumption
- of the RabbitMQ queue
-
- :param queue: String, RabbitMQ queue name for consuming
- :param no_ack: Boolean, Choose consuming acknowledgement mode. If True,
- acknowledges are not needed. RabbitMQ considers message consumed
- after sending it to consumer immediately
+ of configured RabbitMQ queues
"""
- on_message_no_ack_callback = (
- self._on_message_no_ack_callback if no_ack
- else self._on_message_with_ack_callback
- )
+
+ assert self._queues_to_consume is not None
try:
- self._channel.basic_consume(on_message_no_ack_callback, queue,
- no_ack=no_ack)
+ for queue_info in self._queues_to_consume:
+ no_ack = queue_info["no_ack"]
+
+ on_message_no_ack_callback = (
+ self._on_message_no_ack_callback if no_ack
+ else self._on_message_with_ack_callback
+ )
+
+ queue_info["consumer_tag"] = self._channel.basic_consume(
+ on_message_no_ack_callback, queue_info["queue_name"],
+ no_ack=no_ack
+ )
except Exception:
self._queues_to_consume = None
raise
+ def _stop_consuming(self):
+ """Is called by poller's stop logic for stopping consumption
+ of configured RabbitMQ queues
+ """
+
+ assert self._queues_to_consume is not None
+
+ for queue_info in self._queues_to_consume:
+ consumer_tag = queue_info["consumer_tag"]
+ if consumer_tag is not None:
+ self._channel.basic_cancel(consumer_tag)
+ queue_info["consumer_tag"] = None
+
def _on_message_no_ack_callback(self, unused, method, properties, body):
"""Is called by Pika when message was received from queue listened with
no_ack=True mode
@@ -159,54 +174,54 @@ class PikaPoller(base.Listener):
timeout gets expired
:return: list of PikaIncomingMessage, RabbitMQ messages
"""
- expiration_time = time.time() + timeout if timeout else None
-
- while True:
- with self._lock:
- if timeout is not None:
- timeout = expiration_time - time.time()
- if (len(self._message_queue) < prefetch_size and
- self._started and ((timeout is None) or timeout > 0)):
+
+ with timeutils.StopWatch(timeout) as stop_watch:
+ while True:
+ with self._lock:
+ last_queue_size = len(self._message_queue)
+
+ if (last_queue_size >= prefetch_size
+ or stop_watch.expired()):
+ result = self._message_queue[:prefetch_size]
+ del self._message_queue[:prefetch_size]
+ return result
+
try:
- if self._channel is None:
- self._reconnect()
- # we need some time_limit here, not too small to avoid
- # a lot of not needed iterations but not too large to
- # release lock time to time and give a chance to
- # perform another method waiting this lock
- self._connection.process_data_events(
- time_limit=0.25
- )
+ if self._started:
+ if self._channel is None:
+ self._reconnect()
+ # we need some time_limit here, not too small to
+ # avoid a lot of not needed iterations but not too
+ # large to release lock time to time and give a
+ # chance to perform another method waiting this
+ # lock
+ self._connection.process_data_events(
+ time_limit=0.25
+ )
+ else:
+ # consumer is stopped so we don't expect new
+ # messages, just process already sent events
+ self._connection.process_data_events(
+ time_limit=0
+ )
+ # and return result if we don't see new messages
+ if last_queue_size == len(self._message_queue):
+ result = self._message_queue[:prefetch_size]
+ del self._message_queue[:prefetch_size]
+ return result
except pika_pool.Connection.connectivity_errors:
self._cleanup()
raise
- else:
- result = self._message_queue[:prefetch_size]
- del self._message_queue[:prefetch_size]
- return result
def start(self):
"""Starts poller. Should be called before polling to allow message
consuming
"""
- self._started = True
-
- self.reconnect()
-
- def stop(self):
- """Stops poller. Should be called when polling is not needed anymore to
- stop new message consuming. After that it is necessary to poll already
- prefetched messages
- """
with self._lock:
- if not self._started:
+ if self._started:
return
+ self._started = True
- self._started = False
-
- def reconnect(self):
- """Safe version of _reconnect. Performs reconnection to the broker."""
- with self._lock:
self._cleanup()
try:
self._reconnect()
@@ -219,6 +234,29 @@ class PikaPoller(base.Listener):
else:
raise exc
+ def stop(self):
+ """Stops poller. Should be called when polling is not needed anymore to
+ stop new message consuming. After that it is necessary to poll already
+ prefetched messages
+ """
+ with self._lock:
+ if not self._started:
+ return
+
+ if self._queues_to_consume and self._channel:
+ try:
+ self._stop_consuming()
+ except Exception as exc:
+ self._cleanup()
+ if isinstance(exc,
+ pika_pool.Connection.connectivity_errors):
+ raise pika_drv_exc.ConnectionException(
+ "Connectivity problem detected during "
+ "consumer canceling. " + str(exc))
+ else:
+ raise exc
+ self._started = False
+
def cleanup(self):
"""Safe version of _cleanup. Cleans up allocated resources (channel,
connection, etc).
@@ -257,7 +295,7 @@ class RpcServicePikaPoller(PikaPoller):
"""
queue_expiration = self._pika_engine.rpc_queue_expiration
- queues_to_consume = {}
+ queues_to_consume = []
for no_ack in [True, False]:
exchange = self._pika_engine.get_rpc_exchange_name(
@@ -272,7 +310,9 @@ class RpcServicePikaPoller(PikaPoller):
routing_key=queue, exchange_type='direct', durable=False,
queue_expiration=queue_expiration
)
- queues_to_consume[queue] = no_ack
+ queues_to_consume.append(
+ {"queue_name": queue, "no_ack": no_ack, "consumer_tag": None}
+ )
if self._target.server:
server_queue = self._pika_engine.get_rpc_queue_name(
@@ -283,7 +323,10 @@ class RpcServicePikaPoller(PikaPoller):
queue=server_queue, routing_key=server_queue,
exchange_type='direct', queue_expiration=queue_expiration
)
- queues_to_consume[server_queue] = no_ack
+ queues_to_consume.append(
+ {"queue_name": server_queue, "no_ack": no_ack,
+ "consumer_tag": None}
+ )
fanout_exchange = self._pika_engine.get_rpc_exchange_name(
self._target.exchange, self._target.topic, True, no_ack
@@ -335,7 +378,8 @@ class RpcReplyPikaPoller(PikaPoller):
durable=False
)
- return {self._queue: False}
+ return [{"queue_name": self._queue, "no_ack": False,
+ "consumer_tag": None}]
class NotificationPikaPoller(PikaPoller):
@@ -370,7 +414,7 @@ class NotificationPikaPoller(PikaPoller):
:return Dictionary, declared_queue_name -> no_ack_mode
"""
- queues_to_consume = {}
+ queues_to_consume = []
for target, priority in self._targets_and_priorities:
routing_key = '%s.%s' % (target.topic, priority)
queue = self._queue_name or routing_key
@@ -386,6 +430,8 @@ class NotificationPikaPoller(PikaPoller):
queue_expiration=None,
durable=self._pika_engine.notification_persistence,
)
- queues_to_consume[queue] = False
+ queues_to_consume.append(
+ {"queue_name": queue, "no_ack": False, "consumer_tag": None}
+ )
return queues_to_consume
diff --git a/oslo_messaging/tests/drivers/pika/test_poller.py b/oslo_messaging/tests/drivers/pika/test_poller.py
index 7bfd86f..abb0804 100644
--- a/oslo_messaging/tests/drivers/pika/test_poller.py
+++ b/oslo_messaging/tests/drivers/pika/test_poller.py
@@ -83,14 +83,12 @@ class PikaPollerTestCase(unittest.TestCase):
for i in range(n):
params.append((object(), object(), object(), object()))
- index = [0]
-
def f(time_limit):
- for i in range(10):
- poller._on_message_no_ack_callback(
- *params[index[0]]
- )
- index[0] += 1
+ if poller._started:
+ for k in range(n):
+ poller._on_message_no_ack_callback(
+ *params[k]
+ )
self._poller_connection_mock.process_data_events.side_effect = f
@@ -111,7 +109,7 @@ class PikaPollerTestCase(unittest.TestCase):
self.assertEqual(incoming_message_class_mock.call_count, n)
self.assertEqual(
- self._poller_connection_mock.process_data_events.call_count, 1)
+ self._poller_connection_mock.process_data_events.call_count, 2)
for i in range(n - 1):
self.assertEqual(res2[i], incoming_message_class_mock.return_value)