diff options
Diffstat (limited to 'qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java')
-rw-r--r-- | qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java | 121 |
1 files changed, 104 insertions, 17 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java index ba02e6f6bd..d7844730d1 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java @@ -23,20 +23,21 @@ package org.apache.qpid.server.queue; import junit.framework.AssertionFailedError; import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.txn.NonTransactionalContext; import java.util.ArrayList; public class AMQPriorityQueueTest extends SimpleAMQQueueTest { - private static final long MESSAGE_SIZE = 100L; + private static final int PRIORITIES = 3; @Override protected void setUp() throws Exception - { + { _arguments = new FieldTable(); - _arguments.put(AMQQueueFactory.X_QPID_PRIORITIES, 3); + _arguments.put(AMQQueueFactory.X_QPID_PRIORITIES, PRIORITIES); super.setUp(); } @@ -64,20 +65,20 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest _queue.registerSubscription(_subscription, false); Thread.sleep(150); - ArrayList<QueueEntry> msgs = _subscription.getMessages(); + ArrayList<QueueEntry> msgs = _subscription.getQueueEntries(); try { - assertEquals(new Long(1 + messagIDOffset), msgs.get(0).getMessage().getMessageId()); - assertEquals(new Long(6 + messagIDOffset), msgs.get(1).getMessage().getMessageId()); - assertEquals(new Long(8 + messagIDOffset), msgs.get(2).getMessage().getMessageId()); + assertEquals(new Long(1 + messagIDOffset), msgs.get(0).getMessageId()); + assertEquals(new Long(6 + messagIDOffset), msgs.get(1).getMessageId()); + assertEquals(new Long(8 + messagIDOffset), msgs.get(2).getMessageId()); - assertEquals(new Long(2 + messagIDOffset), msgs.get(3).getMessage().getMessageId()); - assertEquals(new Long(5 + messagIDOffset), msgs.get(4).getMessage().getMessageId()); - assertEquals(new Long(7 + messagIDOffset), msgs.get(5).getMessage().getMessageId()); + assertEquals(new Long(2 + messagIDOffset), msgs.get(3).getMessageId()); + assertEquals(new Long(5 + messagIDOffset), msgs.get(4).getMessageId()); + assertEquals(new Long(7 + messagIDOffset), msgs.get(5).getMessageId()); - assertEquals(new Long(3 + messagIDOffset), msgs.get(6).getMessage().getMessageId()); - assertEquals(new Long(4 + messagIDOffset), msgs.get(7).getMessage().getMessageId()); - assertEquals(new Long(9 + messagIDOffset), msgs.get(8).getMessage().getMessageId()); + assertEquals(new Long(3 + messagIDOffset), msgs.get(6).getMessageId()); + assertEquals(new Long(4 + messagIDOffset), msgs.get(7).getMessageId()); + assertEquals(new Long(9 + messagIDOffset), msgs.get(8).getMessageId()); } catch (AssertionFailedError afe) { @@ -85,7 +86,6 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest int index = 1; for (QueueEntry qe : msgs) { - System.err.println(index + ":" + qe.getMessage().getMessageId()); index++; } @@ -97,9 +97,96 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest protected AMQMessage createMessage(byte i) throws AMQException { AMQMessage message = super.createMessage(); - - ((BasicContentHeaderProperties)message.getContentHeaderBody().properties).setPriority(i); + + ((BasicContentHeaderProperties) message.getContentHeaderBody().properties).setPriority(i); return message; } + + + public void testMessagesFlowToDiskWithPriority() throws AMQException, InterruptedException + { + int PRIORITIES = 1; + FieldTable arguments = new FieldTable(); + arguments.put(AMQQueueFactory.X_QPID_PRIORITIES, PRIORITIES); + + // Create IncomingMessage and nondurable queue + NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null); + + //Create a priorityQueue + _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testMessagesFlowToDiskWithPriority"), false, _owner, false, _virtualHost, arguments); + + MESSAGE_SIZE = 1; + long MEMORY_MAX = PRIORITIES * 2; + int MESSAGE_COUNT = (int) MEMORY_MAX * 2; + //Set the Memory Usage to be very low + _queue.setMemoryUsageMaximum(MEMORY_MAX); + + for (int msgCount = 0; msgCount < MESSAGE_COUNT / 2; msgCount++) + { + sendMessage(txnContext, (msgCount % 10)); + } + + //Check that we can hold 10 messages without flowing + assertEquals(MESSAGE_COUNT / 2, _queue.getMessageCount()); + assertEquals(MEMORY_MAX, _queue.getMemoryUsageMaximum()); + assertEquals(_queue.getMemoryUsageMaximum(), _queue.getMemoryUsageCurrent()); + assertTrue("Queue is flowed.", !_queue.isFlowed()); + + // Send another and ensure we are flowed + sendMessage(txnContext, 9); + + //Give the Purging Thread a chance to run + Thread.yield(); + Thread.sleep(500); + + assertTrue("Queue is not flowed.", _queue.isFlowed()); + assertEquals("Queue contains more messages than expected.", MESSAGE_COUNT / 2 + 1, _queue.getMessageCount()); + assertEquals("Queue over memory quota.",MESSAGE_COUNT / 2, _queue.getMemoryUsageCurrent()); + + + //send another batch of messagse so the total in each queue is equal + for (int msgCount = 0; msgCount < (MESSAGE_COUNT / 2) ; msgCount++) + { + sendMessage(txnContext, (msgCount % 10)); + + long usage = _queue.getMemoryUsageCurrent(); + assertTrue("Queue has gone over quota:" + usage, + usage <= _queue.getMemoryUsageMaximum()); + + assertTrue("Queue has a negative quota:" + usage, usage > 0); + + } + assertEquals(MESSAGE_COUNT + 1, _queue.getMessageCount()); + assertEquals(MEMORY_MAX, _queue.getMemoryUsageCurrent()); + assertTrue("Queue is not flowed.", _queue.isFlowed()); + + _queue.registerSubscription(_subscription, false); + + int slept = 0; + while (_subscription.getQueueEntries().size() != MESSAGE_COUNT + 1 && slept < 10) + { + Thread.yield(); + Thread.sleep(500); + slept++; + } + + //Ensure the messages are retreived + assertEquals("Not all messages were received, slept:" + slept / 2 + "s", MESSAGE_COUNT + 1, _subscription.getQueueEntries().size()); + + //Check the queue is still within it's limits. + assertTrue("Queue has gone over quota:" + _queue.getMemoryUsageCurrent(), + _queue.getMemoryUsageCurrent() <= _queue.getMemoryUsageMaximum()); + + assertTrue("Queue has a negative quota:" + _queue.getMemoryUsageCurrent(), _queue.getMemoryUsageCurrent() >= 0); + + for (int index = 0; index < MESSAGE_COUNT; index++) + { + // Ensure that we have received the messages and it wasn't flushed to disk before we received it. + AMQMessage message = _subscription.getMessages().get(index); + assertNotNull("Message:" + message.debugIdentity() + " was null.", message); + } + + } + } |