summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionList.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionList.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionList.java213
1 files changed, 88 insertions, 125 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionList.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionList.java
index 3e6299cb8a..9ea81660c6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionList.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionList.java
@@ -20,108 +20,121 @@
*/
package org.apache.qpid.server.subscription;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.subscription.Subscription;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.nio.ByteBuffer;
public class SubscriptionList
{
+
private final SubscriptionNode _head = new SubscriptionNode();
- private final AtomicReference<SubscriptionNode> _tail = new AtomicReference<SubscriptionNode>(_head);
- private final AtomicReference<SubscriptionNode> _subNodeMarker = new AtomicReference<SubscriptionNode>(_head);
- private final AtomicInteger _size = new AtomicInteger();
+ private AtomicReference<SubscriptionNode> _tail = new AtomicReference<SubscriptionNode>(_head);
+ private AtomicInteger _size = new AtomicInteger();
+
- public static final class SubscriptionNode
+ public final class SubscriptionNode
{
private final AtomicBoolean _deleted = new AtomicBoolean();
private final AtomicReference<SubscriptionNode> _next = new AtomicReference<SubscriptionNode>();
private final Subscription _sub;
+
public SubscriptionNode()
{
- //used for sentinel head and dummy node construction
+
_sub = null;
_deleted.set(true);
}
public SubscriptionNode(final Subscription sub)
{
- //used for regular node construction
_sub = sub;
}
- /**
- * Retrieves the first non-deleted node following the current node.
- * Any deleted non-tail nodes encountered during the search are unlinked.
- *
- * @return the next non-deleted node, or null if none was found.
- */
- public SubscriptionNode findNext()
+
+ public SubscriptionNode getNext()
{
+
SubscriptionNode next = nextNode();
while(next != null && next.isDeleted())
{
+
final SubscriptionNode newNext = next.nextNode();
if(newNext != null)
{
- //try to move our _next reference forward to the 'newNext'
- //node to unlink the deleted node
_next.compareAndSet(next, newNext);
next = nextNode();
}
else
{
- //'newNext' is null, meaning 'next' is the current tail. Can't unlink
- //the tail node for thread safety reasons, just use the null.
next = null;
}
- }
+ }
return next;
}
- /**
- * Gets the immediately next referenced node in the structure.
- *
- * @return the immediately next node in the structure, or null if at the tail.
- */
- protected SubscriptionNode nextNode()
+ private SubscriptionNode nextNode()
{
return _next.get();
}
- /**
- * Used to initialise the 'next' reference. Will only succeed if the reference was not previously set.
- *
- * @param node the SubscriptionNode to set as 'next'
- * @return whether the operation succeeded
- */
- private boolean setNext(final SubscriptionNode node)
- {
- return _next.compareAndSet(null, node);
- }
-
public boolean isDeleted()
{
return _deleted.get();
}
+
public boolean delete()
{
- return _deleted.compareAndSet(false,true);
+ if(_deleted.compareAndSet(false,true))
+ {
+ _size.decrementAndGet();
+ advanceHead();
+ return true;
+ }
+ else
+ {
+ return false;
+ }
}
+
public Subscription getSubscription()
{
return _sub;
}
}
- private void insert(final SubscriptionNode node, final boolean count)
+
+ public SubscriptionList(AMQQueue queue)
+ {
+ }
+
+ private void advanceHead()
+ {
+ SubscriptionNode head = _head.nextNode();
+ while(head._next.get() != null && head.isDeleted())
+ {
+
+ final SubscriptionNode newhead = head.nextNode();
+ if(newhead != null)
+ {
+ _head._next.compareAndSet(head, newhead);
+ }
+ head = _head.nextNode();
+ }
+ }
+
+
+ public SubscriptionNode add(Subscription sub)
{
+ SubscriptionNode node = new SubscriptionNode(sub);
for (;;)
{
SubscriptionNode tail = _tail.get();
@@ -130,14 +143,11 @@ public class SubscriptionList
{
if (next == null)
{
- if (tail.setNext(node))
+ if (tail._next.compareAndSet(null, node))
{
_tail.compareAndSet(tail, node);
- if(count)
- {
- _size.incrementAndGet();
- }
- return;
+ _size.incrementAndGet();
+ return node;
}
}
else
@@ -146,101 +156,27 @@ public class SubscriptionList
}
}
}
- }
- public void add(final Subscription sub)
- {
- SubscriptionNode node = new SubscriptionNode(sub);
- insert(node, true);
}
- public boolean remove(final Subscription sub)
+ public boolean remove(Subscription sub)
{
- SubscriptionNode prevNode = _head;
- SubscriptionNode node = _head.nextNode();
-
+ SubscriptionNode node = _head.getNext();
while(node != null)
{
- if(sub.equals(node.getSubscription()) && node.delete())
+ if(sub.equals(node._sub) && node.delete())
{
- _size.decrementAndGet();
-
- SubscriptionNode tail = _tail.get();
- if(node == tail)
- {
- //we cant remove the last node from the structure for
- //correctness reasons, however we have just 'deleted'
- //the tail. Inserting an empty dummy node after it will
- //let us scavenge the node containing the Subscription.
- insert(new SubscriptionNode(), false);
- }
-
- //advance the next node reference in the 'prevNode' to scavange
- //the newly 'deleted' node for the Subscription.
- prevNode.findNext();
-
- nodeMarkerCleanup(node);
-
return true;
}
-
- prevNode = node;
- node = node.findNext();
+ node = node.getNext();
}
-
return false;
}
- private void nodeMarkerCleanup(final SubscriptionNode node)
- {
- SubscriptionNode markedNode = _subNodeMarker.get();
- if(node == markedNode)
- {
- //if the marked node is the one we are removing, then
- //replace it with a dummy pointing at the next node.
- //this is OK as the marked node is only used to index
- //into the list and find the next node to use.
- //Because we inserted a dummy if node was the
- //tail, markedNode.nextNode() can never be null.
- SubscriptionNode dummy = new SubscriptionNode();
- dummy.setNext(markedNode.nextNode());
-
- //if the CAS fails the marked node has changed, thus
- //we don't care about the dummy and just forget it
- _subNodeMarker.compareAndSet(markedNode, dummy);
- }
- else if(markedNode != null)
- {
- //if the marked node was already deleted then it could
- //hold subsequently removed nodes after it in the list
- //in memory. Scavenge it to ensure their actual removal.
- if(markedNode != _head && markedNode.isDeleted())
- {
- markedNode.findNext();
- }
- }
- }
-
- public boolean updateMarkedNode(final SubscriptionNode expected, final SubscriptionNode nextNode)
- {
- return _subNodeMarker.compareAndSet(expected, nextNode);
- }
-
- /**
- * Get the current marked SubscriptionNode. This should only be used only to index into the list and find the next node
- * after the mark, since if the previously marked node was subsequently deleted the item returned may be a dummy node
- * with reference to the next node.
- *
- * @return the previously marked node (or a dummy if it was subsequently deleted)
- */
- public SubscriptionNode getMarkedNode()
- {
- return _subNodeMarker.get();
- }
-
public static class SubscriptionNodeIterator
{
+
private SubscriptionNode _lastNode;
SubscriptionNodeIterator(SubscriptionNode startNode)
@@ -248,25 +184,49 @@ public class SubscriptionList
_lastNode = startNode;
}
+
+ public boolean atTail()
+ {
+ return _lastNode.nextNode() == null;
+ }
+
public SubscriptionNode getNode()
{
+
return _lastNode;
+
}
public boolean advance()
{
- SubscriptionNode nextNode = _lastNode.findNext();
- _lastNode = nextNode;
- return _lastNode != null;
+ if(!atTail())
+ {
+ SubscriptionNode nextNode = _lastNode.nextNode();
+ while(nextNode.isDeleted() && nextNode.nextNode() != null)
+ {
+ nextNode = nextNode.nextNode();
+ }
+ _lastNode = nextNode;
+ return true;
+
+ }
+ else
+ {
+ return false;
+ }
+
}
+
}
+
public SubscriptionNodeIterator iterator()
{
return new SubscriptionNodeIterator(_head);
}
+
public SubscriptionNode getHead()
{
return _head;
@@ -276,6 +236,9 @@ public class SubscriptionList
{
return _size.get();
}
+
+
+
}