diff options
Diffstat (limited to 'qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java')
-rw-r--r-- | qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java | 754 |
1 files changed, 79 insertions, 675 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 0e136c523f..67d093d00a 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -36,16 +36,13 @@ import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.message.MessageMetaData; -import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.BaseQueue.PostEnqueueAction; -import org.apache.qpid.server.queue.SimpleAMQQueue.QueueEntryFilter; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.subscription.MockSubscription; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.AutoCommitTransaction; -import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.InternalBrokerBaseCase; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -54,8 +51,6 @@ import org.apache.qpid.server.virtualhost.VirtualHostImpl; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; public class SimpleAMQQueueTest extends InternalBrokerBaseCase { @@ -232,10 +227,10 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase } /** - * Tests that a released queue entry is resent to the subscriber. Verifies also that the + * Tests that a re-queued message is resent to the subscriber. Verifies also that the * QueueContext._releasedEntry is reset to null after the entry has been reset. */ - public void testReleasedMessageIsResentToSubscriber() throws Exception + public void testRequeuedMessageIsResentToSubscriber() throws Exception { _queue.registerSubscription(_subscription, false); @@ -258,18 +253,19 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase _queue.enqueue(messageB, postEnqueueAction); _queue.enqueue(messageC, postEnqueueAction); - Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads + Thread.sleep(150); // Work done by SubFlushRunner Thread assertEquals("Unexpected total number of messages sent to subscription", 3, _subscription.getMessages().size()); assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered()); assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered()); assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered()); - /* Now release the first message only, causing it to be requeued */ + /* Now requeue the first message only */ queueEntries.get(0).release(); + _queue.requeue(queueEntries.get(0)); - Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads + Thread.sleep(150); // Work done by SubFlushRunner Thread assertEquals("Unexpected total number of messages sent to subscription", 4, _subscription.getMessages().size()); assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered()); @@ -279,11 +275,11 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase } /** - * Tests that a released message that becomes expired is not resent to the subscriber. + * Tests that a re-queued message that becomes expired is not resent to the subscriber. * This tests ensures that SimpleAMQQueueEntry.getNextAvailableEntry avoids expired entries. * Verifies also that the QueueContext._releasedEntry is reset to null after the entry has been reset. */ - public void testReleaseMessageThatBecomesExpiredIsNotRedelivered() throws Exception + public void testRequeuedMessageThatBecomesExpiredIsNotRedelivered() throws Exception { _queue.registerSubscription(_subscription, false); @@ -305,16 +301,17 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase _queue.enqueue(messageA, postEnqueueAction); int subFlushWaitTime = 150; - Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner/QueueRunner Threads + Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner Thread assertEquals("Unexpected total number of messages sent to subscription", 1, _subscription.getMessages().size()); assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered()); - /* Wait a little more to be sure that message will have expired, then release the first message only, causing it to be requeued */ + /* Wait a little more to be sure that message will have expired, then requeue it */ Thread.sleep(messageExpirationOffset - subFlushWaitTime + 10); queueEntries.get(0).release(); + _queue.requeue(queueEntries.get(0)); - Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner/QueueRunner Threads + Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner Thread assertTrue("Expecting the queue entry to be now expired", queueEntries.get(0).expired()); assertEquals("Total number of messages sent should not have changed", 1, _subscription.getMessages().size()); @@ -324,12 +321,12 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase } /** - * Tests that if a client releases entries 'out of order' (the order + * Tests that if a client requeues messages 'out of order' (the order * used by QueueEntryImpl.compareTo) that messages are still resent * successfully. Specifically this test ensures the {@see SimpleAMQQueue#requeue()} * can correctly move the _releasedEntry to an earlier position in the QueueEntry list. */ - public void testReleasedOutOfComparableOrderAreRedelivered() throws Exception + public void testMessagesRequeuedOutOfComparableOrderAreDelivered() throws Exception { _queue.registerSubscription(_subscription, false); @@ -352,19 +349,21 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase _queue.enqueue(messageB, postEnqueueAction); _queue.enqueue(messageC, postEnqueueAction); - Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads + Thread.sleep(150); // Work done by SubFlushRunner Thread assertEquals("Unexpected total number of messages sent to subscription", 3, _subscription.getMessages().size()); assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered()); assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered()); assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered()); - /* Now release the third and first message only, causing it to be requeued */ + /* Now requeue the third and first message only */ queueEntries.get(2).release(); queueEntries.get(0).release(); + _queue.requeue(queueEntries.get(2)); + _queue.requeue(queueEntries.get(0)); - Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads + Thread.sleep(150); // Work done by SubFlushRunner Thread assertEquals("Unexpected total number of messages sent to subscription", 5, _subscription.getMessages().size()); assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered()); @@ -375,10 +374,10 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase /** - * Tests that a release requeues an entry for a queue with multiple subscriptions. Verifies that a + * Tests a requeue for a queue with multiple subscriptions. Verifies that a * requeue resends a message to a <i>single</i> subscriber. */ - public void testReleaseForQueueWithMultipleSubscriptions() throws Exception + public void testRequeueForQueueWithMultipleSubscriptions() throws Exception { MockSubscription subscription1 = new MockSubscription(); MockSubscription subscription2 = new MockSubscription(); @@ -403,16 +402,66 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase _queue.enqueue(messageA, postEnqueueAction); _queue.enqueue(messageB, postEnqueueAction); - Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads + Thread.sleep(150); // Work done by SubFlushRunner Thread - assertEquals("Unexpected total number of messages sent to both after enqueue", 2, subscription1.getMessages().size() + subscription2.getMessages().size()); + assertEquals("Unexpected total number of messages sent to subscription1 after enqueue", 1, subscription1.getMessages().size()); + assertEquals("Unexpected total number of messages sent to subscription2 after enqueue", 1, subscription2.getMessages().size()); - /* Now release the first message only, causing it to be requeued */ - queueEntries.get(0).release(); + /* Now requeue a message (for any subscription) */ - Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads + queueEntries.get(0).release(); + _queue.requeue((QueueEntryImpl)queueEntries.get(0)); + + Thread.sleep(150); // Work done by SubFlushRunner Thread - assertEquals("Unexpected total number of messages sent to both subscriptions after release", 3, subscription1.getMessages().size() + subscription2.getMessages().size()); + assertEquals("Unexpected total number of messages sent to all subscriptions after requeue", 3, subscription1.getMessages().size() + subscription2.getMessages().size()); + assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription1.getQueueContext())._releasedEntry); + assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription2.getQueueContext())._releasedEntry); + } + + /** + * Tests a requeue for a queue with multiple subscriptions. Verifies that a + * subscriber specific requeue resends the message to <i>that</i> subscriber. + */ + public void testSubscriptionSpecificRequeueForQueueWithMultipleSubscriptions() throws Exception + { + MockSubscription subscription1 = new MockSubscription(); + MockSubscription subscription2 = new MockSubscription(); + + _queue.registerSubscription(subscription1, false); + _queue.registerSubscription(subscription2, false); + + final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); + PostEnqueueAction postEnqueueAction = new PostEnqueueAction() + { + public void onEnqueue(QueueEntry entry) + { + queueEntries.add(entry); + } + }; + + AMQMessage messageA = createMessage(new Long(24)); + AMQMessage messageB = createMessage(new Long(25)); + + /* Enqueue two messages */ + + _queue.enqueue(messageA, postEnqueueAction); + _queue.enqueue(messageB, postEnqueueAction); + + Thread.sleep(150); // Work done by SubFlushRunner Thread + + assertEquals("Unexpected total number of messages sent to subscription1 after enqueue", 1, subscription1.getMessages().size()); + assertEquals("Unexpected total number of messages sent to subscription2 after enqueue", 1, subscription2.getMessages().size()); + + /* Now requeue a message (for first subscription) */ + + queueEntries.get(0).release(); + _queue.requeue((QueueEntryImpl)queueEntries.get(0), subscription1); + + Thread.sleep(150); // Work done by SubFlushRunner Thread + + assertEquals("Unexpected total number of messages sent to subscription1 after requeue", 2, subscription1.getMessages().size()); + assertEquals("Unexpected total number of messages sent to subscription2 after requeue", 1, subscription2.getMessages().size()); assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription1.getQueueContext())._releasedEntry); assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription2.getQueueContext())._releasedEntry); } @@ -611,8 +660,8 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase // Create IncomingMessage and nondurable queue final IncomingMessage msg = new IncomingMessage(info); ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); - contentHeaderBody.setProperties(new BasicContentHeaderProperties()); - ((BasicContentHeaderProperties) contentHeaderBody.getProperties()).setDeliveryMode((byte) 2); + contentHeaderBody.properties = new BasicContentHeaderProperties(); + ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2); msg.setContentHeaderBody(contentHeaderBody); final ArrayList<BaseQueue> qs = new ArrayList<BaseQueue>(); @@ -658,635 +707,6 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase } - /** - * processQueue() is used when asynchronously delivering messages to - * subscriptions which could not be delivered immediately during the - * enqueue() operation. - * - * A defect within the method would mean that delivery of these messages may - * not occur should the Runner stop before all messages have been processed. - * Such a defect was discovered when Selectors were used such that one and - * only one subscription can/will accept any given messages, but multiple - * subscriptions are present, and one of the earlier subscriptions receives - * more messages than the others. - * - * This test is to validate that the processQueue() method is able to - * correctly deliver all of the messages present for asynchronous delivery - * to subscriptions in such a scenario. - */ - public void testProcessQueueWithUniqueSelectors() throws Exception - { - TestSimpleQueueEntryListFactory factory = new TestSimpleQueueEntryListFactory(); - SimpleAMQQueue testQueue = new SimpleAMQQueue("testQueue", false, "testOwner",false, - false, _virtualHost, factory, null) - { - @Override - public void deliverAsync(Subscription sub) - { - // do nothing, i.e prevent deliveries by the SubFlushRunner - // when registering the new subscriptions - } - }; - - // retrieve the QueueEntryList the queue creates and insert the test - // messages, thus avoiding straight-through delivery attempts during - //enqueue() process. - QueueEntryList list = factory.getQueueEntryList(); - assertNotNull("QueueEntryList should have been created", list); - - QueueEntry msg1 = list.add(createMessage(1L)); - QueueEntry msg2 = list.add(createMessage(2L)); - QueueEntry msg3 = list.add(createMessage(3L)); - QueueEntry msg4 = list.add(createMessage(4L)); - QueueEntry msg5 = list.add(createMessage(5L)); - - // Create lists of the entries each subscription should be interested - // in.Bias over 50% of the messages to the first subscription so that - // the later subscriptions reject them and report being done before - // the first subscription as the processQueue method proceeds. - List<QueueEntry> msgListSub1 = createEntriesList(msg1, msg2, msg3); - List<QueueEntry> msgListSub2 = createEntriesList(msg4); - List<QueueEntry> msgListSub3 = createEntriesList(msg5); - - MockSubscription sub1 = new MockSubscription(msgListSub1); - MockSubscription sub2 = new MockSubscription(msgListSub2); - MockSubscription sub3 = new MockSubscription(msgListSub3); - - // register the subscriptions - testQueue.registerSubscription(sub1, false); - testQueue.registerSubscription(sub2, false); - testQueue.registerSubscription(sub3, false); - - //check that no messages have been delivered to the - //subscriptions during registration - assertEquals("No messages should have been delivered yet", 0, sub1.getMessages().size()); - assertEquals("No messages should have been delivered yet", 0, sub2.getMessages().size()); - assertEquals("No messages should have been delivered yet", 0, sub3.getMessages().size()); - - // call processQueue to deliver the messages - testQueue.processQueue(new QueueRunner(testQueue, 1) - { - @Override - public void run() - { - // we dont actually want/need this runner to do any work - // because we we are already doing it! - } - }); - - // check expected messages delivered to correct consumers - verifyRecievedMessages(msgListSub1, sub1.getMessages()); - verifyRecievedMessages(msgListSub2, sub2.getMessages()); - verifyRecievedMessages(msgListSub3, sub3.getMessages()); - } - - /** - * Tests that dequeued message is not present in the list returned form - * {@link SimpleAMQQueue#getMessagesOnTheQueue()} - */ - public void testGetMessagesOnTheQueueWithDequeuedEntry() - { - int messageNumber = 4; - int dequeueMessageIndex = 1; - - // send test messages into a test queue - enqueueGivenNumberOfMessages(_queue, messageNumber); - - // dequeue message - dequeueMessage(_queue, dequeueMessageIndex); - - // get messages on the queue - List<QueueEntry> entries = _queue.getMessagesOnTheQueue(); - - // assert queue entries - assertEquals(messageNumber - 1, entries.size()); - int expectedId = 0; - for (int i = 0; i < messageNumber - 1; i++) - { - Long id = ((AMQMessage) entries.get(i).getMessage()).getMessageId(); - if (i == dequeueMessageIndex) - { - assertFalse("Message with id " + dequeueMessageIndex - + " was dequeued and should not be returned by method getMessagesOnTheQueue!", - new Long(expectedId).equals(id)); - expectedId++; - } - assertEquals("Expected message with id " + expectedId + " but got message with id " + id, - new Long(expectedId), id); - expectedId++; - } - } - - /** - * Tests that dequeued message is not present in the list returned form - * {@link SimpleAMQQueue#getMessagesOnTheQueue(QueueEntryFilter)} - */ - public void testGetMessagesOnTheQueueByQueueEntryFilterWithDequeuedEntry() - { - int messageNumber = 4; - int dequeueMessageIndex = 1; - - // send test messages into a test queue - enqueueGivenNumberOfMessages(_queue, messageNumber); - - // dequeue message - dequeueMessage(_queue, dequeueMessageIndex); - - // get messages on the queue with filter accepting all available messages - List<QueueEntry> entries = _queue.getMessagesOnTheQueue(new QueueEntryFilter() - { - public boolean accept(QueueEntry entry) - { - return true; - } - - public boolean filterComplete() - { - return false; - } - }); - - // assert entries on the queue - assertEquals(messageNumber - 1, entries.size()); - int expectedId = 0; - for (int i = 0; i < messageNumber - 1; i++) - { - Long id = ((AMQMessage) entries.get(i).getMessage()).getMessageId(); - if (i == dequeueMessageIndex) - { - assertFalse("Message with id " + dequeueMessageIndex - + " was dequeued and should not be returned by method getMessagesOnTheQueue!", - new Long(expectedId).equals(id)); - expectedId++; - } - assertEquals("Expected message with id " + expectedId + " but got message with id " + id, - new Long(expectedId), id); - expectedId++; - } - } - - /** - * Tests that dequeued message is not copied as part of invocation of - * {@link SimpleAMQQueue#copyMessagesToAnotherQueue(long, long, String, StoreContext)} - */ - public void testCopyMessagesWithDequeuedEntry() - { - int messageNumber = 4; - int dequeueMessageIndex = 1; - String anotherQueueName = "testQueue2"; - - // put test messages into a test queue - enqueueGivenNumberOfMessages(_queue, messageNumber); - - // dequeue message - dequeueMessage(_queue, dequeueMessageIndex); - - // create another queue - SimpleAMQQueue queue = createQueue(anotherQueueName); - - // create transaction - ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog()); - - // copy messages into another queue - _queue.copyMessagesToAnotherQueue(0, messageNumber, anotherQueueName, txn); - - // commit transaction - txn.commit(); - - // get messages on another queue - List<QueueEntry> entries = queue.getMessagesOnTheQueue(); - - // assert another queue entries - assertEquals(messageNumber - 1, entries.size()); - int expectedId = 0; - for (int i = 0; i < messageNumber - 1; i++) - { - Long id = ((AMQMessage)entries.get(i).getMessage()).getMessageId(); - if (i == dequeueMessageIndex) - { - assertFalse("Message with id " + dequeueMessageIndex - + " was dequeued and should not been copied into another queue!", - new Long(expectedId).equals(id)); - expectedId++; - } - assertEquals("Expected message with id " + expectedId + " but got message with id " + id, - new Long(expectedId), id); - expectedId++; - } - } - - /** - * Tests that dequeued message is not moved as part of invocation of - * {@link SimpleAMQQueue#moveMessagesToAnotherQueue(long, long, String, StoreContext)} - */ - public void testMovedMessagesWithDequeuedEntry() - { - int messageNumber = 4; - int dequeueMessageIndex = 1; - String anotherQueueName = "testQueue2"; - - // put messages into a test queue - enqueueGivenNumberOfMessages(_queue, messageNumber); - - // dequeue message - dequeueMessage(_queue, dequeueMessageIndex); - - // create another queue - SimpleAMQQueue queue = createQueue(anotherQueueName); - - // create transaction - ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog()); - - // move messages into another queue - _queue.moveMessagesToAnotherQueue(0, messageNumber, anotherQueueName, txn); - - // commit transaction - txn.commit(); - - // get messages on another queue - List<QueueEntry> entries = queue.getMessagesOnTheQueue(); - - // assert another queue entries - assertEquals(messageNumber - 1, entries.size()); - int expectedId = 0; - for (int i = 0; i < messageNumber - 1; i++) - { - Long id = ((AMQMessage)entries.get(i).getMessage()).getMessageId(); - if (i == dequeueMessageIndex) - { - assertFalse("Message with id " + dequeueMessageIndex - + " was dequeued and should not been copied into another queue!", - new Long(expectedId).equals(id)); - expectedId++; - } - assertEquals("Expected message with id " + expectedId + " but got message with id " + id, - new Long(expectedId), id); - expectedId++; - } - } - - /** - * Tests that messages in given range including dequeued one are deleted - * from the queue on invocation of - * {@link SimpleAMQQueue#removeMessagesFromQueue(long, long, StoreContext)} - */ - public void testRemoveMessagesFromQueueWithDequeuedEntry() - { - int messageNumber = 4; - int dequeueMessageIndex = 1; - - // put messages into a test queue - enqueueGivenNumberOfMessages(_queue, messageNumber); - - // dequeue message - dequeueMessage(_queue, dequeueMessageIndex); - - // remove messages - _queue.removeMessagesFromQueue(0, messageNumber); - - // get queue entries - List<QueueEntry> entries = _queue.getMessagesOnTheQueue(); - - // assert queue entries - assertNotNull("Null is returned from getMessagesOnTheQueue", entries); - assertEquals("Queue should be empty", 0, entries.size()); - } - - /** - * Tests that dequeued message on the top is not accounted and next message - * is deleted from the queue on invocation of - * {@link SimpleAMQQueue#deleteMessageFromTop(StoreContext)} - */ - public void testDeleteMessageFromTopWithDequeuedEntryOnTop() - { - int messageNumber = 4; - int dequeueMessageIndex = 0; - - // put messages into a test queue - enqueueGivenNumberOfMessages(_queue, messageNumber); - - // dequeue message on top - dequeueMessage(_queue, dequeueMessageIndex); - - //delete message from top - _queue.deleteMessageFromTop(); - - //get queue netries - List<QueueEntry> entries = _queue.getMessagesOnTheQueue(); - - // assert queue entries - assertNotNull("Null is returned from getMessagesOnTheQueue", entries); - assertEquals("Expected " + (messageNumber - 2) + " number of messages but recieved " + entries.size(), - messageNumber - 2, entries.size()); - assertEquals("Expected first entry with id 2", new Long(2), - ((AMQMessage) entries.get(0).getMessage()).getMessageId()); - } - - /** - * Tests that all messages including dequeued one are deleted from the queue - * on invocation of {@link SimpleAMQQueue#clearQueue(StoreContext)} - */ - public void testClearQueueWithDequeuedEntry() - { - int messageNumber = 4; - int dequeueMessageIndex = 1; - - // put messages into a test queue - enqueueGivenNumberOfMessages(_queue, messageNumber); - - // dequeue message on a test queue - dequeueMessage(_queue, dequeueMessageIndex); - - // clean queue - try - { - _queue.clearQueue(); - } - catch (AMQException e) - { - fail("Failure to clear queue:" + e.getMessage()); - } - - // get queue entries - List<QueueEntry> entries = _queue.getMessagesOnTheQueue(); - - // assert queue entries - assertNotNull(entries); - assertEquals(0, entries.size()); - } - - /** - * Tests whether dequeued entry is sent to subscriber in result of - * invocation of {@link SimpleAMQQueue#processQueue(QueueRunner)} - */ - public void testProcessQueueWithDequeuedEntry() - { - // total number of messages to send - int messageNumber = 4; - int dequeueMessageIndex = 1; - - // create queue with overridden method deliverAsync - SimpleAMQQueue testQueue = new SimpleAMQQueue(new AMQShortString("test"), false, - new AMQShortString("testOwner"), false, false, _virtualHost, null) - { - @Override - public void deliverAsync(Subscription sub) - { - // do nothing - } - }; - - // put messages - List<QueueEntry> entries = enqueueGivenNumberOfMessages(testQueue, messageNumber); - - // dequeue message - dequeueMessage(testQueue, dequeueMessageIndex); - - // latch to wait for message receipt - final CountDownLatch latch = new CountDownLatch(messageNumber -1); - - // create a subscription - MockSubscription subscription = new MockSubscription() - { - /** - * Send a message and decrement latch - */ - public void send(QueueEntry msg) throws AMQException - { - super.send(msg); - latch.countDown(); - } - }; - - try - { - // subscribe - testQueue.registerSubscription(subscription, false); - - // process queue - testQueue.processQueue(new QueueRunner(testQueue, 1) - { - public void run() - { - // do nothing - } - }); - } - catch (AMQException e) - { - fail("Failure to process queue:" + e.getMessage()); - } - // wait up to 1 minute for message receipt - try - { - latch.await(1, TimeUnit.MINUTES); - } - catch (InterruptedException e1) - { - Thread.currentThread().interrupt(); - } - List<QueueEntry> expected = createEntriesList(entries.get(0), entries.get(2), entries.get(3)); - verifyRecievedMessages(expected, subscription.getMessages()); - } - - /** - * Tests that entry in dequeued state are not enqueued and not delivered to subscription - */ - public void testEqueueDequeuedEntry() - { - // create a queue where each even entry is considered a dequeued - SimpleAMQQueue queue = new SimpleAMQQueue(new AMQShortString("test"), false, new AMQShortString("testOwner"), - false, false, _virtualHost, new QueueEntryListFactory() - { - public QueueEntryList createQueueEntryList(AMQQueue queue) - { - /** - * Override SimpleQueueEntryList to create a dequeued - * entries for messages with even id - */ - return new SimpleQueueEntryList(queue) - { - /** - * Entries with even message id are considered - * dequeued! - */ - protected QueueEntryImpl createQueueEntry(final ServerMessage message) - { - return new QueueEntryImpl(this, message) - { - public boolean isDequeued() - { - return (((AMQMessage) message).getMessageId().longValue() % 2 == 0); - } - - public boolean isDispensed() - { - return (((AMQMessage) message).getMessageId().longValue() % 2 == 0); - } - - public boolean isAvailable() - { - return !(((AMQMessage) message).getMessageId().longValue() % 2 == 0); - } - }; - } - }; - } - }, null); - // create a subscription - MockSubscription subscription = new MockSubscription(); - - // register subscription - try - { - queue.registerSubscription(subscription, false); - } - catch (AMQException e) - { - fail("Failure to register subscription:" + e.getMessage()); - } - - // put test messages into a queue - putGivenNumberOfMessages(queue, 4); - - // assert received messages - List<QueueEntry> messages = subscription.getMessages(); - assertEquals("Only 2 messages should be returned", 2, messages.size()); - assertEquals("ID of first message should be 1", new Long(1), - ((AMQMessage) messages.get(0).getMessage()).getMessageId()); - assertEquals("ID of second message should be 3", new Long(3), - ((AMQMessage) messages.get(1).getMessage()).getMessageId()); - } - - /** - * A helper method to create a queue with given name - * - * @param name - * queue name - * @return queue - */ - private SimpleAMQQueue createQueue(String name) - { - SimpleAMQQueue queue = null; - try - { - AMQShortString queueName = new AMQShortString(name); - AMQShortString ownerName = new AMQShortString(name + "Owner"); - queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(queueName, false, ownerName, false, false, - _virtualHost, _arguments); - } - catch (AMQException e) - { - fail("Failure to create a queue:" + e.getMessage()); - } - assertNotNull("Queue was not created", queue); - return queue; - } - - /** - * A helper method to put given number of messages into queue - * <p> - * All messages are asserted that they are present on queue - * - * @param queue - * queue to put messages into - * @param messageNumber - * number of messages to put into queue - */ - private List<QueueEntry> enqueueGivenNumberOfMessages(AMQQueue queue, int messageNumber) - { - putGivenNumberOfMessages(queue, messageNumber); - - // make sure that all enqueued messages are on the queue - List<QueueEntry> entries = queue.getMessagesOnTheQueue(); - assertEquals(messageNumber, entries.size()); - for (int i = 0; i < messageNumber; i++) - { - assertEquals(new Long(i), ((AMQMessage)entries.get(i).getMessage()).getMessageId()); - } - return entries; - } - - /** - * A helper method to put given number of messages into queue - * <p> - * Queue is not checked if messages are added into queue - * - * @param queue - * queue to put messages into - * @param messageNumber - * number of messages to put into queue - * @param queue - * @param messageNumber - */ - private void putGivenNumberOfMessages(AMQQueue queue, int messageNumber) - { - for (int i = 0; i < messageNumber; i++) - { - // Create message - Long messageId = new Long(i); - AMQMessage message = null; - try - { - message = createMessage(messageId); - } - catch (AMQException e) - { - fail("Failure to create a test message:" + e.getMessage()); - } - // Put message on queue - try - { - queue.enqueue(message); - } - catch (AMQException e) - { - fail("Failure to put message on queue:" + e.getMessage()); - } - } - } - - /** - * A helper method to dequeue an entry on queue with given index - * - * @param queue - * queue to dequeue message on - * @param dequeueMessageIndex - * entry index to dequeue. - */ - private QueueEntry dequeueMessage(AMQQueue queue, int dequeueMessageIndex) - { - List<QueueEntry> entries = queue.getMessagesOnTheQueue(); - QueueEntry entry = entries.get(dequeueMessageIndex); - entry.acquire(); - entry.dequeue(); - assertTrue(entry.isDequeued()); - return entry; - } - - private List<QueueEntry> createEntriesList(QueueEntry... entries) - { - ArrayList<QueueEntry> entriesList = new ArrayList<QueueEntry>(); - for (QueueEntry entry : entries) - { - entriesList.add(entry); - } - return entriesList; - } - - private void verifyRecievedMessages(List<QueueEntry> expected, - List<QueueEntry> delivered) - { - assertEquals("Consumer did not receive the expected number of messages", - expected.size(), delivered.size()); - - for (QueueEntry msg : expected) - { - assertTrue("Consumer did not recieve msg: " - + msg.getMessage().getMessageNumber(), delivered.contains(msg)); - } - } - public class TestMessage extends AMQMessage { private final long _tag; @@ -1327,20 +747,4 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase AMQMessage messageA = new TestMessage(id, id, info); return messageA; } - - class TestSimpleQueueEntryListFactory implements QueueEntryListFactory - { - QueueEntryList _list; - - public QueueEntryList createQueueEntryList(AMQQueue queue) - { - _list = new SimpleQueueEntryList(queue); - return _list; - } - - public QueueEntryList getQueueEntryList() - { - return _list; - } - } } |