diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java | 25 |
1 files changed, 11 insertions, 14 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 26ac562fb2..5fde08cbdd 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -491,7 +491,7 @@ public class AMQChannel if (!unacked.isQueueDeleted()) { // Mark message redelivered - unacked.getMessage().setRedelivered(true); + unacked.setRedelivered(true); // Ensure message is released for redelivery unacked.release(); @@ -522,7 +522,7 @@ public class AMQChannel if (unacked != null) { // Mark message redelivered - unacked.getMessage().setRedelivered(true); + unacked.setRedelivered(true); // Ensure message is released for redelivery if (!unacked.isQueueDeleted()) @@ -611,13 +611,10 @@ public class AMQChannel for (Map.Entry<Long, QueueEntry> entry : msgToResend.entrySet()) { - QueueEntry message = entry.getValue(); + QueueEntry queueEntry = entry.getValue(); long deliveryTag = entry.getKey(); - - - AMQMessage msg = message.getMessage(); - AMQQueue queue = message.getQueue(); + AMQQueue queue = queueEntry.getQueue(); // Our Java Client will always suspend the channel when resending! // If the client has requested the messages be resent then it is @@ -635,16 +632,16 @@ public class AMQChannel // Without any details from the client about what has been processed we have to mark // all messages in the unacked map as redelivered. - msg.setRedelivered(true); + queueEntry.setRedelivered(true); - Subscription sub = message.getDeliveredSubscription(); + Subscription sub = queueEntry.getDeliveredSubscription(); if (sub != null) { - if(!queue.resend(message, sub)) + if(!queue.resend(queueEntry, sub)) { - msgToRequeue.put(deliveryTag, message); + msgToRequeue.put(deliveryTag, queueEntry); } } else @@ -652,11 +649,11 @@ public class AMQChannel if (_log.isInfoEnabled()) { - _log.info("DeliveredSubscription not recorded so just requeueing(" + message.toString() + _log.info("DeliveredSubscription not recorded so just requeueing(" + queueEntry.toString() + ")to prevent loss"); } // move this message to requeue - msgToRequeue.put(deliveryTag, message); + msgToRequeue.put(deliveryTag, queueEntry); } } // for all messages // } else !isSuspend @@ -888,7 +885,7 @@ public class AMQChannel public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag) throws AMQException { - getProtocolSession().getProtocolOutputConverter().writeDeliver(entry.getMessage(), getChannelId(), deliveryTag, sub.getConsumerTag()); + getProtocolSession().getProtocolOutputConverter().writeDeliver(entry, getChannelId(), deliveryTag, sub.getConsumerTag()); } }; |