diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java | 198 |
1 files changed, 198 insertions, 0 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java new file mode 100644 index 0000000000..b97c2c55c5 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java @@ -0,0 +1,198 @@ +package org.apache.qpid.server.queue; + +import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.message.ServerMessage; + +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.concurrent.atomic.AtomicLong; + +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +public class SimpleQueueEntryList implements QueueEntryList +{ + + private final QueueEntryImpl _head; + + private volatile QueueEntryImpl _tail; + + static final AtomicReferenceFieldUpdater<SimpleQueueEntryList, QueueEntryImpl> + _tailUpdater = + AtomicReferenceFieldUpdater.newUpdater + (SimpleQueueEntryList.class, QueueEntryImpl.class, "_tail"); + + + private final AMQQueue _queue; + + static final AtomicReferenceFieldUpdater<QueueEntryImpl, QueueEntryImpl> + _nextUpdater = + AtomicReferenceFieldUpdater.newUpdater + (QueueEntryImpl.class, QueueEntryImpl.class, "_next"); + + private AtomicLong _scavenges = new AtomicLong(0L); + private final long _scavengeCount = Integer.getInteger("qpid.queue.scavenge_count", 50); + + + public SimpleQueueEntryList(AMQQueue queue) + { + _queue = queue; + _head = new QueueEntryImpl(this); + _tail = _head; + } + + void advanceHead() + { + QueueEntryImpl next = _head.nextNode(); + QueueEntryImpl newNext = _head.getNext(); + + if (next == newNext) + { + if (_scavenges.incrementAndGet() > _scavengeCount) + { + _scavenges.set(0L); + scavenge(); + } + } + } + + void scavenge() + { + QueueEntryImpl next = _head.getNext(); + + while (next != null) + { + next = next.getNext(); + } + } + + + public AMQQueue getQueue() + { + return _queue; + } + + + public QueueEntry add(ServerMessage message) + { + QueueEntryImpl node = createQueueEntry(message); + for (;;) + { + QueueEntryImpl tail = _tail; + QueueEntryImpl next = tail.nextNode(); + if (tail == _tail) + { + if (next == null) + { + node.setEntryId(tail.getEntryId()+1); + if (_nextUpdater.compareAndSet(tail, null, node)) + { + _tailUpdater.compareAndSet(this, tail, node); + + return node; + } + } + else + { + _tailUpdater.compareAndSet(this,tail, next); + } + } + } + } + + protected QueueEntryImpl createQueueEntry(ServerMessage message) + { + return new QueueEntryImpl(this, message); + } + + public QueueEntry next(QueueEntry node) + { + return ((QueueEntryImpl)node).getNext(); + } + + + public static class QueueEntryIteratorImpl implements QueueEntryIterator + { + + private QueueEntryImpl _lastNode; + + QueueEntryIteratorImpl(QueueEntryImpl startNode) + { + _lastNode = startNode; + } + + + public boolean atTail() + { + return _lastNode.nextNode() == null; + } + + public QueueEntry getNode() + { + + return _lastNode; + + } + + public boolean advance() + { + + if(!atTail()) + { + QueueEntryImpl nextNode = _lastNode.nextNode(); + while(nextNode.isDeleted() && nextNode.nextNode() != null) + { + nextNode = nextNode.nextNode(); + } + _lastNode = nextNode; + return true; + + } + else + { + return false; + } + + } + + } + + + public QueueEntryIterator iterator() + { + return new QueueEntryIteratorImpl(_head); + } + + + public QueueEntry getHead() + { + return _head; + } + + static class Factory implements QueueEntryListFactory + { + + public QueueEntryList createQueueEntryList(AMQQueue queue) + { + return new SimpleQueueEntryList(queue); + } + } + + +} |