diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java | 69 |
1 files changed, 64 insertions, 5 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java index a46c5ae2e8..d27a5ed234 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java @@ -1,6 +1,10 @@ package org.apache.qpid.server.queue; +import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.message.ServerMessage; + import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.concurrent.atomic.AtomicLong; /* * @@ -24,6 +28,7 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; */ public class SimpleQueueEntryList implements QueueEntryList { + private final QueueEntryImpl _head; private volatile QueueEntryImpl _tail; @@ -40,9 +45,7 @@ public class SimpleQueueEntryList implements QueueEntryList _nextUpdater = AtomicReferenceFieldUpdater.newUpdater (QueueEntryImpl.class, QueueEntryImpl.class, "_next"); - - - + private AtomicLong _deletes = new AtomicLong(0L); public SimpleQueueEntryList(AMQQueue queue) @@ -52,21 +55,77 @@ public class SimpleQueueEntryList implements QueueEntryList _tail = _head; } + + void advanceHead() { + _deletes.incrementAndGet(); QueueEntryImpl head = _head.nextNode(); + boolean deleted = head.isDeleted(); while(head._next != null && head.isDeleted()) { + deleted = true; final QueueEntryImpl newhead = head.nextNode(); if(newhead != null) { - _nextUpdater.compareAndSet(_head,head, newhead); + if(_nextUpdater.compareAndSet(_head,head, newhead)) + { + _deletes.decrementAndGet(); + } } head = _head.nextNode(); } + + if(!deleted) + { + deleted = true; + } + + if(_deletes.get() > 1000L) + { + _deletes.set(0L); + scavenge(); + } } + void scavenge() + { + QueueEntryImpl root = _head; + QueueEntryImpl next = root.nextNode(); + + do + { + + + while(next._next != null && next.isDeleted()) + { + + final QueueEntryImpl newhead = next.nextNode(); + if(newhead != null) + { + _nextUpdater.compareAndSet(root,next, newhead); + } + next = root.nextNode(); + } + if(next._next != null) + { + if(!next.isDeleted()) + { + root = next; + next = root.nextNode(); + } + } + else + { + break; + } + + } while (next != null && next._next != null); + + } + + public AMQQueue getQueue() { @@ -74,7 +133,7 @@ public class SimpleQueueEntryList implements QueueEntryList } - public QueueEntry add(AMQMessage message) + public QueueEntry add(ServerMessage message) { QueueEntryImpl node = new QueueEntryImpl(this, message); for (;;) |