diff options
author | Robert Gemmell <robbie@apache.org> | 2012-02-19 17:49:23 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2012-02-19 17:49:23 +0000 |
commit | 5c03ef9789f31918b23ed4579e7bfb8531ffa509 (patch) | |
tree | 54a62f465669da09207fb010278f7b6ad3bc8db3 | |
parent | 71b1f1af3bd41685e3094e9dbf808cacd92b4f9f (diff) | |
download | qpid-python-5c03ef9789f31918b23ed4579e7bfb8531ffa509.tar.gz |
QPID-3855: only increment activeConsumerCount during registration if the subscription is active. Add unit test to identify the issue and check behaviour following various state change notifications.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1291026 13f79535-47bb-0310-9956-ffa450edef68
3 files changed, 68 insertions, 3 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index c6d634fb28..891a492b7f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -100,7 +100,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes private Exchange _alternateExchange; - private final QueueEntryList _entries; + private final QueueEntryList<QueueEntry> _entries; private final SubscriptionList _subscriptionList = new SubscriptionList(); @@ -449,7 +449,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } } - _activeSubscriberCount.incrementAndGet(); + if(subscription.isActive()) + { + _activeSubscriberCount.incrementAndGet(); + } subscription.setStateListener(this); subscription.setQueueContext(new QueueContext(_entries.getHead())); diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index c82cb9f429..c345384e28 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -1185,6 +1185,62 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase ((AMQMessage) messages.get(1).getMessage()).getMessageId()); } + public void testActiveConsumerCount() throws Exception + { + final SimpleAMQQueue queue = new SimpleAMQQueue(new AMQShortString("testActiveConsumerCount"), false, new AMQShortString("testOwner"), + false, false, _virtualHost, new SimpleQueueEntryList.Factory(), null); + + //verify adding an active subscription increases the count + final MockSubscription subscription1 = new MockSubscription(); + subscription1.setActive(true); + assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount()); + queue.registerSubscription(subscription1, false); + assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); + + //verify adding an inactive subscription doesn't increase the count + final MockSubscription subscription2 = new MockSubscription(); + subscription2.setActive(false); + assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); + queue.registerSubscription(subscription2, false); + assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); + + //verify behaviour in face of expected state changes: + + //verify a subscription going suspended->active increases the count + queue.stateChange(subscription2, Subscription.State.SUSPENDED, Subscription.State.ACTIVE); + assertEquals("Unexpected active consumer count", 2, queue.getActiveConsumerCount()); + + //verify a subscription going active->suspended decreases the count + queue.stateChange(subscription2, Subscription.State.ACTIVE, Subscription.State.SUSPENDED); + assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); + + //verify a subscription going suspended->closed doesn't change the count + queue.stateChange(subscription2, Subscription.State.SUSPENDED, Subscription.State.CLOSED); + assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); + + //verify a subscription going active->closed decreases the count + queue.stateChange(subscription2, Subscription.State.ACTIVE, Subscription.State.CLOSED); + assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount()); + + //verify behaviour in face of unexpected state changes: + + //verify a subscription going closed->active increases the count + queue.stateChange(subscription2, Subscription.State.CLOSED, Subscription.State.ACTIVE); + assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); + + //verify a subscription going active->active doesn't change the count + queue.stateChange(subscription2, Subscription.State.ACTIVE, Subscription.State.ACTIVE); + assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); + + //verify a subscription going closed->suspended doesn't change the count + queue.stateChange(subscription2, Subscription.State.CLOSED, Subscription.State.SUSPENDED); + assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); + + //verify a subscription going suspended->suspended doesn't change the count + queue.stateChange(subscription2, Subscription.State.SUSPENDED, Subscription.State.SUSPENDED); + assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); + } + /** * A helper method to create a queue with given name * diff --git a/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java index 3f76c90c12..1d6ccfbbc2 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java +++ b/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java @@ -55,6 +55,7 @@ public class MockSubscription implements Subscription private static final AtomicLong idGenerator = new AtomicLong(0); // Create a simple ID that increments for ever new Subscription private final long _subscriptionID = idGenerator.getAndIncrement(); + private boolean _isActive = true; public MockSubscription() { @@ -150,7 +151,7 @@ public class MockSubscription implements Subscription public boolean isActive() { - return true; + return _isActive ; } public void confirmAutoClose() @@ -275,4 +276,9 @@ public class MockSubscription implements Subscription { return false; } + + public void setActive(final boolean isActive) + { + _isActive = isActive; + } } |