summaryrefslogtreecommitdiff
path: root/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java')
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java198
1 files changed, 49 insertions, 149 deletions
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 52ad4a7c5b..2cd423d4c9 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -21,6 +21,13 @@
package org.apache.qpid.server.queue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Matchers.contains;
+import static org.mockito.Matchers.eq;
+
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.qpid.AMQException;
@@ -43,7 +50,6 @@ import org.apache.qpid.server.queue.SimpleAMQQueue.QueueEntryFilter;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.store.TestableMemoryMessageStoreFactory;
import org.apache.qpid.server.subscription.MockSubscription;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -80,7 +86,6 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
public void setExchange(AMQShortString exchange)
{
- //To change body of implemented methods use File | Settings | File Templates.
}
public boolean isImmediate()
@@ -108,11 +113,11 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
PropertiesConfiguration env = new PropertiesConfiguration();
final VirtualHostConfiguration vhostConfig = new VirtualHostConfiguration(getClass().getName(), env);
- vhostConfig.setMessageStoreFactoryClass(TestableMemoryMessageStoreFactory.class.getName());
+ vhostConfig.setMessageStoreClass(TestableMemoryMessageStore.class.getName());
_virtualHost = new VirtualHostImpl(ApplicationRegistry.getInstance(), vhostConfig);
applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost);
- _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false, false, _virtualHost, _arguments);
+ _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), _qname.asString(), false, _owner.asString(), false, false, _virtualHost, FieldTable.convertToMap(_arguments));
_exchange = (DirectExchange)_virtualHost.getExchangeRegistry().getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME);
}
@@ -128,7 +133,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
{
_queue.stop();
try {
- _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(null, false, _owner, false, false, _virtualHost, _arguments );
+ _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), null, false, _owner.asString(), false, false, _virtualHost, FieldTable.convertToMap(_arguments));
assertNull("Queue was created", _queue);
}
catch (IllegalArgumentException e)
@@ -138,7 +143,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
}
try {
- _queue = new SimpleAMQQueue(UUIDGenerator.generateUUID(), _qname, false, _owner, false,false, null, Collections.EMPTY_MAP);
+ _queue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), _qname, false, _owner, false,false, null, Collections.EMPTY_MAP);
assertNull("Queue was created", _queue);
}
catch (IllegalArgumentException e)
@@ -147,8 +152,8 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
e.getMessage().contains("Host"));
}
- _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false,
- false, _virtualHost, _arguments);
+ _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), _qname.asString(), false, _owner.asString(), false,
+ false, _virtualHost, FieldTable.convertToMap(_arguments));
assertNotNull("Queue was not created", _queue);
}
@@ -480,7 +485,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
public void testAutoDeleteQueue() throws Exception
{
_queue.stop();
- _queue = new SimpleAMQQueue(UUIDGenerator.generateUUID(), _qname, false, null, true, false, _virtualHost, Collections.EMPTY_MAP);
+ _queue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), _qname, false, null, true, false, _virtualHost, Collections.EMPTY_MAP);
_queue.setDeleteOnNoConsumers(true);
_queue.registerSubscription(_subscription, false);
AMQMessage message = createMessage(new Long(25));
@@ -692,7 +697,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
public void testProcessQueueWithUniqueSelectors() throws Exception
{
TestSimpleQueueEntryListFactory factory = new TestSimpleQueueEntryListFactory();
- SimpleAMQQueue testQueue = new SimpleAMQQueue(UUIDGenerator.generateUUID(), "testQueue", false,"testOwner",
+ SimpleAMQQueue testQueue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), "testQueue", false,"testOwner",
false, false, _virtualHost, factory, null)
{
@Override
@@ -840,120 +845,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
}
}
- /**
- * Tests that dequeued message is not copied as part of invocation of
- * {@link SimpleAMQQueue#copyMessagesToAnotherQueue(long, long, String)}
- */
- public void testCopyMessagesWithDequeuedEntry()
- {
- int messageNumber = 4;
- int dequeueMessageIndex = 1;
- String anotherQueueName = "testQueue2";
-
- // put test messages into a test queue
- enqueueGivenNumberOfMessages(_queue, messageNumber);
-
- // dequeue message
- dequeueMessage(_queue, dequeueMessageIndex);
-
- // create another queue
- SimpleAMQQueue queue = createQueue(anotherQueueName);
-
- // copy messages into another queue
- _queue.copyMessagesToAnotherQueue(0, messageNumber, anotherQueueName);
- // get messages on another queue
- List<QueueEntry> entries = queue.getMessagesOnTheQueue();
-
- // assert another queue entries
- assertEquals(messageNumber - 1, entries.size());
- int expectedId = 0;
- for (int i = 0; i < messageNumber - 1; i++)
- {
- Long id = ((AMQMessage)entries.get(i).getMessage()).getMessageId();
- if (i == dequeueMessageIndex)
- {
- assertFalse("Message with id " + dequeueMessageIndex
- + " was dequeued and should not been copied into another queue!",
- new Long(expectedId).equals(id));
- expectedId++;
- }
- assertEquals("Expected message with id " + expectedId + " but got message with id " + id,
- new Long(expectedId), id);
- expectedId++;
- }
- }
-
- /**
- * Tests that dequeued message is not moved as part of invocation of
- * {@link SimpleAMQQueue#moveMessagesToAnotherQueue(long, long, String)}
- */
- public void testMovedMessagesWithDequeuedEntry()
- {
- int messageNumber = 4;
- int dequeueMessageIndex = 1;
- String anotherQueueName = "testQueue2";
-
- // put messages into a test queue
- enqueueGivenNumberOfMessages(_queue, messageNumber);
-
- // dequeue message
- dequeueMessage(_queue, dequeueMessageIndex);
-
- // create another queue
- SimpleAMQQueue queue = createQueue(anotherQueueName);
-
- // move messages into another queue
- _queue.moveMessagesToAnotherQueue(0, messageNumber, anotherQueueName);
-
- // get messages on another queue
- List<QueueEntry> entries = queue.getMessagesOnTheQueue();
-
- // assert another queue entries
- assertEquals(messageNumber - 1, entries.size());
- int expectedId = 0;
- for (int i = 0; i < messageNumber - 1; i++)
- {
- Long id = ((AMQMessage)entries.get(i).getMessage()).getMessageId();
- if (i == dequeueMessageIndex)
- {
- assertFalse("Message with id " + dequeueMessageIndex
- + " was dequeued and should not been copied into another queue!",
- new Long(expectedId).equals(id));
- expectedId++;
- }
- assertEquals("Expected message with id " + expectedId + " but got message with id " + id,
- new Long(expectedId), id);
- expectedId++;
- }
- }
-
- /**
- * Tests that messages in given range including dequeued one are deleted
- * from the queue on invocation of
- * {@link SimpleAMQQueue#removeMessagesFromQueue(long, long)}
- */
- public void testRemoveMessagesFromQueueWithDequeuedEntry()
- {
- int messageNumber = 4;
- int dequeueMessageIndex = 1;
-
- // put messages into a test queue
- enqueueGivenNumberOfMessages(_queue, messageNumber);
-
- // dequeue message
- dequeueMessage(_queue, dequeueMessageIndex);
-
- // remove messages
- _queue.removeMessagesFromQueue(0, messageNumber);
-
- // get queue entries
- List<QueueEntry> entries = _queue.getMessagesOnTheQueue();
-
- // assert queue entries
- assertNotNull("Null is returned from getMessagesOnTheQueue", entries);
- assertEquals("Queue should be empty", 0, entries.size());
- }
/**
* Tests that dequeued message on the top is not accounted and next message
@@ -1029,7 +921,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
int dequeueMessageIndex = 1;
// create queue with overridden method deliverAsync
- SimpleAMQQueue testQueue = new SimpleAMQQueue(UUIDGenerator.generateUUID(), new AMQShortString("test"),
+ SimpleAMQQueue testQueue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), new AMQShortString("test"),
false, new AMQShortString("testOwner"), false, false, _virtualHost, null)
{
@Override
@@ -1097,10 +989,10 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
/**
* Tests that entry in dequeued state are not enqueued and not delivered to subscription
*/
- public void testEqueueDequeuedEntry()
+ public void testEnqueueDequeuedEntry()
{
// create a queue where each even entry is considered a dequeued
- SimpleAMQQueue queue = new SimpleAMQQueue(UUIDGenerator.generateUUID(), new AMQShortString("test"), false,
+ SimpleAMQQueue queue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), new AMQShortString("test"), false,
new AMQShortString("testOwner"), false, false, _virtualHost, new QueueEntryListFactory()
{
public QueueEntryList createQueueEntryList(AMQQueue queue)
@@ -1178,7 +1070,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
public void testActiveConsumerCount() throws Exception
{
- final SimpleAMQQueue queue = new SimpleAMQQueue(UUIDGenerator.generateUUID(), new AMQShortString("testActiveConsumerCount"), false,
+ final SimpleAMQQueue queue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), new AMQShortString("testActiveConsumerCount"), false,
new AMQShortString("testOwner"), false, false, _virtualHost, new SimpleQueueEntryList.Factory(), null);
//verify adding an active subscription increases the count
@@ -1232,29 +1124,37 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
}
- /**
- * A helper method to create a queue with given name
- *
- * @param name
- * queue name
- * @return queue
- */
- private SimpleAMQQueue createQueue(String name)
+ public void testNotificationFiredOnEnqueue() throws Exception
{
- SimpleAMQQueue queue = null;
- try
- {
- AMQShortString queueName = new AMQShortString(name);
- AMQShortString ownerName = new AMQShortString(name + "Owner");
- queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(queueName, false, ownerName, false, false,
- _virtualHost, _arguments);
- }
- catch (AMQException e)
- {
- fail("Failure to create a queue:" + e.getMessage());
- }
- assertNotNull("Queue was not created", queue);
- return queue;
+ AMQQueue.NotificationListener listener = mock(AMQQueue.NotificationListener.class);
+
+ _queue.setNotificationListener(listener);
+ _queue.setMaximumMessageCount(2);
+
+ _queue.enqueue(createMessage(new Long(24)));
+ verifyZeroInteractions(listener);
+
+ _queue.enqueue(createMessage(new Long(25)));
+
+ verify(listener, atLeastOnce()).notifyClients(eq(NotificationCheck.MESSAGE_COUNT_ALERT), eq(_queue), contains("Maximum count on queue threshold"));
+ }
+
+ public void testNotificationFiredAsync() throws Exception
+ {
+ AMQQueue.NotificationListener listener = mock(AMQQueue.NotificationListener.class);
+
+ _queue.enqueue(createMessage(new Long(24)));
+ _queue.enqueue(createMessage(new Long(25)));
+ _queue.enqueue(createMessage(new Long(26)));
+
+ _queue.setNotificationListener(listener);
+ _queue.setMaximumMessageCount(2);
+
+ verifyZeroInteractions(listener);
+
+ _queue.checkMessageStatus();
+
+ verify(listener, atLeastOnce()).notifyClients(eq(NotificationCheck.MESSAGE_COUNT_ALERT), eq(_queue), contains("Maximum count on queue threshold"));
}
/**