summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java')
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java125
1 files changed, 123 insertions, 2 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 67d093d00a..41ca751684 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -660,8 +660,8 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
// Create IncomingMessage and nondurable queue
final IncomingMessage msg = new IncomingMessage(info);
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
- contentHeaderBody.properties = new BasicContentHeaderProperties();
- ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2);
+ contentHeaderBody.setProperties(new BasicContentHeaderProperties());
+ ((BasicContentHeaderProperties) contentHeaderBody.getProperties()).setDeliveryMode((byte) 2);
msg.setContentHeaderBody(contentHeaderBody);
final ArrayList<BaseQueue> qs = new ArrayList<BaseQueue>();
@@ -707,6 +707,111 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
}
+ /**
+ * processQueue() is used when asynchronously delivering messages to
+ * subscriptions which could not be delivered immediately during the
+ * enqueue() operation.
+ *
+ * A defect within the method would mean that delivery of these messages may
+ * not occur should the Runner stop before all messages have been processed.
+ * Such a defect was discovered when Selectors were used such that one and
+ * only one subscription can/will accept any given messages, but multiple
+ * subscriptions are present, and one of the earlier subscriptions receives
+ * more messages than the others.
+ *
+ * This test is to validate that the processQueue() method is able to
+ * correctly deliver all of the messages present for asynchronous delivery
+ * to subscriptions in such a scenario.
+ */
+ public void testProcessQueueWithUniqueSelectors() throws Exception
+ {
+ TestSimpleQueueEntryListFactory factory = new TestSimpleQueueEntryListFactory();
+ SimpleAMQQueue testQueue = new SimpleAMQQueue("testQueue", false, "testOwner",false,
+ false, _virtualHost, factory, null)
+ {
+ @Override
+ public void deliverAsync(Subscription sub)
+ {
+ // do nothing, i.e prevent deliveries by the SubFlushRunner
+ // when registering the new subscriptions
+ }
+ };
+
+ // retrieve the QueueEntryList the queue creates and insert the test
+ // messages, thus avoiding straight-through delivery attempts during
+ //enqueue() process.
+ QueueEntryList list = factory.getQueueEntryList();
+ assertNotNull("QueueEntryList should have been created", list);
+
+ QueueEntry msg1 = list.add(createMessage(1L));
+ QueueEntry msg2 = list.add(createMessage(2L));
+ QueueEntry msg3 = list.add(createMessage(3L));
+ QueueEntry msg4 = list.add(createMessage(4L));
+ QueueEntry msg5 = list.add(createMessage(5L));
+
+ // Create lists of the entries each subscription should be interested
+ // in.Bias over 50% of the messages to the first subscription so that
+ // the later subscriptions reject them and report being done before
+ // the first subscription as the processQueue method proceeds.
+ List<QueueEntry> msgListSub1 = createEntriesList(msg1, msg2, msg3);
+ List<QueueEntry> msgListSub2 = createEntriesList(msg4);
+ List<QueueEntry> msgListSub3 = createEntriesList(msg5);
+
+ MockSubscription sub1 = new MockSubscription(msgListSub1);
+ MockSubscription sub2 = new MockSubscription(msgListSub2);
+ MockSubscription sub3 = new MockSubscription(msgListSub3);
+
+ // register the subscriptions
+ testQueue.registerSubscription(sub1, false);
+ testQueue.registerSubscription(sub2, false);
+ testQueue.registerSubscription(sub3, false);
+
+ //check that no messages have been delivered to the
+ //subscriptions during registration
+ assertEquals("No messages should have been delivered yet", 0, sub1.getMessages().size());
+ assertEquals("No messages should have been delivered yet", 0, sub2.getMessages().size());
+ assertEquals("No messages should have been delivered yet", 0, sub3.getMessages().size());
+
+ // call processQueue to deliver the messages
+ testQueue.processQueue(new QueueRunner(testQueue, 1)
+ {
+ @Override
+ public void run()
+ {
+ // we dont actually want/need this runner to do any work
+ // because we we are already doing it!
+ }
+ });
+
+ // check expected messages delivered to correct consumers
+ verifyRecievedMessages(msgListSub1, sub1.getMessages());
+ verifyRecievedMessages(msgListSub2, sub2.getMessages());
+ verifyRecievedMessages(msgListSub3, sub3.getMessages());
+ }
+
+ private List<QueueEntry> createEntriesList(QueueEntry... entries)
+ {
+ ArrayList<QueueEntry> entriesList = new ArrayList<QueueEntry>();
+ for (QueueEntry entry : entries)
+ {
+ entriesList.add(entry);
+ }
+ return entriesList;
+ }
+
+ private void verifyRecievedMessages(List<QueueEntry> expected,
+ List<QueueEntry> delivered)
+ {
+ assertEquals("Consumer did not receive the expected number of messages",
+ expected.size(), delivered.size());
+
+ for (QueueEntry msg : expected)
+ {
+ assertTrue("Consumer did not recieve msg: "
+ + msg.getMessage().getMessageNumber(), delivered.contains(msg));
+ }
+ }
+
public class TestMessage extends AMQMessage
{
private final long _tag;
@@ -747,4 +852,20 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
AMQMessage messageA = new TestMessage(id, id, info);
return messageA;
}
+
+ class TestSimpleQueueEntryListFactory implements QueueEntryListFactory
+ {
+ QueueEntryList _list;
+
+ public QueueEntryList createQueueEntryList(AMQQueue queue)
+ {
+ _list = new SimpleQueueEntryList(queue);
+ return _list;
+ }
+
+ public QueueEntryList getQueueEntryList()
+ {
+ return _list;
+ }
+ }
}