diff options
Diffstat (limited to 'oslo_messaging/_drivers/pika_driver/pika_poller.py')
-rw-r--r-- | oslo_messaging/_drivers/pika_driver/pika_poller.py | 79 |
1 files changed, 36 insertions, 43 deletions
diff --git a/oslo_messaging/_drivers/pika_driver/pika_poller.py b/oslo_messaging/_drivers/pika_driver/pika_poller.py index 5aa948a..3533dad 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_poller.py +++ b/oslo_messaging/_drivers/pika_driver/pika_poller.py @@ -31,8 +31,7 @@ class PikaPoller(object): connectivity related problem detected """ - def __init__(self, pika_engine, prefetch_count, - incoming_message_class=pika_drv_msg.PikaIncomingMessage): + def __init__(self, pika_engine, prefetch_count, incoming_message_class): """Initialize required fields :param pika_engine: PikaEngine, shared object with configuration and @@ -110,8 +109,7 @@ class PikaPoller(object): """ self._message_queue.append( self._incoming_message_class( - self._pika_engine, self._channel, method, properties, body, - True + self._pika_engine, None, method, properties, body ) ) @@ -121,8 +119,7 @@ class PikaPoller(object): """ self._message_queue.append( self._incoming_message_class( - self._pika_engine, self._channel, method, properties, body, - False + self._pika_engine, self._channel, method, properties, body ) ) @@ -146,6 +143,11 @@ class PikaPoller(object): LOG.exception("Unexpected error during closing connection") self._connection = None + for i in xrange(len(self._message_queue) - 1, -1, -1): + message = self._message_queue[i] + if message.need_ack(): + del self._message_queue[i] + def poll(self, timeout=None, prefetch_size=1): """Main method of this class - consumes message from RabbitMQ @@ -158,32 +160,29 @@ class PikaPoller(object): """ expiration_time = time.time() + timeout if timeout else None - while len(self._message_queue) < prefetch_size: + while True: with self._lock: - if not self._started: - return None - - try: - if self._channel is None: - self._reconnect() - # we need some time_limit here, not too small to avoid a - # lot of not needed iterations but not too large to release - # lock time to time and give a chance to perform another - # method waiting this lock - self._connection.process_data_events( - time_limit=0.25 - ) - except Exception as e: - LOG.warn("Exception during consuming message. " + str(e)) - self._cleanup() - if timeout is not None: - timeout = expiration_time - time.time() - if timeout <= 0: - break - - result = self._message_queue[:prefetch_size] - self._message_queue = self._message_queue[prefetch_size:] - return result + if timeout is not None: + timeout = expiration_time - time.time() + if (len(self._message_queue) < prefetch_size and + self._started and ((timeout is None) or timeout > 0)): + try: + if self._channel is None: + self._reconnect() + # we need some time_limit here, not too small to avoid + # a lot of not needed iterations but not too large to + # release lock time to time and give a chance to + # perform another method waiting this lock + self._connection.process_data_events( + time_limit=0.25 + ) + except pika_pool.Connection.connectivity_errors: + self._cleanup() + raise + else: + result = self._message_queue[:prefetch_size] + del self._message_queue[:prefetch_size] + return result def start(self): """Starts poller. Should be called before polling to allow message @@ -201,7 +200,6 @@ class PikaPoller(object): return self._started = False - self._cleanup() def reconnect(self): """Safe version of _reconnect. Performs reconnection to the broker.""" @@ -249,9 +247,7 @@ class RpcServicePikaPoller(PikaPoller): :return Dictionary, declared_queue_name -> no_ack_mode """ - queue_expiration = ( - self._pika_engine.conf.oslo_messaging_pika.rpc_queue_expiration - ) + queue_expiration = self._pika_engine.rpc_queue_expiration queues_to_consume = {} @@ -319,15 +315,11 @@ class RpcReplyPikaPoller(PikaPoller): :return Dictionary, declared_queue_name -> no_ack_mode """ - queue_expiration = ( - self._pika_engine.conf.oslo_messaging_pika.rpc_queue_expiration - ) - self._pika_engine.declare_queue_binding_by_channel( channel=self._channel, exchange=self._exchange, queue=self._queue, routing_key=self._queue, exchange_type='direct', - queue_expiration=queue_expiration, + queue_expiration=self._pika_engine.rpc_queue_expiration, durable=False ) @@ -363,8 +355,8 @@ class NotificationPikaPoller(PikaPoller): """ def __init__(self, pika_engine, targets_and_priorities, queue_name=None, prefetch_count=100): - """Adds exchange and queue parameter for declaring exchange and queue - used for RPC reply delivery + """Adds targets_and_priorities and queue_name parameter + for declaring exchanges and queues used for notification delivery :param pika_engine: PikaEngine, shared object with configuration and shared driver functionality @@ -379,7 +371,8 @@ class NotificationPikaPoller(PikaPoller): self._queue_name = queue_name super(NotificationPikaPoller, self).__init__( - pika_engine, prefetch_count=prefetch_count + pika_engine, prefetch_count=prefetch_count, + incoming_message_class=pika_drv_msg.PikaIncomingMessage ) def _declare_queue_binding(self): |