summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2012-02-07 11:31:14 +0000
committerKeith Wall <kwall@apache.org>2012-02-07 11:31:14 +0000
commita3bbc153f2d98b7afc5accbd6aff04a79f20b7ad (patch)
treea178cb357dffe374f386219c75f8803ba25d02ba
parenta62ee47c4a7b03c05b1a534a7c189c76aa3ded04 (diff)
downloadqpid-python-a3bbc153f2d98b7afc5accbd6aff04a79f20b7ad.tar.gz
QPID-3818: Replace CombinedTest and its support classes with a simpler test-case.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1241431 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSReplyToTest.java169
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Client.java143
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/CombinedTest.java70
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Service.java95
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/ServiceCreator.java112
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java46
6 files changed, 169 insertions, 466 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSReplyToTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSReplyToTest.java
new file mode 100644
index 0000000000..fe8180d6c6
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSReplyToTest.java
@@ -0,0 +1,169 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.client.message;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+/**
+ * Tests that {@link Message#setJMSReplyTo(Destination)} can be used to pass a {@link Destination} between
+ * messaging clients as is commonly used in request/response messaging pattern implementations.
+ */
+public class JMSReplyToTest extends QpidBrokerTestCase
+{
+ private AtomicReference<Throwable> _caughtException = new AtomicReference<Throwable>();
+ private Queue _requestQueue;
+ private Connection _connection;
+ private Session _session;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ _requestQueue = startAsyncRespondingJmsConsumerOnSeparateConnection();
+
+ _connection = getConnection();
+ _connection.start();
+ _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ public void testRequestResponseUsingJmsReplyTo() throws Exception
+ {
+ final String responseQueueName = getTestQueueName() + ".response";
+ Queue replyToQueue = _session.createQueue(responseQueueName);
+ sendRequestAndValidateResponse(replyToQueue);
+ }
+
+ public void testRequestResponseUsingTemporaryJmsReplyTo() throws Exception
+ {
+ TemporaryQueue replyToQueue = _session.createTemporaryQueue();
+
+ sendRequestAndValidateResponse(replyToQueue);
+ }
+
+ private void sendRequestAndValidateResponse(Queue replyToQueue) throws JMSException, Exception
+ {
+ MessageConsumer replyConsumer = _session.createConsumer(replyToQueue);
+
+ Message requestMessage = createRequestMessageWithJmsReplyTo(_session, replyToQueue);
+ sendRequest(_requestQueue, _session, requestMessage);
+
+ receiveAndValidateResponse(replyConsumer, requestMessage);
+
+ assertNull("Async responder caught unexpected exception", _caughtException.get());
+ }
+
+ private Message createRequestMessageWithJmsReplyTo(Session session, Queue replyToQueue)
+ throws JMSException
+ {
+ Message requestMessage = session.createTextMessage("My request");
+ requestMessage.setJMSReplyTo(replyToQueue);
+ return requestMessage;
+ }
+
+ private void sendRequest(final Queue requestQueue, Session session, Message requestMessage) throws Exception
+ {
+ MessageProducer producer = session.createProducer(requestQueue);
+ producer.send(requestMessage);
+ }
+
+ private void receiveAndValidateResponse(MessageConsumer replyConsumer, Message requestMessage) throws JMSException
+ {
+ Message responseMessage = replyConsumer.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Response message not received", responseMessage);
+ assertEquals("Correlation id of the response should match message id of the request",
+ responseMessage.getJMSCorrelationID(), requestMessage.getJMSMessageID());
+ }
+
+ private Queue startAsyncRespondingJmsConsumerOnSeparateConnection() throws Exception
+ {
+ final String requestQueueName = getTestQueueName() + ".request";
+ final Connection responderConnection = getConnection();
+ responderConnection.start();
+ final Session responderSession = responderConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final Queue requestQueue = responderSession.createQueue(requestQueueName);
+
+ final MessageConsumer requestConsumer = responderSession.createConsumer(requestQueue);
+ requestConsumer.setMessageListener(new AsyncResponder(responderSession));
+
+ return requestQueue;
+ }
+
+ private final class AsyncResponder implements MessageListener
+ {
+ private final Session _responderSession;
+
+ private AsyncResponder(Session responderSession)
+ {
+ _responderSession = responderSession;
+ }
+
+ @Override
+ public void onMessage(Message requestMessage)
+ {
+ try
+ {
+ Destination replyTo = getReplyToQueue(requestMessage);
+
+ Message responseMessage = _responderSession.createMessage();
+ responseMessage.setJMSCorrelationID(requestMessage.getJMSMessageID());
+
+ sendResponseToQueue(replyTo, responseMessage);
+ }
+ catch (Throwable t)
+ {
+ _caughtException.set(t);
+ }
+ }
+
+ private Destination getReplyToQueue(Message requestMessage) throws JMSException, IllegalStateException
+ {
+ Destination replyTo = requestMessage.getJMSReplyTo();
+ if (replyTo == null)
+ {
+ throw new IllegalStateException("JMSReplyTo was null on message " + requestMessage);
+ }
+ return replyTo;
+ }
+
+ private void sendResponseToQueue(Destination replyTo, Message responseMessage)
+ throws JMSException
+ {
+ MessageProducer responseProducer = _responderSession.createProducer(replyTo);
+ responseProducer.send(responseMessage);
+ }
+ }
+
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Client.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Client.java
deleted file mode 100644
index d42303114b..0000000000
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Client.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.test.unit.client.forwardall;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-
-/**
- * Declare a private temporary response queue,
- * send a message to amq.direct with a well known routing key with the
- * private response queue as the reply-to destination
- * consume responses.
- */
-public class Client implements MessageListener
-{
- private static final Logger _logger = LoggerFactory.getLogger(Client.class);
-
- private final AMQConnection _connection;
- private final AMQSession _session;
- private final int _expected;
- private int _count;
- private static QpidBrokerTestCase _qct;
-
- Client(String broker, int expected) throws Exception
- {
- this(connect(broker), expected);
- }
-
- public static void setQTC(QpidBrokerTestCase qtc)
- {
- _qct = qtc;
- }
- Client(AMQConnection connection, int expected) throws Exception
- {
- _connection = connection;
- _expected = expected;
- _session = (AMQSession) _connection.createSession(true, AMQSession.NO_ACKNOWLEDGE);
- AMQQueue response =
- new AMQQueue(_connection.getDefaultQueueExchangeName(), new AMQShortString("ResponseQueue"), true);
- _session.createConsumer(response).setMessageListener(this);
- _connection.start();
- // AMQQueue service = new SpecialQueue(_connection, "ServiceQueue");
- AMQQueue service = (AMQQueue) _session.createQueue("ServiceQueue") ;
- Message request = _session.createTextMessage("Request!");
- request.setJMSReplyTo(response);
- MessageProducer prod = _session.createProducer(service);
- prod.send(request);
- _session.commit();
- }
-
- void shutdownWhenComplete() throws Exception
- {
- waitUntilComplete();
- _connection.close();
- }
-
- public synchronized void onMessage(Message response)
- {
-
- _logger.info("Received " + (++_count) + " of " + _expected + " responses.");
- if (_count == _expected)
- {
-
- notifyAll();
- }
- try
- {
- _session.commit();
- }
- catch (JMSException e)
- {
-
- }
-
- }
-
- synchronized void waitUntilComplete() throws Exception
- {
-
- if (_count < _expected)
- {
- wait(60000);
- }
-
- if (_count < _expected)
- {
- throw new Exception("Didn't receive all messages... got " + _count + " expected " + _expected);
- }
- }
-
- static AMQConnection connect(String broker) throws Exception
- {
- //return new AMQConnection(broker, "guest", "guest", "Client" + System.currentTimeMillis(), "test");
- return (AMQConnection) _qct.getConnection("guest", "guest") ;
- }
-
- public static void main(String[] argv) throws Exception
- {
- final String connectionString;
- final int expected;
- if (argv.length == 0)
- {
- connectionString = "localhost:5672";
- expected = 100;
- }
- else
- {
- connectionString = argv[0];
- expected = Integer.parseInt(argv[1]);
- }
-
- new Client(connect(connectionString), expected).shutdownWhenComplete();
- }
-}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/CombinedTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/CombinedTest.java
deleted file mode 100644
index e7bc2bb5c3..0000000000
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/CombinedTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.test.unit.client.forwardall;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
-/**
- * Runs the Service's and Client parts of the test in the same process
- * as the broker
- */
-public class CombinedTest extends QpidBrokerTestCase
-{
- private static final Logger _logger = LoggerFactory.getLogger(CombinedTest.class);
- private int run = 0;
-
- protected void setUp() throws Exception
- {
- super.setUp();
- Service.setQTC(this);
- Client.setQTC(this);
- }
-
- protected void tearDown() throws Exception
- {
- ServiceCreator.closeAll();
- super.tearDown();
- }
-
- public void testForwardAll() throws Exception
- {
- while (run < 10)
- {
- int services =1;
- ServiceCreator.start("vm://:1", services);
-
- _logger.info("Starting " + ++run + " client...");
-
- new Client("vm://:1", services).shutdownWhenComplete();
-
-
- _logger.info("Completed " + run + " successfully!");
- }
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(CombinedTest.class);
- }
-}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Service.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Service.java
deleted file mode 100644
index 64ded09efa..0000000000
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Service.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.test.unit.client.forwardall;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-
-/**
- * Declare a queue and bind it to amq.direct with a 'well known' routing key,
- * register a consumer for this queue and send a response to every message received.
- */
-public class Service implements MessageListener
-{
- private final AMQConnection _connection;
- private final AMQSession _session;
-
- private static QpidBrokerTestCase _qct;
-
-
- public static void setQTC(QpidBrokerTestCase qtc)
- {
- _qct = qtc;
- }
- Service(String broker) throws Exception
- {
- this(connect(broker));
- }
-
- Service(AMQConnection connection) throws Exception
- {
- _connection = connection;
- //AMQQueue queue = new SpecialQueue(connection, "ServiceQueue");
- _session = (AMQSession) _connection.createSession(true, AMQSession.NO_ACKNOWLEDGE);
- AMQQueue queue = (AMQQueue) _session.createQueue("ServiceQueue") ;
- _session.createConsumer(queue).setMessageListener(this);
- _connection.start();
- }
-
- public void onMessage(Message request)
- {
- try
- {
- Message response = _session.createTextMessage("Response!");
- Destination replyTo = request.getJMSReplyTo();
- _session.createProducer(replyTo).send(response);
- _session.commit();
- }
- catch (Exception e)
- {
- e.printStackTrace(System.out);
- }
- }
-
- public void close() throws JMSException
- {
- _connection.close();
- }
-
- static AMQConnection connect(String broker) throws Exception
- {
- //return new AMQConnection(broker, "guest", "guest", "Client" + System.currentTimeMillis(), "test");
- return (AMQConnection) _qct.getConnection("guest", "guest") ;
- }
-
-// public static void main(String[] argv) throws Exception
-// {
-// String broker = argv.length == 0? "localhost:5672" : argv[0];
-// new Service(broker);
-// }
-}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/ServiceCreator.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/ServiceCreator.java
deleted file mode 100644
index be16f6b7ae..0000000000
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/ServiceCreator.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.test.unit.client.forwardall;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.JMSException;
-
-public class ServiceCreator implements Runnable
-{
- private static final Logger _logger = LoggerFactory.getLogger(ServiceCreator.class);
-
- private static Thread[] threads;
- private static ServiceCreator[] _services;
-
- private final String broker;
- private Service service;
-
- ServiceCreator(String broker)
- {
- this.broker = broker;
- }
-
- public void run()
- {
- try
- {
- service = new Service(broker);
- }
- catch (Exception e)
- {
- e.printStackTrace(System.out);
- }
- }
-
- public void closeSC() throws JMSException
- {
- service.close();
- }
-
- static void closeAll()
- {
- for (int i = 0; i < _services.length; i++)
- {
- try
- {
- _services[i].closeSC();
- }
- catch (JMSException e)
- {
- // ignore
- }
- }
- }
-
- static void start(String broker, int services) throws InterruptedException
- {
- threads = new Thread[services];
- _services = new ServiceCreator[services];
- ServiceCreator runner = new ServiceCreator(broker);
- // start services
- _logger.info("Starting " + services + " services...");
- for (int i = 0; i < services; i++)
- {
- threads[i] = new Thread(runner);
- _services[i] = runner;
- threads[i].start();
- }
-
- for (int i = 0; i < threads.length; i++)
- {
- threads[i].join();
- }
- }
-
- public static void main(String[] argv) throws Exception
- {
- final String connectionString;
- final int services;
- if (argv.length == 0)
- {
- connectionString = "localhost:5672";
- services = 100;
- }
- else
- {
- connectionString = argv[0];
- services = Integer.parseInt(argv[1]);
- }
-
- start(connectionString, services);
- }
-}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java
deleted file mode 100644
index 284efb1913..0000000000
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.test.unit.client.forwardall;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.framing.AMQShortString;
-
-/**
- * Queue that allows several private queues to be registered and bound
- * to an exchange with the same routing key.
- *
- */
-class SpecialQueue extends AMQQueue
-{
- private final AMQShortString name;
-
- SpecialQueue(AMQConnection con, String name)
- {
- super(con, name, true);
- this.name = new AMQShortString(name);
- }
-
- public AMQShortString getRoutingKey()
- {
- return name;
- }
-}