summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2012-04-03 17:08:06 +0000
committerRobert Gemmell <robbie@apache.org>2012-04-03 17:08:06 +0000
commitb53577acedf415b475ac3358728b909b4e3177dd (patch)
tree66e406e7a5cf1c4b9ccd5a0db23f05c9cdadbe28
parent2cbc215be53cabc54e9fba48a588a2bf0962de68 (diff)
downloadqpid-python-b53577acedf415b475ac3358728b909b4e3177dd.tar.gz
QPID-3927: ensure that priority is properly accounted for when comparing messages on different QueueEntryLists contained within the encompassing PriorityQueue
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1309050 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java50
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java117
3 files changed, 165 insertions, 4 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
index 05141a48a1..1d13ee66c0 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
@@ -25,19 +25,19 @@ import org.apache.qpid.server.message.ServerMessage;
public class PriorityQueueList implements QueueEntryList<SimpleQueueEntryImpl>
{
private final AMQQueue _queue;
- private final SimpleQueueEntryList[] _priorityLists;
+ private final PriorityQueueEntrySubList[] _priorityLists;
private final int _priorities;
private final int _priorityOffset;
public PriorityQueueList(AMQQueue queue, int priorities)
{
_queue = queue;
- _priorityLists = new SimpleQueueEntryList[priorities];
+ _priorityLists = new PriorityQueueEntrySubList[priorities];
_priorities = priorities;
_priorityOffset = 5-((priorities + 1)/2);
for(int i = 0; i < priorities; i++)
{
- _priorityLists[i] = new SimpleQueueEntryList(queue);
+ _priorityLists[i] = new PriorityQueueEntrySubList(queue);
}
}
@@ -161,4 +161,48 @@ public class PriorityQueueList implements QueueEntryList<SimpleQueueEntryImpl>
return new PriorityQueueList(queue, _priorities);
}
}
+
+ private static class PriorityQueueEntrySubList extends SimpleQueueEntryList
+ {
+ public PriorityQueueEntrySubList(AMQQueue queue)
+ {
+ super(queue);
+ }
+
+ @Override
+ protected PriorityQueueEntryImpl createQueueEntry(ServerMessage<?> message)
+ {
+ return new PriorityQueueEntryImpl(this, message);
+ }
+ }
+
+ private static class PriorityQueueEntryImpl extends SimpleQueueEntryImpl
+ {
+ public PriorityQueueEntryImpl(PriorityQueueEntrySubList queueEntryList, ServerMessage<?> message)
+ {
+ super(queueEntryList, message);
+ }
+
+ @Override
+ public int compareTo(final QueueEntry o)
+ {
+ byte thisPriority = getMessageHeader().getPriority();
+ byte otherPriority = o.getMessageHeader().getPriority();
+
+ if(thisPriority != otherPriority)
+ {
+ /*
+ * Different priorities, so answer can only be greater than or less than
+ *
+ * A message with higher priority (e.g. 5) is conceptually 'earlier' in the
+ * priority queue than one with a lower priority (e.g. 4).
+ */
+ return thisPriority > otherPriority ? -1 : 1;
+ }
+ else
+ {
+ return super.compareTo(o);
+ }
+ }
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
index c82d1b984a..b8d8ec19f4 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
@@ -104,7 +104,7 @@ public class SimpleQueueEntryList implements QueueEntryList<SimpleQueueEntryImpl
}
}
- protected SimpleQueueEntryImpl createQueueEntry(ServerMessage message)
+ protected SimpleQueueEntryImpl createQueueEntry(ServerMessage<?> message)
{
return new SimpleQueueEntryImpl(this, message);
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
new file mode 100644
index 0000000000..e8c0470915
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
@@ -0,0 +1,117 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class PriorityQueueListTest extends QpidTestCase
+{
+ private static final byte[] PRIORITIES = {4, 5, 5, 4};
+ PriorityQueueList _list = new PriorityQueueList(null, 10);
+
+ private QueueEntry _priority4message1;
+ private QueueEntry _priority4message2;
+ private QueueEntry _priority5message1;
+ private QueueEntry _priority5message2;
+
+ protected void setUp()
+ {
+ QueueEntry[] entries = new QueueEntry[PRIORITIES.length];
+
+ for (int i = 0; i < PRIORITIES.length; i++)
+ {
+ ServerMessage<?> message = mock(ServerMessage.class);
+ AMQMessageHeader header = mock(AMQMessageHeader.class);
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ MessageReference<ServerMessage> ref = mock(MessageReference.class);
+
+ when(message.getMessageHeader()).thenReturn(header);
+ when(message.newReference()).thenReturn(ref);
+ when(ref.getMessage()).thenReturn(message);
+ when(header.getPriority()).thenReturn(PRIORITIES[i]);
+
+ entries[i] = _list.add(message);
+ }
+
+ _priority4message1 = entries[0];
+ _priority4message2 = entries[3];
+ _priority5message1 = entries[1];
+ _priority5message2 = entries[2];
+ }
+
+ public void testPriorityQueueEntryCompareToItself()
+ {
+ //check messages compare to themselves properly
+ assertEquals("message should compare 'equal' to itself",
+ 0, _priority4message1.compareTo(_priority4message1));
+
+ assertEquals("message should compare 'equal' to itself",
+ 0, _priority5message2.compareTo(_priority5message2));
+ }
+
+ public void testPriorityQueueEntryCompareToSamePriority()
+ {
+ //check messages with the same priority are ordered properly
+ assertEquals("first message should be 'earlier' than second message of the same priority",
+ -1, _priority4message1.compareTo(_priority4message2));
+
+ assertEquals("first message should be 'earlier' than second message of the same priority",
+ -1, _priority5message1.compareTo(_priority5message2));
+
+ //and in reverse
+ assertEquals("second message should be 'later' than first message of the same priority",
+ 1, _priority4message2.compareTo(_priority4message1));
+
+ assertEquals("second message should be 'later' than first message of the same priority",
+ 1, _priority5message2.compareTo(_priority5message1));
+ }
+
+ public void testPriorityQueueEntryCompareToDifferentPriority()
+ {
+ //check messages with higher priority are ordered 'earlier' than those with lower priority
+ assertEquals("first message with priority 5 should be 'earlier' than first message of priority 4",
+ -1, _priority5message1.compareTo(_priority4message1));
+ assertEquals("first message with priority 5 should be 'earlier' than second message of priority 4",
+ -1, _priority5message1.compareTo(_priority4message2));
+
+ assertEquals("second message with priority 5 should be 'earlier' than first message of priority 4",
+ -1, _priority5message2.compareTo(_priority4message1));
+ assertEquals("second message with priority 5 should be 'earlier' than second message of priority 4",
+ -1, _priority5message2.compareTo(_priority4message2));
+
+ //and in reverse
+ assertEquals("first message with priority 4 should be 'later' than first message of priority 5",
+ 1, _priority4message1.compareTo(_priority5message1));
+ assertEquals("first message with priority 4 should be 'later' than second message of priority 5",
+ 1, _priority4message1.compareTo(_priority5message2));
+
+ assertEquals("second message with priority 4 should be 'later' than first message of priority 5",
+ 1, _priority4message2.compareTo(_priority5message1));
+ assertEquals("second message with priority 4 should be 'later' than second message of priority 5",
+ 1, _priority4message2.compareTo(_priority5message2));
+ }
+}