summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDmitriy Ukhlov <dukhlov@mirantis.com>2015-12-14 18:49:50 +0200
committerDmitriy Ukhlov <dukhlov@mirantis.com>2015-12-14 18:49:50 +0200
commit3976a2ff81408b7f86e898eb0a87634a3f9ed2c0 (patch)
tree16996dc91811c437f7e7a633c370c9c7313db684
parentcc0f8cc8a9ff25c9fb081cac5366c12a0c06ec53 (diff)
downloadoslo-messaging-3976a2ff81408b7f86e898eb0a87634a3f9ed2c0.tar.gz
Fixes conflicts after merging master
Change-Id: I0d75c19e3002a3aad2dd35bbaea203fa9ba0c0ea
-rw-r--r--oslo_messaging/_drivers/pika_driver/pika_listener.py30
-rw-r--r--oslo_messaging/_drivers/pika_driver/pika_poller.py94
2 files changed, 52 insertions, 72 deletions
diff --git a/oslo_messaging/_drivers/pika_driver/pika_listener.py b/oslo_messaging/_drivers/pika_driver/pika_listener.py
index 8eff1fe..2c33168 100644
--- a/oslo_messaging/_drivers/pika_driver/pika_listener.py
+++ b/oslo_messaging/_drivers/pika_driver/pika_listener.py
@@ -97,17 +97,27 @@ class RpcReplyPikaListener(object):
"""
while self._reply_poller:
try:
- message = self._reply_poller.poll()
- if message is None:
+ try:
+ messages = self._reply_poller.poll()
+ except pika_drv_exc.EstablishConnectionException:
+ LOG.exception("Problem during establishing connection for "
+ "reply polling")
+ time.sleep(
+ self._pika_engine.host_connection_reconnect_delay
+ )
continue
- message.acknowledge()
- future = self._reply_waiting_futures.pop(message.msg_id, None)
- if future is not None:
- future.set_result(message)
- except pika_drv_exc.EstablishConnectionException:
- LOG.exception("Problem during establishing connection for "
- "reply polling")
- time.sleep(self._pika_engine.host_connection_reconnect_delay)
+
+ for message in messages:
+ try:
+ message.acknowledge()
+ future = self._reply_waiting_futures.pop(
+ message.msg_id, None
+ )
+ if future is not None:
+ future.set_result(message)
+ except Exception:
+ LOG.exception("Unexpected exception during processing"
+ "reply message")
except BaseException:
LOG.exception("Unexpected exception during reply polling")
diff --git a/oslo_messaging/_drivers/pika_driver/pika_poller.py b/oslo_messaging/_drivers/pika_driver/pika_poller.py
index 185c8d0..1390ced 100644
--- a/oslo_messaging/_drivers/pika_driver/pika_poller.py
+++ b/oslo_messaging/_drivers/pika_driver/pika_poller.py
@@ -12,7 +12,6 @@
# License for the specific language governing permissions and limitations
# under the License.
-import collections
import threading
import time
@@ -31,27 +30,30 @@ class PikaPoller(object):
connectivity related problem detected
"""
- def __init__(self, pika_engine, prefetch_count):
+ def __init__(self, pika_engine, prefetch_count,
+ incoming_message_class=pika_drv_msg.PikaIncomingMessage):
"""Initialize required fields
:param pika_engine: PikaEngine, shared object with configuration and
shared driver functionality
:param prefetch_count: Integer, maximum count of unacknowledged
messages which RabbitMQ broker sends to this consumer
+ :param incoming_message_class: PikaIncomingMessage, wrapper for
+ consumed RabbitMQ message
"""
self._pika_engine = pika_engine
+ self._prefetch_count = prefetch_count
+ self._incoming_message_class = incoming_message_class
self._connection = None
self._channel = None
self._lock = threading.Lock()
- self._prefetch_count = prefetch_count
-
self._started = False
self._queues_to_consume = None
- self._message_queue = collections.deque()
+ self._message_queue = []
def _reconnect(self):
"""Performs reconnection to the broker. It is unsafe method for
@@ -106,7 +108,10 @@ class PikaPoller(object):
no_ack=True mode
"""
self._message_queue.append(
- (self._channel, method, properties, body, True)
+ self._incoming_message_class(
+ self._pika_engine, self._channel, method, properties, body,
+ True
+ )
)
def _on_message_with_ack_callback(self, unused, method, properties, body):
@@ -114,7 +119,10 @@ class PikaPoller(object):
no_ack=False mode
"""
self._message_queue.append(
- (self._channel, method, properties, body, False)
+ self._incoming_message_class(
+ self._pika_engine, self._channel, method, properties, body,
+ False
+ )
)
def _cleanup(self):
@@ -137,17 +145,19 @@ class PikaPoller(object):
LOG.exception("Unexpected error during closing connection")
self._connection = None
- def poll(self, timeout=None):
+ def poll(self, timeout=None, prefetch_size=1):
"""Main method of this class - consumes message from RabbitMQ
:param: timeout: float, seconds, timeout for waiting new incoming
message, None means wait forever
- :return: tuple, RabbitMQ message related data
- (channel, method, properties, body, no_ack)
+ :param: prefetch_size: Integer, count of messages which we are want to
+ poll. It blocks until prefetch_size messages are consumed or until
+ timeout gets expired
+ :return: list of PikaIncomingMessage, RabbitMQ messages
"""
expiration_time = time.time() + timeout if timeout else None
- while not self._message_queue:
+ while len(self._message_queue) < prefetch_size:
with self._lock:
if not self._started:
return None
@@ -162,15 +172,17 @@ class PikaPoller(object):
self._connection.process_data_events(
time_limit=0.25
)
- except Exception:
+ except Exception as e:
+ LOG.warn("Exception during consuming message. " + str(e))
self._cleanup()
- raise
if timeout is not None:
timeout = expiration_time - time.time()
if timeout <= 0:
- return None
+ break
- return self._message_queue.popleft()
+ result = self._message_queue[:prefetch_size]
+ self._message_queue = self._message_queue[prefetch_size:]
+ return result
def start(self):
"""Starts poller. Should be called before polling to allow message
@@ -226,7 +238,9 @@ class RpcServicePikaPoller(PikaPoller):
self._target = target
super(RpcServicePikaPoller, self).__init__(
- pika_engine, prefetch_count=prefetch_count)
+ pika_engine, prefetch_count=prefetch_count,
+ incoming_message_class=pika_drv_msg.RpcPikaIncomingMessage
+ )
def _declare_queue_binding(self):
"""Overrides base method and perform declaration of RabbitMQ exchanges
@@ -274,21 +288,6 @@ class RpcServicePikaPoller(PikaPoller):
)
return queues_to_consume
- def poll(self, timeout=None):
- """Overrides base method and wrap RabbitMQ message into
- RpcPikaIncomingMessage
-
- :param: timeout: float, seconds, timeout for waiting new incoming
- message, None means wait forever
- :return: RpcPikaIncomingMessage, consumed RPC message
- """
- msg = super(RpcServicePikaPoller, self).poll(timeout)
- if msg is None:
- return None
- return pika_drv_msg.RpcPikaIncomingMessage(
- self._pika_engine, *msg
- )
-
class RpcReplyPikaPoller(PikaPoller):
"""PikaPoller implementation for polling RPC reply messages. Overrides
@@ -309,7 +308,8 @@ class RpcReplyPikaPoller(PikaPoller):
self._queue = queue
super(RpcReplyPikaPoller, self).__init__(
- pika_engine, prefetch_count
+ pika_engine=pika_engine, prefetch_count=prefetch_count,
+ incoming_message_class=pika_drv_msg.RpcReplyPikaIncomingMessage
)
def _declare_queue_binding(self):
@@ -355,21 +355,6 @@ class RpcReplyPikaPoller(PikaPoller):
retrier(self.reconnect)()
- def poll(self, timeout=None):
- """Overrides base method and wrap RabbitMQ message into
- RpcReplyPikaIncomingMessage
-
- :param: timeout: float, seconds, timeout for waiting new incoming
- message, None means wait forever
- :return: RpcReplyPikaIncomingMessage, consumed RPC reply message
- """
- msg = super(RpcReplyPikaPoller, self).poll(timeout)
- if msg is None:
- return None
- return pika_drv_msg.RpcReplyPikaIncomingMessage(
- self._pika_engine, *msg
- )
-
class NotificationPikaPoller(PikaPoller):
"""PikaPoller implementation for polling Notification messages. Overrides
@@ -421,18 +406,3 @@ class NotificationPikaPoller(PikaPoller):
queues_to_consume[queue] = False
return queues_to_consume
-
- def poll(self, timeout=None):
- """Overrides base method and wrap RabbitMQ message into
- PikaIncomingMessage
-
- :param: timeout: float, seconds, timeout for waiting new incoming
- message, None means wait forever
- :return: PikaIncomingMessage, consumed Notification message
- """
- msg = super(NotificationPikaPoller, self).poll(timeout)
- if msg is None:
- return None
- return pika_drv_msg.PikaIncomingMessage(
- self._pika_engine, *msg
- )