summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java69
1 files changed, 64 insertions, 5 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
index a46c5ae2e8..d27a5ed234 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
@@ -1,6 +1,10 @@
package org.apache.qpid.server.queue;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.ServerMessage;
+
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.concurrent.atomic.AtomicLong;
/*
*
@@ -24,6 +28,7 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
*/
public class SimpleQueueEntryList implements QueueEntryList
{
+
private final QueueEntryImpl _head;
private volatile QueueEntryImpl _tail;
@@ -40,9 +45,7 @@ public class SimpleQueueEntryList implements QueueEntryList
_nextUpdater =
AtomicReferenceFieldUpdater.newUpdater
(QueueEntryImpl.class, QueueEntryImpl.class, "_next");
-
-
-
+ private AtomicLong _deletes = new AtomicLong(0L);
public SimpleQueueEntryList(AMQQueue queue)
@@ -52,21 +55,77 @@ public class SimpleQueueEntryList implements QueueEntryList
_tail = _head;
}
+
+
void advanceHead()
{
+ _deletes.incrementAndGet();
QueueEntryImpl head = _head.nextNode();
+ boolean deleted = head.isDeleted();
while(head._next != null && head.isDeleted())
{
+ deleted = true;
final QueueEntryImpl newhead = head.nextNode();
if(newhead != null)
{
- _nextUpdater.compareAndSet(_head,head, newhead);
+ if(_nextUpdater.compareAndSet(_head,head, newhead))
+ {
+ _deletes.decrementAndGet();
+ }
}
head = _head.nextNode();
}
+
+ if(!deleted)
+ {
+ deleted = true;
+ }
+
+ if(_deletes.get() > 1000L)
+ {
+ _deletes.set(0L);
+ scavenge();
+ }
}
+ void scavenge()
+ {
+ QueueEntryImpl root = _head;
+ QueueEntryImpl next = root.nextNode();
+
+ do
+ {
+
+
+ while(next._next != null && next.isDeleted())
+ {
+
+ final QueueEntryImpl newhead = next.nextNode();
+ if(newhead != null)
+ {
+ _nextUpdater.compareAndSet(root,next, newhead);
+ }
+ next = root.nextNode();
+ }
+ if(next._next != null)
+ {
+ if(!next.isDeleted())
+ {
+ root = next;
+ next = root.nextNode();
+ }
+ }
+ else
+ {
+ break;
+ }
+
+ } while (next != null && next._next != null);
+
+ }
+
+
public AMQQueue getQueue()
{
@@ -74,7 +133,7 @@ public class SimpleQueueEntryList implements QueueEntryList
}
- public QueueEntry add(AMQMessage message)
+ public QueueEntry add(ServerMessage message)
{
QueueEntryImpl node = new QueueEntryImpl(this, message);
for (;;)