summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2012-02-19 17:49:23 +0000
committerRobert Gemmell <robbie@apache.org>2012-02-19 17:49:23 +0000
commit5c03ef9789f31918b23ed4579e7bfb8531ffa509 (patch)
tree54a62f465669da09207fb010278f7b6ad3bc8db3
parent71b1f1af3bd41685e3094e9dbf808cacd92b4f9f (diff)
downloadqpid-python-5c03ef9789f31918b23ed4579e7bfb8531ffa509.tar.gz
QPID-3855: only increment activeConsumerCount during registration if the subscription is active. Add unit test to identify the issue and check behaviour following various state change notifications.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1291026 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java7
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java56
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java8
3 files changed, 68 insertions, 3 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index c6d634fb28..891a492b7f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -100,7 +100,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
private Exchange _alternateExchange;
- private final QueueEntryList _entries;
+ private final QueueEntryList<QueueEntry> _entries;
private final SubscriptionList _subscriptionList = new SubscriptionList();
@@ -449,7 +449,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
}
- _activeSubscriberCount.incrementAndGet();
+ if(subscription.isActive())
+ {
+ _activeSubscriberCount.incrementAndGet();
+ }
subscription.setStateListener(this);
subscription.setQueueContext(new QueueContext(_entries.getHead()));
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index c82cb9f429..c345384e28 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -1185,6 +1185,62 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
((AMQMessage) messages.get(1).getMessage()).getMessageId());
}
+ public void testActiveConsumerCount() throws Exception
+ {
+ final SimpleAMQQueue queue = new SimpleAMQQueue(new AMQShortString("testActiveConsumerCount"), false, new AMQShortString("testOwner"),
+ false, false, _virtualHost, new SimpleQueueEntryList.Factory(), null);
+
+ //verify adding an active subscription increases the count
+ final MockSubscription subscription1 = new MockSubscription();
+ subscription1.setActive(true);
+ assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
+ queue.registerSubscription(subscription1, false);
+ assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
+
+ //verify adding an inactive subscription doesn't increase the count
+ final MockSubscription subscription2 = new MockSubscription();
+ subscription2.setActive(false);
+ assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
+ queue.registerSubscription(subscription2, false);
+ assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
+
+ //verify behaviour in face of expected state changes:
+
+ //verify a subscription going suspended->active increases the count
+ queue.stateChange(subscription2, Subscription.State.SUSPENDED, Subscription.State.ACTIVE);
+ assertEquals("Unexpected active consumer count", 2, queue.getActiveConsumerCount());
+
+ //verify a subscription going active->suspended decreases the count
+ queue.stateChange(subscription2, Subscription.State.ACTIVE, Subscription.State.SUSPENDED);
+ assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
+
+ //verify a subscription going suspended->closed doesn't change the count
+ queue.stateChange(subscription2, Subscription.State.SUSPENDED, Subscription.State.CLOSED);
+ assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
+
+ //verify a subscription going active->closed decreases the count
+ queue.stateChange(subscription2, Subscription.State.ACTIVE, Subscription.State.CLOSED);
+ assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
+
+ //verify behaviour in face of unexpected state changes:
+
+ //verify a subscription going closed->active increases the count
+ queue.stateChange(subscription2, Subscription.State.CLOSED, Subscription.State.ACTIVE);
+ assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
+
+ //verify a subscription going active->active doesn't change the count
+ queue.stateChange(subscription2, Subscription.State.ACTIVE, Subscription.State.ACTIVE);
+ assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
+
+ //verify a subscription going closed->suspended doesn't change the count
+ queue.stateChange(subscription2, Subscription.State.CLOSED, Subscription.State.SUSPENDED);
+ assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
+
+ //verify a subscription going suspended->suspended doesn't change the count
+ queue.stateChange(subscription2, Subscription.State.SUSPENDED, Subscription.State.SUSPENDED);
+ assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
+ }
+
/**
* A helper method to create a queue with given name
*
diff --git a/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
index 3f76c90c12..1d6ccfbbc2 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
@@ -55,6 +55,7 @@ public class MockSubscription implements Subscription
private static final AtomicLong idGenerator = new AtomicLong(0);
// Create a simple ID that increments for ever new Subscription
private final long _subscriptionID = idGenerator.getAndIncrement();
+ private boolean _isActive = true;
public MockSubscription()
{
@@ -150,7 +151,7 @@ public class MockSubscription implements Subscription
public boolean isActive()
{
- return true;
+ return _isActive ;
}
public void confirmAutoClose()
@@ -275,4 +276,9 @@ public class MockSubscription implements Subscription
{
return false;
}
+
+ public void setActive(final boolean isActive)
+ {
+ _isActive = isActive;
+ }
}