summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java25
1 files changed, 11 insertions, 14 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 26ac562fb2..5fde08cbdd 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -491,7 +491,7 @@ public class AMQChannel
if (!unacked.isQueueDeleted())
{
// Mark message redelivered
- unacked.getMessage().setRedelivered(true);
+ unacked.setRedelivered(true);
// Ensure message is released for redelivery
unacked.release();
@@ -522,7 +522,7 @@ public class AMQChannel
if (unacked != null)
{
// Mark message redelivered
- unacked.getMessage().setRedelivered(true);
+ unacked.setRedelivered(true);
// Ensure message is released for redelivery
if (!unacked.isQueueDeleted())
@@ -611,13 +611,10 @@ public class AMQChannel
for (Map.Entry<Long, QueueEntry> entry : msgToResend.entrySet())
{
- QueueEntry message = entry.getValue();
+ QueueEntry queueEntry = entry.getValue();
long deliveryTag = entry.getKey();
-
-
- AMQMessage msg = message.getMessage();
- AMQQueue queue = message.getQueue();
+ AMQQueue queue = queueEntry.getQueue();
// Our Java Client will always suspend the channel when resending!
// If the client has requested the messages be resent then it is
@@ -635,16 +632,16 @@ public class AMQChannel
// Without any details from the client about what has been processed we have to mark
// all messages in the unacked map as redelivered.
- msg.setRedelivered(true);
+ queueEntry.setRedelivered(true);
- Subscription sub = message.getDeliveredSubscription();
+ Subscription sub = queueEntry.getDeliveredSubscription();
if (sub != null)
{
- if(!queue.resend(message, sub))
+ if(!queue.resend(queueEntry, sub))
{
- msgToRequeue.put(deliveryTag, message);
+ msgToRequeue.put(deliveryTag, queueEntry);
}
}
else
@@ -652,11 +649,11 @@ public class AMQChannel
if (_log.isInfoEnabled())
{
- _log.info("DeliveredSubscription not recorded so just requeueing(" + message.toString()
+ _log.info("DeliveredSubscription not recorded so just requeueing(" + queueEntry.toString()
+ ")to prevent loss");
}
// move this message to requeue
- msgToRequeue.put(deliveryTag, message);
+ msgToRequeue.put(deliveryTag, queueEntry);
}
} // for all messages
// } else !isSuspend
@@ -888,7 +885,7 @@ public class AMQChannel
public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag)
throws AMQException
{
- getProtocolSession().getProtocolOutputConverter().writeDeliver(entry.getMessage(), getChannelId(), deliveryTag, sub.getConsumerTag());
+ getProtocolSession().getProtocolOutputConverter().writeDeliver(entry, getChannelId(), deliveryTag, sub.getConsumerTag());
}
};