summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2010-03-04 11:17:48 +0000
committerRobert Gemmell <robbie@apache.org>2010-03-04 11:17:48 +0000
commit380f3fa63615c12b6b002924d86b15441bbde65b (patch)
treeb70ecf877f491546e79876f1d87334b404ff1dde
parent11d0830854190774df9d46f0745142e26ec1feb5 (diff)
downloadqpid-python-380f3fa63615c12b6b002924d86b15441bbde65b.tar.gz
QPID-2379: add TxnStarts, TxnCommits, TxnRejects, TxnCount on Session delegate
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@918939 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java12
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java50
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java51
4 files changed, 111 insertions, 10 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
index 3e155e104c..b6c06f7f34 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
@@ -1300,26 +1300,22 @@ public class QMFService implements ConfigStore.ConfigEventListener
public Long getTxnStarts()
{
- // TODO
- return 0l;
+ return _obj.getTxnStarts();
}
public Long getTxnCommits()
{
- // TODO
- return 0l;
+ return _obj.getTxnCommits();
}
public Long getTxnRejects()
{
- // TODO
- return 0l;
+ return _obj.getTxnRejects();
}
public Long getTxnCount()
{
- // TODO
- return 0l;
+ return _obj.getTxnCount();
}
public Long getClientCredit()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 1b03ee2334..15ac52305f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -84,6 +84,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
public class AMQChannel implements SessionConfig
{
@@ -132,6 +133,11 @@ public class AMQChannel implements SessionConfig
private final AtomicBoolean _suspended = new AtomicBoolean(false);
private ServerTransaction _transaction;
+
+ private final AtomicLong _txnStarts = new AtomicLong(0);
+ private final AtomicLong _txnCommits = new AtomicLong(0);
+ private final AtomicLong _txnRejects = new AtomicLong(0);
+ private final AtomicLong _txnCount = new AtomicLong(0);
// Why do we need this reference ? - ritchiem
private final AMQProtocolSession _session;
@@ -180,6 +186,7 @@ public class AMQChannel implements SessionConfig
public void setLocalTransactional()
{
_transaction = new LocalTransaction(_messageStore);
+ _txnStarts.incrementAndGet();
}
public boolean isTransactional()
@@ -189,6 +196,40 @@ public class AMQChannel implements SessionConfig
// theory
return !(_transaction instanceof AutoCommitTransaction);
}
+
+ private void incrementOutstandingTxnsIfNecessary()
+ {
+ //There can currently only be at most one outstanding transaction
+ //due to only having LocalTransaction support. Set value to 1 if 0.
+ _txnCount.compareAndSet(0,1);
+ }
+
+ private void decrementOutstandingTxnsIfNecessary()
+ {
+ //There can currently only be at most one outstanding transaction
+ //due to only having LocalTransaction support. Set value to 0 if 1.
+ _txnCount.compareAndSet(1,0);
+ }
+
+ public Long getTxnStarts()
+ {
+ return _txnStarts.get();
+ }
+
+ public Long getTxnCommits()
+ {
+ return _txnCommits.get();
+ }
+
+ public Long getTxnRejects()
+ {
+ return _txnRejects.get();
+ }
+
+ public Long getTxnCount()
+ {
+ return _txnCount.get();
+ }
public int getChannelId()
{
@@ -278,7 +319,7 @@ public class AMQChannel implements SessionConfig
else
{
_transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues));
-
+ incrementOutstandingTxnsIfNecessary();
}
}
}
@@ -845,6 +886,9 @@ public class AMQChannel implements SessionConfig
_transaction.commit();
+ _txnCommits.incrementAndGet();
+ _txnStarts.incrementAndGet();
+ decrementOutstandingTxnsIfNecessary();
}
public void rollback() throws AMQException
@@ -877,6 +921,10 @@ public class AMQChannel implements SessionConfig
finally
{
_rollingBack = false;
+
+ _txnRejects.incrementAndGet();
+ _txnStarts.incrementAndGet();
+ decrementOutstandingTxnsIfNecessary();
}
postRollbackTask.run();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java
index ae01ab25ea..e46e951588 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java
@@ -38,4 +38,12 @@ public interface SessionConfig extends ConfiguredObject<SessionConfigType, Sessi
Long getExpiryTime();
Long getMaxClientRate();
+
+ Long getTxnStarts();
+
+ Long getTxnCommits();
+
+ Long getTxnRejects();
+
+ Long getTxnCount();
} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index 3e48ac2619..a65f3938a6 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -61,6 +61,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicLong;
public class ServerSession extends Session implements PrincipalHolder, SessionConfig
{
@@ -92,6 +93,11 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo
new ConcurrentSkipListMap<Integer, MessageDispositionChangeListener>();
private ServerTransaction _transaction;
+
+ private final AtomicLong _txnStarts = new AtomicLong(0);
+ private final AtomicLong _txnCommits = new AtomicLong(0);
+ private final AtomicLong _txnRejects = new AtomicLong(0);
+ private final AtomicLong _txnCount = new AtomicLong(0);
private Principal _principal;
@@ -160,7 +166,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo
}
});
-
+ incrementOutstandingTxnsIfNecessary();
}
@@ -391,13 +397,56 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo
public void commit()
{
_transaction.commit();
+
+ _txnCommits.incrementAndGet();
+ _txnStarts.incrementAndGet();
+ decrementOutstandingTxnsIfNecessary();
}
public void rollback()
{
_transaction.rollback();
+
+ _txnRejects.incrementAndGet();
+ _txnStarts.incrementAndGet();
+ decrementOutstandingTxnsIfNecessary();
+ }
+
+
+ private void incrementOutstandingTxnsIfNecessary()
+ {
+ //There can currently only be at most one outstanding transaction
+ //due to only having LocalTransaction support. Set value to 1 if 0.
+ _txnCount.compareAndSet(0,1);
+ }
+
+ private void decrementOutstandingTxnsIfNecessary()
+ {
+ //There can currently only be at most one outstanding transaction
+ //due to only having LocalTransaction support. Set value to 0 if 1.
+ _txnCount.compareAndSet(1,0);
+ }
+
+ public Long getTxnStarts()
+ {
+ return _txnStarts.get();
}
+ public Long getTxnCommits()
+ {
+ return _txnCommits.get();
+ }
+
+ public Long getTxnRejects()
+ {
+ return _txnRejects.get();
+ }
+
+ public Long getTxnCount()
+ {
+ return _txnCount.get();
+ }
+
public Principal getPrincipal()
{
return _principal;