summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java659
1 files changed, 659 insertions, 0 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
new file mode 100644
index 0000000000..7f742d455d
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
@@ -0,0 +1,659 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.queue.SortedQueueEntryImpl.Colour;
+
+/**
+ * A sorted implementation of QueueEntryList.
+ * Uses the red/black tree algorithm specified in "Introduction to Algorithms".
+ * ISBN-10: 0262033844
+ * ISBN-13: 978-0262033848
+ * @see http://en.wikipedia.org/wiki/Red-black_tree
+ */
+public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl>
+{
+ private final SortedQueueEntryImpl _head;
+ private SortedQueueEntryImpl _root;
+ private long _entryId = Long.MIN_VALUE;
+ private final Object _lock = new Object();
+ private final AMQQueue _queue;
+ private final String _propertyName;
+
+ public SortedQueueEntryList(final AMQQueue queue, final String propertyName)
+ {
+ _queue = queue;
+ _head = new SortedQueueEntryImpl(this);
+ _propertyName = propertyName;
+ }
+
+ public AMQQueue getQueue()
+ {
+ return _queue;
+ }
+
+ public SortedQueueEntryImpl add(final ServerMessage message)
+ {
+ synchronized(_lock)
+ {
+ String key = null;
+ final Object val = message.getMessageHeader().getHeader(_propertyName);
+ if(val != null)
+ {
+ key = val.toString();
+ }
+
+ final SortedQueueEntryImpl entry = new SortedQueueEntryImpl(this,message, ++_entryId);
+ entry.setKey(key);
+
+ insert(entry);
+
+ return entry;
+ }
+ }
+
+ /**
+ * Red Black Tree insert implementation.
+ * @param entry the entry to insert.
+ */
+ private void insert(final SortedQueueEntryImpl entry)
+ {
+ SortedQueueEntryImpl node = _root;
+ if((node = _root) == null)
+ {
+ _root = entry;
+ _head.setNext(entry);
+ entry.setPrev(_head);
+ return;
+ }
+ else
+ {
+ SortedQueueEntryImpl parent = null;
+ while(node != null)
+ {
+ parent = node;
+ if(entry.compareTo(node) < 0)
+ {
+ node = node.getLeft();
+ }
+ else
+ {
+ node = node.getRight();
+ }
+ }
+ entry.setParent(parent);
+
+ if(entry.compareTo(parent) < 0)
+ {
+ parent.setLeft(entry);
+ final SortedQueueEntryImpl prev = parent.getPrev();
+ entry.setNext(parent);
+ prev.setNext(entry);
+ entry.setPrev(prev);
+ parent.setPrev(entry);
+ }
+ else
+ {
+ parent.setRight(entry);
+
+ final SortedQueueEntryImpl next = parent.getNextValidEntry();
+ entry.setNext(next);
+ parent.setNext(entry);
+
+ if(next != null)
+ {
+ next.setPrev(entry);
+ }
+ entry.setPrev(parent);
+ }
+ }
+ entry.setColour(Colour.RED);
+ insertFixup(entry);
+ }
+
+ private void insertFixup(SortedQueueEntryImpl entry)
+ {
+ while(isParentColour(entry, Colour.RED))
+ {
+ final SortedQueueEntryImpl grandparent = nodeGrandparent(entry);
+
+ if(nodeParent(entry) == leftChild(grandparent))
+ {
+ final SortedQueueEntryImpl y = rightChild(grandparent);
+ if(isNodeColour(y, Colour.RED))
+ {
+ setColour(nodeParent(entry), Colour.BLACK);
+ setColour(y, Colour.BLACK);
+ setColour(grandparent, Colour.RED);
+ entry = grandparent;
+ }
+ else
+ {
+ if(entry == rightChild(nodeParent(entry)))
+ {
+ entry = nodeParent(entry);
+ leftRotate(entry);
+ }
+ setColour(nodeParent(entry), Colour.BLACK);
+ setColour(nodeGrandparent(entry), Colour.RED);
+ rightRotate(nodeGrandparent(entry));
+ }
+ }
+ else
+ {
+ final SortedQueueEntryImpl y = leftChild(grandparent);
+ if(isNodeColour(y, Colour.RED))
+ {
+ setColour(nodeParent(entry), Colour.BLACK);
+ setColour(y, Colour.BLACK);
+ setColour(grandparent, Colour.RED);
+ entry = grandparent;
+ }
+ else
+ {
+ if(entry == leftChild(nodeParent(entry)))
+ {
+ entry = nodeParent(entry);
+ rightRotate(entry);
+ }
+ setColour(nodeParent(entry), Colour.BLACK);
+ setColour(nodeGrandparent(entry), Colour.RED);
+ leftRotate(nodeGrandparent(entry));
+ }
+ }
+ }
+ _root.setColour(Colour.BLACK);
+ }
+
+ private void leftRotate(final SortedQueueEntryImpl entry)
+ {
+ if(entry != null)
+ {
+ final SortedQueueEntryImpl rightChild = rightChild(entry);
+ entry.setRight(rightChild.getLeft());
+ if(entry.getRight() != null)
+ {
+ entry.getRight().setParent(entry);
+ }
+ rightChild.setParent(entry.getParent());
+ if(entry.getParent() == null)
+ {
+ _root = rightChild;
+ }
+ else if(entry == entry.getParent().getLeft())
+ {
+ entry.getParent().setLeft(rightChild);
+ }
+ else
+ {
+ entry.getParent().setRight(rightChild);
+ }
+ rightChild.setLeft(entry);
+ entry.setParent(rightChild);
+ }
+ }
+
+ private void rightRotate(final SortedQueueEntryImpl entry)
+ {
+ if(entry != null)
+ {
+ final SortedQueueEntryImpl leftChild = leftChild(entry);
+ entry.setLeft(leftChild.getRight());
+ if(entry.getLeft() != null)
+ {
+ leftChild.getRight().setParent(entry);
+ }
+ leftChild.setParent(entry.getParent());
+ if(leftChild.getParent() == null)
+ {
+ _root = leftChild;
+ }
+ else if(entry == entry.getParent().getRight())
+ {
+ entry.getParent().setRight(leftChild);
+ }
+ else
+ {
+ entry.getParent().setLeft(leftChild);
+ }
+ leftChild.setRight(entry);
+ entry.setParent(leftChild);
+ }
+ }
+
+ private void setColour(final SortedQueueEntryImpl node, final Colour colour)
+ {
+ if(node != null)
+ {
+ node.setColour(colour);
+ }
+ }
+
+ private SortedQueueEntryImpl leftChild(final SortedQueueEntryImpl node)
+ {
+ return node == null ? null : node.getLeft();
+ }
+
+ private SortedQueueEntryImpl rightChild(final SortedQueueEntryImpl node)
+ {
+ return node == null ? null : node.getRight();
+ }
+
+ private SortedQueueEntryImpl nodeParent(final SortedQueueEntryImpl node)
+ {
+ return node == null ? null : node.getParent();
+ }
+
+ private SortedQueueEntryImpl nodeGrandparent(final SortedQueueEntryImpl node)
+ {
+ return nodeParent(nodeParent(node));
+ }
+
+ private boolean isParentColour(final SortedQueueEntryImpl node, final SortedQueueEntryImpl.Colour colour)
+ {
+
+ return node != null && isNodeColour(node.getParent(), colour);
+ }
+
+ protected boolean isNodeColour(final SortedQueueEntryImpl node, final SortedQueueEntryImpl.Colour colour)
+ {
+ return (node == null ? Colour.BLACK : node.getColour()) == colour;
+ }
+
+ public SortedQueueEntryImpl next(final SortedQueueEntryImpl node)
+ {
+ synchronized(_lock)
+ {
+ if(node.isDispensed() && _head != node)
+ {
+ SortedQueueEntryImpl current = _head;
+ SortedQueueEntryImpl next;
+ while(current != null)
+ {
+ next = current.getNextValidEntry();
+ if(current.compareTo(node)>0 && !current.isDispensed())
+ {
+ break;
+ }
+ else
+ {
+ current = next;
+ }
+ }
+ return current;
+ }
+ else
+ {
+ return node.getNextValidEntry();
+ }
+ }
+ }
+
+ public QueueEntryIterator<SortedQueueEntryImpl> iterator()
+ {
+ return new QueueEntryIteratorImpl(_head);
+ }
+
+ public SortedQueueEntryImpl getHead()
+ {
+ return _head;
+ }
+
+ protected SortedQueueEntryImpl getRoot()
+ {
+ return _root;
+ }
+
+ public void entryDeleted(final SortedQueueEntryImpl entry)
+ {
+ synchronized(_lock)
+ {
+ // If the node to be removed has two children, we swap the position
+ // of the node and its successor in the tree
+ if(leftChild(entry) != null && rightChild(entry) != null)
+ {
+ swapWithSuccessor(entry);
+ }
+
+ // Then deal with the easy doubly linked list deletion (need to do
+ // this after the swap as the swap uses next
+ final SortedQueueEntryImpl prev = entry.getPrev();
+ if(prev != null)
+ {
+ prev.setNext(entry.getNextValidEntry());
+ }
+
+ final SortedQueueEntryImpl next = entry.getNextValidEntry();
+ if(next != null)
+ {
+ next.setPrev(prev);
+ }
+
+ // now deal with splicing
+ final SortedQueueEntryImpl chosenChild;
+
+ if(leftChild(entry) != null)
+ {
+ chosenChild = leftChild(entry);
+ }
+ else
+ {
+ chosenChild = rightChild(entry);
+ }
+
+ if(chosenChild != null)
+ {
+ // we have one child (x), we can move it up to replace x
+ chosenChild.setParent(entry.getParent());
+ if(chosenChild.getParent() == null)
+ {
+ _root = chosenChild;
+ }
+ else if(entry == entry.getParent().getLeft())
+ {
+ entry.getParent().setLeft(chosenChild);
+ }
+ else
+ {
+ entry.getParent().setRight(chosenChild);
+ }
+
+ entry.setLeft(null);
+ entry.setRight(null);
+ entry.setParent(null);
+
+ if(entry.getColour() == Colour.BLACK)
+ {
+ deleteFixup(chosenChild);
+ }
+
+ }
+ else
+ {
+ // no children
+ if(entry.getParent() == null)
+ {
+ // no parent either - the tree is empty
+ _root = null;
+ }
+ else
+ {
+ if(entry.getColour() == Colour.BLACK)
+ {
+ deleteFixup(entry);
+ }
+
+ if(entry.getParent() != null)
+ {
+ if(entry.getParent().getLeft() == entry)
+ {
+ entry.getParent().setLeft(null);
+ }
+ else if(entry.getParent().getRight() == entry)
+ {
+ entry.getParent().setRight(null);
+ }
+ entry.setParent(null);
+ }
+ }
+ }
+
+ }
+ }
+
+ public int getPriorities()
+ {
+ return 0;
+ }
+
+ /**
+ * Swaps the position of the node in the tree with it's successor
+ * (that is the node with the next highest key)
+ * @param entry
+ */
+ private void swapWithSuccessor(final SortedQueueEntryImpl entry)
+ {
+ final SortedQueueEntryImpl next = entry.getNextValidEntry();
+ final SortedQueueEntryImpl nextParent = next.getParent();
+ final SortedQueueEntryImpl nextLeft = next.getLeft();
+ final SortedQueueEntryImpl nextRight = next.getRight();
+ final Colour nextColour = next.getColour();
+
+ // Special case - the successor is the right child of the node
+ if(next == entry.getRight())
+ {
+ next.setParent(entry.getParent());
+ if(next.getParent() == null)
+ {
+ _root = next;
+ }
+ else if(next.getParent().getLeft() == entry)
+ {
+ next.getParent().setLeft(next);
+ }
+ else
+ {
+ next.getParent().setRight(next);
+ }
+
+ next.setRight(entry);
+ entry.setParent(next);
+ next.setLeft(entry.getLeft());
+
+ if(next.getLeft() != null)
+ {
+ next.getLeft().setParent(next);
+ }
+
+ next.setColour(entry.getColour());
+ entry.setColour(nextColour);
+ entry.setLeft(nextLeft);
+
+ if(nextLeft != null)
+ {
+ nextLeft.setParent(entry);
+ }
+ entry.setRight(nextRight);
+ if(nextRight != null)
+ {
+ nextRight.setParent(entry);
+ }
+ }
+ else
+ {
+ next.setParent(entry.getParent());
+ if(next.getParent() == null)
+ {
+ _root = next;
+ }
+ else if(next.getParent().getLeft() == entry)
+ {
+ next.getParent().setLeft(next);
+ }
+ else
+ {
+ next.getParent().setRight(next);
+ }
+
+ next.setLeft(entry.getLeft());
+ if(next.getLeft() != null)
+ {
+ next.getLeft().setParent(next);
+ }
+ next.setRight(entry.getRight());
+ if(next.getRight() != null)
+ {
+ next.getRight().setParent(next);
+ }
+ next.setColour(entry.getColour());
+
+ entry.setParent(nextParent);
+ if(nextParent.getLeft() == next)
+ {
+ nextParent.setLeft(entry);
+ }
+ else
+ {
+ nextParent.setRight(entry);
+ }
+
+ entry.setLeft(nextLeft);
+ if(nextLeft != null)
+ {
+ nextLeft.setParent(entry);
+ }
+ entry.setRight(nextRight);
+ if(nextRight != null)
+ {
+ nextRight.setParent(entry);
+ }
+ entry.setColour(nextColour);
+ }
+ }
+
+ private void deleteFixup(SortedQueueEntryImpl entry)
+ {
+ int i = 0;
+ while(entry != null && entry != _root
+ && isNodeColour(entry, Colour.BLACK))
+ {
+ i++;
+
+ if(i > 1000)
+ {
+ return;
+ }
+
+ if(entry == leftChild(nodeParent(entry)))
+ {
+ SortedQueueEntryImpl rightSibling = rightChild(nodeParent(entry));
+ if(isNodeColour(rightSibling, Colour.RED))
+ {
+ setColour(rightSibling, Colour.BLACK);
+ nodeParent(entry).setColour(Colour.RED);
+ leftRotate(nodeParent(entry));
+ rightSibling = rightChild(nodeParent(entry));
+ }
+
+ if(isNodeColour(leftChild(rightSibling), Colour.BLACK)
+ && isNodeColour(rightChild(rightSibling), Colour.BLACK))
+ {
+ setColour(rightSibling, Colour.RED);
+ entry = nodeParent(entry);
+ }
+ else
+ {
+ if(isNodeColour(rightChild(rightSibling), Colour.BLACK))
+ {
+ setColour(leftChild(rightSibling), Colour.BLACK);
+ rightSibling.setColour(Colour.RED);
+ rightRotate(rightSibling);
+ rightSibling = rightChild(nodeParent(entry));
+ }
+ setColour(rightSibling, getColour(nodeParent(entry)));
+ setColour(nodeParent(entry), Colour.BLACK);
+ setColour(rightChild(rightSibling), Colour.BLACK);
+ leftRotate(nodeParent(entry));
+ entry = _root;
+ }
+ }
+ else
+ {
+ SortedQueueEntryImpl leftSibling = leftChild(nodeParent(entry));
+ if(isNodeColour(leftSibling, Colour.RED))
+ {
+ setColour(leftSibling, Colour.BLACK);
+ nodeParent(entry).setColour(Colour.RED);
+ rightRotate(nodeParent(entry));
+ leftSibling = leftChild(nodeParent(entry));
+ }
+
+ if(isNodeColour(leftChild(leftSibling), Colour.BLACK)
+ && isNodeColour(rightChild(leftSibling), Colour.BLACK))
+ {
+ setColour(leftSibling, Colour.RED);
+ entry = nodeParent(entry);
+ }
+ else
+ {
+ if(isNodeColour(leftChild(leftSibling), Colour.BLACK))
+ {
+ setColour(rightChild(leftSibling), Colour.BLACK);
+ leftSibling.setColour(Colour.RED);
+ leftRotate(leftSibling);
+ leftSibling = leftChild(nodeParent(entry));
+ }
+ setColour(leftSibling, getColour(nodeParent(entry)));
+ setColour(nodeParent(entry), Colour.BLACK);
+ setColour(leftChild(leftSibling), Colour.BLACK);
+ rightRotate(nodeParent(entry));
+ entry = _root;
+ }
+ }
+ }
+ setColour(entry, Colour.BLACK);
+ }
+
+ private Colour getColour(final SortedQueueEntryImpl x)
+ {
+ return x == null ? null : x.getColour();
+ }
+
+ public class QueueEntryIteratorImpl implements QueueEntryIterator<SortedQueueEntryImpl>
+ {
+ private SortedQueueEntryImpl _lastNode;
+
+ public QueueEntryIteratorImpl(final SortedQueueEntryImpl startNode)
+ {
+ _lastNode = startNode;
+ }
+
+ public boolean atTail()
+ {
+ return next(_lastNode) == null;
+ }
+
+ public SortedQueueEntryImpl getNode()
+ {
+ return _lastNode;
+ }
+
+ public boolean advance()
+ {
+ if(!atTail())
+ {
+ SortedQueueEntryImpl nextNode = next(_lastNode);
+ while(nextNode.isDispensed() && next(nextNode) != null)
+ {
+ nextNode = next(nextNode);
+ }
+ _lastNode = nextNode;
+ return true;
+
+ }
+ else
+ {
+ return false;
+ }
+ }
+ }
+}