summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2010-03-04 11:17:32 +0000
committerRobert Gemmell <robbie@apache.org>2010-03-04 11:17:32 +0000
commit11d0830854190774df9d46f0745142e26ec1feb5 (patch)
tree9fc1eaee5e15595fe133215b0fb7fc4fa0e9bee7
parent41dfda502d5b716bb710b2bb4827fd1d7099af9a (diff)
downloadqpid-python-11d0830854190774df9d46f0745142e26ec1feb5.tar.gz
QPID-2379: add ConsumerCountHigh to Queue delegate
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@918938 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java16
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java5
4 files changed, 23 insertions, 3 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
index 33847683bb..3e155e104c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
@@ -981,8 +981,7 @@ public class QMFService implements ConfigStore.ConfigEventListener
public Long getConsumerCountHigh()
{
- // TODO
- return 0l;
+ return (long) _obj.getConsumerCountHigh();
}
public Long getConsumerCountLow()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java
index a451091fee..8a5559c155 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java
@@ -45,6 +45,8 @@ public interface QueueConfig extends ConfiguredObject<QueueConfigType, QueueConf
long getQueueDepth();
int getConsumerCount();
+
+ int getConsumerCountHigh();
int getBindingCount();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index d25d73b383..c64b9047de 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -119,8 +119,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private final AtomicLong _enqueueSize = new AtomicLong();
private final AtomicLong _persistentMessageEnqueueSize = new AtomicLong();
private final AtomicLong _persistentMessageDequeueSize = new AtomicLong();
- private final AtomicLong _persistentMessageEnqueueCount = new AtomicLong();;
+ private final AtomicLong _persistentMessageEnqueueCount = new AtomicLong();
private final AtomicLong _persistentMessageDequeueCount = new AtomicLong();
+ private final AtomicInteger _counsumerCountHigh = new AtomicInteger(0);
private final AtomicInteger _bindingCountHigh = new AtomicInteger();
@@ -406,6 +407,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
subscription.setNoLocal(_nolocal);
}
_subscriptionList.add(subscription);
+
+ //Increment consumerCountHigh if necessary. (un)registerSubscription are both
+ //synchronized methods so we don't need additional synchronization here
+ if(_counsumerCountHigh.get() < getConsumerCount())
+ {
+ _counsumerCountHigh.incrementAndGet();
+ }
+
if (isDeleted())
{
subscription.queueDeleted(this);
@@ -801,6 +810,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
return _subscriptionList.size();
}
+
+ public int getConsumerCountHigh()
+ {
+ return _counsumerCountHigh.get();
+ }
public int getActiveConsumerCount()
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
index 10a828d07c..1a2bcd87c5 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
@@ -547,4 +547,9 @@ public class MockAMQQueue implements AMQQueue
{
return false;
}
+
+ public int getConsumerCountHigh()
+ {
+ return 0;
+ }
}