summaryrefslogtreecommitdiff
path: root/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java')
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java140
1 files changed, 78 insertions, 62 deletions
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 62120f26d3..f2b53f95c3 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -168,14 +168,15 @@ public class SimpleAMQQueueTest extends QpidTestCase
// Check adding a consumer adds it to the queue
_consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
- EnumSet.noneOf(Consumer.Option.class));
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
assertEquals("Queue does not have consumer", 1,
_queue.getConsumerCount());
assertEquals("Queue does not have active consumer", 1,
_queue.getActiveConsumerCount());
// Check sending a message ends up with the subscriber
- _queue.enqueue(messageA);
+ _queue.enqueue(messageA, null);
try
{
Thread.sleep(2000L);
@@ -194,7 +195,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
1 == _queue.getActiveConsumerCount());
ServerMessage messageB = createMessage(new Long (25));
- _queue.enqueue(messageB);
+ _queue.enqueue(messageB, null);
assertNull(_consumer.getQueueContext());
}
@@ -202,9 +203,10 @@ public class SimpleAMQQueueTest extends QpidTestCase
public void testEnqueueMessageThenRegisterConsumer() throws AMQException, InterruptedException
{
ServerMessage messageA = createMessage(new Long(24));
- _queue.enqueue(messageA);
+ _queue.enqueue(messageA, null);
_consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
- EnumSet.noneOf(Consumer.Option.class));
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
Thread.sleep(150);
assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage());
assertNull("There should be no releasedEntry after an enqueue",
@@ -218,10 +220,11 @@ public class SimpleAMQQueueTest extends QpidTestCase
{
ServerMessage messageA = createMessage(new Long(24));
ServerMessage messageB = createMessage(new Long(25));
- _queue.enqueue(messageA);
- _queue.enqueue(messageB);
+ _queue.enqueue(messageA, null);
+ _queue.enqueue(messageB, null);
_consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
- EnumSet.noneOf(Consumer.Option.class));
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
Thread.sleep(150);
assertEquals(messageB, _consumer.getQueueContext().getLastSeenEntry().getMessage());
assertNull("There should be no releasedEntry after enqueues",
@@ -245,13 +248,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
Consumer.Option.SEES_REQUEUES));
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
- Action<MessageInstance> postEnqueueAction = new Action<MessageInstance>()
- {
- public void performAction(MessageInstance entry)
- {
- queueEntries.add((QueueEntry) entry);
- }
- };
+ EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries);
/* Enqueue three messages */
@@ -298,13 +295,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
Consumer.Option.ACQUIRES));
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
- Action<MessageInstance> postEnqueueAction = new Action<MessageInstance>()
- {
- public void performAction(MessageInstance entry)
- {
- queueEntries.add((QueueEntry) entry);
- }
- };
+ EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries);
/* Enqueue one message with expiration set for a short time in the future */
@@ -356,13 +347,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
Consumer.Option.SEES_REQUEUES));
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
- Action<MessageInstance> postEnqueueAction = new Action<MessageInstance>()
- {
- public void performAction(MessageInstance entry)
- {
- queueEntries.add((QueueEntry) entry);
- }
- };
+ EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries);
/* Enqueue three messages */
@@ -420,14 +405,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
- Action<MessageInstance> postEnqueueAction = new Action<MessageInstance>()
- {
- public void performAction(MessageInstance entry)
- {
- queueEntries.add((QueueEntry)entry);
- }
- };
-
+ EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries);
/* Enqueue two messages */
@@ -460,7 +438,8 @@ public class SimpleAMQQueueTest extends QpidTestCase
// Check adding an exclusive consumer adds it to the queue
_consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
- EnumSet.of(Consumer.Option.EXCLUSIVE));
+ EnumSet.of(Consumer.Option.EXCLUSIVE, Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
assertEquals("Queue does not have consumer", 1,
_queue.getConsumerCount());
@@ -468,7 +447,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
_queue.getActiveConsumerCount());
// Check sending a message ends up with the subscriber
- _queue.enqueue(messageA);
+ _queue.enqueue(messageA, null);
try
{
Thread.sleep(2000L);
@@ -485,7 +464,8 @@ public class SimpleAMQQueueTest extends QpidTestCase
{
_queue.addConsumer(subB, null, messageA.getClass(), "test",
- EnumSet.noneOf(Consumer.Option.class));
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
}
catch (AMQException e)
@@ -498,7 +478,8 @@ public class SimpleAMQQueueTest extends QpidTestCase
// existing consumer
_consumer.close();
_consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
- EnumSet.noneOf(Consumer.Option.class));
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
try
{
@@ -522,9 +503,10 @@ public class SimpleAMQQueueTest extends QpidTestCase
ServerMessage message = createMessage(new Long(25));
_consumer = _queue.addConsumer(_consumerTarget, null, message.getClass(), "test",
- EnumSet.noneOf(Consumer.Option.class));
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
- _queue.enqueue(message);
+ _queue.enqueue(message, null);
_consumer.close();
assertTrue("Queue was not deleted when consumer was removed",
_queue.isDeleted());
@@ -536,12 +518,27 @@ public class SimpleAMQQueueTest extends QpidTestCase
ServerMessage message = createMessage(id);
_consumer = _queue.addConsumer(_consumerTarget, null, message.getClass(), "test",
- EnumSet.noneOf(Consumer.Option.class));
+ EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES));
+
+ _queue.enqueue(message, new Action<MessageInstance<QueueConsumer>>()
+ {
+ @Override
+ public void performAction(final MessageInstance<QueueConsumer> object)
+ {
+ QueueEntry entry = (QueueEntry) object;
+ entry.setRedelivered();
+ try
+ {
+ _consumer.resend(entry);
+ }
+ catch (AMQException e)
+ {
+ fail("Exception thrown: " + e.getMessage());
+ }
+ }
+ });
+
- _queue.enqueue(message);
- QueueEntry entry = _consumer.getQueueContext().getLastSeenEntry();
- entry.setRedelivered();
- _consumer.resend(entry);
}
@@ -552,7 +549,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
ServerMessage message = createMessage(messageId);
// Put message on queue
- _queue.enqueue(message);
+ _queue.enqueue(message, null);
// Get message id
Long testmsgid = _queue.getMessagesOnTheQueue(1).get(0);
@@ -568,7 +565,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
Long messageId = new Long(i);
ServerMessage message = createMessage(messageId);
// Put message on queue
- _queue.enqueue(message);
+ _queue.enqueue(message, null);
}
// Get message ids
List<Long> msgids = _queue.getMessagesOnTheQueue(5);
@@ -589,7 +586,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
Long messageId = new Long(i);
ServerMessage message = createMessage(messageId);
// Put message on queue
- _queue.enqueue(message);
+ _queue.enqueue(message, null);
}
// Get message ids
List<Long> msgids = _queue.getMessagesOnTheQueue(5, 5);
@@ -610,7 +607,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
Long messageId = new Long(i);
ServerMessage message = createMessage(messageId);
// Put message on queue
- _queue.enqueue(message);
+ _queue.enqueue(message, null);
}
// Get non-existent 0th QueueEntry & check returned list was empty
@@ -953,7 +950,8 @@ public class SimpleAMQQueueTest extends QpidTestCase
null,
entries.get(0).getMessage().getClass(),
"test",
- EnumSet.noneOf(Consumer.Option.class));
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
// process queue
testQueue.processQueue(new QueueRunner(testQueue)
@@ -1018,7 +1016,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
}
@Override
- public boolean acquire(Consumer sub)
+ public boolean acquire(QueueConsumer sub)
{
if(message.getMessageNumber() % 2 == 0)
{
@@ -1044,7 +1042,8 @@ public class SimpleAMQQueueTest extends QpidTestCase
null,
createMessage(-1l).getClass(),
"test",
- EnumSet.noneOf(Consumer.Option.class));
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
}
catch (AMQException e)
{
@@ -1077,7 +1076,8 @@ public class SimpleAMQQueueTest extends QpidTestCase
null,
createMessage(-1l).getClass(),
"test",
- EnumSet.noneOf(Consumer.Option.class));
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
//verify adding an inactive consumer doesn't increase the count
@@ -1089,7 +1089,8 @@ public class SimpleAMQQueueTest extends QpidTestCase
null,
createMessage(-1l).getClass(),
"test",
- EnumSet.noneOf(Consumer.Option.class));
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
//verify behaviour in face of expected state changes:
@@ -1133,10 +1134,10 @@ public class SimpleAMQQueueTest extends QpidTestCase
_queue.setNotificationListener(listener);
_queue.setMaximumMessageCount(2);
- _queue.enqueue(createMessage(new Long(24)));
+ _queue.enqueue(createMessage(new Long(24)), null);
verifyZeroInteractions(listener);
- _queue.enqueue(createMessage(new Long(25)));
+ _queue.enqueue(createMessage(new Long(25)), null);
verify(listener, atLeastOnce()).notifyClients(eq(NotificationCheck.MESSAGE_COUNT_ALERT), eq(_queue), contains("Maximum count on queue threshold"));
}
@@ -1145,9 +1146,9 @@ public class SimpleAMQQueueTest extends QpidTestCase
{
AMQQueue.NotificationListener listener = mock(AMQQueue.NotificationListener.class);
- _queue.enqueue(createMessage(new Long(24)));
- _queue.enqueue(createMessage(new Long(25)));
- _queue.enqueue(createMessage(new Long(26)));
+ _queue.enqueue(createMessage(new Long(24)), null);
+ _queue.enqueue(createMessage(new Long(25)), null);
+ _queue.enqueue(createMessage(new Long(26)), null);
_queue.setNotificationListener(listener);
_queue.setMaximumMessageCount(2);
@@ -1309,6 +1310,21 @@ public class SimpleAMQQueueTest extends QpidTestCase
return message;
}
+ private static class EntryListAddingAction implements Action<MessageInstance<QueueConsumer>>
+ {
+ private final ArrayList<QueueEntry> _queueEntries;
+
+ public EntryListAddingAction(final ArrayList<QueueEntry> queueEntries)
+ {
+ _queueEntries = queueEntries;
+ }
+
+ public void performAction(MessageInstance entry)
+ {
+ _queueEntries.add((QueueEntry) entry);
+ }
+ }
+
class TestSimpleQueueEntryListFactory implements QueueEntryListFactory
{
QueueEntryList _list;