summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2013-01-09 15:42:10 +0000
committerKeith Wall <kwall@apache.org>2013-01-09 15:42:10 +0000
commit5493e1c636ffcec7b3514687656cff198ff2708d (patch)
treef4b20c70abac6e9d8151724b89075956fc2cca9d
parentb3cc9da195f5573a9ebeb022c8fd9680c82496bb (diff)
downloadqpid-python-5493e1c636ffcec7b3514687656cff198ff2708d.tar.gz
QPID-4503: Producer transaction timeout detection feature may produce suprious open/idle alerts and close client connections/sessions without good cause
Merged from trunk with command svn merge -c 1421884 https://svn.apache.org/repos/asf/qpid/trunk/qpid/java git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.20@1430904 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java15
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java16
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java2
3 files changed, 13 insertions, 20 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 4367d7ee53..76a3a7f224 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -70,7 +70,6 @@ import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.protocol.AMQConnectionModel;
-import org.apache.qpid.server.protocol.AMQProtocolEngine;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
@@ -113,7 +112,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
*/
private long _deliveryTag = 0;
- /** A channel has a default queue (the last declared) that is used when no queue name is explictily set */
+ /** A channel has a default queue (the last declared) that is used when no queue name is explicitly set */
private AMQQueue _defaultQueue;
/** This tag is unique per subscription to a queue. The server returns this in response to a basic.consume request. */
@@ -209,10 +208,6 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
- public boolean inTransaction()
- {
- return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0;
- }
private void incrementOutstandingTxnsIfNecessary()
{
@@ -1487,11 +1482,13 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException
{
- if (inTransaction())
+ final long transactionStartTime = _transaction.getTransactionStartTime();
+ final long transactionUpdateTime = _txnUpdateTime.get();
+ if (isTransactional() && transactionUpdateTime > 0 && transactionStartTime > 0)
{
long currentTime = System.currentTimeMillis();
- long openTime = currentTime - _transaction.getTransactionStartTime();
- long idleTime = currentTime - _txnUpdateTime.get();
+ long openTime = currentTime - transactionStartTime;
+ long idleTime = currentTime - transactionUpdateTime;
_transactionTimeoutHelper.logIfNecessary(idleTime, idleWarn, ChannelMessages.IDLE_TXN(idleTime),
TransactionTimeoutHelper.IDLE_TRANSACTION_ALERT);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index eed55a2e85..075ed2a87c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -42,7 +42,6 @@ import javax.security.auth.Subject;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.server.TransactionTimeoutHelper;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
@@ -449,11 +448,6 @@ public class ServerSession extends Session
return _transaction.isTransactional();
}
- public boolean inTransaction()
- {
- return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0;
- }
-
public void selectTx()
{
_transaction = new LocalTransaction(this.getMessageStore());
@@ -591,7 +585,7 @@ public class ServerSession extends Session
/**
* Update last transaction activity timestamp
*/
- public void updateTransactionalActivity()
+ private void updateTransactionalActivity()
{
if (isTransactional())
{
@@ -709,11 +703,13 @@ public class ServerSession extends Session
public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException
{
- if (inTransaction())
+ final long transactionStartTime = _transaction.getTransactionStartTime();
+ final long transactionUpdateTime = _txnUpdateTime.get();
+ if (isTransactional() && transactionUpdateTime > 0 && transactionStartTime > 0)
{
long currentTime = System.currentTimeMillis();
- long openTime = currentTime - _transaction.getTransactionStartTime();
- long idleTime = currentTime - _txnUpdateTime.get();
+ long openTime = currentTime - transactionStartTime;
+ long idleTime = currentTime - transactionUpdateTime;
_transactionTimeoutHelper.logIfNecessary(idleTime, idleWarn, ChannelMessages.IDLE_TXN(idleTime),
TransactionTimeoutHelper.IDLE_TRANSACTION_ALERT);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
index 3fbcff7e2c..f11fb1086e 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
@@ -50,7 +50,7 @@ public class LocalTransaction implements ServerTransaction
private volatile Transaction _transaction;
private MessageStore _transactionLog;
- private long _txnStartTime = 0L;
+ private volatile long _txnStartTime = 0L;
private StoreFuture _asyncTran;
public LocalTransaction(MessageStore transactionLog)