diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java | 64 |
1 files changed, 53 insertions, 11 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java index 76d975a789..dfd9315226 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java +++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -98,7 +98,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr private final Lock _stateChangeLock = new ReentrantLock(); private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE); - private AMQQueue.Context _queueContext; + private volatile AMQQueue.Context _queueContext; private final AtomicBoolean _deleted = new AtomicBoolean(false); @@ -125,11 +125,15 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr private LogActor _logActor; private final Map<String, Object> _properties = new ConcurrentHashMap<String, Object>(); - private UUID _id; + private UUID _qmfId; private String _traceExclude; private String _trace; private final long _createTime = System.currentTimeMillis(); private final AtomicLong _deliveredCount = new AtomicLong(0); + private final AtomicLong _deliveredBytes = new AtomicLong(0); + private final AtomicLong _unacknowledgedCount = new AtomicLong(0); + private final AtomicLong _unacknowledgedBytes = new AtomicLong(0); + private final Map<String, Object> _arguments; private int _deferredMessageCredit; private long _deferredSizeCredit; @@ -185,10 +189,10 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr } _queue = queue; - Map<String, Object> arguments = queue.getArguments() == null ? Collections.EMPTY_MAP : queue.getArguments(); + Map<String, Object> arguments = queue.getArguments(); _traceExclude = (String) arguments.get("qpid.trace.exclude"); _trace = (String) arguments.get("qpid.trace.id"); - _id = getConfigStore().createId(); + _qmfId = getConfigStore().createId(); getConfigStore().addConfiguredObject(this); String filterLogString = null; @@ -199,9 +203,13 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr CurrentActor.get().message(this, SubscriptionMessages.CREATE(filterLogString, queue.isDurable() && exclusive, filterLogString.length() > 0)); } - } + public String getConsumerName() + { + return _destination; + } + public boolean isSuspended() { return !isActive() || _deleted.get() || _session.isClosing(); // TODO check for Session suspension @@ -620,10 +628,15 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr _session.sendMessage(xfr, _postIdSettingAction); entry.incrementDeliveryCount(); _deliveredCount.incrementAndGet(); + _deliveredBytes.addAndGet(entry.getSize()); if(_acceptMode == MessageAcceptMode.NONE && _acquireMode == MessageAcquireMode.PRE_ACQUIRED) { forceDequeue(entry, false); } + else if(_acquireMode == MessageAcquireMode.PRE_ACQUIRED) + { + recordUnacknowledged(entry); + } } else { @@ -632,6 +645,12 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr } } + void recordUnacknowledged(QueueEntry entry) + { + _unacknowledgedCount.incrementAndGet(); + _unacknowledgedBytes.addAndGet(entry.getSize()); + } + private void deferredAddCredit(final int deferredMessageCredit, final long deferredSizeCredit) { _deferredMessageCredit += deferredMessageCredit; @@ -653,7 +672,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr private void forceDequeue(final QueueEntry entry, final boolean restoreCredit) { - AutoCommitTransaction dequeueTxn = new AutoCommitTransaction(getQueue().getVirtualHost().getMessageStore()); + AutoCommitTransaction dequeueTxn = new AutoCommitTransaction(getQueue().getVirtualHost().getMessageStore()); dequeueTxn.dequeue(entry.getQueue(), entry.getMessage(), new ServerTransaction.Action() { @@ -690,7 +709,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr entry.setRedelivered(); } - if (getSession().isClosing() || !setRedelivered) + if (getSessionModel().isClosing() || !setRedelivered) { entry.decrementDeliveryCount(); } @@ -918,6 +937,8 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr // TODO Fix Store Context / cleanup if(entry.isAcquiredBy(this)) { + _unacknowledgedBytes.addAndGet(-entry.getSize()); + _unacknowledgedCount.decrementAndGet(); entry.discard(); } } @@ -944,7 +965,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr return false; } - ServerSession getSession() + public ServerSession getSessionModel() { return _session; } @@ -952,7 +973,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr public SessionConfig getSessionConfig() { - return getSession(); + return getSessionModel(); } public boolean isBrowsing() @@ -990,9 +1011,10 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr return _flowMode.toString(); } - public UUID getId() + @Override + public UUID getQMFId() { - return _id; + return _qmfId; } public String getName() @@ -1073,4 +1095,24 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr { _session.getConnection().flush(); } + + public long getBytesOut() + { + return _deliveredBytes.longValue(); + } + + public long getMessagesOut() + { + return _deliveredCount.longValue(); + } + + public long getUnacknowledgedBytes() + { + return _unacknowledgedBytes.longValue(); + } + + public long getUnacknowledgedMessages() + { + return _unacknowledgedCount.longValue(); + } } |