diff options
author | Martin Ritchie <ritchiem@apache.org> | 2009-04-16 12:27:28 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2009-04-16 12:27:28 +0000 |
commit | 99549d769fc94c35eb6ccbc837e0ce6dffa398b0 (patch) | |
tree | 8d810c5d39e27c8e7467ac6673543809e4012141 | |
parent | d3f1e3e2c020a2f61e7518777c74c9d7eb75ff43 (diff) | |
download | qpid-python-99549d769fc94c35eb6ccbc837e0ce6dffa398b0.tar.gz |
QPID-1813 : Provide test that uses the DerbyMessageStore to check NoLocal functionality after a broker restart.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@765604 13f79535-47bb-0310-9956-ffa450edef68
4 files changed, 230 insertions, 4 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java index 5c0672d783..20ef80e00e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java @@ -154,8 +154,9 @@ public class DerbyMessageStore implements MessageStore _logger.info("Configuring Derby message store for virtual host " + virtualHost.getName()); QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); + //Update to pick up QPID_WORK and use that as the default location not just derbyDB final String databasePath = config.getStoreConfiguration().getString(base + "." + ENVIRONMENT_PATH_PROPERTY, "derbyDB"); - + File environmentPath = new File(databasePath); if (!environmentPath.exists()) { diff --git a/qpid/java/build.deps b/qpid/java/build.deps index d62e414639..6bb9bfa09e 100644 --- a/qpid/java/build.deps +++ b/qpid/java/build.deps @@ -10,6 +10,8 @@ commons-lang=lib/commons-lang-2.2.jar commons-logging=lib/commons-logging-1.0.4.jar commons-pool=lib/commons-pool-1.4.jar +derby-db=lib/derby-10.3.2.1.jar + geronimo-jms=lib/geronimo-jms_1.1_spec-1.0.jar junit=lib/junit-3.8.1.jar @@ -78,7 +80,8 @@ common.libs=${slf4j-api} ${backport-util-concurrent} ${mina-core} \ client.libs=${common.libs} ${geronimo-jms} tools.libs=${client.libs} broker.libs=${common.libs} ${commons-cli} ${commons-logging} ${log4j} \ - ${slf4j-log4j} ${xalan} ${felix.libs} ${build.lib}/${project.name}-common-${project.version}.jar + ${slf4j-log4j} ${xalan} ${felix.libs} ${derby-db} \ + ${build.lib}/${project.name}-common-${project.version}.jar broker-plugins.libs=${common.libs} ${felix.libs} management-client.libs=${jsp.libs} ${log4j} ${slf4j-log4j} ${slf4j-api} ${commons-pool} ${geronimo-servlet} ${muse.libs} ${javassist} ${xalan} ${mina-core} ${mina-filter-ssl} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java new file mode 100644 index 0000000000..735913146c --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java @@ -0,0 +1,219 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.persistent; + +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; +import org.apache.commons.configuration.XMLConfiguration; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import javax.jms.Topic; +import javax.jms.TopicSubscriber; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import java.util.concurrent.CountDownLatch; +import java.io.File; + +/** + * QPID-1813 : We do not store the client id with a message so on store restart + * that information is lost and we are unable to perform no local checks. + * + * QPID-1813 highlights the lack of testing here as the broker will NPE as it + * assumes that the client id of the publisher will always exist + */ +public class NoLocalAfterRecoveryTest extends QpidTestCase implements ConnectionListener +{ + protected final String MY_TOPIC_SUBSCRIPTION_NAME = this.getName(); + protected static final int SEND_COUNT = 10; + private CountDownLatch _failoverComplete = new CountDownLatch(1); + + @Override + protected void setUp() throws Exception + { + XMLConfiguration configuration = new XMLConfiguration(_configFile); + configuration.setProperty("virtualhosts.virtualhost.test.store.class", "org.apache.qpid.server.store.DerbyMessageStore"); + + File tmpFile = File.createTempFile("configFile", "test"); + tmpFile.deleteOnExit(); + configuration.save(tmpFile); + + _configFile = tmpFile; + super.setUp(); + } + + public void test() throws Exception + { + + + Connection connection = getConnection(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + + Topic topic = (Topic) getInitialContext().lookup("topic"); + + TopicSubscriber noLocalSubscriber = session. + createDurableSubscriber(topic, MY_TOPIC_SUBSCRIPTION_NAME + "-NoLocal", + null, true); + + TopicSubscriber normalSubscriber = session. + createDurableSubscriber(topic, MY_TOPIC_SUBSCRIPTION_NAME + "-Normal", + null, false); + + List<Message> sent = sendMessage(session, topic, SEND_COUNT); + + session.commit(); + + assertEquals("Incorrect number of messages sent", + SEND_COUNT, sent.size()); + + + // Check messages can be received as expected. + connection.start(); + + assertTrue("No Local Subscriber is not a no-local subscriber", + noLocalSubscriber.getNoLocal()); + + assertFalse("Normal Subscriber is a no-local subscriber", + normalSubscriber.getNoLocal()); + + + List<Message> received = receiveMessage(noLocalSubscriber, SEND_COUNT); + assertEquals("No Local Subscriber Received messages", 0, received.size()); + + received = receiveMessage(normalSubscriber, SEND_COUNT); + assertEquals("Normal Subscriber Received no messages", + SEND_COUNT, received.size()); + + + ((AMQConnection)connection).setConnectionListener(this); + + restartBroker(); + + + //Await + if (!_failoverComplete.await(4000L, TimeUnit.MILLISECONDS)) + { + fail("Failover Failed to compelete"); + } + + session.rollback(); + + //Failover will restablish our clients + assertTrue("No Local Subscriber is not a no-local subscriber", + noLocalSubscriber.getNoLocal()); + + assertFalse("Normal Subscriber is a no-local subscriber", + normalSubscriber.getNoLocal()); + + + // NOTE : here that the NO-local subscriber actually now gets ALL the + // messages as the connection has failed and they are consuming on a + // different connnection to the one that was published on. + received = receiveMessage(noLocalSubscriber, SEND_COUNT); + assertEquals("No Local Subscriber Received messages", SEND_COUNT, received.size()); + + received = receiveMessage(normalSubscriber, SEND_COUNT); + assertEquals("Normal Subscriber Received no messages", + SEND_COUNT, received.size()); + + //leave the store in a clean state. + session.commit(); + } + + protected List<Message> assertReceiveMessage(MessageConsumer messageConsumer, + int count) throws JMSException + { + + List<Message> receivedMessages = new ArrayList<Message>(count); + for (int i = 0; i < count; i++) + { + Message received = messageConsumer.receive(1000); + + if (received != null) + { + receivedMessages.add(received); + } + else + { + fail("Only " + + receivedMessages.size() + "/" + count + " received."); + } + } + + return receivedMessages; + } + + protected List<Message> receiveMessage(MessageConsumer messageConsumer, + int count) throws JMSException + { + + List<Message> receivedMessages = new ArrayList<Message>(count); + for (int i = 0; i < count; i++) + { + Message received = messageConsumer.receive(1000); + + if (received != null) + { + receivedMessages.add(received); + } + else + { + break; + } + } + + return receivedMessages; + } + + public void bytesSent(long count) + { + + } + + public void bytesReceived(long count) + { + + } + + public boolean preFailover(boolean redirect) + { + return true; + } + + public boolean preResubscribe() + { + return true; + } + + public void failoverComplete() + { + _failoverComplete.countDown(); + } +} diff --git a/qpid/java/test-provider.properties b/qpid/java/test-provider.properties index 5e2ab9c9cf..352a26cbd1 100644 --- a/qpid/java/test-provider.properties +++ b/qpid/java/test-provider.properties @@ -19,8 +19,10 @@ # # -connectionfactory.default = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5672' -connectionfactory.default.vm = amqp://username:password@clientid/test?brokerlist='vm://:1' +# Allow the client to reconnect to the broker if the connection is lost, for up to 1second. +# This will allow for persistent tests to bounce the broker. +connectionfactory.default = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5672?retries='5'&connectdelay='200'' +connectionfactory.default.vm = amqp://username:password@clientid/test?brokerlist='vm://:1?retries='5'&connectdelay='200'' connectionfactory.ssl = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5671?ssl='true'' connectionfactory.failover = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5673;tcp://localhost:5672'&sync_ack='true'&sync_publish='all'&failover='roundrobin?cyclecount='20'' @@ -35,5 +37,6 @@ queue.MyQueue = example.MyQueue queue.queue = example.queue queue.xaQueue = xaQueue +topic.topic = topic topic.xaTopic = xaTopic topic.durableSubscriberTopic = durableSubscriberTopic |