diff options
Diffstat (limited to 'java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java')
-rw-r--r-- | java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java | 198 |
1 files changed, 49 insertions, 149 deletions
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 52ad4a7c5b..2cd423d4c9 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -21,6 +21,13 @@ package org.apache.qpid.server.queue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Matchers.contains; +import static org.mockito.Matchers.eq; + import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.qpid.AMQException; @@ -43,7 +50,6 @@ import org.apache.qpid.server.queue.SimpleAMQQueue.QueueEntryFilter; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TestableMemoryMessageStore; -import org.apache.qpid.server.store.TestableMemoryMessageStoreFactory; import org.apache.qpid.server.subscription.MockSubscription; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.AutoCommitTransaction; @@ -80,7 +86,6 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase public void setExchange(AMQShortString exchange) { - //To change body of implemented methods use File | Settings | File Templates. } public boolean isImmediate() @@ -108,11 +113,11 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase PropertiesConfiguration env = new PropertiesConfiguration(); final VirtualHostConfiguration vhostConfig = new VirtualHostConfiguration(getClass().getName(), env); - vhostConfig.setMessageStoreFactoryClass(TestableMemoryMessageStoreFactory.class.getName()); + vhostConfig.setMessageStoreClass(TestableMemoryMessageStore.class.getName()); _virtualHost = new VirtualHostImpl(ApplicationRegistry.getInstance(), vhostConfig); applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost); - _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false, false, _virtualHost, _arguments); + _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), _qname.asString(), false, _owner.asString(), false, false, _virtualHost, FieldTable.convertToMap(_arguments)); _exchange = (DirectExchange)_virtualHost.getExchangeRegistry().getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME); } @@ -128,7 +133,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase { _queue.stop(); try { - _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(null, false, _owner, false, false, _virtualHost, _arguments ); + _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), null, false, _owner.asString(), false, false, _virtualHost, FieldTable.convertToMap(_arguments)); assertNull("Queue was created", _queue); } catch (IllegalArgumentException e) @@ -138,7 +143,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase } try { - _queue = new SimpleAMQQueue(UUIDGenerator.generateUUID(), _qname, false, _owner, false,false, null, Collections.EMPTY_MAP); + _queue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), _qname, false, _owner, false,false, null, Collections.EMPTY_MAP); assertNull("Queue was created", _queue); } catch (IllegalArgumentException e) @@ -147,8 +152,8 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase e.getMessage().contains("Host")); } - _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false, - false, _virtualHost, _arguments); + _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), _qname.asString(), false, _owner.asString(), false, + false, _virtualHost, FieldTable.convertToMap(_arguments)); assertNotNull("Queue was not created", _queue); } @@ -480,7 +485,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase public void testAutoDeleteQueue() throws Exception { _queue.stop(); - _queue = new SimpleAMQQueue(UUIDGenerator.generateUUID(), _qname, false, null, true, false, _virtualHost, Collections.EMPTY_MAP); + _queue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), _qname, false, null, true, false, _virtualHost, Collections.EMPTY_MAP); _queue.setDeleteOnNoConsumers(true); _queue.registerSubscription(_subscription, false); AMQMessage message = createMessage(new Long(25)); @@ -692,7 +697,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase public void testProcessQueueWithUniqueSelectors() throws Exception { TestSimpleQueueEntryListFactory factory = new TestSimpleQueueEntryListFactory(); - SimpleAMQQueue testQueue = new SimpleAMQQueue(UUIDGenerator.generateUUID(), "testQueue", false,"testOwner", + SimpleAMQQueue testQueue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), "testQueue", false,"testOwner", false, false, _virtualHost, factory, null) { @Override @@ -840,120 +845,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase } } - /** - * Tests that dequeued message is not copied as part of invocation of - * {@link SimpleAMQQueue#copyMessagesToAnotherQueue(long, long, String)} - */ - public void testCopyMessagesWithDequeuedEntry() - { - int messageNumber = 4; - int dequeueMessageIndex = 1; - String anotherQueueName = "testQueue2"; - - // put test messages into a test queue - enqueueGivenNumberOfMessages(_queue, messageNumber); - - // dequeue message - dequeueMessage(_queue, dequeueMessageIndex); - - // create another queue - SimpleAMQQueue queue = createQueue(anotherQueueName); - - // copy messages into another queue - _queue.copyMessagesToAnotherQueue(0, messageNumber, anotherQueueName); - // get messages on another queue - List<QueueEntry> entries = queue.getMessagesOnTheQueue(); - - // assert another queue entries - assertEquals(messageNumber - 1, entries.size()); - int expectedId = 0; - for (int i = 0; i < messageNumber - 1; i++) - { - Long id = ((AMQMessage)entries.get(i).getMessage()).getMessageId(); - if (i == dequeueMessageIndex) - { - assertFalse("Message with id " + dequeueMessageIndex - + " was dequeued and should not been copied into another queue!", - new Long(expectedId).equals(id)); - expectedId++; - } - assertEquals("Expected message with id " + expectedId + " but got message with id " + id, - new Long(expectedId), id); - expectedId++; - } - } - - /** - * Tests that dequeued message is not moved as part of invocation of - * {@link SimpleAMQQueue#moveMessagesToAnotherQueue(long, long, String)} - */ - public void testMovedMessagesWithDequeuedEntry() - { - int messageNumber = 4; - int dequeueMessageIndex = 1; - String anotherQueueName = "testQueue2"; - - // put messages into a test queue - enqueueGivenNumberOfMessages(_queue, messageNumber); - - // dequeue message - dequeueMessage(_queue, dequeueMessageIndex); - - // create another queue - SimpleAMQQueue queue = createQueue(anotherQueueName); - - // move messages into another queue - _queue.moveMessagesToAnotherQueue(0, messageNumber, anotherQueueName); - - // get messages on another queue - List<QueueEntry> entries = queue.getMessagesOnTheQueue(); - - // assert another queue entries - assertEquals(messageNumber - 1, entries.size()); - int expectedId = 0; - for (int i = 0; i < messageNumber - 1; i++) - { - Long id = ((AMQMessage)entries.get(i).getMessage()).getMessageId(); - if (i == dequeueMessageIndex) - { - assertFalse("Message with id " + dequeueMessageIndex - + " was dequeued and should not been copied into another queue!", - new Long(expectedId).equals(id)); - expectedId++; - } - assertEquals("Expected message with id " + expectedId + " but got message with id " + id, - new Long(expectedId), id); - expectedId++; - } - } - - /** - * Tests that messages in given range including dequeued one are deleted - * from the queue on invocation of - * {@link SimpleAMQQueue#removeMessagesFromQueue(long, long)} - */ - public void testRemoveMessagesFromQueueWithDequeuedEntry() - { - int messageNumber = 4; - int dequeueMessageIndex = 1; - - // put messages into a test queue - enqueueGivenNumberOfMessages(_queue, messageNumber); - - // dequeue message - dequeueMessage(_queue, dequeueMessageIndex); - - // remove messages - _queue.removeMessagesFromQueue(0, messageNumber); - - // get queue entries - List<QueueEntry> entries = _queue.getMessagesOnTheQueue(); - - // assert queue entries - assertNotNull("Null is returned from getMessagesOnTheQueue", entries); - assertEquals("Queue should be empty", 0, entries.size()); - } /** * Tests that dequeued message on the top is not accounted and next message @@ -1029,7 +921,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase int dequeueMessageIndex = 1; // create queue with overridden method deliverAsync - SimpleAMQQueue testQueue = new SimpleAMQQueue(UUIDGenerator.generateUUID(), new AMQShortString("test"), + SimpleAMQQueue testQueue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), new AMQShortString("test"), false, new AMQShortString("testOwner"), false, false, _virtualHost, null) { @Override @@ -1097,10 +989,10 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase /** * Tests that entry in dequeued state are not enqueued and not delivered to subscription */ - public void testEqueueDequeuedEntry() + public void testEnqueueDequeuedEntry() { // create a queue where each even entry is considered a dequeued - SimpleAMQQueue queue = new SimpleAMQQueue(UUIDGenerator.generateUUID(), new AMQShortString("test"), false, + SimpleAMQQueue queue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), new AMQShortString("test"), false, new AMQShortString("testOwner"), false, false, _virtualHost, new QueueEntryListFactory() { public QueueEntryList createQueueEntryList(AMQQueue queue) @@ -1178,7 +1070,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase public void testActiveConsumerCount() throws Exception { - final SimpleAMQQueue queue = new SimpleAMQQueue(UUIDGenerator.generateUUID(), new AMQShortString("testActiveConsumerCount"), false, + final SimpleAMQQueue queue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), new AMQShortString("testActiveConsumerCount"), false, new AMQShortString("testOwner"), false, false, _virtualHost, new SimpleQueueEntryList.Factory(), null); //verify adding an active subscription increases the count @@ -1232,29 +1124,37 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); } - /** - * A helper method to create a queue with given name - * - * @param name - * queue name - * @return queue - */ - private SimpleAMQQueue createQueue(String name) + public void testNotificationFiredOnEnqueue() throws Exception { - SimpleAMQQueue queue = null; - try - { - AMQShortString queueName = new AMQShortString(name); - AMQShortString ownerName = new AMQShortString(name + "Owner"); - queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(queueName, false, ownerName, false, false, - _virtualHost, _arguments); - } - catch (AMQException e) - { - fail("Failure to create a queue:" + e.getMessage()); - } - assertNotNull("Queue was not created", queue); - return queue; + AMQQueue.NotificationListener listener = mock(AMQQueue.NotificationListener.class); + + _queue.setNotificationListener(listener); + _queue.setMaximumMessageCount(2); + + _queue.enqueue(createMessage(new Long(24))); + verifyZeroInteractions(listener); + + _queue.enqueue(createMessage(new Long(25))); + + verify(listener, atLeastOnce()).notifyClients(eq(NotificationCheck.MESSAGE_COUNT_ALERT), eq(_queue), contains("Maximum count on queue threshold")); + } + + public void testNotificationFiredAsync() throws Exception + { + AMQQueue.NotificationListener listener = mock(AMQQueue.NotificationListener.class); + + _queue.enqueue(createMessage(new Long(24))); + _queue.enqueue(createMessage(new Long(25))); + _queue.enqueue(createMessage(new Long(26))); + + _queue.setNotificationListener(listener); + _queue.setMaximumMessageCount(2); + + verifyZeroInteractions(listener); + + _queue.checkMessageStatus(); + + verify(listener, atLeastOnce()).notifyClients(eq(NotificationCheck.MESSAGE_COUNT_ALERT), eq(_queue), contains("Maximum count on queue threshold")); } /** |