diff options
4 files changed, 16 insertions, 5 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java b/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java index 6d360b2084..54f1db845e 100644 --- a/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java +++ b/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java @@ -1404,8 +1404,7 @@ public class QMFService implements ConfigStore.ConfigEventListener public Long getDelivered() { - // TODO - return 0l; + return _obj.getDelivered(); } public UUID getId() diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/SubscriptionConfig.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/SubscriptionConfig.java index 985ecb2be9..b101d70553 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/SubscriptionConfig.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/SubscriptionConfig.java @@ -43,5 +43,5 @@ public interface SubscriptionConfig extends ConfiguredObject<SubscriptionConfigT boolean isExplicitAcknowledge(); - + Long getDelivered(); }
\ No newline at end of file diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index c548f3ccad..eed57fbf39 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -94,6 +94,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage private LogSubject _logSubject; private LogActor _logActor; private UUID _id; + private final AtomicLong _deliveredCount = new AtomicLong(0); private long _createTime = System.currentTimeMillis(); @@ -340,7 +341,11 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage { return getQueue().getConfigStore(); } - + + public Long getDelivered() + { + return _deliveredCount.get(); + } public synchronized void setQueue(AMQQueue queue, boolean exclusive) { @@ -648,6 +653,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage throws AMQException { _deliveryMethod.deliverToClient(this,entry,deliveryTag); + _deliveredCount.incrementAndGet(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java index 9d2f3506cd..5b3f5250c5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java +++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -101,6 +101,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr private String _traceExclude; private String _trace; private long _createTime = System.currentTimeMillis(); + private final AtomicLong _deliveredCount = new AtomicLong(0); public Subscription_0_10(ServerSession session, String destination, MessageAcceptMode acceptMode, @@ -257,6 +258,11 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr { return getQueue().getConfigStore(); } + + public Long getDelivered() + { + return _deliveredCount.get(); + } public void creditStateChanged(boolean hasCredit) { @@ -558,7 +564,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr } _session.sendMessage(xfr, _postIdSettingAction); - + _deliveredCount.incrementAndGet(); if(_acceptMode == MessageAcceptMode.NONE && _acquireMode == MessageAcquireMode.PRE_ACQUIRED) { forceDequeue(entry, false); |