diff options
Diffstat (limited to 'java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java')
-rw-r--r-- | java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java | 302 |
1 files changed, 165 insertions, 137 deletions
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 51ae822b2e..542f6ba0d1 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -45,9 +45,9 @@ import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.SimpleAMQQueue.QueueEntryFilter; -import org.apache.qpid.server.subscription.MockSubscription; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.subscription.SubscriptionTarget; +import org.apache.qpid.server.consumer.ConsumerTarget; +import org.apache.qpid.server.consumer.MockConsumer; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -69,8 +69,8 @@ public class SimpleAMQQueueTest extends QpidTestCase private String _owner = "owner"; private String _routingKey = "routing key"; private DirectExchange _exchange; - private MockSubscription _subscriptionTarget = new MockSubscription(); - private QueueSubscription _subscription; + private MockConsumer _consumerTarget = new MockConsumer(); + private QueueConsumer _consumer; private Map<String,Object> _arguments = null; @Override @@ -162,13 +162,13 @@ public class SimpleAMQQueueTest extends QpidTestCase } - public void testRegisterSubscriptionThenEnqueueMessage() throws AMQException + public void testRegisterConsumerThenEnqueueMessage() throws AMQException { ServerMessage messageA = createMessage(new Long(24)); - // Check adding a subscription adds it to the queue - _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test", - EnumSet.noneOf(Subscription.Option.class)); + // Check adding a consumer adds it to the queue + _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", + EnumSet.noneOf(Consumer.Option.class)); assertEquals("Queue does not have consumer", 1, _queue.getConsumerCount()); assertEquals("Queue does not have active consumer", 1, @@ -183,49 +183,49 @@ public class SimpleAMQQueueTest extends QpidTestCase catch(InterruptedException e) { } - assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage()); - assertNull(_subscription.getQueueContext().getReleasedEntry()); + assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage()); + assertNull(_consumer.getQueueContext().getReleasedEntry()); - // Check removing the subscription removes it's information from the queue - _subscription.close(); - assertTrue("Subscription still had queue", _subscriptionTarget.isClosed()); + // Check removing the consumer removes it's information from the queue + _consumer.close(); + assertTrue("Consumer still had queue", _consumerTarget.isClosed()); assertFalse("Queue still has consumer", 1 == _queue.getConsumerCount()); assertFalse("Queue still has active consumer", 1 == _queue.getActiveConsumerCount()); ServerMessage messageB = createMessage(new Long (25)); _queue.enqueue(messageB); - assertNull(_subscription.getQueueContext()); + assertNull(_consumer.getQueueContext()); } - public void testEnqueueMessageThenRegisterSubscription() throws AMQException, InterruptedException + public void testEnqueueMessageThenRegisterConsumer() throws AMQException, InterruptedException { ServerMessage messageA = createMessage(new Long(24)); _queue.enqueue(messageA); - _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test", - EnumSet.noneOf(Subscription.Option.class)); + _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", + EnumSet.noneOf(Consumer.Option.class)); Thread.sleep(150); - assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage()); + assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage()); assertNull("There should be no releasedEntry after an enqueue", - _subscription.getQueueContext().getReleasedEntry()); + _consumer.getQueueContext().getReleasedEntry()); } /** * Tests enqueuing two messages. */ - public void testEnqueueTwoMessagesThenRegisterSubscription() throws Exception + public void testEnqueueTwoMessagesThenRegisterConsumer() throws Exception { ServerMessage messageA = createMessage(new Long(24)); ServerMessage messageB = createMessage(new Long(25)); _queue.enqueue(messageA); _queue.enqueue(messageB); - _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test", - EnumSet.noneOf(Subscription.Option.class)); + _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", + EnumSet.noneOf(Consumer.Option.class)); Thread.sleep(150); - assertEquals(messageB, _subscription.getQueueContext().getLastSeenEntry().getMessage()); + assertEquals(messageB, _consumer.getQueueContext().getLastSeenEntry().getMessage()); assertNull("There should be no releasedEntry after enqueues", - _subscription.getQueueContext().getReleasedEntry()); + _consumer.getQueueContext().getReleasedEntry()); } /** @@ -240,9 +240,9 @@ public class SimpleAMQQueueTest extends QpidTestCase ServerMessage messageC = createMessage(new Long(26)); - _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test", - EnumSet.of(Subscription.Option.ACQUIRES, - Subscription.Option.SEES_REQUEUES)); + _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>() @@ -261,7 +261,9 @@ public class SimpleAMQQueueTest extends QpidTestCase Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads - assertEquals("Unexpected total number of messages sent to subscription", 3, _subscriptionTarget.getMessages().size()); + assertEquals("Unexpected total number of messages sent to consumer", + 3, + _consumerTarget.getMessages().size()); assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered()); assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered()); assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered()); @@ -272,12 +274,14 @@ public class SimpleAMQQueueTest extends QpidTestCase Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads - assertEquals("Unexpected total number of messages sent to subscription", 4, _subscriptionTarget.getMessages().size()); + assertEquals("Unexpected total number of messages sent to consumer", + 4, + _consumerTarget.getMessages().size()); assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered()); assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered()); assertFalse("Redelivery flag should remain be unset",queueEntries.get(2).isRedelivered()); assertNull("releasedEntry should be cleared after requeue processed", - _subscription.getQueueContext().getReleasedEntry()); + _consumer.getQueueContext().getReleasedEntry()); } /** @@ -289,9 +293,9 @@ public class SimpleAMQQueueTest extends QpidTestCase { ServerMessage messageA = createMessage(new Long(24)); - _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test", - EnumSet.of(Subscription.Option.SEES_REQUEUES, - Subscription.Option.ACQUIRES)); + _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", + EnumSet.of(Consumer.Option.SEES_REQUEUES, + Consumer.Option.ACQUIRES)); final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>() @@ -313,7 +317,9 @@ public class SimpleAMQQueueTest extends QpidTestCase int subFlushWaitTime = 150; Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner/QueueRunner Threads - assertEquals("Unexpected total number of messages sent to subscription", 1, _subscriptionTarget.getMessages().size()); + assertEquals("Unexpected total number of messages sent to consumer", + 1, + _consumerTarget.getMessages().size()); assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered()); /* Wait a little more to be sure that message will have expired, then release the first message only, causing it to be requeued */ @@ -323,10 +329,12 @@ public class SimpleAMQQueueTest extends QpidTestCase Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner/QueueRunner Threads assertTrue("Expecting the queue entry to be now expired", queueEntries.get(0).expired()); - assertEquals("Total number of messages sent should not have changed", 1, _subscriptionTarget.getMessages().size()); + assertEquals("Total number of messages sent should not have changed", + 1, + _consumerTarget.getMessages().size()); assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered()); assertNull("releasedEntry should be cleared after requeue processed", - _subscription.getQueueContext().getReleasedEntry()); + _consumer.getQueueContext().getReleasedEntry()); } @@ -343,9 +351,9 @@ public class SimpleAMQQueueTest extends QpidTestCase ServerMessage messageB = createMessage(new Long(25)); ServerMessage messageC = createMessage(new Long(26)); - _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test", - EnumSet.of(Subscription.Option.ACQUIRES, - Subscription.Option.SEES_REQUEUES)); + _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>() @@ -364,7 +372,9 @@ public class SimpleAMQQueueTest extends QpidTestCase Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads - assertEquals("Unexpected total number of messages sent to subscription", 3, _subscriptionTarget.getMessages().size()); + assertEquals("Unexpected total number of messages sent to consumer", + 3, + _consumerTarget.getMessages().size()); assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered()); assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered()); assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered()); @@ -376,35 +386,37 @@ public class SimpleAMQQueueTest extends QpidTestCase Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads - assertEquals("Unexpected total number of messages sent to subscription", 5, _subscriptionTarget.getMessages().size()); + assertEquals("Unexpected total number of messages sent to consumer", + 5, + _consumerTarget.getMessages().size()); assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered()); assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered()); assertTrue("Redelivery flag should now be set",queueEntries.get(2).isRedelivered()); assertNull("releasedEntry should be cleared after requeue processed", - _subscription.getQueueContext().getReleasedEntry()); + _consumer.getQueueContext().getReleasedEntry()); } /** - * Tests that a release requeues an entry for a queue with multiple subscriptions. Verifies that a + * Tests that a release requeues an entry for a queue with multiple consumers. Verifies that a * requeue resends a message to a <i>single</i> subscriber. */ - public void testReleaseForQueueWithMultipleSubscriptions() throws Exception + public void testReleaseForQueueWithMultipleConsumers() throws Exception { ServerMessage messageA = createMessage(new Long(24)); ServerMessage messageB = createMessage(new Long(25)); - MockSubscription target1 = new MockSubscription(); - MockSubscription target2 = new MockSubscription(); + MockConsumer target1 = new MockConsumer(); + MockConsumer target2 = new MockConsumer(); - QueueSubscription subscription1 = _queue.registerSubscription(target1, null, messageA.getClass(), "test", - EnumSet.of(Subscription.Option.ACQUIRES, - Subscription.Option.SEES_REQUEUES)); + QueueConsumer consumer1 = _queue.addConsumer(target1, null, messageA.getClass(), "test", + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); - QueueSubscription subscription2 = _queue.registerSubscription(target2, null, messageA.getClass(), "test", - EnumSet.of(Subscription.Option.ACQUIRES, - Subscription.Option.SEES_REQUEUES)); + QueueConsumer consumer2 = _queue.addConsumer(target2, null, messageA.getClass(), "test", + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); @@ -433,22 +445,22 @@ public class SimpleAMQQueueTest extends QpidTestCase Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads - assertEquals("Unexpected total number of messages sent to both subscriptions after release", + assertEquals("Unexpected total number of messages sent to both consumers after release", 3, target1.getMessages().size() + target2.getMessages().size()); assertNull("releasedEntry should be cleared after requeue processed", - subscription1.getQueueContext().getReleasedEntry()); + consumer1.getQueueContext().getReleasedEntry()); assertNull("releasedEntry should be cleared after requeue processed", - subscription2.getQueueContext().getReleasedEntry()); + consumer2.getQueueContext().getReleasedEntry()); } public void testExclusiveConsumer() throws AMQException { ServerMessage messageA = createMessage(new Long(24)); - // Check adding an exclusive subscription adds it to the queue + // Check adding an exclusive consumer adds it to the queue - _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test", - EnumSet.of(Subscription.Option.EXCLUSIVE)); + _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", + EnumSet.of(Consumer.Option.EXCLUSIVE)); assertEquals("Queue does not have consumer", 1, _queue.getConsumerCount()); @@ -464,16 +476,16 @@ public class SimpleAMQQueueTest extends QpidTestCase catch (InterruptedException e) { } - assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage()); + assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage()); // Check we cannot add a second subscriber to the queue - MockSubscription subB = new MockSubscription(); + MockConsumer subB = new MockConsumer(); Exception ex = null; try { - _queue.registerSubscription(subB, null, messageA.getClass(), "test", - EnumSet.noneOf(Subscription.Option.class)); + _queue.addConsumer(subB, null, messageA.getClass(), "test", + EnumSet.noneOf(Consumer.Option.class)); } catch (AMQException e) @@ -483,16 +495,16 @@ public class SimpleAMQQueueTest extends QpidTestCase assertNotNull(ex); // Check we cannot add an exclusive subscriber to a queue with an - // existing subscription - _subscription.close(); - _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test", - EnumSet.noneOf(Subscription.Option.class)); + // existing consumer + _consumer.close(); + _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", + EnumSet.noneOf(Consumer.Option.class)); try { - _subscription = _queue.registerSubscription(subB, null, messageA.getClass(), "test", - EnumSet.of(Subscription.Option.EXCLUSIVE)); + _consumer = _queue.addConsumer(subB, null, messageA.getClass(), "test", + EnumSet.of(Consumer.Option.EXCLUSIVE)); } catch (AMQException e) @@ -509,12 +521,12 @@ public class SimpleAMQQueueTest extends QpidTestCase _queue.setDeleteOnNoConsumers(true); ServerMessage message = createMessage(new Long(25)); - _subscription = _queue.registerSubscription(_subscriptionTarget, null, message.getClass(), "test", - EnumSet.noneOf(Subscription.Option.class)); + _consumer = _queue.addConsumer(_consumerTarget, null, message.getClass(), "test", + EnumSet.noneOf(Consumer.Option.class)); _queue.enqueue(message); - _subscription.close(); - assertTrue("Queue was not deleted when subscription was removed", + _consumer.close(); + assertTrue("Queue was not deleted when consumer was removed", _queue.isDeleted()); } @@ -523,13 +535,13 @@ public class SimpleAMQQueueTest extends QpidTestCase Long id = new Long(26); ServerMessage message = createMessage(id); - _subscription = _queue.registerSubscription(_subscriptionTarget, null, message.getClass(), "test", - EnumSet.noneOf(Subscription.Option.class)); + _consumer = _queue.addConsumer(_consumerTarget, null, message.getClass(), "test", + EnumSet.noneOf(Consumer.Option.class)); _queue.enqueue(message); - QueueEntry entry = _subscription.getQueueContext().getLastSeenEntry(); + QueueEntry entry = _consumer.getQueueContext().getLastSeenEntry(); entry.setRedelivered(); - _subscription.resend(entry); + _consumer.resend(entry); } @@ -656,19 +668,19 @@ public class SimpleAMQQueueTest extends QpidTestCase /** * processQueue() is used when asynchronously delivering messages to - * subscriptions which could not be delivered immediately during the + * consumers which could not be delivered immediately during the * enqueue() operation. * * A defect within the method would mean that delivery of these messages may * not occur should the Runner stop before all messages have been processed. * Such a defect was discovered when Selectors were used such that one and - * only one subscription can/will accept any given messages, but multiple - * subscriptions are present, and one of the earlier subscriptions receives + * only one consumer can/will accept any given messages, but multiple + * consumers are present, and one of the earlier consumers receives * more messages than the others. * * This test is to validate that the processQueue() method is able to * correctly deliver all of the messages present for asynchronous delivery - * to subscriptions in such a scenario. + * to consumers in such a scenario. */ public void testProcessQueueWithUniqueSelectors() throws Exception { @@ -677,10 +689,10 @@ public class SimpleAMQQueueTest extends QpidTestCase false, false, _virtualHost, factory, null) { @Override - public void deliverAsync(QueueSubscription sub) + public void deliverAsync(QueueConsumer sub) { // do nothing, i.e prevent deliveries by the SubFlushRunner - // when registering the new subscriptions + // when registering the new consumers } }; @@ -696,28 +708,28 @@ public class SimpleAMQQueueTest extends QpidTestCase QueueEntry msg4 = list.add(createMessage(4L)); QueueEntry msg5 = list.add(createMessage(5L)); - // Create lists of the entries each subscription should be interested - // in.Bias over 50% of the messages to the first subscription so that - // the later subscriptions reject them and report being done before - // the first subscription as the processQueue method proceeds. + // Create lists of the entries each consumer should be interested + // in.Bias over 50% of the messages to the first consumer so that + // the later consumers reject them and report being done before + // the first consumer as the processQueue method proceeds. List<String> msgListSub1 = createEntriesList(msg1, msg2, msg3); List<String> msgListSub2 = createEntriesList(msg4); List<String> msgListSub3 = createEntriesList(msg5); - MockSubscription sub1 = new MockSubscription(msgListSub1); - MockSubscription sub2 = new MockSubscription(msgListSub2); - MockSubscription sub3 = new MockSubscription(msgListSub3); + MockConsumer sub1 = new MockConsumer(msgListSub1); + MockConsumer sub2 = new MockConsumer(msgListSub2); + MockConsumer sub3 = new MockConsumer(msgListSub3); - // register the subscriptions - testQueue.registerSubscription(sub1, sub1.getFilters(), msg1.getMessage().getClass(), "test", - EnumSet.of(Subscription.Option.ACQUIRES, Subscription.Option.SEES_REQUEUES)); - testQueue.registerSubscription(sub2, sub2.getFilters(), msg1.getMessage().getClass(), "test", - EnumSet.of(Subscription.Option.ACQUIRES, Subscription.Option.SEES_REQUEUES)); - testQueue.registerSubscription(sub3, sub3.getFilters(), msg1.getMessage().getClass(), "test", - EnumSet.of(Subscription.Option.ACQUIRES, Subscription.Option.SEES_REQUEUES)); + // register the consumers + testQueue.addConsumer(sub1, sub1.getFilters(), msg1.getMessage().getClass(), "test", + EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES)); + testQueue.addConsumer(sub2, sub2.getFilters(), msg1.getMessage().getClass(), "test", + EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES)); + testQueue.addConsumer(sub3, sub3.getFilters(), msg1.getMessage().getClass(), "test", + EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES)); //check that no messages have been delivered to the - //subscriptions during registration + //consumers during registration assertEquals("No messages should have been delivered yet", 0, sub1.getMessages().size()); assertEquals("No messages should have been delivered yet", 0, sub2.getMessages().size()); assertEquals("No messages should have been delivered yet", 0, sub3.getMessages().size()); @@ -904,7 +916,7 @@ public class SimpleAMQQueueTest extends QpidTestCase false, "testOwner", false, false, _virtualHost, null) { @Override - public void deliverAsync(QueueSubscription sub) + public void deliverAsync(QueueConsumer sub) { // do nothing } @@ -919,8 +931,8 @@ public class SimpleAMQQueueTest extends QpidTestCase // latch to wait for message receipt final CountDownLatch latch = new CountDownLatch(messageNumber -1); - // create a subscription - MockSubscription subscription = new MockSubscription() + // create a consumer + MockConsumer consumer = new MockConsumer() { /** * Send a message and decrement latch @@ -937,7 +949,11 @@ public class SimpleAMQQueueTest extends QpidTestCase try { // subscribe - testQueue.registerSubscription(subscription, null, entries.get(0).getMessage().getClass(), "test", EnumSet.noneOf(Subscription.Option.class)); + testQueue.addConsumer(consumer, + null, + entries.get(0).getMessage().getClass(), + "test", + EnumSet.noneOf(Consumer.Option.class)); // process queue testQueue.processQueue(new QueueRunner(testQueue) @@ -962,11 +978,11 @@ public class SimpleAMQQueueTest extends QpidTestCase Thread.currentThread().interrupt(); } List<MessageInstance> expected = Arrays.asList((MessageInstance)entries.get(0), entries.get(2), entries.get(3)); - verifyReceivedMessages(expected, subscription.getMessages()); + verifyReceivedMessages(expected, consumer.getMessages()); } /** - * Tests that entry in dequeued state are not enqueued and not delivered to subscription + * Tests that entry in dequeued state are not enqueued and not delivered to consumer */ public void testEnqueueDequeuedEntry() { @@ -1002,7 +1018,7 @@ public class SimpleAMQQueueTest extends QpidTestCase } @Override - public boolean acquire(Subscription sub) + public boolean acquire(Consumer sub) { if(message.getMessageNumber() % 2 == 0) { @@ -1018,24 +1034,28 @@ public class SimpleAMQQueueTest extends QpidTestCase }; } }, null); - // create a subscription - MockSubscription subscription = new MockSubscription(); + // create a consumer + MockConsumer consumer = new MockConsumer(); - // register subscription + // register consumer try { - queue.registerSubscription(subscription, null, createMessage(-1l).getClass(), "test", EnumSet.noneOf(Subscription.Option.class)); + queue.addConsumer(consumer, + null, + createMessage(-1l).getClass(), + "test", + EnumSet.noneOf(Consumer.Option.class)); } catch (AMQException e) { - fail("Failure to register subscription:" + e.getMessage()); + fail("Failure to register consumer:" + e.getMessage()); } // put test messages into a queue putGivenNumberOfMessages(queue, 4); // assert received messages - List<MessageInstance> messages = subscription.getMessages(); + List<MessageInstance> messages = consumer.getMessages(); assertEquals("Only 2 messages should be returned", 2, messages.size()); assertEquals("ID of first message should be 1", 1l, (messages.get(0).getMessage()).getMessageNumber()); @@ -1048,52 +1068,60 @@ public class SimpleAMQQueueTest extends QpidTestCase final SimpleAMQQueue queue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), "testActiveConsumerCount", false, "testOwner", false, false, _virtualHost, new SimpleQueueEntryList.Factory(), null); - //verify adding an active subscription increases the count - final MockSubscription subscription1 = new MockSubscription(); - subscription1.setActive(true); - subscription1.setState(SubscriptionTarget.State.ACTIVE); + //verify adding an active consumer increases the count + final MockConsumer consumer1 = new MockConsumer(); + consumer1.setActive(true); + consumer1.setState(ConsumerTarget.State.ACTIVE); assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount()); - queue.registerSubscription(subscription1, null, createMessage(-1l).getClass(), "test", EnumSet.noneOf(Subscription.Option.class)); + queue.addConsumer(consumer1, + null, + createMessage(-1l).getClass(), + "test", + EnumSet.noneOf(Consumer.Option.class)); assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); - //verify adding an inactive subscription doesn't increase the count - final MockSubscription subscription2 = new MockSubscription(); - subscription2.setActive(false); - subscription2.setState(SubscriptionTarget.State.SUSPENDED); + //verify adding an inactive consumer doesn't increase the count + final MockConsumer consumer2 = new MockConsumer(); + consumer2.setActive(false); + consumer2.setState(ConsumerTarget.State.SUSPENDED); assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); - queue.registerSubscription(subscription2, null, createMessage(-1l).getClass(), "test", EnumSet.noneOf(Subscription.Option.class)); + queue.addConsumer(consumer2, + null, + createMessage(-1l).getClass(), + "test", + EnumSet.noneOf(Consumer.Option.class)); assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); //verify behaviour in face of expected state changes: - //verify a subscription going suspended->active increases the count - subscription2.setState(SubscriptionTarget.State.ACTIVE); + //verify a consumer going suspended->active increases the count + consumer2.setState(ConsumerTarget.State.ACTIVE); assertEquals("Unexpected active consumer count", 2, queue.getActiveConsumerCount()); - //verify a subscription going active->suspended decreases the count - subscription2.setState(SubscriptionTarget.State.SUSPENDED); + //verify a consumer going active->suspended decreases the count + consumer2.setState(ConsumerTarget.State.SUSPENDED); assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); - //verify a subscription going suspended->closed doesn't change the count - subscription2.setState(SubscriptionTarget.State.CLOSED); + //verify a consumer going suspended->closed doesn't change the count + consumer2.setState(ConsumerTarget.State.CLOSED); assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); - //verify a subscription going active->active doesn't change the count - subscription1.setState(SubscriptionTarget.State.ACTIVE); + //verify a consumer going active->active doesn't change the count + consumer1.setState(ConsumerTarget.State.ACTIVE); assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); - subscription1.setState(SubscriptionTarget.State.SUSPENDED); + consumer1.setState(ConsumerTarget.State.SUSPENDED); assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount()); - //verify a subscription going suspended->suspended doesn't change the count - subscription1.setState(SubscriptionTarget.State.SUSPENDED); + //verify a consumer going suspended->suspended doesn't change the count + consumer1.setState(ConsumerTarget.State.SUSPENDED); assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount()); - subscription1.setState(SubscriptionTarget.State.ACTIVE); + consumer1.setState(ConsumerTarget.State.ACTIVE); assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); - //verify a subscription going active->closed decreases the count - subscription1.setState(SubscriptionTarget.State.CLOSED); + //verify a consumer going active->closed decreases the count + consumer1.setState(ConsumerTarget.State.CLOSED); assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount()); } @@ -1248,9 +1276,9 @@ public class SimpleAMQQueueTest extends QpidTestCase return _queue; } - public MockSubscription getSubscription() + public MockConsumer getConsumer() { - return _subscriptionTarget; + return _consumerTarget; } public Map<String,Object> getArguments() |