summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java')
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java506
1 files changed, 302 insertions, 204 deletions
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index b0e5a510b8..5abc97cee9 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -29,6 +29,8 @@ import static org.mockito.Matchers.contains;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.when;
+import java.util.Arrays;
+import java.util.EnumSet;
import java.util.Map;
import org.apache.log4j.Logger;
@@ -38,13 +40,15 @@ import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.queue.BaseQueue.PostEnqueueAction;
import org.apache.qpid.server.queue.SimpleAMQQueue.QueueEntryFilter;
-import org.apache.qpid.server.subscription.MockSubscription;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.consumer.MockConsumer;
+import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -65,7 +69,8 @@ public class SimpleAMQQueueTest extends QpidTestCase
private String _owner = "owner";
private String _routingKey = "routing key";
private DirectExchange _exchange;
- private MockSubscription _subscription = new MockSubscription();
+ private MockConsumer _consumerTarget = new MockConsumer();
+ private QueueConsumer _consumer;
private Map<String,Object> _arguments = null;
@Override
@@ -157,20 +162,21 @@ public class SimpleAMQQueueTest extends QpidTestCase
}
- public void testRegisterSubscriptionThenEnqueueMessage() throws AMQException
+ public void testRegisterConsumerThenEnqueueMessage() throws AMQException
{
- // Check adding a subscription adds it to the queue
- _queue.registerSubscription(_subscription, false);
- assertEquals("Subscription did not get queue", _queue,
- _subscription.getQueue());
+ ServerMessage messageA = createMessage(new Long(24));
+
+ // Check adding a consumer adds it to the queue
+ _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
assertEquals("Queue does not have consumer", 1,
_queue.getConsumerCount());
assertEquals("Queue does not have active consumer", 1,
- _queue.getActiveConsumerCount());
+ _queue.getActiveConsumerCount());
// Check sending a message ends up with the subscriber
- ServerMessage messageA = createMessage(new Long(24));
- _queue.enqueue(messageA);
+ _queue.enqueue(messageA, null);
try
{
Thread.sleep(2000L);
@@ -178,45 +184,51 @@ public class SimpleAMQQueueTest extends QpidTestCase
catch(InterruptedException e)
{
}
- assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
- assertNull(((QueueContext)_subscription.getQueueContext()).getReleasedEntry());
+ assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage());
+ assertNull(_consumer.getQueueContext().getReleasedEntry());
- // Check removing the subscription removes it's information from the queue
- _queue.unregisterSubscription(_subscription);
- assertTrue("Subscription still had queue", _subscription.isClosed());
+ // Check removing the consumer removes it's information from the queue
+ _consumer.close();
+ assertTrue("Consumer still had queue", _consumerTarget.isClosed());
assertFalse("Queue still has consumer", 1 == _queue.getConsumerCount());
assertFalse("Queue still has active consumer",
- 1 == _queue.getActiveConsumerCount());
+ 1 == _queue.getActiveConsumerCount());
ServerMessage messageB = createMessage(new Long (25));
- _queue.enqueue(messageB);
- assertNull(_subscription.getQueueContext());
+ _queue.enqueue(messageB, null);
+ assertNull(_consumer.getQueueContext());
}
- public void testEnqueueMessageThenRegisterSubscription() throws AMQException, InterruptedException
+ public void testEnqueueMessageThenRegisterConsumer() throws AMQException, InterruptedException
{
ServerMessage messageA = createMessage(new Long(24));
- _queue.enqueue(messageA);
- _queue.registerSubscription(_subscription, false);
+ _queue.enqueue(messageA, null);
+ _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
Thread.sleep(150);
- assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
- assertNull("There should be no releasedEntry after an enqueue", ((QueueContext)_subscription.getQueueContext()).getReleasedEntry());
+ assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage());
+ assertNull("There should be no releasedEntry after an enqueue",
+ _consumer.getQueueContext().getReleasedEntry());
}
/**
* Tests enqueuing two messages.
*/
- public void testEnqueueTwoMessagesThenRegisterSubscription() throws Exception
+ public void testEnqueueTwoMessagesThenRegisterConsumer() throws Exception
{
ServerMessage messageA = createMessage(new Long(24));
ServerMessage messageB = createMessage(new Long(25));
- _queue.enqueue(messageA);
- _queue.enqueue(messageB);
- _queue.registerSubscription(_subscription, false);
+ _queue.enqueue(messageA, null);
+ _queue.enqueue(messageB, null);
+ _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
Thread.sleep(150);
- assertEquals(messageB, _subscription.getQueueContext().getLastSeenEntry().getMessage());
- assertNull("There should be no releasedEntry after enqueues", ((QueueContext)_subscription.getQueueContext()).getReleasedEntry());
+ assertEquals(messageB, _consumer.getQueueContext().getLastSeenEntry().getMessage());
+ assertNull("There should be no releasedEntry after enqueues",
+ _consumer.getQueueContext().getReleasedEntry());
}
/**
@@ -225,21 +237,19 @@ public class SimpleAMQQueueTest extends QpidTestCase
*/
public void testReleasedMessageIsResentToSubscriber() throws Exception
{
- _queue.registerSubscription(_subscription, false);
-
- final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
- PostEnqueueAction postEnqueueAction = new PostEnqueueAction()
- {
- public void onEnqueue(QueueEntry entry)
- {
- queueEntries.add(entry);
- }
- };
ServerMessage messageA = createMessage(new Long(24));
ServerMessage messageB = createMessage(new Long(25));
ServerMessage messageC = createMessage(new Long(26));
+
+ _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
+
+ final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
+ EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries);
+
/* Enqueue three messages */
_queue.enqueue(messageA, postEnqueueAction);
@@ -248,7 +258,9 @@ public class SimpleAMQQueueTest extends QpidTestCase
Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
- assertEquals("Unexpected total number of messages sent to subscription", 3, _subscription.getMessages().size());
+ assertEquals("Unexpected total number of messages sent to consumer",
+ 3,
+ _consumerTarget.getMessages().size());
assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered());
assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered());
@@ -259,11 +271,14 @@ public class SimpleAMQQueueTest extends QpidTestCase
Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
- assertEquals("Unexpected total number of messages sent to subscription", 4, _subscription.getMessages().size());
+ assertEquals("Unexpected total number of messages sent to consumer",
+ 4,
+ _consumerTarget.getMessages().size());
assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered());
assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered());
assertFalse("Redelivery flag should remain be unset",queueEntries.get(2).isRedelivered());
- assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)_subscription.getQueueContext()).getReleasedEntry());
+ assertNull("releasedEntry should be cleared after requeue processed",
+ _consumer.getQueueContext().getReleasedEntry());
}
/**
@@ -273,20 +288,17 @@ public class SimpleAMQQueueTest extends QpidTestCase
*/
public void testReleaseMessageThatBecomesExpiredIsNotRedelivered() throws Exception
{
- _queue.registerSubscription(_subscription, false);
+ ServerMessage messageA = createMessage(new Long(24));
+
+ _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ EnumSet.of(Consumer.Option.SEES_REQUEUES,
+ Consumer.Option.ACQUIRES));
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
- PostEnqueueAction postEnqueueAction = new PostEnqueueAction()
- {
- public void onEnqueue(QueueEntry entry)
- {
- queueEntries.add(entry);
- }
- };
+ EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries);
/* Enqueue one message with expiration set for a short time in the future */
- ServerMessage messageA = createMessage(new Long(24));
int messageExpirationOffset = 200;
final long expiration = System.currentTimeMillis() + messageExpirationOffset;
when(messageA.getExpiration()).thenReturn(expiration);
@@ -296,7 +308,9 @@ public class SimpleAMQQueueTest extends QpidTestCase
int subFlushWaitTime = 150;
Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner/QueueRunner Threads
- assertEquals("Unexpected total number of messages sent to subscription", 1, _subscription.getMessages().size());
+ assertEquals("Unexpected total number of messages sent to consumer",
+ 1,
+ _consumerTarget.getMessages().size());
assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
/* Wait a little more to be sure that message will have expired, then release the first message only, causing it to be requeued */
@@ -306,9 +320,12 @@ public class SimpleAMQQueueTest extends QpidTestCase
Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner/QueueRunner Threads
assertTrue("Expecting the queue entry to be now expired", queueEntries.get(0).expired());
- assertEquals("Total number of messages sent should not have changed", 1, _subscription.getMessages().size());
+ assertEquals("Total number of messages sent should not have changed",
+ 1,
+ _consumerTarget.getMessages().size());
assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
- assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)_subscription.getQueueContext()).getReleasedEntry());
+ assertNull("releasedEntry should be cleared after requeue processed",
+ _consumer.getQueueContext().getReleasedEntry());
}
@@ -320,21 +337,18 @@ public class SimpleAMQQueueTest extends QpidTestCase
*/
public void testReleasedOutOfComparableOrderAreRedelivered() throws Exception
{
- _queue.registerSubscription(_subscription, false);
-
- final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
- PostEnqueueAction postEnqueueAction = new PostEnqueueAction()
- {
- public void onEnqueue(QueueEntry entry)
- {
- queueEntries.add(entry);
- }
- };
ServerMessage messageA = createMessage(new Long(24));
ServerMessage messageB = createMessage(new Long(25));
ServerMessage messageC = createMessage(new Long(26));
+ _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
+
+ final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
+ EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries);
+
/* Enqueue three messages */
_queue.enqueue(messageA, postEnqueueAction);
@@ -343,7 +357,9 @@ public class SimpleAMQQueueTest extends QpidTestCase
Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
- assertEquals("Unexpected total number of messages sent to subscription", 3, _subscription.getMessages().size());
+ assertEquals("Unexpected total number of messages sent to consumer",
+ 3,
+ _consumerTarget.getMessages().size());
assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered());
assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered());
@@ -355,37 +371,41 @@ public class SimpleAMQQueueTest extends QpidTestCase
Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
- assertEquals("Unexpected total number of messages sent to subscription", 5, _subscription.getMessages().size());
+ assertEquals("Unexpected total number of messages sent to consumer",
+ 5,
+ _consumerTarget.getMessages().size());
assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered());
assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered());
assertTrue("Redelivery flag should now be set",queueEntries.get(2).isRedelivered());
- assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)_subscription.getQueueContext()).getReleasedEntry());
+ assertNull("releasedEntry should be cleared after requeue processed",
+ _consumer.getQueueContext().getReleasedEntry());
}
/**
- * Tests that a release requeues an entry for a queue with multiple subscriptions. Verifies that a
+ * Tests that a release requeues an entry for a queue with multiple consumers. Verifies that a
* requeue resends a message to a <i>single</i> subscriber.
*/
- public void testReleaseForQueueWithMultipleSubscriptions() throws Exception
+ public void testReleaseForQueueWithMultipleConsumers() throws Exception
{
- MockSubscription subscription1 = new MockSubscription();
- MockSubscription subscription2 = new MockSubscription();
+ ServerMessage messageA = createMessage(new Long(24));
+ ServerMessage messageB = createMessage(new Long(25));
- _queue.registerSubscription(subscription1, false);
- _queue.registerSubscription(subscription2, false);
+ MockConsumer target1 = new MockConsumer();
+ MockConsumer target2 = new MockConsumer();
- final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
- PostEnqueueAction postEnqueueAction = new PostEnqueueAction()
- {
- public void onEnqueue(QueueEntry entry)
- {
- queueEntries.add(entry);
- }
- };
- ServerMessage messageA = createMessage(new Long(24));
- ServerMessage messageB = createMessage(new Long(25));
+ QueueConsumer consumer1 = _queue.addConsumer(target1, null, messageA.getClass(), "test",
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
+
+ QueueConsumer consumer2 = _queue.addConsumer(target2, null, messageA.getClass(), "test",
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
+
+
+ final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
+ EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries);
/* Enqueue two messages */
@@ -394,32 +414,40 @@ public class SimpleAMQQueueTest extends QpidTestCase
Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
- assertEquals("Unexpected total number of messages sent to both after enqueue", 2, subscription1.getMessages().size() + subscription2.getMessages().size());
+ assertEquals("Unexpected total number of messages sent to both after enqueue",
+ 2,
+ target1.getMessages().size() + target2.getMessages().size());
/* Now release the first message only, causing it to be requeued */
queueEntries.get(0).release();
Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
- assertEquals("Unexpected total number of messages sent to both subscriptions after release", 3, subscription1.getMessages().size() + subscription2.getMessages().size());
- assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription1.getQueueContext()).getReleasedEntry());
- assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription2.getQueueContext()).getReleasedEntry());
+ assertEquals("Unexpected total number of messages sent to both consumers after release",
+ 3,
+ target1.getMessages().size() + target2.getMessages().size());
+ assertNull("releasedEntry should be cleared after requeue processed",
+ consumer1.getQueueContext().getReleasedEntry());
+ assertNull("releasedEntry should be cleared after requeue processed",
+ consumer2.getQueueContext().getReleasedEntry());
}
public void testExclusiveConsumer() throws AMQException
{
- // Check adding an exclusive subscription adds it to the queue
- _queue.registerSubscription(_subscription, true);
- assertEquals("Subscription did not get queue", _queue,
- _subscription.getQueue());
+ ServerMessage messageA = createMessage(new Long(24));
+ // Check adding an exclusive consumer adds it to the queue
+
+ _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ EnumSet.of(Consumer.Option.EXCLUSIVE, Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
+
assertEquals("Queue does not have consumer", 1,
- _queue.getConsumerCount());
+ _queue.getConsumerCount());
assertEquals("Queue does not have active consumer", 1,
- _queue.getActiveConsumerCount());
+ _queue.getActiveConsumerCount());
// Check sending a message ends up with the subscriber
- ServerMessage messageA = createMessage(new Long(24));
- _queue.enqueue(messageA);
+ _queue.enqueue(messageA, null);
try
{
Thread.sleep(2000L);
@@ -427,14 +455,18 @@ public class SimpleAMQQueueTest extends QpidTestCase
catch (InterruptedException e)
{
}
- assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
+ assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage());
// Check we cannot add a second subscriber to the queue
- Subscription subB = new MockSubscription();
+ MockConsumer subB = new MockConsumer();
Exception ex = null;
try
{
- _queue.registerSubscription(subB, false);
+
+ _queue.addConsumer(subB, null, messageA.getClass(), "test",
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
+
}
catch (AMQException e)
{
@@ -443,12 +475,18 @@ public class SimpleAMQQueueTest extends QpidTestCase
assertNotNull(ex);
// Check we cannot add an exclusive subscriber to a queue with an
- // existing subscription
- _queue.unregisterSubscription(_subscription);
- _queue.registerSubscription(_subscription, false);
+ // existing consumer
+ _consumer.close();
+ _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
+
try
{
- _queue.registerSubscription(subB, true);
+
+ _consumer = _queue.addConsumer(subB, null, messageA.getClass(), "test",
+ EnumSet.of(Consumer.Option.EXCLUSIVE));
+
}
catch (AMQException e)
{
@@ -462,23 +500,45 @@ public class SimpleAMQQueueTest extends QpidTestCase
_queue.stop();
_queue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), _qname, false, null, true, false, _virtualHost, Collections.EMPTY_MAP);
_queue.setDeleteOnNoConsumers(true);
- _queue.registerSubscription(_subscription, false);
- ServerMessage message = createMessage(new Long(25));
- _queue.enqueue(message);
- _queue.unregisterSubscription(_subscription);
- assertTrue("Queue was not deleted when subscription was removed",
+
+ ServerMessage message = createMessage(new Long(25));
+ _consumer = _queue.addConsumer(_consumerTarget, null, message.getClass(), "test",
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
+
+ _queue.enqueue(message, null);
+ _consumer.close();
+ assertTrue("Queue was not deleted when consumer was removed",
_queue.isDeleted());
}
public void testResend() throws Exception
{
- _queue.registerSubscription(_subscription, false);
Long id = new Long(26);
ServerMessage message = createMessage(id);
- _queue.enqueue(message);
- QueueEntry entry = _subscription.getQueueContext().getLastSeenEntry();
- entry.setRedelivered();
- _queue.resend(entry, _subscription);
+
+ _consumer = _queue.addConsumer(_consumerTarget, null, message.getClass(), "test",
+ EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES));
+
+ _queue.enqueue(message, new Action<MessageInstance<? extends Consumer>>()
+ {
+ @Override
+ public void performAction(final MessageInstance<? extends Consumer> object)
+ {
+ QueueEntry entry = (QueueEntry) object;
+ entry.setRedelivered();
+ try
+ {
+ _consumer.resend(entry);
+ }
+ catch (AMQException e)
+ {
+ fail("Exception thrown: " + e.getMessage());
+ }
+ }
+ });
+
+
}
@@ -489,7 +549,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
ServerMessage message = createMessage(messageId);
// Put message on queue
- _queue.enqueue(message);
+ _queue.enqueue(message, null);
// Get message id
Long testmsgid = _queue.getMessagesOnTheQueue(1).get(0);
@@ -505,7 +565,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
Long messageId = new Long(i);
ServerMessage message = createMessage(messageId);
// Put message on queue
- _queue.enqueue(message);
+ _queue.enqueue(message, null);
}
// Get message ids
List<Long> msgids = _queue.getMessagesOnTheQueue(5);
@@ -526,7 +586,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
Long messageId = new Long(i);
ServerMessage message = createMessage(messageId);
// Put message on queue
- _queue.enqueue(message);
+ _queue.enqueue(message, null);
}
// Get message ids
List<Long> msgids = _queue.getMessagesOnTheQueue(5, 5);
@@ -547,7 +607,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
Long messageId = new Long(i);
ServerMessage message = createMessage(messageId);
// Put message on queue
- _queue.enqueue(message);
+ _queue.enqueue(message, null);
}
// Get non-existent 0th QueueEntry & check returned list was empty
@@ -605,19 +665,19 @@ public class SimpleAMQQueueTest extends QpidTestCase
/**
* processQueue() is used when asynchronously delivering messages to
- * subscriptions which could not be delivered immediately during the
+ * consumers which could not be delivered immediately during the
* enqueue() operation.
*
* A defect within the method would mean that delivery of these messages may
* not occur should the Runner stop before all messages have been processed.
* Such a defect was discovered when Selectors were used such that one and
- * only one subscription can/will accept any given messages, but multiple
- * subscriptions are present, and one of the earlier subscriptions receives
+ * only one consumer can/will accept any given messages, but multiple
+ * consumers are present, and one of the earlier consumers receives
* more messages than the others.
*
* This test is to validate that the processQueue() method is able to
* correctly deliver all of the messages present for asynchronous delivery
- * to subscriptions in such a scenario.
+ * to consumers in such a scenario.
*/
public void testProcessQueueWithUniqueSelectors() throws Exception
{
@@ -626,10 +686,10 @@ public class SimpleAMQQueueTest extends QpidTestCase
false, false, _virtualHost, factory, null)
{
@Override
- public void deliverAsync(Subscription sub)
+ public void deliverAsync(QueueConsumer sub)
{
// do nothing, i.e prevent deliveries by the SubFlushRunner
- // when registering the new subscriptions
+ // when registering the new consumers
}
};
@@ -645,25 +705,28 @@ public class SimpleAMQQueueTest extends QpidTestCase
QueueEntry msg4 = list.add(createMessage(4L));
QueueEntry msg5 = list.add(createMessage(5L));
- // Create lists of the entries each subscription should be interested
- // in.Bias over 50% of the messages to the first subscription so that
- // the later subscriptions reject them and report being done before
- // the first subscription as the processQueue method proceeds.
- List<QueueEntry> msgListSub1 = createEntriesList(msg1, msg2, msg3);
- List<QueueEntry> msgListSub2 = createEntriesList(msg4);
- List<QueueEntry> msgListSub3 = createEntriesList(msg5);
-
- MockSubscription sub1 = new MockSubscription(msgListSub1);
- MockSubscription sub2 = new MockSubscription(msgListSub2);
- MockSubscription sub3 = new MockSubscription(msgListSub3);
-
- // register the subscriptions
- testQueue.registerSubscription(sub1, false);
- testQueue.registerSubscription(sub2, false);
- testQueue.registerSubscription(sub3, false);
+ // Create lists of the entries each consumer should be interested
+ // in.Bias over 50% of the messages to the first consumer so that
+ // the later consumers reject them and report being done before
+ // the first consumer as the processQueue method proceeds.
+ List<String> msgListSub1 = createEntriesList(msg1, msg2, msg3);
+ List<String> msgListSub2 = createEntriesList(msg4);
+ List<String> msgListSub3 = createEntriesList(msg5);
+
+ MockConsumer sub1 = new MockConsumer(msgListSub1);
+ MockConsumer sub2 = new MockConsumer(msgListSub2);
+ MockConsumer sub3 = new MockConsumer(msgListSub3);
+
+ // register the consumers
+ testQueue.addConsumer(sub1, sub1.getFilters(), msg1.getMessage().getClass(), "test",
+ EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES));
+ testQueue.addConsumer(sub2, sub2.getFilters(), msg1.getMessage().getClass(), "test",
+ EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES));
+ testQueue.addConsumer(sub3, sub3.getFilters(), msg1.getMessage().getClass(), "test",
+ EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES));
//check that no messages have been delivered to the
- //subscriptions during registration
+ //consumers during registration
assertEquals("No messages should have been delivered yet", 0, sub1.getMessages().size());
assertEquals("No messages should have been delivered yet", 0, sub2.getMessages().size());
assertEquals("No messages should have been delivered yet", 0, sub3.getMessages().size());
@@ -680,9 +743,9 @@ public class SimpleAMQQueueTest extends QpidTestCase
});
// check expected messages delivered to correct consumers
- verifyReceivedMessages(msgListSub1, sub1.getMessages());
- verifyReceivedMessages(msgListSub2, sub2.getMessages());
- verifyReceivedMessages(msgListSub3, sub3.getMessages());
+ verifyReceivedMessages(Arrays.asList((MessageInstance)msg1,msg2,msg3), sub1.getMessages());
+ verifyReceivedMessages(Collections.singletonList((MessageInstance)msg4), sub2.getMessages());
+ verifyReceivedMessages(Collections.singletonList((MessageInstance)msg5), sub3.getMessages());
}
/**
@@ -850,7 +913,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
false, "testOwner", false, false, _virtualHost, null)
{
@Override
- public void deliverAsync(Subscription sub)
+ public void deliverAsync(QueueConsumer sub)
{
// do nothing
}
@@ -865,15 +928,15 @@ public class SimpleAMQQueueTest extends QpidTestCase
// latch to wait for message receipt
final CountDownLatch latch = new CountDownLatch(messageNumber -1);
- // create a subscription
- MockSubscription subscription = new MockSubscription()
+ // create a consumer
+ MockConsumer consumer = new MockConsumer()
{
/**
* Send a message and decrement latch
* @param entry
* @param batch
*/
- public void send(QueueEntry entry, boolean batch) throws AMQException
+ public void send(MessageInstance entry, boolean batch) throws AMQException
{
super.send(entry, batch);
latch.countDown();
@@ -883,7 +946,12 @@ public class SimpleAMQQueueTest extends QpidTestCase
try
{
// subscribe
- testQueue.registerSubscription(subscription, false);
+ testQueue.addConsumer(consumer,
+ null,
+ entries.get(0).getMessage().getClass(),
+ "test",
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
// process queue
testQueue.processQueue(new QueueRunner(testQueue)
@@ -907,12 +975,12 @@ public class SimpleAMQQueueTest extends QpidTestCase
{
Thread.currentThread().interrupt();
}
- List<QueueEntry> expected = createEntriesList(entries.get(0), entries.get(2), entries.get(3));
- verifyReceivedMessages(expected, subscription.getMessages());
+ List<MessageInstance> expected = Arrays.asList((MessageInstance)entries.get(0), entries.get(2), entries.get(3));
+ verifyReceivedMessages(expected, consumer.getMessages());
}
/**
- * Tests that entry in dequeued state are not enqueued and not delivered to subscription
+ * Tests that entry in dequeued state are not enqueued and not delivered to consumer
*/
public void testEnqueueDequeuedEntry()
{
@@ -948,7 +1016,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
}
@Override
- public boolean acquire(Subscription sub)
+ public boolean acquire(QueueConsumer sub)
{
if(message.getMessageNumber() % 2 == 0)
{
@@ -964,24 +1032,29 @@ public class SimpleAMQQueueTest extends QpidTestCase
};
}
}, null);
- // create a subscription
- MockSubscription subscription = new MockSubscription();
+ // create a consumer
+ MockConsumer consumer = new MockConsumer();
- // register subscription
+ // register consumer
try
{
- queue.registerSubscription(subscription, false);
+ queue.addConsumer(consumer,
+ null,
+ createMessage(-1l).getClass(),
+ "test",
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
}
catch (AMQException e)
{
- fail("Failure to register subscription:" + e.getMessage());
+ fail("Failure to register consumer:" + e.getMessage());
}
// put test messages into a queue
putGivenNumberOfMessages(queue, 4);
// assert received messages
- List<QueueEntry> messages = subscription.getMessages();
+ List<MessageInstance> messages = consumer.getMessages();
assertEquals("Only 2 messages should be returned", 2, messages.size());
assertEquals("ID of first message should be 1", 1l,
(messages.get(0).getMessage()).getMessageNumber());
@@ -994,55 +1067,64 @@ public class SimpleAMQQueueTest extends QpidTestCase
final SimpleAMQQueue queue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), "testActiveConsumerCount", false,
"testOwner", false, false, _virtualHost, new SimpleQueueEntryList.Factory(), null);
- //verify adding an active subscription increases the count
- final MockSubscription subscription1 = new MockSubscription();
- subscription1.setActive(true);
+ //verify adding an active consumer increases the count
+ final MockConsumer consumer1 = new MockConsumer();
+ consumer1.setActive(true);
+ consumer1.setState(ConsumerTarget.State.ACTIVE);
assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
- queue.registerSubscription(subscription1, false);
+ queue.addConsumer(consumer1,
+ null,
+ createMessage(-1l).getClass(),
+ "test",
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
- //verify adding an inactive subscription doesn't increase the count
- final MockSubscription subscription2 = new MockSubscription();
- subscription2.setActive(false);
+ //verify adding an inactive consumer doesn't increase the count
+ final MockConsumer consumer2 = new MockConsumer();
+ consumer2.setActive(false);
+ consumer2.setState(ConsumerTarget.State.SUSPENDED);
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
- queue.registerSubscription(subscription2, false);
+ queue.addConsumer(consumer2,
+ null,
+ createMessage(-1l).getClass(),
+ "test",
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
//verify behaviour in face of expected state changes:
- //verify a subscription going suspended->active increases the count
- queue.stateChange(subscription2, Subscription.State.SUSPENDED, Subscription.State.ACTIVE);
+ //verify a consumer going suspended->active increases the count
+ consumer2.setState(ConsumerTarget.State.ACTIVE);
assertEquals("Unexpected active consumer count", 2, queue.getActiveConsumerCount());
- //verify a subscription going active->suspended decreases the count
- queue.stateChange(subscription2, Subscription.State.ACTIVE, Subscription.State.SUSPENDED);
+ //verify a consumer going active->suspended decreases the count
+ consumer2.setState(ConsumerTarget.State.SUSPENDED);
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
- //verify a subscription going suspended->closed doesn't change the count
- queue.stateChange(subscription2, Subscription.State.SUSPENDED, Subscription.State.CLOSED);
+ //verify a consumer going suspended->closed doesn't change the count
+ consumer2.setState(ConsumerTarget.State.CLOSED);
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
- //verify a subscription going active->closed decreases the count
- queue.stateChange(subscription2, Subscription.State.ACTIVE, Subscription.State.CLOSED);
- assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
+ //verify a consumer going active->active doesn't change the count
+ consumer1.setState(ConsumerTarget.State.ACTIVE);
+ assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
- //verify behaviour in face of unexpected state changes:
+ consumer1.setState(ConsumerTarget.State.SUSPENDED);
+ assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
- //verify a subscription going closed->active increases the count
- queue.stateChange(subscription2, Subscription.State.CLOSED, Subscription.State.ACTIVE);
- assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
+ //verify a consumer going suspended->suspended doesn't change the count
+ consumer1.setState(ConsumerTarget.State.SUSPENDED);
+ assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
- //verify a subscription going active->active doesn't change the count
- queue.stateChange(subscription2, Subscription.State.ACTIVE, Subscription.State.ACTIVE);
+ consumer1.setState(ConsumerTarget.State.ACTIVE);
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
- //verify a subscription going closed->suspended doesn't change the count
- queue.stateChange(subscription2, Subscription.State.CLOSED, Subscription.State.SUSPENDED);
- assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
+ //verify a consumer going active->closed decreases the count
+ consumer1.setState(ConsumerTarget.State.CLOSED);
+ assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
- //verify a subscription going suspended->suspended doesn't change the count
- queue.stateChange(subscription2, Subscription.State.SUSPENDED, Subscription.State.SUSPENDED);
- assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
}
public void testNotificationFiredOnEnqueue() throws Exception
@@ -1052,10 +1134,10 @@ public class SimpleAMQQueueTest extends QpidTestCase
_queue.setNotificationListener(listener);
_queue.setMaximumMessageCount(2);
- _queue.enqueue(createMessage(new Long(24)));
+ _queue.enqueue(createMessage(new Long(24)), null);
verifyZeroInteractions(listener);
- _queue.enqueue(createMessage(new Long(25)));
+ _queue.enqueue(createMessage(new Long(25)), null);
verify(listener, atLeastOnce()).notifyClients(eq(NotificationCheck.MESSAGE_COUNT_ALERT), eq(_queue), contains("Maximum count on queue threshold"));
}
@@ -1064,9 +1146,9 @@ public class SimpleAMQQueueTest extends QpidTestCase
{
AMQQueue.NotificationListener listener = mock(AMQQueue.NotificationListener.class);
- _queue.enqueue(createMessage(new Long(24)));
- _queue.enqueue(createMessage(new Long(25)));
- _queue.enqueue(createMessage(new Long(26)));
+ _queue.enqueue(createMessage(new Long(24)), null);
+ _queue.enqueue(createMessage(new Long(25)), null);
+ _queue.enqueue(createMessage(new Long(26)), null);
_queue.setNotificationListener(listener);
_queue.setMaximumMessageCount(2);
@@ -1132,7 +1214,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
// Put message on queue
try
{
- queue.enqueue(message);
+ queue.enqueue(message,null);
}
catch (AMQException e)
{
@@ -1167,23 +1249,23 @@ public class SimpleAMQQueueTest extends QpidTestCase
return entry;
}
- private List<QueueEntry> createEntriesList(QueueEntry... entries)
+ private List<String> createEntriesList(QueueEntry... entries)
{
- ArrayList<QueueEntry> entriesList = new ArrayList<QueueEntry>();
+ ArrayList<String> entriesList = new ArrayList<String>();
for (QueueEntry entry : entries)
{
- entriesList.add(entry);
+ entriesList.add(entry.getMessage().getMessageHeader().getMessageId());
}
return entriesList;
}
- private void verifyReceivedMessages(List<QueueEntry> expected,
- List<QueueEntry> delivered)
+ private void verifyReceivedMessages(List<MessageInstance> expected,
+ List<MessageInstance> delivered)
{
assertEquals("Consumer did not receive the expected number of messages",
expected.size(), delivered.size());
- for (QueueEntry msg : expected)
+ for (MessageInstance msg : expected)
{
assertTrue("Consumer did not receive msg: "
+ msg.getMessage().getMessageNumber(), delivered.contains(msg));
@@ -1195,9 +1277,9 @@ public class SimpleAMQQueueTest extends QpidTestCase
return _queue;
}
- public MockSubscription getSubscription()
+ public MockConsumer getConsumer()
{
- return _subscription;
+ return _consumerTarget;
}
public Map<String,Object> getArguments()
@@ -1213,20 +1295,36 @@ public class SimpleAMQQueueTest extends QpidTestCase
protected ServerMessage createMessage(Long id) throws AMQException
{
+ AMQMessageHeader header = mock(AMQMessageHeader.class);
+ when(header.getMessageId()).thenReturn(String.valueOf(id));
ServerMessage message = mock(ServerMessage.class);
when(message.getMessageNumber()).thenReturn(id);
+ when(message.getMessageHeader()).thenReturn(header);
MessageReference ref = mock(MessageReference.class);
when(ref.getMessage()).thenReturn(message);
- AMQMessageHeader hdr = mock(AMQMessageHeader.class);
- when(message.getMessageHeader()).thenReturn(hdr);
when(message.newReference()).thenReturn(ref);
return message;
}
+ private static class EntryListAddingAction implements Action<MessageInstance<? extends Consumer>>
+ {
+ private final ArrayList<QueueEntry> _queueEntries;
+
+ public EntryListAddingAction(final ArrayList<QueueEntry> queueEntries)
+ {
+ _queueEntries = queueEntries;
+ }
+
+ public void performAction(MessageInstance<? extends Consumer> entry)
+ {
+ _queueEntries.add((QueueEntry) entry);
+ }
+ }
+
class TestSimpleQueueEntryListFactory implements QueueEntryListFactory
{
QueueEntryList _list;