diff options
6 files changed, 578 insertions, 2 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 3932b098cd..6bff89aba1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -158,7 +158,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter private FailoverException _lastFailoverException; /** Defines the default timeout to use for synchronous protocol commands. */ - private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30; + private final long DEFAULT_SYNC_TIMEOUT = Long.getLong("amqj.default_syncwrite_timeout", 1000 * 30); /** Default buffer size for pending messages reads */ private static final String DEFAULT_READ_BUFFER_LIMIT = "262144"; diff --git a/java/systests/pom.xml b/java/systests/pom.xml index e6c1fa092e..39d488bd53 100644 --- a/java/systests/pom.xml +++ b/java/systests/pom.xml @@ -96,6 +96,10 @@ <name>QPID_HOME</name> <value>${basedir}/${topDirectoryLocation}/broker</value> </property> + <property> + <name>amqj.logging.level</name> + <value>${amqj.logging.level}</value> + </property> </systemProperties> <excludes> diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java new file mode 100644 index 0000000000..1b3eaf971f --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java @@ -0,0 +1,272 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import org.apache.commons.configuration.Configuration; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.MessageMetaData; + +import java.util.HashMap; +import java.util.Iterator; + +public class SlowMessageStore implements MessageStore +{ + private static final Logger _logger = Logger.getLogger(SlowMessageStore.class); + private static final String DELAYS = "delays"; + private HashMap<String, Long> _preDelays = new HashMap<String, Long>(); + private HashMap<String, Long> _postDelays = new HashMap<String, Long>(); + private long _defaultDelay = 0L; + private MessageStore _realStore = new MemoryMessageStore(); + private static final String PRE = "pre"; + private static final String POST = "post"; + private String DEFAULT_DELAY = "default"; + + public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception + { + Configuration delays = config.subset(base + "." + DELAYS); + + configureDelays(delays); + + String messageStoreClass = config.getString(base + ".store.class"); + + if (delays.containsKey(DEFAULT_DELAY)) + { + _defaultDelay = delays.getLong(DEFAULT_DELAY); + } + + if (messageStoreClass != null) + { + Class clazz = Class.forName(messageStoreClass); + + Object o = clazz.newInstance(); + + if (!(o instanceof MessageStore)) + { + throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz + + " does not."); + } + _realStore = (MessageStore) o; + _realStore.configure(virtualHost, base + ".store", config); + } + else + { + _realStore.configure(virtualHost, base + ".store", config); + } + } + + private void configureDelays(Configuration config) + { + Iterator delays = config.getKeys(); + + while (delays.hasNext()) + { + String key = (String) delays.next(); + if (key.endsWith(PRE)) + { + _preDelays.put(key.substring(0, key.length() - PRE.length() - 1), config.getLong(key)); + } + else if (key.endsWith(POST)) + { + _postDelays.put(key.substring(0, key.length() - POST.length() - 1), config.getLong(key)); + } + } + } + + private void doPostDelay(String method) + { + long delay = lookupDelay(_postDelays, method); + doDelay(delay); + } + + private void doPreDelay(String method) + { + long delay = lookupDelay(_preDelays, method); + doDelay(delay); + } + + private long lookupDelay(HashMap<String, Long> delays, String method) + { + Long delay = delays.get(method); + return (delay == null) ? _defaultDelay : delay; + } + + private void doDelay(long delay) + { + if (delay > 0) + { + try + { + Thread.sleep(delay); + } + catch (InterruptedException e) + { + _logger.warn("Interrupted : " + e); + } + } + } + + // ***** MessageStore Interface. + + public void close() throws Exception + { + doPreDelay("close"); + _realStore.close(); + doPostDelay("close"); + } + + public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException + { + doPreDelay("removeMessage"); + _realStore.removeMessage(storeContext, messageId); + doPostDelay("removeMessage"); + } + + public void createExchange(Exchange exchange) throws AMQException + { + doPreDelay("createExchange"); + _realStore.createExchange(exchange); + doPostDelay("createExchange"); + } + + public void removeExchange(Exchange exchange) throws AMQException + { + doPreDelay("removeExchange"); + _realStore.removeExchange(exchange); + doPostDelay("removeExchange"); + } + + public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException + { + doPreDelay("bindQueue"); + _realStore.bindQueue(exchange, routingKey, queue, args); + doPostDelay("bindQueue"); + } + + public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException + { + doPreDelay("unbindQueue"); + _realStore.unbindQueue(exchange, routingKey, queue, args); + doPostDelay("unbindQueue"); + } + + public void createQueue(AMQQueue queue) throws AMQException + { + doPreDelay("createQueue"); + _realStore.createQueue(queue); + doPostDelay("createQueue"); + } + + public void removeQueue(AMQShortString name) throws AMQException + { + doPreDelay("removeQueue"); + _realStore.removeQueue(name); + doPostDelay("removeQueue"); + } + + public void enqueueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException + { + doPreDelay("enqueueMessage"); + _realStore.enqueueMessage(context, name, messageId); + doPostDelay("enqueueMessage"); + } + + public void dequeueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException + { + doPreDelay("dequeueMessage"); + _realStore.dequeueMessage(context, name, messageId); + doPostDelay("dequeueMessage"); + } + + public void beginTran(StoreContext context) throws AMQException + { + doPreDelay("beginTran"); + _realStore.beginTran(context); + doPostDelay("beginTran"); + } + + public void commitTran(StoreContext context) throws AMQException + { + doPreDelay("commitTran"); + _realStore.commitTran(context); + doPostDelay("commitTran"); + } + + public void abortTran(StoreContext context) throws AMQException + { + doPreDelay("abortTran"); + _realStore.abortTran(context); + doPostDelay("abortTran"); + } + + public boolean inTran(StoreContext context) + { + doPreDelay("inTran"); + boolean b = _realStore.inTran(context); + doPostDelay("inTran"); + return b; + } + + public Long getNewMessageId() + { + doPreDelay("getNewMessageId"); + Long l = _realStore.getNewMessageId(); + doPostDelay("getNewMessageId"); + return l; + } + + public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException + { + doPreDelay("storeContentBodyChunk"); + _realStore.storeContentBodyChunk(context, messageId, index, contentBody, lastContentBody); + doPostDelay("storeContentBodyChunk"); + } + + public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException + { + doPreDelay("storeMessageMetaData"); + _realStore.storeMessageMetaData(context, messageId, messageMetaData); + doPostDelay("storeMessageMetaData"); + } + + public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException + { + doPreDelay("getMessageMetaData"); + MessageMetaData mmd = _realStore.getMessageMetaData(context, messageId); + doPostDelay("getMessageMetaData"); + return mmd; + } + + public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException + { + doPreDelay("getContentBodyChunk"); + ContentChunk c = _realStore.getContentBodyChunk(context, messageId, index); + doPostDelay("getContentBodyChunk"); + return c; + } + +} diff --git a/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java b/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java index 16dc81e82d..72c90b8023 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java +++ b/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java @@ -21,6 +21,8 @@ package org.apache.qpid.test; import junit.framework.TestCase; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession; @@ -46,6 +48,8 @@ import java.util.HashSet; import java.util.Hashtable; import java.util.Iterator; import java.util.Map; +import java.util.LinkedList; +import java.util.concurrent.ConcurrentHashMap; public class VMTestCase extends TestCase { @@ -61,6 +65,16 @@ public class VMTestCase extends TestCase protected final Map<String, String> _queues = new HashMap<String, String>(); protected final Map<String, String> _topics = new HashMap<String, String>(); + protected static final String ALL = "org.apache.qpid"; + protected static final String BROKER = "org.apache.qpid.server"; + protected static final String CLIENT = "org.apache.qpid.client"; + protected static final String COMMON = "org.apache.qpid.common"; + protected static final String FRAMING = "org.apache.qpid.framing"; + protected static final String TEST = "org.apache.qpid.test"; + + private LinkedList<LogState> _logStates = new LinkedList<LogState>(); + private Map<String, String> _setProperties = new HashMap<String, String>(); + protected void setUp() throws Exception { super.setUp(); @@ -116,6 +130,11 @@ public class VMTestCase extends TestCase // checkQueuesClean(); stopVMBroker(1); + + revertLogging(); + + revertSystemProperties(); + super.tearDown(); } @@ -159,7 +178,7 @@ public class VMTestCase extends TestCase public void startVMBroker(int vmID) throws Exception { - startVMBroker(vmID, null); + startVMBroker(vmID, (File) null); } /** FIXME: for now vmID must be unique client is responsible for this. */ @@ -204,10 +223,85 @@ public class VMTestCase extends TestCase } } + public void startVMBroker(int vmID, ConfigurationFileApplicationRegistry config) throws Exception + { + ApplicationRegistry.initialise(config, vmID); + startVMBroker(vmID); + } + public void stopVMBroker(int inVMid) { TransportConnection.killVMBroker(inVMid); ApplicationRegistry.remove(inVMid); } + protected void setLoggingLevel(String loggerName, Level level) + { + Logger logger = Logger.getLogger(loggerName); + + Level currentLevel = logger.getLevel(); + + _logStates.push(new LogState(logger, currentLevel)); + + logger.setLevel(level); + } + + protected void revertLogging() + { + for (LogState state : _logStates) + { + state.getLogger().setLevel(state.getLevel()); + } + + _logStates.clear(); + } + + protected class LogState + { + private Logger _logger; + private Level _level; + + public LogState(Logger logger, Level level) + { + _logger = logger; + _level = level; + } + + public Logger getLogger() + { + return _logger; + } + + public Level getLevel() + { + return _level; + } + } + + protected void setSystemProperty(String property, String value) + { + if (!_setProperties.containsKey(property)) + { + _setProperties.put(property, System.getProperty(property)); + } + + System.setProperty(property, value); + } + + protected void revertSystemProperties() + { + for (String key : _setProperties.keySet()) + { + String value = _setProperties.get(key); + if (value != null) + { + System.setProperty(key, value); + } + else + { + System.clearProperty(key); + } + } + } + } diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java new file mode 100644 index 0000000000..ad6e9fcf51 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java @@ -0,0 +1,135 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.client.timeouts; + +import org.apache.commons.configuration.Configuration; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; +import org.apache.qpid.test.VMTestCase; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import java.io.File; + +/** + * This tests that when the commit takes a long time(due to POST_COMMIT_DELAY) that the commit does not timeout + * This test must be run in conjunction with SyncWaiteTimeoutDelay or be run with POST_COMMIT_DELAY > 30s to ensure + * that the default value is being replaced. + */ +public class SyncWaitDelayTest extends VMTestCase +{ + protected static final Logger _logger = Logger.getLogger(SyncWaitDelayTest.class); + + final String QpidHome = System.getProperty("QPID_HOME"); + final File _configFile = new File(QpidHome, "etc/config.xml"); + + private String VIRTUALHOST = "test"; + protected long POST_COMMIT_DELAY = 1000L; + protected long SYNC_WRITE_TIMEOUT = POST_COMMIT_DELAY + 1000; + + protected Connection _connection; + protected Session _session; + protected Queue _queue; + protected MessageConsumer _consumer; + + public void setUp() throws Exception + { + if (!_configFile.exists()) + { + fail("Unable to test without config file:" + _configFile); + } + + ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(_configFile); + + //For now disable management on all configured inVM broker. + config.getConfiguration().setProperty("management.enabled", "false"); + + Configuration testVirtualhost = config.getConfiguration().subset("virtualhosts.virtualhost." + VIRTUALHOST); + testVirtualhost.setProperty("store.class", "org.apache.qpid.server.store.SlowMessageStore"); + testVirtualhost.setProperty("store.delays.commitTran.post", POST_COMMIT_DELAY); + + startVMBroker(2, config); + + //Set the syncWrite timeout to be just larger than the delay on the commitTran. + setSystemProperty("amqj.default_syncwrite_timeout", String.valueOf(SYNC_WRITE_TIMEOUT)); + + _brokerlist = "vm://:2"; + + super.setUp(); + + _connection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + //Create Queue + _queue = (Queue) _context.lookup("queue"); + + //Create Consumer + _session = _connection.createSession(true, Session.SESSION_TRANSACTED); + + //Ensure Queue exists + _session.createConsumer(_queue).close(); + } + + public void tearDown() throws Exception + { + //clean up + _connection.close(); + + stopVMBroker(2); + + super.tearDown(); + } + + public void test() throws JMSException + { + MessageProducer producer = _session.createProducer(_queue); + + Message message = _session.createTextMessage("Message"); + + producer.send(message); + + long start = System.nanoTime(); + + _logger.info("Calling Commit"); + + try + { + _session.commit(); + long end = System.nanoTime(); + long time = (end - start); + // As we are using Nano time ensure to multiply up the millis. + assertTrue("Commit was quickier than the delay:" + time, time > 1000000L * POST_COMMIT_DELAY); + assertFalse("Commit was to slower than the build in default", time > 1000000L * 1000 * 30); + } + catch (JMSException e) + { + fail(e.getMessage()); + } + + } + +} diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java new file mode 100644 index 0000000000..2b736ed392 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java @@ -0,0 +1,71 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.client.timeouts; + +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQTimeoutException; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageProducer; + +/** This tests that when the syncWrite timeout is set that it will timeout on that time rather than the default 30s. */ +public class SyncWaitTimeoutDelayTest extends SyncWaitDelayTest +{ + protected static final Logger _logger = Logger.getLogger(SyncWaitTimeoutDelayTest.class); + + public void setUp() throws Exception + { + POST_COMMIT_DELAY = 1000L; + + //Set the syncWrite timeout to be less than the COMMIT Delay so we can validate that it is being applied + SYNC_WRITE_TIMEOUT = 500L; + + super.setUp(); + } + + public void test() throws JMSException + { + MessageProducer producer = _session.createProducer(_queue); + + Message message = _session.createTextMessage("Message"); + + producer.send(message); + + _logger.info("Calling Commit"); + + long start = System.nanoTime(); + try + { + _session.commit(); + fail("Commit occured even though syncWait timeout is shorter than delay in commit"); + } + catch (JMSException e) + { + assertTrue("Wrong exception type received.", e.getLinkedException() instanceof AMQTimeoutException); + assertTrue("Wrong message received on exception.", e.getMessage().startsWith("Failed to commit")); + // As we are using Nano time ensure to multiply up the millis. + assertTrue("Timeout was more than 30s default", (System.nanoTime() - start) < (1000000L * 1000 * 30)); + } + + } +} |