summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2010-03-04 11:19:00 +0000
committerRobert Gemmell <robbie@apache.org>2010-03-04 11:19:00 +0000
commit7d21eac1b9295f0a9d1472eb3805b76c8c22da5a (patch)
treeb75cc7218a71c20fce729e9bb9f82b7db14114a9 /qpid/java/broker/src/main/java
parent6f7e476ee694881cecccbe73ae7011c1cdba7ee0 (diff)
downloadqpid-python-7d21eac1b9295f0a9d1472eb3805b76c8c22da5a.tar.gz
QPID-2379: add BytesTxnDequeues and MsgTxnDequeues on Queue delegate
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@918942 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src/main/java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java25
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java5
8 files changed, 44 insertions, 8 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
index 3f1f354585..69d9ccb431 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
@@ -917,8 +917,7 @@ public class QMFService implements ConfigStore.ConfigEventListener
public Long getMsgTxnDequeues()
{
- // TODO
- return 0l;
+ return _obj.getMsgTxnDequeues();
}
public Long getMsgPersistEnqueues()
@@ -958,8 +957,7 @@ public class QMFService implements ConfigStore.ConfigEventListener
public Long getByteTxnDequeues()
{
- // TODO
- return 0l;
+ return _obj.getByteTxnDequeues();
}
public Long getBytePersistEnqueues()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java
index 0b5df9b8fa..c3593561bb 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java
@@ -62,7 +62,11 @@ public interface QueueConfig extends ConfiguredObject<QueueConfigType, QueueConf
long getByteTxnEnqueues();
+ long getByteTxnDequeues();
+
long getMsgTxnEnqueues();
+
+ long getMsgTxnDequeues();
long getPersistentByteEnqueues();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 81ea2083e0..d3867d8140 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -109,7 +109,7 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeRefer
void requeue(QueueEntryImpl storeContext, Subscription subscription);
- void dequeue(QueueEntry entry);
+ void dequeue(QueueEntry entry, Subscription sub);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index ada7726fc0..bf4015eb7a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -366,13 +366,14 @@ public class QueueEntryImpl implements QueueEntry
if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE))
{
+ Subscription s = null;
if (state instanceof SubscriptionAcquiredState)
{
- Subscription s = ((SubscriptionAcquiredState) state).getSubscription();
+ s = ((SubscriptionAcquiredState) state).getSubscription();
s.onDequeue(this);
}
- getQueue().dequeue(this);
+ getQueue().dequeue(this,s);
if(_stateChangeListeners != null)
{
notifyStateChange(state.getState() , QueueEntry.State.DEQUEUED);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index d99b551936..df2aec2534 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -125,6 +125,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private final AtomicInteger _counsumerCountHigh = new AtomicInteger(0);
private final AtomicLong _msgTxnEnqueues = new AtomicLong(0);
private final AtomicLong _byteTxnEnqueues = new AtomicLong(0);
+ private final AtomicLong _msgTxnDequeues = new AtomicLong(0);
+ private final AtomicLong _byteTxnDequeues = new AtomicLong(0);
private final AtomicInteger _bindingCountHigh = new AtomicInteger();
@@ -680,6 +682,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
_byteTxnEnqueues.addAndGet(message.getSize());
}
}
+
+ private void incrementTxnDequeueStats(QueueEntry entry)
+ {
+ _msgTxnDequeues.incrementAndGet();
+ _byteTxnDequeues.addAndGet(entry.getSize());
+ }
private void deliverMessage(final Subscription sub, final QueueEntry entry)
throws AMQException
@@ -764,7 +772,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
deliverAsync();
}
- public void dequeue(QueueEntry entry)
+ public void dequeue(QueueEntry entry, Subscription sub)
{
decrementQueueCount();
decrementQueueSize(entry);
@@ -772,6 +780,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
_deliveredMessages.decrementAndGet();
}
+
+ if(sub != null && sub.isSessionTransactional())
+ {
+ incrementTxnDequeueStats(entry);
+ }
checkCapacity();
@@ -2084,10 +2097,20 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
return _byteTxnEnqueues.get();
}
+ public long getByteTxnDequeues()
+ {
+ return _byteTxnDequeues.get();
+ }
+
public long getMsgTxnEnqueues()
{
return _msgTxnEnqueues.get();
}
+
+ public long getMsgTxnDequeues()
+ {
+ return _msgTxnDequeues.get();
+ }
public long getPersistentByteEnqueues()
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
index 9e9d2da579..0a3576ff42 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
@@ -105,4 +105,5 @@ public interface Subscription
public Object get(String key);
+ boolean isSessionTransactional();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
index b0ea0ef506..156b05d15c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -766,4 +766,8 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
return null;
}
+ public boolean isSessionTransactional()
+ {
+ return _channel.isTransactional();
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
index 54c294c76d..4bad81ec17 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
@@ -848,4 +848,9 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
//TODO
return Collections.EMPTY_MAP;
}
+
+ public boolean isSessionTransactional()
+ {
+ return _session.isTransactional();
+ }
}