summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRupert Smith <rupertlssmith@apache.org>2007-06-22 15:39:27 +0000
committerRupert Smith <rupertlssmith@apache.org>2007-06-22 15:39:27 +0000
commit0f0bb4d46355a335ddd4c97bf2b6d7af09fd8ca2 (patch)
tree8b49ee895af097a73c59b0003cc2164f0c635172
parentc9dcaaae7a4a3320505fcadaa6b502524844457a (diff)
downloadqpid-python-0f0bb4d46355a335ddd4c97bf2b6d7af09fd8ca2.tar.gz
Added Immediate and Mandatory message tests.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@549849 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/exchange/ImmediateMessageTest.java686
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java135
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java282
3 files changed, 1103 insertions, 0 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/ImmediateMessageTest.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/ImmediateMessageTest.java
new file mode 100644
index 0000000000..05fbceca20
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/ImmediateMessageTest.java
@@ -0,0 +1,686 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import junit.framework.TestCase;
+
+import org.apache.log4j.NDC;
+
+import org.apache.qpid.client.AMQNoConsumersException;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import static org.apache.qpid.server.exchange.MessagingTestConfigProperties.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
+import uk.co.thebadgerset.junit.extensions.util.TestContextProperties;
+
+import javax.jms.*;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * ImmediateMessageTest tests for the desired behaviour of immediate messages. Immediate messages are a non-JMS
+ * feature. A message may be marked with an immediate delivery flag, which means that a consumer must be connected
+ * to receive the message, through a valid route, when it is sent, or when its transaction is committed in the case
+ * of transactional messaging. If this is not the case, the broker should return the message with a NO_CONSUMERS code.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Check that an immediate message is sent succesfully not using transactions when a consumer is connected.
+ * <tr><td> Check that an immediate message is committed succesfully in a transaction when a consumer is connected.
+ * <tr><td> Check that an immediate message results in no consumers code, not using transactions, when no consumer is
+ * connected.
+ * <tr><td> Check that an immediate message results in no consumers code, upon transaction commit, when a consumer is
+ * connected.
+ * </table>
+ *
+ * @todo Write a test decorator, the sole function of which is to populate test context properties, from sys properties,
+ * from trailing prop=value pairs on the command line, from test properties files or other sources. This should
+ * run through stanard JUnit without the JUnit toolkit extensions, and through Maven surefire, and also through
+ * the JUnit toolkit extended test runners.
+ *
+ * @todo Veto test topologies using bounce back. Or else the bounce back client will act as an immediate consumer.
+ */
+public class ImmediateMessageTest extends TestCase
+{
+ /** Used for debugging. */
+ private static final Logger log = LoggerFactory.getLogger(ImmediateMessageTest.class);
+
+ /** Used to read the tests configurable properties through. */
+ ParsedProperties testProps = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults);
+
+ /** All these tests should have the immediate flag on. */
+ private boolean immediateFlag = testProps.setProperty(IMMEDIATE_PROPNAME, true);
+
+ /** Check that an immediate message is sent succesfully not using transactions when a consumer is connected. */
+ public void test_QPID_517_ImmediateOkNoTx() throws Exception
+ {
+ // Ensure transactional sessions are off.
+ testProps.setProperty(TRANSACTED_PROPNAME, false);
+
+ // Send one message with no errors.
+ PublisherReceiverImpl.testNoExceptions(testProps);
+ }
+
+ /** Check that an immediate message is committed succesfully in a transaction when a consumer is connected. */
+ public void test_QPID_517_ImmediateOkTx() throws Exception
+ {
+ // Ensure transactional sessions are off.
+ testProps.setProperty(TRANSACTED_PROPNAME, true);
+
+ // Send one message with no errors.
+ PublisherReceiverImpl.testNoExceptions(testProps);
+ }
+
+ /** Check that an immediate message results in no consumers code, not using transactions, when no consumer is connected. */
+ public void test_QPID_517_ImmediateFailsNoConsumerNoTx() throws Exception
+ {
+ // Ensure transactional sessions are off.
+ testProps.setProperty(TRANSACTED_PROPNAME, false);
+
+ // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
+ // collect its messages).
+ testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
+
+ // Send one message and get a linked no consumers exception.
+ PublisherReceiverImpl.testWithAssertions(testProps, AMQNoConsumersException.class);
+ }
+
+ /** Check that an immediate message results in no consumers code, upon transaction commit, when a consumer is connected. */
+ public void test_QPID_517_ImmediateFailsNoConsumerTx() throws Exception
+ {
+ // Ensure transactional sessions are on.
+ testProps.setProperty(TRANSACTED_PROPNAME, true);
+
+ // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
+ // collect its messages).
+ testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
+
+ // Send one message and get a linked no consumers exception.
+ PublisherReceiverImpl.testWithAssertions(testProps, AMQNoConsumersException.class);
+ }
+
+ protected void setUp() throws Exception
+ {
+ NDC.push(getName());
+
+ // Ensure that the in-vm broker is created.
+ TransportConnection.createVMBroker(1);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ // Ensure that the in-vm broker is cleaned up so that the next test starts afresh.
+ TransportConnection.killVMBroker(1);
+ ApplicationRegistry.remove(1);
+ }
+ finally
+ {
+ NDC.pop();
+ }
+ }
+
+ /*
+ * Stuff below:
+ *
+ * This will get tidied into some sort on JMS convenience framework, through which practically any usefull test
+ * topology can be created. This will become a replacement for PingPongProducer.
+ *
+ * Base everything on standard connection properties defined in PingPongProducer. Split JMS and AMQP-only properties.
+ *
+ * Integrate with ConversationFactory, so that it will work with prod/con pairs.
+ *
+ * Support pub/rec pairs.
+ * Support m*n pub/rec setups. All pubs/recs on one machine.
+ *
+ * Support bounce back clients, with configurable bounce back behavior. All, one in X, round robin one in m, etc.
+ *
+ * Support pairing of m*n pub/rec setups with bounce back clients. JVM running a test, can simulate m publishers,
+ * will receive (a known subset of) all messages sent, bounced back to n receivers. Co-location of pub/rec will be
+ * the normal model to allow accurate timings to be taken.
+ *
+ * Support creation of pub or rec only.
+ * Support clock synching of pub/rec on different JVMs, by calculating clock offsets. Must also provide an accuracy
+ * estimate to +- the results.
+ *
+ * Augment the interop Coordinator, to become a full distributed test coordinator. Capable of querying available
+ * tests machines, looking at test parameters and farming out tests onto the test machines, passing all test
+ * parameters, standard naming of pub/rec config parameters used to set up m*n test topologies, run test cases,
+ * report results, tear down m*n topologies. Need to split the re-usable general purpose distributed test coordinator
+ * from the Qpid specific test framework for creating test-topoloigies and passing Qpid specific parameters.
+ *
+ * Write all tests against pub/rec pairs, without coding to the fact that the topology may be anything from 1:1 in
+ * JVM to m*n with bounce back clients accross many machines. That is, make the test topology orthogonal to the test
+ * case.
+ */
+
+ private static class ExceptionMonitor implements ExceptionListener
+ {
+ List<JMSException> exceptions = new ArrayList<JMSException>();
+
+ public void onException(JMSException e)
+ {
+ log.debug("ExceptionMonitor got JMSException: ", e);
+
+ exceptions.add(e);
+ }
+
+ public boolean assertNoExceptions()
+ {
+ return exceptions.isEmpty();
+ }
+
+ public boolean assertOneJMSException()
+ {
+ return exceptions.size() == 1;
+ }
+
+ public boolean assertOneJMSExceptionWithLinkedCause(Class aClass)
+ {
+ if (exceptions.size() == 1)
+ {
+ JMSException e = exceptions.get(0);
+
+ Exception linkedCause = e.getLinkedException();
+
+ if ((linkedCause != null) && aClass.isInstance(linkedCause))
+ {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ public void reset()
+ {
+ exceptions = new ArrayList();
+ }
+ }
+
+ /**
+ * Establishes a JMS connection using a properties file and qpids built in JNDI implementation. This is a simple
+ * convenience method for code that does anticipate handling connection failures. All exceptions that indicate
+ * that the connection has failed, are wrapped as rutime exceptions, preumably handled by a top level failure
+ * handler.
+ *
+ * @param messagingProps Any additional connection properties.
+ *
+ * @return A JMS conneciton.
+ *
+ * @todo Move this to a Utils library class or base test class. Also move the copy in interop.TestClient too.
+ *
+ * @todo Make in VM broker creation step optional on whether one is to be used or not.
+ */
+ public static Connection createConnection(ParsedProperties messagingProps)
+ {
+ log.debug("public static Connection createConnection(Properties messagingProps = " + messagingProps + "): called");
+
+ try
+ {
+ // Extract the configured connection properties from the test configuration.
+ String conUsername = messagingProps.getProperty(USERNAME_PROPNAME);
+ String conPassword = messagingProps.getProperty(PASSWORD_PROPNAME);
+ String virtualHost = messagingProps.getProperty(VIRTUAL_HOST_PROPNAME);
+ String brokerUrl = messagingProps.getProperty(BROKER_PROPNAME);
+
+ // Set up the broker connection url.
+ String connectionString =
+ "amqp://" + conUsername + ":" + conPassword + "/" + ((virtualHost != null) ? virtualHost : "")
+ + "?brokerlist='" + brokerUrl + "'";
+
+ // messagingProps.setProperty(CONNECTION_PROPNAME, connectionString);
+
+ Context ctx = new InitialContext(messagingProps);
+
+ ConnectionFactory cf = (ConnectionFactory) ctx.lookup(CONNECTION_NAME);
+ Connection connection = cf.createConnection();
+
+ return connection;
+ }
+ catch (NamingException e)
+ {
+ log.debug("Got NamingException: ", e);
+ throw new RuntimeException("Got JNDI NamingException whilst looking up the connection factory.", e);
+ }
+ catch (JMSException e)
+ {
+ log.debug("Got JMSException: ", e);
+ throw new RuntimeException("Could not establish connection due to JMSException.", e);
+ }
+ }
+
+ /**
+ * Creates a publisher and a receiver on the same connection, configured according the to specified standard
+ * properties.
+ *
+ * @param messagingProps The connection properties.
+ *
+ * @return A publisher/receiver client pair.
+ */
+ public static PublisherReceiver createPublisherReceiverPairSharedConnection(ParsedProperties messagingProps)
+ {
+ try
+ {
+ int ackMode = messagingProps.getPropertyAsInteger(ACK_MODE_PROPNAME);
+ boolean useTopics = messagingProps.getPropertyAsBoolean(PUBSUB_PROPNAME);
+ String destinationSendRoot = messagingProps.getProperty(SEND_DESTINATION_NAME_ROOT_PROPNAME);
+ String destinationReceiveRoot = messagingProps.getProperty(RECEIVE_DESTINATION_NAME_ROOT_PROPNAME);
+ boolean createPublisherProducer = messagingProps.getPropertyAsBoolean(PUBLISHER_PRODUCER_BIND_PROPNAME);
+ boolean createPublisherConsumer = messagingProps.getPropertyAsBoolean(PUBLISHER_CONSUMER_BIND_PROPNAME);
+ boolean createReceiverProducer = messagingProps.getPropertyAsBoolean(RECEIVER_PRODUCER_BIND_PROPNAME);
+ boolean createReceiverConsumer = messagingProps.getPropertyAsBoolean(RECEIVER_CONSUMER_BIND_PROPNAME);
+ boolean transactional = messagingProps.getPropertyAsBoolean(TRANSACTED_PROPNAME);
+
+ // Check if any Qpid/AMQP specific flags or options need to be set.
+ boolean immediate = messagingProps.getPropertyAsBoolean(IMMEDIATE_PROPNAME);
+ boolean mandatory = messagingProps.getPropertyAsBoolean(MANDATORY_PROPNAME);
+ boolean needsQpidOptions = immediate | mandatory;
+
+ log.debug("ackMode = " + ackMode);
+ log.debug("useTopics = " + useTopics);
+ log.debug("destinationSendRoot = " + destinationSendRoot);
+ log.debug("destinationReceiveRoot = " + destinationReceiveRoot);
+ log.debug("createPublisherProducer = " + createPublisherProducer);
+ log.debug("createPublisherConsumer = " + createPublisherConsumer);
+ log.debug("createReceiverProducer = " + createReceiverProducer);
+ log.debug("createReceiverConsumer = " + createReceiverConsumer);
+ log.debug("transactional = " + transactional);
+ log.debug("immediate = " + immediate);
+ log.debug("mandatory = " + mandatory);
+ log.debug("needsQpidOptions = " + needsQpidOptions);
+
+ // Create connection, sessions and producer/consumer pairs on each session.
+ Connection connection = createConnection(messagingProps);
+
+ // Add the connection exception listener to assert on exception conditions with.
+ ExceptionMonitor exceptionMonitor = new ExceptionMonitor();
+ connection.setExceptionListener(exceptionMonitor);
+
+ Session publisherSession = connection.createSession(transactional, ackMode);
+ Session receiverSession = connection.createSession(transactional, ackMode);
+
+ Destination publisherProducerDestination =
+ useTopics ? publisherSession.createTopic(destinationSendRoot)
+ : publisherSession.createQueue(destinationSendRoot);
+
+ MessageProducer publisherProducer =
+ createPublisherProducer
+ ? (needsQpidOptions
+ ? ((AMQSession) publisherSession).createProducer(publisherProducerDestination, mandatory, immediate)
+ : publisherSession.createProducer(publisherProducerDestination)) : null;
+
+ MessageConsumer publisherConsumer =
+ createPublisherConsumer
+ ? publisherSession.createConsumer(publisherSession.createQueue(destinationReceiveRoot)) : null;
+
+ MessageProducer receiverProducer =
+ createReceiverProducer ? receiverSession.createProducer(receiverSession.createQueue(destinationReceiveRoot))
+ : null;
+
+ MessageConsumer receiverConsumer =
+ createReceiverConsumer ? receiverSession.createConsumer(receiverSession.createQueue(destinationSendRoot))
+ : null;
+
+ // Start listening for incoming messages.
+ connection.start();
+
+ // Package everything up.
+ ProducerConsumerPair publisher =
+ new ProducerConsumerPairImpl(publisherProducer, publisherConsumer, publisherSession);
+ ProducerConsumerPair receiver =
+ new ProducerConsumerPairImpl(receiverProducer, receiverConsumer, receiverSession);
+
+ PublisherReceiver result = new PublisherReceiverImpl(publisher, receiver, connection, exceptionMonitor);
+
+ return result;
+ }
+ catch (JMSException e)
+ {
+ log.debug("Got JMSException: ", e);
+ throw new RuntimeException("Could not create publisher/receiver pair due to a JMSException.", e);
+ }
+ }
+
+ public static Message createTestMessage(ProducerConsumerPair client, ParsedProperties testProps) throws JMSException
+ {
+ return client.getSession().createMessage();
+ }
+
+ /**
+ * A ProducerConsumerPair is a pair consisting of one message producer and one message consumer. It is a standard
+ * unit of connectivity allowing a full-duplex conversation to be held, provided both the consumer and producer
+ * are instantiated and configured.
+ *
+ * In some situations a test, or piece of application code will be written with differing numbers of publishers
+ * and receivers in different roles, where one role produces only and one consumes only. This messaging topology
+ * can still make use of producer/consumer pairs as standard building blocks, combined into publisher/receiver
+ * units to fulfill the different messaging roles, with the publishers consumer uninstantiated and the receivers
+ * producer uninstantiated. Use a {@link PublisherReceiver} for this.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities
+ * <tr><td> Provide a message producer for sending messages.
+ * <tr><td> Provide a message consumer for receiving messages.
+ * </table>
+ *
+ * @todo Update the {@link org.apache.qpid.util.ConversationFactory} so that it accepts these as the basic
+ * conversation connection units.
+ */
+ public static interface ProducerConsumerPair
+ {
+ public MessageProducer getProducer();
+
+ public MessageConsumer getConsumer();
+
+ public void send(Message message) throws JMSException;
+
+ public Session getSession();
+
+ public void close() throws JMSException;
+ }
+
+ /**
+ * A single producer and consumer.
+ */
+ public static class ProducerConsumerPairImpl implements ProducerConsumerPair
+ {
+ MessageProducer producer;
+
+ MessageConsumer consumer;
+
+ Session session;
+
+ public ProducerConsumerPairImpl(MessageProducer producer, MessageConsumer consumer, Session session)
+ {
+ this.producer = producer;
+ this.consumer = consumer;
+ this.session = session;
+ }
+
+ public MessageProducer getProducer()
+ {
+ return null;
+ }
+
+ public MessageConsumer getConsumer()
+ {
+ return null;
+ }
+
+ public void send(Message message) throws JMSException
+ {
+ producer.send(message);
+ }
+
+ public Session getSession()
+ {
+ return session;
+ }
+
+ public void close() throws JMSException
+ {
+ if (producer != null)
+ {
+ producer.close();
+ }
+
+ if (consumer != null)
+ {
+ consumer.close();
+ }
+ }
+ }
+
+ /**
+ * Multiple producers and consumers made to look like a single producer and consumer. All methods repeated accross
+ * all producers and consumers.
+ */
+ public static class MultiProducerConsumerPairImpl implements ProducerConsumerPair
+ {
+ public MessageProducer getProducer()
+ {
+ throw new RuntimeException("Not implemented.");
+ }
+
+ public MessageConsumer getConsumer()
+ {
+ throw new RuntimeException("Not implemented.");
+ }
+
+ public void send(Message message) throws JMSException
+ {
+ throw new RuntimeException("Not implemented.");
+ }
+
+ public Session getSession()
+ {
+ throw new RuntimeException("Not implemented.");
+ }
+
+ public void close()
+ {
+ throw new RuntimeException("Not implemented.");
+ }
+ }
+
+ /**
+ * A PublisherReceiver consists of two sets of producer/consumer pairs, one for an 'instigating' publisher
+ * role, and one for a more 'passive' receiver role.
+ *
+ * <p/>A set of publishers and receivers forms a typical test configuration where both roles are to be controlled
+ * from within a single JVM. This is not a particularly usefull arrangement for applications which want to place
+ * these roles on physically seperate machines and pass messages between them. It is a faily normal arrangement for
+ * test code though, either to publish and receive messages through an in-VM message broker in order to test its
+ * expected behaviour, or to publish and receive (possibly bounced back) messages through a seperate broker instance
+ * in order to take performance timings. In the case of performance timings, the co-location of the publisher and
+ * receiver means that the timings are taken on the same machine for accurate timing without the need for clock
+ * synchronization.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities
+ * <tr><td> Manage an m*n array of publisher and recievers.
+ * </table>
+ */
+ public static interface PublisherReceiver
+ {
+ public ProducerConsumerPair getPublisher();
+
+ public ProducerConsumerPair getReceiver();
+
+ public void start();
+
+ public void send(ParsedProperties testProps, int numMessages);
+
+ public ExceptionMonitor getConnectionExceptionMonitor();
+
+ public ExceptionMonitor getExceptionMonitor();
+
+ public void close();
+ }
+
+ public static class PublisherReceiverImpl implements PublisherReceiver
+ {
+ private ProducerConsumerPair publisher;
+ private ProducerConsumerPair receiver;
+ private Connection connection;
+ private ExceptionMonitor connectionExceptionMonitor;
+ private ExceptionMonitor exceptionMonitor;
+
+ public PublisherReceiverImpl(ProducerConsumerPair publisher, ProducerConsumerPair receiver, Connection connection,
+ ExceptionMonitor connectionExceptionMonitor)
+ {
+ this.publisher = publisher;
+ this.receiver = receiver;
+ this.connection = connection;
+ this.connectionExceptionMonitor = connectionExceptionMonitor;
+ this.exceptionMonitor = new ExceptionMonitor();
+ }
+
+ public ProducerConsumerPair getPublisher()
+ {
+ return publisher;
+ }
+
+ public ProducerConsumerPair getReceiver()
+ {
+ return receiver;
+ }
+
+ public void start()
+ { }
+
+ public void close()
+ {
+ try
+ {
+ publisher.close();
+ receiver.close();
+ connection.close();
+ }
+ catch (JMSException e)
+ {
+ throw new RuntimeException("Got JMSException during close.", e);
+ }
+ }
+
+ public ExceptionMonitor getConnectionExceptionMonitor()
+ {
+ return connectionExceptionMonitor;
+ }
+
+ public ExceptionMonitor getExceptionMonitor()
+ {
+ return exceptionMonitor;
+ }
+
+ public void send(ParsedProperties testProps, int numMessages)
+ {
+ boolean transactional = testProps.getPropertyAsBoolean(TRANSACTED_PROPNAME);
+
+ // Send an immediate message through the publisher and ensure that it results in a JMSException.
+ try
+ {
+ getPublisher().send(createTestMessage(getPublisher(), testProps));
+
+ if (transactional)
+ {
+ getPublisher().getSession().commit();
+ }
+ }
+ catch (JMSException e)
+ {
+ log.debug("Got JMSException: ", e);
+ exceptionMonitor.onException(e);
+ }
+ }
+
+ public static void testWithAssertions(ParsedProperties testProps, Class aClass /*, assertions */)
+ {
+ PublisherReceiver testClients;
+
+ // Create a standard publisher/receiver test client pair on a shared connection, individual sessions.
+ testClients = createPublisherReceiverPairSharedConnection(testProps);
+ testClients.start();
+
+ testClients.send(testProps, 1);
+
+ pause(1000L);
+
+ String errors = "";
+
+ if (!testClients.getConnectionExceptionMonitor().assertOneJMSExceptionWithLinkedCause(aClass))
+ {
+ errors += "Was expecting linked exception type " + aClass.getName() + ".\n";
+ }
+
+ // Clean up the publisher/receiver client pair.
+ testClients.close();
+
+ assertEquals(errors, "", errors);
+ }
+
+ /**
+ */
+ public static void testNoExceptions(ParsedProperties testProps)
+ {
+ PublisherReceiver testClients;
+
+ // Create a standard publisher/receiver test client pair on a shared connection, individual sessions.
+ testClients = createPublisherReceiverPairSharedConnection(testProps);
+ testClients.start();
+
+ testClients.send(testProps, 1);
+
+ pause(1000L);
+
+ String errors = "";
+
+ if (!testClients.getConnectionExceptionMonitor().assertNoExceptions())
+ {
+ errors += "There were connection exceptions.\n";
+ }
+
+ if (!testClients.getExceptionMonitor().assertNoExceptions())
+ {
+ errors += "There were exceptions on producer.\n";
+ }
+
+ // Clean up the publisher/receiver client pair.
+ testClients.close();
+
+ assertEquals(errors, "", errors);
+ }
+ }
+
+ /**
+ * Pauses for the specified length of time. In the event of failing to pause for at least that length of time
+ * due to interuption of the thread, a RutimeException is raised to indicate the failure. The interupted status
+ * of the thread is restores in that case. This method should only be used when it is expected that the pause
+ * will be succesfull, for example in test code that relies on inejecting a pause.
+ *
+ * @param t The minimum time to pause for in milliseconds.
+ */
+ public static void pause(long t)
+ {
+ try
+ {
+ Thread.sleep(t);
+ }
+ catch (InterruptedException e)
+ {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+
+ throw new RuntimeException("Failed to generate the requested pause length.", e);
+ }
+ }
+}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java
new file mode 100644
index 0000000000..f41acca11b
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java
@@ -0,0 +1,135 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import junit.framework.TestCase;
+
+import org.apache.log4j.NDC;
+
+import org.apache.qpid.client.AMQNoRouteException;
+import org.apache.qpid.client.transport.TransportConnection;
+import static org.apache.qpid.server.exchange.MessagingTestConfigProperties.*;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
+import uk.co.thebadgerset.junit.extensions.util.TestContextProperties;
+
+/**
+ * MandatoryMessageTest tests for the desired behaviour of mandatory messages. Mandatory messages are a non-JMS
+ * feature. A message may be marked with a mandatory delivery flag, which means that a valid route for the message
+ * must exist, when it is sent, or when its transaction is committed in the case of transactional messaging. If this
+ * is not the case, the broker should return the message with a NO_CONSUMERS code.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Check that a mandatory message is sent succesfully not using transactions when a consumer is connected.
+ * <tr><td> Check that a mandatory message is committed succesfully in a transaction when a consumer is connected.
+ * <tr><td> Check that a mandatory message results in no route code, not using transactions, when no consumer is
+ * connected.
+ * <tr><td> Check that a mandatory message results in no route code, upon transaction commit, when a consumer is
+ * connected.
+ * </table>
+ */
+public class MandatoryMessageTest extends TestCase
+{
+ /** Used for debugging. */
+ private static final Logger log = LoggerFactory.getLogger(MandatoryMessageTest.class);
+
+ /** Used to read the tests configurable properties through. */
+ ParsedProperties testProps = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults);
+
+ /** All these tests should have the mandatory flag on. */
+ // private boolean mandatoryFlag = testProps.setProperty(IMMEDIATE_PROPNAME, true);
+ private boolean mandatoryFlag = testProps.setProperty(MANDATORY_PROPNAME, true);
+
+ /** Check that an mandatory message is sent succesfully not using transactions when a consumer is connected. */
+ public void test_QPID_508_MandatoryOkNoTx() throws Exception
+ {
+ // Ensure transactional sessions are off.
+ testProps.setProperty(TRANSACTED_PROPNAME, false);
+
+ // Send one message with no errors.
+ ImmediateMessageTest.PublisherReceiverImpl.testNoExceptions(testProps);
+ }
+
+ /** Check that an mandatory message is committed succesfully in a transaction when a consumer is connected. */
+ public void test_QPID_508_MandatoryOkTx() throws Exception
+ {
+ // Ensure transactional sessions are off.
+ testProps.setProperty(TRANSACTED_PROPNAME, true);
+
+ // Send one message with no errors.
+ ImmediateMessageTest.PublisherReceiverImpl.testNoExceptions(testProps);
+ }
+
+ /** Check that an mandatory message results in no route code, not using transactions, when no consumer is connected. */
+ public void test_QPID_508_MandatoryFailsNoRouteNoTx() throws Exception
+ {
+ // Ensure transactional sessions are off.
+ testProps.setProperty(TRANSACTED_PROPNAME, false);
+
+ // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
+ // collect its messages).
+ testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
+
+ // Send one message and get a linked no consumers exception.
+ ImmediateMessageTest.PublisherReceiverImpl.testWithAssertions(testProps, AMQNoRouteException.class);
+ }
+
+ /** Check that an mandatory message results in no route code, upon transaction commit, when a consumer is connected. */
+ public void test_QPID_508_MandatoryFailsNoRouteTx() throws Exception
+ {
+ // Ensure transactional sessions are on.
+ testProps.setProperty(TRANSACTED_PROPNAME, true);
+
+ // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
+ // collect its messages).
+ testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
+
+ // Send one message and get a linked no consumers exception.
+ ImmediateMessageTest.PublisherReceiverImpl.testWithAssertions(testProps, AMQNoRouteException.class);
+ }
+
+ protected void setUp() throws Exception
+ {
+ NDC.push(getName());
+
+ // Ensure that the in-vm broker is created.
+ TransportConnection.createVMBroker(1);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ // Ensure that the in-vm broker is cleaned up so that the next test starts afresh.
+ TransportConnection.killVMBroker(1);
+ ApplicationRegistry.remove(1);
+ }
+ finally
+ {
+ NDC.pop();
+ }
+ }
+}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java
new file mode 100644
index 0000000000..9c8cefc492
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java
@@ -0,0 +1,282 @@
+package org.apache.qpid.server.exchange;
+
+import org.apache.qpid.jms.Session;
+
+import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
+
+/**
+ * MessagingTestConfigProperties defines a set of property names and default values for specifying a messaging topology,
+ * and test parameters for running a messaging test over that topology. A Properties object holding some of these
+ * properties, superimposed onto the defaults, is used to establish test topologies and control test behaviour.
+ *
+ * <p/>A complete list of the parameters, default values and comments on their usage is provided here:
+ *
+ * <p/><table><caption>Parameters</caption>
+ * <tr><th> Parameter <th> Default <th> Comments
+ * <tr><td> messageSize <td> 0 <td> Message size in bytes. Not including any headers.
+ * <tr><td> destinationName <td> ping <td> The root name to use to generate destination names to ping.
+ * <tr><td> persistent <td> false <td> Determines whether peristent delivery is used.
+ * <tr><td> transacted <td> false <td> Determines whether messages are sent/received in transactions.
+ * <tr><td> broker <td> tcp://localhost:5672 <td> Determines the broker to connect to.
+ * <tr><td> virtualHost <td> test <td> Determines the virtual host to send all ping over.
+ * <tr><td> rate <td> 0 <td> The maximum rate (in hertz) to send messages at. 0 means no limit.
+ * <tr><td> verbose <td> false <td> The verbose flag for debugging. Prints to console on every message.
+ * <tr><td> pubsub <td> false <td> Whether to ping topics or queues. Uses p2p by default.
+ * <tr><td> username <td> guest <td> The username to access the broker with.
+ * <tr><td> password <td> guest <td> The password to access the broker with.
+ * <tr><td> selector <td> null <td> Not used. Defines a message selector to filter pings with.
+ * <tr><td> destinationCount <td> 1 <td> The number of receivers listening to the pings.
+ * <tr><td> timeout <td> 30000 <td> In milliseconds. The timeout to stop waiting for replies.
+ * <tr><td> commitBatchSize <td> 1 <td> The number of messages per transaction in transactional mode.
+ * <tr><td> uniqueDests <td> true <td> Whether each receiver only listens to one ping destination or all.
+ * <tr><td> durableDests <td> false <td> Whether or not durable destinations are used.
+ * <tr><td> ackMode <td> AUTO_ACK <td> The message acknowledgement mode. Possible values are:
+ * 0 - SESSION_TRANSACTED
+ * 1 - AUTO_ACKNOWLEDGE
+ * 2 - CLIENT_ACKNOWLEDGE
+ * 3 - DUPS_OK_ACKNOWLEDGE
+ * 257 - NO_ACKNOWLEDGE
+ * 258 - PRE_ACKNOWLEDGE
+ * <tr><td> maxPending <td> 0 <td> The maximum size in bytes, of messages sent but not yet received.
+ * Limits the volume of messages currently buffered on the client
+ * or broker. Can help scale test clients by limiting amount of buffered
+ * data to avoid out of memory errors.
+ * </table>
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Provide the names and defaults of all test parameters.
+ * </table>
+ */
+public class MessagingTestConfigProperties
+{
+ // ====================== Connection Properties ==================================
+
+ /** Holds the name of the default connection configuration. */
+ public static final String CONNECTION_NAME = "broker";
+
+ /** Holds the name of the property to get the initial context factory name from. */
+ public static final String INITIAL_CONTEXT_FACTORY_PROPNAME = "java.naming.factory.initial";
+
+ /** Defines the class to use as the initial context factory by default. */
+ public static final String INITIAL_CONTEXT_FACTORY_DEFAULT = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory";
+
+ /** Holds the name of the default connection factory configuration property. */
+ public static final String CONNECTION_PROPNAME = "connectionfactory.broker";
+
+ /** Defeins the default connection configuration. */
+ public static final String CONNECTION_DEFAULT = "amqp://guest:guest@clientid/?brokerlist='vm://:1'";
+
+ /** Holds the name of the property to get the test broker url from. */
+ public static final String BROKER_PROPNAME = "qpid.test.broker";
+
+ /** Holds the default broker url for the test. */
+ public static final String BROKER_DEFAULT = "vm://:1";
+
+ /** Holds the name of the property to get the test broker virtual path. */
+ public static final String VIRTUAL_HOST_PROPNAME = "virtualHost";
+
+ /** Holds the default virtual path for the test. */
+ public static final String VIRTUAL_HOST_DEFAULT = "";
+
+ /** Holds the name of the property to get the broker access username from. */
+ public static final String USERNAME_PROPNAME = "username";
+
+ /** Holds the default broker log on username. */
+ public static final String USERNAME_DEFAULT = "guest";
+
+ /** Holds the name of the property to get the broker access password from. */
+ public static final String PASSWORD_PROPNAME = "password";
+
+ /** Holds the default broker log on password. */
+ public static final String PASSWORD_DEFAULT = "guest";
+
+ // ====================== Messaging Topology Properties ==========================
+
+ /** Holds the name of the property to get the bind publisher procuder flag from. */
+ public static final String PUBLISHER_PRODUCER_BIND_PROPNAME = "publisherProducerBind";
+
+ /** Holds the default value of the publisher producer flag. */
+ public static final boolean PUBLISHER_PRODUCER_BIND_DEFAULT = true;
+
+ /** Holds the name of the property to get the bind publisher procuder flag from. */
+ public static final String PUBLISHER_CONSUMER_BIND_PROPNAME = "publisherConsumerBind";
+
+ /** Holds the default value of the publisher consumer flag. */
+ public static final boolean PUBLISHER_CONSUMER_BIND_DEFAULT = false;
+
+ /** Holds the name of the property to get the bind receiver procuder flag from. */
+ public static final String RECEIVER_PRODUCER_BIND_PROPNAME = "receiverProducerBind";
+
+ /** Holds the default value of the receiver producer flag. */
+ public static final boolean RECEIVER_PRODUCER_BIND_DEFAULT = false;
+
+ /** Holds the name of the property to get the bind receiver procuder flag from. */
+ public static final String RECEIVER_CONSUMER_BIND_PROPNAME = "receiverConsumerBind";
+
+ /** Holds the default value of the receiver consumer flag. */
+ public static final boolean RECEIVER_CONSUMER_BIND_DEFAULT = true;
+
+ /** Holds the name of the property to get the destination name root from. */
+ public static final String SEND_DESTINATION_NAME_ROOT_PROPNAME = "sendDestinationRoot";
+
+ /** Holds the root of the name of the default destination to send to. */
+ public static final String SEND_DESTINATION_NAME_ROOT_DEFAULT = "sendTo";
+
+ /** Holds the name of the property to get the destination name root from. */
+ public static final String RECEIVE_DESTINATION_NAME_ROOT_PROPNAME = "receiveDestinationRoot";
+
+ /** Holds the root of the name of the default destination to send to. */
+ public static final String RECEIVE_DESTINATION_NAME_ROOT_DEFAULT = "receiveFrom";
+
+ /** Holds the name of the proeprty to get the destination count from. */
+ public static final String DESTINATION_COUNT_PROPNAME = "destinationCount";
+
+ /** Defines the default number of destinations to ping. */
+ public static final int DESTINATION_COUNT_DEFAULT = 1;
+
+ /** Holds the name of the property to get the p2p or pub/sub messaging mode from. */
+ public static final String PUBSUB_PROPNAME = "pubsub";
+
+ /** Holds the pub/sub mode default, true means ping a topic, false means ping a queue. */
+ public static final boolean PUBSUB_DEFAULT = false;
+
+ // ====================== JMS Options and Flags =================================
+
+ /** Holds the name of the property to get the test delivery mode from. */
+ public static final String PERSISTENT_MODE_PROPNAME = "persistent";
+
+ /** Holds the message delivery mode to use for the test. */
+ public static final boolean PERSISTENT_MODE_DEFAULT = false;
+
+ /** Holds the name of the property to get the test transactional mode from. */
+ public static final String TRANSACTED_PROPNAME = "transacted";
+
+ /** Holds the transactional mode to use for the test. */
+ public static final boolean TRANSACTED_DEFAULT = false;
+
+ /** Holds the name of the property to set the no local flag from. */
+ public static final String NO_LOCAL_PROPNAME = "noLocal";
+
+ /** Defines the default value of the no local flag to use when consuming messages. */
+ public static final boolean NO_LOCAL_DEFAULT = false;
+
+ /** Holds the name of the property to get the message acknowledgement mode from. */
+ public static final String ACK_MODE_PROPNAME = "ackMode";
+
+ /** Defines the default message acknowledgement mode. */
+ public static final int ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE;
+
+ // ====================== Qpid Options and Flags ================================
+
+ /** Holds the name of the property to set the exclusive flag from. */
+ public static final String EXCLUSIVE_PROPNAME = "exclusive";
+
+ /** Defines the default value of the exclusive flag to use when consuming messages. */
+ public static final boolean EXCLUSIVE_DEFAULT = false;
+
+ /** Holds the name of the property to set the immediate flag from. */
+ public static final String IMMEDIATE_PROPNAME = "immediate";
+
+ /** Defines the default value of the immediate flag to use when sending messages. */
+ public static final boolean IMMEDIATE_DEFAULT = false;
+
+ /** Holds the name of the property to set the mandatory flag from. */
+ public static final String MANDATORY_PROPNAME = "mandatory";
+
+ /** Defines the default value of the mandatory flag to use when sending messages. */
+ public static final boolean MANDATORY_DEFAULT = false;
+
+ /** Holds the name of the property to get the durable destinations flag from. */
+ public static final String DURABLE_DESTS_PROPNAME = "durableDests";
+
+ /** Default value for the durable destinations flag. */
+ public static final boolean DURABLE_DESTS_DEFAULT = false;
+
+ /** Holds the name of the proeprty to set the prefetch size from. */
+ public static final String PREFECTH_PROPNAME = "prefetch";
+
+ /** Defines the default prefetch size to use when consuming messages. */
+ public static final int PREFETCH_DEFAULT = 100;
+
+ // ====================== Common Test Parameters ================================
+
+ /** Holds the name of the property to get the test message size from. */
+ public static final String MESSAGE_SIZE_PROPNAME = "messageSize";
+
+ /** Used to set up a default message size. */
+ public static final int MESSAGE_SIZE_DEAFULT = 0;
+
+ /** Holds the name of the property to get the message rate from. */
+ public static final String RATE_PROPNAME = "rate";
+
+ /** Defines the default rate (in pings per second) to send pings at. 0 means as fast as possible, no restriction. */
+ public static final int RATE_DEFAULT = 0;
+
+ /** Holds the name of the proeprty to get the. */
+ public static final String SELECTOR_PROPNAME = "selector";
+
+ /** Holds the default message selector. */
+ public static final String SELECTOR_DEFAULT = "";
+
+ /** Holds the name of the property to get the waiting timeout for response messages. */
+ public static final String TIMEOUT_PROPNAME = "timeout";
+
+ /** Default time to wait before assuming that a ping has timed out. */
+ public static final long TIMEOUT_DEFAULT = 30000;
+
+ /** Holds the name of the property to get the commit batch size from. */
+ public static final String TX_BATCH_SIZE_PROPNAME = "commitBatchSize";
+
+ /** Defines the default number of pings to send in each transaction when running transactionally. */
+ public static final int TX_BATCH_SIZE_DEFAULT = 1;
+
+ /** Holds the name of the property to set the maximum amount of pending message data for a producer to hold. */
+ public static final String MAX_PENDING_PROPNAME = "maxPending";
+
+ /** Defines the default maximum quantity of pending message data to allow producers to hold. */
+ public static final int MAX_PENDING_DEFAULT = 0;
+
+ /** Holds the name of the property to get the verbose mode proeprty from. */
+ public static final String VERBOSE_PROPNAME = "verbose";
+
+ /** Holds the default verbose mode. */
+ public static final boolean VERBOSE_DEFAULT = false;
+
+ /** Holds the default configuration properties. */
+ public static ParsedProperties defaults = new ParsedProperties();
+
+ static
+ {
+ defaults.setPropertyIfNull(INITIAL_CONTEXT_FACTORY_PROPNAME, INITIAL_CONTEXT_FACTORY_DEFAULT);
+ defaults.setPropertyIfNull(CONNECTION_PROPNAME, CONNECTION_DEFAULT);
+ defaults.setPropertyIfNull(MESSAGE_SIZE_PROPNAME, MESSAGE_SIZE_DEAFULT);
+ defaults.setPropertyIfNull(PUBLISHER_PRODUCER_BIND_PROPNAME, PUBLISHER_PRODUCER_BIND_DEFAULT);
+ defaults.setPropertyIfNull(PUBLISHER_CONSUMER_BIND_PROPNAME, PUBLISHER_CONSUMER_BIND_DEFAULT);
+ defaults.setPropertyIfNull(RECEIVER_PRODUCER_BIND_PROPNAME, RECEIVER_PRODUCER_BIND_DEFAULT);
+ defaults.setPropertyIfNull(RECEIVER_CONSUMER_BIND_PROPNAME, RECEIVER_CONSUMER_BIND_DEFAULT);
+ defaults.setPropertyIfNull(SEND_DESTINATION_NAME_ROOT_PROPNAME, SEND_DESTINATION_NAME_ROOT_DEFAULT);
+ defaults.setPropertyIfNull(RECEIVE_DESTINATION_NAME_ROOT_PROPNAME, RECEIVE_DESTINATION_NAME_ROOT_DEFAULT);
+ defaults.setPropertyIfNull(PERSISTENT_MODE_PROPNAME, PERSISTENT_MODE_DEFAULT);
+ defaults.setPropertyIfNull(TRANSACTED_PROPNAME, TRANSACTED_DEFAULT);
+ defaults.setPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT);
+ defaults.setPropertyIfNull(VIRTUAL_HOST_PROPNAME, VIRTUAL_HOST_DEFAULT);
+ defaults.setPropertyIfNull(RATE_PROPNAME, RATE_DEFAULT);
+ defaults.setPropertyIfNull(VERBOSE_PROPNAME, VERBOSE_DEFAULT);
+ defaults.setPropertyIfNull(PUBSUB_PROPNAME, PUBSUB_DEFAULT);
+ defaults.setPropertyIfNull(USERNAME_PROPNAME, USERNAME_DEFAULT);
+ defaults.setPropertyIfNull(PASSWORD_PROPNAME, PASSWORD_DEFAULT);
+ defaults.setPropertyIfNull(SELECTOR_PROPNAME, SELECTOR_DEFAULT);
+ defaults.setPropertyIfNull(DESTINATION_COUNT_PROPNAME, DESTINATION_COUNT_DEFAULT);
+ defaults.setPropertyIfNull(TIMEOUT_PROPNAME, TIMEOUT_DEFAULT);
+ defaults.setPropertyIfNull(TX_BATCH_SIZE_PROPNAME, TX_BATCH_SIZE_DEFAULT);
+ defaults.setPropertyIfNull(DURABLE_DESTS_PROPNAME, DURABLE_DESTS_DEFAULT);
+ defaults.setPropertyIfNull(ACK_MODE_PROPNAME, ACK_MODE_DEFAULT);
+ defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT);
+ defaults.setPropertyIfNull(PREFECTH_PROPNAME, PREFETCH_DEFAULT);
+ defaults.setPropertyIfNull(NO_LOCAL_PROPNAME, NO_LOCAL_DEFAULT);
+ defaults.setPropertyIfNull(EXCLUSIVE_PROPNAME, EXCLUSIVE_DEFAULT);
+ defaults.setPropertyIfNull(IMMEDIATE_PROPNAME, IMMEDIATE_DEFAULT);
+ defaults.setPropertyIfNull(MANDATORY_PROPNAME, MANDATORY_DEFAULT);
+ }
+}