diff options
author | Andrew Donald Kennedy <grkvlt@apache.org> | 2011-03-08 00:11:30 +0000 |
---|---|---|
committer | Andrew Donald Kennedy <grkvlt@apache.org> | 2011-03-08 00:11:30 +0000 |
commit | 49dc07c6d7ba26d6a250466e2679feb523e02dd0 (patch) | |
tree | ec1c15ab0b3b8d9733cf957cd825710928116062 /java/systests | |
parent | 449475309d147dcc64aba3fe31dc9432e45659f8 (diff) | |
download | qpid-python-49dc07c6d7ba26d6a250466e2679feb523e02dd0.tar.gz |
QPID-2985: Add producer configurable transaction timeouts
Port of QPID-2864 changes from 0.5.x-dev branch to trunk.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1079042 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/systests')
4 files changed, 742 insertions, 0 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutConfigurationTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutConfigurationTest.java new file mode 100644 index 0000000000..36bac3b715 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutConfigurationTest.java @@ -0,0 +1,82 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.transacted; + +/** + * This verifies that changing the {@code transactionTimeout} configuration will alter + * the behaviour of the transaction open and idle logging, and that when the connection + * will be closed. + */ +public class TransactionTimeoutConfigurationTest extends TransactionTimeoutTestCase +{ + @Override + protected void configure() throws Exception + { + // Setup housekeeping every second + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".housekeeping.expiredMessageCheckPeriod", "100"); + + // Set transaction timout properties. + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.openWarn", "200"); + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.openClose", "1000"); + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.idleWarn", "100"); + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.idleClose", "500"); + } + + public void testProducerIdleCommit() throws Exception + { + try + { + send(5, 0); + + sleep(2.0f); + + _psession.commit(); + fail("should fail"); + } + catch (Exception e) + { + _exception = e; + } + + monitor(5, 0); + + check(IDLE); + } + + public void testProducerOpenCommit() throws Exception + { + try + { + send(5, 0.3f); + + _psession.commit(); + fail("should fail"); + } + catch (Exception e) + { + _exception = e; + } + + monitor(6, 3); + + check(OPEN); + } +} diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java new file mode 100644 index 0000000000..71b89bf911 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java @@ -0,0 +1,72 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.transacted; + +/** + * This verifies that the default behaviour is not to time out transactions. + */ +public class TransactionTimeoutDisabledTest extends TransactionTimeoutTestCase +{ + @Override + protected void configure() throws Exception + { + // Setup housekeeping every second + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".housekeeping.expiredMessageCheckPeriod", "100"); + } + + public void testProducerIdleCommit() throws Exception + { + try + { + send(5, 0); + + sleep(2.0f); + + _psession.commit(); + } + catch (Exception e) + { + fail("Should have succeeded"); + } + + assertTrue("Listener should not have received exception", _caught.getCount() == 1); + + monitor(0, 0); + } + + public void testProducerOpenCommit() throws Exception + { + try + { + send(5, 0.3f); + + _psession.commit(); + } + catch (Exception e) + { + fail("Should have succeeded"); + } + + assertTrue("Listener should not have received exception", _caught.getCount() == 1); + + monitor(0, 0); + } +} diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java new file mode 100644 index 0000000000..c912d6a323 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java @@ -0,0 +1,335 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.transacted; + +/** + * This tests the behaviour of transactional sessions when the {@code transactionTimeout} configuration + * is set for a virtual host. + * + * A producer that is idle for too long or open for too long will have its connection closed and + * any further operations will fail with a 408 resource timeout exception. Consumers will not + * be affected by the transaction timeout configuration. + */ +public class TransactionTimeoutTest extends TransactionTimeoutTestCase +{ + public void testProducerIdle() throws Exception + { + try + { + sleep(2.0f); + + _psession.commit(); + } + catch (Exception e) + { + fail("Should have succeeded"); + } + + assertTrue("Listener should not have received exception", _caught.getCount() == 1); + + monitor(0, 0); + } + + public void testProducerIdleCommit() throws Exception + { + try + { + send(5, 0); + + sleep(2.0f); + + _psession.commit(); + fail("should fail"); + } + catch (Exception e) + { + _exception = e; + } + + monitor(5, 0); + + check(IDLE); + } + + public void testProducerOpenCommit() throws Exception + { + try + { + send(6, 0.5f); + + _psession.commit(); + fail("should fail"); + } + catch (Exception e) + { + _exception = e; + } + + monitor(0, 10); + + check(OPEN); + } + + public void testProducerIdleCommitTwice() throws Exception + { + try + { + send(5, 0); + + sleep(1.0f); + + _psession.commit(); + + send(5, 0); + + sleep(2.0f); + + _psession.commit(); + fail("should fail"); + } + catch (Exception e) + { + _exception = e; + } + + monitor(10, 0); + + check(IDLE); + } + + public void testProducerOpenCommitTwice() throws Exception + { + try + { + send(5, 0); + + sleep(1.0f); + + _psession.commit(); + + send(6, 0.5f); + + _psession.commit(); + fail("should fail"); + } + catch (Exception e) + { + _exception = e; + } + + // the presistent store generates more idle messages? + monitor(isBrokerStorePersistent() ? 10 : 5, 10); + + check(OPEN); + } + + public void testProducerIdleRollback() throws Exception + { + try + { + send(5, 0); + + sleep(2.0f); + + _psession.rollback(); + fail("should fail"); + } + catch (Exception e) + { + _exception = e; + } + + monitor(5, 0); + + check(IDLE); + } + + public void testProducerIdleRollbackTwice() throws Exception + { + try + { + send(5, 0); + + sleep(1.0f); + + _psession.rollback(); + + send(5, 0); + + sleep(2.0f); + + _psession.rollback(); + fail("should fail"); + } + catch (Exception e) + { + _exception = e; + } + + monitor(10, 0); + + check(IDLE); + } + + public void testConsumerCommitClose() throws Exception + { + try + { + send(1, 0); + + _psession.commit(); + + expect(1, 0); + + _csession.commit(); + + sleep(3.0f); + + _csession.close(); + } + catch (Exception e) + { + fail("should have succeeded: " + e.getMessage()); + } + + assertTrue("Listener should not have received exception", _caught.getCount() == 1); + + monitor(0, 0); + } + + public void testConsumerIdleReceiveCommit() throws Exception + { + try + { + send(1, 0); + + _psession.commit(); + + sleep(2.0f); + + expect(1, 0); + + sleep(2.0f); + + _csession.commit(); + } + catch (Exception e) + { + fail("Should have succeeded"); + } + + assertTrue("Listener should not have received exception", _caught.getCount() == 1); + + monitor(0, 0); + } + + public void testConsumerIdleCommit() throws Exception + { + try + { + send(1, 0); + + _psession.commit(); + + expect(1, 0); + + sleep(2.0f); + + _csession.commit(); + } + catch (Exception e) + { + fail("Should have succeeded"); + } + + assertTrue("Listener should not have received exception", _caught.getCount() == 1); + + monitor(0, 0); + } + + public void testConsumerIdleRollback() throws Exception + { + try + { + send(1, 0); + + _psession.commit(); + + expect(1, 0); + + sleep(2.0f); + + _csession.rollback(); + } + catch (Exception e) + { + fail("Should have succeeded"); + } + + assertTrue("Listener should not have received exception", _caught.getCount() == 1); + + monitor(0, 0); + } + + public void testConsumerOpenCommit() throws Exception + { + try + { + send(1, 0); + + _psession.commit(); + + sleep(3.0f); + + _csession.commit(); + } + catch (Exception e) + { + fail("Should have succeeded"); + } + + assertTrue("Listener should not have received exception", _caught.getCount() == 1); + + monitor(0, 0); + } + + public void testConsumerOpenRollback() throws Exception + { + try + { + send(1, 0); + + _psession.commit(); + + sleep(3.0f); + + _csession.rollback(); + } + catch (Exception e) + { + fail("Should have succeeded"); + } + + assertTrue("Listener should not have received exception", _caught.getCount() == 1); + + monitor(0, 0); + } +} diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java b/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java new file mode 100644 index 0000000000..637f43fb2c --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java @@ -0,0 +1,253 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.transacted; + +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.DeliveryMode; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.TextMessage; + +import junit.framework.TestCase; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQConnectionURL; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.jms.Session; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.util.LogMonitor; + +/** + * The {@link TestCase} for transaction timeout testing. + */ +public class TransactionTimeoutTestCase extends QpidBrokerTestCase implements ExceptionListener +{ + public static final String VIRTUALHOST = "test"; + public static final String TEXT = "0123456789abcdefghiforgettherest"; + public static final String CHN_OPEN_TXN = "CHN-1007"; + public static final String CHN_IDLE_TXN = "CHN-1008"; + public static final String IDLE = "Idle"; + public static final String OPEN = "Open"; + + protected LogMonitor _monitor; + protected AMQConnection _con; + protected Session _psession, _csession; + protected Queue _queue; + protected MessageConsumer _consumer; + protected MessageProducer _producer; + protected CountDownLatch _caught = new CountDownLatch(1); + protected String _message; + protected Exception _exception; + protected AMQConstant _code; + + protected void configure() throws Exception + { + // Setup housekeeping every second + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".housekeeping.expiredMessageCheckPeriod", "100"); + + /* + * Set transaction timout properties. The XML in the virtualhosts configuration is as follows: + * + * <transactionTimeout> + * <openWarn>1000</openWarn> + * <openClose>2000</openClose> + * <idleWarn>500</idleWarn> + * <idleClose>1500</idleClose> + * </transactionTimeout> + */ + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.openWarn", "1000"); + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.openClose", "2000"); + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.idleWarn", "500"); + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.idleClose", "1000"); + } + + protected void setUp() throws Exception + { + // Configure timeouts + configure(); + + // Monitor log file + _monitor = new LogMonitor(_outputFile); + + // Start broker + super.setUp(); + + // Connect to broker + String broker = _broker.equals(VM) ? ("vm://:" + DEFAULT_VM_PORT) : ("tcp://localhost:" + DEFAULT_PORT); + ConnectionURL url = new AMQConnectionURL("amqp://guest:guest@clientid/test?brokerlist='" + broker + "'&maxprefetch='1'"); + _con = (AMQConnection) getConnection(url); + _con.setExceptionListener(this); + _con.start(); + + // Create queue + Session qsession = _con.createSession(true, Session.SESSION_TRANSACTED); + AMQShortString queueName = new AMQShortString("test"); + _queue = new AMQQueue(qsession.getDefaultQueueExchangeName(), queueName, queueName, false, true); + qsession.close(); + + // Create producer and consumer + producer(); + consumer(); + } + + protected void tearDown() throws Exception + { + try + { + _con.close(); + } + finally + { + super.tearDown(); + } + } + + /** + * Create a transacted persistent message producer session. + */ + protected void producer() throws Exception + { + _psession = _con.createSession(true, Session.SESSION_TRANSACTED); + _producer = _psession.createProducer(_queue); + _producer.setDeliveryMode(DeliveryMode.PERSISTENT); + } + + /** + * Create a transacted message consumer session. + */ + protected void consumer() throws Exception + { + _csession = _con.createSession(true, Session.SESSION_TRANSACTED); + _consumer = _csession.createConsumer(_queue); + } + + /** + * Send a number of messages to the queue, optionally pausing after each. + */ + protected void send(int count, float delay) throws Exception + { + for (int i = 0; i < count; i++) + { + sleep(delay); + Message msg = _psession.createTextMessage(TEXT); + msg.setIntProperty("i", i); + _producer.send(msg); + } + } + + /** + * Sleep for a number of seconds. + */ + protected void sleep(float seconds) throws Exception + { + try + { + Thread.sleep((long) (seconds * 1000.0f)); + } + catch (InterruptedException ie) + { + throw new RuntimeException("Interrupted"); + } + } + + /** + * Check for idle and open messages. + * + * Either exactly zero messages, or +-2 error accepted around the specified number. + */ + protected void monitor(int idle, int open) throws Exception + { + List<String> idleMsgs = _monitor.findMatches(CHN_IDLE_TXN); + List<String> openMsgs = _monitor.findMatches(CHN_OPEN_TXN); + + String idleErr = "Expected " + idle + " but found " + idleMsgs.size() + " txn idle messages"; + String openErr = "Expected " + open + " but found " + openMsgs.size() + " txn open messages"; + + if (idle == 0) + { + assertTrue(idleErr, idleMsgs.isEmpty()); + } + else + { + assertTrue(idleErr, idleMsgs.size() >= idle - 2 && idleMsgs.size() <= idle + 2); + } + + if (open == 0) + { + assertTrue(openErr, openMsgs.isEmpty()); + } + else + { + assertTrue(openErr, openMsgs.size() >= open - 2 && openMsgs.size() <= open + 2); + } + } + + /** + * Receive a number of messages, optionally pausing after each. + */ + protected void expect(int count, float delay) throws Exception + { + for (int i = 0; i < count; i++) + { + sleep(delay); + Message msg = _consumer.receive(1000); + assertNotNull("Message should not be null", msg); + assertTrue("Message should be a text message", msg instanceof TextMessage); + assertEquals("Message content does not match expected", TEXT, ((TextMessage) msg).getText()); + assertEquals("Message order is incorrect", i, msg.getIntProperty("i")); + } + } + + /** + * Checks that the correct exception was thrown and was received + * by the listener with a 506 error code. + */ + protected void check(String reason)throws InterruptedException + { + assertTrue("Should have caught exception in listener", _caught.await(1, TimeUnit.SECONDS)); + assertNotNull("Should have thrown exception to client", _exception); + assertTrue("Exception message should contain '" + reason + "': " + _message, _message.contains(reason + " transaction timed out")); + assertNotNull("Exception should have an error code", _code); + assertEquals("Error code should be 506", AMQConstant.RESOURCE_ERROR, _code); + } + + /** @see javax.jms.ExceptionListener#onException(javax.jms.JMSException) */ + public void onException(JMSException jmse) + { + _caught.countDown(); + _message = jmse.getLinkedException().getMessage(); + if (jmse.getLinkedException() instanceof AMQException) + { + _code = ((AMQException) jmse.getLinkedException()).getErrorCode(); + } + } +} |