summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2012-07-31 12:48:51 +0000
committerRobert Gemmell <robbie@apache.org>2012-07-31 12:48:51 +0000
commita295ca2b7294353f86405398f7644f73f06ccae4 (patch)
tree491db638441672373382e72804d2d68ba8b3438b
parentae8e2a63a48536e90053bfdc7c4d71edd1ec1c7c (diff)
downloadqpid-python-a295ca2b7294353f86405398f7644f73f06ccae4.tar.gz
QPID-4124: Improved logging that is produced when the various transaction timeouts are exceeded. Remove duplication of messages and reorder logging/closing.
Work by myself, Keith Wall, Phil Harvey. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1367522 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java31
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/TransactionTimeoutHelper.java63
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java29
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/TransactionTimeoutHelperTest.java101
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java2
5 files changed, 193 insertions, 33 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index a6218b8255..321fe810a9 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -176,6 +176,8 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
private final ClientDeliveryMethod _clientDeliveryMethod;
+ private final TransactionTimeoutHelper _transactionTimeoutHelper;
+
public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore)
throws AMQException
{
@@ -195,6 +197,8 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
_transaction = new AsyncAutoCommitTransaction(_messageStore, this);
_clientDeliveryMethod = session.createDeliveryMethod(_channelId);
+
+ _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject);
}
public ConfigStore getConfigStore()
@@ -1407,7 +1411,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
}
}
}
-
+
public synchronized void block(AMQQueue queue)
{
if(_blockingEntities.add(queue))
@@ -1542,27 +1546,20 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
long openTime = currentTime - _transaction.getTransactionStartTime();
long idleTime = currentTime - _txnUpdateTime.get();
- // Log a warning on idle or open transactions
- if (idleWarn > 0L && idleTime > idleWarn)
- {
- CurrentActor.get().message(_logSubject, ChannelMessages.IDLE_TXN(idleTime));
- _logger.warn("IDLE TRANSACTION ALERT " + _logSubject.toString() + " " + idleTime + " ms");
- }
- else if (openWarn > 0L && openTime > openWarn)
- {
- CurrentActor.get().message(_logSubject, ChannelMessages.OPEN_TXN(openTime));
- _logger.warn("OPEN TRANSACTION ALERT " + _logSubject.toString() + " " + openTime + " ms");
- }
-
- // Close _connection_ for idle or open transactions that have timed out (this is different
- // than the 0-10 code path which closes the session).
- if (idleClose > 0L && idleTime > idleClose)
+ _transactionTimeoutHelper.logIfNecessary(idleTime, idleWarn, ChannelMessages.IDLE_TXN(idleTime),
+ TransactionTimeoutHelper.IDLE_TRANSACTION_ALERT);
+ if (_transactionTimeoutHelper.isTimedOut(idleTime, idleClose))
{
closeConnection("Idle transaction timed out");
+ return;
}
- else if (openClose > 0L && openTime > openClose)
+
+ _transactionTimeoutHelper.logIfNecessary(openTime, openWarn, ChannelMessages.OPEN_TXN(openTime),
+ TransactionTimeoutHelper.OPEN_TRANSACTION_ALERT);
+ if (_transactionTimeoutHelper.isTimedOut(openTime, openClose))
{
closeConnection("Open transaction timed out");
+ return;
}
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/TransactionTimeoutHelper.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/TransactionTimeoutHelper.java
new file mode 100644
index 0000000000..0c474cca13
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/TransactionTimeoutHelper.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogMessage;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.ChannelMessages;
+
+public class TransactionTimeoutHelper
+{
+ private static final Logger LOGGER = Logger.getLogger(TransactionTimeoutHelper.class);
+
+ public static final String IDLE_TRANSACTION_ALERT = "IDLE TRANSACTION ALERT";
+ public static final String OPEN_TRANSACTION_ALERT = "OPEN TRANSACTION ALERT";
+
+ private final LogSubject _logSubject;
+
+ public TransactionTimeoutHelper(final LogSubject logSubject)
+ {
+ _logSubject = logSubject;
+ }
+
+ public void logIfNecessary(final long timeSoFar, final long warnTimeout,
+ final LogMessage message, final String alternateLogPrefix)
+ {
+ if (isTimedOut(timeSoFar, warnTimeout))
+ {
+ LogActor logActor = CurrentActor.get();
+ if(logActor.getRootMessageLogger().isMessageEnabled(logActor, _logSubject, message.getLogHierarchy()))
+ {
+ logActor.message(_logSubject, message);
+ }
+ else
+ {
+ LOGGER.warn(alternateLogPrefix + " " + _logSubject.toLogString() + " " + timeSoFar + " ms");
+ }
+ }
+ }
+
+ public boolean isTimedOut(long timeSoFar, long timeout)
+ {
+ return timeout > 0L && timeSoFar > timeout;
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index 5b5540897b..f82b25b3d6 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -43,6 +43,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.server.TransactionTimeoutHelper;
import org.apache.qpid.server.configuration.ConfigStore;
import org.apache.qpid.server.configuration.ConfiguredObject;
import org.apache.qpid.server.configuration.ConnectionConfig;
@@ -144,6 +145,8 @@ public class ServerSession extends Session
private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
+ private final TransactionTimeoutHelper _transactionTimeoutHelper;
+
ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry)
{
this(connection, delegate, name, expiry, ((ServerConnection)connection).getConfig());
@@ -157,6 +160,8 @@ public class ServerSession extends Session
_logSubject = new ChannelLogSubject(this);
_id = getConfigStore().createId();
getConfigStore().addConfiguredObject(this);
+
+ _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject);
}
protected void setState(State state)
@@ -775,26 +780,20 @@ public class ServerSession extends Session
long openTime = currentTime - _transaction.getTransactionStartTime();
long idleTime = currentTime - _txnUpdateTime.get();
- // Log a warning on idle or open transactions
- if (idleWarn > 0L && idleTime > idleWarn)
- {
- CurrentActor.get().message(getLogSubject(), ChannelMessages.IDLE_TXN(idleTime));
- _logger.warn("IDLE TRANSACTION ALERT " + getLogSubject().toString() + " " + idleTime + " ms");
- }
- else if (openWarn > 0L && openTime > openWarn)
- {
- CurrentActor.get().message(getLogSubject(), ChannelMessages.OPEN_TXN(openTime));
- _logger.warn("OPEN TRANSACTION ALERT " + getLogSubject().toString() + " " + openTime + " ms");
- }
-
- // Close connection for idle or open transactions that have timed out
- if (idleClose > 0L && idleTime > idleClose)
+ _transactionTimeoutHelper.logIfNecessary(idleTime, idleWarn, ChannelMessages.IDLE_TXN(idleTime),
+ TransactionTimeoutHelper.IDLE_TRANSACTION_ALERT);
+ if (_transactionTimeoutHelper.isTimedOut(idleTime, idleClose))
{
getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out");
+ return;
}
- else if (openClose > 0L && openTime > openClose)
+
+ _transactionTimeoutHelper.logIfNecessary(openTime, openWarn, ChannelMessages.OPEN_TXN(openTime),
+ TransactionTimeoutHelper.OPEN_TRANSACTION_ALERT);
+ if (_transactionTimeoutHelper.isTimedOut(openTime, openClose))
{
getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open transaction timed out");
+ return;
}
}
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/TransactionTimeoutHelperTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/TransactionTimeoutHelperTest.java
new file mode 100644
index 0000000000..9081dc49d6
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/TransactionTimeoutHelperTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogMessage;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class TransactionTimeoutHelperTest extends QpidTestCase
+{
+ private final LogMessage _logMessage = mock(LogMessage.class);
+ private final LogActor _logActor = mock(LogActor.class);
+ private final LogSubject _logSubject = mock(LogSubject.class);
+ private TransactionTimeoutHelper _transactionTimeoutHelper;
+ private RootMessageLogger _rootMessageLogger;
+
+ public void testLogIfNecessary()
+ {
+ _transactionTimeoutHelper.logIfNecessary(99, 100, _logMessage, "");
+ verifyZeroInteractions(_logActor, _logMessage);
+
+ _transactionTimeoutHelper.logIfNecessary(101, 100, _logMessage, "");
+ verify(_logActor).message(_logSubject, _logMessage);
+ }
+
+ public void testLogIfNecessaryWhenOperationalLoggingDisabled()
+ {
+ //disable the operational logging
+ when(_rootMessageLogger.isMessageEnabled(
+ same(_logActor), any(LogSubject.class), any(String.class)))
+ .thenReturn(false);
+
+ //verify the actor is never asked to log a message
+ _transactionTimeoutHelper.logIfNecessary(101, 100, _logMessage, "");
+ verify(_logActor, never()).message(any(LogMessage.class));
+ verify(_logActor, never()).message(any(LogSubject.class), any(LogMessage.class));
+ }
+
+ public void testIsTimedOut()
+ {
+ assertFalse("Shouldn't have timed out", _transactionTimeoutHelper.isTimedOut(199,200));
+ assertTrue("Should have timed out", _transactionTimeoutHelper.isTimedOut(201,200));
+ }
+
+ /**
+ * If TransactionTimeout is disabled, the timeout will be 0. This test verifies
+ * that the helper methods respond negatively in this scenario.
+ */
+ public void testTransactionTimeoutDisabled()
+ {
+ assertFalse("Shouldn't have timed out", _transactionTimeoutHelper.isTimedOut(201,0));
+
+ _transactionTimeoutHelper.logIfNecessary(99, 0, _logMessage, "");
+ verifyZeroInteractions(_logActor, _logMessage);
+ }
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ CurrentActor.set(_logActor);
+
+ _rootMessageLogger = mock(RootMessageLogger.class);
+ when(_logActor.getRootMessageLogger()).thenReturn(_rootMessageLogger);
+
+ when(_rootMessageLogger.isMessageEnabled(
+ same(_logActor), any(LogSubject.class), any(String.class)))
+ .thenReturn(true);
+
+ _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject);
+ }
+
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java
index cc76d89a67..b11df5a2a0 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java
@@ -30,7 +30,7 @@ import javax.jms.Queue;
* This tests the behaviour of transactional sessions when the {@code transactionTimeout} configuration
* is set for a virtual host.
*
- * A producer that is idle for too long or open for too long will have its connection closed and
+ * A producer that is idle for too long or open for too long will have its connection/session(0-10) closed and
* any further operations will fail with a 408 resource timeout exception. Consumers will not
* be affected by the transaction timeout configuration.
*/