summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java59
1 files changed, 29 insertions, 30 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
index 20ee646a40..fcce6ff44e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
@@ -31,6 +31,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.txn.TransactionalContext;
public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
@@ -39,7 +40,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
private long _unackedSize;
- private Map<Long, UnacknowledgedMessage> _map;
+ private Map<Long, QueueEntry> _map;
private long _lastDeliveryTag;
@@ -48,16 +49,10 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
public UnacknowledgedMessageMapImpl(int prefetchLimit)
{
_prefetchLimit = prefetchLimit;
- _map = new LinkedHashMap<Long, UnacknowledgedMessage>(prefetchLimit);
+ _map = new LinkedHashMap<Long, QueueEntry>(prefetchLimit);
}
- /*public UnacknowledgedMessageMapImpl(Object lock, Map<Long, UnacknowledgedMessage> map)
- {
- _lock = lock;
- _map = map;
- } */
-
- public void collect(long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs)
+ public void collect(long deliveryTag, boolean multiple, Map<Long, QueueEntry> msgs)
{
if (multiple)
{
@@ -65,7 +60,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
}
else
{
- msgs.add(get(deliveryTag));
+ msgs.put(deliveryTag, get(deliveryTag));
}
}
@@ -78,26 +73,27 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
}
}
- public void remove(List<UnacknowledgedMessage> msgs)
+ public void remove(Map<Long,QueueEntry> msgs)
{
synchronized (_lock)
{
- for (UnacknowledgedMessage msg : msgs)
+ for (Long deliveryTag : msgs.keySet())
{
- remove(msg.deliveryTag);
+ remove(deliveryTag);
}
}
}
- public UnacknowledgedMessage remove(long deliveryTag)
+ public QueueEntry remove(long deliveryTag)
{
synchronized (_lock)
{
- UnacknowledgedMessage message = _map.remove(deliveryTag);
+ QueueEntry message = _map.remove(deliveryTag);
if(message != null)
{
_unackedSize -= message.getMessage().getSize();
+ message.restoreCredit();
}
return message;
@@ -108,10 +104,10 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
{
synchronized (_lock)
{
- Collection<UnacknowledgedMessage> currentEntries = _map.values();
- for (UnacknowledgedMessage msg : currentEntries)
+ Set<Map.Entry<Long, QueueEntry>> currentEntries = _map.entrySet();
+ for (Map.Entry<Long, QueueEntry> entry : currentEntries)
{
- visitor.callback(msg);
+ visitor.callback(entry.getKey().longValue(), entry.getValue());
}
visitor.visitComplete();
}
@@ -122,7 +118,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
return _lock;
}
- public void add(long deliveryTag, UnacknowledgedMessage message)
+ public void add(long deliveryTag, QueueEntry message)
{
synchronized (_lock)
{
@@ -132,12 +128,12 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
}
}
- public Collection<UnacknowledgedMessage> cancelAllMessages()
+ public Collection<QueueEntry> cancelAllMessages()
{
synchronized (_lock)
{
- Collection<UnacknowledgedMessage> currentEntries = _map.values();
- _map = new LinkedHashMap<Long, UnacknowledgedMessage>(_prefetchLimit);
+ Collection<QueueEntry> currentEntries = _map.values();
+ _map = new LinkedHashMap<Long, QueueEntry>(_prefetchLimit);
_unackedSize = 0l;
return currentEntries;
}
@@ -169,14 +165,14 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
}
}
- public void drainTo(Collection<UnacknowledgedMessage> destination, long deliveryTag) throws AMQException
+ public void drainTo(Collection<QueueEntry> destination, long deliveryTag) throws AMQException
{
synchronized (_lock)
{
- Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = _map.entrySet().iterator();
+ Iterator<Map.Entry<Long, QueueEntry>> it = _map.entrySet().iterator();
while (it.hasNext())
{
- Map.Entry<Long, UnacknowledgedMessage> unacked = it.next();
+ Map.Entry<Long, QueueEntry> unacked = it.next();
if (unacked.getKey() > deliveryTag)
{
@@ -184,10 +180,13 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
throw new AMQException("UnacknowledgedMessageMap is out of order:" + unacked.getKey() +
" When deliveryTag is:" + deliveryTag + "ES:" + _map.entrySet().toString());
}
-
it.remove();
+
_unackedSize -= unacked.getValue().getMessage().getSize();
+ unacked.getValue().restoreCredit();
+
+
destination.add(unacked.getValue());
if (unacked.getKey() == deliveryTag)
{
@@ -197,7 +196,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
}
}
- public UnacknowledgedMessage get(long key)
+ public QueueEntry get(long key)
{
synchronized (_lock)
{
@@ -213,13 +212,13 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
}
}
- private void collect(long key, List<UnacknowledgedMessage> msgs)
+ private void collect(long key, Map<Long, QueueEntry> msgs)
{
synchronized (_lock)
{
- for (Map.Entry<Long, UnacknowledgedMessage> entry : _map.entrySet())
+ for (Map.Entry<Long, QueueEntry> entry : _map.entrySet())
{
- msgs.add(entry.getValue());
+ msgs.put(entry.getKey(),entry.getValue());
if (entry.getKey() == key)
{
break;