summaryrefslogtreecommitdiff
path: root/oslo_messaging/_drivers/pika_driver/pika_poller.py
diff options
context:
space:
mode:
Diffstat (limited to 'oslo_messaging/_drivers/pika_driver/pika_poller.py')
-rw-r--r--oslo_messaging/_drivers/pika_driver/pika_poller.py79
1 files changed, 36 insertions, 43 deletions
diff --git a/oslo_messaging/_drivers/pika_driver/pika_poller.py b/oslo_messaging/_drivers/pika_driver/pika_poller.py
index 5aa948a..3533dad 100644
--- a/oslo_messaging/_drivers/pika_driver/pika_poller.py
+++ b/oslo_messaging/_drivers/pika_driver/pika_poller.py
@@ -31,8 +31,7 @@ class PikaPoller(object):
connectivity related problem detected
"""
- def __init__(self, pika_engine, prefetch_count,
- incoming_message_class=pika_drv_msg.PikaIncomingMessage):
+ def __init__(self, pika_engine, prefetch_count, incoming_message_class):
"""Initialize required fields
:param pika_engine: PikaEngine, shared object with configuration and
@@ -110,8 +109,7 @@ class PikaPoller(object):
"""
self._message_queue.append(
self._incoming_message_class(
- self._pika_engine, self._channel, method, properties, body,
- True
+ self._pika_engine, None, method, properties, body
)
)
@@ -121,8 +119,7 @@ class PikaPoller(object):
"""
self._message_queue.append(
self._incoming_message_class(
- self._pika_engine, self._channel, method, properties, body,
- False
+ self._pika_engine, self._channel, method, properties, body
)
)
@@ -146,6 +143,11 @@ class PikaPoller(object):
LOG.exception("Unexpected error during closing connection")
self._connection = None
+ for i in xrange(len(self._message_queue) - 1, -1, -1):
+ message = self._message_queue[i]
+ if message.need_ack():
+ del self._message_queue[i]
+
def poll(self, timeout=None, prefetch_size=1):
"""Main method of this class - consumes message from RabbitMQ
@@ -158,32 +160,29 @@ class PikaPoller(object):
"""
expiration_time = time.time() + timeout if timeout else None
- while len(self._message_queue) < prefetch_size:
+ while True:
with self._lock:
- if not self._started:
- return None
-
- try:
- if self._channel is None:
- self._reconnect()
- # we need some time_limit here, not too small to avoid a
- # lot of not needed iterations but not too large to release
- # lock time to time and give a chance to perform another
- # method waiting this lock
- self._connection.process_data_events(
- time_limit=0.25
- )
- except Exception as e:
- LOG.warn("Exception during consuming message. " + str(e))
- self._cleanup()
- if timeout is not None:
- timeout = expiration_time - time.time()
- if timeout <= 0:
- break
-
- result = self._message_queue[:prefetch_size]
- self._message_queue = self._message_queue[prefetch_size:]
- return result
+ if timeout is not None:
+ timeout = expiration_time - time.time()
+ if (len(self._message_queue) < prefetch_size and
+ self._started and ((timeout is None) or timeout > 0)):
+ try:
+ if self._channel is None:
+ self._reconnect()
+ # we need some time_limit here, not too small to avoid
+ # a lot of not needed iterations but not too large to
+ # release lock time to time and give a chance to
+ # perform another method waiting this lock
+ self._connection.process_data_events(
+ time_limit=0.25
+ )
+ except pika_pool.Connection.connectivity_errors:
+ self._cleanup()
+ raise
+ else:
+ result = self._message_queue[:prefetch_size]
+ del self._message_queue[:prefetch_size]
+ return result
def start(self):
"""Starts poller. Should be called before polling to allow message
@@ -201,7 +200,6 @@ class PikaPoller(object):
return
self._started = False
- self._cleanup()
def reconnect(self):
"""Safe version of _reconnect. Performs reconnection to the broker."""
@@ -249,9 +247,7 @@ class RpcServicePikaPoller(PikaPoller):
:return Dictionary, declared_queue_name -> no_ack_mode
"""
- queue_expiration = (
- self._pika_engine.conf.oslo_messaging_pika.rpc_queue_expiration
- )
+ queue_expiration = self._pika_engine.rpc_queue_expiration
queues_to_consume = {}
@@ -319,15 +315,11 @@ class RpcReplyPikaPoller(PikaPoller):
:return Dictionary, declared_queue_name -> no_ack_mode
"""
- queue_expiration = (
- self._pika_engine.conf.oslo_messaging_pika.rpc_queue_expiration
- )
-
self._pika_engine.declare_queue_binding_by_channel(
channel=self._channel,
exchange=self._exchange, queue=self._queue,
routing_key=self._queue, exchange_type='direct',
- queue_expiration=queue_expiration,
+ queue_expiration=self._pika_engine.rpc_queue_expiration,
durable=False
)
@@ -363,8 +355,8 @@ class NotificationPikaPoller(PikaPoller):
"""
def __init__(self, pika_engine, targets_and_priorities,
queue_name=None, prefetch_count=100):
- """Adds exchange and queue parameter for declaring exchange and queue
- used for RPC reply delivery
+ """Adds targets_and_priorities and queue_name parameter
+ for declaring exchanges and queues used for notification delivery
:param pika_engine: PikaEngine, shared object with configuration and
shared driver functionality
@@ -379,7 +371,8 @@ class NotificationPikaPoller(PikaPoller):
self._queue_name = queue_name
super(NotificationPikaPoller, self).__init__(
- pika_engine, prefetch_count=prefetch_count
+ pika_engine, prefetch_count=prefetch_count,
+ incoming_message_class=pika_drv_msg.PikaIncomingMessage
)
def _declare_queue_binding(self):