summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2008-06-04 14:57:37 +0000
committerMartin Ritchie <ritchiem@apache.org>2008-06-04 14:57:37 +0000
commit985e8c974e786b8aa03c48db159804c1a9e97ae2 (patch)
tree0c3a243519ada14e84c55eb5834cb339e67a2b0e
parentbe5148ecc15cceabf2a3df82904f6dbd0e9c070c (diff)
downloadqpid-python-985e8c974e786b8aa03c48db159804c1a9e97ae2.tar.gz
Merged revisions 663125 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.x ........ r663125 | ritchiem | 2008-06-04 15:32:49 +0100 (Wed, 04 Jun 2008) | 6 lines QPID-1119 : M2x commit : Addition of a System property to AMQProtocolHandler.java to allow the syncWait default to be changed. To perform this a new SlowMessageStore has been added to the systest package. This allows all MessageStore methods to have a pre and/or post delay applied. This delay can be configured dynamically if you have a handle to the Store or via the XML configuration. The SlowMessageStore can also be used to wrap any existing MessageStore (Testing only carried out with the default MemoryMessageStore) To make testing easier on M2x VMTestCase has been modified to allow the test to simply configure logging levels and systemProperties. These are then reverted after the test has completed. These changes will naturally need more work to before they are merged to trunk which uses totally different methods for ClientProperties and for running tests. systests/pom.xml didn't have amqj.logging.level as a systemProperty hence setting it did nothing for the tests. ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1.x@663142 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java2
-rw-r--r--java/systests/pom.xml4
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java272
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java96
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java135
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java71
6 files changed, 578 insertions, 2 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 3932b098cd..6bff89aba1 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -158,7 +158,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
private FailoverException _lastFailoverException;
/** Defines the default timeout to use for synchronous protocol commands. */
- private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30;
+ private final long DEFAULT_SYNC_TIMEOUT = Long.getLong("amqj.default_syncwrite_timeout", 1000 * 30);
/** Default buffer size for pending messages reads */
private static final String DEFAULT_READ_BUFFER_LIMIT = "262144";
diff --git a/java/systests/pom.xml b/java/systests/pom.xml
index f937a97c53..6385c119eb 100644
--- a/java/systests/pom.xml
+++ b/java/systests/pom.xml
@@ -96,6 +96,10 @@
<name>QPID_HOME</name>
<value>${basedir}/${topDirectoryLocation}/broker</value>
</property>
+ <property>
+ <name>amqj.logging.level</name>
+ <value>${amqj.logging.level}</value>
+ </property>
</systemProperties>
<excludes>
diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
new file mode 100644
index 0000000000..1b3eaf971f
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
@@ -0,0 +1,272 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.MessageMetaData;
+
+import java.util.HashMap;
+import java.util.Iterator;
+
+public class SlowMessageStore implements MessageStore
+{
+ private static final Logger _logger = Logger.getLogger(SlowMessageStore.class);
+ private static final String DELAYS = "delays";
+ private HashMap<String, Long> _preDelays = new HashMap<String, Long>();
+ private HashMap<String, Long> _postDelays = new HashMap<String, Long>();
+ private long _defaultDelay = 0L;
+ private MessageStore _realStore = new MemoryMessageStore();
+ private static final String PRE = "pre";
+ private static final String POST = "post";
+ private String DEFAULT_DELAY = "default";
+
+ public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception
+ {
+ Configuration delays = config.subset(base + "." + DELAYS);
+
+ configureDelays(delays);
+
+ String messageStoreClass = config.getString(base + ".store.class");
+
+ if (delays.containsKey(DEFAULT_DELAY))
+ {
+ _defaultDelay = delays.getLong(DEFAULT_DELAY);
+ }
+
+ if (messageStoreClass != null)
+ {
+ Class clazz = Class.forName(messageStoreClass);
+
+ Object o = clazz.newInstance();
+
+ if (!(o instanceof MessageStore))
+ {
+ throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz +
+ " does not.");
+ }
+ _realStore = (MessageStore) o;
+ _realStore.configure(virtualHost, base + ".store", config);
+ }
+ else
+ {
+ _realStore.configure(virtualHost, base + ".store", config);
+ }
+ }
+
+ private void configureDelays(Configuration config)
+ {
+ Iterator delays = config.getKeys();
+
+ while (delays.hasNext())
+ {
+ String key = (String) delays.next();
+ if (key.endsWith(PRE))
+ {
+ _preDelays.put(key.substring(0, key.length() - PRE.length() - 1), config.getLong(key));
+ }
+ else if (key.endsWith(POST))
+ {
+ _postDelays.put(key.substring(0, key.length() - POST.length() - 1), config.getLong(key));
+ }
+ }
+ }
+
+ private void doPostDelay(String method)
+ {
+ long delay = lookupDelay(_postDelays, method);
+ doDelay(delay);
+ }
+
+ private void doPreDelay(String method)
+ {
+ long delay = lookupDelay(_preDelays, method);
+ doDelay(delay);
+ }
+
+ private long lookupDelay(HashMap<String, Long> delays, String method)
+ {
+ Long delay = delays.get(method);
+ return (delay == null) ? _defaultDelay : delay;
+ }
+
+ private void doDelay(long delay)
+ {
+ if (delay > 0)
+ {
+ try
+ {
+ Thread.sleep(delay);
+ }
+ catch (InterruptedException e)
+ {
+ _logger.warn("Interrupted : " + e);
+ }
+ }
+ }
+
+ // ***** MessageStore Interface.
+
+ public void close() throws Exception
+ {
+ doPreDelay("close");
+ _realStore.close();
+ doPostDelay("close");
+ }
+
+ public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException
+ {
+ doPreDelay("removeMessage");
+ _realStore.removeMessage(storeContext, messageId);
+ doPostDelay("removeMessage");
+ }
+
+ public void createExchange(Exchange exchange) throws AMQException
+ {
+ doPreDelay("createExchange");
+ _realStore.createExchange(exchange);
+ doPostDelay("createExchange");
+ }
+
+ public void removeExchange(Exchange exchange) throws AMQException
+ {
+ doPreDelay("removeExchange");
+ _realStore.removeExchange(exchange);
+ doPostDelay("removeExchange");
+ }
+
+ public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ {
+ doPreDelay("bindQueue");
+ _realStore.bindQueue(exchange, routingKey, queue, args);
+ doPostDelay("bindQueue");
+ }
+
+ public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ {
+ doPreDelay("unbindQueue");
+ _realStore.unbindQueue(exchange, routingKey, queue, args);
+ doPostDelay("unbindQueue");
+ }
+
+ public void createQueue(AMQQueue queue) throws AMQException
+ {
+ doPreDelay("createQueue");
+ _realStore.createQueue(queue);
+ doPostDelay("createQueue");
+ }
+
+ public void removeQueue(AMQShortString name) throws AMQException
+ {
+ doPreDelay("removeQueue");
+ _realStore.removeQueue(name);
+ doPostDelay("removeQueue");
+ }
+
+ public void enqueueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException
+ {
+ doPreDelay("enqueueMessage");
+ _realStore.enqueueMessage(context, name, messageId);
+ doPostDelay("enqueueMessage");
+ }
+
+ public void dequeueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException
+ {
+ doPreDelay("dequeueMessage");
+ _realStore.dequeueMessage(context, name, messageId);
+ doPostDelay("dequeueMessage");
+ }
+
+ public void beginTran(StoreContext context) throws AMQException
+ {
+ doPreDelay("beginTran");
+ _realStore.beginTran(context);
+ doPostDelay("beginTran");
+ }
+
+ public void commitTran(StoreContext context) throws AMQException
+ {
+ doPreDelay("commitTran");
+ _realStore.commitTran(context);
+ doPostDelay("commitTran");
+ }
+
+ public void abortTran(StoreContext context) throws AMQException
+ {
+ doPreDelay("abortTran");
+ _realStore.abortTran(context);
+ doPostDelay("abortTran");
+ }
+
+ public boolean inTran(StoreContext context)
+ {
+ doPreDelay("inTran");
+ boolean b = _realStore.inTran(context);
+ doPostDelay("inTran");
+ return b;
+ }
+
+ public Long getNewMessageId()
+ {
+ doPreDelay("getNewMessageId");
+ Long l = _realStore.getNewMessageId();
+ doPostDelay("getNewMessageId");
+ return l;
+ }
+
+ public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException
+ {
+ doPreDelay("storeContentBodyChunk");
+ _realStore.storeContentBodyChunk(context, messageId, index, contentBody, lastContentBody);
+ doPostDelay("storeContentBodyChunk");
+ }
+
+ public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException
+ {
+ doPreDelay("storeMessageMetaData");
+ _realStore.storeMessageMetaData(context, messageId, messageMetaData);
+ doPostDelay("storeMessageMetaData");
+ }
+
+ public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
+ {
+ doPreDelay("getMessageMetaData");
+ MessageMetaData mmd = _realStore.getMessageMetaData(context, messageId);
+ doPostDelay("getMessageMetaData");
+ return mmd;
+ }
+
+ public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
+ {
+ doPreDelay("getContentBodyChunk");
+ ContentChunk c = _realStore.getContentBodyChunk(context, messageId, index);
+ doPostDelay("getContentBodyChunk");
+ return c;
+ }
+
+}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java b/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java
index 16dc81e82d..72c90b8023 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java
@@ -21,6 +21,8 @@
package org.apache.qpid.test;
import junit.framework.TestCase;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
@@ -46,6 +48,8 @@ import java.util.HashSet;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map;
+import java.util.LinkedList;
+import java.util.concurrent.ConcurrentHashMap;
public class VMTestCase extends TestCase
{
@@ -61,6 +65,16 @@ public class VMTestCase extends TestCase
protected final Map<String, String> _queues = new HashMap<String, String>();
protected final Map<String, String> _topics = new HashMap<String, String>();
+ protected static final String ALL = "org.apache.qpid";
+ protected static final String BROKER = "org.apache.qpid.server";
+ protected static final String CLIENT = "org.apache.qpid.client";
+ protected static final String COMMON = "org.apache.qpid.common";
+ protected static final String FRAMING = "org.apache.qpid.framing";
+ protected static final String TEST = "org.apache.qpid.test";
+
+ private LinkedList<LogState> _logStates = new LinkedList<LogState>();
+ private Map<String, String> _setProperties = new HashMap<String, String>();
+
protected void setUp() throws Exception
{
super.setUp();
@@ -116,6 +130,11 @@ public class VMTestCase extends TestCase
// checkQueuesClean();
stopVMBroker(1);
+
+ revertLogging();
+
+ revertSystemProperties();
+
super.tearDown();
}
@@ -159,7 +178,7 @@ public class VMTestCase extends TestCase
public void startVMBroker(int vmID) throws Exception
{
- startVMBroker(vmID, null);
+ startVMBroker(vmID, (File) null);
}
/** FIXME: for now vmID must be unique client is responsible for this. */
@@ -204,10 +223,85 @@ public class VMTestCase extends TestCase
}
}
+ public void startVMBroker(int vmID, ConfigurationFileApplicationRegistry config) throws Exception
+ {
+ ApplicationRegistry.initialise(config, vmID);
+ startVMBroker(vmID);
+ }
+
public void stopVMBroker(int inVMid)
{
TransportConnection.killVMBroker(inVMid);
ApplicationRegistry.remove(inVMid);
}
+ protected void setLoggingLevel(String loggerName, Level level)
+ {
+ Logger logger = Logger.getLogger(loggerName);
+
+ Level currentLevel = logger.getLevel();
+
+ _logStates.push(new LogState(logger, currentLevel));
+
+ logger.setLevel(level);
+ }
+
+ protected void revertLogging()
+ {
+ for (LogState state : _logStates)
+ {
+ state.getLogger().setLevel(state.getLevel());
+ }
+
+ _logStates.clear();
+ }
+
+ protected class LogState
+ {
+ private Logger _logger;
+ private Level _level;
+
+ public LogState(Logger logger, Level level)
+ {
+ _logger = logger;
+ _level = level;
+ }
+
+ public Logger getLogger()
+ {
+ return _logger;
+ }
+
+ public Level getLevel()
+ {
+ return _level;
+ }
+ }
+
+ protected void setSystemProperty(String property, String value)
+ {
+ if (!_setProperties.containsKey(property))
+ {
+ _setProperties.put(property, System.getProperty(property));
+ }
+
+ System.setProperty(property, value);
+ }
+
+ protected void revertSystemProperties()
+ {
+ for (String key : _setProperties.keySet())
+ {
+ String value = _setProperties.get(key);
+ if (value != null)
+ {
+ System.setProperty(key, value);
+ }
+ else
+ {
+ System.clearProperty(key);
+ }
+ }
+ }
+
}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java
new file mode 100644
index 0000000000..ad6e9fcf51
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java
@@ -0,0 +1,135 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.client.timeouts;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
+import org.apache.qpid.test.VMTestCase;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.io.File;
+
+/**
+ * This tests that when the commit takes a long time(due to POST_COMMIT_DELAY) that the commit does not timeout
+ * This test must be run in conjunction with SyncWaiteTimeoutDelay or be run with POST_COMMIT_DELAY > 30s to ensure
+ * that the default value is being replaced.
+ */
+public class SyncWaitDelayTest extends VMTestCase
+{
+ protected static final Logger _logger = Logger.getLogger(SyncWaitDelayTest.class);
+
+ final String QpidHome = System.getProperty("QPID_HOME");
+ final File _configFile = new File(QpidHome, "etc/config.xml");
+
+ private String VIRTUALHOST = "test";
+ protected long POST_COMMIT_DELAY = 1000L;
+ protected long SYNC_WRITE_TIMEOUT = POST_COMMIT_DELAY + 1000;
+
+ protected Connection _connection;
+ protected Session _session;
+ protected Queue _queue;
+ protected MessageConsumer _consumer;
+
+ public void setUp() throws Exception
+ {
+ if (!_configFile.exists())
+ {
+ fail("Unable to test without config file:" + _configFile);
+ }
+
+ ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(_configFile);
+
+ //For now disable management on all configured inVM broker.
+ config.getConfiguration().setProperty("management.enabled", "false");
+
+ Configuration testVirtualhost = config.getConfiguration().subset("virtualhosts.virtualhost." + VIRTUALHOST);
+ testVirtualhost.setProperty("store.class", "org.apache.qpid.server.store.SlowMessageStore");
+ testVirtualhost.setProperty("store.delays.commitTran.post", POST_COMMIT_DELAY);
+
+ startVMBroker(2, config);
+
+ //Set the syncWrite timeout to be just larger than the delay on the commitTran.
+ setSystemProperty("amqj.default_syncwrite_timeout", String.valueOf(SYNC_WRITE_TIMEOUT));
+
+ _brokerlist = "vm://:2";
+
+ super.setUp();
+
+ _connection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+ //Create Queue
+ _queue = (Queue) _context.lookup("queue");
+
+ //Create Consumer
+ _session = _connection.createSession(true, Session.SESSION_TRANSACTED);
+
+ //Ensure Queue exists
+ _session.createConsumer(_queue).close();
+ }
+
+ public void tearDown() throws Exception
+ {
+ //clean up
+ _connection.close();
+
+ stopVMBroker(2);
+
+ super.tearDown();
+ }
+
+ public void test() throws JMSException
+ {
+ MessageProducer producer = _session.createProducer(_queue);
+
+ Message message = _session.createTextMessage("Message");
+
+ producer.send(message);
+
+ long start = System.nanoTime();
+
+ _logger.info("Calling Commit");
+
+ try
+ {
+ _session.commit();
+ long end = System.nanoTime();
+ long time = (end - start);
+ // As we are using Nano time ensure to multiply up the millis.
+ assertTrue("Commit was quickier than the delay:" + time, time > 1000000L * POST_COMMIT_DELAY);
+ assertFalse("Commit was to slower than the build in default", time > 1000000L * 1000 * 30);
+ }
+ catch (JMSException e)
+ {
+ fail(e.getMessage());
+ }
+
+ }
+
+}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java
new file mode 100644
index 0000000000..2b736ed392
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java
@@ -0,0 +1,71 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.client.timeouts;
+
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQTimeoutException;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+
+/** This tests that when the syncWrite timeout is set that it will timeout on that time rather than the default 30s. */
+public class SyncWaitTimeoutDelayTest extends SyncWaitDelayTest
+{
+ protected static final Logger _logger = Logger.getLogger(SyncWaitTimeoutDelayTest.class);
+
+ public void setUp() throws Exception
+ {
+ POST_COMMIT_DELAY = 1000L;
+
+ //Set the syncWrite timeout to be less than the COMMIT Delay so we can validate that it is being applied
+ SYNC_WRITE_TIMEOUT = 500L;
+
+ super.setUp();
+ }
+
+ public void test() throws JMSException
+ {
+ MessageProducer producer = _session.createProducer(_queue);
+
+ Message message = _session.createTextMessage("Message");
+
+ producer.send(message);
+
+ _logger.info("Calling Commit");
+
+ long start = System.nanoTime();
+ try
+ {
+ _session.commit();
+ fail("Commit occured even though syncWait timeout is shorter than delay in commit");
+ }
+ catch (JMSException e)
+ {
+ assertTrue("Wrong exception type received.", e.getLinkedException() instanceof AMQTimeoutException);
+ assertTrue("Wrong message received on exception.", e.getMessage().startsWith("Failed to commit"));
+ // As we are using Nano time ensure to multiply up the millis.
+ assertTrue("Timeout was more than 30s default", (System.nanoTime() - start) < (1000000L * 1000 * 30));
+ }
+
+ }
+}