diff options
author | Robert Gemmell <robbie@apache.org> | 2012-04-03 17:08:06 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2012-04-03 17:08:06 +0000 |
commit | b53577acedf415b475ac3358728b909b4e3177dd (patch) | |
tree | 66e406e7a5cf1c4b9ccd5a0db23f05c9cdadbe28 | |
parent | 2cbc215be53cabc54e9fba48a588a2bf0962de68 (diff) | |
download | qpid-python-b53577acedf415b475ac3358728b909b4e3177dd.tar.gz |
QPID-3927: ensure that priority is properly accounted for when comparing messages on different QueueEntryLists contained within the encompassing PriorityQueue
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1309050 13f79535-47bb-0310-9956-ffa450edef68
3 files changed, 165 insertions, 4 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java index 05141a48a1..1d13ee66c0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java @@ -25,19 +25,19 @@ import org.apache.qpid.server.message.ServerMessage; public class PriorityQueueList implements QueueEntryList<SimpleQueueEntryImpl> { private final AMQQueue _queue; - private final SimpleQueueEntryList[] _priorityLists; + private final PriorityQueueEntrySubList[] _priorityLists; private final int _priorities; private final int _priorityOffset; public PriorityQueueList(AMQQueue queue, int priorities) { _queue = queue; - _priorityLists = new SimpleQueueEntryList[priorities]; + _priorityLists = new PriorityQueueEntrySubList[priorities]; _priorities = priorities; _priorityOffset = 5-((priorities + 1)/2); for(int i = 0; i < priorities; i++) { - _priorityLists[i] = new SimpleQueueEntryList(queue); + _priorityLists[i] = new PriorityQueueEntrySubList(queue); } } @@ -161,4 +161,48 @@ public class PriorityQueueList implements QueueEntryList<SimpleQueueEntryImpl> return new PriorityQueueList(queue, _priorities); } } + + private static class PriorityQueueEntrySubList extends SimpleQueueEntryList + { + public PriorityQueueEntrySubList(AMQQueue queue) + { + super(queue); + } + + @Override + protected PriorityQueueEntryImpl createQueueEntry(ServerMessage<?> message) + { + return new PriorityQueueEntryImpl(this, message); + } + } + + private static class PriorityQueueEntryImpl extends SimpleQueueEntryImpl + { + public PriorityQueueEntryImpl(PriorityQueueEntrySubList queueEntryList, ServerMessage<?> message) + { + super(queueEntryList, message); + } + + @Override + public int compareTo(final QueueEntry o) + { + byte thisPriority = getMessageHeader().getPriority(); + byte otherPriority = o.getMessageHeader().getPriority(); + + if(thisPriority != otherPriority) + { + /* + * Different priorities, so answer can only be greater than or less than + * + * A message with higher priority (e.g. 5) is conceptually 'earlier' in the + * priority queue than one with a lower priority (e.g. 4). + */ + return thisPriority > otherPriority ? -1 : 1; + } + else + { + return super.compareTo(o); + } + } + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java index c82d1b984a..b8d8ec19f4 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java @@ -104,7 +104,7 @@ public class SimpleQueueEntryList implements QueueEntryList<SimpleQueueEntryImpl } } - protected SimpleQueueEntryImpl createQueueEntry(ServerMessage message) + protected SimpleQueueEntryImpl createQueueEntry(ServerMessage<?> message) { return new SimpleQueueEntryImpl(this, message); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java new file mode 100644 index 0000000000..e8c0470915 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java @@ -0,0 +1,117 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.test.utils.QpidTestCase; + +public class PriorityQueueListTest extends QpidTestCase +{ + private static final byte[] PRIORITIES = {4, 5, 5, 4}; + PriorityQueueList _list = new PriorityQueueList(null, 10); + + private QueueEntry _priority4message1; + private QueueEntry _priority4message2; + private QueueEntry _priority5message1; + private QueueEntry _priority5message2; + + protected void setUp() + { + QueueEntry[] entries = new QueueEntry[PRIORITIES.length]; + + for (int i = 0; i < PRIORITIES.length; i++) + { + ServerMessage<?> message = mock(ServerMessage.class); + AMQMessageHeader header = mock(AMQMessageHeader.class); + @SuppressWarnings({ "rawtypes", "unchecked" }) + MessageReference<ServerMessage> ref = mock(MessageReference.class); + + when(message.getMessageHeader()).thenReturn(header); + when(message.newReference()).thenReturn(ref); + when(ref.getMessage()).thenReturn(message); + when(header.getPriority()).thenReturn(PRIORITIES[i]); + + entries[i] = _list.add(message); + } + + _priority4message1 = entries[0]; + _priority4message2 = entries[3]; + _priority5message1 = entries[1]; + _priority5message2 = entries[2]; + } + + public void testPriorityQueueEntryCompareToItself() + { + //check messages compare to themselves properly + assertEquals("message should compare 'equal' to itself", + 0, _priority4message1.compareTo(_priority4message1)); + + assertEquals("message should compare 'equal' to itself", + 0, _priority5message2.compareTo(_priority5message2)); + } + + public void testPriorityQueueEntryCompareToSamePriority() + { + //check messages with the same priority are ordered properly + assertEquals("first message should be 'earlier' than second message of the same priority", + -1, _priority4message1.compareTo(_priority4message2)); + + assertEquals("first message should be 'earlier' than second message of the same priority", + -1, _priority5message1.compareTo(_priority5message2)); + + //and in reverse + assertEquals("second message should be 'later' than first message of the same priority", + 1, _priority4message2.compareTo(_priority4message1)); + + assertEquals("second message should be 'later' than first message of the same priority", + 1, _priority5message2.compareTo(_priority5message1)); + } + + public void testPriorityQueueEntryCompareToDifferentPriority() + { + //check messages with higher priority are ordered 'earlier' than those with lower priority + assertEquals("first message with priority 5 should be 'earlier' than first message of priority 4", + -1, _priority5message1.compareTo(_priority4message1)); + assertEquals("first message with priority 5 should be 'earlier' than second message of priority 4", + -1, _priority5message1.compareTo(_priority4message2)); + + assertEquals("second message with priority 5 should be 'earlier' than first message of priority 4", + -1, _priority5message2.compareTo(_priority4message1)); + assertEquals("second message with priority 5 should be 'earlier' than second message of priority 4", + -1, _priority5message2.compareTo(_priority4message2)); + + //and in reverse + assertEquals("first message with priority 4 should be 'later' than first message of priority 5", + 1, _priority4message1.compareTo(_priority5message1)); + assertEquals("first message with priority 4 should be 'later' than second message of priority 5", + 1, _priority4message1.compareTo(_priority5message2)); + + assertEquals("second message with priority 4 should be 'later' than first message of priority 5", + 1, _priority4message2.compareTo(_priority5message1)); + assertEquals("second message with priority 4 should be 'later' than second message of priority 5", + 1, _priority4message2.compareTo(_priority5message2)); + } +} |