summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java')
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java121
1 files changed, 104 insertions, 17 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
index ba02e6f6bd..d7844730d1 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
@@ -23,20 +23,21 @@ package org.apache.qpid.server.queue;
import junit.framework.AssertionFailedError;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.txn.NonTransactionalContext;
import java.util.ArrayList;
public class AMQPriorityQueueTest extends SimpleAMQQueueTest
{
- private static final long MESSAGE_SIZE = 100L;
+ private static final int PRIORITIES = 3;
@Override
protected void setUp() throws Exception
- {
+ {
_arguments = new FieldTable();
- _arguments.put(AMQQueueFactory.X_QPID_PRIORITIES, 3);
+ _arguments.put(AMQQueueFactory.X_QPID_PRIORITIES, PRIORITIES);
super.setUp();
}
@@ -64,20 +65,20 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest
_queue.registerSubscription(_subscription, false);
Thread.sleep(150);
- ArrayList<QueueEntry> msgs = _subscription.getMessages();
+ ArrayList<QueueEntry> msgs = _subscription.getQueueEntries();
try
{
- assertEquals(new Long(1 + messagIDOffset), msgs.get(0).getMessage().getMessageId());
- assertEquals(new Long(6 + messagIDOffset), msgs.get(1).getMessage().getMessageId());
- assertEquals(new Long(8 + messagIDOffset), msgs.get(2).getMessage().getMessageId());
+ assertEquals(new Long(1 + messagIDOffset), msgs.get(0).getMessageId());
+ assertEquals(new Long(6 + messagIDOffset), msgs.get(1).getMessageId());
+ assertEquals(new Long(8 + messagIDOffset), msgs.get(2).getMessageId());
- assertEquals(new Long(2 + messagIDOffset), msgs.get(3).getMessage().getMessageId());
- assertEquals(new Long(5 + messagIDOffset), msgs.get(4).getMessage().getMessageId());
- assertEquals(new Long(7 + messagIDOffset), msgs.get(5).getMessage().getMessageId());
+ assertEquals(new Long(2 + messagIDOffset), msgs.get(3).getMessageId());
+ assertEquals(new Long(5 + messagIDOffset), msgs.get(4).getMessageId());
+ assertEquals(new Long(7 + messagIDOffset), msgs.get(5).getMessageId());
- assertEquals(new Long(3 + messagIDOffset), msgs.get(6).getMessage().getMessageId());
- assertEquals(new Long(4 + messagIDOffset), msgs.get(7).getMessage().getMessageId());
- assertEquals(new Long(9 + messagIDOffset), msgs.get(8).getMessage().getMessageId());
+ assertEquals(new Long(3 + messagIDOffset), msgs.get(6).getMessageId());
+ assertEquals(new Long(4 + messagIDOffset), msgs.get(7).getMessageId());
+ assertEquals(new Long(9 + messagIDOffset), msgs.get(8).getMessageId());
}
catch (AssertionFailedError afe)
{
@@ -85,7 +86,6 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest
int index = 1;
for (QueueEntry qe : msgs)
{
- System.err.println(index + ":" + qe.getMessage().getMessageId());
index++;
}
@@ -97,9 +97,96 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest
protected AMQMessage createMessage(byte i) throws AMQException
{
AMQMessage message = super.createMessage();
-
- ((BasicContentHeaderProperties)message.getContentHeaderBody().properties).setPriority(i);
+
+ ((BasicContentHeaderProperties) message.getContentHeaderBody().properties).setPriority(i);
return message;
}
+
+
+ public void testMessagesFlowToDiskWithPriority() throws AMQException, InterruptedException
+ {
+ int PRIORITIES = 1;
+ FieldTable arguments = new FieldTable();
+ arguments.put(AMQQueueFactory.X_QPID_PRIORITIES, PRIORITIES);
+
+ // Create IncomingMessage and nondurable queue
+ NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null);
+
+ //Create a priorityQueue
+ _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testMessagesFlowToDiskWithPriority"), false, _owner, false, _virtualHost, arguments);
+
+ MESSAGE_SIZE = 1;
+ long MEMORY_MAX = PRIORITIES * 2;
+ int MESSAGE_COUNT = (int) MEMORY_MAX * 2;
+ //Set the Memory Usage to be very low
+ _queue.setMemoryUsageMaximum(MEMORY_MAX);
+
+ for (int msgCount = 0; msgCount < MESSAGE_COUNT / 2; msgCount++)
+ {
+ sendMessage(txnContext, (msgCount % 10));
+ }
+
+ //Check that we can hold 10 messages without flowing
+ assertEquals(MESSAGE_COUNT / 2, _queue.getMessageCount());
+ assertEquals(MEMORY_MAX, _queue.getMemoryUsageMaximum());
+ assertEquals(_queue.getMemoryUsageMaximum(), _queue.getMemoryUsageCurrent());
+ assertTrue("Queue is flowed.", !_queue.isFlowed());
+
+ // Send another and ensure we are flowed
+ sendMessage(txnContext, 9);
+
+ //Give the Purging Thread a chance to run
+ Thread.yield();
+ Thread.sleep(500);
+
+ assertTrue("Queue is not flowed.", _queue.isFlowed());
+ assertEquals("Queue contains more messages than expected.", MESSAGE_COUNT / 2 + 1, _queue.getMessageCount());
+ assertEquals("Queue over memory quota.",MESSAGE_COUNT / 2, _queue.getMemoryUsageCurrent());
+
+
+ //send another batch of messagse so the total in each queue is equal
+ for (int msgCount = 0; msgCount < (MESSAGE_COUNT / 2) ; msgCount++)
+ {
+ sendMessage(txnContext, (msgCount % 10));
+
+ long usage = _queue.getMemoryUsageCurrent();
+ assertTrue("Queue has gone over quota:" + usage,
+ usage <= _queue.getMemoryUsageMaximum());
+
+ assertTrue("Queue has a negative quota:" + usage, usage > 0);
+
+ }
+ assertEquals(MESSAGE_COUNT + 1, _queue.getMessageCount());
+ assertEquals(MEMORY_MAX, _queue.getMemoryUsageCurrent());
+ assertTrue("Queue is not flowed.", _queue.isFlowed());
+
+ _queue.registerSubscription(_subscription, false);
+
+ int slept = 0;
+ while (_subscription.getQueueEntries().size() != MESSAGE_COUNT + 1 && slept < 10)
+ {
+ Thread.yield();
+ Thread.sleep(500);
+ slept++;
+ }
+
+ //Ensure the messages are retreived
+ assertEquals("Not all messages were received, slept:" + slept / 2 + "s", MESSAGE_COUNT + 1, _subscription.getQueueEntries().size());
+
+ //Check the queue is still within it's limits.
+ assertTrue("Queue has gone over quota:" + _queue.getMemoryUsageCurrent(),
+ _queue.getMemoryUsageCurrent() <= _queue.getMemoryUsageMaximum());
+
+ assertTrue("Queue has a negative quota:" + _queue.getMemoryUsageCurrent(), _queue.getMemoryUsageCurrent() >= 0);
+
+ for (int index = 0; index < MESSAGE_COUNT; index++)
+ {
+ // Ensure that we have received the messages and it wasn't flushed to disk before we received it.
+ AMQMessage message = _subscription.getMessages().get(index);
+ assertNotNull("Message:" + message.debugIdentity() + " was null.", message);
+ }
+
+ }
+
}