From 9c71e3f67f1a327cdb6638ca26fac7a812fa1593 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Fri, 17 Apr 2009 14:16:01 +0000 Subject: QPID-1813 : Provide test that uses the DerbyMessageStore to check NoLocal functionality after a broker restart. merged from trunk r765604 QPID-1817 : Update QTC to provide the configuration file to an external broker merged from trunk r765605 QPID-1813/QPID-1817 : Removed the new properties from the test-provider as this will affect all tests. The NoLocalAfterRecoveryTest now updates a ConnectionURL based on the JNDI data and uses that to start a connection. NLART also provides a default location for the derbyDB store as the DMS class does not correctly attempt to put the store in QPID_WORK. This will be re-addressed when ServerConfiguration is again available from a VHC object. ConnectionTest was updated to remove the literal values for the BrokerDetail options. merged from trunk r765608 QPID-1813 : Add missing DerbyDB library dependency. merged from trunk r765628 QPID-1818 : Exclude NoLocalAfterRecoveryTest as 0-10 client code path does not restore a transacted session after failover. merged from trunk r765656 git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.5-release@766021 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/java/010ExcludeList | 2 + qpid/java/build.deps | 5 +- qpid/java/java.testprofile | 3 +- qpid/java/lib/derby-10.3.2.1.jar | Bin 0 -> 2343388 bytes .../MultipleJCAProviderRegistrationTest.java | 19 +- .../persistent/NoLocalAfterRecoveryTest.java | 246 +++++++++++++++++++++ .../unit/client/connection/ConnectionTest.java | 28 +-- .../org/apache/qpid/test/utils/QpidTestCase.java | 63 +++++- qpid/java/test-provider.properties | 1 + 9 files changed, 318 insertions(+), 49 deletions(-) create mode 100644 qpid/java/lib/derby-10.3.2.1.jar create mode 100644 qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java diff --git a/qpid/java/010ExcludeList b/qpid/java/010ExcludeList index 61d9c684ba..ae86d46b5b 100644 --- a/qpid/java/010ExcludeList +++ b/qpid/java/010ExcludeList @@ -67,3 +67,5 @@ org.apache.qpid.test.unit.close.FlowToDiskBackingQueueDeleteTest#* // This test may use QpidTestCase but it is not using the getConnection and is hardwired to InVM org.apache.qpid.test.unit.client.connection.CloseAfterConnectionFailureTest#* +//QPID-1818 : 0-10 Client code path does not correctly restore a transacted session after failover. +org.apache.qpid.server.persistent.NoLocalAfterRecoveryTest#* diff --git a/qpid/java/build.deps b/qpid/java/build.deps index d62e414639..6bb9bfa09e 100644 --- a/qpid/java/build.deps +++ b/qpid/java/build.deps @@ -10,6 +10,8 @@ commons-lang=lib/commons-lang-2.2.jar commons-logging=lib/commons-logging-1.0.4.jar commons-pool=lib/commons-pool-1.4.jar +derby-db=lib/derby-10.3.2.1.jar + geronimo-jms=lib/geronimo-jms_1.1_spec-1.0.jar junit=lib/junit-3.8.1.jar @@ -78,7 +80,8 @@ common.libs=${slf4j-api} ${backport-util-concurrent} ${mina-core} \ client.libs=${common.libs} ${geronimo-jms} tools.libs=${client.libs} broker.libs=${common.libs} ${commons-cli} ${commons-logging} ${log4j} \ - ${slf4j-log4j} ${xalan} ${felix.libs} ${build.lib}/${project.name}-common-${project.version}.jar + ${slf4j-log4j} ${xalan} ${felix.libs} ${derby-db} \ + ${build.lib}/${project.name}-common-${project.version}.jar broker-plugins.libs=${common.libs} ${felix.libs} management-client.libs=${jsp.libs} ${log4j} ${slf4j-log4j} ${slf4j-api} ${commons-pool} ${geronimo-servlet} ${muse.libs} ${javassist} ${xalan} ${mina-core} ${mina-filter-ssl} diff --git a/qpid/java/java.testprofile b/qpid/java/java.testprofile index 95282b8ee1..4567db5f28 100644 --- a/qpid/java/java.testprofile +++ b/qpid/java/java.testprofile @@ -1,4 +1,5 @@ -broker=${project.root}/build/bin/qpid-server -p @PORT -m @MPORT +broker.language=java +broker=${project.root}/build/bin/qpid-server -p @PORT -m @MPORT -c @CONFIG_FILE broker.clean=${project.root}/clean-dir ${build.data} broker.ready=Qpid Broker Ready diff --git a/qpid/java/lib/derby-10.3.2.1.jar b/qpid/java/lib/derby-10.3.2.1.jar new file mode 100644 index 0000000000..76ada492fd Binary files /dev/null and b/qpid/java/lib/derby-10.3.2.1.jar differ diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java index ba7a4bb19c..61c1326ad5 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java @@ -40,26 +40,9 @@ public class MultipleJCAProviderRegistrationTest extends QpidTestCase public void setUp() throws Exception { - super.setUp(); - - stopBroker(); - _broker = VM; - final String QpidHome = System.getProperty("QPID_HOME"); - - assertNotNull("QPID_HOME not set",QpidHome); - - final File defaultaclConfigFile = new File(QpidHome, "etc/config.xml"); - - if (!defaultaclConfigFile.exists()) - { - System.err.println("Configuration file not found:" + defaultaclConfigFile); - fail("Configuration file not found:" + defaultaclConfigFile); - } - - ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(defaultaclConfigFile); - startBroker(); + super.setUp(); } public void test() throws Exception diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java new file mode 100644 index 0000000000..dc34915a91 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java @@ -0,0 +1,246 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.persistent; + +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.jms.BrokerDetails; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; +import org.apache.qpid.server.store.DerbyMessageStore; +import org.apache.commons.configuration.XMLConfiguration; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import javax.jms.Topic; +import javax.jms.TopicSubscriber; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import java.util.concurrent.CountDownLatch; +import java.io.File; + +/** + * QPID-1813 : We do not store the client id with a message so on store restart + * that information is lost and we are unable to perform no local checks. + * + * QPID-1813 highlights the lack of testing here as the broker will NPE as it + * assumes that the client id of the publisher will always exist + */ +public class NoLocalAfterRecoveryTest extends QpidTestCase implements ConnectionListener +{ + protected final String MY_TOPIC_SUBSCRIPTION_NAME = this.getName(); + protected static final int SEND_COUNT = 10; + private CountDownLatch _failoverComplete = new CountDownLatch(1); + + protected ConnectionURL _connectionURL; + + @Override + protected void setUp() throws Exception + { + + XMLConfiguration configuration = new XMLConfiguration(_configFile); + configuration.setProperty("virtualhosts.virtualhost.test.store.class", "org.apache.qpid.server.store.DerbyMessageStore"); + configuration.setProperty("virtualhosts.virtualhost.test.store."+ DerbyMessageStore.ENVIRONMENT_PATH_PROPERTY, + System.getProperty("QPID_WORK", System.getProperty("java.io.tmpdir")) + File.separator + "derbyDB-NoLocalAfterRecoveryTest"); + + File tmpFile = File.createTempFile("configFile", "test"); + tmpFile.deleteOnExit(); + configuration.save(tmpFile); + + _configFile = tmpFile; + _connectionURL = getConnectionURL(); + + BrokerDetails details = _connectionURL.getBrokerDetails(0); + + // Due to the problem with SingleServer delaying on all connection + // attempts. So using a high retry value. + if (_broker.equals(VM)) + { + // Local testing suggests InVM restart takes under a second + details.setProperty(BrokerDetails.OPTIONS_RETRY, "5"); + details.setProperty(BrokerDetails.OPTIONS_CONNECT_DELAY, "200"); + } + else + { + // This will attempt to failover for 3 seconds. + // Local testing suggests failover takes 2 seconds + details.setProperty(BrokerDetails.OPTIONS_RETRY, "10"); + details.setProperty(BrokerDetails.OPTIONS_CONNECT_DELAY, "500"); + } + + super.setUp(); + } + + public void test() throws Exception + { + + Connection connection = getConnection(_connectionURL); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + + Topic topic = (Topic) getInitialContext().lookup("topic"); + + TopicSubscriber noLocalSubscriber = session. + createDurableSubscriber(topic, MY_TOPIC_SUBSCRIPTION_NAME + "-NoLocal", + null, true); + + TopicSubscriber normalSubscriber = session. + createDurableSubscriber(topic, MY_TOPIC_SUBSCRIPTION_NAME + "-Normal", + null, false); + + List sent = sendMessage(session, topic, SEND_COUNT); + + session.commit(); + + assertEquals("Incorrect number of messages sent", + SEND_COUNT, sent.size()); + + + // Check messages can be received as expected. + connection.start(); + + assertTrue("No Local Subscriber is not a no-local subscriber", + noLocalSubscriber.getNoLocal()); + + assertFalse("Normal Subscriber is a no-local subscriber", + normalSubscriber.getNoLocal()); + + + List received = receiveMessage(noLocalSubscriber, SEND_COUNT); + assertEquals("No Local Subscriber Received messages", 0, received.size()); + + received = receiveMessage(normalSubscriber, SEND_COUNT); + assertEquals("Normal Subscriber Received no messages", + SEND_COUNT, received.size()); + + + ((AMQConnection)connection).setConnectionListener(this); + + restartBroker(); + + + //Await + if (!_failoverComplete.await(4000L, TimeUnit.MILLISECONDS)) + { + fail("Failover Failed to compelete"); + } + + session.rollback(); + + //Failover will restablish our clients + assertTrue("No Local Subscriber is not a no-local subscriber", + noLocalSubscriber.getNoLocal()); + + assertFalse("Normal Subscriber is a no-local subscriber", + normalSubscriber.getNoLocal()); + + + // NOTE : here that the NO-local subscriber actually now gets ALL the + // messages as the connection has failed and they are consuming on a + // different connnection to the one that was published on. + received = receiveMessage(noLocalSubscriber, SEND_COUNT); + assertEquals("No Local Subscriber Received messages", SEND_COUNT, received.size()); + + received = receiveMessage(normalSubscriber, SEND_COUNT); + assertEquals("Normal Subscriber Received no messages", + SEND_COUNT, received.size()); + + //leave the store in a clean state. + session.commit(); + } + + protected List assertReceiveMessage(MessageConsumer messageConsumer, + int count) throws JMSException + { + + List receivedMessages = new ArrayList(count); + for (int i = 0; i < count; i++) + { + Message received = messageConsumer.receive(1000); + + if (received != null) + { + receivedMessages.add(received); + } + else + { + fail("Only " + + receivedMessages.size() + "/" + count + " received."); + } + } + + return receivedMessages; + } + + protected List receiveMessage(MessageConsumer messageConsumer, + int count) throws JMSException + { + + List receivedMessages = new ArrayList(count); + for (int i = 0; i < count; i++) + { + Message received = messageConsumer.receive(1000); + + if (received != null) + { + receivedMessages.add(received); + } + else + { + break; + } + } + + return receivedMessages; + } + + public void bytesSent(long count) + { + + } + + public void bytesReceived(long count) + { + + } + + public boolean preFailover(boolean redirect) + { + return true; + } + + public boolean preResubscribe() + { + return true; + } + + public void failoverComplete() + { + _failoverComplete.countDown(); + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java index 397fc15b66..a892b3baad 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java @@ -48,28 +48,6 @@ public class ConnectionTest extends QpidTestCase String _broker_NotRunning = "vm://:2"; String _broker_BadDNS = "tcp://hg3sgaaw4lgihjs"; - public BrokerDetails getBroker() - { - try - { - if (getConnectionFactory().getConnectionURL().getBrokerCount() > 0) - { - return getConnectionFactory().getConnectionURL().getBrokerDetails(0); - } - else - { - fail("No broker details are available."); - } - } - catch (NamingException e) - { - fail(e.getMessage()); - } - - //keep compiler happy - return null; - } - public void testSimpleConnection() throws Exception { AMQConnection conn = null; @@ -93,7 +71,7 @@ public class ConnectionTest extends QpidTestCase try { BrokerDetails broker = getBroker(); - broker.setProperty("retries","1"); + broker.setProperty(BrokerDetails.OPTIONS_RETRY, "1"); ConnectionURL url = new AMQConnectionURL("amqp://guest:guest@clientid/test?brokerlist='" + broker + "'&defaultQueueExchange='test.direct'" @@ -160,7 +138,7 @@ public class ConnectionTest extends QpidTestCase try { BrokerDetails broker = getBroker(); - broker.setProperty("retries", "0"); + broker.setProperty(BrokerDetails.OPTIONS_RETRY, "0"); conn = new AMQConnection("amqp://guest:rubbishpassword@clientid/test?brokerlist='" + broker + "'"); fail("Connection should not be established password is wrong."); } @@ -234,7 +212,7 @@ public class ConnectionTest extends QpidTestCase try { BrokerDetails broker = getBroker(); - broker.setProperty("retries", "0"); + broker.setProperty(BrokerDetails.OPTIONS_RETRY, "0"); conn = new AMQConnection("amqp://guest:guest@clientid/rubbishhost?brokerlist='" + broker + "'"); fail("Connection should not be established"); } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java index d75b6276ac..fef3f547d3 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java @@ -25,6 +25,7 @@ import javax.jms.Destination; import javax.jms.Session; import javax.jms.MessageProducer; import javax.jms.Message; +import javax.jms.JMSException; import javax.naming.InitialContext; import javax.naming.NamingException; import java.io.*; @@ -35,12 +36,15 @@ import java.util.Map; import java.util.HashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.net.MalformedURLException; import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQConnectionFactory; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; +import org.apache.qpid.jms.BrokerDetails; +import org.apache.qpid.jms.ConnectionURL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -127,6 +131,7 @@ public class QpidTestCase extends TestCase private static List _exclusionList; // system properties + private static final String BROKER_LANGUAGE = "broker.language"; private static final String BROKER = "broker"; private static final String BROKER_CLEAN = "broker.clean"; private static final String BROKER_VERSION = "broker.version"; @@ -134,6 +139,8 @@ public class QpidTestCase extends TestCase private static final String TEST_OUTPUT = "test.output"; // values + protected static final String JAVA = "java"; + protected static final String CPP = "cpp"; protected static final String VM = "vm"; protected static final String EXTERNAL = "external"; private static final String VERSION_08 = "0-8"; @@ -144,6 +151,7 @@ public class QpidTestCase extends TestCase protected int DEFAULT_VM_PORT = 1; protected int DEFAULT_PORT = 5672; + protected String _brokerLanguage = System.getProperty(BROKER_LANGUAGE, JAVA); protected String _broker = System.getProperty(BROKER, VM); private String _brokerClean = System.getProperty(BROKER_CLEAN, null); private String _brokerVersion = System.getProperty(BROKER_VERSION, VERSION_08); @@ -331,11 +339,21 @@ public class QpidTestCase extends TestCase } } - private String getBrokerCommand(int port) + private String getBrokerCommand(int port) throws MalformedURLException { - return _broker - .replace("@PORT", "" + port) - .replace("@MPORT", "" + (port + (8999 - DEFAULT_PORT))); + if (_brokerLanguage.equals(JAVA)) + { + return _broker + .replace("@PORT", "" + port) + .replace("@MPORT", "" + (port + (8999 - DEFAULT_PORT))) + .replace("@CONFIG_FILE", _configFile.toString()); + } + else + { + return _broker + .replace("@PORT", "" + port) + .replace("@MPORT", "" + (port + (8999 - DEFAULT_PORT))); + } } public void startBroker(int port) throws Exception @@ -569,6 +587,15 @@ public class QpidTestCase extends TestCase return getConnection("guest", "guest"); } + public Connection getConnection(ConnectionURL url) throws JMSException + { + Connection connection = new AMQConnectionFactory(url).createConnection("guest", "guest"); + + _connections.add(connection); + + return connection; + } + /** * Get a connection (remote or in-VM) * @@ -634,4 +661,32 @@ public class QpidTestCase extends TestCase return messages; } + public ConnectionURL getConnectionURL() throws NamingException + { + return getConnectionFactory().getConnectionURL(); + } + + + public BrokerDetails getBroker() + { + try + { + if (getConnectionFactory().getConnectionURL().getBrokerCount() > 0) + { + return getConnectionFactory().getConnectionURL().getBrokerDetails(0); + } + else + { + fail("No broker details are available."); + } + } + catch (NamingException e) + { + fail(e.getMessage()); + } + + //keep compiler happy + return null; + } + } diff --git a/qpid/java/test-provider.properties b/qpid/java/test-provider.properties index 5e2ab9c9cf..bcbe4866f3 100644 --- a/qpid/java/test-provider.properties +++ b/qpid/java/test-provider.properties @@ -35,5 +35,6 @@ queue.MyQueue = example.MyQueue queue.queue = example.queue queue.xaQueue = xaQueue +topic.topic = topic topic.xaTopic = xaTopic topic.durableSubscriberTopic = durableSubscriberTopic -- cgit v1.2.1