diff options
Diffstat (limited to 'qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java')
-rw-r--r-- | qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java | 506 |
1 files changed, 302 insertions, 204 deletions
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index b0e5a510b8..5abc97cee9 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -29,6 +29,8 @@ import static org.mockito.Matchers.contains; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.when; +import java.util.Arrays; +import java.util.EnumSet; import java.util.Map; import org.apache.log4j.Logger; @@ -38,13 +40,15 @@ import org.apache.qpid.AMQSecurityException; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.queue.BaseQueue.PostEnqueueAction; import org.apache.qpid.server.queue.SimpleAMQQueue.QueueEntryFilter; -import org.apache.qpid.server.subscription.MockSubscription; -import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.consumer.ConsumerTarget; +import org.apache.qpid.server.consumer.MockConsumer; +import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; @@ -65,7 +69,8 @@ public class SimpleAMQQueueTest extends QpidTestCase private String _owner = "owner"; private String _routingKey = "routing key"; private DirectExchange _exchange; - private MockSubscription _subscription = new MockSubscription(); + private MockConsumer _consumerTarget = new MockConsumer(); + private QueueConsumer _consumer; private Map<String,Object> _arguments = null; @Override @@ -157,20 +162,21 @@ public class SimpleAMQQueueTest extends QpidTestCase } - public void testRegisterSubscriptionThenEnqueueMessage() throws AMQException + public void testRegisterConsumerThenEnqueueMessage() throws AMQException { - // Check adding a subscription adds it to the queue - _queue.registerSubscription(_subscription, false); - assertEquals("Subscription did not get queue", _queue, - _subscription.getQueue()); + ServerMessage messageA = createMessage(new Long(24)); + + // Check adding a consumer adds it to the queue + _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); assertEquals("Queue does not have consumer", 1, _queue.getConsumerCount()); assertEquals("Queue does not have active consumer", 1, - _queue.getActiveConsumerCount()); + _queue.getActiveConsumerCount()); // Check sending a message ends up with the subscriber - ServerMessage messageA = createMessage(new Long(24)); - _queue.enqueue(messageA); + _queue.enqueue(messageA, null); try { Thread.sleep(2000L); @@ -178,45 +184,51 @@ public class SimpleAMQQueueTest extends QpidTestCase catch(InterruptedException e) { } - assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage()); - assertNull(((QueueContext)_subscription.getQueueContext()).getReleasedEntry()); + assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage()); + assertNull(_consumer.getQueueContext().getReleasedEntry()); - // Check removing the subscription removes it's information from the queue - _queue.unregisterSubscription(_subscription); - assertTrue("Subscription still had queue", _subscription.isClosed()); + // Check removing the consumer removes it's information from the queue + _consumer.close(); + assertTrue("Consumer still had queue", _consumerTarget.isClosed()); assertFalse("Queue still has consumer", 1 == _queue.getConsumerCount()); assertFalse("Queue still has active consumer", - 1 == _queue.getActiveConsumerCount()); + 1 == _queue.getActiveConsumerCount()); ServerMessage messageB = createMessage(new Long (25)); - _queue.enqueue(messageB); - assertNull(_subscription.getQueueContext()); + _queue.enqueue(messageB, null); + assertNull(_consumer.getQueueContext()); } - public void testEnqueueMessageThenRegisterSubscription() throws AMQException, InterruptedException + public void testEnqueueMessageThenRegisterConsumer() throws AMQException, InterruptedException { ServerMessage messageA = createMessage(new Long(24)); - _queue.enqueue(messageA); - _queue.registerSubscription(_subscription, false); + _queue.enqueue(messageA, null); + _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); Thread.sleep(150); - assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage()); - assertNull("There should be no releasedEntry after an enqueue", ((QueueContext)_subscription.getQueueContext()).getReleasedEntry()); + assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage()); + assertNull("There should be no releasedEntry after an enqueue", + _consumer.getQueueContext().getReleasedEntry()); } /** * Tests enqueuing two messages. */ - public void testEnqueueTwoMessagesThenRegisterSubscription() throws Exception + public void testEnqueueTwoMessagesThenRegisterConsumer() throws Exception { ServerMessage messageA = createMessage(new Long(24)); ServerMessage messageB = createMessage(new Long(25)); - _queue.enqueue(messageA); - _queue.enqueue(messageB); - _queue.registerSubscription(_subscription, false); + _queue.enqueue(messageA, null); + _queue.enqueue(messageB, null); + _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); Thread.sleep(150); - assertEquals(messageB, _subscription.getQueueContext().getLastSeenEntry().getMessage()); - assertNull("There should be no releasedEntry after enqueues", ((QueueContext)_subscription.getQueueContext()).getReleasedEntry()); + assertEquals(messageB, _consumer.getQueueContext().getLastSeenEntry().getMessage()); + assertNull("There should be no releasedEntry after enqueues", + _consumer.getQueueContext().getReleasedEntry()); } /** @@ -225,21 +237,19 @@ public class SimpleAMQQueueTest extends QpidTestCase */ public void testReleasedMessageIsResentToSubscriber() throws Exception { - _queue.registerSubscription(_subscription, false); - - final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); - PostEnqueueAction postEnqueueAction = new PostEnqueueAction() - { - public void onEnqueue(QueueEntry entry) - { - queueEntries.add(entry); - } - }; ServerMessage messageA = createMessage(new Long(24)); ServerMessage messageB = createMessage(new Long(25)); ServerMessage messageC = createMessage(new Long(26)); + + _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); + + final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); + EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries); + /* Enqueue three messages */ _queue.enqueue(messageA, postEnqueueAction); @@ -248,7 +258,9 @@ public class SimpleAMQQueueTest extends QpidTestCase Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads - assertEquals("Unexpected total number of messages sent to subscription", 3, _subscription.getMessages().size()); + assertEquals("Unexpected total number of messages sent to consumer", + 3, + _consumerTarget.getMessages().size()); assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered()); assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered()); assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered()); @@ -259,11 +271,14 @@ public class SimpleAMQQueueTest extends QpidTestCase Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads - assertEquals("Unexpected total number of messages sent to subscription", 4, _subscription.getMessages().size()); + assertEquals("Unexpected total number of messages sent to consumer", + 4, + _consumerTarget.getMessages().size()); assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered()); assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered()); assertFalse("Redelivery flag should remain be unset",queueEntries.get(2).isRedelivered()); - assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)_subscription.getQueueContext()).getReleasedEntry()); + assertNull("releasedEntry should be cleared after requeue processed", + _consumer.getQueueContext().getReleasedEntry()); } /** @@ -273,20 +288,17 @@ public class SimpleAMQQueueTest extends QpidTestCase */ public void testReleaseMessageThatBecomesExpiredIsNotRedelivered() throws Exception { - _queue.registerSubscription(_subscription, false); + ServerMessage messageA = createMessage(new Long(24)); + + _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", + EnumSet.of(Consumer.Option.SEES_REQUEUES, + Consumer.Option.ACQUIRES)); final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); - PostEnqueueAction postEnqueueAction = new PostEnqueueAction() - { - public void onEnqueue(QueueEntry entry) - { - queueEntries.add(entry); - } - }; + EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries); /* Enqueue one message with expiration set for a short time in the future */ - ServerMessage messageA = createMessage(new Long(24)); int messageExpirationOffset = 200; final long expiration = System.currentTimeMillis() + messageExpirationOffset; when(messageA.getExpiration()).thenReturn(expiration); @@ -296,7 +308,9 @@ public class SimpleAMQQueueTest extends QpidTestCase int subFlushWaitTime = 150; Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner/QueueRunner Threads - assertEquals("Unexpected total number of messages sent to subscription", 1, _subscription.getMessages().size()); + assertEquals("Unexpected total number of messages sent to consumer", + 1, + _consumerTarget.getMessages().size()); assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered()); /* Wait a little more to be sure that message will have expired, then release the first message only, causing it to be requeued */ @@ -306,9 +320,12 @@ public class SimpleAMQQueueTest extends QpidTestCase Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner/QueueRunner Threads assertTrue("Expecting the queue entry to be now expired", queueEntries.get(0).expired()); - assertEquals("Total number of messages sent should not have changed", 1, _subscription.getMessages().size()); + assertEquals("Total number of messages sent should not have changed", + 1, + _consumerTarget.getMessages().size()); assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered()); - assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)_subscription.getQueueContext()).getReleasedEntry()); + assertNull("releasedEntry should be cleared after requeue processed", + _consumer.getQueueContext().getReleasedEntry()); } @@ -320,21 +337,18 @@ public class SimpleAMQQueueTest extends QpidTestCase */ public void testReleasedOutOfComparableOrderAreRedelivered() throws Exception { - _queue.registerSubscription(_subscription, false); - - final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); - PostEnqueueAction postEnqueueAction = new PostEnqueueAction() - { - public void onEnqueue(QueueEntry entry) - { - queueEntries.add(entry); - } - }; ServerMessage messageA = createMessage(new Long(24)); ServerMessage messageB = createMessage(new Long(25)); ServerMessage messageC = createMessage(new Long(26)); + _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); + + final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); + EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries); + /* Enqueue three messages */ _queue.enqueue(messageA, postEnqueueAction); @@ -343,7 +357,9 @@ public class SimpleAMQQueueTest extends QpidTestCase Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads - assertEquals("Unexpected total number of messages sent to subscription", 3, _subscription.getMessages().size()); + assertEquals("Unexpected total number of messages sent to consumer", + 3, + _consumerTarget.getMessages().size()); assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered()); assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered()); assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered()); @@ -355,37 +371,41 @@ public class SimpleAMQQueueTest extends QpidTestCase Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads - assertEquals("Unexpected total number of messages sent to subscription", 5, _subscription.getMessages().size()); + assertEquals("Unexpected total number of messages sent to consumer", + 5, + _consumerTarget.getMessages().size()); assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered()); assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered()); assertTrue("Redelivery flag should now be set",queueEntries.get(2).isRedelivered()); - assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)_subscription.getQueueContext()).getReleasedEntry()); + assertNull("releasedEntry should be cleared after requeue processed", + _consumer.getQueueContext().getReleasedEntry()); } /** - * Tests that a release requeues an entry for a queue with multiple subscriptions. Verifies that a + * Tests that a release requeues an entry for a queue with multiple consumers. Verifies that a * requeue resends a message to a <i>single</i> subscriber. */ - public void testReleaseForQueueWithMultipleSubscriptions() throws Exception + public void testReleaseForQueueWithMultipleConsumers() throws Exception { - MockSubscription subscription1 = new MockSubscription(); - MockSubscription subscription2 = new MockSubscription(); + ServerMessage messageA = createMessage(new Long(24)); + ServerMessage messageB = createMessage(new Long(25)); - _queue.registerSubscription(subscription1, false); - _queue.registerSubscription(subscription2, false); + MockConsumer target1 = new MockConsumer(); + MockConsumer target2 = new MockConsumer(); - final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); - PostEnqueueAction postEnqueueAction = new PostEnqueueAction() - { - public void onEnqueue(QueueEntry entry) - { - queueEntries.add(entry); - } - }; - ServerMessage messageA = createMessage(new Long(24)); - ServerMessage messageB = createMessage(new Long(25)); + QueueConsumer consumer1 = _queue.addConsumer(target1, null, messageA.getClass(), "test", + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); + + QueueConsumer consumer2 = _queue.addConsumer(target2, null, messageA.getClass(), "test", + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); + + + final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); + EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries); /* Enqueue two messages */ @@ -394,32 +414,40 @@ public class SimpleAMQQueueTest extends QpidTestCase Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads - assertEquals("Unexpected total number of messages sent to both after enqueue", 2, subscription1.getMessages().size() + subscription2.getMessages().size()); + assertEquals("Unexpected total number of messages sent to both after enqueue", + 2, + target1.getMessages().size() + target2.getMessages().size()); /* Now release the first message only, causing it to be requeued */ queueEntries.get(0).release(); Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads - assertEquals("Unexpected total number of messages sent to both subscriptions after release", 3, subscription1.getMessages().size() + subscription2.getMessages().size()); - assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription1.getQueueContext()).getReleasedEntry()); - assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription2.getQueueContext()).getReleasedEntry()); + assertEquals("Unexpected total number of messages sent to both consumers after release", + 3, + target1.getMessages().size() + target2.getMessages().size()); + assertNull("releasedEntry should be cleared after requeue processed", + consumer1.getQueueContext().getReleasedEntry()); + assertNull("releasedEntry should be cleared after requeue processed", + consumer2.getQueueContext().getReleasedEntry()); } public void testExclusiveConsumer() throws AMQException { - // Check adding an exclusive subscription adds it to the queue - _queue.registerSubscription(_subscription, true); - assertEquals("Subscription did not get queue", _queue, - _subscription.getQueue()); + ServerMessage messageA = createMessage(new Long(24)); + // Check adding an exclusive consumer adds it to the queue + + _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", + EnumSet.of(Consumer.Option.EXCLUSIVE, Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); + assertEquals("Queue does not have consumer", 1, - _queue.getConsumerCount()); + _queue.getConsumerCount()); assertEquals("Queue does not have active consumer", 1, - _queue.getActiveConsumerCount()); + _queue.getActiveConsumerCount()); // Check sending a message ends up with the subscriber - ServerMessage messageA = createMessage(new Long(24)); - _queue.enqueue(messageA); + _queue.enqueue(messageA, null); try { Thread.sleep(2000L); @@ -427,14 +455,18 @@ public class SimpleAMQQueueTest extends QpidTestCase catch (InterruptedException e) { } - assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage()); + assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage()); // Check we cannot add a second subscriber to the queue - Subscription subB = new MockSubscription(); + MockConsumer subB = new MockConsumer(); Exception ex = null; try { - _queue.registerSubscription(subB, false); + + _queue.addConsumer(subB, null, messageA.getClass(), "test", + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); + } catch (AMQException e) { @@ -443,12 +475,18 @@ public class SimpleAMQQueueTest extends QpidTestCase assertNotNull(ex); // Check we cannot add an exclusive subscriber to a queue with an - // existing subscription - _queue.unregisterSubscription(_subscription); - _queue.registerSubscription(_subscription, false); + // existing consumer + _consumer.close(); + _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); + try { - _queue.registerSubscription(subB, true); + + _consumer = _queue.addConsumer(subB, null, messageA.getClass(), "test", + EnumSet.of(Consumer.Option.EXCLUSIVE)); + } catch (AMQException e) { @@ -462,23 +500,45 @@ public class SimpleAMQQueueTest extends QpidTestCase _queue.stop(); _queue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), _qname, false, null, true, false, _virtualHost, Collections.EMPTY_MAP); _queue.setDeleteOnNoConsumers(true); - _queue.registerSubscription(_subscription, false); - ServerMessage message = createMessage(new Long(25)); - _queue.enqueue(message); - _queue.unregisterSubscription(_subscription); - assertTrue("Queue was not deleted when subscription was removed", + + ServerMessage message = createMessage(new Long(25)); + _consumer = _queue.addConsumer(_consumerTarget, null, message.getClass(), "test", + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); + + _queue.enqueue(message, null); + _consumer.close(); + assertTrue("Queue was not deleted when consumer was removed", _queue.isDeleted()); } public void testResend() throws Exception { - _queue.registerSubscription(_subscription, false); Long id = new Long(26); ServerMessage message = createMessage(id); - _queue.enqueue(message); - QueueEntry entry = _subscription.getQueueContext().getLastSeenEntry(); - entry.setRedelivered(); - _queue.resend(entry, _subscription); + + _consumer = _queue.addConsumer(_consumerTarget, null, message.getClass(), "test", + EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES)); + + _queue.enqueue(message, new Action<MessageInstance<? extends Consumer>>() + { + @Override + public void performAction(final MessageInstance<? extends Consumer> object) + { + QueueEntry entry = (QueueEntry) object; + entry.setRedelivered(); + try + { + _consumer.resend(entry); + } + catch (AMQException e) + { + fail("Exception thrown: " + e.getMessage()); + } + } + }); + + } @@ -489,7 +549,7 @@ public class SimpleAMQQueueTest extends QpidTestCase ServerMessage message = createMessage(messageId); // Put message on queue - _queue.enqueue(message); + _queue.enqueue(message, null); // Get message id Long testmsgid = _queue.getMessagesOnTheQueue(1).get(0); @@ -505,7 +565,7 @@ public class SimpleAMQQueueTest extends QpidTestCase Long messageId = new Long(i); ServerMessage message = createMessage(messageId); // Put message on queue - _queue.enqueue(message); + _queue.enqueue(message, null); } // Get message ids List<Long> msgids = _queue.getMessagesOnTheQueue(5); @@ -526,7 +586,7 @@ public class SimpleAMQQueueTest extends QpidTestCase Long messageId = new Long(i); ServerMessage message = createMessage(messageId); // Put message on queue - _queue.enqueue(message); + _queue.enqueue(message, null); } // Get message ids List<Long> msgids = _queue.getMessagesOnTheQueue(5, 5); @@ -547,7 +607,7 @@ public class SimpleAMQQueueTest extends QpidTestCase Long messageId = new Long(i); ServerMessage message = createMessage(messageId); // Put message on queue - _queue.enqueue(message); + _queue.enqueue(message, null); } // Get non-existent 0th QueueEntry & check returned list was empty @@ -605,19 +665,19 @@ public class SimpleAMQQueueTest extends QpidTestCase /** * processQueue() is used when asynchronously delivering messages to - * subscriptions which could not be delivered immediately during the + * consumers which could not be delivered immediately during the * enqueue() operation. * * A defect within the method would mean that delivery of these messages may * not occur should the Runner stop before all messages have been processed. * Such a defect was discovered when Selectors were used such that one and - * only one subscription can/will accept any given messages, but multiple - * subscriptions are present, and one of the earlier subscriptions receives + * only one consumer can/will accept any given messages, but multiple + * consumers are present, and one of the earlier consumers receives * more messages than the others. * * This test is to validate that the processQueue() method is able to * correctly deliver all of the messages present for asynchronous delivery - * to subscriptions in such a scenario. + * to consumers in such a scenario. */ public void testProcessQueueWithUniqueSelectors() throws Exception { @@ -626,10 +686,10 @@ public class SimpleAMQQueueTest extends QpidTestCase false, false, _virtualHost, factory, null) { @Override - public void deliverAsync(Subscription sub) + public void deliverAsync(QueueConsumer sub) { // do nothing, i.e prevent deliveries by the SubFlushRunner - // when registering the new subscriptions + // when registering the new consumers } }; @@ -645,25 +705,28 @@ public class SimpleAMQQueueTest extends QpidTestCase QueueEntry msg4 = list.add(createMessage(4L)); QueueEntry msg5 = list.add(createMessage(5L)); - // Create lists of the entries each subscription should be interested - // in.Bias over 50% of the messages to the first subscription so that - // the later subscriptions reject them and report being done before - // the first subscription as the processQueue method proceeds. - List<QueueEntry> msgListSub1 = createEntriesList(msg1, msg2, msg3); - List<QueueEntry> msgListSub2 = createEntriesList(msg4); - List<QueueEntry> msgListSub3 = createEntriesList(msg5); - - MockSubscription sub1 = new MockSubscription(msgListSub1); - MockSubscription sub2 = new MockSubscription(msgListSub2); - MockSubscription sub3 = new MockSubscription(msgListSub3); - - // register the subscriptions - testQueue.registerSubscription(sub1, false); - testQueue.registerSubscription(sub2, false); - testQueue.registerSubscription(sub3, false); + // Create lists of the entries each consumer should be interested + // in.Bias over 50% of the messages to the first consumer so that + // the later consumers reject them and report being done before + // the first consumer as the processQueue method proceeds. + List<String> msgListSub1 = createEntriesList(msg1, msg2, msg3); + List<String> msgListSub2 = createEntriesList(msg4); + List<String> msgListSub3 = createEntriesList(msg5); + + MockConsumer sub1 = new MockConsumer(msgListSub1); + MockConsumer sub2 = new MockConsumer(msgListSub2); + MockConsumer sub3 = new MockConsumer(msgListSub3); + + // register the consumers + testQueue.addConsumer(sub1, sub1.getFilters(), msg1.getMessage().getClass(), "test", + EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES)); + testQueue.addConsumer(sub2, sub2.getFilters(), msg1.getMessage().getClass(), "test", + EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES)); + testQueue.addConsumer(sub3, sub3.getFilters(), msg1.getMessage().getClass(), "test", + EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES)); //check that no messages have been delivered to the - //subscriptions during registration + //consumers during registration assertEquals("No messages should have been delivered yet", 0, sub1.getMessages().size()); assertEquals("No messages should have been delivered yet", 0, sub2.getMessages().size()); assertEquals("No messages should have been delivered yet", 0, sub3.getMessages().size()); @@ -680,9 +743,9 @@ public class SimpleAMQQueueTest extends QpidTestCase }); // check expected messages delivered to correct consumers - verifyReceivedMessages(msgListSub1, sub1.getMessages()); - verifyReceivedMessages(msgListSub2, sub2.getMessages()); - verifyReceivedMessages(msgListSub3, sub3.getMessages()); + verifyReceivedMessages(Arrays.asList((MessageInstance)msg1,msg2,msg3), sub1.getMessages()); + verifyReceivedMessages(Collections.singletonList((MessageInstance)msg4), sub2.getMessages()); + verifyReceivedMessages(Collections.singletonList((MessageInstance)msg5), sub3.getMessages()); } /** @@ -850,7 +913,7 @@ public class SimpleAMQQueueTest extends QpidTestCase false, "testOwner", false, false, _virtualHost, null) { @Override - public void deliverAsync(Subscription sub) + public void deliverAsync(QueueConsumer sub) { // do nothing } @@ -865,15 +928,15 @@ public class SimpleAMQQueueTest extends QpidTestCase // latch to wait for message receipt final CountDownLatch latch = new CountDownLatch(messageNumber -1); - // create a subscription - MockSubscription subscription = new MockSubscription() + // create a consumer + MockConsumer consumer = new MockConsumer() { /** * Send a message and decrement latch * @param entry * @param batch */ - public void send(QueueEntry entry, boolean batch) throws AMQException + public void send(MessageInstance entry, boolean batch) throws AMQException { super.send(entry, batch); latch.countDown(); @@ -883,7 +946,12 @@ public class SimpleAMQQueueTest extends QpidTestCase try { // subscribe - testQueue.registerSubscription(subscription, false); + testQueue.addConsumer(consumer, + null, + entries.get(0).getMessage().getClass(), + "test", + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); // process queue testQueue.processQueue(new QueueRunner(testQueue) @@ -907,12 +975,12 @@ public class SimpleAMQQueueTest extends QpidTestCase { Thread.currentThread().interrupt(); } - List<QueueEntry> expected = createEntriesList(entries.get(0), entries.get(2), entries.get(3)); - verifyReceivedMessages(expected, subscription.getMessages()); + List<MessageInstance> expected = Arrays.asList((MessageInstance)entries.get(0), entries.get(2), entries.get(3)); + verifyReceivedMessages(expected, consumer.getMessages()); } /** - * Tests that entry in dequeued state are not enqueued and not delivered to subscription + * Tests that entry in dequeued state are not enqueued and not delivered to consumer */ public void testEnqueueDequeuedEntry() { @@ -948,7 +1016,7 @@ public class SimpleAMQQueueTest extends QpidTestCase } @Override - public boolean acquire(Subscription sub) + public boolean acquire(QueueConsumer sub) { if(message.getMessageNumber() % 2 == 0) { @@ -964,24 +1032,29 @@ public class SimpleAMQQueueTest extends QpidTestCase }; } }, null); - // create a subscription - MockSubscription subscription = new MockSubscription(); + // create a consumer + MockConsumer consumer = new MockConsumer(); - // register subscription + // register consumer try { - queue.registerSubscription(subscription, false); + queue.addConsumer(consumer, + null, + createMessage(-1l).getClass(), + "test", + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); } catch (AMQException e) { - fail("Failure to register subscription:" + e.getMessage()); + fail("Failure to register consumer:" + e.getMessage()); } // put test messages into a queue putGivenNumberOfMessages(queue, 4); // assert received messages - List<QueueEntry> messages = subscription.getMessages(); + List<MessageInstance> messages = consumer.getMessages(); assertEquals("Only 2 messages should be returned", 2, messages.size()); assertEquals("ID of first message should be 1", 1l, (messages.get(0).getMessage()).getMessageNumber()); @@ -994,55 +1067,64 @@ public class SimpleAMQQueueTest extends QpidTestCase final SimpleAMQQueue queue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), "testActiveConsumerCount", false, "testOwner", false, false, _virtualHost, new SimpleQueueEntryList.Factory(), null); - //verify adding an active subscription increases the count - final MockSubscription subscription1 = new MockSubscription(); - subscription1.setActive(true); + //verify adding an active consumer increases the count + final MockConsumer consumer1 = new MockConsumer(); + consumer1.setActive(true); + consumer1.setState(ConsumerTarget.State.ACTIVE); assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount()); - queue.registerSubscription(subscription1, false); + queue.addConsumer(consumer1, + null, + createMessage(-1l).getClass(), + "test", + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); - //verify adding an inactive subscription doesn't increase the count - final MockSubscription subscription2 = new MockSubscription(); - subscription2.setActive(false); + //verify adding an inactive consumer doesn't increase the count + final MockConsumer consumer2 = new MockConsumer(); + consumer2.setActive(false); + consumer2.setState(ConsumerTarget.State.SUSPENDED); assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); - queue.registerSubscription(subscription2, false); + queue.addConsumer(consumer2, + null, + createMessage(-1l).getClass(), + "test", + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); //verify behaviour in face of expected state changes: - //verify a subscription going suspended->active increases the count - queue.stateChange(subscription2, Subscription.State.SUSPENDED, Subscription.State.ACTIVE); + //verify a consumer going suspended->active increases the count + consumer2.setState(ConsumerTarget.State.ACTIVE); assertEquals("Unexpected active consumer count", 2, queue.getActiveConsumerCount()); - //verify a subscription going active->suspended decreases the count - queue.stateChange(subscription2, Subscription.State.ACTIVE, Subscription.State.SUSPENDED); + //verify a consumer going active->suspended decreases the count + consumer2.setState(ConsumerTarget.State.SUSPENDED); assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); - //verify a subscription going suspended->closed doesn't change the count - queue.stateChange(subscription2, Subscription.State.SUSPENDED, Subscription.State.CLOSED); + //verify a consumer going suspended->closed doesn't change the count + consumer2.setState(ConsumerTarget.State.CLOSED); assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); - //verify a subscription going active->closed decreases the count - queue.stateChange(subscription2, Subscription.State.ACTIVE, Subscription.State.CLOSED); - assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount()); + //verify a consumer going active->active doesn't change the count + consumer1.setState(ConsumerTarget.State.ACTIVE); + assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); - //verify behaviour in face of unexpected state changes: + consumer1.setState(ConsumerTarget.State.SUSPENDED); + assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount()); - //verify a subscription going closed->active increases the count - queue.stateChange(subscription2, Subscription.State.CLOSED, Subscription.State.ACTIVE); - assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); + //verify a consumer going suspended->suspended doesn't change the count + consumer1.setState(ConsumerTarget.State.SUSPENDED); + assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount()); - //verify a subscription going active->active doesn't change the count - queue.stateChange(subscription2, Subscription.State.ACTIVE, Subscription.State.ACTIVE); + consumer1.setState(ConsumerTarget.State.ACTIVE); assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); - //verify a subscription going closed->suspended doesn't change the count - queue.stateChange(subscription2, Subscription.State.CLOSED, Subscription.State.SUSPENDED); - assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); + //verify a consumer going active->closed decreases the count + consumer1.setState(ConsumerTarget.State.CLOSED); + assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount()); - //verify a subscription going suspended->suspended doesn't change the count - queue.stateChange(subscription2, Subscription.State.SUSPENDED, Subscription.State.SUSPENDED); - assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); } public void testNotificationFiredOnEnqueue() throws Exception @@ -1052,10 +1134,10 @@ public class SimpleAMQQueueTest extends QpidTestCase _queue.setNotificationListener(listener); _queue.setMaximumMessageCount(2); - _queue.enqueue(createMessage(new Long(24))); + _queue.enqueue(createMessage(new Long(24)), null); verifyZeroInteractions(listener); - _queue.enqueue(createMessage(new Long(25))); + _queue.enqueue(createMessage(new Long(25)), null); verify(listener, atLeastOnce()).notifyClients(eq(NotificationCheck.MESSAGE_COUNT_ALERT), eq(_queue), contains("Maximum count on queue threshold")); } @@ -1064,9 +1146,9 @@ public class SimpleAMQQueueTest extends QpidTestCase { AMQQueue.NotificationListener listener = mock(AMQQueue.NotificationListener.class); - _queue.enqueue(createMessage(new Long(24))); - _queue.enqueue(createMessage(new Long(25))); - _queue.enqueue(createMessage(new Long(26))); + _queue.enqueue(createMessage(new Long(24)), null); + _queue.enqueue(createMessage(new Long(25)), null); + _queue.enqueue(createMessage(new Long(26)), null); _queue.setNotificationListener(listener); _queue.setMaximumMessageCount(2); @@ -1132,7 +1214,7 @@ public class SimpleAMQQueueTest extends QpidTestCase // Put message on queue try { - queue.enqueue(message); + queue.enqueue(message,null); } catch (AMQException e) { @@ -1167,23 +1249,23 @@ public class SimpleAMQQueueTest extends QpidTestCase return entry; } - private List<QueueEntry> createEntriesList(QueueEntry... entries) + private List<String> createEntriesList(QueueEntry... entries) { - ArrayList<QueueEntry> entriesList = new ArrayList<QueueEntry>(); + ArrayList<String> entriesList = new ArrayList<String>(); for (QueueEntry entry : entries) { - entriesList.add(entry); + entriesList.add(entry.getMessage().getMessageHeader().getMessageId()); } return entriesList; } - private void verifyReceivedMessages(List<QueueEntry> expected, - List<QueueEntry> delivered) + private void verifyReceivedMessages(List<MessageInstance> expected, + List<MessageInstance> delivered) { assertEquals("Consumer did not receive the expected number of messages", expected.size(), delivered.size()); - for (QueueEntry msg : expected) + for (MessageInstance msg : expected) { assertTrue("Consumer did not receive msg: " + msg.getMessage().getMessageNumber(), delivered.contains(msg)); @@ -1195,9 +1277,9 @@ public class SimpleAMQQueueTest extends QpidTestCase return _queue; } - public MockSubscription getSubscription() + public MockConsumer getConsumer() { - return _subscription; + return _consumerTarget; } public Map<String,Object> getArguments() @@ -1213,20 +1295,36 @@ public class SimpleAMQQueueTest extends QpidTestCase protected ServerMessage createMessage(Long id) throws AMQException { + AMQMessageHeader header = mock(AMQMessageHeader.class); + when(header.getMessageId()).thenReturn(String.valueOf(id)); ServerMessage message = mock(ServerMessage.class); when(message.getMessageNumber()).thenReturn(id); + when(message.getMessageHeader()).thenReturn(header); MessageReference ref = mock(MessageReference.class); when(ref.getMessage()).thenReturn(message); - AMQMessageHeader hdr = mock(AMQMessageHeader.class); - when(message.getMessageHeader()).thenReturn(hdr); when(message.newReference()).thenReturn(ref); return message; } + private static class EntryListAddingAction implements Action<MessageInstance<? extends Consumer>> + { + private final ArrayList<QueueEntry> _queueEntries; + + public EntryListAddingAction(final ArrayList<QueueEntry> queueEntries) + { + _queueEntries = queueEntries; + } + + public void performAction(MessageInstance<? extends Consumer> entry) + { + _queueEntries.add((QueueEntry) entry); + } + } + class TestSimpleQueueEntryListFactory implements QueueEntryListFactory { QueueEntryList _list; |