summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java7
1 files changed, 5 insertions, 2 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
index 5204f13e81..0f3c690cc6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.ack;
+import org.apache.qpid.server.store.StoreContext;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -160,7 +161,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
}
}
- public void drainTo(Collection<UnacknowledgedMessage> destination, long deliveryTag) throws AMQException
+ public void drainTo(long deliveryTag, StoreContext storeContext) throws AMQException
{
synchronized (_lock)
{
@@ -176,10 +177,12 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
" When deliveryTag is:" + deliveryTag + "ES:" + _map.entrySet().toString());
}
+ //Message has been ack so discard it. This will dequeue and decrement the reference.
+ unacked.getValue().discard(storeContext);
+
it.remove();
_unackedSize -= unacked.getValue().getMessage().getSize();
- destination.add(unacked.getValue());
if (unacked.getKey() == deliveryTag)
{
break;