summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Donald Kennedy <grkvlt@apache.org>2011-03-08 00:11:30 +0000
committerAndrew Donald Kennedy <grkvlt@apache.org>2011-03-08 00:11:30 +0000
commit5c797b1f2ebce8b79f118dc64cd0c1b3b9efd23c (patch)
treeb7dca5bdcbc6a4cbb68e45d5c2b48d999f473544
parent3572fae7ed32a0ab01e9247e693c26f118513f20 (diff)
downloadqpid-python-5c797b1f2ebce8b79f118dc64cd0c1b3b9efd23c.tar.gz
QPID-2985: Add producer configurable transaction timeouts
Port of QPID-2864 changes from 0.5.x-dev branch to trunk. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1079042 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java53
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java20
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java52
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java15
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java34
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java38
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java28
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java43
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java101
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java1
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java5
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java59
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java31
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java2
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java5
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutConfigurationTest.java82
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java72
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java335
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java253
-rwxr-xr-xqpid/java/test-profiles/CPPExcludes5
26 files changed, 1145 insertions, 106 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 4f86c82578..1c91de6d15 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server;
import org.apache.log4j.Logger;
+import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.framing.AMQMethodBody;
@@ -141,6 +142,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
private final AtomicLong _txnCommits = new AtomicLong(0);
private final AtomicLong _txnRejects = new AtomicLong(0);
private final AtomicLong _txnCount = new AtomicLong(0);
+ private final AtomicLong _txnUpdateTime = new AtomicLong(0);
private final AMQProtocolSession _session;
private AtomicBoolean _closing = new AtomicBoolean(false);
@@ -200,6 +202,11 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
return !(_transaction instanceof AutoCommitTransaction);
}
+ public boolean inTransaction()
+ {
+ return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0;
+ }
+
private void incrementOutstandingTxnsIfNecessary()
{
if(isTransactional())
@@ -295,7 +302,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
});
deliverCurrentMessageIfComplete();
-
}
}
@@ -333,6 +339,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
{
_transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues, isTransactional()));
incrementOutstandingTxnsIfNecessary();
+ updateTransactionalActivity();
}
}
}
@@ -794,6 +801,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
{
Collection<QueueEntry> ackedMessages = getAckedMessages(deliveryTag, multiple);
_transaction.dequeue(ackedMessages, new MessageAcknowledgeAction(ackedMessages));
+ updateTransactionalActivity();
}
private Collection<QueueEntry> getAckedMessages(long deliveryTag, boolean multiple)
@@ -968,6 +976,17 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
}
+ /**
+ * Update last transaction activity timestamp
+ */
+ private void updateTransactionalActivity()
+ {
+ if (isTransactional())
+ {
+ _txnUpdateTime.set(System.currentTimeMillis());
+ }
+ }
+
public String toString()
{
return "["+_session.toString()+":"+_channelId+"]";
@@ -1407,4 +1426,36 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
{
_session.mgmtCloseChannel(_channelId);
}
+
+ public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException
+ {
+ if (inTransaction())
+ {
+ long currentTime = System.currentTimeMillis();
+ long openTime = currentTime - _transaction.getTransactionStartTime();
+ long idleTime = currentTime - _txnUpdateTime.get();
+
+ // Log a warning on idle or open transactions
+ if (idleWarn > 0L && idleTime > idleWarn)
+ {
+ CurrentActor.get().message(_logSubject, ChannelMessages.IDLE_TXN(idleTime));
+ _logger.warn("IDLE TRANSACTION ALERT " + _logSubject.toString() + " " + idleTime + " ms");
+ }
+ else if (openWarn > 0L && openTime > openWarn)
+ {
+ CurrentActor.get().message(_logSubject, ChannelMessages.OPEN_TXN(openTime));
+ _logger.warn("OPEN TRANSACTION ALERT " + _logSubject.toString() + " " + openTime + " ms");
+ }
+
+ // Close connection for idle or open transactions that have timed out
+ if (idleClose > 0L && idleTime > idleClose)
+ {
+ getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out");
+ }
+ else if (openClose > 0L && openTime > openClose)
+ {
+ getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open transaction timed out");
+ }
+ }
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
index d9d7083543..48f2d776bb 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
@@ -313,4 +313,24 @@ public class VirtualHostConfiguration extends ConfigurationPlugin
{
return getIntValue("housekeeping.poolSize", Runtime.getRuntime().availableProcessors());
}
+
+ public long getTransactionTimeoutOpenWarn()
+ {
+ return getLongValue("transactionTimeout.openWarn", 0L);
+ }
+
+ public long getTransactionTimeoutOpenClose()
+ {
+ return getLongValue("transactionTimeout.openClose", 0L);
+ }
+
+ public long getTransactionTimeoutIdleWarn()
+ {
+ return getLongValue("transactionTimeout.idleWarn", 0L);
+ }
+
+ public long getTransactionTimeoutIdleClose()
+ {
+ return getLongValue("transactionTimeout.idleClose", 0L);
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
index bac751e0c8..c06305ee4e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
@@ -20,19 +20,19 @@
*/
package org.apache.qpid.server.connection;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.log4j.Logger;
-import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.common.Closeable;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
public class ConnectionRegistry implements IConnectionRegistry, Closeable
{
- private List<AMQProtocolSession> _registry = new CopyOnWriteArrayList<AMQProtocolSession>();
+ private List<AMQConnectionModel> _registry = new CopyOnWriteArrayList<AMQConnectionModel>();
private Logger _logger = Logger.getLogger(ConnectionRegistry.class);
@@ -40,44 +40,42 @@ public class ConnectionRegistry implements IConnectionRegistry, Closeable
{
// None required
}
-
- public void expireClosedChannels()
- {
- for (AMQProtocolSession connection : _registry)
- {
- connection.closeIfLingeringClosedChannels();
- }
- }
/** Close all of the currently open connections. */
public void close()
{
while (!_registry.isEmpty())
{
- AMQProtocolSession connection = _registry.get(0);
-
- try
- {
- connection.closeConnection(0, new AMQConnectionException(AMQConstant.INTERNAL_ERROR, "Broker is shutting down",
- 0, 0,
- connection.getProtocolOutputConverter().getProtocolMajorVersion(),
- connection.getProtocolOutputConverter().getProtocolMinorVersion(),
- (Throwable) null), true);
- }
- catch (AMQException e)
- {
- _logger.warn("Error closing connection:" + e.getMessage());
- }
+ AMQConnectionModel connection = _registry.get(0);
+ closeConnection(connection, AMQConstant.INTERNAL_ERROR, "Broker is shutting down");
+ }
+ }
+
+ public void closeConnection(AMQConnectionModel connection, AMQConstant cause, String message)
+ {
+ try
+ {
+ connection.close(cause, message);
+ }
+ catch (AMQException e)
+ {
+ _logger.warn("Error closing connection:" + e.getMessage());
}
}
- public void registerConnection(AMQProtocolSession connnection)
+ public void registerConnection(AMQConnectionModel connnection)
{
_registry.add(connnection);
}
- public void deregisterConnection(AMQProtocolSession connnection)
+ public void deregisterConnection(AMQConnectionModel connnection)
{
_registry.remove(connnection);
}
+
+ @Override
+ public List<AMQConnectionModel> getConnections()
+ {
+ return new ArrayList<AMQConnectionModel>(_registry);
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
index 002269bbaa..b4f5bffa57 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
@@ -20,18 +20,23 @@
*/
package org.apache.qpid.server.connection;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
+import java.util.List;
+
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
public interface IConnectionRegistry
{
-
public void initialise();
public void close() throws AMQException;
+
+ public void closeConnection(AMQConnectionModel connection, AMQConstant cause, String message);
+
+ public List<AMQConnectionModel> getConnections();
- public void registerConnection(AMQProtocolSession connnection);
-
- public void deregisterConnection(AMQProtocolSession connnection);
+ public void registerConnection(AMQConnectionModel connnection);
+ public void deregisterConnection(AMQConnectionModel connnection);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties
index 53bcd712f2..ed8c0d0ce9 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties
@@ -28,3 +28,7 @@ PREFETCH_SIZE = CHN-1004 : Prefetch Size (bytes) {0,number} : Count {1,number}
# 0 - queue causing flow control
FLOW_ENFORCED = CHN-1005 : Flow Control Enforced (Queue {0})
FLOW_REMOVED = CHN-1006 : Flow Control Removed
+# Channel Transactions
+# 0 - time in milliseconds
+OPEN_TXN = CHN-1007 : Open Transaction : {0,number} ms
+IDLE_TXN = CHN-1008 : Idle Transaction : {0,number} ms
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
index bcda385f64..4ef84631b4 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
@@ -20,14 +20,34 @@
*/
package org.apache.qpid.server.protocol;
-import org.apache.qpid.protocol.AMQConstant;
+import java.util.List;
+import java.util.UUID;
+
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.logging.LogSubject;
public interface AMQConnectionModel
{
+ /**
+ * get a unique id for this connection.
+ *
+ * @return a {@link UUID} representing the connection
+ */
+ public UUID getId();
+
+ /**
+ * Close the underlying Connection
+ *
+ * @param cause
+ * @param message
+ * @throws org.apache.qpid.AMQException
+ */
+ public void close(AMQConstant cause, String message) throws AMQException;
/**
* Close the given requested Session
+ *
* @param session
* @param cause
* @param message
@@ -36,4 +56,16 @@ public interface AMQConnectionModel
public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException;
public long getConnectionId();
+
+ /**
+ * Get a list of all sessions using this connection.
+ *
+ * @return a list of {@link AMQSessionModel}s
+ */
+ public List<AMQSessionModel> getSessionModels();
+
+ /**
+ * Return a {@link LogSubject} for the connection.
+ */
+ public LogSubject getLogSubject();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index a1ffe272fd..aef905772a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -30,7 +30,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
@@ -1078,19 +1077,6 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
return (_clientVersion == null) ? null : _clientVersion.toString();
}
- public void closeIfLingeringClosedChannels()
- {
- for (Entry<Integer, Long>id : _closingChannelsList.entrySet())
- {
- if (id.getValue() + 30000 > System.currentTimeMillis())
- {
- // We have a channel that we closed 30 seconds ago. Client's dead, kill the connection
- _logger.error("Closing connection as channel was closed more than 30 seconds ago and no ChannelCloseOk has been processed");
- closeProtocolSession();
- }
- }
- }
-
public Boolean isIncoming()
{
return true;
@@ -1263,7 +1249,6 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException
{
-
closeChannel((Integer)session.getID());
MethodRegistry methodRegistry = getMethodRegistry();
@@ -1274,5 +1259,28 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
0,0);
writeFrame(responseBody.generateFrame((Integer)session.getID()));
+ }
+
+ public void close(AMQConstant cause, String message) throws AMQException
+ {
+ closeConnection(0, new AMQConnectionException(cause, message, 0, 0,
+ getProtocolOutputConverter().getProtocolMajorVersion(),
+ getProtocolOutputConverter().getProtocolMinorVersion(),
+ (Throwable) null), true);
+ }
+
+ public List<AMQSessionModel> getSessionModels()
+ {
+ List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>();
+ for (AMQChannel channel : getChannels())
+ {
+ sessions.add((AMQSessionModel) channel);
+ }
+ return sessions;
+ }
+
+ public LogSubject getLogSubject()
+ {
+ return _logSubject;
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index f48a214933..c64ed4ad5a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -231,7 +231,5 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Prin
List<AMQChannel> getChannels();
- void closeIfLingeringClosedChannels();
-
void mgmtCloseChannel(int channelId);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
index a9b2354d75..bc63403a86 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
@@ -20,15 +20,35 @@
*/
package org.apache.qpid.server.protocol;
+import org.apache.qpid.AMQException;
import org.apache.qpid.server.logging.LogSubject;
public interface AMQSessionModel
{
- Object getID();
+ public Object getID();
- AMQConnectionModel getConnectionModel();
+ public AMQConnectionModel getConnectionModel();
- String getClientID();
+ public String getClientID();
+
+ public void close() throws AMQException;
- LogSubject getLogSubject();
+ public LogSubject getLogSubject();
+
+ /**
+ * This method is called from the housekeeping thread to check the status of
+ * transactions on this session and react appropriately.
+ *
+ * If a transaction is open for too long or idle for too long then a warning
+ * is logged or the connection is closed, depending on the configuration. An open
+ * transaction is one that has recent activity. The transaction age is counted
+ * from the time the transaction was started. An idle transaction is one that
+ * has had no activity, such as publishing or acknowledgeing messages.
+ *
+ * @param openWarn time in milliseconds before alerting on open transaction
+ * @param openClose time in milliseconds before closing connection with open transaction
+ * @param idleWarn time in milliseconds before alerting on idle transaction
+ * @param idleClose time in milliseconds before closing connection with idle transaction
+ */
+ public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
index b36ac84cdd..a20436f029 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
@@ -97,7 +97,6 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
private FlowCreditManager_0_10 _creditManager;
-
private StateListener _stateListener = new StateListener()
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
index d2addfde0c..e635ad0188 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
@@ -24,6 +24,9 @@ import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.*;
import java.text.MessageFormat;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
@@ -37,10 +40,12 @@ import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ConnectionCloseCode;
import org.apache.qpid.transport.ExecutionErrorCode;
import org.apache.qpid.transport.ExecutionException;
import org.apache.qpid.transport.Method;
import org.apache.qpid.transport.ProtocolEvent;
+import org.apache.qpid.transport.Session;
public class ServerConnection extends Connection implements AMQConnectionModel, LogSubject
{
@@ -54,6 +59,11 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
}
+ public UUID getId()
+ {
+ return _config.getId();
+ }
+
@Override
protected void invoke(Method method)
{
@@ -110,6 +120,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
public void setVirtualHost(VirtualHost virtualHost)
{
_virtualHost = virtualHost;
+ _virtualHost.getConnectionRegistry().registerConnection(this);
}
public void setConnectionConfig(final ConnectionConfig config)
@@ -145,6 +156,11 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
((ServerSession)session).close();
}
+
+ public LogSubject getLogSubject()
+ {
+ return (LogSubject) this;
+ }
@Override
public void received(ProtocolEvent event)
@@ -215,4 +231,31 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
{
return _actor;
}
+
+ @Override
+ public void close(AMQConstant cause, String message) throws AMQException
+ {
+ ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL;
+ try
+ {
+ replyCode = ConnectionCloseCode.get(cause.getCode());
+ }
+ catch (IllegalArgumentException iae)
+ {
+ // Ignore
+ }
+ close(replyCode, message);
+ getVirtualHost().getConnectionRegistry().deregisterConnection(this);
+ }
+
+ @Override
+ public List<AMQSessionModel> getSessionModels()
+ {
+ List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>();
+ for (Session ssn : getChannels())
+ {
+ sessions.add((AMQSessionModel) ssn);
+ }
+ return sessions;
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
index 2b9e92f685..fb27dec949 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
@@ -138,6 +138,7 @@ public class ServerConnectionDelegate extends ServerDelegate
sconn.invoke(new ConnectionClose(ConnectionCloseCode.INVALID_PATH, "Unknown virtualhost '"+vhostName+"'"));
sconn.setState(Connection.State.CLOSING);
}
+
}
@Override
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index 540ad3fffd..714b2aa61f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -20,12 +20,26 @@
*/
package org.apache.qpid.server.transport;
-import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
-import static org.apache.qpid.util.Serial.gt;
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.*;
+import static org.apache.qpid.util.Serial.*;
-import com.sun.security.auth.UserPrincipal;
+import java.lang.ref.WeakReference;
+import java.security.Principal;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.server.configuration.ConfigStore;
import org.apache.qpid.server.configuration.ConfiguredObject;
@@ -38,6 +52,8 @@ import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
@@ -48,8 +64,6 @@ import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.transport.Binary;
import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.MessageTransfer;
@@ -58,24 +72,15 @@ import org.apache.qpid.transport.Range;
import org.apache.qpid.transport.RangeSet;
import org.apache.qpid.transport.Session;
import org.apache.qpid.transport.SessionDelegate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.lang.ref.WeakReference;
-import java.security.Principal;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicLong;
+import com.sun.security.auth.UserPrincipal;
public class ServerSession extends Session implements PrincipalHolder, SessionConfig, AMQSessionModel, LogSubject
{
+ private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class);
+
private static final String NULL_DESTINTATION = UUID.randomUUID().toString();
private final UUID _id;
@@ -111,6 +116,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo
private final AtomicLong _txnCommits = new AtomicLong(0);
private final AtomicLong _txnRejects = new AtomicLong(0);
private final AtomicLong _txnCount = new AtomicLong(0);
+ private final AtomicLong _txnUpdateTime = new AtomicLong(0);
private Principal _principal;
@@ -141,7 +147,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo
_connectionConfig = connConfig;
_transaction = new AutoCommitTransaction(this.getMessageStore());
_principal = new UserPrincipal(connection.getAuthorizationID());
- _reference = new WeakReference(this);
+ _reference = new WeakReference<Session>(this);
_id = getConfigStore().createId();
getConfigStore().addConfiguredObject(this);
}
@@ -160,8 +166,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo
public void enqueue(final ServerMessage message, final ArrayList<? extends BaseQueue> queues)
{
-
- _transaction.enqueue(queues,message, new ServerTransaction.Action()
+ _transaction.enqueue(queues,message, new ServerTransaction.Action()
{
BaseQueue[] _queues = queues.toArray(new BaseQueue[queues.size()]);
@@ -189,6 +194,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo
});
incrementOutstandingTxnsIfNecessary();
+ updateTransactionalActivity();
}
@@ -377,6 +383,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo
entry.release();
}
});
+ updateTransactionalActivity();
}
public Collection<Subscription_0_10> getSubscriptions()
@@ -425,6 +432,11 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo
// theory
return !(_transaction instanceof AutoCommitTransaction);
}
+
+ public boolean inTransaction()
+ {
+ return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0;
+ }
public void selectTx()
{
@@ -471,6 +483,17 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo
}
}
+ /**
+ * Update last transaction activity timestamp
+ */
+ public void updateTransactionalActivity()
+ {
+ if (isTransactional())
+ {
+ _txnUpdateTime.set(System.currentTimeMillis());
+ }
+ }
+
public Long getTxnStarts()
{
return _txnStarts.get();
@@ -606,6 +629,38 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo
return (LogSubject) this;
}
+ public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException
+ {
+ if (inTransaction())
+ {
+ long currentTime = System.currentTimeMillis();
+ long openTime = currentTime - _transaction.getTransactionStartTime();
+ long idleTime = currentTime - _txnUpdateTime.get();
+
+ // Log a warning on idle or open transactions
+ if (idleWarn > 0L && idleTime > idleWarn)
+ {
+ CurrentActor.get().message(getLogSubject(), ChannelMessages.IDLE_TXN(openTime));
+ _logger.warn("IDLE TRANSACTION ALERT " + getLogSubject().toString() + " " + idleTime + " ms");
+ }
+ else if (openWarn > 0L && openTime > openWarn)
+ {
+ CurrentActor.get().message(getLogSubject(), ChannelMessages.OPEN_TXN(openTime));
+ _logger.warn("OPEN TRANSACTION ALERT " + getLogSubject().toString() + " " + openTime + " ms");
+ }
+
+ // Close connection for idle or open transactions that have timed out
+ if (idleClose > 0L && idleTime > idleClose)
+ {
+ getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out");
+ }
+ else if (openClose > 0L && openTime > openClose)
+ {
+ getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open transaction timed out");
+ }
+ }
+ }
+
@Override
public String toLogString()
{
@@ -617,7 +672,5 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo
getVirtualHost().getName(),
getChannel())
+ "] ";
-
}
-
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
index d12ab6d474..be659c87ae 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
@@ -370,7 +370,6 @@ public class ServerSessionDelegate extends SessionDelegate
}
ssn.processed(xfr);
-
}
@Override
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
index db781ead96..36e9d78440 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
@@ -50,6 +50,11 @@ public class AutoCommitTransaction implements ServerTransaction
_transactionLog = transactionLog;
}
+ public long getTransactionStartTime()
+ {
+ return 0L;
+ }
+
/**
* Since AutoCommitTransaction have no concept of a long lived transaction, any Actions registered
* by the caller are executed immediately.
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
index a04c743be1..f9dac782a6 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
@@ -20,18 +20,23 @@ package org.apache.qpid.server.txn;
*
*/
-
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.TransactionLog;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A concrete implementation of ServerTransaction where enqueue/dequeue
@@ -41,17 +46,28 @@ import org.apache.qpid.server.store.TransactionLog;
*/
public class LocalTransaction implements ServerTransaction
{
- protected static final Logger _logger = Logger.getLogger(LocalTransaction.class);
+ protected static final Logger _logger = LoggerFactory.getLogger(LocalTransaction.class);
private final List<Action> _postTransactionActions = new ArrayList<Action>();
private volatile TransactionLog.Transaction _transaction;
private TransactionLog _transactionLog;
+ private long _txnStartTime = 0L;
public LocalTransaction(TransactionLog transactionLog)
{
_transactionLog = transactionLog;
}
+
+ public boolean inTransaction()
+ {
+ return _transaction != null;
+ }
+
+ public long getTransactionStartTime()
+ {
+ return _txnStartTime;
+ }
public void addPostTransactionAction(Action postTransactionAction)
{
@@ -89,7 +105,6 @@ public class LocalTransaction implements ServerTransaction
try
{
-
for(QueueEntry entry : queueEntries)
{
ServerMessage message = entry.getMessage();
@@ -113,7 +128,6 @@ public class LocalTransaction implements ServerTransaction
_logger.error("Error during message dequeues", e);
tidyUpOnError(e);
}
-
}
private void tidyUpOnError(Exception e)
@@ -140,8 +154,7 @@ public class LocalTransaction implements ServerTransaction
}
finally
{
- _transaction = null;
- _postTransactionActions.clear();
+ resetDetails();
}
}
@@ -193,8 +206,25 @@ public class LocalTransaction implements ServerTransaction
{
_postTransactionActions.add(postTransactionAction);
+ if (_txnStartTime == 0L)
+ {
+ _txnStartTime = System.currentTimeMillis();
+ }
+
if(message.isPersistent())
{
+ if(_transaction == null)
+ {
+ for(BaseQueue queue : queues)
+ {
+ if(queue.isDurable())
+ {
+ beginTranIfNecessary();
+ break;
+ }
+ }
+ }
+
try
{
for(BaseQueue queue : queues)
@@ -248,17 +278,14 @@ public class LocalTransaction implements ServerTransaction
}
finally
{
- _transaction = null;
- _postTransactionActions.clear();
+ resetDetails();
}
-
}
public void rollback()
{
try
{
-
if(_transaction != null)
{
_transaction.abortTran();
@@ -280,9 +307,15 @@ public class LocalTransaction implements ServerTransaction
}
finally
{
- _transaction = null;
- _postTransactionActions.clear();
+ resetDetails();
}
}
}
+
+ private void resetDetails()
+ {
+ _transaction = null;
+ _postTransactionActions.clear();
+ _txnStartTime = 0L;
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
index b61b8a5c64..b3c6e1ac3a 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
@@ -52,6 +52,13 @@ public interface ServerTransaction
public void onRollback();
}
+ /**
+ * Return the time the current transaction started.
+ *
+ * @return the time this transaction started or 0 if not in a transaction
+ */
+ long getTransactionStartTime();
+
/**
* Register an Action for execution after transaction commit or rollback. Actions
* will be executed in the order in which they are registered.
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
index a550283a38..a1566917dd 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
@@ -24,7 +24,6 @@ import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -37,7 +36,6 @@ import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQInternalException;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
@@ -64,6 +62,8 @@ import org.apache.qpid.server.logging.messages.VirtualHostMessages;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.DefaultQueueRegistry;
@@ -282,19 +282,30 @@ public class VirtualHostImpl implements VirtualHost
// house keeping task from running.
}
}
+ for (AMQConnectionModel connection : getConnectionRegistry().getConnections())
+ {
+ _logger.debug("Checking for long running open transactions on connection " + connection);
+ for (AMQSessionModel session : connection.getSessionModels())
+ {
+ _logger.debug("Checking for long running open transactions on session " + session);
+ try
+ {
+ session.checkTransactionStatus(_configuration.getTransactionTimeoutOpenWarn(),
+ _configuration.getTransactionTimeoutOpenClose(),
+ _configuration.getTransactionTimeoutIdleWarn(),
+ _configuration.getTransactionTimeoutIdleClose());
+ }
+ catch (Exception e)
+ {
+ _logger.error("Exception in housekeeping for connection: " + connection.toString(), e);
+ }
+ }
+ }
}
}
scheduleHouseKeepingTask(period, new ExpiredMessagesTask(this));
- class ForceChannelClosuresTask extends TimerTask
- {
- public void run()
- {
- _connectionRegistry.expireClosedChannels();
- }
- }
-
Map<String, VirtualHostPluginFactory> plugins =
ApplicationRegistry.getInstance().getPluginManager().getVirtualHostPlugins();
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java
index c4edd9034f..88ee9ed2c5 100644
--- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java
@@ -50,7 +50,7 @@ public class ConnectionSetup
final static String QUEUE_NAME = "example.MyQueue";
public static final String TOPIC_JNDI_NAME = "topic";
- final static String TOPIC_NAME = "example.hierarical.topic";
+ final static String TOPIC_NAME = "usa.news";
private Context _ctx;
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java
index dd936e429f..ac3829d49e 100644
--- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java
@@ -71,7 +71,7 @@ public class Publisher extends Client
public static void main(String[] args)
{
- String destination = args.length > 2 ? args[1] : null;
+ String destination = args.length > 2 ? args[1] : "usa.news";
int msgCount = args.length > 2 ? Integer.parseInt(args[2]) : 100;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index e5e10c0e07..dc32569ee8 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -28,6 +28,7 @@ import static org.apache.qpid.transport.Connection.State.OPENING;
import static org.apache.qpid.transport.Connection.State.RESUMING;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -695,4 +696,8 @@ public class Connection extends ConnectionInvoker
return connectionLost.get();
}
+ protected Collection<Session> getChannels()
+ {
+ return channels.values();
+ }
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutConfigurationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutConfigurationTest.java
new file mode 100644
index 0000000000..36bac3b715
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutConfigurationTest.java
@@ -0,0 +1,82 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.transacted;
+
+/**
+ * This verifies that changing the {@code transactionTimeout} configuration will alter
+ * the behaviour of the transaction open and idle logging, and that when the connection
+ * will be closed.
+ */
+public class TransactionTimeoutConfigurationTest extends TransactionTimeoutTestCase
+{
+ @Override
+ protected void configure() throws Exception
+ {
+ // Setup housekeeping every second
+ setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".housekeeping.expiredMessageCheckPeriod", "100");
+
+ // Set transaction timout properties.
+ setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.openWarn", "200");
+ setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.openClose", "1000");
+ setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.idleWarn", "100");
+ setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.idleClose", "500");
+ }
+
+ public void testProducerIdleCommit() throws Exception
+ {
+ try
+ {
+ send(5, 0);
+
+ sleep(2.0f);
+
+ _psession.commit();
+ fail("should fail");
+ }
+ catch (Exception e)
+ {
+ _exception = e;
+ }
+
+ monitor(5, 0);
+
+ check(IDLE);
+ }
+
+ public void testProducerOpenCommit() throws Exception
+ {
+ try
+ {
+ send(5, 0.3f);
+
+ _psession.commit();
+ fail("should fail");
+ }
+ catch (Exception e)
+ {
+ _exception = e;
+ }
+
+ monitor(6, 3);
+
+ check(OPEN);
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java
new file mode 100644
index 0000000000..71b89bf911
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java
@@ -0,0 +1,72 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.transacted;
+
+/**
+ * This verifies that the default behaviour is not to time out transactions.
+ */
+public class TransactionTimeoutDisabledTest extends TransactionTimeoutTestCase
+{
+ @Override
+ protected void configure() throws Exception
+ {
+ // Setup housekeeping every second
+ setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".housekeeping.expiredMessageCheckPeriod", "100");
+ }
+
+ public void testProducerIdleCommit() throws Exception
+ {
+ try
+ {
+ send(5, 0);
+
+ sleep(2.0f);
+
+ _psession.commit();
+ }
+ catch (Exception e)
+ {
+ fail("Should have succeeded");
+ }
+
+ assertTrue("Listener should not have received exception", _caught.getCount() == 1);
+
+ monitor(0, 0);
+ }
+
+ public void testProducerOpenCommit() throws Exception
+ {
+ try
+ {
+ send(5, 0.3f);
+
+ _psession.commit();
+ }
+ catch (Exception e)
+ {
+ fail("Should have succeeded");
+ }
+
+ assertTrue("Listener should not have received exception", _caught.getCount() == 1);
+
+ monitor(0, 0);
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java
new file mode 100644
index 0000000000..c912d6a323
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java
@@ -0,0 +1,335 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.transacted;
+
+/**
+ * This tests the behaviour of transactional sessions when the {@code transactionTimeout} configuration
+ * is set for a virtual host.
+ *
+ * A producer that is idle for too long or open for too long will have its connection closed and
+ * any further operations will fail with a 408 resource timeout exception. Consumers will not
+ * be affected by the transaction timeout configuration.
+ */
+public class TransactionTimeoutTest extends TransactionTimeoutTestCase
+{
+ public void testProducerIdle() throws Exception
+ {
+ try
+ {
+ sleep(2.0f);
+
+ _psession.commit();
+ }
+ catch (Exception e)
+ {
+ fail("Should have succeeded");
+ }
+
+ assertTrue("Listener should not have received exception", _caught.getCount() == 1);
+
+ monitor(0, 0);
+ }
+
+ public void testProducerIdleCommit() throws Exception
+ {
+ try
+ {
+ send(5, 0);
+
+ sleep(2.0f);
+
+ _psession.commit();
+ fail("should fail");
+ }
+ catch (Exception e)
+ {
+ _exception = e;
+ }
+
+ monitor(5, 0);
+
+ check(IDLE);
+ }
+
+ public void testProducerOpenCommit() throws Exception
+ {
+ try
+ {
+ send(6, 0.5f);
+
+ _psession.commit();
+ fail("should fail");
+ }
+ catch (Exception e)
+ {
+ _exception = e;
+ }
+
+ monitor(0, 10);
+
+ check(OPEN);
+ }
+
+ public void testProducerIdleCommitTwice() throws Exception
+ {
+ try
+ {
+ send(5, 0);
+
+ sleep(1.0f);
+
+ _psession.commit();
+
+ send(5, 0);
+
+ sleep(2.0f);
+
+ _psession.commit();
+ fail("should fail");
+ }
+ catch (Exception e)
+ {
+ _exception = e;
+ }
+
+ monitor(10, 0);
+
+ check(IDLE);
+ }
+
+ public void testProducerOpenCommitTwice() throws Exception
+ {
+ try
+ {
+ send(5, 0);
+
+ sleep(1.0f);
+
+ _psession.commit();
+
+ send(6, 0.5f);
+
+ _psession.commit();
+ fail("should fail");
+ }
+ catch (Exception e)
+ {
+ _exception = e;
+ }
+
+ // the presistent store generates more idle messages?
+ monitor(isBrokerStorePersistent() ? 10 : 5, 10);
+
+ check(OPEN);
+ }
+
+ public void testProducerIdleRollback() throws Exception
+ {
+ try
+ {
+ send(5, 0);
+
+ sleep(2.0f);
+
+ _psession.rollback();
+ fail("should fail");
+ }
+ catch (Exception e)
+ {
+ _exception = e;
+ }
+
+ monitor(5, 0);
+
+ check(IDLE);
+ }
+
+ public void testProducerIdleRollbackTwice() throws Exception
+ {
+ try
+ {
+ send(5, 0);
+
+ sleep(1.0f);
+
+ _psession.rollback();
+
+ send(5, 0);
+
+ sleep(2.0f);
+
+ _psession.rollback();
+ fail("should fail");
+ }
+ catch (Exception e)
+ {
+ _exception = e;
+ }
+
+ monitor(10, 0);
+
+ check(IDLE);
+ }
+
+ public void testConsumerCommitClose() throws Exception
+ {
+ try
+ {
+ send(1, 0);
+
+ _psession.commit();
+
+ expect(1, 0);
+
+ _csession.commit();
+
+ sleep(3.0f);
+
+ _csession.close();
+ }
+ catch (Exception e)
+ {
+ fail("should have succeeded: " + e.getMessage());
+ }
+
+ assertTrue("Listener should not have received exception", _caught.getCount() == 1);
+
+ monitor(0, 0);
+ }
+
+ public void testConsumerIdleReceiveCommit() throws Exception
+ {
+ try
+ {
+ send(1, 0);
+
+ _psession.commit();
+
+ sleep(2.0f);
+
+ expect(1, 0);
+
+ sleep(2.0f);
+
+ _csession.commit();
+ }
+ catch (Exception e)
+ {
+ fail("Should have succeeded");
+ }
+
+ assertTrue("Listener should not have received exception", _caught.getCount() == 1);
+
+ monitor(0, 0);
+ }
+
+ public void testConsumerIdleCommit() throws Exception
+ {
+ try
+ {
+ send(1, 0);
+
+ _psession.commit();
+
+ expect(1, 0);
+
+ sleep(2.0f);
+
+ _csession.commit();
+ }
+ catch (Exception e)
+ {
+ fail("Should have succeeded");
+ }
+
+ assertTrue("Listener should not have received exception", _caught.getCount() == 1);
+
+ monitor(0, 0);
+ }
+
+ public void testConsumerIdleRollback() throws Exception
+ {
+ try
+ {
+ send(1, 0);
+
+ _psession.commit();
+
+ expect(1, 0);
+
+ sleep(2.0f);
+
+ _csession.rollback();
+ }
+ catch (Exception e)
+ {
+ fail("Should have succeeded");
+ }
+
+ assertTrue("Listener should not have received exception", _caught.getCount() == 1);
+
+ monitor(0, 0);
+ }
+
+ public void testConsumerOpenCommit() throws Exception
+ {
+ try
+ {
+ send(1, 0);
+
+ _psession.commit();
+
+ sleep(3.0f);
+
+ _csession.commit();
+ }
+ catch (Exception e)
+ {
+ fail("Should have succeeded");
+ }
+
+ assertTrue("Listener should not have received exception", _caught.getCount() == 1);
+
+ monitor(0, 0);
+ }
+
+ public void testConsumerOpenRollback() throws Exception
+ {
+ try
+ {
+ send(1, 0);
+
+ _psession.commit();
+
+ sleep(3.0f);
+
+ _csession.rollback();
+ }
+ catch (Exception e)
+ {
+ fail("Should have succeeded");
+ }
+
+ assertTrue("Listener should not have received exception", _caught.getCount() == 1);
+
+ monitor(0, 0);
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java
new file mode 100644
index 0000000000..637f43fb2c
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java
@@ -0,0 +1,253 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.transacted;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.DeliveryMode;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionURL;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.jms.Session;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.util.LogMonitor;
+
+/**
+ * The {@link TestCase} for transaction timeout testing.
+ */
+public class TransactionTimeoutTestCase extends QpidBrokerTestCase implements ExceptionListener
+{
+ public static final String VIRTUALHOST = "test";
+ public static final String TEXT = "0123456789abcdefghiforgettherest";
+ public static final String CHN_OPEN_TXN = "CHN-1007";
+ public static final String CHN_IDLE_TXN = "CHN-1008";
+ public static final String IDLE = "Idle";
+ public static final String OPEN = "Open";
+
+ protected LogMonitor _monitor;
+ protected AMQConnection _con;
+ protected Session _psession, _csession;
+ protected Queue _queue;
+ protected MessageConsumer _consumer;
+ protected MessageProducer _producer;
+ protected CountDownLatch _caught = new CountDownLatch(1);
+ protected String _message;
+ protected Exception _exception;
+ protected AMQConstant _code;
+
+ protected void configure() throws Exception
+ {
+ // Setup housekeeping every second
+ setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".housekeeping.expiredMessageCheckPeriod", "100");
+
+ /*
+ * Set transaction timout properties. The XML in the virtualhosts configuration is as follows:
+ *
+ * <transactionTimeout>
+ * <openWarn>1000</openWarn>
+ * <openClose>2000</openClose>
+ * <idleWarn>500</idleWarn>
+ * <idleClose>1500</idleClose>
+ * </transactionTimeout>
+ */
+ setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.openWarn", "1000");
+ setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.openClose", "2000");
+ setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.idleWarn", "500");
+ setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.idleClose", "1000");
+ }
+
+ protected void setUp() throws Exception
+ {
+ // Configure timeouts
+ configure();
+
+ // Monitor log file
+ _monitor = new LogMonitor(_outputFile);
+
+ // Start broker
+ super.setUp();
+
+ // Connect to broker
+ String broker = _broker.equals(VM) ? ("vm://:" + DEFAULT_VM_PORT) : ("tcp://localhost:" + DEFAULT_PORT);
+ ConnectionURL url = new AMQConnectionURL("amqp://guest:guest@clientid/test?brokerlist='" + broker + "'&maxprefetch='1'");
+ _con = (AMQConnection) getConnection(url);
+ _con.setExceptionListener(this);
+ _con.start();
+
+ // Create queue
+ Session qsession = _con.createSession(true, Session.SESSION_TRANSACTED);
+ AMQShortString queueName = new AMQShortString("test");
+ _queue = new AMQQueue(qsession.getDefaultQueueExchangeName(), queueName, queueName, false, true);
+ qsession.close();
+
+ // Create producer and consumer
+ producer();
+ consumer();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ _con.close();
+ }
+ finally
+ {
+ super.tearDown();
+ }
+ }
+
+ /**
+ * Create a transacted persistent message producer session.
+ */
+ protected void producer() throws Exception
+ {
+ _psession = _con.createSession(true, Session.SESSION_TRANSACTED);
+ _producer = _psession.createProducer(_queue);
+ _producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+ }
+
+ /**
+ * Create a transacted message consumer session.
+ */
+ protected void consumer() throws Exception
+ {
+ _csession = _con.createSession(true, Session.SESSION_TRANSACTED);
+ _consumer = _csession.createConsumer(_queue);
+ }
+
+ /**
+ * Send a number of messages to the queue, optionally pausing after each.
+ */
+ protected void send(int count, float delay) throws Exception
+ {
+ for (int i = 0; i < count; i++)
+ {
+ sleep(delay);
+ Message msg = _psession.createTextMessage(TEXT);
+ msg.setIntProperty("i", i);
+ _producer.send(msg);
+ }
+ }
+
+ /**
+ * Sleep for a number of seconds.
+ */
+ protected void sleep(float seconds) throws Exception
+ {
+ try
+ {
+ Thread.sleep((long) (seconds * 1000.0f));
+ }
+ catch (InterruptedException ie)
+ {
+ throw new RuntimeException("Interrupted");
+ }
+ }
+
+ /**
+ * Check for idle and open messages.
+ *
+ * Either exactly zero messages, or +-2 error accepted around the specified number.
+ */
+ protected void monitor(int idle, int open) throws Exception
+ {
+ List<String> idleMsgs = _monitor.findMatches(CHN_IDLE_TXN);
+ List<String> openMsgs = _monitor.findMatches(CHN_OPEN_TXN);
+
+ String idleErr = "Expected " + idle + " but found " + idleMsgs.size() + " txn idle messages";
+ String openErr = "Expected " + open + " but found " + openMsgs.size() + " txn open messages";
+
+ if (idle == 0)
+ {
+ assertTrue(idleErr, idleMsgs.isEmpty());
+ }
+ else
+ {
+ assertTrue(idleErr, idleMsgs.size() >= idle - 2 && idleMsgs.size() <= idle + 2);
+ }
+
+ if (open == 0)
+ {
+ assertTrue(openErr, openMsgs.isEmpty());
+ }
+ else
+ {
+ assertTrue(openErr, openMsgs.size() >= open - 2 && openMsgs.size() <= open + 2);
+ }
+ }
+
+ /**
+ * Receive a number of messages, optionally pausing after each.
+ */
+ protected void expect(int count, float delay) throws Exception
+ {
+ for (int i = 0; i < count; i++)
+ {
+ sleep(delay);
+ Message msg = _consumer.receive(1000);
+ assertNotNull("Message should not be null", msg);
+ assertTrue("Message should be a text message", msg instanceof TextMessage);
+ assertEquals("Message content does not match expected", TEXT, ((TextMessage) msg).getText());
+ assertEquals("Message order is incorrect", i, msg.getIntProperty("i"));
+ }
+ }
+
+ /**
+ * Checks that the correct exception was thrown and was received
+ * by the listener with a 506 error code.
+ */
+ protected void check(String reason)throws InterruptedException
+ {
+ assertTrue("Should have caught exception in listener", _caught.await(1, TimeUnit.SECONDS));
+ assertNotNull("Should have thrown exception to client", _exception);
+ assertTrue("Exception message should contain '" + reason + "': " + _message, _message.contains(reason + " transaction timed out"));
+ assertNotNull("Exception should have an error code", _code);
+ assertEquals("Error code should be 506", AMQConstant.RESOURCE_ERROR, _code);
+ }
+
+ /** @see javax.jms.ExceptionListener#onException(javax.jms.JMSException) */
+ public void onException(JMSException jmse)
+ {
+ _caught.countDown();
+ _message = jmse.getLinkedException().getMessage();
+ if (jmse.getLinkedException() instanceof AMQException)
+ {
+ _code = ((AMQException) jmse.getLinkedException()).getErrorCode();
+ }
+ }
+}
diff --git a/qpid/java/test-profiles/CPPExcludes b/qpid/java/test-profiles/CPPExcludes
index e89b09cca2..4127682208 100755
--- a/qpid/java/test-profiles/CPPExcludes
+++ b/qpid/java/test-profiles/CPPExcludes
@@ -165,6 +165,11 @@ org.apache.qpid.server.security.firewall.FirewallConfigTest#*
org.apache.qpid.server.security.firewall.FirewallConfigurationTest#*
org.apache.qpid.server.plugins.PluginTest#*
+// Transacion timeouts not implemented in CPP broker
+org.apache.qpid.test.unit.transacted.TransactionTimeoutDisabledTest#*
+org.apache.qpid.test.unit.transacted.TransactionTimeoutConfigurationTest#*
+org.apache.qpid.test.unit.transacted.TransactionTimeoutTest#*
+
// Java broker only
org.apache.qpid.server.logging.management.LoggingManagementMBeanTest#*
org.apache.qpid.server.management.AMQUserManagementMBeanTest#*