diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java | 82 |
1 files changed, 42 insertions, 40 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java index b40e5a28c2..c82d1b984a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java @@ -20,9 +20,11 @@ */ package org.apache.qpid.server.queue; +import org.apache.qpid.server.message.ServerMessage; + import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import org.apache.qpid.server.message.ServerMessage; public class SimpleQueueEntryList implements QueueEntryList<SimpleQueueEntryImpl> { @@ -40,12 +42,11 @@ public class SimpleQueueEntryList implements QueueEntryList<SimpleQueueEntryImpl private final AMQQueue _queue; static final AtomicReferenceFieldUpdater<SimpleQueueEntryImpl, SimpleQueueEntryImpl> - _nextUpdater = - AtomicReferenceFieldUpdater.newUpdater - (SimpleQueueEntryImpl.class, SimpleQueueEntryImpl.class, "_next"); + _nextUpdater = SimpleQueueEntryImpl._nextUpdater; private AtomicLong _scavenges = new AtomicLong(0L); private final long _scavengeCount = Integer.getInteger("qpid.queue.scavenge_count", 50); + private final AtomicReference<SimpleQueueEntryImpl> _unscavengedHWM = new AtomicReference<SimpleQueueEntryImpl>(); public SimpleQueueEntryList(AMQQueue queue) @@ -55,28 +56,17 @@ public class SimpleQueueEntryList implements QueueEntryList<SimpleQueueEntryImpl _tail = _head; } - void advanceHead() - { - SimpleQueueEntryImpl next = _head.getNextNode(); - SimpleQueueEntryImpl newNext = _head.getNextValidEntry(); - - if (next == newNext) - { - if (_scavenges.incrementAndGet() > _scavengeCount) - { - _scavenges.set(0L); - scavenge(); - } - } - } - void scavenge() { + SimpleQueueEntryImpl hwm = _unscavengedHWM.getAndSet(null); SimpleQueueEntryImpl next = _head.getNextValidEntry(); - while (next != null) + if(hwm != null) { - next = next.getNextValidEntry(); + while (next != null && hwm.compareTo(next)>0) + { + next = next.getNextValidEntry(); + } } } @@ -126,7 +116,6 @@ public class SimpleQueueEntryList implements QueueEntryList<SimpleQueueEntryImpl public static class QueueEntryIteratorImpl implements QueueEntryIterator<SimpleQueueEntryImpl> { - private SimpleQueueEntryImpl _lastNode; QueueEntryIteratorImpl(SimpleQueueEntryImpl startNode) @@ -134,10 +123,9 @@ public class SimpleQueueEntryList implements QueueEntryList<SimpleQueueEntryImpl _lastNode = startNode; } - public boolean atTail() { - return _lastNode.getNextNode() == null; + return _lastNode.getNextValidEntry() == null; } public SimpleQueueEntryImpl getNode() @@ -147,28 +135,17 @@ public class SimpleQueueEntryList implements QueueEntryList<SimpleQueueEntryImpl public boolean advance() { + SimpleQueueEntryImpl nextValidNode = _lastNode.getNextValidEntry(); - if(!atTail()) + if(nextValidNode != null) { - SimpleQueueEntryImpl nextNode = _lastNode.getNextNode(); - while(nextNode.isDispensed() && nextNode.getNextNode() != null) - { - nextNode = nextNode.getNextNode(); - } - _lastNode = nextNode; - return true; - - } - else - { - return false; + _lastNode = nextValidNode; } + return nextValidNode != null; } - } - public QueueEntryIteratorImpl iterator() { return new QueueEntryIteratorImpl(_head); @@ -182,7 +159,32 @@ public class SimpleQueueEntryList implements QueueEntryList<SimpleQueueEntryImpl public void entryDeleted(SimpleQueueEntryImpl queueEntry) { - advanceHead(); + SimpleQueueEntryImpl next = _head.getNextNode(); + SimpleQueueEntryImpl newNext = _head.getNextValidEntry(); + + // the head of the queue has not been deleted, hence the deletion must have been mid queue. + if (next == newNext) + { + SimpleQueueEntryImpl unscavengedHWM = _unscavengedHWM.get(); + while(unscavengedHWM == null || unscavengedHWM.compareTo(queueEntry)<0) + { + _unscavengedHWM.compareAndSet(unscavengedHWM, queueEntry); + unscavengedHWM = _unscavengedHWM.get(); + } + if (_scavenges.incrementAndGet() > _scavengeCount) + { + _scavenges.set(0L); + scavenge(); + } + } + else + { + SimpleQueueEntryImpl unscavengedHWM = _unscavengedHWM.get(); + if(unscavengedHWM != null && (next == null || unscavengedHWM.compareTo(next) < 0)) + { + _unscavengedHWM.compareAndSet(unscavengedHWM, null); + } + } } public int getPriorities() |