diff options
12 files changed, 66 insertions, 10 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java index 3f1f354585..69d9ccb431 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java @@ -917,8 +917,7 @@ public class QMFService implements ConfigStore.ConfigEventListener public Long getMsgTxnDequeues() { - // TODO - return 0l; + return _obj.getMsgTxnDequeues(); } public Long getMsgPersistEnqueues() @@ -958,8 +957,7 @@ public class QMFService implements ConfigStore.ConfigEventListener public Long getByteTxnDequeues() { - // TODO - return 0l; + return _obj.getByteTxnDequeues(); } public Long getBytePersistEnqueues() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java index 0b5df9b8fa..c3593561bb 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java @@ -62,7 +62,11 @@ public interface QueueConfig extends ConfiguredObject<QueueConfigType, QueueConf long getByteTxnEnqueues(); + long getByteTxnDequeues(); + long getMsgTxnEnqueues(); + + long getMsgTxnDequeues(); long getPersistentByteEnqueues(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 81ea2083e0..d3867d8140 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -109,7 +109,7 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeRefer void requeue(QueueEntryImpl storeContext, Subscription subscription); - void dequeue(QueueEntry entry); + void dequeue(QueueEntry entry, Subscription sub); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index ada7726fc0..bf4015eb7a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -366,13 +366,14 @@ public class QueueEntryImpl implements QueueEntry if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE)) { + Subscription s = null; if (state instanceof SubscriptionAcquiredState) { - Subscription s = ((SubscriptionAcquiredState) state).getSubscription(); + s = ((SubscriptionAcquiredState) state).getSubscription(); s.onDequeue(this); } - getQueue().dequeue(this); + getQueue().dequeue(this,s); if(_stateChangeListeners != null) { notifyStateChange(state.getState() , QueueEntry.State.DEQUEUED); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index d99b551936..df2aec2534 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -125,6 +125,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private final AtomicInteger _counsumerCountHigh = new AtomicInteger(0); private final AtomicLong _msgTxnEnqueues = new AtomicLong(0); private final AtomicLong _byteTxnEnqueues = new AtomicLong(0); + private final AtomicLong _msgTxnDequeues = new AtomicLong(0); + private final AtomicLong _byteTxnDequeues = new AtomicLong(0); private final AtomicInteger _bindingCountHigh = new AtomicInteger(); @@ -680,6 +682,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _byteTxnEnqueues.addAndGet(message.getSize()); } } + + private void incrementTxnDequeueStats(QueueEntry entry) + { + _msgTxnDequeues.incrementAndGet(); + _byteTxnDequeues.addAndGet(entry.getSize()); + } private void deliverMessage(final Subscription sub, final QueueEntry entry) throws AMQException @@ -764,7 +772,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener deliverAsync(); } - public void dequeue(QueueEntry entry) + public void dequeue(QueueEntry entry, Subscription sub) { decrementQueueCount(); decrementQueueSize(entry); @@ -772,6 +780,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { _deliveredMessages.decrementAndGet(); } + + if(sub != null && sub.isSessionTransactional()) + { + incrementTxnDequeueStats(entry); + } checkCapacity(); @@ -2084,10 +2097,20 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener return _byteTxnEnqueues.get(); } + public long getByteTxnDequeues() + { + return _byteTxnDequeues.get(); + } + public long getMsgTxnEnqueues() { return _msgTxnEnqueues.get(); } + + public long getMsgTxnDequeues() + { + return _msgTxnDequeues.get(); + } public long getPersistentByteEnqueues() { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java index 9e9d2da579..0a3576ff42 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java @@ -105,4 +105,5 @@ public interface Subscription public Object get(String key); + boolean isSessionTransactional(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index b0ea0ef506..156b05d15c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -766,4 +766,8 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage return null; } + public boolean isSessionTransactional() + { + return _channel.isTransactional(); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java index 54c294c76d..4bad81ec17 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -848,4 +848,9 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr //TODO return Collections.EMPTY_MAP; } + + public boolean isSessionTransactional() + { + return _session.isTransactional(); + } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index b0fbab4146..7063beefca 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -293,7 +293,7 @@ public class MockAMQQueue implements AMQQueue //To change body of implemented methods use File | Settings | File Templates. } - public void dequeue(QueueEntry entry) + public void dequeue(QueueEntry entry, Subscription sub) { //To change body of implemented methods use File | Settings | File Templates. } @@ -562,4 +562,14 @@ public class MockAMQQueue implements AMQQueue { return 0; } + + public long getByteTxnDequeues() + { + return 0; + } + + public long getMsgTxnDequeues() + { + return 0; + } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 2abeb7ed30..9346b1eda0 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -437,7 +437,7 @@ public class SimpleAMQQueueTest extends TestCase AMQMessage amqmsg = new AMQMessage(handle); entry.setMessage(amqmsg); - _queue.dequeue(entry); + _queue.dequeue(entry,null); // Check that it is dequeued data = _store.getMessages().get(1L); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java index e6fd2172f0..e6367c4468 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java @@ -235,4 +235,9 @@ public class MockSubscription implements Subscription { return messages; } + + public boolean isSessionTransactional() + { + return false; + } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java index 352f6ad119..1152797dbf 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java @@ -286,4 +286,9 @@ public class SubscriptionTestHelper implements Subscription { return key.toString(); } + + public boolean isSessionTransactional() + { + return false; + } } |