summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java64
1 files changed, 53 insertions, 11 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
index 76d975a789..dfd9315226 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
@@ -98,7 +98,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
private final Lock _stateChangeLock = new ReentrantLock();
private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE);
- private AMQQueue.Context _queueContext;
+ private volatile AMQQueue.Context _queueContext;
private final AtomicBoolean _deleted = new AtomicBoolean(false);
@@ -125,11 +125,15 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
private LogActor _logActor;
private final Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
- private UUID _id;
+ private UUID _qmfId;
private String _traceExclude;
private String _trace;
private final long _createTime = System.currentTimeMillis();
private final AtomicLong _deliveredCount = new AtomicLong(0);
+ private final AtomicLong _deliveredBytes = new AtomicLong(0);
+ private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
+ private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
+
private final Map<String, Object> _arguments;
private int _deferredMessageCredit;
private long _deferredSizeCredit;
@@ -185,10 +189,10 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
}
_queue = queue;
- Map<String, Object> arguments = queue.getArguments() == null ? Collections.EMPTY_MAP : queue.getArguments();
+ Map<String, Object> arguments = queue.getArguments();
_traceExclude = (String) arguments.get("qpid.trace.exclude");
_trace = (String) arguments.get("qpid.trace.id");
- _id = getConfigStore().createId();
+ _qmfId = getConfigStore().createId();
getConfigStore().addConfiguredObject(this);
String filterLogString = null;
@@ -199,9 +203,13 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
CurrentActor.get().message(this, SubscriptionMessages.CREATE(filterLogString, queue.isDurable() && exclusive,
filterLogString.length() > 0));
}
-
}
+ public String getConsumerName()
+ {
+ return _destination;
+ }
+
public boolean isSuspended()
{
return !isActive() || _deleted.get() || _session.isClosing(); // TODO check for Session suspension
@@ -620,10 +628,15 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
_session.sendMessage(xfr, _postIdSettingAction);
entry.incrementDeliveryCount();
_deliveredCount.incrementAndGet();
+ _deliveredBytes.addAndGet(entry.getSize());
if(_acceptMode == MessageAcceptMode.NONE && _acquireMode == MessageAcquireMode.PRE_ACQUIRED)
{
forceDequeue(entry, false);
}
+ else if(_acquireMode == MessageAcquireMode.PRE_ACQUIRED)
+ {
+ recordUnacknowledged(entry);
+ }
}
else
{
@@ -632,6 +645,12 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
}
}
+ void recordUnacknowledged(QueueEntry entry)
+ {
+ _unacknowledgedCount.incrementAndGet();
+ _unacknowledgedBytes.addAndGet(entry.getSize());
+ }
+
private void deferredAddCredit(final int deferredMessageCredit, final long deferredSizeCredit)
{
_deferredMessageCredit += deferredMessageCredit;
@@ -653,7 +672,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
private void forceDequeue(final QueueEntry entry, final boolean restoreCredit)
{
- AutoCommitTransaction dequeueTxn = new AutoCommitTransaction(getQueue().getVirtualHost().getMessageStore());
+ AutoCommitTransaction dequeueTxn = new AutoCommitTransaction(getQueue().getVirtualHost().getMessageStore());
dequeueTxn.dequeue(entry.getQueue(), entry.getMessage(),
new ServerTransaction.Action()
{
@@ -690,7 +709,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
entry.setRedelivered();
}
- if (getSession().isClosing() || !setRedelivered)
+ if (getSessionModel().isClosing() || !setRedelivered)
{
entry.decrementDeliveryCount();
}
@@ -918,6 +937,8 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
// TODO Fix Store Context / cleanup
if(entry.isAcquiredBy(this))
{
+ _unacknowledgedBytes.addAndGet(-entry.getSize());
+ _unacknowledgedCount.decrementAndGet();
entry.discard();
}
}
@@ -944,7 +965,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
return false;
}
- ServerSession getSession()
+ public ServerSession getSessionModel()
{
return _session;
}
@@ -952,7 +973,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
public SessionConfig getSessionConfig()
{
- return getSession();
+ return getSessionModel();
}
public boolean isBrowsing()
@@ -990,9 +1011,10 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
return _flowMode.toString();
}
- public UUID getId()
+ @Override
+ public UUID getQMFId()
{
- return _id;
+ return _qmfId;
}
public String getName()
@@ -1073,4 +1095,24 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
{
_session.getConnection().flush();
}
+
+ public long getBytesOut()
+ {
+ return _deliveredBytes.longValue();
+ }
+
+ public long getMessagesOut()
+ {
+ return _deliveredCount.longValue();
+ }
+
+ public long getUnacknowledgedBytes()
+ {
+ return _unacknowledgedBytes.longValue();
+ }
+
+ public long getUnacknowledgedMessages()
+ {
+ return _unacknowledgedCount.longValue();
+ }
}