summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2010-03-04 11:19:00 +0000
committerRobert Gemmell <robbie@apache.org>2010-03-04 11:19:00 +0000
commit7d21eac1b9295f0a9d1472eb3805b76c8c22da5a (patch)
treeb75cc7218a71c20fce729e9bb9f82b7db14114a9
parent6f7e476ee694881cecccbe73ae7011c1cdba7ee0 (diff)
downloadqpid-python-7d21eac1b9295f0a9d1472eb3805b76c8c22da5a.tar.gz
QPID-2379: add BytesTxnDequeues and MsgTxnDequeues on Queue delegate
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@918942 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java25
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java5
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java12
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java5
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java5
12 files changed, 66 insertions, 10 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
index 3f1f354585..69d9ccb431 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
@@ -917,8 +917,7 @@ public class QMFService implements ConfigStore.ConfigEventListener
public Long getMsgTxnDequeues()
{
- // TODO
- return 0l;
+ return _obj.getMsgTxnDequeues();
}
public Long getMsgPersistEnqueues()
@@ -958,8 +957,7 @@ public class QMFService implements ConfigStore.ConfigEventListener
public Long getByteTxnDequeues()
{
- // TODO
- return 0l;
+ return _obj.getByteTxnDequeues();
}
public Long getBytePersistEnqueues()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java
index 0b5df9b8fa..c3593561bb 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java
@@ -62,7 +62,11 @@ public interface QueueConfig extends ConfiguredObject<QueueConfigType, QueueConf
long getByteTxnEnqueues();
+ long getByteTxnDequeues();
+
long getMsgTxnEnqueues();
+
+ long getMsgTxnDequeues();
long getPersistentByteEnqueues();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 81ea2083e0..d3867d8140 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -109,7 +109,7 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeRefer
void requeue(QueueEntryImpl storeContext, Subscription subscription);
- void dequeue(QueueEntry entry);
+ void dequeue(QueueEntry entry, Subscription sub);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index ada7726fc0..bf4015eb7a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -366,13 +366,14 @@ public class QueueEntryImpl implements QueueEntry
if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE))
{
+ Subscription s = null;
if (state instanceof SubscriptionAcquiredState)
{
- Subscription s = ((SubscriptionAcquiredState) state).getSubscription();
+ s = ((SubscriptionAcquiredState) state).getSubscription();
s.onDequeue(this);
}
- getQueue().dequeue(this);
+ getQueue().dequeue(this,s);
if(_stateChangeListeners != null)
{
notifyStateChange(state.getState() , QueueEntry.State.DEQUEUED);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index d99b551936..df2aec2534 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -125,6 +125,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private final AtomicInteger _counsumerCountHigh = new AtomicInteger(0);
private final AtomicLong _msgTxnEnqueues = new AtomicLong(0);
private final AtomicLong _byteTxnEnqueues = new AtomicLong(0);
+ private final AtomicLong _msgTxnDequeues = new AtomicLong(0);
+ private final AtomicLong _byteTxnDequeues = new AtomicLong(0);
private final AtomicInteger _bindingCountHigh = new AtomicInteger();
@@ -680,6 +682,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
_byteTxnEnqueues.addAndGet(message.getSize());
}
}
+
+ private void incrementTxnDequeueStats(QueueEntry entry)
+ {
+ _msgTxnDequeues.incrementAndGet();
+ _byteTxnDequeues.addAndGet(entry.getSize());
+ }
private void deliverMessage(final Subscription sub, final QueueEntry entry)
throws AMQException
@@ -764,7 +772,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
deliverAsync();
}
- public void dequeue(QueueEntry entry)
+ public void dequeue(QueueEntry entry, Subscription sub)
{
decrementQueueCount();
decrementQueueSize(entry);
@@ -772,6 +780,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
_deliveredMessages.decrementAndGet();
}
+
+ if(sub != null && sub.isSessionTransactional())
+ {
+ incrementTxnDequeueStats(entry);
+ }
checkCapacity();
@@ -2084,10 +2097,20 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
return _byteTxnEnqueues.get();
}
+ public long getByteTxnDequeues()
+ {
+ return _byteTxnDequeues.get();
+ }
+
public long getMsgTxnEnqueues()
{
return _msgTxnEnqueues.get();
}
+
+ public long getMsgTxnDequeues()
+ {
+ return _msgTxnDequeues.get();
+ }
public long getPersistentByteEnqueues()
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
index 9e9d2da579..0a3576ff42 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
@@ -105,4 +105,5 @@ public interface Subscription
public Object get(String key);
+ boolean isSessionTransactional();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
index b0ea0ef506..156b05d15c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -766,4 +766,8 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
return null;
}
+ public boolean isSessionTransactional()
+ {
+ return _channel.isTransactional();
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
index 54c294c76d..4bad81ec17 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
@@ -848,4 +848,9 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
//TODO
return Collections.EMPTY_MAP;
}
+
+ public boolean isSessionTransactional()
+ {
+ return _session.isTransactional();
+ }
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
index b0fbab4146..7063beefca 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
@@ -293,7 +293,7 @@ public class MockAMQQueue implements AMQQueue
//To change body of implemented methods use File | Settings | File Templates.
}
- public void dequeue(QueueEntry entry)
+ public void dequeue(QueueEntry entry, Subscription sub)
{
//To change body of implemented methods use File | Settings | File Templates.
}
@@ -562,4 +562,14 @@ public class MockAMQQueue implements AMQQueue
{
return 0;
}
+
+ public long getByteTxnDequeues()
+ {
+ return 0;
+ }
+
+ public long getMsgTxnDequeues()
+ {
+ return 0;
+ }
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 2abeb7ed30..9346b1eda0 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -437,7 +437,7 @@ public class SimpleAMQQueueTest extends TestCase
AMQMessage amqmsg = new AMQMessage(handle);
entry.setMessage(amqmsg);
- _queue.dequeue(entry);
+ _queue.dequeue(entry,null);
// Check that it is dequeued
data = _store.getMessages().get(1L);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
index e6fd2172f0..e6367c4468 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
@@ -235,4 +235,9 @@ public class MockSubscription implements Subscription
{
return messages;
}
+
+ public boolean isSessionTransactional()
+ {
+ return false;
+ }
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
index 352f6ad119..1152797dbf 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
@@ -286,4 +286,9 @@ public class SubscriptionTestHelper implements Subscription
{
return key.toString();
}
+
+ public boolean isSessionTransactional()
+ {
+ return false;
+ }
}