diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2012-03-01 10:04:31 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2012-03-01 10:04:31 +0000 |
commit | ed972141474d38a2c07818e0a455d897d3c8976e (patch) | |
tree | 1e29ab0560063c439ef251bf40e6aa94073f3fcc /qpid/java/systests | |
parent | f848a82aaf14b35495ba94359009aa3818a49067 (diff) | |
download | qpid-python-ed972141474d38a2c07818e0a455d897d3c8976e.tar.gz |
NO-JIRA: [AMQP 1-0 Sandbox] merging from trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rg-amqp-1-0-sandbox@1295492 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/systests')
55 files changed, 2017 insertions, 1278 deletions
diff --git a/qpid/java/systests/build.xml b/qpid/java/systests/build.xml index 33ad2227bb..fb2bae1d47 100644 --- a/qpid/java/systests/build.xml +++ b/qpid/java/systests/build.xml @@ -19,7 +19,15 @@ nn - or more contributor license agreements. See the NOTICE file - --> <project name="System Tests" default="build"> - <property name="module.depends" value="client management/tools/qpid-cli management/common broker broker/test common common/test junit-toolkit"/> + + <condition property="systests.optional.depends" value="bdbstore" else=""> + <and> + <contains string="${modules.opt}" substring="bdbstore"/> + <contains string="${profile}" substring="bdb"/> + </and> + </condition> + + <property name="module.depends" value="client management/common broker broker/test common common/test junit-toolkit ${systests.optional.depends}"/> <property name="module.test.src" location="src/main/java"/> <property name="module.test.excludes" value="**/DropInTest.java,**/TestClientControlledTest.java"/> diff --git a/qpid/java/systests/etc/config-systests-aclv2.xml b/qpid/java/systests/etc/config-systests-aclv2.xml index 33563e7891..e8b971a2a0 100644 --- a/qpid/java/systests/etc/config-systests-aclv2.xml +++ b/qpid/java/systests/etc/config-systests-aclv2.xml @@ -22,7 +22,7 @@ <configuration> <system/> <override> - <xml fileName="${test.config}" optional="true"/> + <xml fileName="${QPID_HOME}/${test.config}" optional="true"/> <xml fileName="${QPID_HOME}/etc/config-systests-aclv2-settings.xml"/> <xml fileName="${QPID_HOME}/etc/config-systests-settings.xml"/> <xml fileName="${QPID_HOME}/etc/config.xml"/> diff --git a/qpid/java/systests/etc/config-systests-bdb-settings.xml b/qpid/java/systests/etc/config-systests-bdb-settings.xml new file mode 100644 index 0000000000..4fa69d0abc --- /dev/null +++ b/qpid/java/systests/etc/config-systests-bdb-settings.xml @@ -0,0 +1,26 @@ +<?xml version="1.0" encoding="ISO-8859-1"?> +<!-- + - + - Licensed to the Apache Software Foundation (ASF) under one + - or more contributor license agreements. See the NOTICE file + - distributed with this work for additional information + - regarding copyright ownership. The ASF licenses this file + - to you under the Apache License, Version 2.0 (the + - "License"); you may not use this file except in compliance + - with the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, + - software distributed under the License is distributed on an + - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + - KIND, either express or implied. See the License for the + - specific language governing permissions and limitations + - under the License. + - + --> +<broker> + <virtualhosts>${QPID_HOME}/etc/virtualhosts-systests-bdb.xml</virtualhosts> +</broker> + + diff --git a/qpid/java/systests/etc/config-systests-bdb.xml b/qpid/java/systests/etc/config-systests-bdb.xml new file mode 100644 index 0000000000..9364006fcc --- /dev/null +++ b/qpid/java/systests/etc/config-systests-bdb.xml @@ -0,0 +1,30 @@ +<?xml version="1.0" encoding="ISO-8859-1"?> +<!-- + - + - Licensed to the Apache Software Foundation (ASF) under one + - or more contributor license agreements. See the NOTICE file + - distributed with this work for additional information + - regarding copyright ownership. The ASF licenses this file + - to you under the Apache License, Version 2.0 (the + - "License"); you may not use this file except in compliance + - with the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, + - software distributed under the License is distributed on an + - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + - KIND, either express or implied. See the License for the + - specific language governing permissions and limitations + - under the License. + - + --> +<configuration> + <system/> + <override> + <xml fileName="${QPID_HOME}/${test.config}" optional="true"/> + <xml fileName="${QPID_HOME}/etc/config-systests-bdb-settings.xml"/> + <xml fileName="${QPID_HOME}/etc/config-systests-settings.xml"/> + <xml fileName="${QPID_HOME}/etc/config.xml"/> + </override> +</configuration> diff --git a/qpid/java/systests/etc/config-systests-derby.xml b/qpid/java/systests/etc/config-systests-derby.xml index ba27a0c020..303154d8f0 100644 --- a/qpid/java/systests/etc/config-systests-derby.xml +++ b/qpid/java/systests/etc/config-systests-derby.xml @@ -22,7 +22,7 @@ <configuration> <system/> <override> - <xml fileName="${test.config}" optional="true"/> + <xml fileName="${QPID_HOME}/${test.config}" optional="true"/> <xml fileName="${QPID_HOME}/etc/config-systests-derby-settings.xml"/> <xml fileName="${QPID_HOME}/etc/config-systests-settings.xml"/> <xml fileName="${QPID_HOME}/etc/config.xml"/> diff --git a/qpid/java/systests/etc/config-systests-firewall-2.xml b/qpid/java/systests/etc/config-systests-firewall-2.xml index f16cce6b85..2549a7e6c4 100644 --- a/qpid/java/systests/etc/config-systests-firewall-2.xml +++ b/qpid/java/systests/etc/config-systests-firewall-2.xml @@ -42,13 +42,6 @@ </connector> <management> <enabled>false</enabled> - <jmxport>8999</jmxport> - <ssl> - <enabled>false</enabled> - <!-- Update below path to your keystore location, eg ${conf}/qpid.keystore --> - <keyStorePath>${prefix}/../test-profiles/test_resources/ssl/keystore.jks</keyStorePath> - <keyStorePassword>password</keyStorePassword> - </ssl> </management> <advanced> <filterchain enableExecutorPool="true"/> diff --git a/qpid/java/systests/etc/config-systests-firewall-3.xml b/qpid/java/systests/etc/config-systests-firewall-3.xml index 71644e4185..0cafb6d70a 100644 --- a/qpid/java/systests/etc/config-systests-firewall-3.xml +++ b/qpid/java/systests/etc/config-systests-firewall-3.xml @@ -42,13 +42,6 @@ </connector> <management> <enabled>false</enabled> - <jmxport>8999</jmxport> - <ssl> - <enabled>false</enabled> - <!-- Update below path to your keystore location, eg ${conf}/qpid.keystore --> - <keyStorePath>${prefix}/../test-profiles/test_resources/ssl/keystore.jks</keyStorePath> - <keyStorePassword>password</keyStorePassword> - </ssl> </management> <advanced> <filterchain enableExecutorPool="true"/> diff --git a/qpid/java/systests/etc/config-systests-firewall.xml b/qpid/java/systests/etc/config-systests-firewall.xml index c0ce71210f..c73ac6a687 100644 --- a/qpid/java/systests/etc/config-systests-firewall.xml +++ b/qpid/java/systests/etc/config-systests-firewall.xml @@ -22,7 +22,7 @@ <configuration> <system/> <override> - <xml fileName="${test.config}" optional="true"/> + <xml fileName="${QPID_HOME}/${test.config}" optional="true"/> <xml fileName="${QPID_FIREWALL_CONFIG_SETTINGS}" optional="true"/> <xml fileName="${QPID_HOME}/etc/config-systests-firewall-settings.xml"/> <xml fileName="${QPID_HOME}/etc/config-systests-settings.xml"/> diff --git a/qpid/java/systests/etc/config-systests.xml b/qpid/java/systests/etc/config-systests.xml index 5d7d878e76..0e8f2803e3 100644 --- a/qpid/java/systests/etc/config-systests.xml +++ b/qpid/java/systests/etc/config-systests.xml @@ -22,7 +22,7 @@ <configuration> <system/> <override> - <xml fileName="${test.config}" optional="true"/> + <xml fileName="${QPID_HOME}/${test.config}" optional="true"/> <xml fileName="${QPID_HOME}/etc/config-systests-settings.xml"/> <xml fileName="${QPID_HOME}/etc/config.xml"/> </override> diff --git a/qpid/java/systests/etc/virtualhosts-systests-aclv2.xml b/qpid/java/systests/etc/virtualhosts-systests-aclv2.xml index eb96577487..db396d7ab1 100644 --- a/qpid/java/systests/etc/virtualhosts-systests-aclv2.xml +++ b/qpid/java/systests/etc/virtualhosts-systests-aclv2.xml @@ -22,7 +22,7 @@ <configuration> <system/> <override> - <xml fileName="${test.virtualhosts}" optional="true"/> + <xml fileName="${QPID_HOME}/${test.virtualhosts}" optional="true"/> <xml fileName="${QPID_HOME}/etc/virtualhosts-systests-aclv2-settings.xml"/> <xml fileName="${QPID_HOME}/etc/virtualhosts.xml"/> </override> diff --git a/qpid/java/systests/etc/virtualhosts-systests-bdb-settings.xml b/qpid/java/systests/etc/virtualhosts-systests-bdb-settings.xml new file mode 100644 index 0000000000..ce16523f13 --- /dev/null +++ b/qpid/java/systests/etc/virtualhosts-systests-bdb-settings.xml @@ -0,0 +1,56 @@ +<?xml version="1.0" encoding="ISO-8859-1"?> +<!-- + - + - Licensed to the Apache Software Foundation (ASF) under one + - or more contributor license agreements. See the NOTICE file + - distributed with this work for additional information + - regarding copyright ownership. The ASF licenses this file + - to you under the Apache License, Version 2.0 (the + - "License"); you may not use this file except in compliance + - with the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, + - software distributed under the License is distributed on an + - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + - KIND, either express or implied. See the License for the + - specific language governing permissions and limitations + - under the License. + - + --> +<virtualhosts> + <work>${QPID_WORK}</work> + + <virtualhost> + <name>localhost</name> + <localhost> + <store> + <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class> + <environment-path>${work}/bdbstore/localhost-store</environment-path> + </store> + </localhost> + </virtualhost> + + <virtualhost> + <name>development</name> + <development> + <store> + <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class> + <environment-path>${work}/bdbstore/development-store</environment-path> + </store> + </development> + </virtualhost> + + <virtualhost> + <name>test</name> + <test> + <store> + <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class> + <environment-path>${work}/bdbstore/test-store</environment-path> + </store> + </test> + </virtualhost> +</virtualhosts> + + diff --git a/qpid/java/systests/etc/virtualhosts-systests-bdb.xml b/qpid/java/systests/etc/virtualhosts-systests-bdb.xml new file mode 100644 index 0000000000..367fee65ac --- /dev/null +++ b/qpid/java/systests/etc/virtualhosts-systests-bdb.xml @@ -0,0 +1,29 @@ +<?xml version="1.0" encoding="ISO-8859-1"?> +<!-- + - + - Licensed to the Apache Software Foundation (ASF) under one + - or more contributor license agreements. See the NOTICE file + - distributed with this work for additional information + - regarding copyright ownership. The ASF licenses this file + - to you under the Apache License, Version 2.0 (the + - "License"); you may not use this file except in compliance + - with the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, + - software distributed under the License is distributed on an + - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + - KIND, either express or implied. See the License for the + - specific language governing permissions and limitations + - under the License. + - + --> +<configuration> + <system/> + <override> + <xml fileName="${QPID_HOME}/${test.virtualhosts}" optional="true"/> + <xml fileName="${QPID_HOME}/etc/virtualhosts-systests-bdb-settings.xml"/> + <xml fileName="${QPID_HOME}/etc/virtualhosts.xml"/> + </override> +</configuration> diff --git a/qpid/java/systests/etc/virtualhosts-systests-derby.xml b/qpid/java/systests/etc/virtualhosts-systests-derby.xml index 171be37416..3745100e1f 100644 --- a/qpid/java/systests/etc/virtualhosts-systests-derby.xml +++ b/qpid/java/systests/etc/virtualhosts-systests-derby.xml @@ -22,7 +22,7 @@ <configuration> <system/> <override> - <xml fileName="${test.virtualhosts}" optional="true"/> + <xml fileName="${QPID_HOME}/${test.virtualhosts}" optional="true"/> <xml fileName="${QPID_HOME}/etc/virtualhosts-systests-derby-settings.xml"/> <xml fileName="${QPID_HOME}/etc/virtualhosts.xml"/> </override> diff --git a/qpid/java/systests/etc/virtualhosts-systests-firewall.xml b/qpid/java/systests/etc/virtualhosts-systests-firewall.xml index 51ab6739b3..c5c6a86d7c 100644 --- a/qpid/java/systests/etc/virtualhosts-systests-firewall.xml +++ b/qpid/java/systests/etc/virtualhosts-systests-firewall.xml @@ -22,7 +22,7 @@ <configuration> <system/> <override> - <xml fileName="${test.virtualhosts}" optional="true"/> + <xml fileName="${QPID_HOME}/${test.virtualhosts}" optional="true"/> <xml fileName="${QPID_FIREWALL_VIRTUALHOSTS_SETTINGS}" optional="true"/> <xml fileName="${QPID_HOME}/etc/virtualhosts.xml"/> </override> diff --git a/qpid/java/systests/etc/virtualhosts-systests.xml b/qpid/java/systests/etc/virtualhosts-systests.xml index 71f1cc9889..d6aeefac72 100644 --- a/qpid/java/systests/etc/virtualhosts-systests.xml +++ b/qpid/java/systests/etc/virtualhosts-systests.xml @@ -22,7 +22,7 @@ <configuration> <system/> <override> - <xml fileName="${test.virtualhosts}" optional="true"/> + <xml fileName="${QPID_HOME}/${test.virtualhosts}" optional="true"/> <xml fileName="${QPID_HOME}/etc/virtualhosts.xml"/> </override> </configuration> diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java new file mode 100644 index 0000000000..66f8fe0546 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java @@ -0,0 +1,949 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.client.failover; + +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.TransactionRolledBackException; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.test.utils.FailoverBaseCase; + +/** + * Test suite to test all possible failover corner cases + */ +public class FailoverBehaviourTest extends FailoverBaseCase implements ConnectionListener, ExceptionListener +{ + private static final String TEST_MESSAGE_FORMAT = "test message {0}"; + + /** Indicates whether tests are run against clustered broker */ + private static boolean CLUSTERED = Boolean.getBoolean("profile.clustered"); + + /** Default number of messages to send before failover */ + private static final int DEFAULT_NUMBER_OF_MESSAGES = 10; + + /** Actual number of messages to send before failover */ + protected int _messageNumber = Integer.getInteger("profile.failoverMsgCount", DEFAULT_NUMBER_OF_MESSAGES); + + /** Test connection */ + protected Connection _connection; + + /** + * Failover completion latch is used to wait till connectivity to broker is + * restored + */ + private CountDownLatch _failoverComplete; + + /** + * Consumer session + */ + private Session _consumerSession; + + /** + * Test destination + */ + private Destination _destination; + + /** + * Consumer + */ + private MessageConsumer _consumer; + + /** + * Producer session + */ + private Session _producerSession; + + /** + * Producer + */ + private MessageProducer _producer; + + /** + * Holds exception sent into {@link ExceptionListener} on failover + */ + private JMSException _exceptionListenerException; + + @Override + protected void setUp() throws Exception + { + super.setUp(); + + _connection = getConnection(); + _connection.setExceptionListener(this); + ((AMQConnection) _connection).setConnectionListener(this); + _failoverComplete = new CountDownLatch(1); + } + + /** + * Test whether MessageProducer can successfully publish messages after + * failover and rollback transaction + */ + public void testMessageProducingAndRollbackAfterFailover() throws Exception + { + init(Session.SESSION_TRANSACTED, true); + produceMessages(); + causeFailure(); + + assertFailoverException(); + // producer should be able to send messages after failover + _producer.send(_producerSession.createTextMessage("test message " + _messageNumber)); + + // rollback after failover + _producerSession.rollback(); + + // tests whether sending and committing is working after failover + produceMessages(); + _producerSession.commit(); + + // tests whether receiving and committing is working after failover + consumeMessages(); + _consumerSession.commit(); + } + + /** + * Test whether {@link TransactionRolledBackException} is thrown on commit + * of dirty transacted session after failover. + * <p> + * Verifies whether second after failover commit is successful. + */ + public void testTransactionRolledBackExceptionThrownOnCommitAfterFailoverOnProducingMessages() throws Exception + { + init(Session.SESSION_TRANSACTED, true); + produceMessages(); + causeFailure(); + + assertFailoverException(); + + // producer should be able to send messages after failover + _producer.send(_producerSession.createTextMessage("test message " + _messageNumber)); + + try + { + _producerSession.commit(); + fail("TransactionRolledBackException is expected on commit after failover with dirty session!"); + } + catch (JMSException t) + { + assertTrue("Expected TransactionRolledBackException but thrown " + t, + t instanceof TransactionRolledBackException); + } + + // simulate process of user replaying the transaction + produceMessages("replayed test message {0}", _messageNumber, false); + + // no exception should be thrown + _producerSession.commit(); + + // only messages sent after rollback should be received + consumeMessages("replayed test message {0}", _messageNumber); + + // no exception should be thrown + _consumerSession.commit(); + } + + /** + * Tests JMSException is not thrown on commit with a clean session after + * failover + */ + public void testNoJMSExceptionThrownOnCommitAfterFailoverWithCleanProducerSession() throws Exception + { + init(Session.SESSION_TRANSACTED, true); + + causeFailure(); + + assertFailoverException(); + + // should not throw an exception for a clean session + _producerSession.commit(); + + // tests whether sending and committing is working after failover + produceMessages(); + _producerSession.commit(); + + // tests whether receiving and committing is working after failover + consumeMessages(); + _consumerSession.commit(); + } + + /** + * Tests {@link TransactionRolledBackException} is thrown on commit of dirty + * transacted session after failover. + * <p> + * Verifies whether second after failover commit is successful. + */ + public void testTransactionRolledBackExceptionThrownOnCommitAfterFailoverOnMessageReceiving() throws Exception + { + init(Session.SESSION_TRANSACTED, true); + produceMessages(); + _producerSession.commit(); + + // receive messages but do not commit + consumeMessages(); + + causeFailure(); + + assertFailoverException(); + + try + { + // should throw TransactionRolledBackException + _consumerSession.commit(); + fail("TransactionRolledBackException is expected on commit after failover"); + } + catch (Exception t) + { + assertTrue("Expected TransactionRolledBackException but thrown " + t, + t instanceof TransactionRolledBackException); + } + + resendMessagesIfNecessary(); + + // consume messages successfully + consumeMessages(); + _consumerSession.commit(); + } + + /** + * Tests JMSException is not thrown on commit with a clean session after failover + */ + public void testNoJMSExceptionThrownOnCommitAfterFailoverWithCleanConsumerSession() throws Exception + { + init(Session.SESSION_TRANSACTED, true); + produceMessages(); + _producerSession.commit(); + + consumeMessages(); + _consumerSession.commit(); + + causeFailure(); + + assertFailoverException(); + + // should not throw an exception with a clean consumer session + _consumerSession.commit(); + } + + /** + * Test that TransactionRolledBackException is thrown on commit of + * dirty session in asynchronous consumer after failover. + */ + public void testTransactionRolledBackExceptionThrownOnCommitAfterFailoverOnReceivingMessagesAsynchronously() + throws Exception + { + init(Session.SESSION_TRANSACTED, false); + FailoverTestMessageListener ml = new FailoverTestMessageListener(); + _consumer.setMessageListener(ml); + + _connection.start(); + + produceMessages(); + _producerSession.commit(); + + // wait for message receiving + ml.awaitForEnd(); + + assertEquals("Received unexpected number of messages!", _messageNumber, ml.getMessageCounter()); + + // assert messages + int counter = 0; + for (Message message : ml.getReceivedMessages()) + { + assertReceivedMessage(message, TEST_MESSAGE_FORMAT, counter++); + } + ml.reset(); + + causeFailure(); + assertFailoverException(); + + + try + { + _consumerSession.commit(); + fail("TransactionRolledBackException should be thrown!"); + } + catch (TransactionRolledBackException e) + { + // that is what is expected + } + + resendMessagesIfNecessary(); + + // wait for message receiving + ml.awaitForEnd(); + + assertEquals("Received unexpected number of messages!", _messageNumber, ml.getMessageCounter()); + + // assert messages + counter = 0; + for (Message message : ml.getReceivedMessages()) + { + assertReceivedMessage(message, TEST_MESSAGE_FORMAT, counter++); + } + + // commit again. It should be successful + _consumerSession.commit(); + } + + /** + * Test that {@link Session#rollback()} does not throw exception after failover + * and that we are able to consume messages. + */ + public void testRollbackAfterFailover() throws Exception + { + init(Session.SESSION_TRANSACTED, true); + + produceMessages(); + _producerSession.commit(); + + consumeMessages(); + + causeFailure(); + + assertFailoverException(); + + _consumerSession.rollback(); + + resendMessagesIfNecessary(); + + // tests whether receiving and committing is working after failover + consumeMessages(); + _consumerSession.commit(); + } + + /** + * Test that {@link Session#rollback()} does not throw exception after receiving further messages + * after failover, and we can receive published messages after rollback. + */ + public void testRollbackAfterReceivingAfterFailover() throws Exception + { + init(Session.SESSION_TRANSACTED, true); + + produceMessages(); + _producerSession.commit(); + + consumeMessages(); + causeFailure(); + + assertFailoverException(); + + resendMessagesIfNecessary(); + + consumeMessages(); + + _consumerSession.rollback(); + + // tests whether receiving and committing is working after failover + consumeMessages(); + _consumerSession.commit(); + } + + /** + * Test that {@link Session#recover()} does not throw an exception after failover + * and that we can consume messages after recover. + */ + public void testRecoverAfterFailover() throws Exception + { + init(Session.CLIENT_ACKNOWLEDGE, true); + + produceMessages(); + + // consume messages but do not acknowledge them + consumeMessages(); + + causeFailure(); + + assertFailoverException(); + + _consumerSession.recover(); + + resendMessagesIfNecessary(); + + // tests whether receiving and acknowledgment is working after recover + Message lastMessage = consumeMessages(); + lastMessage.acknowledge(); + } + + /** + * Test that receiving more messages after failover and then calling + * {@link Session#recover()} does not throw an exception + * and that we can consume messages after recover. + */ + public void testRecoverWithConsumedMessagesAfterFailover() throws Exception + { + init(Session.CLIENT_ACKNOWLEDGE, true); + + produceMessages(); + + // consume messages but do not acknowledge them + consumeMessages(); + + causeFailure(); + + assertFailoverException(); + + // publishing should work after failover + resendMessagesIfNecessary(); + + // consume messages again on a dirty session + consumeMessages(); + + // recover should successfully restore session + _consumerSession.recover(); + + // tests whether receiving and acknowledgment is working after recover + Message lastMessage = consumeMessages(); + lastMessage.acknowledge(); + } + + /** + * Test that first call to {@link Message#acknowledge()} after failover + * throws a JMSEXception if session is dirty. + */ + public void testAcknowledgeAfterFailover() throws Exception + { + init(Session.CLIENT_ACKNOWLEDGE, true); + + produceMessages(); + + // consume messages but do not acknowledge them + Message lastMessage = consumeMessages(); + causeFailure(); + + assertFailoverException(); + + try + { + // an implicit recover performed when acknowledge throws an exception due to failover + lastMessage.acknowledge(); + fail("JMSException should be thrown"); + } + catch (JMSException t) + { + // TODO: assert error code and/or expected exception type + } + + resendMessagesIfNecessary(); + + // tests whether receiving and acknowledgment is working after recover + lastMessage = consumeMessages(); + lastMessage.acknowledge(); + } + + /** + * Test that calling acknowledge before failover leaves the session + * clean for use after failover. + */ + public void testAcknowledgeBeforeFailover() throws Exception + { + init(Session.CLIENT_ACKNOWLEDGE, true); + + produceMessages(); + + // consume messages and acknowledge them + Message lastMessage = consumeMessages(); + lastMessage.acknowledge(); + + causeFailure(); + + assertFailoverException(); + + produceMessages(); + + // tests whether receiving and acknowledgment is working after recover + lastMessage = consumeMessages(); + lastMessage.acknowledge(); + } + + /** + * Test that receiving of messages after failover prior to calling + * {@link Message#acknowledge()} still results in acknowledge throwing an exception. + */ + public void testAcknowledgeAfterMessageReceivingAfterFailover() throws Exception + { + init(Session.CLIENT_ACKNOWLEDGE, true); + + produceMessages(); + + // consume messages but do not acknowledge them + consumeMessages(); + causeFailure(); + + assertFailoverException(); + + resendMessagesIfNecessary(); + + // consume again on dirty session + Message lastMessage = consumeMessages(); + try + { + // an implicit recover performed when acknowledge throws an exception due to failover + lastMessage.acknowledge(); + fail("JMSException should be thrown"); + } + catch (JMSException t) + { + // TODO: assert error code and/or expected exception type + } + + // tests whether receiving and acknowledgment is working on a clean session + lastMessage = consumeMessages(); + lastMessage.acknowledge(); + } + + /** + * Tests that call to {@link Message#acknowledge()} after failover throws an exception in asynchronous consumer + * and we can consume messages after acknowledge. + */ + public void testAcknowledgeAfterFailoverForAsynchronousConsumer() throws Exception + { + init(Session.CLIENT_ACKNOWLEDGE, false); + FailoverTestMessageListener ml = new FailoverTestMessageListener(); + _consumer.setMessageListener(ml); + _connection.start(); + + produceMessages(); + + // wait for message receiving + ml.awaitForEnd(); + + assertEquals("Received unexpected number of messages!", _messageNumber, ml.getMessageCounter()); + + // assert messages + int counter = 0; + Message currentMessage = null; + for (Message message : ml.getReceivedMessages()) + { + assertReceivedMessage(message, TEST_MESSAGE_FORMAT, counter++); + currentMessage = message; + } + ml.reset(); + + causeFailure(); + assertFailoverException(); + + + try + { + currentMessage.acknowledge(); + fail("JMSException should be thrown!"); + } + catch (JMSException e) + { + // TODO: assert error code and/or expected exception type + } + + resendMessagesIfNecessary(); + + // wait for message receiving + ml.awaitForEnd(); + + assertEquals("Received unexpected number of messages!", _messageNumber, ml.getMessageCounter()); + + // assert messages + counter = 0; + for (Message message : ml.getReceivedMessages()) + { + assertReceivedMessage(message, TEST_MESSAGE_FORMAT, counter++); + currentMessage = message; + } + + // acknowledge again. It should be successful + currentMessage.acknowledge(); + } + + /** + * Test whether {@link Session#recover()} works as expected after failover + * in AA mode. + */ + public void testRecoverAfterFailoverInAutoAcknowledgeMode() throws Exception + { + init(Session.AUTO_ACKNOWLEDGE, true); + + produceMessages(); + + // receive first message in order to start a dispatcher thread + Message receivedMessage = _consumer.receive(1000l); + assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0); + + causeFailure(); + + assertFailoverException(); + + _consumerSession.recover(); + + resendMessagesIfNecessary(); + + // tests whether receiving is working after recover + consumeMessages(); + } + + public void testClientAcknowledgedSessionCloseAfterFailover() throws Exception + { + sessionCloseAfterFailoverImpl(Session.CLIENT_ACKNOWLEDGE); + } + + public void testTransactedSessionCloseAfterFailover() throws Exception + { + sessionCloseAfterFailoverImpl(Session.SESSION_TRANSACTED); + } + + public void testAutoAcknowledgedSessionCloseAfterFailover() throws Exception + { + sessionCloseAfterFailoverImpl(Session.AUTO_ACKNOWLEDGE); + } + + /** + * Tests {@link Session#close()} for session with given acknowledge mode + * to ensure that close works after failover. + * + * @param acknowledgeMode session acknowledge mode + * @throws JMSException + */ + private void sessionCloseAfterFailoverImpl(int acknowledgeMode) throws JMSException + { + init(acknowledgeMode, true); + produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, false); + if (acknowledgeMode == Session.SESSION_TRANSACTED) + { + _producerSession.commit(); + } + + // intentionally receive message but do not commit or acknowledge it in + // case of transacted or CLIENT_ACK session + Message receivedMessage = _consumer.receive(1000l); + assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0); + + causeFailure(); + + assertFailoverException(); + + // for transacted/client_ack session + // no exception should be thrown but transaction should be automatically + // rolled back + _consumerSession.close(); + } + + /** + * A helper method to instantiate produce and consumer sessions, producer + * and consumer. + * + * @param acknowledgeMode + * acknowledge mode + * @param startConnection + * indicates whether connection should be started + * @throws JMSException + */ + private void init(int acknowledgeMode, boolean startConnection) throws JMSException + { + boolean isTransacted = acknowledgeMode == Session.SESSION_TRANSACTED ? true : false; + + _consumerSession = _connection.createSession(isTransacted, acknowledgeMode); + _destination = _consumerSession.createQueue(getTestQueueName() + "_" + System.currentTimeMillis()); + _consumer = _consumerSession.createConsumer(_destination); + + if (startConnection) + { + _connection.start(); + } + + _producerSession = _connection.createSession(isTransacted, acknowledgeMode); + _producer = _producerSession.createProducer(_destination); + + } + + /** + * Resends messages if reconnected to a non-clustered broker + * + * @throws JMSException + */ + private void resendMessagesIfNecessary() throws JMSException + { + if (!CLUSTERED) + { + // assert that a new broker does not have messages on a queue + if (_consumer.getMessageListener() == null) + { + Message message = _consumer.receive(100l); + assertNull("Received a message after failover with non-clustered broker!", message); + } + // re-sending messages if reconnected to a non-clustered broker + produceMessages(true); + } + } + + /** + * Produces a default number of messages with default text content into test + * queue + * + * @throws JMSException + */ + private void produceMessages() throws JMSException + { + produceMessages(false); + } + + private void produceMessages(boolean seperateProducer) throws JMSException + { + produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, seperateProducer); + } + + /** + * Consumes a default number of messages and asserts their content. + * + * @return last consumed message + * @throws JMSException + */ + private Message consumeMessages() throws JMSException + { + return consumeMessages(TEST_MESSAGE_FORMAT, _messageNumber); + } + + /** + * Produces given number of text messages with content matching given + * content pattern + * + * @param messagePattern message content pattern + * @param messageNumber number of messages to send + * @param standaloneProducer whether to use the existing producer or a new one. + * @throws JMSException + */ + private void produceMessages(String messagePattern, int messageNumber, boolean standaloneProducer) throws JMSException + { + Session producerSession; + MessageProducer producer; + + if(standaloneProducer) + { + producerSession = _connection.createSession(true, Session.SESSION_TRANSACTED); + producer = producerSession.createProducer(_destination); + } + else + { + producerSession = _producerSession; + producer = _producer; + } + + for (int i = 0; i < messageNumber; i++) + { + String text = MessageFormat.format(messagePattern, i); + Message message = producerSession.createTextMessage(text); + producer.send(message); + } + + if(standaloneProducer) + { + producerSession.commit(); + } + } + + /** + * Consumes given number of text messages and asserts that their content + * matches given pattern + * + * @param messagePattern + * messages content pattern + * @param messageNumber + * message number to received + * @return last consumed message + * @throws JMSException + */ + private Message consumeMessages(String messagePattern, int messageNumber) throws JMSException + { + Message receivedMesssage = null; + for (int i = 0; i < messageNumber; i++) + { + receivedMesssage = _consumer.receive(1000l); + assertReceivedMessage(receivedMesssage, messagePattern, i); + } + return receivedMesssage; + } + + /** + * Asserts received message + * + * @param receivedMessage + * received message + * @param messagePattern + * messages content pattern + * @param messageIndex + * message index + */ + private void assertReceivedMessage(Message receivedMessage, String messagePattern, int messageIndex) + { + assertNotNull("Expected message [" + messageIndex + "] is not received!", receivedMessage); + assertTrue("Failure to receive message [" + messageIndex + "], expected TextMessage but received " + + receivedMessage, receivedMessage instanceof TextMessage); + String expectedText = MessageFormat.format(messagePattern, messageIndex); + String receivedText = null; + try + { + receivedText = ((TextMessage) receivedMessage).getText(); + } + catch (JMSException e) + { + fail("JMSException occured while getting message text:" + e.getMessage()); + } + assertEquals("Failover is broken! Expected [" + expectedText + "] but got [" + receivedText + "]", + expectedText, receivedText); + } + + /** + * Causes failover and waits till connection is re-established. + */ + private void causeFailure() + { + causeFailure(getFailingPort(), DEFAULT_FAILOVER_TIME * 2); + } + + /** + * Causes failover by stopping broker on given port and waits till + * connection is re-established during given time interval. + * + * @param port + * broker port + * @param delay + * time interval to wait for connection re-establishement + */ + private void causeFailure(int port, long delay) + { + failBroker(port); + + awaitForFailoverCompletion(delay); + } + + private void awaitForFailoverCompletion(long delay) + { + _logger.info("Awaiting Failover completion.."); + try + { + if (!_failoverComplete.await(delay, TimeUnit.MILLISECONDS)) + { + fail("Failover did not complete"); + } + } + catch (InterruptedException e) + { + fail("Test was interrupted:" + e.getMessage()); + } + } + + private void assertFailoverException() + { + // TODO: assert exception is received (once implemented) + // along with error code and/or expected exception type + } + + @Override + public void bytesSent(long count) + { + } + + @Override + public void bytesReceived(long count) + { + } + + @Override + public boolean preFailover(boolean redirect) + { + return true; + } + + @Override + public boolean preResubscribe() + { + return true; + } + + @Override + public void failoverComplete() + { + _failoverComplete.countDown(); + } + + @Override + public void onException(JMSException e) + { + _exceptionListenerException = e; + } + + private class FailoverTestMessageListener implements MessageListener + { + // message counter + private AtomicInteger _counter = new AtomicInteger(); + + private List<Message> _receivedMessage = new ArrayList<Message>(); + + private volatile CountDownLatch _endLatch; + + public FailoverTestMessageListener() throws JMSException + { + _endLatch = new CountDownLatch(1); + } + + @Override + public void onMessage(Message message) + { + _receivedMessage.add(message); + if (_counter.incrementAndGet() % _messageNumber == 0) + { + _endLatch.countDown(); + } + } + + public void reset() + { + _receivedMessage.clear(); + _endLatch = new CountDownLatch(1); + _counter.set(0); + } + + public List<Message> getReceivedMessages() + { + return _receivedMessage; + } + + public Object awaitForEnd() throws InterruptedException + { + return _endLatch.await((long) _messageNumber, TimeUnit.SECONDS); + } + + public int getMessageCounter() + { + return _counter.get(); + } + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/ssl/SSLTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/ssl/SSLTest.java index 471ebb16fc..d754979ab9 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/client/ssl/SSLTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/ssl/SSLTest.java @@ -32,9 +32,9 @@ import org.apache.qpid.transport.Connection; public class SSLTest extends QpidBrokerTestCase { - private static final String KEYSTORE = TEST_RESOURCES_DIR + "/ssl/java_client_keystore.jks"; + private static final String KEYSTORE = "test-profiles/test_resources/ssl/java_client_keystore.jks"; private static final String KEYSTORE_PASSWORD = "password"; - private static final String TRUSTSTORE = TEST_RESOURCES_DIR + "/ssl/java_client_truststore.jks"; + private static final String TRUSTSTORE = "test-profiles/test_resources/ssl/java_client_truststore.jks"; private static final String TRUSTSTORE_PASSWORD = "password"; private static final String CERT_ALIAS_APP1 = "app1"; private static final String CERT_ALIAS_APP2 = "app2"; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagementActorLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagementActorLoggingTest.java index 2864d8e994..12a1682212 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagementActorLoggingTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagementActorLoggingTest.java @@ -106,7 +106,7 @@ public class ManagementActorLoggingTest extends AbstractTestLogging }); //Remove the connection close from any 0-10 connections - _monitor.reset(); + _monitor.markDiscardPoint(); // Get a managedConnection ManagedConnection mangedConnection = _jmxUtils.getManagedObject(ManagedConnection.class, "org.apache.qpid:type=VirtualHost.Connection,*"); @@ -147,7 +147,7 @@ public class ManagementActorLoggingTest extends AbstractTestLogging */ public void testCreateExchangeDirectTransientViaManagementConsole() throws IOException, JMException { - _monitor.reset(); + _monitor.markDiscardPoint(); _jmxUtils.createExchange("test", getName(), "direct", false); @@ -171,7 +171,7 @@ public class ManagementActorLoggingTest extends AbstractTestLogging public void testCreateExchangeTopicTransientViaManagementConsole() throws IOException, JMException { //Remove any previous exchange declares - _monitor.reset(); + _monitor.markDiscardPoint(); _jmxUtils.createExchange("test", getName(), "topic", false); @@ -196,7 +196,7 @@ public class ManagementActorLoggingTest extends AbstractTestLogging public void testCreateExchangeFanoutTransientViaManagementConsole() throws IOException, JMException { //Remove any previous exchange declares - _monitor.reset(); + _monitor.markDiscardPoint(); _jmxUtils.createExchange("test", getName(), "fanout", false); @@ -221,7 +221,7 @@ public class ManagementActorLoggingTest extends AbstractTestLogging public void testCreateExchangeHeadersTransientViaManagementConsole() throws IOException, JMException { //Remove any previous exchange declares - _monitor.reset(); + _monitor.markDiscardPoint(); _jmxUtils.createExchange("test", getName(), "headers", false); @@ -265,7 +265,7 @@ public class ManagementActorLoggingTest extends AbstractTestLogging public void testCreateQueueTransientViaManagementConsole() throws IOException, JMException { //Remove any previous queue declares - _monitor.reset(); + _monitor.markDiscardPoint(); _jmxUtils.createQueue("test", getName(), null, false); @@ -308,7 +308,7 @@ public class ManagementActorLoggingTest extends AbstractTestLogging public void testQueueDeleteViaManagementConsole() throws IOException, JMException { //Remove any previous queue declares - _monitor.reset(); + _monitor.markDiscardPoint(); _jmxUtils.createQueue("test", getName(), null, false); @@ -354,7 +354,7 @@ public class ManagementActorLoggingTest extends AbstractTestLogging public void testBindingCreateOnDirectViaManagementConsole() throws IOException, JMException { //Remove any previous queue declares - _monitor.reset(); + _monitor.markDiscardPoint(); _jmxUtils.createQueue("test", getName(), null, false); @@ -381,7 +381,7 @@ public class ManagementActorLoggingTest extends AbstractTestLogging public void testBindingCreateOnTopicViaManagementConsole() throws IOException, JMException { //Remove any previous queue declares - _monitor.reset(); + _monitor.markDiscardPoint(); _jmxUtils.createQueue("test", getName(), null, false); @@ -408,7 +408,7 @@ public class ManagementActorLoggingTest extends AbstractTestLogging public void testBindingCreateOnFanoutViaManagementConsole() throws IOException, JMException { //Remove any previous queue declares - _monitor.reset(); + _monitor.markDiscardPoint(); _jmxUtils.createQueue("test", getName(), null, false); @@ -455,7 +455,7 @@ public class ManagementActorLoggingTest extends AbstractTestLogging public void testUnRegisterExchangeViaManagementConsole() throws IOException, JMException { //Remove any previous queue declares - _monitor.reset(); + _monitor.markDiscardPoint(); _jmxUtils.createExchange("test", getName(), "direct", false); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsDeliveryTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsDeliveryTest.java index e657856d0e..c374d23473 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsDeliveryTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsDeliveryTest.java @@ -103,7 +103,7 @@ public class MessageStatisticsDeliveryTest extends MessageStatisticsTestCase MessageConsumer consumer = session.createConsumer(_queue); for (int i = 0; i < number; i++) { - Message msg = consumer.receive(100); + Message msg = consumer.receive(1000); assertNotNull("Message " + i + " was not received", msg); } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/FailoverMethodTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/FailoverMethodTest.java index 98cdf94ac9..470fcefae3 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/FailoverMethodTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/FailoverMethodTest.java @@ -129,8 +129,8 @@ public class FailoverMethodTest extends QpidBrokerTestCase implements ExceptionL assertTrue("Failover took less than 6 seconds", duration > 6000); // Ensure we don't have delays before initial connection and reconnection. - // We allow 1 second for initial connection and failover logic on top of 6s of sleep. - assertTrue("Failover took more than 7 seconds:(" + duration + ")", duration < 7000); + // We allow 3 second for initial connection and failover logic on top of 6s of sleep. + assertTrue("Failover took more than 9 seconds:(" + duration + ")", duration < 9000); } catch (AMQException e) { diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AbstractTestLogging.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AbstractTestLogging.java index f56f428f0b..484c2afeb5 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AbstractTestLogging.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AbstractTestLogging.java @@ -349,7 +349,7 @@ public class AbstractTestLogging extends QpidBrokerTestCase public boolean waitForMessage(String message, long wait) throws FileNotFoundException, IOException { - return _monitor.waitForMessage(message, wait, true); + return _monitor.waitForMessage(message, wait); } /** diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AlertingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AlertingTest.java index a2487b49bf..aef98b8a2a 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AlertingTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AlertingTest.java @@ -136,7 +136,7 @@ public class AlertingTest extends AbstractTestLogging stopBroker(); // Rest the monitoring clearing the current output file. - _monitor.reset(); + _monitor.markDiscardPoint(); startBroker(); wasAlertFired(); } @@ -169,7 +169,7 @@ public class AlertingTest extends AbstractTestLogging stopBroker(); - _monitor.reset(); + _monitor.markDiscardPoint(); // Change max message count to 5, start broker and make sure that that's triggered at the right time setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".queues.maximumMessageCount", "5"); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java index 97914f84a5..be2da128bc 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java @@ -55,7 +55,7 @@ public class BindingLoggingTest extends AbstractTestLogging { super.setUp(); //Ignore broker startup messages - _monitor.reset(); + _monitor.markDiscardPoint(); _connection = getConnection(); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BrokerLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BrokerLoggingTest.java index e901903eb4..7969ffc059 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BrokerLoggingTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BrokerLoggingTest.java @@ -246,7 +246,7 @@ public class BrokerLoggingTest extends AbstractTestLogging if (isJavaBroker() && isExternalBroker()) { // Get custom -l value used during testing for the broker startup - String customLog4j = _brokerCommand.substring(_brokerCommand.indexOf("-l") + 2); + String customLog4j = _brokerCommand.substring(_brokerCommand.indexOf("-l") + 2).trim(); String TESTID = "BRK-1007"; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/DerbyMessageStoreLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/DerbyMessageStoreLoggingTest.java index 16c529316a..d45bde2d98 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/DerbyMessageStoreLoggingTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/DerbyMessageStoreLoggingTest.java @@ -496,7 +496,7 @@ public class DerbyMessageStoreLoggingTest extends MemoryMessageStoreLoggingTest stopBroker(); // Clear our monitor - _monitor.reset(); + _monitor.markDiscardPoint(); startBroker(); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/DurableQueueLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/DurableQueueLoggingTest.java index 32adc49521..602bdb66b5 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/DurableQueueLoggingTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/DurableQueueLoggingTest.java @@ -58,7 +58,7 @@ public class DurableQueueLoggingTest extends AbstractTestLogging { super.setUp(); //Ensure we only have logs from our test - _monitor.reset(); + _monitor.markDiscardPoint(); _connection = getConnection(); _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java index 1e48f34f99..ec96f778f6 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java @@ -20,6 +20,16 @@ */ package org.apache.qpid.server.logging; +import java.io.IOException; +import java.util.List; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession_0_10; import org.apache.qpid.framing.AMQFrame; @@ -28,13 +38,6 @@ import org.apache.qpid.framing.ExchangeDeleteBody; import org.apache.qpid.framing.ExchangeDeleteOkBody; import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0; -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.Queue; -import javax.jms.Session; -import java.io.IOException; -import java.util.List; - /** * Exchange * @@ -122,7 +125,7 @@ public class ExchangeLoggingTest extends AbstractTestLogging public void testExchangeCreate() throws JMSException, IOException { //Ignore broker startup messages - _monitor.reset(); + _monitor.markDiscardPoint(); _session.createConsumer(_queue); // Ensure we have received the EXH log msg. @@ -176,7 +179,7 @@ public class ExchangeLoggingTest extends AbstractTestLogging public void testExchangeDelete() throws Exception, IOException { //Ignore broker startup messages - _monitor.reset(); + _monitor.markDiscardPoint(); //create the exchange by creating a consumer _session.createConsumer(_queue); @@ -214,4 +217,38 @@ public class ExchangeLoggingTest extends AbstractTestLogging } + public void testDiscardedMessage() throws Exception + { + //Ignore broker startup messages + _monitor.markDiscardPoint(); + + if (!isBroker010()) + { + // Default 0-8..-0-9-1 behaviour is for messages to be rejected (returned to client). + setTestClientSystemProperty("qpid.default_mandatory", "false"); + } + + _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Do not create consumer so queue is not created and message will be discarded. + final MessageProducer producer = _session.createProducer(_queue); + + // Sending message + final TextMessage msg = _session.createTextMessage("msg"); + producer.send(msg); + + final String expectedMessageBody = "Discarded Message : Name: " + _name + " Routing Key: " + _queue.getQueueName(); + + // Ensure we have received the EXH log msg. + waitForMessage("EXH-1003"); + + List<String> results = findMatches(EXH_PREFIX); + assertEquals("Result set larger than expected.", 2, results.size()); + + final String log = getLogMessage(results, 1); + validateMessageID("EXH-1003", log); + + final String message = getMessageString(fromMessage(log)); + assertEquals("Log Message not as expected", expectedMessageBody, message); + } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ManagementLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ManagementLoggingTest.java index 9feca7279e..24e6aa4207 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ManagementLoggingTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ManagementLoggingTest.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.logging; import junit.framework.AssertionFailedError; +import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.util.LogMonitor; import java.util.List; @@ -206,7 +207,7 @@ public class ManagementLoggingTest extends AbstractTestLogging validateMessageID("MNG-1002", log); //Check the RMI Registry port is as expected - int mPort = getPort() + (DEFAULT_MANAGEMENT_PORT - DEFAULT_PORT); + int mPort = getManagementPort(getPort()); assertTrue("RMI Registry port not as expected(" + mPort + ").:" + getMessageString(log), getMessageString(log).endsWith(String.valueOf(mPort))); @@ -217,7 +218,7 @@ public class ManagementLoggingTest extends AbstractTestLogging // We expect the RMI Registry port (the defined 'management port') to be // 100 lower than the JMX RMIConnector Server Port (the actual JMX server) - int jmxPort = mPort + 100; + int jmxPort = mPort + ServerConfiguration.JMXPORT_CONNECTORSERVER_OFFSET; assertTrue("JMX RMIConnectorServer port not as expected(" + jmxPort + ").:" + getMessageString(log), getMessageString(log).endsWith(String.valueOf(jmxPort))); } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/QueueLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/QueueLoggingTest.java index b8a42c0ab3..76ebda0ebd 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/QueueLoggingTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/QueueLoggingTest.java @@ -53,7 +53,7 @@ public class QueueLoggingTest extends AbstractTestLogging { super.setUp(); //Remove broker startup logging messages - _monitor.reset(); + _monitor.markDiscardPoint(); _connection = getConnection(); _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java index 6e156f091e..b6efe53580 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java @@ -58,7 +58,7 @@ public class SubscriptionLoggingTest extends AbstractTestLogging { super.setUp(); //Remove broker startup logging messages - _monitor.reset(); + _monitor.markDiscardPoint(); _connection = getConnection(); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java index f78b327209..a724e6c66e 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java @@ -64,7 +64,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging _jmxUtilConnected=false; super.setUp(); - _monitor.reset(); + _monitor.markDiscardPoint(); producerConnection = getConnection(); producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/AbstractACLTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/AbstractACLTestCase.java index 8aa5b6d9de..32b0185f88 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/AbstractACLTestCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/AbstractACLTestCase.java @@ -83,14 +83,8 @@ public abstract class AbstractACLTestCase extends QpidBrokerTestCase implements @Override public void setUp() throws Exception { - if (QpidHome == null) - { - fail("QPID_HOME not set"); - } - // Initialise ACLs. - _configFile = new File(QpidHome, "etc" + File.separator + getConfig()); - + _configFile = new File("build" + File.separator + "etc" + File.separator + getConfig()); // Initialise ACL files for (String virtualHost : getHostList()) { @@ -156,7 +150,7 @@ public abstract class AbstractACLTestCase extends QpidBrokerTestCase implements */ public void setUpACLFile(String virtualHost) throws IOException, ConfigurationException { - String path = QpidHome + File.separator + "etc"; + String path = "build" + File.separator + "etc"; String className = StringUtils.substringBeforeLast(getClass().getSimpleName().toLowerCase(), "test"); String testName = StringUtils.substringAfter(getName(), "test").toLowerCase(); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLTest.java index 782ca22965..254e1fe6ac 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLTest.java @@ -91,9 +91,9 @@ public class ExternalACLTest extends AbstractACLTestCase //send a message to each queue (also causing an exchange declare) MessageProducer sender = ((AMQSession<?, ?>) sess).createProducer(null); ((org.apache.qpid.jms.MessageProducer) sender).send(namedQueue, sess.createTextMessage("test"), - DeliveryMode.NON_PERSISTENT, 0, 0L, false, false, true); + DeliveryMode.NON_PERSISTENT, 0, 0L, false, false); ((org.apache.qpid.jms.MessageProducer) sender).send(tempQueue, sess.createTextMessage("test"), - DeliveryMode.NON_PERSISTENT, 0, 0L, false, false, true); + DeliveryMode.NON_PERSISTENT, 0, 0L, false, false); //consume the messages from the queues consumer.receive(2000); @@ -309,7 +309,7 @@ public class ExternalACLTest extends AbstractACLTestCase // before we finish this test. Message is set !immed !mand as the queue is invalid so want to test ACLs not // queue existence. ((org.apache.qpid.jms.MessageProducer) sender).send(queue, sess.createTextMessage("test"), - DeliveryMode.NON_PERSISTENT, 0, 0L, false, false, true); + DeliveryMode.NON_PERSISTENT, 0, 0L, false, false); conn.close(); } @@ -337,7 +337,7 @@ public class ExternalACLTest extends AbstractACLTestCase // before we finish this test. Message is set !immed !mand as the queue is invalid so want to test ACLs not // queue existence. ((org.apache.qpid.jms.MessageProducer) sender).send(queue, session.createTextMessage("test"), - DeliveryMode.NON_PERSISTENT, 0, 0L, false, false, true); + DeliveryMode.NON_PERSISTENT, 0, 0L, false, false); // Test the connection with a valid consumer // This may fail as the session may be closed before the queue or the consumer created. @@ -608,7 +608,7 @@ public class ExternalACLTest extends AbstractACLTestCase // before we finish this test. Message is set !immed !mand as the queue is invalid so want to test ACLs not // queue existence. ((org.apache.qpid.jms.MessageProducer) sender).send(queue, session.createTextMessage("test"), - DeliveryMode.NON_PERSISTENT, 0, 0L, false, false, true); + DeliveryMode.NON_PERSISTENT, 0, 0L, false, false); // Test the connection with a valid consumer // This may not work as the session may be closed before the queue or consumer creation can occur. diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/security/firewall/FirewallConfigTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/security/firewall/FirewallConfigTest.java index 2d99a44532..044a0af335 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/security/firewall/FirewallConfigTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/security/firewall/FirewallConfigTest.java @@ -35,16 +35,8 @@ public class FirewallConfigTest extends QpidBrokerTestCase @Override protected void setUp() throws Exception { - // do setup - final String QPID_HOME = System.getProperty("QPID_HOME"); - - if (QPID_HOME == null) - { - fail("QPID_HOME not set"); - } - // Setup initial config file. - _configFile = new File(QPID_HOME, "etc/config-systests-firewall.xml"); + _configFile = new File("build/etc/config-systests-firewall.xml"); // Setup temporary config file _tmpConfig = File.createTempFile("config-systests-firewall", ".xml"); @@ -86,7 +78,7 @@ public class FirewallConfigTest extends QpidBrokerTestCase public void testVhostAllowBrokerDeny() throws Exception { - _configFile = new File(System.getProperty("QPID_HOME"), "etc/config-systests-firewall-2.xml"); + _configFile = new File("build/etc/config-systests-firewall-2.xml"); super.setUp(); @@ -119,7 +111,7 @@ public class FirewallConfigTest extends QpidBrokerTestCase public void testVhostDenyBrokerAllow() throws Exception { - _configFile = new File(System.getProperty("QPID_HOME"), "etc/config-systests-firewall-3.xml"); + _configFile = new File("build/etc/config-systests-firewall-3.xml"); super.setUp(); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java index b5bb74327e..8c3c247e2b 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java @@ -1116,10 +1116,10 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase public void testDestinationOnSend() throws Exception { Session ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); - MessageConsumer cons = ssn.createConsumer(ssn.createTopic("amq.topic/test")); + MessageConsumer cons = ssn.createConsumer(ssn.createTopic("ADDR:amq.topic/test")); MessageProducer prod = ssn.createProducer(null); - Queue queue = ssn.createQueue("amq.topic/test"); + Queue queue = ssn.createQueue("ADDR:amq.topic/test"); prod.send(queue,ssn.createTextMessage("A")); Message msg = cons.receive(1000); @@ -1147,7 +1147,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase Destination replyToDest = AMQDestination.createDestination(replyTo); MessageConsumer replyToCons = session.createConsumer(replyToDest); - Destination dest = session.createQueue("amq.direct/test"); + Destination dest = session.createQueue("ADDR:amq.direct/test"); MessageConsumer cons = session.createConsumer(dest); MessageProducer prod = session.createProducer(dest); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java index 1a23eee8ab..6189c37306 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java @@ -63,7 +63,7 @@ public class SyncWaitTimeoutDelayTest extends SyncWaitDelayTest catch (JMSException e) { assertTrue("Wrong exception type received.", e.getLinkedException() instanceof AMQTimeoutException); - assertTrue("Wrong message received on exception.", e.getMessage().startsWith("Failed to commit")); + assertTrue("Wrong message received on exception.", e.getMessage().startsWith("Exception during commit")); // As we are using Nano time ensure to multiply up the millis. assertTrue("Timeout was more than 30s default", (System.nanoTime() - start) < (1000000L * 1000 * 30)); } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/ClientAcknowledgeTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/ClientAcknowledgeTest.java new file mode 100644 index 0000000000..06be5cf456 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/ClientAcknowledgeTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.test.unit.ack; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +public class ClientAcknowledgeTest extends QpidBrokerTestCase +{ + private static final long ONE_DAY_MS = 1000l * 60 * 60 * 24; + private Connection _connection; + private Queue _queue; + private Session _consumerSession; + private MessageConsumer _consumer; + private MessageProducer _producer; + + @Override + protected void setUp() throws Exception + { + super.setUp(); + _queue = getTestQueue(); + _connection = getConnection(); + } + + /** + * Test that message.acknowledge actually acknowledges, regardless of + * the flusher thread period, by restarting the broker after calling + * acknowledge, and then verifying after restart that the message acked + * is no longer present. This test requires a persistent store. + */ + public void testClientAckWithLargeFlusherPeriod() throws Exception + { + setTestClientSystemProperty("qpid.session.max_ack_delay", Long.toString(ONE_DAY_MS)); + _consumerSession = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + _consumer = _consumerSession.createConsumer(_queue); + _connection.start(); + + _producer = _consumerSession.createProducer(_queue); + _producer.send(createNextMessage(_consumerSession, 1)); + _producer.send(createNextMessage(_consumerSession, 2)); + + Message message = _consumer.receive(1000l); + assertNotNull("Message has not been received", message); + assertEquals("Unexpected message is received", 1, message.getIntProperty(INDEX)); + message.acknowledge(); + + //restart broker to allow verification of the acks + //without explicitly closing connection (which acks) + restartBroker(); + + // try to receive the message again, which should fail (as it was ackd) + _connection = getConnection(); + _connection.start(); + _consumerSession = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + _consumer = _consumerSession.createConsumer(_queue); + message = _consumer.receive(1000l); + assertNotNull("Message has not been received", message); + assertEquals("Unexpected message is received", 2, message.getIntProperty(INDEX)); + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java index 5e7ba5482d..66ca1d8345 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java @@ -46,7 +46,7 @@ public class RecoverTest extends FailoverBaseCase { static final Logger _logger = LoggerFactory.getLogger(RecoverTest.class); - private Exception _error; + private volatile Exception _error; private AtomicInteger count; protected AMQConnection _connection; @@ -249,14 +249,13 @@ public class RecoverTest extends FailoverBaseCase { if (!message.getJMSRedelivered()) { - setError( - new Exception("Message not marked as redelivered on what should be second delivery attempt")); + setError(new Exception("Message not marked as redelivered on what should be second delivery attempt")); } } else { - System.err.println(message); - fail("Message delivered too many times!: " + count); + _logger.error(message.toString()); + setError(new Exception("Message delivered too many times!: " + count)); } } catch (JMSException e) diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/close/CloseTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/close/CloseTest.java index c6b8069300..3c7962d873 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/close/CloseTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/close/CloseTest.java @@ -21,14 +21,13 @@ package org.apache.qpid.test.unit.basic.close; import org.apache.qpid.test.utils.QpidBrokerTestCase; import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.url.AMQBindingURL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; +import javax.jms.Queue; import javax.jms.Session; public class CloseTest extends QpidBrokerTestCase @@ -41,7 +40,7 @@ public class CloseTest extends QpidBrokerTestCase Session session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); - AMQQueue queue = new AMQQueue(new AMQBindingURL("test-queue")); + Queue queue = session.createQueue("test-queue"); MessageConsumer consumer = session.createConsumer(queue); MessageProducer producer_not_used_but_created_for_testing = session.createProducer(queue); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java index e79fe69199..53a7533869 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java @@ -20,16 +20,6 @@ */ package org.apache.qpid.test.unit.client; -import java.io.BufferedWriter; -import java.io.InputStreamReader; -import java.io.LineNumberReader; -import java.io.OutputStreamWriter; -import java.io.PrintWriter; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; - -import javax.jms.Connection; -import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQSSLConnectionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQSSLConnectionTest.java index 53a433c543..5f3daa407a 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQSSLConnectionTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQSSLConnectionTest.java @@ -26,9 +26,9 @@ import org.apache.qpid.client.AMQConnectionURL; public class AMQSSLConnectionTest extends AMQConnectionTest { - private static final String KEYSTORE = TEST_RESOURCES_DIR + "/ssl/java_client_keystore.jks"; + private static final String KEYSTORE = "test-profiles/test_resources/ssl/java_client_keystore.jks"; private static final String KEYSTORE_PASSWORD = "password"; - private static final String TRUSTSTORE = TEST_RESOURCES_DIR + "/ssl/java_client_truststore.jks"; + private static final String TRUSTSTORE = "test-profiles/test_resources/ssl/java_client_truststore.jks"; private static final String TRUSTSTORE_PASSWORD = "password"; @Override diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java index fe2ea6ef10..f18f365f20 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java @@ -32,6 +32,7 @@ import org.apache.qpid.client.AMQConnectionURL; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.jms.BrokerDetails; @@ -285,16 +286,23 @@ public class ConnectionTest extends QpidBrokerTestCase } catch (Exception e) { - assertTrue("Incorrect exception thrown", - e.getMessage().contains("The following SASL mechanisms " + - "[MY_MECH]" + - " specified by the client are not supported by the broker")); + assertTrue("Unexpected exception message : " + e.getMessage(), + e.getMessage().contains("Client and broker have no SASL mechanisms in common.")); + assertTrue("Unexpected exception message : " + e.getMessage(), + e.getMessage().contains("Client restricted itself to : MY_MECH")); + } } - public void testClientIDVerification() throws Exception + /** + * Tests that when the same user connects twice with same clientid, the second connection + * fails if the clientid verification feature is enabled (which uses a dummy 0-10 Session + * with the clientid as its name to detect the previous usage of the clientid by the user) + */ + public void testClientIDVerificationForSameUser() throws Exception { - System.setProperty("qpid.verify_client_id", "true"); + setTestSystemProperty(ClientProperties.QPID_VERIFY_CLIENT_ID, "true"); + BrokerDetails broker = getBroker(); try { @@ -302,19 +310,40 @@ public class ConnectionTest extends QpidBrokerTestCase "client_id", "test"); Connection con2 = new AMQConnection(broker.toString(), "guest", "guest", - "client_id", "test"); + "client_id", "test"); fail("The client should throw a ConnectionException stating the" + " client ID is not unique"); } catch (Exception e) { - assertTrue("Incorrect exception thrown", + assertTrue("Incorrect exception thrown: " + e.getMessage(), e.getMessage().contains("ClientID must be unique")); } - finally + } + + /** + * Tests that when different users connects with same clientid, the second connection + * succeeds even though the clientid verification feature is enabled (which uses a dummy + * 0-10 Session with the clientid as its name; these are only verified unique on a + * per-principal basis) + */ + public void testClientIDVerificationForDifferentUsers() throws Exception + { + setTestSystemProperty(ClientProperties.QPID_VERIFY_CLIENT_ID, "true"); + + BrokerDetails broker = getBroker(); + try + { + Connection con = new AMQConnection(broker.toString(), "guest", "guest", + "client_id", "test"); + + Connection con2 = new AMQConnection(broker.toString(), "admin", "admin", + "client_id", "test"); + } + catch (Exception e) { - System.setProperty("qpid.verify_client_id", "false"); + fail("Unexpected exception thrown, client id was not unique but usernames were different! " + e.getMessage()); } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java index fd28b86762..8ad8fa77d7 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java @@ -20,14 +20,10 @@ */ package org.apache.qpid.test.unit.client.message; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.test.utils.QpidBrokerTestCase; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; import javax.jms.JMSException; import javax.jms.Message; @@ -35,10 +31,13 @@ import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.ObjectMessage; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ObjectMessageTest extends QpidBrokerTestCase implements MessageListener { @@ -67,7 +66,7 @@ public class ObjectMessageTest extends QpidBrokerTestCase implements MessageList connection.start(); // create a publisher - producer = session.createProducer(destination, false, false, true); + producer = session.createProducer(destination, false, false); A a1 = new A(1, "A"); A a2 = new A(2, "a"); B b = new B(1, "B"); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java index 8c806fa2da..c98e403671 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java @@ -22,237 +22,145 @@ package org.apache.qpid.test.unit.client.temporaryqueue; import javax.jms.Connection; -import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TemporaryQueue; import javax.jms.TextMessage; -import junit.framework.Assert; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.jms.ConnectionListener; -import java.util.ArrayList; -import java.util.List; -import java.util.LinkedList; - -public class TemporaryQueueTest extends QpidBrokerTestCase implements ExceptionListener +/** + * Tests the behaviour of {@link TemporaryQueue}. + */ +public class TemporaryQueueTest extends QpidBrokerTestCase { - private List<Exception> _exceptions = new ArrayList<Exception>(); - - protected void setUp() throws Exception - { - super.setUp(); - } - - protected void tearDown() throws Exception + /** + * Tests the basic produce/consume behaviour of a temporary queue. + */ + public void testMessageDeliveryUsingTemporaryQueue() throws Exception { - super.tearDown(); - } - - protected Connection createConnection() throws Exception - { - return getConnection("guest", "guest"); - } - - public void testTemporaryQueue() throws Exception - { - Connection conn = createConnection(); - Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - TemporaryQueue queue = session.createTemporaryQueue(); + final Connection conn = getConnection(); + final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + final TemporaryQueue queue = session.createTemporaryQueue(); assertNotNull(queue); - MessageProducer producer = session.createProducer(queue); - MessageConsumer consumer = session.createConsumer(queue); + final MessageProducer producer = session.createProducer(queue); + final MessageConsumer consumer = session.createConsumer(queue); conn.start(); producer.send(session.createTextMessage("hello")); TextMessage tm = (TextMessage) consumer.receive(2000); - assertNotNull(tm); + assertNotNull("Message not received", tm); assertEquals("hello", tm.getText()); + } - try - { - queue.delete(); - fail("Expected JMSException : should not be able to delete while there are active consumers"); - } - catch (JMSException je) - { - ; //pass - } - - consumer.close(); + /** + * Tests that a temporary queue cannot be used by another {@link Session}. + */ + public void testUseFromAnotherSessionProhibited() throws Exception + { + final Connection conn = getConnection(); + final Session session1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Session session2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + final TemporaryQueue queue = session1.createTemporaryQueue(); + assertNotNull(queue); try { - queue.delete(); + session2.createConsumer(queue); + fail("Expected a JMSException when subscribing to a temporary queue created on a different session"); } catch (JMSException je) { - fail("Unexpected Exception: " + je.getMessage()); + //pass + assertEquals("Cannot consume from a temporary destination created on another session", je.getMessage()); } - - conn.close(); - } - - public void tUniqueness() throws Exception - { - int numProcs = Runtime.getRuntime().availableProcessors(); - final int threadsProc = 5; - - runUniqueness(1, 10); - runUniqueness(numProcs * threadsProc, 10); - runUniqueness(numProcs * threadsProc, 100); - runUniqueness(numProcs * threadsProc, 500); } - void runUniqueness(int makers, int queues) throws Exception + /** + * Tests that the client is able to explicitly delete a temporary queue using + * {@link TemporaryQueue#delete()} and is prevented from deleting one that + * still has consumers. + * + * Note: Under < 0-10 {@link TemporaryQueue#delete()} only marks the queue as deleted + * on the client. 0-10 causes the queue to be deleted from the Broker. + */ + public void testExplictTemporaryQueueDeletion() throws Exception { - Connection connection = createConnection(); - - Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - - List<TempQueueMaker> tqList = new LinkedList<TempQueueMaker>(); - - //Create Makers - for (int m = 0; m < makers; m++) - { - tqList.add(new TempQueueMaker(session, queues)); - } - - - List<Thread> threadList = new LinkedList<Thread>(); - - //Create Makers - for (TempQueueMaker maker : tqList) - { - threadList.add(new Thread(maker)); - } - - //Start threads - for (Thread thread : threadList) - { - thread.start(); - } - - // Join Threads - for (Thread thread : threadList) - { - try - { - thread.join(); - } - catch (InterruptedException e) - { - fail("Couldn't correctly join threads"); - } - } - + final Connection conn = getConnection(); + final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + final AMQSession<?, ?> amqSession = (AMQSession<?, ?>)session; // Required to observe the queue binding on the Broker + final TemporaryQueue queue = session.createTemporaryQueue(); + assertNotNull(queue); + final MessageConsumer consumer = session.createConsumer(queue); + conn.start(); - List<AMQQueue> list = new LinkedList<AMQQueue>(); + assertTrue("Queue should be bound", amqSession.isQueueBound((AMQDestination)queue)); - // Test values - for (TempQueueMaker maker : tqList) + try { - check(maker, list); + queue.delete(); + fail("Expected JMSException : should not be able to delete while there are active consumers"); } - - Assert.assertEquals("Not enough queues made.", makers * queues, list.size()); - - connection.close(); - } - - private void check(TempQueueMaker tq, List<AMQQueue> list) - { - for (AMQQueue q : tq.getList()) + catch (JMSException je) { - if (list.contains(q)) - { - fail(q + " already exists."); - } - else - { - list.add(q); - } + //pass + assertEquals("Temporary Queue has consumers so cannot be deleted", je.getMessage()); } - } - - - class TempQueueMaker implements Runnable - { - List<AMQQueue> _queues; - Session _session; - private int _count; + consumer.close(); + // Now deletion should succeed. + queue.delete(); - TempQueueMaker(Session session, int queues) throws JMSException + try { - _queues = new LinkedList<AMQQueue>(); - - _count = queues; - - _session = session; + session.createConsumer(queue); + fail("Exception not thrown"); } - - public void run() + catch (JMSException je) { - int i = 0; - try - { - for (; i < _count; i++) - { - _queues.add((AMQQueue) _session.createTemporaryQueue()); - } - } - catch (JMSException jmse) - { - //stop - } + //pass + assertEquals("Cannot consume from a deleted destination", je.getMessage()); } - List<AMQQueue> getList() + if (isBroker010()) { - return _queues; + assertFalse("Queue should no longer be bound", amqSession.isQueueBound((AMQDestination)queue)); } } - public void testQPID1217() throws Exception - { - Connection conA = getConnection(); - conA.setExceptionListener(this); - Session sessA = conA.createSession(false, Session.AUTO_ACKNOWLEDGE); - TemporaryQueue temp = sessA.createTemporaryQueue(); - - MessageProducer prod = sessA.createProducer(temp); - prod.send(sessA.createTextMessage("hi")); - - Thread.sleep(500); - assertTrue("Exception received", _exceptions.isEmpty()); - - Connection conB = getConnection(); - Session sessB = conB.createSession(false, Session.AUTO_ACKNOWLEDGE); - - JMSException ex = null; - try - { - MessageConsumer consB = sessB.createConsumer(temp); - } - catch (JMSException e) - { - ex = e; - } - assertNotNull(ex); - } - - public static junit.framework.Test suite() + /** + * Tests that a temporary queue remains available for reuse even after its initial + * consumer has disconnected. + * + * This test would fail under < 0-10 as their temporary queues are deleted automatically + * (broker side) after the last consumer disconnects (so message2 would be lost). For this + * reason this test is excluded from those profiles. + */ + public void testTemporaryQueueReused() throws Exception { - return new junit.framework.TestSuite(TemporaryQueueTest.class); - } + final Connection conn = getConnection(); + final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + final TemporaryQueue queue = session.createTemporaryQueue(); + assertNotNull(queue); - public void onException(JMSException arg0) - { - _exceptions.add(arg0); + final MessageProducer producer1 = session.createProducer(queue); + final MessageConsumer consumer1 = session.createConsumer(queue); + conn.start(); + producer1.send(session.createTextMessage("message1")); + producer1.send(session.createTextMessage("message2")); + TextMessage tm = (TextMessage) consumer1.receive(2000); + assertNotNull("Message not received by first consumer", tm); + assertEquals("message1", tm.getText()); + consumer1.close(); + + final MessageConsumer consumer2 = session.createConsumer(queue); + conn.start(); + tm = (TextMessage) consumer2.receive(2000); + assertNotNull("Message not received by second consumer", tm); + assertEquals("message2", tm.getText()); + consumer2.close(); } - } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/UTF8Test.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/UTF8Test.java index fe929b4965..978ebfa93f 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/UTF8Test.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/UTF8Test.java @@ -20,17 +20,20 @@ */ package org.apache.qpid.test.unit.message; -import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.apache.qpid.transport.Connection; -import org.apache.qpid.transport.Session; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.InputStreamReader; +import java.util.Properties; +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; import javax.naming.InitialContext; -import javax.jms.*; -import java.util.Properties; -import java.io.*; + +import org.apache.qpid.test.utils.QpidBrokerTestCase; /** @@ -41,8 +44,6 @@ import java.io.*; */ public class UTF8Test extends QpidBrokerTestCase { - private static final Logger _logger = LoggerFactory.getLogger(UTF8Test.class); - public void testPlainEn() throws Exception { invoke("UTF8En"); @@ -65,38 +66,24 @@ public class UTF8Test extends QpidBrokerTestCase private void runTest(String exchangeName, String queueName, String routingKey, String data) throws Exception { - _logger.info("Running test for exchange: " + exchangeName - + " queue Name: " + queueName - + " routing key: " + routingKey); - declareQueue(exchangeName, routingKey, queueName); + Connection con = getConnection(); + Session sess = con.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE); + final Destination dest = getDestination(exchangeName, routingKey, queueName); + + final MessageConsumer msgCons = sess.createConsumer(dest); + con.start(); - javax.jms.Connection con = getConnection(); - javax.jms.Session sess = con.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE); - Destination dest = getDestination(exchangeName, routingKey, queueName); // Send data MessageProducer msgProd = sess.createProducer(dest); TextMessage message = sess.createTextMessage(data); msgProd.send(message); + // consume data - MessageConsumer msgCons = sess.createConsumer(dest); - con.start(); TextMessage m = (TextMessage) msgCons.receive(RECEIVE_TIMEOUT); assertNotNull(m); assertEquals(m.getText(), data); } - private void declareQueue(String exch, String routkey, String qname) throws Exception - { - Connection conn = new Connection(); - conn.connect("localhost", QpidBrokerTestCase.DEFAULT_PORT, "test", "guest", "guest",false); - Session sess = conn.createSession(0); - sess.exchangeDeclare(exch, "direct", null, null); - sess.queueDeclare(qname, null, null); - sess.exchangeBind(qname, exch, routkey, null); - sess.sync(); - conn.close(); - } - private Destination getDestination(String exch, String routkey, String qname) throws Exception { Properties props = new Properties(); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/publish/DirtyTransactedPublishTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/publish/DirtyTransactedPublishTest.java deleted file mode 100644 index 3ec7937812..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/publish/DirtyTransactedPublishTest.java +++ /dev/null @@ -1,403 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.unit.publish; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.jms.ConnectionListener; -import org.apache.qpid.test.utils.FailoverBaseCase; - -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; -import javax.jms.TransactionRolledBackException; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -/** - * QPID-1816 : Whilst testing Acknoledgement after failover this completes testing - * of the client after failover. When we have a dirty session we should receive - * an error if we attempt to publish. This test ensures that both in the synchronous - * and asynchronous message delivery paths we receive the expected exceptions at - * the expected time. - */ -public class DirtyTransactedPublishTest extends FailoverBaseCase implements ConnectionListener -{ - protected CountDownLatch _failoverCompleted = new CountDownLatch(1); - - protected int NUM_MESSAGES; - protected Connection _connection; - protected Queue _queue; - protected Session _consumerSession; - protected MessageConsumer _consumer; - protected MessageProducer _producer; - - private static final String MSG = "MSG"; - private static final String SEND_FROM_ON_MESSAGE_TEXT = "sendFromOnMessage"; - protected CountDownLatch _receviedAll; - protected AtomicReference<Exception> _causeOfFailure = new AtomicReference<Exception>(null); - - @Override - protected void setUp() throws Exception - { - super.setUp(); - NUM_MESSAGES = 10; - - _queue = getTestQueue(); - - //Create Producer put some messages on the queue - _connection = getConnection(); - } - - /** - * Initialise the test variables - * @param transacted is this a transacted test - * @param mode if not trasacted then what ack mode to use - * @throws Exception if there is a setup issue. - */ - protected void init(boolean transacted, int mode) throws Exception - { - _consumerSession = _connection.createSession(transacted, mode); - _consumer = _consumerSession.createConsumer(_queue); - _producer = _consumerSession.createProducer(_queue); - - // These should all end up being prefetched by session - sendMessage(_consumerSession, _queue, 1); - - assertEquals("Wrong number of messages on queue", 1, - ((AMQSession) _consumerSession).getQueueDepth((AMQDestination) _queue)); - } - - /** - * If a transacted session has failed over whilst it has uncommitted sent - * data then we need to throw a TransactedRolledbackException on commit() - * - * The alternative would be to maintain a replay buffer so that the message - * could be resent. This is not currently implemented - * - * @throws Exception if something goes wrong. - */ - public void testDirtySendingSynchronousTransacted() throws Exception - { - Session producerSession = _connection.createSession(true, Session.SESSION_TRANSACTED); - - // Ensure we get failover notifications - ((AMQConnection) _connection).setConnectionListener(this); - - MessageProducer producer = producerSession.createProducer(_queue); - - // Create and send message 0 - Message msg = producerSession.createMessage(); - msg.setIntProperty(INDEX, 0); - producer.send(msg); - - // DON'T commit message .. fail connection - - failBroker(getFailingPort()); - - // Ensure destination exists for sending - producerSession.createConsumer(_queue).close(); - - // Send the next message - msg.setIntProperty(INDEX, 1); - try - { - producer.send(msg); - fail("Should fail with Qpid as we provide early warning of the dirty session via a JMSException."); - } - catch (JMSException jmse) - { - assertEquals("Early warning of dirty session not correct", - "Failover has occurred and session is dirty so unable to send.", jmse.getMessage()); - } - - // Ignore that the session is dirty and attempt to commit to validate the - // exception is thrown. AND that the above failure notification did NOT - // clean up the session. - - try - { - producerSession.commit(); - fail("Session is dirty we should get an TransactionRolledBackException"); - } - catch (TransactionRolledBackException trbe) - { - // Normal path. - } - - // Resending of messages should now work ok as the commit was forcilbly rolledback - msg.setIntProperty(INDEX, 0); - producer.send(msg); - msg.setIntProperty(INDEX, 1); - producer.send(msg); - - producerSession.commit(); - - assertEquals("Wrong number of messages on queue", 2, - ((AMQSession) producerSession).getQueueDepth((AMQDestination) _queue)); - } - - /** - * If a transacted session has failed over whilst it has uncommitted sent - * data then we need to throw a TransactedRolledbackException on commit() - * - * The alternative would be to maintain a replay buffer so that the message - * could be resent. This is not currently implemented - * - * @throws Exception if something goes wrong. - */ - public void testDirtySendingOnMessageTransacted() throws Exception - { - NUM_MESSAGES = 1; - _receviedAll = new CountDownLatch(NUM_MESSAGES); - ((AMQConnection) _connection).setConnectionListener(this); - - init(true, Session.SESSION_TRANSACTED); - - _consumer.setMessageListener(new MessageListener() - { - - public void onMessage(Message message) - { - try - { - // Create and send message 0 - Message msg = _consumerSession.createMessage(); - msg.setIntProperty(INDEX, 0); - _producer.send(msg); - - // DON'T commit message .. fail connection - - failBroker(getFailingPort()); - - // rep - repopulateBroker(); - - // Destination will exist as this failBroker will populate - // the queue with 1 message - - // Send the next message - msg.setIntProperty(INDEX, 1); - try - { - _producer.send(msg); - fail("Should fail with Qpid as we provide early warning of the dirty session via a JMSException."); - } - catch (JMSException jmse) - { - assertEquals("Early warning of dirty session not correct", - "Failover has occurred and session is dirty so unable to send.", jmse.getMessage()); - } - - // Ignore that the session is dirty and attempt to commit to validate the - // exception is thrown. AND that the above failure notification did NOT - // clean up the session. - - try - { - _consumerSession.commit(); - fail("Session is dirty we should get an TransactionRolledBackException"); - } - catch (TransactionRolledBackException trbe) - { - // Normal path. - } - - // Resend messages - msg.setIntProperty(INDEX, 0); - msg.setStringProperty(MSG, SEND_FROM_ON_MESSAGE_TEXT); - _producer.send(msg); - msg.setIntProperty(INDEX, 1); - msg.setStringProperty(MSG, SEND_FROM_ON_MESSAGE_TEXT); - _producer.send(msg); - - _consumerSession.commit(); - - // Stop this consumer .. can't do _consumer.stop == DEADLOCK - // this doesn't seem to stop dispatcher running - _connection.stop(); - - // Signal that the onMessage send part of test is complete - // main thread can validate that messages are correct - _receviedAll.countDown(); - - } - catch (Exception e) - { - fail(e); - } - - } - - }); - - _connection.start(); - - if (!_receviedAll.await(10000L, TimeUnit.MILLISECONDS)) - { - // Check to see if we ended due to an exception in the onMessage handler - Exception cause = _causeOfFailure.get(); - if (cause != null) - { - cause.printStackTrace(); - fail(cause.getMessage()); - } - else - { - fail("All messages not received:" + _receviedAll.getCount() + "/" + NUM_MESSAGES); - } - } - - // Check to see if we ended due to an exception in the onMessage handler - Exception cause = _causeOfFailure.get(); - if (cause != null) - { - cause.printStackTrace(); - fail(cause.getMessage()); - } - - _consumer.close(); - _consumerSession.close(); - - _consumerSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - _connection.start(); - - // Validate that we could send the messages as expected. - assertEquals("Wrong number of messages on queue", 3, - ((AMQSession) _consumerSession).getQueueDepth((AMQDestination) _queue)); - - MessageConsumer consumer = _consumerSession.createConsumer(_queue); - - //Validate the message sent to setup the failed over broker. - Message message = consumer.receive(1000); - assertNotNull("Message " + 0 + " not received.", message); - assertEquals("Incorrect message received", 0, message.getIntProperty(INDEX)); - - // Validate the two messages sent from within the onMessage - for (int index = 0; index <= 1; index++) - { - message = consumer.receive(1000); - assertNotNull("Message " + index + " not received.", message); - assertEquals("Incorrect message received", index, message.getIntProperty(INDEX)); - assertEquals("Incorrect message text for message:" + index, SEND_FROM_ON_MESSAGE_TEXT, message.getStringProperty(MSG)); - } - - assertNull("Extra message received.", consumer.receiveNoWait()); - - _consumerSession.close(); - - assertEquals("Wrong number of messages on queue", 0, - ((AMQSession) getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE)).getQueueDepth((AMQDestination) _queue)); - } - - private void repopulateBroker() throws Exception - { - // Repopulate this new broker so we can test what happends after failover - - //Get the connection to the first (main port) broker. - Connection connection = getConnection(); - // Use a transaction to send messages so we can be sure they arrive. - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - // ensure destination is created. - session.createConsumer(_queue).close(); - - sendMessage(session, _queue, NUM_MESSAGES); - - assertEquals("Wrong number of messages on queue", NUM_MESSAGES, - ((AMQSession) session).getQueueDepth((AMQDestination) _queue)); - - connection.close(); - } - - // AMQConnectionListener Interface.. used so we can validate that we - // actually failed over. - - public void bytesSent(long count) - { - } - - public void bytesReceived(long count) - { - } - - public boolean preFailover(boolean redirect) - { - //Allow failover - return true; - } - - public boolean preResubscribe() - { - //Allow failover - return true; - } - - public void failoverComplete() - { - _failoverCompleted.countDown(); - } - - /** - * Override so we can block until failover has completd - * - * @param port int the port of the broker to fail. - */ - @Override - public void failBroker(int port) - { - super.failBroker(port); - - try - { - if (!_failoverCompleted.await(DEFAULT_FAILOVER_TIME, TimeUnit.MILLISECONDS)) - { - fail("Failover did not occur in specified time:" + DEFAULT_FAILOVER_TIME); - } - } - catch (InterruptedException e) - { - fail("Failover was interuppted"); - } - } - - /** - * Pass the given exception back to the waiting thread to fail the test run. - * - * @param e The exception that is causing the test to fail. - */ - protected void fail(Exception e) - { - _causeOfFailure.set(e); - // End the test. - while (_receviedAll.getCount() != 0) - { - _receviedAll.countDown(); - } - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TemporaryTopicTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TemporaryTopicTest.java new file mode 100644 index 0000000000..c89b13a0f9 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TemporaryTopicTest.java @@ -0,0 +1,182 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.topic; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TemporaryTopic; +import javax.jms.TextMessage; + +import org.apache.qpid.test.utils.QpidBrokerTestCase; + + +/** + * Tests the behaviour of {@link TemporaryTopic}. + */ +public class TemporaryTopicTest extends QpidBrokerTestCase +{ + /** + * Tests the basic publish/subscribe behaviour of a temporary topic. Single + * message is sent to two subscribers. + */ + public void testMessageDeliveryUsingTemporaryTopic() throws Exception + { + final Connection conn = getConnection(); + final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + final TemporaryTopic topic = session.createTemporaryTopic(); + assertNotNull(topic); + final MessageProducer producer = session.createProducer(topic); + final MessageConsumer consumer1 = session.createConsumer(topic); + final MessageConsumer consumer2 = session.createConsumer(topic); + conn.start(); + producer.send(session.createTextMessage("hello")); + + final TextMessage tm1 = (TextMessage) consumer1.receive(2000); + final TextMessage tm2 = (TextMessage) consumer2.receive(2000); + + assertNotNull("Message not received by subscriber1", tm1); + assertEquals("hello", tm1.getText()); + assertNotNull("Message not received by subscriber2", tm2); + assertEquals("hello", tm2.getText()); + } + + /** + * Tests that the client is able to explicitly delete a temporary topic using + * {@link TemporaryTopic#delete()} and is prevented from deleting one that + * still has consumers. + * + * Note: Under < 0-10 {@link TemporaryTopic#delete()} only marks the queue as deleted + * on the client. 0-10 causes the topic to be deleted from the Broker. + */ + public void testExplictTemporaryTopicDeletion() throws Exception + { + final Connection conn = getConnection(); + + final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + final TemporaryTopic topic = session.createTemporaryTopic(); + assertNotNull(topic); + final MessageConsumer consumer = session.createConsumer(topic); + conn.start(); + try + { + topic.delete(); + fail("Expected JMSException : should not be able to delete while there are active consumers"); + } + catch (JMSException je) + { + //pass + assertEquals("Temporary Topic has consumers so cannot be deleted", je.getMessage()); + } + + consumer.close(); + + // Now deletion should succeed. + topic.delete(); + + try + { + session.createConsumer(topic); + fail("Exception not thrown"); + } + catch (JMSException je) + { + //pass + assertEquals("Cannot consume from a deleted destination", je.getMessage()); + } + } + + /** + * Tests that a temporary topic cannot be used by another {@link Session}. + */ + public void testUseFromAnotherSessionProhibited() throws Exception + { + final Connection conn = getConnection(); + final Session session1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Session session2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + final TemporaryTopic topic = session1.createTemporaryTopic(); + + try + { + session2.createConsumer(topic); + fail("Expected a JMSException when subscribing to a temporary topic created on a different session"); + } + catch (JMSException je) + { + // pass + assertEquals("Cannot consume from a temporary destination created on another session", je.getMessage()); + } + } + + /** + * Tests that the client is prohibited from creating a durable subscriber for a temporary + * queue. + */ + public void testDurableSubscriptionProhibited() throws Exception + { + final Connection conn = getConnection(); + + final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + final TemporaryTopic topic = session.createTemporaryTopic(); + assertNotNull(topic); + try + { + session.createDurableSubscriber(topic, null); + fail("Expected JMSException : should not be able to create durable subscription from temp topic"); + } + catch (JMSException je) + { + //pass + assertEquals("Cannot create a durable subscription with a temporary topic: " + topic.toString(), je.getMessage()); + } + } + + /** + * Tests that a temporary topic remains available for reuse even after its initial + * subscribers have disconnected. + */ + public void testTemporaryTopicReused() throws Exception + { + final Connection conn = getConnection(); + final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + final TemporaryTopic topic = session.createTemporaryTopic(); + assertNotNull(topic); + + final MessageProducer producer = session.createProducer(topic); + final MessageConsumer consumer1 = session.createConsumer(topic); + conn.start(); + producer.send(session.createTextMessage("message1")); + TextMessage tm = (TextMessage) consumer1.receive(2000); + assertNotNull("Message not received by first consumer", tm); + assertEquals("message1", tm.getText()); + consumer1.close(); + + final MessageConsumer consumer2 = session.createConsumer(topic); + conn.start(); + producer.send(session.createTextMessage("message2")); + tm = (TextMessage) consumer2.receive(2000); + assertNotNull("Message not received by second consumer", tm); + assertEquals("message2", tm.getText()); + consumer2.close(); + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java index eee232e113..0b1aeef8e9 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java @@ -21,10 +21,7 @@ package org.apache.qpid.test.unit.topic; import javax.jms.InvalidDestinationException; -import javax.jms.JMSException; import javax.jms.MessageConsumer; -import javax.jms.Session; -import javax.jms.TemporaryTopic; import javax.jms.TextMessage; import javax.jms.TopicPublisher; import javax.jms.TopicSession; @@ -40,18 +37,6 @@ import org.apache.qpid.test.utils.QpidBrokerTestCase; /** @author Apache Software Foundation */ public class TopicSessionTest extends QpidBrokerTestCase { - - protected void setUp() throws Exception - { - super.setUp(); - } - - protected void tearDown() throws Exception - { - super.tearDown(); - } - - public void testTopicSubscriptionUnsubscription() throws Exception { @@ -228,83 +213,6 @@ public class TopicSessionTest extends QpidBrokerTestCase con.close(); } - public void testSendingSameMessage() throws Exception - { - AMQConnection conn = (AMQConnection) getConnection("guest", "guest"); - TopicSession session = conn.createTopicSession(true, Session.AUTO_ACKNOWLEDGE); - TemporaryTopic topic = session.createTemporaryTopic(); - assertNotNull(topic); - TopicPublisher producer = session.createPublisher(topic); - MessageConsumer consumer = session.createConsumer(topic); - conn.start(); - TextMessage sentMessage = session.createTextMessage("Test Message"); - producer.send(sentMessage); - session.commit(); - TextMessage receivedMessage = (TextMessage) consumer.receive(2000); - assertNotNull(receivedMessage); - assertEquals(sentMessage.getText(), receivedMessage.getText()); - producer.send(sentMessage); - session.commit(); - receivedMessage = (TextMessage) consumer.receive(2000); - assertNotNull(receivedMessage); - assertEquals(sentMessage.getText(), receivedMessage.getText()); - session.commit(); - conn.close(); - - } - - public void testTemporaryTopic() throws Exception - { - AMQConnection conn = (AMQConnection) getConnection("guest", "guest"); - TopicSession session = conn.createTopicSession(true, Session.AUTO_ACKNOWLEDGE); - TemporaryTopic topic = session.createTemporaryTopic(); - assertNotNull(topic); - TopicPublisher producer = session.createPublisher(topic); - MessageConsumer consumer = session.createConsumer(topic); - conn.start(); - producer.send(session.createTextMessage("hello")); - session.commit(); - TextMessage tm = (TextMessage) consumer.receive(2000); - assertNotNull(tm); - assertEquals("hello", tm.getText()); - session.commit(); - try - { - topic.delete(); - fail("Expected JMSException : should not be able to delete while there are active consumers"); - } - catch (JMSException je) - { - ; //pass - } - - consumer.close(); - - try - { - topic.delete(); - } - catch (JMSException je) - { - fail("Unexpected Exception: " + je.getMessage()); - } - - TopicSession session2 = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); - try - { - session2.createConsumer(topic); - fail("Expected a JMSException when subscribing to a temporary topic created on adifferent session"); - } - catch (JMSException je) - { - ; // pass - } - - - conn.close(); - } - - public void testNoLocal() throws Exception { @@ -445,9 +353,4 @@ public class TopicSessionTest extends QpidBrokerTestCase assertEquals("Queue depth was wrong", 0, depth); } - - public static junit.framework.Test suite() - { - return new junit.framework.TestSuite(TopicSessionTest.class); - } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutConfigurationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutConfigurationTest.java deleted file mode 100644 index 46e5d214f5..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutConfigurationTest.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.unit.transacted; - -/** - * This verifies that changing the {@code transactionTimeout} configuration will alter - * the behaviour of the transaction open and idle logging, and that when the connection - * will be closed. - */ -public class TransactionTimeoutConfigurationTest extends TransactionTimeoutTestCase -{ - @Override - protected void configure() throws Exception - { - // Setup housekeeping every second - setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".housekeeping.checkPeriod", "100"); - - // Set transaction timout properties. - setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.openWarn", "200"); - setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.openClose", "1000"); - setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.idleWarn", "100"); - setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.idleClose", "500"); - } - - public void testProducerIdleCommit() throws Exception - { - try - { - send(5, 0); - - sleep(2.0f); - - _psession.commit(); - fail("should fail"); - } - catch (Exception e) - { - _exception = e; - } - - monitor(5, 0); - - check(IDLE); - } - - public void testProducerOpenCommit() throws Exception - { - try - { - send(5, 0.3f); - - _psession.commit(); - fail("should fail"); - } - catch (Exception e) - { - _exception = e; - } - - monitor(6, 3); - - check(OPEN); - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java index db508143f9..fd8beffbe6 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java @@ -30,6 +30,8 @@ public class TransactionTimeoutDisabledTest extends TransactionTimeoutTestCase { // Setup housekeeping every second setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".housekeeping.checkPeriod", "100"); + + // No transaction timeout configuration. } public void testProducerIdleCommit() throws Exception @@ -47,7 +49,7 @@ public class TransactionTimeoutDisabledTest extends TransactionTimeoutTestCase fail("Should have succeeded"); } - assertTrue("Listener should not have received exception", _caught.getCount() == 1); + assertEquals("Listener should not have received exception", 0, getNumberOfDeliveredExceptions()); monitor(0, 0); } @@ -65,7 +67,7 @@ public class TransactionTimeoutDisabledTest extends TransactionTimeoutTestCase fail("Should have succeeded"); } - assertTrue("Listener should not have received exception", _caught.getCount() == 1); + assertEquals("Listener should not have received exception", 0, getNumberOfDeliveredExceptions()); monitor(0, 0); } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java index c912d6a323..f554b0089e 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java @@ -30,152 +30,125 @@ package org.apache.qpid.test.unit.transacted; */ public class TransactionTimeoutTest extends TransactionTimeoutTestCase { - public void testProducerIdle() throws Exception + + protected void configure() throws Exception { - try + // Setup housekeeping every second + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".housekeeping.checkPeriod", "100"); + + if (getName().contains("ProducerIdle")) { - sleep(2.0f); - - _psession.commit(); + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.openWarn", "0"); + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.openClose", "0"); + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.idleWarn", "500"); + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.idleClose", "1500"); } - catch (Exception e) + else if (getName().contains("ProducerOpen")) { - fail("Should have succeeded"); + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.openWarn", "1000"); + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.openClose", "2000"); + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.idleWarn", "0"); + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.idleClose", "0"); } - - assertTrue("Listener should not have received exception", _caught.getCount() == 1); - - monitor(0, 0); - } - - public void testProducerIdleCommit() throws Exception - { - try + else { - send(5, 0); - - sleep(2.0f); - - _psession.commit(); - fail("should fail"); + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.openWarn", "1000"); + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.openClose", "2000"); + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.idleWarn", "500"); + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.idleClose", "1000"); } - catch (Exception e) - { - _exception = e; - } - - monitor(5, 0); - - check(IDLE); } - - public void testProducerOpenCommit() throws Exception + + public void testProducerIdle() throws Exception { - try - { - send(6, 0.5f); - - _psession.commit(); - fail("should fail"); - } - catch (Exception e) - { - _exception = e; - } - - monitor(0, 10); - - check(OPEN); + sleep(2.0f); + + _psession.commit(); + + assertEquals("Listener should not have received exception", 0, getNumberOfDeliveredExceptions()); + + monitor(0, 0); } - - public void testProducerIdleCommitTwice() throws Exception + + public void testProducerIdleCommit() throws Exception { + send(5, 0); + // Idle for more than idleClose to generate idle-warns and cause a close. + sleep(2.0f); + try { - send(5, 0); - - sleep(1.0f); - _psession.commit(); - - send(5, 0); - - sleep(2.0f); - - _psession.commit(); - fail("should fail"); + fail("Exception not thrown"); } catch (Exception e) { _exception = e; } - + monitor(10, 0); - + check(IDLE); } - - public void testProducerOpenCommitTwice() throws Exception + + public void testProducerIdleCommitTwice() throws Exception { + send(5, 0); + // Idle for less than idleClose to generate idle-warns + sleep(1.0f); + + _psession.commit(); + + send(5, 0); + // Now idle for more than idleClose to generate more idle-warns and cause a close. + sleep(2.0f); + try { - send(5, 0); - - sleep(1.0f); - _psession.commit(); - - send(6, 0.5f); - - _psession.commit(); - fail("should fail"); + fail("Exception not thrown"); } catch (Exception e) { _exception = e; } - - // the presistent store generates more idle messages? - monitor(isBrokerStorePersistent() ? 10 : 5, 10); - - check(OPEN); + + monitor(15, 0); + + check(IDLE); } - + public void testProducerIdleRollback() throws Exception { + send(5, 0); + // Now idle for more than idleClose to generate more idle-warns and cause a close. + sleep(2.0f); try { - send(5, 0); - - sleep(2.0f); - _psession.rollback(); - fail("should fail"); + fail("Exception not thrown"); } catch (Exception e) { _exception = e; } - - monitor(5, 0); - + + monitor(10, 0); + check(IDLE); } - + public void testProducerIdleRollbackTwice() throws Exception { + send(5, 0); + // Idle for less than idleClose to generate idle-warns + sleep(1.0f); + _psession.rollback(); + send(5, 0); + // Now idle for more than idleClose to generate more idle-warns and cause a close. + sleep(2.0f); try { - send(5, 0); - - sleep(1.0f); - - _psession.rollback(); - - send(5, 0); - - sleep(2.0f); - _psession.rollback(); fail("should fail"); } @@ -183,153 +156,153 @@ public class TransactionTimeoutTest extends TransactionTimeoutTestCase { _exception = e; } - - monitor(10, 0); + + monitor(15, 0); check(IDLE); } - - public void testConsumerCommitClose() throws Exception + + public void testProducerOpenCommit() throws Exception { try { - send(1, 0); - + // Sleep between sends to cause open warns and then cause a close. + send(6, 0.5f); _psession.commit(); - - expect(1, 0); - - _csession.commit(); - - sleep(3.0f); - - _csession.close(); + fail("Exception not thrown"); } catch (Exception e) { - fail("should have succeeded: " + e.getMessage()); + _exception = e; } - - assertTrue("Listener should not have received exception", _caught.getCount() == 1); - - monitor(0, 0); + + monitor(0, 10); + + check(OPEN); } - public void testConsumerIdleReceiveCommit() throws Exception + public void testProducerOpenCommitTwice() throws Exception { + send(5, 0); + sleep(1.0f); + _psession.commit(); + try { - send(1, 0); - + // Now sleep between sends to cause open warns and then cause a close. + send(6, 0.5f); _psession.commit(); - - sleep(2.0f); - - expect(1, 0); - - sleep(2.0f); - - _csession.commit(); + fail("Exception not thrown"); } catch (Exception e) { - fail("Should have succeeded"); + _exception = e; } + + monitor(0, 10); - assertTrue("Listener should not have received exception", _caught.getCount() == 1); - + check(OPEN); + } + + public void testConsumerCommitClose() throws Exception + { + send(1, 0); + + _psession.commit(); + + expect(1, 0); + + _csession.commit(); + + sleep(3.0f); + + _csession.close(); + + assertEquals("Listener should not have received exception", 0, getNumberOfDeliveredExceptions()); + monitor(0, 0); } + public void testConsumerIdleReceiveCommit() throws Exception + { + send(1, 0); + + _psession.commit(); + + sleep(2.0f); + + expect(1, 0); + + sleep(2.0f); + + _csession.commit(); + + assertEquals("Listener should not have received exception", 0, getNumberOfDeliveredExceptions()); + + monitor(0, 0); + } + public void testConsumerIdleCommit() throws Exception { - try - { - send(1, 0); - - _psession.commit(); - - expect(1, 0); - - sleep(2.0f); - - _csession.commit(); - } - catch (Exception e) - { - fail("Should have succeeded"); - } - - assertTrue("Listener should not have received exception", _caught.getCount() == 1); - + send(1, 0); + + _psession.commit(); + + expect(1, 0); + + sleep(2.0f); + + _csession.commit(); + + assertEquals("Listener should not have received exception", 0, getNumberOfDeliveredExceptions()); + monitor(0, 0); } public void testConsumerIdleRollback() throws Exception { - try - { - send(1, 0); - - _psession.commit(); - - expect(1, 0); - - sleep(2.0f); - - _csession.rollback(); - } - catch (Exception e) - { - fail("Should have succeeded"); - } - - assertTrue("Listener should not have received exception", _caught.getCount() == 1); - + send(1, 0); + + _psession.commit(); + + expect(1, 0); + + sleep(2.0f); + + _csession.rollback(); + + assertEquals("Listener should not have received exception", 0, getNumberOfDeliveredExceptions()); + monitor(0, 0); } - + public void testConsumerOpenCommit() throws Exception { - try - { - send(1, 0); - - _psession.commit(); - - sleep(3.0f); - - _csession.commit(); - } - catch (Exception e) - { - fail("Should have succeeded"); - } - - assertTrue("Listener should not have received exception", _caught.getCount() == 1); - + send(1, 0); + + _psession.commit(); + + sleep(3.0f); + + _csession.commit(); + + assertEquals("Listener should not have received exception", 0, getNumberOfDeliveredExceptions()); + monitor(0, 0); } public void testConsumerOpenRollback() throws Exception { - try - { - send(1, 0); - - _psession.commit(); - - sleep(3.0f); - - _csession.rollback(); - } - catch (Exception e) - { - fail("Should have succeeded"); - } - - assertTrue("Listener should not have received exception", _caught.getCount() == 1); + send(1, 0); + _psession.commit(); + + sleep(3.0f); + + _csession.rollback(); + + assertEquals("Listener should not have received exception", 0, getNumberOfDeliveredExceptions()); + monitor(0, 0); } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java index ef2de5c592..2b90d38049 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java @@ -23,6 +23,7 @@ package org.apache.qpid.test.unit.transacted; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import javax.jms.DeliveryMode; import javax.jms.ExceptionListener; @@ -49,7 +50,7 @@ import org.apache.qpid.util.LogMonitor; /** * The {@link TestCase} for transaction timeout testing. */ -public class TransactionTimeoutTestCase extends QpidBrokerTestCase implements ExceptionListener +public abstract class TransactionTimeoutTestCase extends QpidBrokerTestCase implements ExceptionListener { public static final String VIRTUALHOST = "test"; public static final String TEXT = "0123456789abcdefghiforgettherest"; @@ -64,31 +65,16 @@ public class TransactionTimeoutTestCase extends QpidBrokerTestCase implements Ex protected Queue _queue; protected MessageConsumer _consumer; protected MessageProducer _producer; - protected CountDownLatch _caught = new CountDownLatch(1); + private CountDownLatch _exceptionLatch = new CountDownLatch(1); + protected AtomicInteger _exceptionCount = new AtomicInteger(0); protected String _message; protected Exception _exception; protected AMQConstant _code; - - protected void configure() throws Exception - { - // Setup housekeeping every second - setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".housekeeping.checkPeriod", "100"); - - /* - * Set transaction timout properties. The XML in the virtualhosts configuration is as follows: - * - * <transactionTimeout> - * <openWarn>1000</openWarn> - * <openClose>2000</openClose> - * <idleWarn>500</idleWarn> - * <idleClose>1500</idleClose> - * </transactionTimeout> - */ - setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.openWarn", "1000"); - setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.openClose", "2000"); - setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.idleWarn", "500"); - setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.idleClose", "1000"); - } + + /** + * Subclasses must implement this to configure transaction timeout parameters. + */ + protected abstract void configure() throws Exception; protected void setUp() throws Exception { @@ -233,7 +219,7 @@ public class TransactionTimeoutTestCase extends QpidBrokerTestCase implements Ex */ protected void check(String reason)throws InterruptedException { - assertTrue("Should have caught exception in listener", _caught.await(1, TimeUnit.SECONDS)); + assertTrue("Should have caught exception in listener", _exceptionLatch.await(1, TimeUnit.SECONDS)); assertNotNull("Should have thrown exception to client", _exception); assertTrue("Exception message should contain '" + reason + "': " + _message, _message.contains(reason + " transaction timed out")); assertNotNull("Exception should have an error code", _code); @@ -243,11 +229,18 @@ public class TransactionTimeoutTestCase extends QpidBrokerTestCase implements Ex /** @see javax.jms.ExceptionListener#onException(javax.jms.JMSException) */ public void onException(JMSException jmse) { - _caught.countDown(); + _exceptionLatch.countDown(); + _exceptionCount.incrementAndGet(); + _message = jmse.getLinkedException().getMessage(); if (jmse.getLinkedException() instanceof AMQException) { _code = ((AMQException) jmse.getLinkedException()).getErrorCode(); } } + + protected int getNumberOfDeliveredExceptions() + { + return _exceptionCount.get(); + } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java index 0a98fc3382..bb44aea659 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java @@ -52,7 +52,6 @@ import javax.naming.NamingException; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.XMLConfiguration; import org.apache.commons.lang.StringUtils; -import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnectionFactory; @@ -115,7 +114,6 @@ public class QpidBrokerTestCase extends QpidTestCase private static final String BROKER_LANGUAGE = "broker.language"; private static final String BROKER_TYPE = "broker.type"; private static final String BROKER_COMMAND = "broker.command"; - private static final String BROKER_CLEAN = "broker.clean"; private static final String BROKER_CLEAN_BETWEEN_TESTS = "broker.clean.between.tests"; private static final String BROKER_EXISTING_QPID_WORK = "broker.existing.qpid.work"; private static final String BROKER_VERSION = "broker.version"; @@ -137,16 +135,15 @@ public class QpidBrokerTestCase extends QpidTestCase public static final int DEFAULT_VM_PORT = 1; public static final int DEFAULT_PORT = Integer.getInteger("test.port", ServerConfiguration.DEFAULT_PORT); public static final int FAILING_PORT = Integer.parseInt(System.getProperty("test.port.alt")); - public static final int DEFAULT_MANAGEMENT_PORT = Integer.getInteger("test.mport", ServerConfiguration.DEFAULT_JMXPORT); + public static final int DEFAULT_MANAGEMENT_PORT = Integer.getInteger("test.mport", ServerConfiguration.DEFAULT_JMXPORT_REGISTRYSERVER); public static final int DEFAULT_SSL_PORT = Integer.getInteger("test.port.ssl", ServerConfiguration.DEFAULT_SSL_PORT); protected String _brokerLanguage = System.getProperty(BROKER_LANGUAGE, JAVA); protected BrokerType _brokerType = BrokerType.valueOf(System.getProperty(BROKER_TYPE, "").toUpperCase()); protected String _brokerCommand = System.getProperty(BROKER_COMMAND); - private String _brokerClean = System.getProperty(BROKER_CLEAN, null); private Boolean _brokerCleanBetweenTests = Boolean.getBoolean(BROKER_CLEAN_BETWEEN_TESTS); private final AmqpProtocolVersion _brokerVersion = AmqpProtocolVersion.valueOf(System.getProperty(BROKER_VERSION, "")); - protected String _output = System.getProperty(TEST_OUTPUT); + protected String _output = System.getProperty(TEST_OUTPUT, System.getProperty("java.io.tmpdir")); protected Boolean _brokerPersistent = Boolean.getBoolean(BROKER_PERSITENT); private String _brokerProtocolExcludes = System.getProperty(BROKER_PROTOCOL_EXCLUDES); @@ -217,8 +214,13 @@ public class QpidBrokerTestCase extends QpidTestCase if (redirected) { _outputFile = new File(String.format("%s/TEST-%s.out", _output, qname)); - out = new PrintStream(_outputFile); + out = new PrintStream(new FileOutputStream(_outputFile), true); err = new PrintStream(String.format("%s/TEST-%s.err", _output, qname)); + + // This is relying on behaviour specific to log4j 1.2.12. If we were to upgrade to 1.2.13 or + // beyond we must change either code (or config) to ensure that ConsoleAppender#setFollow + // is set to true otherwise log4j logging will not respect the following reassignment. + System.setOut(out); System.setErr(err); @@ -259,14 +261,9 @@ public class QpidBrokerTestCase extends QpidTestCase if(_brokerCleanBetweenTests) { - try - { - cleanBroker(); - } - catch (Exception e) - { - _logger.error("exception cleaning up broker", e); - } + final String qpidWork = System.getProperty("QPID_WORK"); + cleanBrokerWork(qpidWork); + createBrokerWork(qpidWork); } _logger.info("========== stop " + getTestName() + " =========="); @@ -298,11 +295,11 @@ public class QpidBrokerTestCase extends QpidTestCase String existingQpidWorkPath = System.getProperty(BROKER_EXISTING_QPID_WORK); if(existingQpidWorkPath != null && !existingQpidWorkPath.equals("")) { - cleanBroker(); + String qpidWork = getQpidWork(_brokerType, getPort()); File existing = new File(existingQpidWorkPath); - File qpidWork = new File(getQpidWork(_brokerType, getPort())); - FileUtils.copyRecursive(existing, qpidWork); + cleanBrokerWork(qpidWork); + FileUtils.copyRecursive(existing, new File(qpidWork)); } startBroker(); @@ -480,7 +477,7 @@ public class QpidBrokerTestCase extends QpidTestCase addExcludedPorts(port, DEFAULT_SSL_PORT, options); - options.setJmxPort(getManagementPort(port)); + options.setJmxPortRegistryServer(getManagementPort(port)); //Set the log config file, relying on the log4j.configuration system property //set on the JVM by the JUnit runner task in module.xml. @@ -494,25 +491,22 @@ public class QpidBrokerTestCase extends QpidTestCase } else if (!_brokerType.equals(BrokerType.EXTERNAL)) { + // Add the port to QPID_WORK to ensure unique working dirs for multi broker tests + final String qpidWork = getQpidWork(_brokerType, port); String cmd = getBrokerCommand(port); _logger.info("starting external broker: " + cmd); ProcessBuilder pb = new ProcessBuilder(cmd.split("\\s+")); pb.redirectErrorStream(true); - Map<String, String> env = pb.environment(); - String qpidHome = System.getProperty(QPID_HOME); env.put(QPID_HOME, qpidHome); - //Augment Path with bin directory in QPID_HOME. env.put("PATH", env.get("PATH").concat(File.pathSeparator + qpidHome + "/bin")); //Add the test name to the broker run. // DON'T change PNAME, qpid.stop needs this value. env.put("QPID_PNAME", "-DPNAME=QPBRKR -DTNAME=\"" + getTestName() + "\""); - // Add the port to QPID_WORK to ensure unique working dirs for multi broker tests - env.put("QPID_WORK", getQpidWork(_brokerType, port)); - + env.put("QPID_WORK", qpidWork); // Use the environment variable to set amqj.logging.level for the broker // The value used is a 'server' value in the test configuration to @@ -563,6 +557,10 @@ public class QpidBrokerTestCase extends QpidTestCase env.put("QPID_OPTS", QPID_OPTS); } } + + // cpp broker requires that the work directory is created + createBrokerWork(qpidWork); + Process process = pb.start();; Piper p = new Piper(process.getInputStream(), @@ -577,7 +575,7 @@ public class QpidBrokerTestCase extends QpidTestCase _logger.info("broker failed to become ready (" + p.ready + "):" + p.getStopLine()); //Ensure broker has stopped process.destroy(); - cleanBroker(); + cleanBrokerWork(qpidWork); throw new RuntimeException("broker failed to become ready:" + p.getStopLine()); } @@ -587,7 +585,7 @@ public class QpidBrokerTestCase extends QpidTestCase //test that the broker is still running and hasn't exited unexpectedly int exit = process.exitValue(); _logger.info("broker aborted: " + exit); - cleanBroker(); + cleanBrokerWork(qpidWork); throw new RuntimeException("broker aborted: " + exit); } catch (IllegalThreadStateException e) @@ -655,21 +653,28 @@ public class QpidBrokerTestCase extends QpidTestCase public String getTestConfigFile() { - String path = _output == null ? System.getProperty("java.io.tmpdir") : _output; - return path + "/" + getTestQueueName() + "-config.xml"; + return _output + "/" + getTestQueueName() + "-config.xml"; } public String getTestVirtualhostsFile() { - String path = _output == null ? System.getProperty("java.io.tmpdir") : _output; - return path + "/" + getTestQueueName() + "-virtualhosts.xml"; + return _output + "/" + getTestQueueName() + "-virtualhosts.xml"; + } + + private String relativeToQpidHome(String file) + { + return file.replace(System.getProperty(QPID_HOME,"QPID_HOME") + "/",""); } protected void saveTestConfiguration() throws ConfigurationException { - // Specifiy the test config file + // Specify the test config file String testConfig = getTestConfigFile(); - setSystemProperty("test.config", testConfig); + String relative = relativeToQpidHome(testConfig); + + setSystemProperty("test.config", relative); + _logger.info("Set test.config property to: " + relative); + _logger.info("Saving test virtualhosts file at: " + testConfig); // Create the file if configuration does not exist if (_testConfiguration.isEmpty()) @@ -681,9 +686,13 @@ public class QpidBrokerTestCase extends QpidTestCase protected void saveTestVirtualhosts() throws ConfigurationException { - // Specifiy the test virtualhosts file + // Specify the test virtualhosts file String testVirtualhosts = getTestVirtualhostsFile(); - setSystemProperty("test.virtualhosts", testVirtualhosts); + String relative = relativeToQpidHome(testVirtualhosts); + + setSystemProperty("test.virtualhosts", relative); + _logger.info("Set test.virtualhosts property to: " + relative); + _logger.info("Saving test virtualhosts file at: " + testVirtualhosts); // Create the file if configuration does not exist if (_testVirtualhosts.isEmpty()) @@ -693,30 +702,33 @@ public class QpidBrokerTestCase extends QpidTestCase _testVirtualhosts.save(testVirtualhosts); } - public void cleanBroker() + protected void cleanBrokerWork(final String qpidWork) { - if (_brokerClean != null) + if (qpidWork != null) { - _logger.info("clean: " + _brokerClean); + _logger.info("Cleaning broker work dir: " + qpidWork); - try + File file = new File(qpidWork); + if (file.exists()) { - ProcessBuilder pb = new ProcessBuilder(_brokerClean.split("\\s+")); - pb.redirectErrorStream(true); - Process clean = pb.start(); - new Piper(clean.getInputStream(),_brokerOutputStream).start(); - - clean.waitFor(); - - _logger.info("clean exited: " + clean.exitValue()); - } - catch (IOException e) - { - throw new RuntimeException(e); + final boolean success = FileUtils.delete(file, true); + if(!success) + { + throw new RuntimeException("Failed to recursively delete beneath : " + file); + } } - catch (InterruptedException e) + } + } + + protected void createBrokerWork(final String qpidWork) + { + if (qpidWork != null) + { + final File dir = new File(qpidWork); + dir.mkdirs(); + if (!dir.isDirectory()) { - throw new RuntimeException(e); + throw new RuntimeException("Failed to created Qpid work directory : " + qpidWork); } } } @@ -730,7 +742,7 @@ public class QpidBrokerTestCase extends QpidTestCase { port = getPort(port); - _logger.info("stopping broker: " + getBrokerCommand(port)); + _logger.info("stopping broker on port : " + port); BrokerHolder broker = _brokers.remove(port); broker.shutdown(); } @@ -906,7 +918,7 @@ public class QpidBrokerTestCase extends QpidTestCase } /** - * Add an environtmen variable for the external broker environment + * Add an environment variable for the external broker environment * * @param property the property to set * @param value the value to set it to @@ -990,9 +1002,9 @@ public class QpidBrokerTestCase extends QpidTestCase * Get the default connection factory for the currently used broker * Default factory is "local" * - * @return A conection factory + * @return A connection factory * - * @throws Exception if there is an error getting the tactory + * @throws Exception if there is an error getting the factory */ public AMQConnectionFactory getConnectionFactory() throws NamingException { @@ -1016,7 +1028,7 @@ public class QpidBrokerTestCase extends QpidTestCase * * @param factoryName The factory name * - * @return A conection factory + * @return A connection factory * * @throws Exception if there is an error getting the tactory */ @@ -1054,7 +1066,7 @@ public class QpidBrokerTestCase extends QpidTestCase { _logger.info("get connection"); Connection con = getConnectionFactory().createConnection(username, password); - //add the connection in the lis of connections + //add the connection in the list of connections _connections.add(con); return con; } @@ -1063,7 +1075,7 @@ public class QpidBrokerTestCase extends QpidTestCase { _logger.info("get Connection"); Connection con = getConnectionFactory().createConnection(username, password, id); - //add the connection in the lis of connections + //add the connection in the list of connections _connections.add(con); return con; } @@ -1154,7 +1166,7 @@ public class QpidBrokerTestCase extends QpidTestCase /** * Send messages to the given destination. * - * If session is transacted then messages will be commited before returning + * If session is transacted then messages will be committed before returning * * @param session the session to use for sending * @param destination where to send them to @@ -1162,7 +1174,7 @@ public class QpidBrokerTestCase extends QpidTestCase * * @param batchSize the batchSize in which to commit, 0 means no batching, * but a single commit at the end - * @return the sent messgse + * @return the sent message * * @throws Exception */ @@ -1175,7 +1187,7 @@ public class QpidBrokerTestCase extends QpidTestCase /** * Send messages to the given destination. * - * If session is transacted then messages will be commited before returning + * If session is transacted then messages will be committed before returning * * @param session the session to use for sending * @param destination where to send them to @@ -1184,7 +1196,7 @@ public class QpidBrokerTestCase extends QpidTestCase * @param offset offset allows the INDEX value of the message to be adjusted. * @param batchSize the batchSize in which to commit, 0 means no batching, * but a single commit at the end - * @return the sent messgse + * @return the sent message * * @throws Exception */ diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitor.java b/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitor.java index a5e2b80f64..c09e63308c 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitor.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitor.java @@ -27,11 +27,9 @@ import org.apache.log4j.SimpleLayout; import java.io.BufferedReader; import java.io.File; import java.io.FileNotFoundException; -import java.io.FileOutputStream; import java.io.FileReader; import java.io.IOException; -import java.io.OutputStreamWriter; -import java.util.ArrayList; +import java.io.LineNumberReader; import java.util.List; import java.util.LinkedList; @@ -45,10 +43,12 @@ import java.util.LinkedList; public class LogMonitor { // The file that the log statements will be written to. - private File _logfile; + private final File _logfile; // The appender we added to the get messages - private FileAppender _appender; + private final FileAppender _appender; + + private int _linesToSkip = 0; /** * Create a new LogMonitor that creates a new Log4j Appender and monitors @@ -78,6 +78,7 @@ public class LogMonitor if (file != null && file.exists()) { _logfile = file; + _appender = null; } else { @@ -99,13 +100,13 @@ public class LogMonitor * @param wait the time in ms to wait for the message to occur * @return true if the message was found * - * @throws java.io.FileNotFoundException if the Log file can nolonger be found + * @throws java.io.FileNotFoundException if the Log file can no longer be found * @throws IOException thrown when reading the log file */ public List<String> waitAndFindMatches(String message, long wait) throws FileNotFoundException, IOException { - if (waitForMessage(message, wait, true)) + if (waitForMessage(message, wait)) { return findMatches(message); } @@ -116,7 +117,9 @@ public class LogMonitor } /** - * Checks the log for instances of the search string. + * Checks the log for instances of the search string. If the caller + * has previously called {@link #markDiscardPoint()}, lines up until the discard + * point are not considered. * * The pattern parameter can take any valid argument used in String.contains() * @@ -130,66 +133,99 @@ public class LogMonitor */ public List<String> findMatches(String pattern) throws IOException { - return FileUtils.searchFile(_logfile, pattern); + + List<String> results = new LinkedList<String>(); + + LineNumberReader reader = new LineNumberReader(new FileReader(_logfile)); + try + { + while (reader.ready()) + { + String line = reader.readLine(); + if (reader.getLineNumber() > _linesToSkip && line.contains(pattern)) + { + results.add(line); + } + } + } + finally + { + reader.close(); + } + + return results; } /** - * Checks the log file for a given message to appear. + * Checks the log file for a given message to appear. If the caller + * has previously called {@link #markDiscardPoint()}, lines up until the discard + * point are not considered. * * @param message the message to wait for in the log * @param wait the time in ms to wait for the message to occur - * - * @param printFileOnFailure should we print the contents that have been - * read if we fail ot find the message. * @return true if the message was found * - * @throws java.io.FileNotFoundException if the Log file can nolonger be found + * @throws java.io.FileNotFoundException if the Log file can no longer be found * @throws IOException thrown when reading the log file */ - public boolean waitForMessage(String message, long wait, boolean printFileOnFailure) + public boolean waitForMessage(String message, long wait) throws FileNotFoundException, IOException { // Loop through alerts until we're done or wait ms seconds have passed, // just in case the logfile takes a while to flush. - BufferedReader reader = new BufferedReader(new FileReader(_logfile)); - boolean found = false; - long endtime = System.currentTimeMillis() + wait; - ArrayList<String> contents = new ArrayList<String>(); - while (!found && System.currentTimeMillis() < endtime) + LineNumberReader reader = null; + try { - while (reader.ready()) + reader = new LineNumberReader(new FileReader(_logfile)); + + boolean found = false; + long endtime = System.currentTimeMillis() + wait; + while (!found && System.currentTimeMillis() < endtime) { - String line = reader.readLine(); - contents.add(line); - if (line.contains(message)) + boolean ready = true; + while (ready = reader.ready()) { - found = true; + String line = reader.readLine(); + + if (reader.getLineNumber() > _linesToSkip) + { + if (line.contains(message)) + { + found = true; + break; + } + } + } + if (!ready) + { + try + { + Thread.sleep(50); + } + catch (InterruptedException ie) + { + Thread.currentThread().interrupt(); + } } } + return found; + } - if (!found && printFileOnFailure) + finally { - for (String line : contents) + if (reader != null) { - System.out.println(line); + reader.close(); } } - return found; } - - public boolean waitForMessage(String message, long alertLogWaitPeriod) throws FileNotFoundException, IOException - { - return waitForMessage(message, alertLogWaitPeriod, true); - } - - /** * Read the log file in to memory as a String * * @return the current contents of the log file * - * @throws java.io.FileNotFoundException if the Log file can nolonger be found + * @throws java.io.FileNotFoundException if the Log file can no longer be found * @throws IOException thrown when reading the log file */ public String readFile() throws FileNotFoundException, IOException @@ -208,14 +244,37 @@ public class LogMonitor } /** - * Clears the log file and writes: 'Log Monitor Reset' at the start of the file + * Marks the discard point in the log file. * - * @throws java.io.FileNotFoundException if the Log file can nolonger be found + * @throws java.io.FileNotFoundException if the Log file can no longer be found * @throws IOException thrown if there is a problem with the log file */ - public void reset() throws FileNotFoundException, IOException + public void markDiscardPoint() throws FileNotFoundException, IOException + { + _linesToSkip = countLinesInFile(); + } + + private int countLinesInFile() throws IOException { - new FileOutputStream(_logfile).getChannel().truncate(0); + int lineCount = 0; + BufferedReader br = null; + try + { + br = new BufferedReader(new FileReader(_logfile)); + while(br.readLine() != null) + { + lineCount++; + } + + return lineCount; + } + finally + { + if (br != null) + { + br.close(); + } + } } /** diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitorTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitorTest.java index a99abe4b94..89f707fbef 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitorTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitorTest.java @@ -155,7 +155,7 @@ public class LogMonitorTest extends TestCase String notLogged = "This text was not logged"; - validateLogDoesNotContainsMessage(_monitor, notLogged); + validateLogDoesNotContainMessage(_monitor, notLogged); } public void testWaitForMessage_Timeout() throws IOException @@ -168,28 +168,27 @@ public class LogMonitorTest extends TestCase // Verify that we can time out waiting for a message assertFalse("Message was logged ", - _monitor.waitForMessage(message, TIME_OUT / 2, false)); + _monitor.waitForMessage(message, TIME_OUT / 2)); // Verify that the message did eventually get logged. assertTrue("Message was never logged.", _monitor.waitForMessage(message, TIME_OUT)); } - public void testReset() throws IOException + public void testDiscardPoint() throws IOException { - String message = getName() + ": Test Message"; - - Logger.getRootLogger().warn(message); - - validateLogContainsMessage(_monitor, message); + String firstMessage = getName() + ": Test Message1"; + Logger.getRootLogger().warn(firstMessage); - String LOG_RESET_TEXT = "Log Monitor Reset"; + validateLogContainsMessage(_monitor, firstMessage); - validateLogDoesNotContainsMessage(_monitor, LOG_RESET_TEXT); + _monitor.markDiscardPoint(); - _monitor.reset(); + validateLogDoesNotContainMessage(_monitor, firstMessage); - assertEquals("", _monitor.readFile()); + String secondMessage = getName() + ": Test Message2"; + Logger.getRootLogger().warn(secondMessage); + validateLogContainsMessage(_monitor, secondMessage); } public void testRead() throws IOException @@ -214,7 +213,7 @@ public class LogMonitorTest extends TestCase * * @throws IOException if a problems occurs */ - protected void validateLogDoesNotContainsMessage(LogMonitor log, String message) + protected void validateLogDoesNotContainMessage(LogMonitor log, String message) throws IOException { List<String> results = log.findMatches(message); |