diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionList.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionList.java | 213 |
1 files changed, 88 insertions, 125 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionList.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionList.java index 3e6299cb8a..9ea81660c6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionList.java +++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionList.java @@ -20,108 +20,121 @@ */ package org.apache.qpid.server.subscription; +import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.subscription.Subscription; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.nio.ByteBuffer; public class SubscriptionList { + private final SubscriptionNode _head = new SubscriptionNode(); - private final AtomicReference<SubscriptionNode> _tail = new AtomicReference<SubscriptionNode>(_head); - private final AtomicReference<SubscriptionNode> _subNodeMarker = new AtomicReference<SubscriptionNode>(_head); - private final AtomicInteger _size = new AtomicInteger(); + private AtomicReference<SubscriptionNode> _tail = new AtomicReference<SubscriptionNode>(_head); + private AtomicInteger _size = new AtomicInteger(); + - public static final class SubscriptionNode + public final class SubscriptionNode { private final AtomicBoolean _deleted = new AtomicBoolean(); private final AtomicReference<SubscriptionNode> _next = new AtomicReference<SubscriptionNode>(); private final Subscription _sub; + public SubscriptionNode() { - //used for sentinel head and dummy node construction + _sub = null; _deleted.set(true); } public SubscriptionNode(final Subscription sub) { - //used for regular node construction _sub = sub; } - /** - * Retrieves the first non-deleted node following the current node. - * Any deleted non-tail nodes encountered during the search are unlinked. - * - * @return the next non-deleted node, or null if none was found. - */ - public SubscriptionNode findNext() + + public SubscriptionNode getNext() { + SubscriptionNode next = nextNode(); while(next != null && next.isDeleted()) { + final SubscriptionNode newNext = next.nextNode(); if(newNext != null) { - //try to move our _next reference forward to the 'newNext' - //node to unlink the deleted node _next.compareAndSet(next, newNext); next = nextNode(); } else { - //'newNext' is null, meaning 'next' is the current tail. Can't unlink - //the tail node for thread safety reasons, just use the null. next = null; } - } + } return next; } - /** - * Gets the immediately next referenced node in the structure. - * - * @return the immediately next node in the structure, or null if at the tail. - */ - protected SubscriptionNode nextNode() + private SubscriptionNode nextNode() { return _next.get(); } - /** - * Used to initialise the 'next' reference. Will only succeed if the reference was not previously set. - * - * @param node the SubscriptionNode to set as 'next' - * @return whether the operation succeeded - */ - private boolean setNext(final SubscriptionNode node) - { - return _next.compareAndSet(null, node); - } - public boolean isDeleted() { return _deleted.get(); } + public boolean delete() { - return _deleted.compareAndSet(false,true); + if(_deleted.compareAndSet(false,true)) + { + _size.decrementAndGet(); + advanceHead(); + return true; + } + else + { + return false; + } } + public Subscription getSubscription() { return _sub; } } - private void insert(final SubscriptionNode node, final boolean count) + + public SubscriptionList(AMQQueue queue) + { + } + + private void advanceHead() + { + SubscriptionNode head = _head.nextNode(); + while(head._next.get() != null && head.isDeleted()) + { + + final SubscriptionNode newhead = head.nextNode(); + if(newhead != null) + { + _head._next.compareAndSet(head, newhead); + } + head = _head.nextNode(); + } + } + + + public SubscriptionNode add(Subscription sub) { + SubscriptionNode node = new SubscriptionNode(sub); for (;;) { SubscriptionNode tail = _tail.get(); @@ -130,14 +143,11 @@ public class SubscriptionList { if (next == null) { - if (tail.setNext(node)) + if (tail._next.compareAndSet(null, node)) { _tail.compareAndSet(tail, node); - if(count) - { - _size.incrementAndGet(); - } - return; + _size.incrementAndGet(); + return node; } } else @@ -146,101 +156,27 @@ public class SubscriptionList } } } - } - public void add(final Subscription sub) - { - SubscriptionNode node = new SubscriptionNode(sub); - insert(node, true); } - public boolean remove(final Subscription sub) + public boolean remove(Subscription sub) { - SubscriptionNode prevNode = _head; - SubscriptionNode node = _head.nextNode(); - + SubscriptionNode node = _head.getNext(); while(node != null) { - if(sub.equals(node.getSubscription()) && node.delete()) + if(sub.equals(node._sub) && node.delete()) { - _size.decrementAndGet(); - - SubscriptionNode tail = _tail.get(); - if(node == tail) - { - //we cant remove the last node from the structure for - //correctness reasons, however we have just 'deleted' - //the tail. Inserting an empty dummy node after it will - //let us scavenge the node containing the Subscription. - insert(new SubscriptionNode(), false); - } - - //advance the next node reference in the 'prevNode' to scavange - //the newly 'deleted' node for the Subscription. - prevNode.findNext(); - - nodeMarkerCleanup(node); - return true; } - - prevNode = node; - node = node.findNext(); + node = node.getNext(); } - return false; } - private void nodeMarkerCleanup(final SubscriptionNode node) - { - SubscriptionNode markedNode = _subNodeMarker.get(); - if(node == markedNode) - { - //if the marked node is the one we are removing, then - //replace it with a dummy pointing at the next node. - //this is OK as the marked node is only used to index - //into the list and find the next node to use. - //Because we inserted a dummy if node was the - //tail, markedNode.nextNode() can never be null. - SubscriptionNode dummy = new SubscriptionNode(); - dummy.setNext(markedNode.nextNode()); - - //if the CAS fails the marked node has changed, thus - //we don't care about the dummy and just forget it - _subNodeMarker.compareAndSet(markedNode, dummy); - } - else if(markedNode != null) - { - //if the marked node was already deleted then it could - //hold subsequently removed nodes after it in the list - //in memory. Scavenge it to ensure their actual removal. - if(markedNode != _head && markedNode.isDeleted()) - { - markedNode.findNext(); - } - } - } - - public boolean updateMarkedNode(final SubscriptionNode expected, final SubscriptionNode nextNode) - { - return _subNodeMarker.compareAndSet(expected, nextNode); - } - - /** - * Get the current marked SubscriptionNode. This should only be used only to index into the list and find the next node - * after the mark, since if the previously marked node was subsequently deleted the item returned may be a dummy node - * with reference to the next node. - * - * @return the previously marked node (or a dummy if it was subsequently deleted) - */ - public SubscriptionNode getMarkedNode() - { - return _subNodeMarker.get(); - } - public static class SubscriptionNodeIterator { + private SubscriptionNode _lastNode; SubscriptionNodeIterator(SubscriptionNode startNode) @@ -248,25 +184,49 @@ public class SubscriptionList _lastNode = startNode; } + + public boolean atTail() + { + return _lastNode.nextNode() == null; + } + public SubscriptionNode getNode() { + return _lastNode; + } public boolean advance() { - SubscriptionNode nextNode = _lastNode.findNext(); - _lastNode = nextNode; - return _lastNode != null; + if(!atTail()) + { + SubscriptionNode nextNode = _lastNode.nextNode(); + while(nextNode.isDeleted() && nextNode.nextNode() != null) + { + nextNode = nextNode.nextNode(); + } + _lastNode = nextNode; + return true; + + } + else + { + return false; + } + } + } + public SubscriptionNodeIterator iterator() { return new SubscriptionNodeIterator(_head); } + public SubscriptionNode getHead() { return _head; @@ -276,6 +236,9 @@ public class SubscriptionList { return _size.get(); } + + + } |