diff options
author | Andrew Donald Kennedy <grkvlt@apache.org> | 2011-03-08 00:11:30 +0000 |
---|---|---|
committer | Andrew Donald Kennedy <grkvlt@apache.org> | 2011-03-08 00:11:30 +0000 |
commit | 49dc07c6d7ba26d6a250466e2679feb523e02dd0 (patch) | |
tree | ec1c15ab0b3b8d9733cf957cd825710928116062 /java/broker/src | |
parent | 449475309d147dcc64aba3fe31dc9432e45659f8 (diff) | |
download | qpid-python-49dc07c6d7ba26d6a250466e2679feb523e02dd0.tar.gz |
QPID-2985: Add producer configurable transaction timeouts
Port of QPID-2864 changes from 0.5.x-dev branch to trunk.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1079042 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
18 files changed, 391 insertions, 104 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 4f86c82578..1c91de6d15 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -22,6 +22,7 @@ package org.apache.qpid.server; import org.apache.log4j.Logger; +import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.framing.AMQMethodBody; @@ -141,6 +142,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel private final AtomicLong _txnCommits = new AtomicLong(0); private final AtomicLong _txnRejects = new AtomicLong(0); private final AtomicLong _txnCount = new AtomicLong(0); + private final AtomicLong _txnUpdateTime = new AtomicLong(0); private final AMQProtocolSession _session; private AtomicBoolean _closing = new AtomicBoolean(false); @@ -200,6 +202,11 @@ public class AMQChannel implements SessionConfig, AMQSessionModel return !(_transaction instanceof AutoCommitTransaction); } + public boolean inTransaction() + { + return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0; + } + private void incrementOutstandingTxnsIfNecessary() { if(isTransactional()) @@ -295,7 +302,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel }); deliverCurrentMessageIfComplete(); - } } @@ -333,6 +339,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel { _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues, isTransactional())); incrementOutstandingTxnsIfNecessary(); + updateTransactionalActivity(); } } } @@ -794,6 +801,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel { Collection<QueueEntry> ackedMessages = getAckedMessages(deliveryTag, multiple); _transaction.dequeue(ackedMessages, new MessageAcknowledgeAction(ackedMessages)); + updateTransactionalActivity(); } private Collection<QueueEntry> getAckedMessages(long deliveryTag, boolean multiple) @@ -968,6 +976,17 @@ public class AMQChannel implements SessionConfig, AMQSessionModel } + /** + * Update last transaction activity timestamp + */ + private void updateTransactionalActivity() + { + if (isTransactional()) + { + _txnUpdateTime.set(System.currentTimeMillis()); + } + } + public String toString() { return "["+_session.toString()+":"+_channelId+"]"; @@ -1407,4 +1426,36 @@ public class AMQChannel implements SessionConfig, AMQSessionModel { _session.mgmtCloseChannel(_channelId); } + + public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException + { + if (inTransaction()) + { + long currentTime = System.currentTimeMillis(); + long openTime = currentTime - _transaction.getTransactionStartTime(); + long idleTime = currentTime - _txnUpdateTime.get(); + + // Log a warning on idle or open transactions + if (idleWarn > 0L && idleTime > idleWarn) + { + CurrentActor.get().message(_logSubject, ChannelMessages.IDLE_TXN(idleTime)); + _logger.warn("IDLE TRANSACTION ALERT " + _logSubject.toString() + " " + idleTime + " ms"); + } + else if (openWarn > 0L && openTime > openWarn) + { + CurrentActor.get().message(_logSubject, ChannelMessages.OPEN_TXN(openTime)); + _logger.warn("OPEN TRANSACTION ALERT " + _logSubject.toString() + " " + openTime + " ms"); + } + + // Close connection for idle or open transactions that have timed out + if (idleClose > 0L && idleTime > idleClose) + { + getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out"); + } + else if (openClose > 0L && openTime > openClose) + { + getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open transaction timed out"); + } + } + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java index d9d7083543..48f2d776bb 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java @@ -313,4 +313,24 @@ public class VirtualHostConfiguration extends ConfigurationPlugin { return getIntValue("housekeeping.poolSize", Runtime.getRuntime().availableProcessors()); } + + public long getTransactionTimeoutOpenWarn() + { + return getLongValue("transactionTimeout.openWarn", 0L); + } + + public long getTransactionTimeoutOpenClose() + { + return getLongValue("transactionTimeout.openClose", 0L); + } + + public long getTransactionTimeoutIdleWarn() + { + return getLongValue("transactionTimeout.idleWarn", 0L); + } + + public long getTransactionTimeoutIdleClose() + { + return getLongValue("transactionTimeout.idleClose", 0L); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java index bac751e0c8..c06305ee4e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java @@ -20,19 +20,19 @@ */ package org.apache.qpid.server.connection; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import org.apache.log4j.Logger; -import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.common.Closeable; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.AMQConnectionModel; public class ConnectionRegistry implements IConnectionRegistry, Closeable { - private List<AMQProtocolSession> _registry = new CopyOnWriteArrayList<AMQProtocolSession>(); + private List<AMQConnectionModel> _registry = new CopyOnWriteArrayList<AMQConnectionModel>(); private Logger _logger = Logger.getLogger(ConnectionRegistry.class); @@ -40,44 +40,42 @@ public class ConnectionRegistry implements IConnectionRegistry, Closeable { // None required } - - public void expireClosedChannels() - { - for (AMQProtocolSession connection : _registry) - { - connection.closeIfLingeringClosedChannels(); - } - } /** Close all of the currently open connections. */ public void close() { while (!_registry.isEmpty()) { - AMQProtocolSession connection = _registry.get(0); - - try - { - connection.closeConnection(0, new AMQConnectionException(AMQConstant.INTERNAL_ERROR, "Broker is shutting down", - 0, 0, - connection.getProtocolOutputConverter().getProtocolMajorVersion(), - connection.getProtocolOutputConverter().getProtocolMinorVersion(), - (Throwable) null), true); - } - catch (AMQException e) - { - _logger.warn("Error closing connection:" + e.getMessage()); - } + AMQConnectionModel connection = _registry.get(0); + closeConnection(connection, AMQConstant.INTERNAL_ERROR, "Broker is shutting down"); + } + } + + public void closeConnection(AMQConnectionModel connection, AMQConstant cause, String message) + { + try + { + connection.close(cause, message); + } + catch (AMQException e) + { + _logger.warn("Error closing connection:" + e.getMessage()); } } - public void registerConnection(AMQProtocolSession connnection) + public void registerConnection(AMQConnectionModel connnection) { _registry.add(connnection); } - public void deregisterConnection(AMQProtocolSession connnection) + public void deregisterConnection(AMQConnectionModel connnection) { _registry.remove(connnection); } + + @Override + public List<AMQConnectionModel> getConnections() + { + return new ArrayList<AMQConnectionModel>(_registry); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java index 002269bbaa..b4f5bffa57 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java @@ -20,18 +20,23 @@ */ package org.apache.qpid.server.connection; -import org.apache.qpid.server.protocol.AMQProtocolSession; +import java.util.List; + import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.protocol.AMQConnectionModel; public interface IConnectionRegistry { - public void initialise(); public void close() throws AMQException; + + public void closeConnection(AMQConnectionModel connection, AMQConstant cause, String message); + + public List<AMQConnectionModel> getConnections(); - public void registerConnection(AMQProtocolSession connnection); - - public void deregisterConnection(AMQProtocolSession connnection); + public void registerConnection(AMQConnectionModel connnection); + public void deregisterConnection(AMQConnectionModel connnection); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties b/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties index 53bcd712f2..ed8c0d0ce9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties +++ b/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties @@ -28,3 +28,7 @@ PREFETCH_SIZE = CHN-1004 : Prefetch Size (bytes) {0,number} : Count {1,number} # 0 - queue causing flow control FLOW_ENFORCED = CHN-1005 : Flow Control Enforced (Queue {0}) FLOW_REMOVED = CHN-1006 : Flow Control Removed +# Channel Transactions +# 0 - time in milliseconds +OPEN_TXN = CHN-1007 : Open Transaction : {0,number} ms +IDLE_TXN = CHN-1008 : Idle Transaction : {0,number} ms diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java index bcda385f64..4ef84631b4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java @@ -20,14 +20,34 @@ */ package org.apache.qpid.server.protocol; -import org.apache.qpid.protocol.AMQConstant; +import java.util.List; +import java.util.UUID; + import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.logging.LogSubject; public interface AMQConnectionModel { + /** + * get a unique id for this connection. + * + * @return a {@link UUID} representing the connection + */ + public UUID getId(); + + /** + * Close the underlying Connection + * + * @param cause + * @param message + * @throws org.apache.qpid.AMQException + */ + public void close(AMQConstant cause, String message) throws AMQException; /** * Close the given requested Session + * * @param session * @param cause * @param message @@ -36,4 +56,16 @@ public interface AMQConnectionModel public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException; public long getConnectionId(); + + /** + * Get a list of all sessions using this connection. + * + * @return a list of {@link AMQSessionModel}s + */ + public List<AMQSessionModel> getSessionModels(); + + /** + * Return a {@link LogSubject} for the connection. + */ + public LogSubject getLogSubject(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index a1ffe272fd..aef905772a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -30,7 +30,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; @@ -1078,19 +1077,6 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol return (_clientVersion == null) ? null : _clientVersion.toString(); } - public void closeIfLingeringClosedChannels() - { - for (Entry<Integer, Long>id : _closingChannelsList.entrySet()) - { - if (id.getValue() + 30000 > System.currentTimeMillis()) - { - // We have a channel that we closed 30 seconds ago. Client's dead, kill the connection - _logger.error("Closing connection as channel was closed more than 30 seconds ago and no ChannelCloseOk has been processed"); - closeProtocolSession(); - } - } - } - public Boolean isIncoming() { return true; @@ -1263,7 +1249,6 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException { - closeChannel((Integer)session.getID()); MethodRegistry methodRegistry = getMethodRegistry(); @@ -1274,5 +1259,28 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol 0,0); writeFrame(responseBody.generateFrame((Integer)session.getID())); + } + + public void close(AMQConstant cause, String message) throws AMQException + { + closeConnection(0, new AMQConnectionException(cause, message, 0, 0, + getProtocolOutputConverter().getProtocolMajorVersion(), + getProtocolOutputConverter().getProtocolMinorVersion(), + (Throwable) null), true); + } + + public List<AMQSessionModel> getSessionModels() + { + List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>(); + for (AMQChannel channel : getChannels()) + { + sessions.add((AMQSessionModel) channel); + } + return sessions; + } + + public LogSubject getLogSubject() + { + return _logSubject; } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index f48a214933..c64ed4ad5a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -231,7 +231,5 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Prin List<AMQChannel> getChannels(); - void closeIfLingeringClosedChannels(); - void mgmtCloseChannel(int channelId); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java index a9b2354d75..bc63403a86 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java @@ -20,15 +20,35 @@ */ package org.apache.qpid.server.protocol; +import org.apache.qpid.AMQException; import org.apache.qpid.server.logging.LogSubject; public interface AMQSessionModel { - Object getID(); + public Object getID(); - AMQConnectionModel getConnectionModel(); + public AMQConnectionModel getConnectionModel(); - String getClientID(); + public String getClientID(); + + public void close() throws AMQException; - LogSubject getLogSubject(); + public LogSubject getLogSubject(); + + /** + * This method is called from the housekeeping thread to check the status of + * transactions on this session and react appropriately. + * + * If a transaction is open for too long or idle for too long then a warning + * is logged or the connection is closed, depending on the configuration. An open + * transaction is one that has recent activity. The transaction age is counted + * from the time the transaction was started. An idle transaction is one that + * has had no activity, such as publishing or acknowledgeing messages. + * + * @param openWarn time in milliseconds before alerting on open transaction + * @param openClose time in milliseconds before closing connection with open transaction + * @param idleWarn time in milliseconds before alerting on idle transaction + * @param idleClose time in milliseconds before closing connection with idle transaction + */ + public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java index b36ac84cdd..a20436f029 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java +++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -97,7 +97,6 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr private FlowCreditManager_0_10 _creditManager; - private StateListener _stateListener = new StateListener() { diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java index d2addfde0c..e635ad0188 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java @@ -24,6 +24,9 @@ import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.*; import java.text.MessageFormat; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; @@ -37,10 +40,12 @@ import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.ConnectionCloseCode; import org.apache.qpid.transport.ExecutionErrorCode; import org.apache.qpid.transport.ExecutionException; import org.apache.qpid.transport.Method; import org.apache.qpid.transport.ProtocolEvent; +import org.apache.qpid.transport.Session; public class ServerConnection extends Connection implements AMQConnectionModel, LogSubject { @@ -54,6 +59,11 @@ public class ServerConnection extends Connection implements AMQConnectionModel, } + public UUID getId() + { + return _config.getId(); + } + @Override protected void invoke(Method method) { @@ -110,6 +120,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel, public void setVirtualHost(VirtualHost virtualHost) { _virtualHost = virtualHost; + _virtualHost.getConnectionRegistry().registerConnection(this); } public void setConnectionConfig(final ConnectionConfig config) @@ -145,6 +156,11 @@ public class ServerConnection extends Connection implements AMQConnectionModel, ((ServerSession)session).close(); } + + public LogSubject getLogSubject() + { + return (LogSubject) this; + } @Override public void received(ProtocolEvent event) @@ -215,4 +231,31 @@ public class ServerConnection extends Connection implements AMQConnectionModel, { return _actor; } + + @Override + public void close(AMQConstant cause, String message) throws AMQException + { + ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL; + try + { + replyCode = ConnectionCloseCode.get(cause.getCode()); + } + catch (IllegalArgumentException iae) + { + // Ignore + } + close(replyCode, message); + getVirtualHost().getConnectionRegistry().deregisterConnection(this); + } + + @Override + public List<AMQSessionModel> getSessionModels() + { + List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>(); + for (Session ssn : getChannels()) + { + sessions.add((AMQSessionModel) ssn); + } + return sessions; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java index 2b9e92f685..fb27dec949 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java @@ -138,6 +138,7 @@ public class ServerConnectionDelegate extends ServerDelegate sconn.invoke(new ConnectionClose(ConnectionCloseCode.INVALID_PATH, "Unknown virtualhost '"+vhostName+"'")); sconn.setState(Connection.State.CLOSING); } + } @Override diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index 540ad3fffd..714b2aa61f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -20,12 +20,26 @@ */ package org.apache.qpid.server.transport; -import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT; -import static org.apache.qpid.util.Serial.gt; +import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.*; +import static org.apache.qpid.util.Serial.*; -import com.sun.security.auth.UserPrincipal; +import java.lang.ref.WeakReference; +import java.security.Principal; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicLong; import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.server.configuration.ConfigStore; import org.apache.qpid.server.configuration.ConfiguredObject; @@ -38,6 +52,8 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueEntry; @@ -48,8 +64,6 @@ import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.transport.Binary; import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.MessageTransfer; @@ -58,24 +72,15 @@ import org.apache.qpid.transport.Range; import org.apache.qpid.transport.RangeSet; import org.apache.qpid.transport.Session; import org.apache.qpid.transport.SessionDelegate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.lang.ref.WeakReference; -import java.security.Principal; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.SortedMap; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicLong; +import com.sun.security.auth.UserPrincipal; public class ServerSession extends Session implements PrincipalHolder, SessionConfig, AMQSessionModel, LogSubject { + private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class); + private static final String NULL_DESTINTATION = UUID.randomUUID().toString(); private final UUID _id; @@ -111,6 +116,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo private final AtomicLong _txnCommits = new AtomicLong(0); private final AtomicLong _txnRejects = new AtomicLong(0); private final AtomicLong _txnCount = new AtomicLong(0); + private final AtomicLong _txnUpdateTime = new AtomicLong(0); private Principal _principal; @@ -141,7 +147,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo _connectionConfig = connConfig; _transaction = new AutoCommitTransaction(this.getMessageStore()); _principal = new UserPrincipal(connection.getAuthorizationID()); - _reference = new WeakReference(this); + _reference = new WeakReference<Session>(this); _id = getConfigStore().createId(); getConfigStore().addConfiguredObject(this); } @@ -160,8 +166,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo public void enqueue(final ServerMessage message, final ArrayList<? extends BaseQueue> queues) { - - _transaction.enqueue(queues,message, new ServerTransaction.Action() + _transaction.enqueue(queues,message, new ServerTransaction.Action() { BaseQueue[] _queues = queues.toArray(new BaseQueue[queues.size()]); @@ -189,6 +194,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo }); incrementOutstandingTxnsIfNecessary(); + updateTransactionalActivity(); } @@ -377,6 +383,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo entry.release(); } }); + updateTransactionalActivity(); } public Collection<Subscription_0_10> getSubscriptions() @@ -425,6 +432,11 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo // theory return !(_transaction instanceof AutoCommitTransaction); } + + public boolean inTransaction() + { + return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0; + } public void selectTx() { @@ -471,6 +483,17 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo } } + /** + * Update last transaction activity timestamp + */ + public void updateTransactionalActivity() + { + if (isTransactional()) + { + _txnUpdateTime.set(System.currentTimeMillis()); + } + } + public Long getTxnStarts() { return _txnStarts.get(); @@ -606,6 +629,38 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo return (LogSubject) this; } + public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException + { + if (inTransaction()) + { + long currentTime = System.currentTimeMillis(); + long openTime = currentTime - _transaction.getTransactionStartTime(); + long idleTime = currentTime - _txnUpdateTime.get(); + + // Log a warning on idle or open transactions + if (idleWarn > 0L && idleTime > idleWarn) + { + CurrentActor.get().message(getLogSubject(), ChannelMessages.IDLE_TXN(openTime)); + _logger.warn("IDLE TRANSACTION ALERT " + getLogSubject().toString() + " " + idleTime + " ms"); + } + else if (openWarn > 0L && openTime > openWarn) + { + CurrentActor.get().message(getLogSubject(), ChannelMessages.OPEN_TXN(openTime)); + _logger.warn("OPEN TRANSACTION ALERT " + getLogSubject().toString() + " " + openTime + " ms"); + } + + // Close connection for idle or open transactions that have timed out + if (idleClose > 0L && idleTime > idleClose) + { + getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out"); + } + else if (openClose > 0L && openTime > openClose) + { + getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open transaction timed out"); + } + } + } + @Override public String toLogString() { @@ -617,7 +672,5 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo getVirtualHost().getName(), getChannel()) + "] "; - } - } diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index d12ab6d474..be659c87ae 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -370,7 +370,6 @@ public class ServerSessionDelegate extends SessionDelegate } ssn.processed(xfr); - } @Override diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java b/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java index db781ead96..36e9d78440 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java @@ -50,6 +50,11 @@ public class AutoCommitTransaction implements ServerTransaction _transactionLog = transactionLog; } + public long getTransactionStartTime() + { + return 0L; + } + /** * Since AutoCommitTransaction have no concept of a long lived transaction, any Actions registered * by the caller are executed immediately. diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java index a04c743be1..f9dac782a6 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java @@ -20,18 +20,23 @@ package org.apache.qpid.server.txn; * */ - import java.util.ArrayList; import java.util.Collection; import java.util.List; -import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.TransactionLog; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.store.TransactionLog; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A concrete implementation of ServerTransaction where enqueue/dequeue @@ -41,17 +46,28 @@ import org.apache.qpid.server.store.TransactionLog; */ public class LocalTransaction implements ServerTransaction { - protected static final Logger _logger = Logger.getLogger(LocalTransaction.class); + protected static final Logger _logger = LoggerFactory.getLogger(LocalTransaction.class); private final List<Action> _postTransactionActions = new ArrayList<Action>(); private volatile TransactionLog.Transaction _transaction; private TransactionLog _transactionLog; + private long _txnStartTime = 0L; public LocalTransaction(TransactionLog transactionLog) { _transactionLog = transactionLog; } + + public boolean inTransaction() + { + return _transaction != null; + } + + public long getTransactionStartTime() + { + return _txnStartTime; + } public void addPostTransactionAction(Action postTransactionAction) { @@ -89,7 +105,6 @@ public class LocalTransaction implements ServerTransaction try { - for(QueueEntry entry : queueEntries) { ServerMessage message = entry.getMessage(); @@ -113,7 +128,6 @@ public class LocalTransaction implements ServerTransaction _logger.error("Error during message dequeues", e); tidyUpOnError(e); } - } private void tidyUpOnError(Exception e) @@ -140,8 +154,7 @@ public class LocalTransaction implements ServerTransaction } finally { - _transaction = null; - _postTransactionActions.clear(); + resetDetails(); } } @@ -193,8 +206,25 @@ public class LocalTransaction implements ServerTransaction { _postTransactionActions.add(postTransactionAction); + if (_txnStartTime == 0L) + { + _txnStartTime = System.currentTimeMillis(); + } + if(message.isPersistent()) { + if(_transaction == null) + { + for(BaseQueue queue : queues) + { + if(queue.isDurable()) + { + beginTranIfNecessary(); + break; + } + } + } + try { for(BaseQueue queue : queues) @@ -248,17 +278,14 @@ public class LocalTransaction implements ServerTransaction } finally { - _transaction = null; - _postTransactionActions.clear(); + resetDetails(); } - } public void rollback() { try { - if(_transaction != null) { _transaction.abortTran(); @@ -280,9 +307,15 @@ public class LocalTransaction implements ServerTransaction } finally { - _transaction = null; - _postTransactionActions.clear(); + resetDetails(); } } } + + private void resetDetails() + { + _transaction = null; + _postTransactionActions.clear(); + _txnStartTime = 0L; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java b/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java index b61b8a5c64..b3c6e1ac3a 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java @@ -52,6 +52,13 @@ public interface ServerTransaction public void onRollback(); } + /** + * Return the time the current transaction started. + * + * @return the time this transaction started or 0 if not in a transaction + */ + long getTransactionStartTime(); + /** * Register an Action for execution after transaction commit or rollback. Actions * will be executed in the order in which they are registered. diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java index a550283a38..a1566917dd 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java @@ -24,7 +24,6 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.TimerTask; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -37,7 +36,6 @@ import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.AMQInternalException; import org.apache.qpid.AMQStoreException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; @@ -64,6 +62,8 @@ import org.apache.qpid.server.logging.messages.VirtualHostMessages; import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.ManagedObject; +import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.DefaultQueueRegistry; @@ -282,19 +282,30 @@ public class VirtualHostImpl implements VirtualHost // house keeping task from running. } } + for (AMQConnectionModel connection : getConnectionRegistry().getConnections()) + { + _logger.debug("Checking for long running open transactions on connection " + connection); + for (AMQSessionModel session : connection.getSessionModels()) + { + _logger.debug("Checking for long running open transactions on session " + session); + try + { + session.checkTransactionStatus(_configuration.getTransactionTimeoutOpenWarn(), + _configuration.getTransactionTimeoutOpenClose(), + _configuration.getTransactionTimeoutIdleWarn(), + _configuration.getTransactionTimeoutIdleClose()); + } + catch (Exception e) + { + _logger.error("Exception in housekeeping for connection: " + connection.toString(), e); + } + } + } } } scheduleHouseKeepingTask(period, new ExpiredMessagesTask(this)); - class ForceChannelClosuresTask extends TimerTask - { - public void run() - { - _connectionRegistry.expireClosedChannels(); - } - } - Map<String, VirtualHostPluginFactory> plugins = ApplicationRegistry.getInstance().getPluginManager().getVirtualHostPlugins(); |