summaryrefslogtreecommitdiff
path: root/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java')
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java302
1 files changed, 165 insertions, 137 deletions
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 51ae822b2e..542f6ba0d1 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -45,9 +45,9 @@ import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.SimpleAMQQueue.QueueEntryFilter;
-import org.apache.qpid.server.subscription.MockSubscription;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionTarget;
+import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.consumer.MockConsumer;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -69,8 +69,8 @@ public class SimpleAMQQueueTest extends QpidTestCase
private String _owner = "owner";
private String _routingKey = "routing key";
private DirectExchange _exchange;
- private MockSubscription _subscriptionTarget = new MockSubscription();
- private QueueSubscription _subscription;
+ private MockConsumer _consumerTarget = new MockConsumer();
+ private QueueConsumer _consumer;
private Map<String,Object> _arguments = null;
@Override
@@ -162,13 +162,13 @@ public class SimpleAMQQueueTest extends QpidTestCase
}
- public void testRegisterSubscriptionThenEnqueueMessage() throws AMQException
+ public void testRegisterConsumerThenEnqueueMessage() throws AMQException
{
ServerMessage messageA = createMessage(new Long(24));
- // Check adding a subscription adds it to the queue
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
- EnumSet.noneOf(Subscription.Option.class));
+ // Check adding a consumer adds it to the queue
+ _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ EnumSet.noneOf(Consumer.Option.class));
assertEquals("Queue does not have consumer", 1,
_queue.getConsumerCount());
assertEquals("Queue does not have active consumer", 1,
@@ -183,49 +183,49 @@ public class SimpleAMQQueueTest extends QpidTestCase
catch(InterruptedException e)
{
}
- assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
- assertNull(_subscription.getQueueContext().getReleasedEntry());
+ assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage());
+ assertNull(_consumer.getQueueContext().getReleasedEntry());
- // Check removing the subscription removes it's information from the queue
- _subscription.close();
- assertTrue("Subscription still had queue", _subscriptionTarget.isClosed());
+ // Check removing the consumer removes it's information from the queue
+ _consumer.close();
+ assertTrue("Consumer still had queue", _consumerTarget.isClosed());
assertFalse("Queue still has consumer", 1 == _queue.getConsumerCount());
assertFalse("Queue still has active consumer",
1 == _queue.getActiveConsumerCount());
ServerMessage messageB = createMessage(new Long (25));
_queue.enqueue(messageB);
- assertNull(_subscription.getQueueContext());
+ assertNull(_consumer.getQueueContext());
}
- public void testEnqueueMessageThenRegisterSubscription() throws AMQException, InterruptedException
+ public void testEnqueueMessageThenRegisterConsumer() throws AMQException, InterruptedException
{
ServerMessage messageA = createMessage(new Long(24));
_queue.enqueue(messageA);
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
- EnumSet.noneOf(Subscription.Option.class));
+ _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ EnumSet.noneOf(Consumer.Option.class));
Thread.sleep(150);
- assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
+ assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage());
assertNull("There should be no releasedEntry after an enqueue",
- _subscription.getQueueContext().getReleasedEntry());
+ _consumer.getQueueContext().getReleasedEntry());
}
/**
* Tests enqueuing two messages.
*/
- public void testEnqueueTwoMessagesThenRegisterSubscription() throws Exception
+ public void testEnqueueTwoMessagesThenRegisterConsumer() throws Exception
{
ServerMessage messageA = createMessage(new Long(24));
ServerMessage messageB = createMessage(new Long(25));
_queue.enqueue(messageA);
_queue.enqueue(messageB);
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
- EnumSet.noneOf(Subscription.Option.class));
+ _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ EnumSet.noneOf(Consumer.Option.class));
Thread.sleep(150);
- assertEquals(messageB, _subscription.getQueueContext().getLastSeenEntry().getMessage());
+ assertEquals(messageB, _consumer.getQueueContext().getLastSeenEntry().getMessage());
assertNull("There should be no releasedEntry after enqueues",
- _subscription.getQueueContext().getReleasedEntry());
+ _consumer.getQueueContext().getReleasedEntry());
}
/**
@@ -240,9 +240,9 @@ public class SimpleAMQQueueTest extends QpidTestCase
ServerMessage messageC = createMessage(new Long(26));
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
- EnumSet.of(Subscription.Option.ACQUIRES,
- Subscription.Option.SEES_REQUEUES));
+ _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>()
@@ -261,7 +261,9 @@ public class SimpleAMQQueueTest extends QpidTestCase
Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
- assertEquals("Unexpected total number of messages sent to subscription", 3, _subscriptionTarget.getMessages().size());
+ assertEquals("Unexpected total number of messages sent to consumer",
+ 3,
+ _consumerTarget.getMessages().size());
assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered());
assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered());
@@ -272,12 +274,14 @@ public class SimpleAMQQueueTest extends QpidTestCase
Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
- assertEquals("Unexpected total number of messages sent to subscription", 4, _subscriptionTarget.getMessages().size());
+ assertEquals("Unexpected total number of messages sent to consumer",
+ 4,
+ _consumerTarget.getMessages().size());
assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered());
assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered());
assertFalse("Redelivery flag should remain be unset",queueEntries.get(2).isRedelivered());
assertNull("releasedEntry should be cleared after requeue processed",
- _subscription.getQueueContext().getReleasedEntry());
+ _consumer.getQueueContext().getReleasedEntry());
}
/**
@@ -289,9 +293,9 @@ public class SimpleAMQQueueTest extends QpidTestCase
{
ServerMessage messageA = createMessage(new Long(24));
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
- EnumSet.of(Subscription.Option.SEES_REQUEUES,
- Subscription.Option.ACQUIRES));
+ _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ EnumSet.of(Consumer.Option.SEES_REQUEUES,
+ Consumer.Option.ACQUIRES));
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>()
@@ -313,7 +317,9 @@ public class SimpleAMQQueueTest extends QpidTestCase
int subFlushWaitTime = 150;
Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner/QueueRunner Threads
- assertEquals("Unexpected total number of messages sent to subscription", 1, _subscriptionTarget.getMessages().size());
+ assertEquals("Unexpected total number of messages sent to consumer",
+ 1,
+ _consumerTarget.getMessages().size());
assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
/* Wait a little more to be sure that message will have expired, then release the first message only, causing it to be requeued */
@@ -323,10 +329,12 @@ public class SimpleAMQQueueTest extends QpidTestCase
Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner/QueueRunner Threads
assertTrue("Expecting the queue entry to be now expired", queueEntries.get(0).expired());
- assertEquals("Total number of messages sent should not have changed", 1, _subscriptionTarget.getMessages().size());
+ assertEquals("Total number of messages sent should not have changed",
+ 1,
+ _consumerTarget.getMessages().size());
assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
assertNull("releasedEntry should be cleared after requeue processed",
- _subscription.getQueueContext().getReleasedEntry());
+ _consumer.getQueueContext().getReleasedEntry());
}
@@ -343,9 +351,9 @@ public class SimpleAMQQueueTest extends QpidTestCase
ServerMessage messageB = createMessage(new Long(25));
ServerMessage messageC = createMessage(new Long(26));
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
- EnumSet.of(Subscription.Option.ACQUIRES,
- Subscription.Option.SEES_REQUEUES));
+ _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>()
@@ -364,7 +372,9 @@ public class SimpleAMQQueueTest extends QpidTestCase
Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
- assertEquals("Unexpected total number of messages sent to subscription", 3, _subscriptionTarget.getMessages().size());
+ assertEquals("Unexpected total number of messages sent to consumer",
+ 3,
+ _consumerTarget.getMessages().size());
assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered());
assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered());
@@ -376,35 +386,37 @@ public class SimpleAMQQueueTest extends QpidTestCase
Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
- assertEquals("Unexpected total number of messages sent to subscription", 5, _subscriptionTarget.getMessages().size());
+ assertEquals("Unexpected total number of messages sent to consumer",
+ 5,
+ _consumerTarget.getMessages().size());
assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered());
assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered());
assertTrue("Redelivery flag should now be set",queueEntries.get(2).isRedelivered());
assertNull("releasedEntry should be cleared after requeue processed",
- _subscription.getQueueContext().getReleasedEntry());
+ _consumer.getQueueContext().getReleasedEntry());
}
/**
- * Tests that a release requeues an entry for a queue with multiple subscriptions. Verifies that a
+ * Tests that a release requeues an entry for a queue with multiple consumers. Verifies that a
* requeue resends a message to a <i>single</i> subscriber.
*/
- public void testReleaseForQueueWithMultipleSubscriptions() throws Exception
+ public void testReleaseForQueueWithMultipleConsumers() throws Exception
{
ServerMessage messageA = createMessage(new Long(24));
ServerMessage messageB = createMessage(new Long(25));
- MockSubscription target1 = new MockSubscription();
- MockSubscription target2 = new MockSubscription();
+ MockConsumer target1 = new MockConsumer();
+ MockConsumer target2 = new MockConsumer();
- QueueSubscription subscription1 = _queue.registerSubscription(target1, null, messageA.getClass(), "test",
- EnumSet.of(Subscription.Option.ACQUIRES,
- Subscription.Option.SEES_REQUEUES));
+ QueueConsumer consumer1 = _queue.addConsumer(target1, null, messageA.getClass(), "test",
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
- QueueSubscription subscription2 = _queue.registerSubscription(target2, null, messageA.getClass(), "test",
- EnumSet.of(Subscription.Option.ACQUIRES,
- Subscription.Option.SEES_REQUEUES));
+ QueueConsumer consumer2 = _queue.addConsumer(target2, null, messageA.getClass(), "test",
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
@@ -433,22 +445,22 @@ public class SimpleAMQQueueTest extends QpidTestCase
Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
- assertEquals("Unexpected total number of messages sent to both subscriptions after release",
+ assertEquals("Unexpected total number of messages sent to both consumers after release",
3,
target1.getMessages().size() + target2.getMessages().size());
assertNull("releasedEntry should be cleared after requeue processed",
- subscription1.getQueueContext().getReleasedEntry());
+ consumer1.getQueueContext().getReleasedEntry());
assertNull("releasedEntry should be cleared after requeue processed",
- subscription2.getQueueContext().getReleasedEntry());
+ consumer2.getQueueContext().getReleasedEntry());
}
public void testExclusiveConsumer() throws AMQException
{
ServerMessage messageA = createMessage(new Long(24));
- // Check adding an exclusive subscription adds it to the queue
+ // Check adding an exclusive consumer adds it to the queue
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
- EnumSet.of(Subscription.Option.EXCLUSIVE));
+ _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ EnumSet.of(Consumer.Option.EXCLUSIVE));
assertEquals("Queue does not have consumer", 1,
_queue.getConsumerCount());
@@ -464,16 +476,16 @@ public class SimpleAMQQueueTest extends QpidTestCase
catch (InterruptedException e)
{
}
- assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
+ assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage());
// Check we cannot add a second subscriber to the queue
- MockSubscription subB = new MockSubscription();
+ MockConsumer subB = new MockConsumer();
Exception ex = null;
try
{
- _queue.registerSubscription(subB, null, messageA.getClass(), "test",
- EnumSet.noneOf(Subscription.Option.class));
+ _queue.addConsumer(subB, null, messageA.getClass(), "test",
+ EnumSet.noneOf(Consumer.Option.class));
}
catch (AMQException e)
@@ -483,16 +495,16 @@ public class SimpleAMQQueueTest extends QpidTestCase
assertNotNull(ex);
// Check we cannot add an exclusive subscriber to a queue with an
- // existing subscription
- _subscription.close();
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
- EnumSet.noneOf(Subscription.Option.class));
+ // existing consumer
+ _consumer.close();
+ _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ EnumSet.noneOf(Consumer.Option.class));
try
{
- _subscription = _queue.registerSubscription(subB, null, messageA.getClass(), "test",
- EnumSet.of(Subscription.Option.EXCLUSIVE));
+ _consumer = _queue.addConsumer(subB, null, messageA.getClass(), "test",
+ EnumSet.of(Consumer.Option.EXCLUSIVE));
}
catch (AMQException e)
@@ -509,12 +521,12 @@ public class SimpleAMQQueueTest extends QpidTestCase
_queue.setDeleteOnNoConsumers(true);
ServerMessage message = createMessage(new Long(25));
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, message.getClass(), "test",
- EnumSet.noneOf(Subscription.Option.class));
+ _consumer = _queue.addConsumer(_consumerTarget, null, message.getClass(), "test",
+ EnumSet.noneOf(Consumer.Option.class));
_queue.enqueue(message);
- _subscription.close();
- assertTrue("Queue was not deleted when subscription was removed",
+ _consumer.close();
+ assertTrue("Queue was not deleted when consumer was removed",
_queue.isDeleted());
}
@@ -523,13 +535,13 @@ public class SimpleAMQQueueTest extends QpidTestCase
Long id = new Long(26);
ServerMessage message = createMessage(id);
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, message.getClass(), "test",
- EnumSet.noneOf(Subscription.Option.class));
+ _consumer = _queue.addConsumer(_consumerTarget, null, message.getClass(), "test",
+ EnumSet.noneOf(Consumer.Option.class));
_queue.enqueue(message);
- QueueEntry entry = _subscription.getQueueContext().getLastSeenEntry();
+ QueueEntry entry = _consumer.getQueueContext().getLastSeenEntry();
entry.setRedelivered();
- _subscription.resend(entry);
+ _consumer.resend(entry);
}
@@ -656,19 +668,19 @@ public class SimpleAMQQueueTest extends QpidTestCase
/**
* processQueue() is used when asynchronously delivering messages to
- * subscriptions which could not be delivered immediately during the
+ * consumers which could not be delivered immediately during the
* enqueue() operation.
*
* A defect within the method would mean that delivery of these messages may
* not occur should the Runner stop before all messages have been processed.
* Such a defect was discovered when Selectors were used such that one and
- * only one subscription can/will accept any given messages, but multiple
- * subscriptions are present, and one of the earlier subscriptions receives
+ * only one consumer can/will accept any given messages, but multiple
+ * consumers are present, and one of the earlier consumers receives
* more messages than the others.
*
* This test is to validate that the processQueue() method is able to
* correctly deliver all of the messages present for asynchronous delivery
- * to subscriptions in such a scenario.
+ * to consumers in such a scenario.
*/
public void testProcessQueueWithUniqueSelectors() throws Exception
{
@@ -677,10 +689,10 @@ public class SimpleAMQQueueTest extends QpidTestCase
false, false, _virtualHost, factory, null)
{
@Override
- public void deliverAsync(QueueSubscription sub)
+ public void deliverAsync(QueueConsumer sub)
{
// do nothing, i.e prevent deliveries by the SubFlushRunner
- // when registering the new subscriptions
+ // when registering the new consumers
}
};
@@ -696,28 +708,28 @@ public class SimpleAMQQueueTest extends QpidTestCase
QueueEntry msg4 = list.add(createMessage(4L));
QueueEntry msg5 = list.add(createMessage(5L));
- // Create lists of the entries each subscription should be interested
- // in.Bias over 50% of the messages to the first subscription so that
- // the later subscriptions reject them and report being done before
- // the first subscription as the processQueue method proceeds.
+ // Create lists of the entries each consumer should be interested
+ // in.Bias over 50% of the messages to the first consumer so that
+ // the later consumers reject them and report being done before
+ // the first consumer as the processQueue method proceeds.
List<String> msgListSub1 = createEntriesList(msg1, msg2, msg3);
List<String> msgListSub2 = createEntriesList(msg4);
List<String> msgListSub3 = createEntriesList(msg5);
- MockSubscription sub1 = new MockSubscription(msgListSub1);
- MockSubscription sub2 = new MockSubscription(msgListSub2);
- MockSubscription sub3 = new MockSubscription(msgListSub3);
+ MockConsumer sub1 = new MockConsumer(msgListSub1);
+ MockConsumer sub2 = new MockConsumer(msgListSub2);
+ MockConsumer sub3 = new MockConsumer(msgListSub3);
- // register the subscriptions
- testQueue.registerSubscription(sub1, sub1.getFilters(), msg1.getMessage().getClass(), "test",
- EnumSet.of(Subscription.Option.ACQUIRES, Subscription.Option.SEES_REQUEUES));
- testQueue.registerSubscription(sub2, sub2.getFilters(), msg1.getMessage().getClass(), "test",
- EnumSet.of(Subscription.Option.ACQUIRES, Subscription.Option.SEES_REQUEUES));
- testQueue.registerSubscription(sub3, sub3.getFilters(), msg1.getMessage().getClass(), "test",
- EnumSet.of(Subscription.Option.ACQUIRES, Subscription.Option.SEES_REQUEUES));
+ // register the consumers
+ testQueue.addConsumer(sub1, sub1.getFilters(), msg1.getMessage().getClass(), "test",
+ EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES));
+ testQueue.addConsumer(sub2, sub2.getFilters(), msg1.getMessage().getClass(), "test",
+ EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES));
+ testQueue.addConsumer(sub3, sub3.getFilters(), msg1.getMessage().getClass(), "test",
+ EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES));
//check that no messages have been delivered to the
- //subscriptions during registration
+ //consumers during registration
assertEquals("No messages should have been delivered yet", 0, sub1.getMessages().size());
assertEquals("No messages should have been delivered yet", 0, sub2.getMessages().size());
assertEquals("No messages should have been delivered yet", 0, sub3.getMessages().size());
@@ -904,7 +916,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
false, "testOwner", false, false, _virtualHost, null)
{
@Override
- public void deliverAsync(QueueSubscription sub)
+ public void deliverAsync(QueueConsumer sub)
{
// do nothing
}
@@ -919,8 +931,8 @@ public class SimpleAMQQueueTest extends QpidTestCase
// latch to wait for message receipt
final CountDownLatch latch = new CountDownLatch(messageNumber -1);
- // create a subscription
- MockSubscription subscription = new MockSubscription()
+ // create a consumer
+ MockConsumer consumer = new MockConsumer()
{
/**
* Send a message and decrement latch
@@ -937,7 +949,11 @@ public class SimpleAMQQueueTest extends QpidTestCase
try
{
// subscribe
- testQueue.registerSubscription(subscription, null, entries.get(0).getMessage().getClass(), "test", EnumSet.noneOf(Subscription.Option.class));
+ testQueue.addConsumer(consumer,
+ null,
+ entries.get(0).getMessage().getClass(),
+ "test",
+ EnumSet.noneOf(Consumer.Option.class));
// process queue
testQueue.processQueue(new QueueRunner(testQueue)
@@ -962,11 +978,11 @@ public class SimpleAMQQueueTest extends QpidTestCase
Thread.currentThread().interrupt();
}
List<MessageInstance> expected = Arrays.asList((MessageInstance)entries.get(0), entries.get(2), entries.get(3));
- verifyReceivedMessages(expected, subscription.getMessages());
+ verifyReceivedMessages(expected, consumer.getMessages());
}
/**
- * Tests that entry in dequeued state are not enqueued and not delivered to subscription
+ * Tests that entry in dequeued state are not enqueued and not delivered to consumer
*/
public void testEnqueueDequeuedEntry()
{
@@ -1002,7 +1018,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
}
@Override
- public boolean acquire(Subscription sub)
+ public boolean acquire(Consumer sub)
{
if(message.getMessageNumber() % 2 == 0)
{
@@ -1018,24 +1034,28 @@ public class SimpleAMQQueueTest extends QpidTestCase
};
}
}, null);
- // create a subscription
- MockSubscription subscription = new MockSubscription();
+ // create a consumer
+ MockConsumer consumer = new MockConsumer();
- // register subscription
+ // register consumer
try
{
- queue.registerSubscription(subscription, null, createMessage(-1l).getClass(), "test", EnumSet.noneOf(Subscription.Option.class));
+ queue.addConsumer(consumer,
+ null,
+ createMessage(-1l).getClass(),
+ "test",
+ EnumSet.noneOf(Consumer.Option.class));
}
catch (AMQException e)
{
- fail("Failure to register subscription:" + e.getMessage());
+ fail("Failure to register consumer:" + e.getMessage());
}
// put test messages into a queue
putGivenNumberOfMessages(queue, 4);
// assert received messages
- List<MessageInstance> messages = subscription.getMessages();
+ List<MessageInstance> messages = consumer.getMessages();
assertEquals("Only 2 messages should be returned", 2, messages.size());
assertEquals("ID of first message should be 1", 1l,
(messages.get(0).getMessage()).getMessageNumber());
@@ -1048,52 +1068,60 @@ public class SimpleAMQQueueTest extends QpidTestCase
final SimpleAMQQueue queue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), "testActiveConsumerCount", false,
"testOwner", false, false, _virtualHost, new SimpleQueueEntryList.Factory(), null);
- //verify adding an active subscription increases the count
- final MockSubscription subscription1 = new MockSubscription();
- subscription1.setActive(true);
- subscription1.setState(SubscriptionTarget.State.ACTIVE);
+ //verify adding an active consumer increases the count
+ final MockConsumer consumer1 = new MockConsumer();
+ consumer1.setActive(true);
+ consumer1.setState(ConsumerTarget.State.ACTIVE);
assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
- queue.registerSubscription(subscription1, null, createMessage(-1l).getClass(), "test", EnumSet.noneOf(Subscription.Option.class));
+ queue.addConsumer(consumer1,
+ null,
+ createMessage(-1l).getClass(),
+ "test",
+ EnumSet.noneOf(Consumer.Option.class));
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
- //verify adding an inactive subscription doesn't increase the count
- final MockSubscription subscription2 = new MockSubscription();
- subscription2.setActive(false);
- subscription2.setState(SubscriptionTarget.State.SUSPENDED);
+ //verify adding an inactive consumer doesn't increase the count
+ final MockConsumer consumer2 = new MockConsumer();
+ consumer2.setActive(false);
+ consumer2.setState(ConsumerTarget.State.SUSPENDED);
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
- queue.registerSubscription(subscription2, null, createMessage(-1l).getClass(), "test", EnumSet.noneOf(Subscription.Option.class));
+ queue.addConsumer(consumer2,
+ null,
+ createMessage(-1l).getClass(),
+ "test",
+ EnumSet.noneOf(Consumer.Option.class));
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
//verify behaviour in face of expected state changes:
- //verify a subscription going suspended->active increases the count
- subscription2.setState(SubscriptionTarget.State.ACTIVE);
+ //verify a consumer going suspended->active increases the count
+ consumer2.setState(ConsumerTarget.State.ACTIVE);
assertEquals("Unexpected active consumer count", 2, queue.getActiveConsumerCount());
- //verify a subscription going active->suspended decreases the count
- subscription2.setState(SubscriptionTarget.State.SUSPENDED);
+ //verify a consumer going active->suspended decreases the count
+ consumer2.setState(ConsumerTarget.State.SUSPENDED);
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
- //verify a subscription going suspended->closed doesn't change the count
- subscription2.setState(SubscriptionTarget.State.CLOSED);
+ //verify a consumer going suspended->closed doesn't change the count
+ consumer2.setState(ConsumerTarget.State.CLOSED);
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
- //verify a subscription going active->active doesn't change the count
- subscription1.setState(SubscriptionTarget.State.ACTIVE);
+ //verify a consumer going active->active doesn't change the count
+ consumer1.setState(ConsumerTarget.State.ACTIVE);
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
- subscription1.setState(SubscriptionTarget.State.SUSPENDED);
+ consumer1.setState(ConsumerTarget.State.SUSPENDED);
assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
- //verify a subscription going suspended->suspended doesn't change the count
- subscription1.setState(SubscriptionTarget.State.SUSPENDED);
+ //verify a consumer going suspended->suspended doesn't change the count
+ consumer1.setState(ConsumerTarget.State.SUSPENDED);
assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
- subscription1.setState(SubscriptionTarget.State.ACTIVE);
+ consumer1.setState(ConsumerTarget.State.ACTIVE);
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
- //verify a subscription going active->closed decreases the count
- subscription1.setState(SubscriptionTarget.State.CLOSED);
+ //verify a consumer going active->closed decreases the count
+ consumer1.setState(ConsumerTarget.State.CLOSED);
assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
}
@@ -1248,9 +1276,9 @@ public class SimpleAMQQueueTest extends QpidTestCase
return _queue;
}
- public MockSubscription getSubscription()
+ public MockConsumer getConsumer()
{
- return _subscriptionTarget;
+ return _consumerTarget;
}
public Map<String,Object> getArguments()