diff options
-rw-r--r-- | oslo_messaging/_drivers/amqp1_driver/controller.py | 157 | ||||
-rw-r--r-- | oslo_messaging/_drivers/amqp1_driver/opts.py | 9 | ||||
-rw-r--r-- | oslo_messaging/_drivers/impl_amqp1.py | 6 | ||||
-rw-r--r-- | oslo_messaging/tests/drivers/test_amqp_driver.py | 90 |
4 files changed, 192 insertions, 70 deletions
diff --git a/oslo_messaging/_drivers/amqp1_driver/controller.py b/oslo_messaging/_drivers/amqp1_driver/controller.py index 49aba92..56fd977 100644 --- a/oslo_messaging/_drivers/amqp1_driver/controller.py +++ b/oslo_messaging/_drivers/amqp1_driver/controller.py @@ -104,10 +104,10 @@ class SendTask(Task): self.target = target() if isinstance(target, Target) else target self.message = message self.deadline = deadline - self.retry = retry self.wait_for_ack = wait_for_ack self.service = SERVICE_NOTIFY if notification else SERVICE_RPC self.timer = None + self._retry = None if retry is None or retry < 0 else retry self._wakeup = threading.Event() self._error = None @@ -122,18 +122,15 @@ class SendTask(Task): """Called immediately before the message is handed off to the i/o system. This implies that the sender link is up. """ - if not self.wait_for_ack: - # sender is not concerned with waiting for acknowledgment - # "best effort at-most-once delivery" - self._cleanup() - self._wakeup.set() + pass def _on_ack(self, state, info): - """Called by eventloop thread when the ack/nack is received from the - peer. + """If wait_for_ack is True, this is called by the eventloop thread when + the ack/nack is received from the peer. If wait_for_ack is False this + is called by the eventloop right after the message is written to the + link. In the last case state will always be set to ACCEPTED. """ if state != pyngus.SenderLink.ACCEPTED: - # TODO(kgiusti): could retry if deadline not hit msg = ("{name} message send to {target} failed: remote" " disposition: {disp}, info:" "{info}".format(name=self.name, @@ -179,15 +176,23 @@ class SendTask(Task): self.timer.cancel() self.timer = None + @property + def _can_retry(self): + # has the retry count expired? + if self._retry is not None: + self._retry -= 1 + if self._retry < 0: + return False + return True + class RPCCallTask(SendTask): """Performs an RPC Call. Sends the request and waits for a response from the destination. """ - - def __init__(self, target, message, deadline, retry, wait_for_ack): + def __init__(self, target, message, deadline, retry): super(RPCCallTask, self).__init__("RPC Call", message, target, - deadline, retry, wait_for_ack) + deadline, retry, wait_for_ack=True) self._reply_link = None self._reply_msg = None self._msg_id = None @@ -198,32 +203,30 @@ class RPCCallTask(SendTask): def _prepare(self, sender): # reserve a message id for mapping the received response + if self._msg_id: + # already set so this is a re-transmit. To be safe cancel the old + # msg_id and allocate a fresh one. + self._reply_link.cancel_response(self._msg_id) self._reply_link = sender._reply_link rl = self._reply_link self._msg_id = rl.prepare_for_response(self.message, self._on_reply) def _on_reply(self, message): # called if/when the reply message arrives - if self._wakeup.is_set(): - LOG.debug("RPC Reply received after call completed") - return self._reply_msg = message - self._reply_link = None + self._msg_id = None # to prevent _cleanup() from cancelling it self._cleanup() self._wakeup.set() def _on_ack(self, state, info): - if self._wakeup.is_set(): - LOG.debug("RPC ACKed after call completed: %s %s", state, info) - return if state != pyngus.SenderLink.ACCEPTED: super(RPCCallTask, self)._on_ack(state, info) # must wait for reply if ACCEPTED def _cleanup(self): - if self._reply_link and self._msg_id: + if self._msg_id: self._reply_link.cancel_response(self._msg_id) - self._msg_id = None + self._reply_link = None super(RPCCallTask, self)._cleanup() @@ -260,18 +263,23 @@ class Sender(pyngus.SenderEventHandler): self._address = None self._link = None self._scheduler = scheduler - self._delay = delay # for re-connecting + self._delay = delay # for re-connecting/re-transmitting # holds all pending SendTasks self._pending_sends = collections.deque() # holds all messages sent but not yet acked self._unacked = set() self._reply_link = None self._connection = None + self._resend_timer = None @property def pending_messages(self): return len(self._pending_sends) + @property + def unacked_messages(self): + return len(self._unacked) + def attach(self, connection, reply_link, addresser): """Open the link. Called by the Controller when the AMQP connection becomes active. @@ -290,6 +298,9 @@ class Sender(pyngus.SenderEventHandler): LOG.debug("Sender %s detached", self._address) self._connection = None self._reply_link = None + if self._resend_timer: + self._resend_timer.cancel() + self._resend_timer = None if self._link: self._link.close() @@ -376,11 +387,9 @@ class Sender(pyngus.SenderEventHandler): # sends that have exhausted their retry count: expired = set() for send_task in self._pending_sends: - if send_task.retry is not None: - send_task.retry -= 1 - if send_task.retry <= 0: - expired.add(send_task) - send_task._on_error("Message send failed: %s" % reason) + if not send_task._can_retry: + expired.add(send_task) + send_task._on_error("Message send failed: %s" % reason) while expired: self._pending_sends.remove(expired.pop()) @@ -401,26 +410,75 @@ class Sender(pyngus.SenderEventHandler): def _can_send(self): return self._link and self._link.active + # acknowledge status + _TIMED_OUT = pyngus.SenderLink.TIMED_OUT + _ACCEPTED = pyngus.SenderLink.ACCEPTED + _RELEASED = pyngus.SenderLink.RELEASED + _MODIFIED = pyngus.SenderLink.MODIFIED + def _send(self, send_task): send_task._prepare(self) send_task.message.address = self._address + if send_task.wait_for_ack: + self._unacked.add(send_task) + + def pyngus_callback(link, handle, state, info): + # invoked when the message bus (n)acks this message + if state == Sender._TIMED_OUT: + # ignore pyngus timeout - we maintain our own timer + # which will properly deal with this case + return + self._unacked.discard(send_task) + if state == Sender._ACCEPTED: + send_task._on_ack(Sender._ACCEPTED, info) + elif (state == Sender._RELEASED + or (state == Sender._MODIFIED and + # assuming delivery-failed means in-doubt: + not info.get("delivery-failed") and + not info.get("undeliverable-here"))): + # These states indicate that the message was never + # forwarded beyond the next hop so they can be + # re-transmitted without risk of duplication + self._resend(send_task) + else: + # some error - let task figure it out... + send_task._on_ack(state, info) + + self._link.send(send_task.message, + delivery_callback=pyngus_callback, + handle=self, + deadline=send_task.deadline) + else: # do not wait for ack + self._link.send(send_task.message, + delivery_callback=None, + handle=self, + deadline=send_task.deadline) + send_task._on_ack(pyngus.SenderLink.ACCEPTED, {}) + + def _resend(self, send_task): + # the message bus returned the message without forwarding it. Wait a + # bit for other outstanding sends to finish - most likely ending up + # here since they are all going to the same destination - then resend + # this message + if send_task._can_retry: + # note well: once there is something on the pending list no further + # messages will be sent (they will all queue up behind this one). + self._pending_sends.append(send_task) + if self._resend_timer is None: + sched = self._scheduler + # this will get the pending sends going again + self._resend_timer = sched.defer(self._resend_pending, + self._delay) + else: + send_task._on_error("Send retries exhausted") - def pyngus_callback(link, handle, state, info): - # invoked when the message bus (n)acks this message - if state == pyngus.SenderLink.TIMED_OUT: - # ignore pyngus timeout - we maintain our own timer - return - self._unacked.discard(send_task) - send_task._on_ack(state, info) - - self._unacked.add(send_task) - self._link.send(send_task.message, - delivery_callback=pyngus_callback, - handle=self, - deadline=send_task.deadline) + def _resend_pending(self): + # run from the _resend_timer, attempt to resend pending messages + self._resend_timer = None + self._send_pending() def _send_pending(self): - # send all pending messages + # flush all pending messages out if self._can_send: while self._pending_sends: self._send(self._pending_sends.popleft()) @@ -472,7 +530,7 @@ class Replies(pyngus.ReceiverEventHandler): self._receiver.close() def destroy(self): - self._correlation = None + self._correlation.clear() if self._receiver: self._receiver.destroy() self._receiver = None @@ -494,11 +552,10 @@ class Replies(pyngus.ReceiverEventHandler): """Abort waiting for the response message corresponding to msg_id. This can be used if the request fails and no reply is expected. """ - if self._correlation: - try: - del self._correlation[msg_id] - except KeyError: - pass + try: + del self._correlation[msg_id] + except KeyError: + pass @property def active(self): @@ -864,8 +921,6 @@ class Controller(pyngus.ConnectionEventHandler): if send_task.deadline and send_task.deadline <= now(): send_task._on_timeout() return - if send_task.retry is None or send_task.retry < 0: - send_task.retry = None key = keyify(send_task.target, send_task.service) sender = self._all_senders.get(key) if not sender: @@ -1142,7 +1197,7 @@ class Controller(pyngus.ConnectionEventHandler): self._active_senders.clear() unused = [] for key, sender in iteritems(self._all_senders): - # clean up any unused sender links + # clean up any sender links that no longer have messages to send if sender.pending_messages == 0: unused.append(key) else: @@ -1183,7 +1238,7 @@ class Controller(pyngus.ConnectionEventHandler): purge = set(self._all_senders.keys()) - self._active_senders for key in purge: sender = self._all_senders[key] - if sender.pending_messages == 0: + if not sender.pending_messages and not sender.unacked_messages: sender.detach() self._purged_senders.append(self._all_senders.pop(key)) self._active_senders.clear() diff --git a/oslo_messaging/_drivers/amqp1_driver/opts.py b/oslo_messaging/_drivers/amqp1_driver/opts.py index 9278ebd..127dd0d 100644 --- a/oslo_messaging/_drivers/amqp1_driver/opts.py +++ b/oslo_messaging/_drivers/amqp1_driver/opts.py @@ -109,11 +109,16 @@ amqp1_opts = [ help='Time to pause between re-connecting an AMQP 1.0 link that' ' failed due to a recoverable error.'), + cfg.IntOpt('default_reply_retry', + default=0, + min=-1, + help='The maximum number of attempts to re-send a reply message' + ' which failed due to a recoverable error.'), + cfg.IntOpt('default_reply_timeout', default=30, min=5, - help='The deadline for an rpc reply message delivery.' - ' Only used when caller does not provide a timeout expiry.'), + help='The deadline for an rpc reply message delivery.'), cfg.IntOpt('default_send_timeout', default=30, diff --git a/oslo_messaging/_drivers/impl_amqp1.py b/oslo_messaging/_drivers/impl_amqp1.py index e48c478..44b99ff 100644 --- a/oslo_messaging/_drivers/impl_amqp1.py +++ b/oslo_messaging/_drivers/impl_amqp1.py @@ -109,7 +109,7 @@ class ProtonIncomingMessage(base.RpcIncomingMessage): task = controller.SendTask("RPC Reply", response, self._reply_to, # analogous to kombu missing dest t/o: deadline, - retry=0, + retry=driver._default_reply_retry, wait_for_ack=ack) driver._ctrl.add_task(task) rc = task.wait() @@ -216,6 +216,7 @@ class ProtonDriver(base.BaseDriver): self._default_reply_timeout = opt_name.default_reply_timeout self._default_send_timeout = opt_name.default_send_timeout self._default_notify_timeout = opt_name.default_notify_timeout + self._default_reply_retry = opt_name.default_reply_retry # which message types should be sent pre-settled? ps = [s.lower() for s in opt_name.pre_settled] @@ -301,8 +302,7 @@ class ProtonDriver(base.BaseDriver): expire = compute_timeout(self._default_send_timeout) if wait_for_reply: ack = not self._pre_settle_call - task = controller.RPCCallTask(target, request, expire, retry, - wait_for_ack=ack) + task = controller.RPCCallTask(target, request, expire, retry) else: ack = not self._pre_settle_cast task = controller.SendTask("RPC Cast", request, target, expire, diff --git a/oslo_messaging/tests/drivers/test_amqp_driver.py b/oslo_messaging/tests/drivers/test_amqp_driver.py index 12db37a..30b796b 100644 --- a/oslo_messaging/tests/drivers/test_amqp_driver.py +++ b/oslo_messaging/tests/drivers/test_amqp_driver.py @@ -288,7 +288,7 @@ class TestAmqpSend(_AmqpBrokerTestCaseAuto): driver.cleanup() def test_send_timeout(self): - """Verify send timeout.""" + """Verify send timeout - no reply sent.""" driver = amqp_driver.ProtonDriver(self.conf, self._broker_url) target = oslo_messaging.Target(topic="test-topic") listener = _ListenerThread( @@ -310,17 +310,19 @@ class TestAmqpSend(_AmqpBrokerTestCaseAuto): driver = amqp_driver.ProtonDriver(self.conf, self._broker_url) target = oslo_messaging.Target(topic="no listener") - # the broker will send a nack: + # the broker will send a nack (released) since there is no active + # listener for the target: self.assertRaises(oslo_messaging.MessageDeliveryFailure, driver.send, target, {"context": "whatever"}, {"method": "drop"}, wait_for_reply=True, + retry=0, timeout=1.0) driver.cleanup() def test_send_not_acked(self): - """Verify exception thrown if send Nacked.""" + """Verify exception thrown ack dropped.""" self.config(pre_settled=[], group="oslo_messaging_amqp") driver = amqp_driver.ProtonDriver(self.conf, self._broker_url) @@ -333,7 +335,8 @@ class TestAmqpSend(_AmqpBrokerTestCaseAuto): driver.send, target, {"context": "whatever"}, {"method": "drop"}, - wait_for_reply=False) + retry=0, + wait_for_reply=True) driver.cleanup() def test_no_ack_cast(self): @@ -393,7 +396,7 @@ class TestAmqpSend(_AmqpBrokerTestCaseAuto): driver.cleanup() def test_call_failed_reply(self): - """Send back an exception""" + """Send back an exception generated at the listener""" class _FailedResponder(_ListenerThread): def __init__(self, listener): super(_FailedResponder, self).__init__(listener, 1) @@ -434,7 +437,7 @@ class TestAmqpSend(_AmqpBrokerTestCaseAuto): self.started.set() while not self._done: for in_msg in self.listener.poll(timeout=0.5): - # reply will never be acked: + # reply will never be acked (simulate drop): in_msg._reply_to = "!no-ack!" in_msg.reply(reply={'correlation-id': in_msg.message.get("id")}) @@ -458,6 +461,7 @@ class TestAmqpSend(_AmqpBrokerTestCaseAuto): def test_listener_requeue(self): "Emulate Server requeue on listener incoming messages" + self.config(pre_settled=[], group="oslo_messaging_amqp") driver = amqp_driver.ProtonDriver(self.conf, self._broker_url) driver.require_features(requeue=True) target = oslo_messaging.Target(topic="test-topic") @@ -472,10 +476,6 @@ class TestAmqpSend(_AmqpBrokerTestCaseAuto): listener.join(timeout=30) self.assertFalse(listener.isAlive()) - for x in listener.get_messages(): - x.requeue() - self.assertEqual(x.message, {"msg": "value"}) - predicate = lambda: (self._broker.sender_link_requeue_count == 1) _wait_until(predicate, 30) self.assertTrue(predicate()) @@ -575,7 +575,7 @@ class TestAmqpNotification(_AmqpBrokerTestCaseAuto): try: driver.send_notification(oslo_messaging.Target(topic=t), "context", {'target': t}, - version) + version, retry=0) except oslo_messaging.MessageDeliveryFailure: excepted_targets.append(t) @@ -592,15 +592,18 @@ class TestAmqpNotification(_AmqpBrokerTestCaseAuto): driver.cleanup() def test_released_notification(self): + """Broker sends a Nack (released)""" driver = amqp_driver.ProtonDriver(self.conf, self._broker_url) self.assertRaises(oslo_messaging.MessageDeliveryFailure, driver.send_notification, oslo_messaging.Target(topic="bad address"), "context", {'target': "bad address"}, - 2.0) + 2.0, + retry=0) driver.cleanup() def test_notification_not_acked(self): + """Simulate drop of ack from broker""" driver = amqp_driver.ProtonDriver(self.conf, self._broker_url) # set this directly so we can use a value < minimum allowed driver._default_notify_timeout = 2 @@ -608,7 +611,7 @@ class TestAmqpNotification(_AmqpBrokerTestCaseAuto): driver.send_notification, oslo_messaging.Target(topic="!no-ack!"), "context", {'target': "!no-ack!"}, - 2.0) + 2.0, retry=0) driver.cleanup() def test_no_ack_notification(self): @@ -1388,6 +1391,64 @@ class TestAddressing(test_utils.BaseTestCase): LegacyAddresser) +@testtools.skipUnless(pyngus, "proton modules not present") +class TestMessageRetransmit(_AmqpBrokerTestCase): + # test message is retransmitted if safe to do so + def _test_retransmit(self, nack_method): + self._nack_count = 2 + + def _on_message(message, handle, link): + if self._nack_count: + self._nack_count -= 1 + nack_method(link, handle) + else: + self._broker.forward_message(message, handle, link) + + self._broker.on_message = _on_message + self._broker.start() + self.config(link_retry_delay=1, pre_settled=[], + group="oslo_messaging_amqp") + driver = amqp_driver.ProtonDriver(self.conf, self._broker_url) + target = oslo_messaging.Target(topic="test-topic") + listener = _ListenerThread(driver.listen(target, + None, + None)._poll_style_listener, + 1) + rc = driver.send(target, {"context": "whatever"}, + {"method": "echo", "id": "blah"}, + wait_for_reply=True, + retry=2) # initial send + up to 2 resends + self.assertIsNotNone(rc) + self.assertEqual(0, self._nack_count) + self.assertEqual(rc.get('correlation-id'), 'blah') + listener.join(timeout=30) + self.assertFalse(listener.isAlive()) + driver.cleanup() + + def test_released(self): + # should retry and succeed + self._test_retransmit(lambda l, h: l.message_released(h)) + + def test_modified(self): + # should retry and succeed + self._test_retransmit(lambda l, h: l.message_modified(h, + False, + False, + {})) + + def test_modified_failed(self): + # since delivery_failed is set to True, should fail + self.assertRaises(oslo_messaging.MessageDeliveryFailure, + self._test_retransmit, + lambda l, h: l.message_modified(h, True, False, {})) + + def test_rejected(self): + # rejected - should fail + self.assertRaises(oslo_messaging.MessageDeliveryFailure, + self._test_retransmit, + lambda l, h: l.message_rejected(h, {})) + + class FakeBroker(threading.Thread): """A test AMQP message 'broker'.""" @@ -1609,7 +1670,7 @@ class FakeBroker(threading.Thread): def message_received(self, receiver_link, message, handle): """Forward this message out the proper sending link.""" - self.server.forward_message(message, handle, receiver_link) + self.server.on_message(message, handle, receiver_link) if self.link.capacity < 1: self.server.on_credit_exhausted(self.link) @@ -1674,6 +1735,7 @@ class FakeBroker(threading.Thread): self.on_sender_active = lambda link: None self.on_receiver_active = lambda link: link.add_capacity(10) self.on_credit_exhausted = lambda link: link.add_capacity(10) + self.on_message = lambda m, h, l: self.forward_message(m, h, l) def start(self): """Start the server.""" |