diff options
author | Robert Gemmell <robbie@apache.org> | 2012-07-31 12:48:51 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2012-07-31 12:48:51 +0000 |
commit | a295ca2b7294353f86405398f7644f73f06ccae4 (patch) | |
tree | 491db638441672373382e72804d2d68ba8b3438b | |
parent | ae8e2a63a48536e90053bfdc7c4d71edd1ec1c7c (diff) | |
download | qpid-python-a295ca2b7294353f86405398f7644f73f06ccae4.tar.gz |
QPID-4124: Improved logging that is produced when the various transaction timeouts are exceeded. Remove duplication of messages and reorder logging/closing.
Work by myself, Keith Wall, Phil Harvey.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1367522 13f79535-47bb-0310-9956-ffa450edef68
5 files changed, 193 insertions, 33 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index a6218b8255..321fe810a9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -176,6 +176,8 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm private final ClientDeliveryMethod _clientDeliveryMethod; + private final TransactionTimeoutHelper _transactionTimeoutHelper; + public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore) throws AMQException { @@ -195,6 +197,8 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm _transaction = new AsyncAutoCommitTransaction(_messageStore, this); _clientDeliveryMethod = session.createDeliveryMethod(_channelId); + + _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject); } public ConfigStore getConfigStore() @@ -1407,7 +1411,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm } } } - + public synchronized void block(AMQQueue queue) { if(_blockingEntities.add(queue)) @@ -1542,27 +1546,20 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm long openTime = currentTime - _transaction.getTransactionStartTime(); long idleTime = currentTime - _txnUpdateTime.get(); - // Log a warning on idle or open transactions - if (idleWarn > 0L && idleTime > idleWarn) - { - CurrentActor.get().message(_logSubject, ChannelMessages.IDLE_TXN(idleTime)); - _logger.warn("IDLE TRANSACTION ALERT " + _logSubject.toString() + " " + idleTime + " ms"); - } - else if (openWarn > 0L && openTime > openWarn) - { - CurrentActor.get().message(_logSubject, ChannelMessages.OPEN_TXN(openTime)); - _logger.warn("OPEN TRANSACTION ALERT " + _logSubject.toString() + " " + openTime + " ms"); - } - - // Close _connection_ for idle or open transactions that have timed out (this is different - // than the 0-10 code path which closes the session). - if (idleClose > 0L && idleTime > idleClose) + _transactionTimeoutHelper.logIfNecessary(idleTime, idleWarn, ChannelMessages.IDLE_TXN(idleTime), + TransactionTimeoutHelper.IDLE_TRANSACTION_ALERT); + if (_transactionTimeoutHelper.isTimedOut(idleTime, idleClose)) { closeConnection("Idle transaction timed out"); + return; } - else if (openClose > 0L && openTime > openClose) + + _transactionTimeoutHelper.logIfNecessary(openTime, openWarn, ChannelMessages.OPEN_TXN(openTime), + TransactionTimeoutHelper.OPEN_TRANSACTION_ALERT); + if (_transactionTimeoutHelper.isTimedOut(openTime, openClose)) { closeConnection("Open transaction timed out"); + return; } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/TransactionTimeoutHelper.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/TransactionTimeoutHelper.java new file mode 100644 index 0000000000..0c474cca13 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/TransactionTimeoutHelper.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.LogMessage; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.messages.ChannelMessages; + +public class TransactionTimeoutHelper +{ + private static final Logger LOGGER = Logger.getLogger(TransactionTimeoutHelper.class); + + public static final String IDLE_TRANSACTION_ALERT = "IDLE TRANSACTION ALERT"; + public static final String OPEN_TRANSACTION_ALERT = "OPEN TRANSACTION ALERT"; + + private final LogSubject _logSubject; + + public TransactionTimeoutHelper(final LogSubject logSubject) + { + _logSubject = logSubject; + } + + public void logIfNecessary(final long timeSoFar, final long warnTimeout, + final LogMessage message, final String alternateLogPrefix) + { + if (isTimedOut(timeSoFar, warnTimeout)) + { + LogActor logActor = CurrentActor.get(); + if(logActor.getRootMessageLogger().isMessageEnabled(logActor, _logSubject, message.getLogHierarchy())) + { + logActor.message(_logSubject, message); + } + else + { + LOGGER.warn(alternateLogPrefix + " " + _logSubject.toLogString() + " " + timeSoFar + " ms"); + } + } + } + + public boolean isTimedOut(long timeSoFar, long timeout) + { + return timeout > 0L && timeSoFar > timeout; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index 5b5540897b..f82b25b3d6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -43,6 +43,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQStoreException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.server.TransactionTimeoutHelper; import org.apache.qpid.server.configuration.ConfigStore; import org.apache.qpid.server.configuration.ConfiguredObject; import org.apache.qpid.server.configuration.ConnectionConfig; @@ -144,6 +145,8 @@ public class ServerSession extends Session private final List<Task> _taskList = new CopyOnWriteArrayList<Task>(); + private final TransactionTimeoutHelper _transactionTimeoutHelper; + ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry) { this(connection, delegate, name, expiry, ((ServerConnection)connection).getConfig()); @@ -157,6 +160,8 @@ public class ServerSession extends Session _logSubject = new ChannelLogSubject(this); _id = getConfigStore().createId(); getConfigStore().addConfiguredObject(this); + + _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject); } protected void setState(State state) @@ -775,26 +780,20 @@ public class ServerSession extends Session long openTime = currentTime - _transaction.getTransactionStartTime(); long idleTime = currentTime - _txnUpdateTime.get(); - // Log a warning on idle or open transactions - if (idleWarn > 0L && idleTime > idleWarn) - { - CurrentActor.get().message(getLogSubject(), ChannelMessages.IDLE_TXN(idleTime)); - _logger.warn("IDLE TRANSACTION ALERT " + getLogSubject().toString() + " " + idleTime + " ms"); - } - else if (openWarn > 0L && openTime > openWarn) - { - CurrentActor.get().message(getLogSubject(), ChannelMessages.OPEN_TXN(openTime)); - _logger.warn("OPEN TRANSACTION ALERT " + getLogSubject().toString() + " " + openTime + " ms"); - } - - // Close connection for idle or open transactions that have timed out - if (idleClose > 0L && idleTime > idleClose) + _transactionTimeoutHelper.logIfNecessary(idleTime, idleWarn, ChannelMessages.IDLE_TXN(idleTime), + TransactionTimeoutHelper.IDLE_TRANSACTION_ALERT); + if (_transactionTimeoutHelper.isTimedOut(idleTime, idleClose)) { getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out"); + return; } - else if (openClose > 0L && openTime > openClose) + + _transactionTimeoutHelper.logIfNecessary(openTime, openWarn, ChannelMessages.OPEN_TXN(openTime), + TransactionTimeoutHelper.OPEN_TRANSACTION_ALERT); + if (_transactionTimeoutHelper.isTimedOut(openTime, openClose)) { getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open transaction timed out"); + return; } } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/TransactionTimeoutHelperTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/TransactionTimeoutHelperTest.java new file mode 100644 index 0000000000..9081dc49d6 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/TransactionTimeoutHelperTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.LogMessage; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.RootMessageLogger; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.test.utils.QpidTestCase; + +public class TransactionTimeoutHelperTest extends QpidTestCase +{ + private final LogMessage _logMessage = mock(LogMessage.class); + private final LogActor _logActor = mock(LogActor.class); + private final LogSubject _logSubject = mock(LogSubject.class); + private TransactionTimeoutHelper _transactionTimeoutHelper; + private RootMessageLogger _rootMessageLogger; + + public void testLogIfNecessary() + { + _transactionTimeoutHelper.logIfNecessary(99, 100, _logMessage, ""); + verifyZeroInteractions(_logActor, _logMessage); + + _transactionTimeoutHelper.logIfNecessary(101, 100, _logMessage, ""); + verify(_logActor).message(_logSubject, _logMessage); + } + + public void testLogIfNecessaryWhenOperationalLoggingDisabled() + { + //disable the operational logging + when(_rootMessageLogger.isMessageEnabled( + same(_logActor), any(LogSubject.class), any(String.class))) + .thenReturn(false); + + //verify the actor is never asked to log a message + _transactionTimeoutHelper.logIfNecessary(101, 100, _logMessage, ""); + verify(_logActor, never()).message(any(LogMessage.class)); + verify(_logActor, never()).message(any(LogSubject.class), any(LogMessage.class)); + } + + public void testIsTimedOut() + { + assertFalse("Shouldn't have timed out", _transactionTimeoutHelper.isTimedOut(199,200)); + assertTrue("Should have timed out", _transactionTimeoutHelper.isTimedOut(201,200)); + } + + /** + * If TransactionTimeout is disabled, the timeout will be 0. This test verifies + * that the helper methods respond negatively in this scenario. + */ + public void testTransactionTimeoutDisabled() + { + assertFalse("Shouldn't have timed out", _transactionTimeoutHelper.isTimedOut(201,0)); + + _transactionTimeoutHelper.logIfNecessary(99, 0, _logMessage, ""); + verifyZeroInteractions(_logActor, _logMessage); + } + + @Override + protected void setUp() throws Exception + { + super.setUp(); + + CurrentActor.set(_logActor); + + _rootMessageLogger = mock(RootMessageLogger.class); + when(_logActor.getRootMessageLogger()).thenReturn(_rootMessageLogger); + + when(_rootMessageLogger.isMessageEnabled( + same(_logActor), any(LogSubject.class), any(String.class))) + .thenReturn(true); + + _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject); + } + +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java index cc76d89a67..b11df5a2a0 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java @@ -30,7 +30,7 @@ import javax.jms.Queue; * This tests the behaviour of transactional sessions when the {@code transactionTimeout} configuration * is set for a virtual host. * - * A producer that is idle for too long or open for too long will have its connection closed and + * A producer that is idle for too long or open for too long will have its connection/session(0-10) closed and * any further operations will fail with a 408 resource timeout exception. Consumers will not * be affected by the transaction timeout configuration. */ |