diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java | 47 |
1 files changed, 30 insertions, 17 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java index 62dd76f832..0ea88e4ab6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java @@ -59,7 +59,6 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR { _logger.debug("Rejecting:" + body.getDeliveryTag() + ": Requeue:" + body.getRequeue() + - //": Resend:" + evt.getMethod().resend + " on channel:" + channel.debugIdentity()); } @@ -70,26 +69,23 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR if (message == null) { _logger.warn("Dropping reject request as message is null for tag:" + deliveryTag); -// throw evt.getMethod().getChannelException(AMQConstant.NOT_FOUND, "Delivery Tag(" + deliveryTag + ")not known"); } else { if (message.isQueueDeleted()) { - _logger.warn("Message's Queue as already been purged, unable to Reject. " + - "Dropping message should use Dead Letter Queue"); + _logger.warn("Message's Queue has already been purged, dropping message"); message = channel.getUnacknowledgedMessageMap().remove(deliveryTag); if(message != null) { message.discard(); } - //sendtoDeadLetterQueue(msg) return; } if (message.getMessage() == null) { - _logger.warn("Message as already been purged, unable to Reject."); + _logger.warn("Message has already been purged, unable to Reject."); return; } @@ -98,27 +94,44 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR { _logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage() + ": Requeue:" + body.getRequeue() + - //": Resend:" + evt.getMethod().resend + " on channel:" + channel.debugIdentity()); } - // If we haven't requested message to be resent to this consumer then reject it from ever getting it. - //if (!evt.getMethod().resend) - { - message.reject(); - } + message.reject(); if (body.getRequeue()) { channel.requeue(deliveryTag); + + //this requeue represents a message rejected from the pre-dispatch queue + //therefore we need to amend the delivery counter. + message.decrementDeliveryCount(); } else { - _logger.warn("Dropping message as requeue not required and there is no dead letter queue"); - message = channel.getUnacknowledgedMessageMap().remove(deliveryTag); - //sendtoDeadLetterQueue(AMQMessage message) -// message.queue = channel.getDefaultDeadLetterQueue(); -// channel.requeue(deliveryTag); + final boolean maxDeliveryCountEnabled = channel.isMaxDeliveryCountEnabled(deliveryTag); + _logger.debug("maxDeliveryCountEnabled: " + maxDeliveryCountEnabled + " deliveryTag " + deliveryTag); + if (maxDeliveryCountEnabled) + { + final boolean deliveredTooManyTimes = channel.isDeliveredTooManyTimes(deliveryTag); + _logger.debug("deliveredTooManyTimes: " + deliveredTooManyTimes + " deliveryTag " + deliveryTag); + if (deliveredTooManyTimes) + { + channel.deadLetter(body.getDeliveryTag()); + } + else + { + //this requeue represents a message rejected because of a recover/rollback that we + //are not ready to DLQ. We rely on the reject command to resend from the unacked map + //and therefore need to increment the delivery counter so we cancel out the effect + //of the AMQChannel#resend() decrement. + message.incrementDeliveryCount(); + } + } + else + { + channel.deadLetter(body.getDeliveryTag()); + } } } } |