diff options
Diffstat (limited to 'qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java')
-rw-r--r-- | qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java | 109 |
1 files changed, 77 insertions, 32 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 28d52f4fd1..79c744902d 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -21,12 +21,8 @@ package org.apache.qpid.server.queue; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import org.apache.commons.configuration.PropertiesConfiguration; + import org.apache.qpid.AMQException; import org.apache.qpid.AMQInternalException; import org.apache.qpid.AMQSecurityException; @@ -49,12 +45,17 @@ import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.subscription.MockSubscription; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.AutoCommitTransaction; -import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.InternalBrokerBaseCase; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostImpl; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + public class SimpleAMQQueueTest extends InternalBrokerBaseCase { @@ -68,7 +69,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase protected MockSubscription _subscription = new MockSubscription(); protected FieldTable _arguments = null; - MessagePublishInfo info = new MessagePublishInfo() + private MessagePublishInfo info = new MessagePublishInfo() { public AMQShortString getExchange() @@ -196,7 +197,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase { } assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage()); - assertNull(((QueueContext)_subscription.getQueueContext())._releasedEntry); + assertNull(((QueueContext)_subscription.getQueueContext()).getReleasedEntry()); // Check removing the subscription removes it's information from the queue _queue.unregisterSubscription(_subscription); @@ -218,7 +219,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase _queue.registerSubscription(_subscription, false); Thread.sleep(150); assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage()); - assertNull("There should be no releasedEntry after an enqueue", ((QueueContext)_subscription.getQueueContext())._releasedEntry); + assertNull("There should be no releasedEntry after an enqueue", ((QueueContext)_subscription.getQueueContext()).getReleasedEntry()); } /** @@ -233,7 +234,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase _queue.registerSubscription(_subscription, false); Thread.sleep(150); assertEquals(messageB, _subscription.getQueueContext().getLastSeenEntry().getMessage()); - assertNull("There should be no releasedEntry after enqueues", ((QueueContext)_subscription.getQueueContext())._releasedEntry); + assertNull("There should be no releasedEntry after enqueues", ((QueueContext)_subscription.getQueueContext()).getReleasedEntry()); } /** @@ -280,7 +281,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered()); assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered()); assertFalse("Redelivery flag should remain be unset",queueEntries.get(2).isRedelivered()); - assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)_subscription.getQueueContext())._releasedEntry); + assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)_subscription.getQueueContext()).getReleasedEntry()); } /** @@ -324,7 +325,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase assertTrue("Expecting the queue entry to be now expired", queueEntries.get(0).expired()); assertEquals("Total number of messages sent should not have changed", 1, _subscription.getMessages().size()); assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered()); - assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)_subscription.getQueueContext())._releasedEntry); + assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)_subscription.getQueueContext()).getReleasedEntry()); } @@ -375,7 +376,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered()); assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered()); assertTrue("Redelivery flag should now be set",queueEntries.get(2).isRedelivered()); - assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)_subscription.getQueueContext())._releasedEntry); + assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)_subscription.getQueueContext()).getReleasedEntry()); } @@ -418,8 +419,8 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads assertEquals("Unexpected total number of messages sent to both subscriptions after release", 3, subscription1.getMessages().size() + subscription2.getMessages().size()); - assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription1.getQueueContext())._releasedEntry); - assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription2.getQueueContext())._releasedEntry); + assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription1.getQueueContext()).getReleasedEntry()); + assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription2.getQueueContext()).getReleasedEntry()); } public void testExclusiveConsumer() throws AMQException @@ -632,7 +633,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase // Send persistent message qs.add(_queue); - MessageMetaData metaData = msg.headersReceived(); + MessageMetaData metaData = msg.headersReceived(System.currentTimeMillis()); StoredMessage handle = _store.addMessage(metaData); msg.setStoredMessage(handle); @@ -837,7 +838,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase /** * Tests that dequeued message is not copied as part of invocation of - * {@link SimpleAMQQueue#copyMessagesToAnotherQueue(long, long, String, ServerTransaction)} + * {@link SimpleAMQQueue#copyMessagesToAnotherQueue(long, long, String)} */ public void testCopyMessagesWithDequeuedEntry() { @@ -854,14 +855,8 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase // create another queue SimpleAMQQueue queue = createQueue(anotherQueueName); - // create transaction - ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getMessageStore()); - // copy messages into another queue - _queue.copyMessagesToAnotherQueue(0, messageNumber, anotherQueueName, txn); - - // commit transaction - txn.commit(); + _queue.copyMessagesToAnotherQueue(0, messageNumber, anotherQueueName); // get messages on another queue List<QueueEntry> entries = queue.getMessagesOnTheQueue(); @@ -887,7 +882,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase /** * Tests that dequeued message is not moved as part of invocation of - * {@link SimpleAMQQueue#moveMessagesToAnotherQueue(long, long, String, ServerTransaction)} + * {@link SimpleAMQQueue#moveMessagesToAnotherQueue(long, long, String)} */ public void testMovedMessagesWithDequeuedEntry() { @@ -904,14 +899,8 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase // create another queue SimpleAMQQueue queue = createQueue(anotherQueueName); - // create transaction - ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getMessageStore()); - // move messages into another queue - _queue.moveMessagesToAnotherQueue(0, messageNumber, anotherQueueName, txn); - - // commit transaction - txn.commit(); + _queue.moveMessagesToAnotherQueue(0, messageNumber, anotherQueueName); // get messages on another queue List<QueueEntry> entries = queue.getMessagesOnTheQueue(); @@ -1183,6 +1172,62 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase ((AMQMessage) messages.get(1).getMessage()).getMessageId()); } + public void testActiveConsumerCount() throws Exception + { + final SimpleAMQQueue queue = new SimpleAMQQueue(new AMQShortString("testActiveConsumerCount"), false, new AMQShortString("testOwner"), + false, false, _virtualHost, new SimpleQueueEntryList.Factory(), null); + + //verify adding an active subscription increases the count + final MockSubscription subscription1 = new MockSubscription(); + subscription1.setActive(true); + assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount()); + queue.registerSubscription(subscription1, false); + assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); + + //verify adding an inactive subscription doesn't increase the count + final MockSubscription subscription2 = new MockSubscription(); + subscription2.setActive(false); + assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); + queue.registerSubscription(subscription2, false); + assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); + + //verify behaviour in face of expected state changes: + + //verify a subscription going suspended->active increases the count + queue.stateChange(subscription2, Subscription.State.SUSPENDED, Subscription.State.ACTIVE); + assertEquals("Unexpected active consumer count", 2, queue.getActiveConsumerCount()); + + //verify a subscription going active->suspended decreases the count + queue.stateChange(subscription2, Subscription.State.ACTIVE, Subscription.State.SUSPENDED); + assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); + + //verify a subscription going suspended->closed doesn't change the count + queue.stateChange(subscription2, Subscription.State.SUSPENDED, Subscription.State.CLOSED); + assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); + + //verify a subscription going active->closed decreases the count + queue.stateChange(subscription2, Subscription.State.ACTIVE, Subscription.State.CLOSED); + assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount()); + + //verify behaviour in face of unexpected state changes: + + //verify a subscription going closed->active increases the count + queue.stateChange(subscription2, Subscription.State.CLOSED, Subscription.State.ACTIVE); + assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); + + //verify a subscription going active->active doesn't change the count + queue.stateChange(subscription2, Subscription.State.ACTIVE, Subscription.State.ACTIVE); + assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); + + //verify a subscription going closed->suspended doesn't change the count + queue.stateChange(subscription2, Subscription.State.CLOSED, Subscription.State.SUSPENDED); + assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); + + //verify a subscription going suspended->suspended doesn't change the count + queue.stateChange(subscription2, Subscription.State.SUSPENDED, Subscription.State.SUSPENDED); + assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); + } + /** * A helper method to create a queue with given name * |