summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java82
1 files changed, 42 insertions, 40 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
index b40e5a28c2..c82d1b984a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
@@ -20,9 +20,11 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.qpid.server.message.ServerMessage;
+
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import org.apache.qpid.server.message.ServerMessage;
public class SimpleQueueEntryList implements QueueEntryList<SimpleQueueEntryImpl>
{
@@ -40,12 +42,11 @@ public class SimpleQueueEntryList implements QueueEntryList<SimpleQueueEntryImpl
private final AMQQueue _queue;
static final AtomicReferenceFieldUpdater<SimpleQueueEntryImpl, SimpleQueueEntryImpl>
- _nextUpdater =
- AtomicReferenceFieldUpdater.newUpdater
- (SimpleQueueEntryImpl.class, SimpleQueueEntryImpl.class, "_next");
+ _nextUpdater = SimpleQueueEntryImpl._nextUpdater;
private AtomicLong _scavenges = new AtomicLong(0L);
private final long _scavengeCount = Integer.getInteger("qpid.queue.scavenge_count", 50);
+ private final AtomicReference<SimpleQueueEntryImpl> _unscavengedHWM = new AtomicReference<SimpleQueueEntryImpl>();
public SimpleQueueEntryList(AMQQueue queue)
@@ -55,28 +56,17 @@ public class SimpleQueueEntryList implements QueueEntryList<SimpleQueueEntryImpl
_tail = _head;
}
- void advanceHead()
- {
- SimpleQueueEntryImpl next = _head.getNextNode();
- SimpleQueueEntryImpl newNext = _head.getNextValidEntry();
-
- if (next == newNext)
- {
- if (_scavenges.incrementAndGet() > _scavengeCount)
- {
- _scavenges.set(0L);
- scavenge();
- }
- }
- }
-
void scavenge()
{
+ SimpleQueueEntryImpl hwm = _unscavengedHWM.getAndSet(null);
SimpleQueueEntryImpl next = _head.getNextValidEntry();
- while (next != null)
+ if(hwm != null)
{
- next = next.getNextValidEntry();
+ while (next != null && hwm.compareTo(next)>0)
+ {
+ next = next.getNextValidEntry();
+ }
}
}
@@ -126,7 +116,6 @@ public class SimpleQueueEntryList implements QueueEntryList<SimpleQueueEntryImpl
public static class QueueEntryIteratorImpl implements QueueEntryIterator<SimpleQueueEntryImpl>
{
-
private SimpleQueueEntryImpl _lastNode;
QueueEntryIteratorImpl(SimpleQueueEntryImpl startNode)
@@ -134,10 +123,9 @@ public class SimpleQueueEntryList implements QueueEntryList<SimpleQueueEntryImpl
_lastNode = startNode;
}
-
public boolean atTail()
{
- return _lastNode.getNextNode() == null;
+ return _lastNode.getNextValidEntry() == null;
}
public SimpleQueueEntryImpl getNode()
@@ -147,28 +135,17 @@ public class SimpleQueueEntryList implements QueueEntryList<SimpleQueueEntryImpl
public boolean advance()
{
+ SimpleQueueEntryImpl nextValidNode = _lastNode.getNextValidEntry();
- if(!atTail())
+ if(nextValidNode != null)
{
- SimpleQueueEntryImpl nextNode = _lastNode.getNextNode();
- while(nextNode.isDispensed() && nextNode.getNextNode() != null)
- {
- nextNode = nextNode.getNextNode();
- }
- _lastNode = nextNode;
- return true;
-
- }
- else
- {
- return false;
+ _lastNode = nextValidNode;
}
+ return nextValidNode != null;
}
-
}
-
public QueueEntryIteratorImpl iterator()
{
return new QueueEntryIteratorImpl(_head);
@@ -182,7 +159,32 @@ public class SimpleQueueEntryList implements QueueEntryList<SimpleQueueEntryImpl
public void entryDeleted(SimpleQueueEntryImpl queueEntry)
{
- advanceHead();
+ SimpleQueueEntryImpl next = _head.getNextNode();
+ SimpleQueueEntryImpl newNext = _head.getNextValidEntry();
+
+ // the head of the queue has not been deleted, hence the deletion must have been mid queue.
+ if (next == newNext)
+ {
+ SimpleQueueEntryImpl unscavengedHWM = _unscavengedHWM.get();
+ while(unscavengedHWM == null || unscavengedHWM.compareTo(queueEntry)<0)
+ {
+ _unscavengedHWM.compareAndSet(unscavengedHWM, queueEntry);
+ unscavengedHWM = _unscavengedHWM.get();
+ }
+ if (_scavenges.incrementAndGet() > _scavengeCount)
+ {
+ _scavenges.set(0L);
+ scavenge();
+ }
+ }
+ else
+ {
+ SimpleQueueEntryImpl unscavengedHWM = _unscavengedHWM.get();
+ if(unscavengedHWM != null && (next == null || unscavengedHWM.compareTo(next) < 0))
+ {
+ _unscavengedHWM.compareAndSet(unscavengedHWM, null);
+ }
+ }
}
public int getPriorities()