diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java | 59 |
1 files changed, 29 insertions, 30 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java index 20ee646a40..fcce6ff44e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java @@ -31,6 +31,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.txn.TransactionalContext; public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap @@ -39,7 +40,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap private long _unackedSize; - private Map<Long, UnacknowledgedMessage> _map; + private Map<Long, QueueEntry> _map; private long _lastDeliveryTag; @@ -48,16 +49,10 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap public UnacknowledgedMessageMapImpl(int prefetchLimit) { _prefetchLimit = prefetchLimit; - _map = new LinkedHashMap<Long, UnacknowledgedMessage>(prefetchLimit); + _map = new LinkedHashMap<Long, QueueEntry>(prefetchLimit); } - /*public UnacknowledgedMessageMapImpl(Object lock, Map<Long, UnacknowledgedMessage> map) - { - _lock = lock; - _map = map; - } */ - - public void collect(long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs) + public void collect(long deliveryTag, boolean multiple, Map<Long, QueueEntry> msgs) { if (multiple) { @@ -65,7 +60,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap } else { - msgs.add(get(deliveryTag)); + msgs.put(deliveryTag, get(deliveryTag)); } } @@ -78,26 +73,27 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap } } - public void remove(List<UnacknowledgedMessage> msgs) + public void remove(Map<Long,QueueEntry> msgs) { synchronized (_lock) { - for (UnacknowledgedMessage msg : msgs) + for (Long deliveryTag : msgs.keySet()) { - remove(msg.deliveryTag); + remove(deliveryTag); } } } - public UnacknowledgedMessage remove(long deliveryTag) + public QueueEntry remove(long deliveryTag) { synchronized (_lock) { - UnacknowledgedMessage message = _map.remove(deliveryTag); + QueueEntry message = _map.remove(deliveryTag); if(message != null) { _unackedSize -= message.getMessage().getSize(); + message.restoreCredit(); } return message; @@ -108,10 +104,10 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap { synchronized (_lock) { - Collection<UnacknowledgedMessage> currentEntries = _map.values(); - for (UnacknowledgedMessage msg : currentEntries) + Set<Map.Entry<Long, QueueEntry>> currentEntries = _map.entrySet(); + for (Map.Entry<Long, QueueEntry> entry : currentEntries) { - visitor.callback(msg); + visitor.callback(entry.getKey().longValue(), entry.getValue()); } visitor.visitComplete(); } @@ -122,7 +118,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap return _lock; } - public void add(long deliveryTag, UnacknowledgedMessage message) + public void add(long deliveryTag, QueueEntry message) { synchronized (_lock) { @@ -132,12 +128,12 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap } } - public Collection<UnacknowledgedMessage> cancelAllMessages() + public Collection<QueueEntry> cancelAllMessages() { synchronized (_lock) { - Collection<UnacknowledgedMessage> currentEntries = _map.values(); - _map = new LinkedHashMap<Long, UnacknowledgedMessage>(_prefetchLimit); + Collection<QueueEntry> currentEntries = _map.values(); + _map = new LinkedHashMap<Long, QueueEntry>(_prefetchLimit); _unackedSize = 0l; return currentEntries; } @@ -169,14 +165,14 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap } } - public void drainTo(Collection<UnacknowledgedMessage> destination, long deliveryTag) throws AMQException + public void drainTo(Collection<QueueEntry> destination, long deliveryTag) throws AMQException { synchronized (_lock) { - Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = _map.entrySet().iterator(); + Iterator<Map.Entry<Long, QueueEntry>> it = _map.entrySet().iterator(); while (it.hasNext()) { - Map.Entry<Long, UnacknowledgedMessage> unacked = it.next(); + Map.Entry<Long, QueueEntry> unacked = it.next(); if (unacked.getKey() > deliveryTag) { @@ -184,10 +180,13 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap throw new AMQException("UnacknowledgedMessageMap is out of order:" + unacked.getKey() + " When deliveryTag is:" + deliveryTag + "ES:" + _map.entrySet().toString()); } - it.remove(); + _unackedSize -= unacked.getValue().getMessage().getSize(); + unacked.getValue().restoreCredit(); + + destination.add(unacked.getValue()); if (unacked.getKey() == deliveryTag) { @@ -197,7 +196,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap } } - public UnacknowledgedMessage get(long key) + public QueueEntry get(long key) { synchronized (_lock) { @@ -213,13 +212,13 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap } } - private void collect(long key, List<UnacknowledgedMessage> msgs) + private void collect(long key, Map<Long, QueueEntry> msgs) { synchronized (_lock) { - for (Map.Entry<Long, UnacknowledgedMessage> entry : _map.entrySet()) + for (Map.Entry<Long, QueueEntry> entry : _map.entrySet()) { - msgs.add(entry.getValue()); + msgs.put(entry.getKey(),entry.getValue()); if (entry.getKey() == key) { break; |