diff options
author | Keith Wall <kwall@apache.org> | 2013-01-09 15:42:10 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2013-01-09 15:42:10 +0000 |
commit | 5493e1c636ffcec7b3514687656cff198ff2708d (patch) | |
tree | f4b20c70abac6e9d8151724b89075956fc2cca9d | |
parent | b3cc9da195f5573a9ebeb022c8fd9680c82496bb (diff) | |
download | qpid-python-5493e1c636ffcec7b3514687656cff198ff2708d.tar.gz |
QPID-4503: Producer transaction timeout detection feature may produce suprious open/idle alerts and close client connections/sessions without good cause
Merged from trunk with command svn merge -c 1421884 https://svn.apache.org/repos/asf/qpid/trunk/qpid/java
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.20@1430904 13f79535-47bb-0310-9956-ffa450edef68
3 files changed, 13 insertions, 20 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 4367d7ee53..76a3a7f224 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -70,7 +70,6 @@ import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.protocol.AMQConnectionModel; -import org.apache.qpid.server.protocol.AMQProtocolEngine; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; @@ -113,7 +112,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F */ private long _deliveryTag = 0; - /** A channel has a default queue (the last declared) that is used when no queue name is explictily set */ + /** A channel has a default queue (the last declared) that is used when no queue name is explicitly set */ private AMQQueue _defaultQueue; /** This tag is unique per subscription to a queue. The server returns this in response to a basic.consume request. */ @@ -209,10 +208,6 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } - public boolean inTransaction() - { - return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0; - } private void incrementOutstandingTxnsIfNecessary() { @@ -1487,11 +1482,13 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException { - if (inTransaction()) + final long transactionStartTime = _transaction.getTransactionStartTime(); + final long transactionUpdateTime = _txnUpdateTime.get(); + if (isTransactional() && transactionUpdateTime > 0 && transactionStartTime > 0) { long currentTime = System.currentTimeMillis(); - long openTime = currentTime - _transaction.getTransactionStartTime(); - long idleTime = currentTime - _txnUpdateTime.get(); + long openTime = currentTime - transactionStartTime; + long idleTime = currentTime - transactionUpdateTime; _transactionTimeoutHelper.logIfNecessary(idleTime, idleWarn, ChannelMessages.IDLE_TXN(idleTime), TransactionTimeoutHelper.IDLE_TRANSACTION_ALERT); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index eed55a2e85..075ed2a87c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -42,7 +42,6 @@ import javax.security.auth.Subject; import org.apache.qpid.AMQException; import org.apache.qpid.AMQStoreException; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.server.TransactionTimeoutHelper; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; @@ -449,11 +448,6 @@ public class ServerSession extends Session return _transaction.isTransactional(); } - public boolean inTransaction() - { - return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0; - } - public void selectTx() { _transaction = new LocalTransaction(this.getMessageStore()); @@ -591,7 +585,7 @@ public class ServerSession extends Session /** * Update last transaction activity timestamp */ - public void updateTransactionalActivity() + private void updateTransactionalActivity() { if (isTransactional()) { @@ -709,11 +703,13 @@ public class ServerSession extends Session public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException { - if (inTransaction()) + final long transactionStartTime = _transaction.getTransactionStartTime(); + final long transactionUpdateTime = _txnUpdateTime.get(); + if (isTransactional() && transactionUpdateTime > 0 && transactionStartTime > 0) { long currentTime = System.currentTimeMillis(); - long openTime = currentTime - _transaction.getTransactionStartTime(); - long idleTime = currentTime - _txnUpdateTime.get(); + long openTime = currentTime - transactionStartTime; + long idleTime = currentTime - transactionUpdateTime; _transactionTimeoutHelper.logIfNecessary(idleTime, idleWarn, ChannelMessages.IDLE_TXN(idleTime), TransactionTimeoutHelper.IDLE_TRANSACTION_ALERT); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java index 3fbcff7e2c..f11fb1086e 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java @@ -50,7 +50,7 @@ public class LocalTransaction implements ServerTransaction private volatile Transaction _transaction; private MessageStore _transactionLog; - private long _txnStartTime = 0L; + private volatile long _txnStartTime = 0L; private StoreFuture _asyncTran; public LocalTransaction(MessageStore transactionLog) |