diff options
author | Robert Gemmell <robbie@apache.org> | 2010-03-04 11:17:48 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2010-03-04 11:17:48 +0000 |
commit | 380f3fa63615c12b6b002924d86b15441bbde65b (patch) | |
tree | b70ecf877f491546e79876f1d87334b404ff1dde | |
parent | 11d0830854190774df9d46f0745142e26ec1feb5 (diff) | |
download | qpid-python-380f3fa63615c12b6b002924d86b15441bbde65b.tar.gz |
QPID-2379: add TxnStarts, TxnCommits, TxnRejects, TxnCount on Session delegate
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@918939 13f79535-47bb-0310-9956-ffa450edef68
4 files changed, 111 insertions, 10 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java index 3e155e104c..b6c06f7f34 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java @@ -1300,26 +1300,22 @@ public class QMFService implements ConfigStore.ConfigEventListener public Long getTxnStarts() { - // TODO - return 0l; + return _obj.getTxnStarts(); } public Long getTxnCommits() { - // TODO - return 0l; + return _obj.getTxnCommits(); } public Long getTxnRejects() { - // TODO - return 0l; + return _obj.getTxnRejects(); } public Long getTxnCount() { - // TODO - return 0l; + return _obj.getTxnCount(); } public Long getClientCredit() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 1b03ee2334..15ac52305f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -84,6 +84,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; public class AMQChannel implements SessionConfig { @@ -132,6 +133,11 @@ public class AMQChannel implements SessionConfig private final AtomicBoolean _suspended = new AtomicBoolean(false); private ServerTransaction _transaction; + + private final AtomicLong _txnStarts = new AtomicLong(0); + private final AtomicLong _txnCommits = new AtomicLong(0); + private final AtomicLong _txnRejects = new AtomicLong(0); + private final AtomicLong _txnCount = new AtomicLong(0); // Why do we need this reference ? - ritchiem private final AMQProtocolSession _session; @@ -180,6 +186,7 @@ public class AMQChannel implements SessionConfig public void setLocalTransactional() { _transaction = new LocalTransaction(_messageStore); + _txnStarts.incrementAndGet(); } public boolean isTransactional() @@ -189,6 +196,40 @@ public class AMQChannel implements SessionConfig // theory return !(_transaction instanceof AutoCommitTransaction); } + + private void incrementOutstandingTxnsIfNecessary() + { + //There can currently only be at most one outstanding transaction + //due to only having LocalTransaction support. Set value to 1 if 0. + _txnCount.compareAndSet(0,1); + } + + private void decrementOutstandingTxnsIfNecessary() + { + //There can currently only be at most one outstanding transaction + //due to only having LocalTransaction support. Set value to 0 if 1. + _txnCount.compareAndSet(1,0); + } + + public Long getTxnStarts() + { + return _txnStarts.get(); + } + + public Long getTxnCommits() + { + return _txnCommits.get(); + } + + public Long getTxnRejects() + { + return _txnRejects.get(); + } + + public Long getTxnCount() + { + return _txnCount.get(); + } public int getChannelId() { @@ -278,7 +319,7 @@ public class AMQChannel implements SessionConfig else { _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues)); - + incrementOutstandingTxnsIfNecessary(); } } } @@ -845,6 +886,9 @@ public class AMQChannel implements SessionConfig _transaction.commit(); + _txnCommits.incrementAndGet(); + _txnStarts.incrementAndGet(); + decrementOutstandingTxnsIfNecessary(); } public void rollback() throws AMQException @@ -877,6 +921,10 @@ public class AMQChannel implements SessionConfig finally { _rollingBack = false; + + _txnRejects.incrementAndGet(); + _txnStarts.incrementAndGet(); + decrementOutstandingTxnsIfNecessary(); } postRollbackTask.run(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java index ae01ab25ea..e46e951588 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java @@ -38,4 +38,12 @@ public interface SessionConfig extends ConfiguredObject<SessionConfigType, Sessi Long getExpiryTime(); Long getMaxClientRate(); + + Long getTxnStarts(); + + Long getTxnCommits(); + + Long getTxnRejects(); + + Long getTxnCount(); }
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index 3e48ac2619..a65f3938a6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -61,6 +61,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicLong; public class ServerSession extends Session implements PrincipalHolder, SessionConfig { @@ -92,6 +93,11 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo new ConcurrentSkipListMap<Integer, MessageDispositionChangeListener>(); private ServerTransaction _transaction; + + private final AtomicLong _txnStarts = new AtomicLong(0); + private final AtomicLong _txnCommits = new AtomicLong(0); + private final AtomicLong _txnRejects = new AtomicLong(0); + private final AtomicLong _txnCount = new AtomicLong(0); private Principal _principal; @@ -160,7 +166,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo } }); - + incrementOutstandingTxnsIfNecessary(); } @@ -391,13 +397,56 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo public void commit() { _transaction.commit(); + + _txnCommits.incrementAndGet(); + _txnStarts.incrementAndGet(); + decrementOutstandingTxnsIfNecessary(); } public void rollback() { _transaction.rollback(); + + _txnRejects.incrementAndGet(); + _txnStarts.incrementAndGet(); + decrementOutstandingTxnsIfNecessary(); + } + + + private void incrementOutstandingTxnsIfNecessary() + { + //There can currently only be at most one outstanding transaction + //due to only having LocalTransaction support. Set value to 1 if 0. + _txnCount.compareAndSet(0,1); + } + + private void decrementOutstandingTxnsIfNecessary() + { + //There can currently only be at most one outstanding transaction + //due to only having LocalTransaction support. Set value to 0 if 1. + _txnCount.compareAndSet(1,0); + } + + public Long getTxnStarts() + { + return _txnStarts.get(); } + public Long getTxnCommits() + { + return _txnCommits.get(); + } + + public Long getTxnRejects() + { + return _txnRejects.get(); + } + + public Long getTxnCount() + { + return _txnCount.get(); + } + public Principal getPrincipal() { return _principal; |