diff options
Diffstat (limited to 'java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java')
-rw-r--r-- | java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java | 140 |
1 files changed, 78 insertions, 62 deletions
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 62120f26d3..f2b53f95c3 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -168,14 +168,15 @@ public class SimpleAMQQueueTest extends QpidTestCase // Check adding a consumer adds it to the queue _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", - EnumSet.noneOf(Consumer.Option.class)); + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); assertEquals("Queue does not have consumer", 1, _queue.getConsumerCount()); assertEquals("Queue does not have active consumer", 1, _queue.getActiveConsumerCount()); // Check sending a message ends up with the subscriber - _queue.enqueue(messageA); + _queue.enqueue(messageA, null); try { Thread.sleep(2000L); @@ -194,7 +195,7 @@ public class SimpleAMQQueueTest extends QpidTestCase 1 == _queue.getActiveConsumerCount()); ServerMessage messageB = createMessage(new Long (25)); - _queue.enqueue(messageB); + _queue.enqueue(messageB, null); assertNull(_consumer.getQueueContext()); } @@ -202,9 +203,10 @@ public class SimpleAMQQueueTest extends QpidTestCase public void testEnqueueMessageThenRegisterConsumer() throws AMQException, InterruptedException { ServerMessage messageA = createMessage(new Long(24)); - _queue.enqueue(messageA); + _queue.enqueue(messageA, null); _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", - EnumSet.noneOf(Consumer.Option.class)); + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); Thread.sleep(150); assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage()); assertNull("There should be no releasedEntry after an enqueue", @@ -218,10 +220,11 @@ public class SimpleAMQQueueTest extends QpidTestCase { ServerMessage messageA = createMessage(new Long(24)); ServerMessage messageB = createMessage(new Long(25)); - _queue.enqueue(messageA); - _queue.enqueue(messageB); + _queue.enqueue(messageA, null); + _queue.enqueue(messageB, null); _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", - EnumSet.noneOf(Consumer.Option.class)); + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); Thread.sleep(150); assertEquals(messageB, _consumer.getQueueContext().getLastSeenEntry().getMessage()); assertNull("There should be no releasedEntry after enqueues", @@ -245,13 +248,7 @@ public class SimpleAMQQueueTest extends QpidTestCase Consumer.Option.SEES_REQUEUES)); final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); - Action<MessageInstance> postEnqueueAction = new Action<MessageInstance>() - { - public void performAction(MessageInstance entry) - { - queueEntries.add((QueueEntry) entry); - } - }; + EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries); /* Enqueue three messages */ @@ -298,13 +295,7 @@ public class SimpleAMQQueueTest extends QpidTestCase Consumer.Option.ACQUIRES)); final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); - Action<MessageInstance> postEnqueueAction = new Action<MessageInstance>() - { - public void performAction(MessageInstance entry) - { - queueEntries.add((QueueEntry) entry); - } - }; + EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries); /* Enqueue one message with expiration set for a short time in the future */ @@ -356,13 +347,7 @@ public class SimpleAMQQueueTest extends QpidTestCase Consumer.Option.SEES_REQUEUES)); final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); - Action<MessageInstance> postEnqueueAction = new Action<MessageInstance>() - { - public void performAction(MessageInstance entry) - { - queueEntries.add((QueueEntry) entry); - } - }; + EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries); /* Enqueue three messages */ @@ -420,14 +405,7 @@ public class SimpleAMQQueueTest extends QpidTestCase final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); - Action<MessageInstance> postEnqueueAction = new Action<MessageInstance>() - { - public void performAction(MessageInstance entry) - { - queueEntries.add((QueueEntry)entry); - } - }; - + EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries); /* Enqueue two messages */ @@ -460,7 +438,8 @@ public class SimpleAMQQueueTest extends QpidTestCase // Check adding an exclusive consumer adds it to the queue _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.EXCLUSIVE)); + EnumSet.of(Consumer.Option.EXCLUSIVE, Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); assertEquals("Queue does not have consumer", 1, _queue.getConsumerCount()); @@ -468,7 +447,7 @@ public class SimpleAMQQueueTest extends QpidTestCase _queue.getActiveConsumerCount()); // Check sending a message ends up with the subscriber - _queue.enqueue(messageA); + _queue.enqueue(messageA, null); try { Thread.sleep(2000L); @@ -485,7 +464,8 @@ public class SimpleAMQQueueTest extends QpidTestCase { _queue.addConsumer(subB, null, messageA.getClass(), "test", - EnumSet.noneOf(Consumer.Option.class)); + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); } catch (AMQException e) @@ -498,7 +478,8 @@ public class SimpleAMQQueueTest extends QpidTestCase // existing consumer _consumer.close(); _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", - EnumSet.noneOf(Consumer.Option.class)); + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); try { @@ -522,9 +503,10 @@ public class SimpleAMQQueueTest extends QpidTestCase ServerMessage message = createMessage(new Long(25)); _consumer = _queue.addConsumer(_consumerTarget, null, message.getClass(), "test", - EnumSet.noneOf(Consumer.Option.class)); + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); - _queue.enqueue(message); + _queue.enqueue(message, null); _consumer.close(); assertTrue("Queue was not deleted when consumer was removed", _queue.isDeleted()); @@ -536,12 +518,27 @@ public class SimpleAMQQueueTest extends QpidTestCase ServerMessage message = createMessage(id); _consumer = _queue.addConsumer(_consumerTarget, null, message.getClass(), "test", - EnumSet.noneOf(Consumer.Option.class)); + EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES)); + + _queue.enqueue(message, new Action<MessageInstance<QueueConsumer>>() + { + @Override + public void performAction(final MessageInstance<QueueConsumer> object) + { + QueueEntry entry = (QueueEntry) object; + entry.setRedelivered(); + try + { + _consumer.resend(entry); + } + catch (AMQException e) + { + fail("Exception thrown: " + e.getMessage()); + } + } + }); + - _queue.enqueue(message); - QueueEntry entry = _consumer.getQueueContext().getLastSeenEntry(); - entry.setRedelivered(); - _consumer.resend(entry); } @@ -552,7 +549,7 @@ public class SimpleAMQQueueTest extends QpidTestCase ServerMessage message = createMessage(messageId); // Put message on queue - _queue.enqueue(message); + _queue.enqueue(message, null); // Get message id Long testmsgid = _queue.getMessagesOnTheQueue(1).get(0); @@ -568,7 +565,7 @@ public class SimpleAMQQueueTest extends QpidTestCase Long messageId = new Long(i); ServerMessage message = createMessage(messageId); // Put message on queue - _queue.enqueue(message); + _queue.enqueue(message, null); } // Get message ids List<Long> msgids = _queue.getMessagesOnTheQueue(5); @@ -589,7 +586,7 @@ public class SimpleAMQQueueTest extends QpidTestCase Long messageId = new Long(i); ServerMessage message = createMessage(messageId); // Put message on queue - _queue.enqueue(message); + _queue.enqueue(message, null); } // Get message ids List<Long> msgids = _queue.getMessagesOnTheQueue(5, 5); @@ -610,7 +607,7 @@ public class SimpleAMQQueueTest extends QpidTestCase Long messageId = new Long(i); ServerMessage message = createMessage(messageId); // Put message on queue - _queue.enqueue(message); + _queue.enqueue(message, null); } // Get non-existent 0th QueueEntry & check returned list was empty @@ -953,7 +950,8 @@ public class SimpleAMQQueueTest extends QpidTestCase null, entries.get(0).getMessage().getClass(), "test", - EnumSet.noneOf(Consumer.Option.class)); + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); // process queue testQueue.processQueue(new QueueRunner(testQueue) @@ -1018,7 +1016,7 @@ public class SimpleAMQQueueTest extends QpidTestCase } @Override - public boolean acquire(Consumer sub) + public boolean acquire(QueueConsumer sub) { if(message.getMessageNumber() % 2 == 0) { @@ -1044,7 +1042,8 @@ public class SimpleAMQQueueTest extends QpidTestCase null, createMessage(-1l).getClass(), "test", - EnumSet.noneOf(Consumer.Option.class)); + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); } catch (AMQException e) { @@ -1077,7 +1076,8 @@ public class SimpleAMQQueueTest extends QpidTestCase null, createMessage(-1l).getClass(), "test", - EnumSet.noneOf(Consumer.Option.class)); + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); //verify adding an inactive consumer doesn't increase the count @@ -1089,7 +1089,8 @@ public class SimpleAMQQueueTest extends QpidTestCase null, createMessage(-1l).getClass(), "test", - EnumSet.noneOf(Consumer.Option.class)); + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); //verify behaviour in face of expected state changes: @@ -1133,10 +1134,10 @@ public class SimpleAMQQueueTest extends QpidTestCase _queue.setNotificationListener(listener); _queue.setMaximumMessageCount(2); - _queue.enqueue(createMessage(new Long(24))); + _queue.enqueue(createMessage(new Long(24)), null); verifyZeroInteractions(listener); - _queue.enqueue(createMessage(new Long(25))); + _queue.enqueue(createMessage(new Long(25)), null); verify(listener, atLeastOnce()).notifyClients(eq(NotificationCheck.MESSAGE_COUNT_ALERT), eq(_queue), contains("Maximum count on queue threshold")); } @@ -1145,9 +1146,9 @@ public class SimpleAMQQueueTest extends QpidTestCase { AMQQueue.NotificationListener listener = mock(AMQQueue.NotificationListener.class); - _queue.enqueue(createMessage(new Long(24))); - _queue.enqueue(createMessage(new Long(25))); - _queue.enqueue(createMessage(new Long(26))); + _queue.enqueue(createMessage(new Long(24)), null); + _queue.enqueue(createMessage(new Long(25)), null); + _queue.enqueue(createMessage(new Long(26)), null); _queue.setNotificationListener(listener); _queue.setMaximumMessageCount(2); @@ -1309,6 +1310,21 @@ public class SimpleAMQQueueTest extends QpidTestCase return message; } + private static class EntryListAddingAction implements Action<MessageInstance<QueueConsumer>> + { + private final ArrayList<QueueEntry> _queueEntries; + + public EntryListAddingAction(final ArrayList<QueueEntry> queueEntries) + { + _queueEntries = queueEntries; + } + + public void performAction(MessageInstance entry) + { + _queueEntries.add((QueueEntry) entry); + } + } + class TestSimpleQueueEntryListFactory implements QueueEntryListFactory { QueueEntryList _list; |