summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client')
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java104
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java258
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java660
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/QueueSessionFactoryTest.java113
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/TopicSessionFactoryTest.java98
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java73
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java215
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionFactoryTest.java78
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java157
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java378
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java244
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java334
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java198
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java166
14 files changed, 3076 insertions, 0 deletions
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java
new file mode 100644
index 0000000000..0d81b66be0
--- /dev/null
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java
@@ -0,0 +1,104 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.client;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import javax.jms.JMSException;
+import javax.jms.QueueReceiver;
+import javax.jms.TopicSubscriber;
+
+/**
+ * Tests for QueueReceiver and TopicSubscriber creation methods on AMQSession
+ */
+public class AMQSessionTest extends QpidBrokerTestCase
+{
+
+ private static AMQSession _session;
+ private static AMQTopic _topic;
+ private static AMQQueue _queue;
+ private static AMQConnection _connection;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ _connection = (AMQConnection) getConnection("guest", "guest");
+ _topic = new AMQTopic(_connection,"mytopic");
+ _queue = new AMQQueue(_connection,"myqueue");
+ _session = (AMQSession) _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ _connection.close();
+ }
+ catch (JMSException e)
+ {
+ //just close
+ }
+ super.tearDown();
+ }
+
+ public void testCreateSubscriber() throws JMSException
+ {
+ TopicSubscriber subscriber = _session.createSubscriber(_topic);
+ assertEquals("Topic names should match from TopicSubscriber", _topic.getTopicName(), subscriber.getTopic().getTopicName());
+
+ subscriber = _session.createSubscriber(_topic, "abc", false);
+ assertEquals("Topic names should match from TopicSubscriber with selector", _topic.getTopicName(), subscriber.getTopic().getTopicName());
+ }
+
+ public void testCreateDurableSubscriber() throws JMSException
+ {
+ TopicSubscriber subscriber = _session.createDurableSubscriber(_topic, "mysubname");
+ assertEquals("Topic names should match from durable TopicSubscriber", _topic.getTopicName(), subscriber.getTopic().getTopicName());
+
+ subscriber = _session.createDurableSubscriber(_topic, "mysubname2", "abc", false);
+ assertEquals("Topic names should match from durable TopicSubscriber with selector", _topic.getTopicName(), subscriber.getTopic().getTopicName());
+ _session.unsubscribe("mysubname");
+ _session.unsubscribe("mysubname2");
+ }
+
+ public void testCreateQueueReceiver() throws JMSException
+ {
+ QueueReceiver receiver = _session.createQueueReceiver(_queue);
+ assertEquals("Queue names should match from QueueReceiver", _queue.getQueueName(), receiver.getQueue().getQueueName());
+
+ receiver = _session.createQueueReceiver(_queue, "abc");
+ assertEquals("Queue names should match from QueueReceiver with selector", _queue.getQueueName(), receiver.getQueue().getQueueName());
+ }
+
+ public void testCreateReceiver() throws JMSException
+ {
+ QueueReceiver receiver = _session.createReceiver(_queue);
+ assertEquals("Queue names should match from QueueReceiver", _queue.getQueueName(), receiver.getQueue().getQueueName());
+
+ receiver = _session.createReceiver(_queue, "abc");
+ assertEquals("Queue names should match from QueueReceiver with selector", _queue.getQueueName(), receiver.getQueue().getQueueName());
+ }
+
+}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java
new file mode 100644
index 0000000000..77df6c58d9
--- /dev/null
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java
@@ -0,0 +1,258 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.client;
+
+import java.io.IOException;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.management.common.mbeans.ManagedExchange;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.test.utils.JMXTestUtils;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.url.BindingURL;
+
+import javax.jms.Connection;
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+public class DynamicQueueExchangeCreateTest extends QpidBrokerTestCase
+{
+ private JMXTestUtils _jmxUtils;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ getBrokerConfiguration().addJmxManagementConfiguration();
+
+ _jmxUtils = new JMXTestUtils(this);
+
+ super.setUp();
+ _jmxUtils.open();
+ }
+
+ @Override
+ public void tearDown() throws Exception
+ {
+ try
+ {
+ if (_jmxUtils != null)
+ {
+ _jmxUtils.close();
+ }
+ }
+ finally
+ {
+ super.tearDown();
+ }
+ }
+
+ /*
+ * Tests to validate that setting the respective qpid.declare_queues,
+ * qpid.declare_exchanges system properties functions as expected.
+ */
+
+ public void testQueueNotDeclaredDuringConsumerCreation() throws Exception
+ {
+ setSystemProperty(ClientProperties.QPID_DECLARE_QUEUES_PROP_NAME, "false");
+
+ Connection connection = getConnection();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Queue queue = session.createQueue(getTestQueueName());
+
+ try
+ {
+ session.createConsumer(queue);
+ fail("JMSException should be thrown as the queue does not exist");
+ }
+ catch (JMSException e)
+ {
+ checkExceptionErrorCode(e, AMQConstant.NOT_FOUND);
+ }
+ }
+
+ public void testExchangeNotDeclaredDuringConsumerCreation() throws Exception
+ {
+ setSystemProperty(ClientProperties.QPID_DECLARE_EXCHANGES_PROP_NAME, "false");
+
+ Connection connection = getConnection();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ String exchangeName = getTestQueueName();
+ Queue queue = session.createQueue("direct://" + exchangeName + "/queue/queue");
+
+ try
+ {
+ session.createConsumer(queue);
+ fail("JMSException should be thrown as the exchange does not exist");
+ }
+ catch (JMSException e)
+ {
+ checkExceptionErrorCode(e, AMQConstant.NOT_FOUND);
+ }
+
+ //verify the exchange was not declared
+ String exchangeObjectName = _jmxUtils.getExchangeObjectName("test", exchangeName);
+ assertFalse("exchange should not exist", _jmxUtils.doesManagedObjectExist(exchangeObjectName));
+ }
+
+ /**
+ * Checks that setting {@value ClientProperties#QPID_DECLARE_EXCHANGES_PROP_NAME} false results in
+ * disabling implicit ExchangeDeclares during producer creation when using a {@link BindingURL}
+ */
+ public void testExchangeNotDeclaredDuringProducerCreation() throws Exception
+ {
+ Connection connection = getConnection();
+ Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ String exchangeName1 = getTestQueueName() + "1";
+
+
+ Queue queue = session1.createQueue("direct://" + exchangeName1 + "/queue/queue");
+ session1.createProducer(queue);
+
+ //close the session to ensure any previous commands were fully processed by
+ //the broker before observing their effect
+ session1.close();
+
+ //verify the exchange was declared
+ String exchangeObjectName = _jmxUtils.getExchangeObjectName("test", exchangeName1);
+ assertTrue("exchange should exist", _jmxUtils.doesManagedObjectExist(exchangeObjectName));
+
+ //Now disable the implicit exchange declares and try again
+ setSystemProperty(ClientProperties.QPID_DECLARE_EXCHANGES_PROP_NAME, "false");
+
+ Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ String exchangeName2 = getTestQueueName() + "2";
+
+ Queue queue2 = session2.createQueue("direct://" + exchangeName2 + "/queue/queue");
+ session2.createProducer(queue2);
+
+ //close the session to ensure any previous commands were fully processed by
+ //the broker before observing their effect
+ session2.close();
+
+ //verify the exchange was not declared
+ String exchangeObjectName2 = _jmxUtils.getExchangeObjectName("test", exchangeName2);
+ assertFalse("exchange should not exist", _jmxUtils.doesManagedObjectExist(exchangeObjectName2));
+ }
+
+ public void testQueueNotBoundDuringConsumerCreation() throws Exception
+ {
+ setSystemProperty(ClientProperties.QPID_BIND_QUEUES_PROP_NAME, "false");
+ setSystemProperty(ClientProperties.VERIFY_QUEUE_ON_SEND, "true");
+
+ Connection connection = getConnection();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Queue queue = session.createQueue(getTestQueueName());
+ session.createConsumer(queue);
+
+ try
+ {
+ session.createProducer(queue).send(session.createMessage());
+ fail("JMSException should be thrown as the queue does not exist");
+ }
+ catch (InvalidDestinationException ide)
+ {
+ //PASS
+ }
+ }
+ private void checkExceptionErrorCode(JMSException original, AMQConstant code)
+ {
+ Exception linked = original.getLinkedException();
+ assertNotNull("Linked exception should have been set", linked);
+ assertTrue("Linked exception should be an AMQException", linked instanceof AMQException);
+ assertEquals("Error code should be " + code.getCode(), code, ((AMQException) linked).getErrorCode());
+ }
+
+ /*
+ * Tests to validate that the custom exchanges declared by the client during
+ * consumer and producer creation have the expected properties.
+ */
+
+ public void testPropertiesOfCustomExchangeDeclaredDuringProducerCreation() throws Exception
+ {
+ implTestPropertiesOfCustomExchange(true, false);
+ }
+
+ public void testPropertiesOfCustomExchangeDeclaredDuringConsumerCreation() throws Exception
+ {
+ implTestPropertiesOfCustomExchange(false, true);
+ }
+
+ private void implTestPropertiesOfCustomExchange(boolean createProducer, boolean createConsumer) throws Exception
+ {
+ Connection connection = getConnection();
+
+ Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ String exchangeName1 = getTestQueueName() + "1";
+ String queueName1 = getTestQueueName() + "1";
+
+ Queue queue = session1.createQueue("direct://" + exchangeName1 + "/" + queueName1 + "/" + queueName1 + "?" + BindingURL.OPTION_EXCHANGE_AUTODELETE + "='true'");
+ if(createProducer)
+ {
+ session1.createProducer(queue);
+ }
+
+ if(createConsumer)
+ {
+ session1.createConsumer(queue);
+ }
+ session1.close();
+
+ //verify the exchange was declared to expectation
+ verifyDeclaredExchange(exchangeName1, true, false);
+
+ Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ String exchangeName2 = getTestQueueName() + "2";
+ String queueName2 = getTestQueueName() + "2";
+
+ Queue queue2 = session2.createQueue("direct://" + exchangeName2 + "/" + queueName2 + "/" + queueName2 + "?" + BindingURL.OPTION_EXCHANGE_DURABLE + "='true'");
+ if(createProducer)
+ {
+ session2.createProducer(queue2);
+ }
+
+ if(createConsumer)
+ {
+ session2.createConsumer(queue2);
+ }
+ session2.close();
+
+ //verify the exchange was declared to expectation
+ verifyDeclaredExchange(exchangeName2, false, true);
+ }
+
+ private void verifyDeclaredExchange(String exchangeName, boolean isAutoDelete, boolean isDurable) throws IOException
+ {
+ String exchangeObjectName = _jmxUtils.getExchangeObjectName("test", exchangeName);
+ assertTrue("exchange should exist", _jmxUtils.doesManagedObjectExist(exchangeObjectName));
+ ManagedExchange exchange = _jmxUtils.getManagedExchange(exchangeName);
+ assertEquals(isAutoDelete, exchange.isAutoDelete());
+ assertEquals(isDurable,exchange.isDurable());
+ }
+}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
new file mode 100644
index 0000000000..5e1e38106a
--- /dev/null
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
@@ -0,0 +1,660 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.client;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.RejectBehaviour;
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.test.utils.TestBrokerConfiguration;
+
+/**
+ * Test that the MaxRedelivery feature works as expected, allowing the client to reject
+ * messages during rollback/recover whilst specifying they not be requeued if delivery
+ * to an application has been attempted a specified number of times.
+ *
+ * General approach: specify a set of messages which will cause the test client to then
+ * deliberately rollback/recover the session after consuming, and monitor that they are
+ * re-delivered the specified number of times before the client rejects them without requeue
+ * and then verify that they are not subsequently redelivered.
+ *
+ * Additionally, the queue used in the test is configured for DLQ'ing, and the test verifies
+ * that the messages rejected without requeue are then present on the appropriate DLQ.
+ */
+public class MaxDeliveryCountTest extends QpidBrokerTestCase
+{
+ private static final Logger _logger = Logger.getLogger(MaxDeliveryCountTest.class);
+ private boolean _failed;
+ private String _failMsg;
+ private static final int MSG_COUNT = 15;
+ private static final int MAX_DELIVERY_COUNT = 2;
+ private CountDownLatch _awaitCompletion;
+
+ /** index numbers of messages to be redelivered */
+ private final List<Integer> _redeliverMsgs = Arrays.asList(1, 2, 5, 14);
+
+ public void setUp() throws Exception
+ {
+ //enable DLQ/maximumDeliveryCount support for all queues at the vhost level
+
+ TestBrokerConfiguration brokerConfiguration = getBrokerConfiguration();
+ setTestSystemProperty("queue.deadLetterQueueEnabled","true");
+ setTestSystemProperty("queue.maximumDeliveryAttempts", String.valueOf(MAX_DELIVERY_COUNT));
+
+ //Ensure management is on
+ brokerConfiguration.addJmxManagementConfiguration();
+
+ // Set client-side flag to allow the server to determine if messages
+ // dead-lettered or requeued.
+ if (!isBroker010())
+ {
+ setTestClientSystemProperty(ClientProperties.REJECT_BEHAVIOUR_PROP_NAME, RejectBehaviour.SERVER.toString());
+ }
+ super.setUp();
+
+ boolean durableSub = isDurSubTest();
+
+ //declare the test queue
+ Connection consumerConnection = getConnection();
+ Session consumerSession = consumerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+ Destination destination = getDestination(consumerSession, durableSub);
+ if(durableSub)
+ {
+ consumerSession.createDurableSubscriber((Topic)destination, getName()).close();
+ }
+ else
+ {
+ consumerSession.createConsumer(destination).close();
+ }
+
+ consumerConnection.close();
+
+ //Create Producer put some messages on the queue
+ Connection producerConnection = getConnection();
+ producerConnection.start();
+ Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = producerSession.createProducer(getDestination(producerSession, durableSub));
+
+ for (int count = 1; count <= MSG_COUNT; count++)
+ {
+ Message msg = producerSession.createTextMessage(generateContent(count));
+ msg.setIntProperty("count", count);
+ producer.send(msg);
+ }
+
+ producerConnection.close();
+
+ _failed = false;
+ _awaitCompletion = new CountDownLatch(1);
+ }
+
+ private Destination getDestination(Session consumerSession, boolean durableSub) throws JMSException
+ {
+ if(durableSub)
+ {
+ return consumerSession.createTopic(getTestQueueName());
+ }
+ else
+ {
+ return consumerSession.createQueue(getTestQueueName());
+ }
+ }
+
+ private String generateContent(int count)
+ {
+ return "Message " + count + " content.";
+ }
+
+ /**
+ * Test that Max Redelivery is enforced when using onMessage() on a
+ * Client-Ack session.
+ */
+ public void testAsynchronousClientAckSession() throws Exception
+ {
+ doTest(Session.CLIENT_ACKNOWLEDGE, _redeliverMsgs, false, false);
+ }
+
+ /**
+ * Test that Max Redelivery is enforced when using onMessage() on a
+ * transacted session.
+ */
+ public void testAsynchronousTransactedSession() throws Exception
+ {
+ doTest(Session.SESSION_TRANSACTED, _redeliverMsgs, false, false);
+ }
+
+ /**
+ * Test that Max Redelivery is enforced when using onMessage() on an
+ * Auto-Ack session.
+ */
+ public void testAsynchronousAutoAckSession() throws Exception
+ {
+ doTest(Session.AUTO_ACKNOWLEDGE, _redeliverMsgs, false, false);
+ }
+
+ /**
+ * Test that Max Redelivery is enforced when using onMessage() on a
+ * Dups-OK session.
+ */
+ public void testAsynchronousDupsOkSession() throws Exception
+ {
+ doTest(Session.DUPS_OK_ACKNOWLEDGE, _redeliverMsgs, false, false);
+ }
+
+ /**
+ * Test that Max Redelivery is enforced when using recieve() on a
+ * Client-Ack session.
+ */
+ public void testSynchronousClientAckSession() throws Exception
+ {
+ doTest(Session.CLIENT_ACKNOWLEDGE, _redeliverMsgs, true, false);
+ }
+
+ /**
+ * Test that Max Redelivery is enforced when using recieve() on a
+ * transacted session.
+ */
+ public void testSynchronousTransactedSession() throws Exception
+ {
+ doTest(Session.SESSION_TRANSACTED, _redeliverMsgs, true, false);
+ }
+
+ public void testDurableSubscription() throws Exception
+ {
+ doTest(Session.SESSION_TRANSACTED, _redeliverMsgs, false, true);
+ }
+
+ public void testWhenBrokerIsRestartedAfterEnqeuingMessages() throws Exception
+ {
+ restartBroker();
+
+ doTest(Session.SESSION_TRANSACTED, _redeliverMsgs, true, false);
+ }
+
+ private void doTest(final int deliveryMode, final List<Integer> redeliverMsgs, final boolean synchronous, final boolean durableSub) throws Exception
+ {
+ final Connection clientConnection = getConnection();
+
+ final boolean transacted = deliveryMode == Session.SESSION_TRANSACTED ? true : false;
+ final Session clientSession = clientConnection.createSession(transacted, deliveryMode);
+
+ MessageConsumer consumer;
+ Destination dest = getDestination(clientSession, durableSub);
+ AMQQueue checkQueue;
+ if(durableSub)
+ {
+ consumer = clientSession.createDurableSubscriber((Topic)dest, getName());
+ checkQueue = new AMQQueue("amq.topic", "clientid" + ":" + getName());
+ }
+ else
+ {
+ consumer = clientSession.createConsumer(dest);
+ checkQueue = (AMQQueue) dest;
+ }
+
+ assertEquals("The queue should have " + MSG_COUNT + " msgs at start",
+ MSG_COUNT, ((AMQSession<?,?>) clientSession).getQueueDepth(checkQueue));
+
+ clientConnection.start();
+
+ int expectedDeliveries = MSG_COUNT + ((MAX_DELIVERY_COUNT -1) * redeliverMsgs.size());
+
+ if(synchronous)
+ {
+ doSynchronousTest(clientSession, consumer, clientSession.getAcknowledgeMode(),
+ MAX_DELIVERY_COUNT, expectedDeliveries, redeliverMsgs);
+ }
+ else
+ {
+ addMessageListener(clientSession, consumer, clientSession.getAcknowledgeMode(),
+ MAX_DELIVERY_COUNT, expectedDeliveries, redeliverMsgs);
+
+ try
+ {
+ if (!_awaitCompletion.await(20, TimeUnit.SECONDS))
+ {
+ fail("Test did not complete in 20 seconds.");
+ }
+ }
+ catch (InterruptedException e)
+ {
+ fail("Unable to wait for test completion");
+ throw e;
+ }
+
+ if(_failed)
+ {
+ fail(_failMsg);
+ }
+ }
+ consumer.close();
+
+ //check the source queue is now empty
+ assertEquals("The queue should have 0 msgs left", 0, ((AMQSession<?,?>) clientSession).getQueueDepth(checkQueue, true));
+
+ //check the DLQ has the required number of rejected-without-requeue messages
+ verifyDLQdepth(redeliverMsgs.size(), clientSession, durableSub);
+
+ if(isBrokerStorePersistent())
+ {
+ //restart the broker to verify persistence of the DLQ and the messages on it
+ clientConnection.close();
+
+ restartBroker();
+
+ final Connection clientConnection2 = getConnection();
+ clientConnection2.start();
+
+ //verify the messages on the DLQ
+ verifyDLQcontent(clientConnection2, redeliverMsgs, getTestQueueName(), durableSub);
+ clientConnection2.close();
+ }
+ else
+ {
+
+ //verify the messages on the DLQ
+ verifyDLQcontent(clientConnection, redeliverMsgs, getTestQueueName(), durableSub);
+ clientConnection.close();
+ }
+
+ }
+
+ private void verifyDLQdepth(int expected, Session clientSession, boolean durableSub) throws AMQException
+ {
+ AMQDestination checkQueueDLQ;
+ if(durableSub)
+ {
+ checkQueueDLQ = new AMQQueue("amq.topic", "clientid" + ":" + getName() + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX);
+ }
+ else
+ {
+ checkQueueDLQ = new AMQQueue("amq.direct", getTestQueueName() + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX);
+ }
+
+ assertEquals("The DLQ should have " + expected + " msgs on it", expected,
+ ((AMQSession<?,?>) clientSession).getQueueDepth(checkQueueDLQ, true));
+ }
+
+ private void verifyDLQcontent(Connection clientConnection, List<Integer> redeliverMsgs, String destName, boolean durableSub) throws JMSException
+ {
+ Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer consumer;
+ if(durableSub)
+ {
+ consumer = clientSession.createConsumer(clientSession.createQueue("clientid:" +getName() + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX));
+ }
+ else
+ {
+ consumer = clientSession.createConsumer(
+ clientSession.createQueue(destName + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX));
+ }
+
+ //keep track of the message we expect to still be on the DLQ
+ List<Integer> outstandingMessages = new ArrayList<Integer>(redeliverMsgs);
+ int numMsg = outstandingMessages.size();
+
+ for(int i = 0; i < numMsg; i++)
+ {
+ Message message = consumer.receive(250);
+
+ assertNotNull("failed to consume expected message " + i + " from DLQ", message);
+ assertTrue("message " + i + " was the wrong type", message instanceof TextMessage);
+
+ //using Integer here to allow removing the value from the list, using int
+ //would instead result in removal of the element at that index
+ Integer msgId = message.getIntProperty("count");
+
+ TextMessage txt = (TextMessage) message;
+ _logger.info("Received message " + msgId + " at " + i + " from the DLQ: " + txt.getText());
+
+ assertTrue("message " + i + " was not one of those which should have been on the DLQ",
+ redeliverMsgs.contains(msgId));
+ assertTrue("message " + i + " was not one of those expected to still be on the DLQ",
+ outstandingMessages.contains(msgId));
+ assertEquals("Message " + i + " content was not as expected", generateContent(msgId), txt.getText());
+
+ //remove from the list of outstanding msgs
+ outstandingMessages.remove(msgId);
+ }
+
+ if(outstandingMessages.size() > 0)
+ {
+ String failures = "";
+ for(Integer msg : outstandingMessages)
+ {
+ failures = failures.concat(msg + " ");
+ }
+ fail("some DLQ'd messages were not found on the DLQ: " + failures);
+ }
+ }
+
+ private void addMessageListener(final Session session, final MessageConsumer consumer, final int deliveryMode, final int maxDeliveryCount,
+ final int expectedTotalNumberOfDeliveries, final List<Integer> redeliverMsgs) throws JMSException
+ {
+ if(deliveryMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE
+ || deliveryMode == org.apache.qpid.jms.Session.PRE_ACKNOWLEDGE)
+ {
+ failAsyncTest("Max Delivery feature is not supported with this acknowledgement mode" +
+ "when using asynchronous message delivery.");
+ }
+
+ consumer.setMessageListener(new MessageListener()
+ {
+ private int _deliveryAttempts = 0; //number of times given message(s) have been seen
+ private int _numMsgsToBeRedelivered = 0; //number of messages to rollback/recover
+ private int _totalNumDeliveries = 0;
+ private int _expectedMessage = 1;
+
+ public void onMessage(Message message)
+ {
+ if(_failed || _awaitCompletion.getCount() == 0L)
+ {
+ //don't process anything else
+ return;
+ }
+
+ _totalNumDeliveries++;
+
+ if (message == null)
+ {
+ failAsyncTest("Should not get null messages");
+ return;
+ }
+
+ try
+ {
+ int msgId = message.getIntProperty("count");
+
+ _logger.info("Received message: " + msgId);
+
+ //check the message is the one we expected
+ if(_expectedMessage != msgId)
+ {
+ failAsyncTest("Expected message " + _expectedMessage + " , got message " + msgId);
+ return;
+ }
+
+ _expectedMessage++;
+
+ //keep track of the overall deliveries to ensure we don't see more than expected
+ if(_totalNumDeliveries > expectedTotalNumberOfDeliveries)
+ {
+ failAsyncTest("Expected total of " + expectedTotalNumberOfDeliveries +
+ " message deliveries, reached " + _totalNumDeliveries);
+ }
+
+ //check if this message is one chosen to be rolled back / recovered
+ if(redeliverMsgs.contains(msgId))
+ {
+ _numMsgsToBeRedelivered++;
+
+ //check if next message is going to be rolled back / recovered too
+ if(redeliverMsgs.contains(msgId +1))
+ {
+ switch(deliveryMode)
+ {
+ case Session.SESSION_TRANSACTED:
+ //skip on to next message immediately
+ return;
+ case Session.CLIENT_ACKNOWLEDGE:
+ //skip on to next message immediately
+ return;
+ case Session.DUPS_OK_ACKNOWLEDGE:
+ //fall through
+ case Session.AUTO_ACKNOWLEDGE:
+ //must recover session now or onMessage will ack, so
+ //just fall through the if
+ break;
+ }
+ }
+
+ _deliveryAttempts++; //increment count of times the current rolled back/recovered message(s) have been seen
+
+ _logger.debug("ROLLBACK/RECOVER");
+ switch(deliveryMode)
+ {
+ case Session.SESSION_TRANSACTED:
+ session.rollback();
+ break;
+ case Session.CLIENT_ACKNOWLEDGE:
+ //fall through
+ case Session.DUPS_OK_ACKNOWLEDGE:
+ //fall through
+ case Session.AUTO_ACKNOWLEDGE:
+ session.recover();
+ break;
+ }
+
+ if( _deliveryAttempts >= maxDeliveryCount)
+ {
+ //the client should have rejected the latest messages upon then
+ //above recover/rollback, adjust counts to compensate
+ _deliveryAttempts = 0;
+ }
+ else
+ {
+ //the message(s) should be redelivered, adjust expected message
+ _expectedMessage -= _numMsgsToBeRedelivered;
+ }
+ _logger.debug("XXX _expectedMessage: " + _expectedMessage + " _deliveryAttempts : " + _deliveryAttempts + " _numMsgsToBeRedelivered=" + _numMsgsToBeRedelivered);
+ //reset count of messages expected to be redelivered
+ _numMsgsToBeRedelivered = 0;
+ }
+ else
+ {
+ //consume the message
+ switch(deliveryMode)
+ {
+ case Session.SESSION_TRANSACTED:
+ session.commit();
+ break;
+ case Session.CLIENT_ACKNOWLEDGE:
+ message.acknowledge();
+ break;
+ case Session.DUPS_OK_ACKNOWLEDGE:
+ //fall-through
+ case Session.AUTO_ACKNOWLEDGE:
+ //do nothing, onMessage will ack on exit.
+ break;
+ }
+ }
+
+ if (msgId == MSG_COUNT)
+ {
+ //if this is the last message let the test complete.
+ if (expectedTotalNumberOfDeliveries == _totalNumDeliveries)
+ {
+ _awaitCompletion.countDown();
+ }
+ else
+ {
+ failAsyncTest("Last message received, but we have not had the " +
+ "expected number of total delivieres. Received " + _totalNumDeliveries + " Expecting : " + expectedTotalNumberOfDeliveries);
+ }
+ }
+ }
+ catch (JMSException e)
+ {
+ failAsyncTest(e.getMessage());
+ }
+ }
+ });
+ }
+
+ private void failAsyncTest(String msg)
+ {
+ _logger.error("Failing test because: " + msg);
+ _failMsg = msg;
+ _failed = true;
+ _awaitCompletion.countDown();
+ }
+
+ private void doSynchronousTest(final Session session, final MessageConsumer consumer, final int deliveryMode, final int maxDeliveryCount,
+ final int expectedTotalNumberOfDeliveries, final List<Integer> redeliverMsgs) throws JMSException, AMQException, InterruptedException
+ {
+ if(deliveryMode == Session.AUTO_ACKNOWLEDGE
+ || deliveryMode == Session.DUPS_OK_ACKNOWLEDGE
+ || deliveryMode == org.apache.qpid.jms.Session.PRE_ACKNOWLEDGE
+ || deliveryMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE)
+ {
+ fail("Max Delivery feature is not supported with this acknowledgement mode" +
+ "when using synchronous message delivery.");
+ }
+
+ int _deliveryAttempts = 0; //number of times given message(s) have been seen
+ int _numMsgsToBeRedelivered = 0; //number of messages to rollback/recover
+ int _totalNumDeliveries = 0;
+ int _expectedMessage = 1;
+
+ while(!_failed)
+ {
+ Message message = consumer.receive(1000);
+
+ _totalNumDeliveries++;
+
+ if (message == null)
+ {
+ fail("Should not get null messages");
+ return;
+ }
+
+ try
+ {
+ int msgId = message.getIntProperty("count");
+
+ _logger.info("Received message: " + msgId);
+
+ //check the message is the one we expected
+ assertEquals("Unexpected message.", _expectedMessage, msgId);
+
+ _expectedMessage++;
+
+ //keep track of the overall deliveries to ensure we don't see more than expected
+ assertTrue("Exceeded expected total number of deliveries.",
+ _totalNumDeliveries <= expectedTotalNumberOfDeliveries );
+
+ //check if this message is one chosen to be rolled back / recovered
+ if(redeliverMsgs.contains(msgId))
+ {
+ //keep track of the number of messages we will have redelivered
+ //upon rollback/recover
+ _numMsgsToBeRedelivered++;
+
+ if(redeliverMsgs.contains(msgId +1))
+ {
+ //next message is going to be rolled back / recovered too.
+ //skip ahead to it
+ continue;
+ }
+
+ _deliveryAttempts++; //increment count of times the current rolled back/recovered message(s) have been seen
+
+ switch(deliveryMode)
+ {
+ case Session.SESSION_TRANSACTED:
+ session.rollback();
+ break;
+ case Session.CLIENT_ACKNOWLEDGE:
+ session.recover();
+
+ //sleep then do a synchronous op to give the broker
+ //time to resend all the messages
+ Thread.sleep(500);
+ ((AMQSession<?,?>) session).sync();
+ break;
+ }
+
+ if( _deliveryAttempts >= maxDeliveryCount)
+ {
+ //the client should have rejected the latest messages upon then
+ //above recover/rollback, adjust counts to compensate
+ _deliveryAttempts = 0;
+ }
+ else
+ {
+ //the message(s) should be redelivered, adjust expected message
+ _expectedMessage -= _numMsgsToBeRedelivered;
+ }
+
+ //As we just rolled back / recovered, we must reset the
+ //count of messages expected to be redelivered
+ _numMsgsToBeRedelivered = 0;
+ }
+ else
+ {
+ //consume the message
+ switch(deliveryMode)
+ {
+ case Session.SESSION_TRANSACTED:
+ session.commit();
+ break;
+ case Session.CLIENT_ACKNOWLEDGE:
+ message.acknowledge();
+ break;
+ }
+ }
+
+ if (msgId == MSG_COUNT)
+ {
+ //if this is the last message let the test complete.
+ assertTrue("Last message received, but we have not had the " +
+ "expected number of total delivieres",
+ expectedTotalNumberOfDeliveries == _totalNumDeliveries);
+
+ break;
+ }
+ }
+ catch (JMSException e)
+ {
+ fail(e.getMessage());
+ }
+ }
+ }
+
+ private boolean isDurSubTest()
+ {
+ return getTestQueueName().contains("DurableSubscription");
+ }
+}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/QueueSessionFactoryTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/QueueSessionFactoryTest.java
new file mode 100644
index 0000000000..370e44b3d5
--- /dev/null
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/QueueSessionFactoryTest.java
@@ -0,0 +1,113 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.client;
+
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import javax.jms.QueueConnection;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSession;
+
+/**
+ * Ensures that queue specific session factory method {@link QueueConnection#createQueueSession()} create sessions
+ * of type {@link QueueSession} and that those sessions correctly restrict the available JMS operations
+ * operations to exclude those applicable to only topics.
+ *
+ * @see TopicSessionFactoryTest
+ */
+public class QueueSessionFactoryTest extends QpidBrokerTestCase
+{
+ public void testQueueSessionIsNotATopicSession() throws Exception
+ {
+ QueueSession queueSession = getQueueSession();
+ assertFalse(queueSession instanceof TopicSession);
+ }
+
+ public void testQueueSessionCannotCreateTemporaryTopics() throws Exception
+ {
+ QueueSession queueSession = getQueueSession();
+ try
+ {
+ queueSession.createTemporaryTopic();
+ fail("expected exception did not occur");
+ }
+ catch (javax.jms.IllegalStateException s)
+ {
+ // PASS
+ assertEquals("Cannot call createTemporaryTopic from QueueSession", s.getMessage());
+ }
+ }
+
+ public void testQueueSessionCannotCreateTopics() throws Exception
+ {
+ QueueSession queueSession = getQueueSession();
+ try
+ {
+ queueSession.createTopic("abc");
+ fail("expected exception did not occur");
+ }
+ catch (javax.jms.IllegalStateException s)
+ {
+ // PASS
+ assertEquals("Cannot call createTopic from QueueSession", s.getMessage());
+ }
+ }
+
+ public void testQueueSessionCannotCreateDurableSubscriber() throws Exception
+ {
+ QueueSession queueSession = getQueueSession();
+ Topic topic = getTestTopic();
+
+ try
+ {
+ queueSession.createDurableSubscriber(topic, "abc");
+ fail("expected exception did not occur");
+ }
+ catch (javax.jms.IllegalStateException s)
+ {
+ // PASS
+ assertEquals("Cannot call createDurableSubscriber from QueueSession", s.getMessage());
+ }
+ }
+
+ public void testQueueSessionCannoutUnsubscribe() throws Exception
+ {
+ QueueSession queueSession = getQueueSession();
+ try
+ {
+ queueSession.unsubscribe("abc");
+ fail("expected exception did not occur");
+ }
+ catch (javax.jms.IllegalStateException s)
+ {
+ // PASS
+ assertEquals("Cannot call unsubscribe from QueueSession", s.getMessage());
+ }
+ }
+
+ private QueueSession getQueueSession() throws Exception
+ {
+ QueueConnection queueConnection = (QueueConnection)getConnection();
+ return queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/TopicSessionFactoryTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/TopicSessionFactoryTest.java
new file mode 100644
index 0000000000..ce15d452ab
--- /dev/null
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/TopicSessionFactoryTest.java
@@ -0,0 +1,98 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.client;
+
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import javax.jms.Queue;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.TopicConnection;
+import javax.jms.TopicSession;
+
+/**
+ * Ensures that topic specific session factory method {@link TopicConnection#createTopicSession()} create sessions
+ * of type {@link TopicSession} and that those sessions correctly restrict the available JMS operations
+ * operations to exclude those applicable to only queues.
+ *
+ * @see QueueSessionFactoryTest
+ */
+public class TopicSessionFactoryTest extends QpidBrokerTestCase
+{
+ public void testTopicSessionIsNotAQueueSession() throws Exception
+ {
+ TopicSession topicSession = getTopicSession();
+ assertFalse(topicSession instanceof QueueSession);
+ }
+
+ public void testTopicSessionCannotCreateCreateBrowser() throws Exception
+ {
+ TopicSession topicSession = getTopicSession();
+ Queue queue = getTestQueue();
+ try
+ {
+ topicSession.createBrowser(queue);
+ fail("expected exception did not occur");
+ }
+ catch (javax.jms.IllegalStateException s)
+ {
+ // PASS
+ assertEquals("Cannot call createBrowser from TopicSession", s.getMessage());
+ }
+ }
+
+ public void testTopicSessionCannotCreateQueues() throws Exception
+ {
+ TopicSession topicSession = getTopicSession();
+ try
+ {
+ topicSession.createQueue("abc");
+ fail("expected exception did not occur");
+ }
+ catch (javax.jms.IllegalStateException s)
+ {
+ // PASS
+ assertEquals("Cannot call createQueue from TopicSession", s.getMessage());
+ }
+ }
+
+ public void testTopicSessionCannotCreateTemporaryQueues() throws Exception
+ {
+ TopicSession topicSession = getTopicSession();
+ try
+ {
+ topicSession.createTemporaryQueue();
+ fail("expected exception did not occur");
+ }
+ catch (javax.jms.IllegalStateException s)
+ {
+ // PASS
+ assertEquals("Cannot call createTemporaryQueue from TopicSession", s.getMessage());
+ }
+ }
+
+ private TopicSession getTopicSession() throws Exception
+ {
+ TopicConnection topicConnection = (TopicConnection)getConnection();
+ return topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java
new file mode 100644
index 0000000000..58f1bfe372
--- /dev/null
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.client.channelclose;
+
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class CloseWithBlockingReceiveTest extends QpidBrokerTestCase
+{
+
+
+ public void testReceiveReturnsNull() throws Exception
+ {
+ final Connection connection = getConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination destination = session.createQueue(getTestQueueName());
+ MessageConsumer consumer = session.createConsumer(destination);
+ connection.start();
+
+ Runnable r = new Runnable()
+ {
+
+ public void run()
+ {
+ try
+ {
+ Thread.sleep(1000);
+ connection.close();
+ }
+ catch (Exception e)
+ {
+ }
+ }
+ };
+ long startTime = System.currentTimeMillis();
+ Thread thread = new Thread(r);
+ thread.start();
+ try
+ {
+ consumer.receive(10000);
+ assertTrue(System.currentTimeMillis() - startTime < 10000);
+ }
+ finally
+ {
+ thread.join();
+ }
+ }
+}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java
new file mode 100644
index 0000000000..4a92728d82
--- /dev/null
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java
@@ -0,0 +1,215 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.client.connection;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.naming.NamingException;
+import org.apache.qpid.AMQConnectionClosedException;
+import org.apache.qpid.AMQDisconnectedException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.transport.ConnectionException;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+/**
+ * Tests the behaviour of the client when the Broker terminates client connection
+ * by the Broker being shutdown gracefully or otherwise.
+ *
+ * @see ManagedConnectionMBeanTest
+ */
+public class BrokerClosesClientConnectionTest extends QpidBrokerTestCase
+{
+ private Connection _connection;
+ private boolean _isExternalBroker;
+ private final RecordingExceptionListener _recordingExceptionListener = new RecordingExceptionListener();
+ private Session _session;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ _connection = getConnection();
+ _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _connection.setExceptionListener(_recordingExceptionListener);
+
+ _isExternalBroker = isExternalBroker();
+ }
+
+ public void testClientCloseOnNormalBrokerShutdown() throws Exception
+ {
+ final Class<? extends Exception> expectedLinkedException = isBroker010() ? ConnectionException.class : AMQConnectionClosedException.class;
+
+ assertConnectionOpen();
+
+ stopBroker();
+
+ JMSException exception = _recordingExceptionListener.awaitException(10000);
+ assertConnectionCloseWasReported(exception, expectedLinkedException);
+ assertConnectionClosed();
+
+ ensureCanCloseWithoutException();
+ }
+
+ public void testClientCloseOnBrokerKill() throws Exception
+ {
+ final Class<? extends Exception> expectedLinkedException = isBroker010() ? ConnectionException.class : AMQDisconnectedException.class;
+
+ if (!_isExternalBroker)
+ {
+ return;
+ }
+
+ assertConnectionOpen();
+
+ killBroker();
+
+ JMSException exception = _recordingExceptionListener.awaitException(10000);
+ assertConnectionCloseWasReported(exception, expectedLinkedException);
+ assertConnectionClosed();
+
+ ensureCanCloseWithoutException();
+ }
+
+ private void ensureCanCloseWithoutException()
+ {
+ try
+ {
+ _connection.close();
+ }
+ catch (JMSException e)
+ {
+ fail("Connection should close without exception" + e.getMessage());
+ }
+ }
+
+ private void assertConnectionCloseWasReported(JMSException exception, Class<? extends Exception> linkedExceptionClass)
+ {
+ assertNotNull("Broker shutdown should be reported to the client via the ExceptionListener", exception);
+ assertNotNull("JMXException should have linked exception", exception.getLinkedException());
+
+ assertEquals("Unexpected linked exception", linkedExceptionClass, exception.getLinkedException().getClass());
+ }
+
+ private void assertConnectionClosed()
+ {
+ assertTrue("Connection should be marked as closed", ((AMQConnection)_connection).isClosed());
+ }
+
+ private void assertConnectionOpen()
+ {
+ assertFalse("Connection should not be marked as closed", ((AMQConnection)_connection).isClosed());
+ }
+
+ private final class RecordingExceptionListener implements ExceptionListener
+ {
+ private final CountDownLatch _exceptionReceivedLatch = new CountDownLatch(1);
+ private volatile JMSException _exception;
+
+ @Override
+ public void onException(JMSException exception)
+ {
+ _exception = exception;
+ }
+
+ public JMSException awaitException(long timeoutInMillis) throws InterruptedException
+ {
+ _exceptionReceivedLatch.await(timeoutInMillis, TimeUnit.MILLISECONDS);
+ return _exception;
+ }
+ }
+
+
+ private class Listener implements MessageListener
+ {
+ int _messageCount;
+
+ @Override
+ public synchronized void onMessage(Message message)
+ {
+ _messageCount++;
+ }
+
+ public synchronized int getCount()
+ {
+ return _messageCount;
+ }
+ }
+
+ public void testNoDeliveryAfterBrokerClose() throws JMSException, NamingException, InterruptedException
+ {
+
+ Listener listener = new Listener();
+
+ Session session = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer consumer1 = session.createConsumer(getTestQueue());
+ consumer1.setMessageListener(listener);
+
+ MessageProducer producer = _session.createProducer(getTestQueue());
+ producer.send(_session.createTextMessage("test message"));
+
+ _connection.start();
+
+
+ synchronized (listener)
+ {
+ long currentTime = System.currentTimeMillis();
+ long until = currentTime + 2000l;
+ while(listener.getCount() == 0 && currentTime < until)
+ {
+ listener.wait(until - currentTime);
+ currentTime = System.currentTimeMillis();
+ }
+ }
+ assertEquals(1, listener.getCount());
+
+ Connection connection2 = getConnection();
+ Session session2 = connection2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer consumer2 = session2.createConsumer(getTestQueue());
+ consumer2.setMessageListener(listener);
+ connection2.start();
+
+
+ Connection connection3 = getConnection();
+ Session session3 = connection3.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer consumer3 = session3.createConsumer(getTestQueue());
+ consumer3.setMessageListener(listener);
+ connection3.start();
+
+ assertEquals(1, listener.getCount());
+
+ stopBroker();
+
+ assertEquals(1, listener.getCount());
+
+
+ }
+}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionFactoryTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionFactoryTest.java
new file mode 100644
index 0000000000..bf1fbbf1a3
--- /dev/null
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionFactoryTest.java
@@ -0,0 +1,78 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.client.connection;
+
+import javax.jms.Connection;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+public class ConnectionFactoryTest extends QpidBrokerTestCase
+{
+
+ /**
+ * The username & password specified should not override the default
+ * specified in the URL.
+ */
+ public void testCreateConnectionWithUsernamePassword() throws Exception
+ {
+
+ String brokerUrl = getBroker().toString();
+ String URL = "amqp://guest:guest@clientID/test?brokerlist='" + brokerUrl + "'";
+ AMQConnectionFactory factory = new AMQConnectionFactory(URL);
+
+ AMQConnection con = (AMQConnection)factory.createConnection();
+ assertEquals("Usernames used is different from the one in URL","guest",con.getConnectionURL().getUsername());
+ assertEquals("Password used is different from the one in URL","guest",con.getConnectionURL().getPassword());
+
+ try
+ {
+ AMQConnection con2 = (AMQConnection)factory.createConnection("user","pass");
+ assertEquals("Usernames used is different from the one in URL","user",con2.getConnectionURL().getUsername());
+ assertEquals("Password used is different from the one in URL","pass",con2.getConnectionURL().getPassword());
+ }
+ catch(Exception e)
+ {
+ // ignore
+ }
+
+ AMQConnection con3 = (AMQConnection)factory.createConnection();
+ assertEquals("Usernames used is different from the one in URL","guest",con3.getConnectionURL().getUsername());
+ assertEquals("Password used is different from the one in URL","guest",con3.getConnectionURL().getPassword());
+ }
+
+ /**
+ * Verifies that a connection can be made using an instance of AMQConnectionFactory created with the
+ * default constructor and provided with the connection url via setter.
+ */
+ public void testCreatingConnectionWithInstanceMadeUsingDefaultConstructor() throws Exception
+ {
+ String broker = getBroker().toString();
+ String url = "amqp://guest:guest@clientID/test?brokerlist='" + broker + "'";
+
+ AMQConnectionFactory factory = new AMQConnectionFactory();
+ factory.setConnectionURLString(url);
+
+ Connection con = factory.createConnection();
+ con.close();
+ }
+}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java
new file mode 100644
index 0000000000..6ea1582bb8
--- /dev/null
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java
@@ -0,0 +1,157 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.client.connection;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class ConnectionStartTest extends QpidBrokerTestCase
+{
+
+ private String _broker = "vm://:1";
+
+ private AMQConnection _connection;
+ private Session _consumerSess;
+ private MessageConsumer _consumer;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ try
+ {
+
+
+ AMQConnection pubCon = (AMQConnection) getConnection("guest", "guest");
+
+ AMQQueue queue = new AMQQueue(pubCon,"ConnectionStartTest");
+
+ Session pubSess = pubCon.createSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+
+ MessageProducer pub = pubSess.createProducer(queue);
+
+ _connection = (AMQConnection) getConnection("guest", "guest");
+
+ _consumerSess = _connection.createSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+
+ _consumer = _consumerSess.createConsumer(queue);
+
+ //publish after queue is created to ensure it can be routed as expected
+ pub.send(pubSess.createTextMessage("Initial Message"));
+
+ pubCon.close();
+
+ }
+ catch (Exception e)
+ {
+ _logger.error("Connection to " + _broker + " should succeed.", e);
+ fail("Connection to " + _broker + " should succeed. Reason: " + e);
+ }
+ }
+
+ protected void tearDown() throws Exception
+ {
+ _connection.close();
+ super.tearDown();
+ }
+
+ public void testSimpleReceiveConnection()
+ {
+ try
+ {
+ assertTrue("Connection should not be started", !_connection.started());
+ //Note that this next line will start the dispatcher in the session
+ // should really not be called before _connection start
+ //assertTrue("There should not be messages waiting for the consumer", _consumer.receiveNoWait() == null);
+ _connection.start();
+ assertTrue("There should be messages waiting for the consumer", _consumer.receive(10*1000) != null);
+ assertTrue("Connection should be started", _connection.started());
+
+ }
+ catch (JMSException e)
+ {
+ fail("An error occured during test because:" + e);
+ }
+
+ }
+
+ public void testMessageListenerConnection()
+ {
+ final CountDownLatch _gotMessage = new CountDownLatch(1);
+
+ try
+ {
+ assertTrue("Connection should not be started", !_connection.started());
+ _consumer.setMessageListener(new MessageListener()
+ {
+ public void onMessage(Message message)
+ {
+ try
+ {
+ assertTrue("Connection should be started", _connection.started());
+ assertEquals("Mesage Received", "Initial Message", ((TextMessage) message).getText());
+ _gotMessage.countDown();
+ }
+ catch (JMSException e)
+ {
+ fail("Couldn't get message text because:" + e.getCause());
+ }
+ }
+ });
+
+ assertTrue("Connection should not be started", !_connection.started());
+ _connection.start();
+ assertTrue("Connection should be started", _connection.started());
+
+ try
+ {
+ assertTrue("Listener was never called", _gotMessage.await(10 * 1000, TimeUnit.MILLISECONDS));
+ }
+ catch (InterruptedException e)
+ {
+ fail("Timed out awaiting message via onMessage");
+ }
+
+ }
+ catch (JMSException e)
+ {
+ fail("Failed because:" + e.getCause());
+ }
+
+ }
+
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(ConnectionStartTest.class);
+ }
+}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
new file mode 100644
index 0000000000..ed03e83292
--- /dev/null
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
@@ -0,0 +1,378 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.client.connection;
+
+import javax.jms.Connection;
+import javax.jms.QueueSession;
+import javax.jms.TopicSession;
+
+import org.apache.qpid.AMQConnectionFailureException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQUnresolvedAddressException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionURL;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.jms.Session;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+public class ConnectionTest extends QpidBrokerTestCase
+{
+
+ private String _broker_NotRunning = "tcp://localhost:" + findFreePort();
+
+ private String _broker_BadDNS = "tcp://hg3sgaaw4lgihjs";
+
+ public void testSimpleConnection() throws Exception
+ {
+ AMQConnection conn = null;
+ try
+ {
+ conn = new AMQConnection(getBroker().toString(), "guest", "guest", "fred", "test");
+ }
+ catch (Exception e)
+ {
+ fail("Connection to " + getBroker() + " should succeed. Reason: " + e);
+ }
+ finally
+ {
+ if(conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
+ public void testDefaultExchanges() throws Exception
+ {
+ AMQConnection conn = null;
+ try
+ {
+ BrokerDetails broker = getBroker();
+ broker.setProperty(BrokerDetails.OPTIONS_RETRY, "1");
+ ConnectionURL url = new AMQConnectionURL("amqp://guest:guest@clientid/test?brokerlist='"
+ + broker
+ + "'&defaultQueueExchange='test.direct'"
+ + "&defaultTopicExchange='test.topic'"
+ + "&temporaryQueueExchange='tmp.direct'"
+ + "&temporaryTopicExchange='tmp.topic'");
+
+ System.err.println(url.toString());
+ conn = new AMQConnection(url);
+
+
+ AMQSession sess = (AMQSession) conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ sess.declareExchange(new AMQShortString("test.direct"),
+ AMQShortString.valueOf(ExchangeDefaults.DIRECT_EXCHANGE_CLASS), false);
+
+ sess.declareExchange(new AMQShortString("tmp.direct"),
+ AMQShortString.valueOf(ExchangeDefaults.DIRECT_EXCHANGE_CLASS), false);
+
+ sess.declareExchange(new AMQShortString("tmp.topic"),
+ AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_CLASS), false);
+
+ sess.declareExchange(new AMQShortString("test.topic"),
+ AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_CLASS), false);
+
+ QueueSession queueSession = conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ AMQQueue queue = (AMQQueue) queueSession.createQueue("MyQueue");
+
+ assertEquals(queue.getExchangeName().toString(), "test.direct");
+
+ AMQQueue tempQueue = (AMQQueue) queueSession.createTemporaryQueue();
+
+ assertEquals(tempQueue.getExchangeName().toString(), "tmp.direct");
+
+ queueSession.close();
+
+ TopicSession topicSession = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ AMQTopic topic = (AMQTopic) topicSession.createTopic("silly.topic");
+
+ assertEquals(topic.getExchangeName().toString(), "test.topic");
+
+ AMQTopic tempTopic = (AMQTopic) topicSession.createTemporaryTopic();
+
+ assertEquals(tempTopic.getExchangeName().toString(), "tmp.topic");
+
+ topicSession.close();
+
+ }
+ catch (Exception e)
+ {
+ fail("Connection to " + getBroker() + " should succeed. Reason: " + e);
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+
+ public void testPasswordFailureConnection() throws Exception
+ {
+ AMQConnection conn = null;
+ try
+ {
+ BrokerDetails broker = getBroker();
+ broker.setProperty(BrokerDetails.OPTIONS_RETRY, "0");
+ conn = new AMQConnection("amqp://guest:rubbishpassword@clientid/test?brokerlist='" + broker + "'");
+ fail("Connection should not be established password is wrong.");
+ }
+ catch (AMQConnectionFailureException amqe)
+ {
+ assertNotNull("No cause set:" + amqe.getMessage(), amqe.getCause());
+ assertTrue("Exception was wrong type", amqe.getCause() instanceof AMQException);
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
+ public void testConnectionFailure() throws Exception
+ {
+ AMQConnection conn = null;
+ try
+ {
+ conn = new AMQConnection("amqp://guest:guest@clientid/testpath?brokerlist='" + _broker_NotRunning + "?retries='0''");
+ fail("Connection should not be established");
+ }
+ catch (AMQException amqe)
+ {
+ if (!(amqe instanceof AMQConnectionFailureException))
+ {
+ fail("Correct exception not thrown. Excpected 'AMQConnectionException' got: " + amqe);
+ }
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+
+ }
+
+ public void testUnresolvedHostFailure() throws Exception
+ {
+ AMQConnection conn = null;
+ try
+ {
+ conn = new AMQConnection("amqp://guest:guest@clientid/testpath?brokerlist='" + _broker_BadDNS + "?retries='0''");
+ fail("Connection should not be established");
+ }
+ catch (AMQException amqe)
+ {
+ if (!(amqe instanceof AMQUnresolvedAddressException))
+ {
+ fail("Correct exception not thrown. Excpected 'AMQUnresolvedAddressException' got: " + amqe);
+ }
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+
+ }
+
+ public void testUnresolvedVirtualHostFailure() throws Exception
+ {
+ AMQConnection conn = null;
+ try
+ {
+ BrokerDetails broker = getBroker();
+ broker.setProperty(BrokerDetails.OPTIONS_RETRY, "0");
+ conn = new AMQConnection("amqp://guest:guest@clientid/rubbishhost?brokerlist='" + broker + "'");
+ fail("Connection should not be established");
+ }
+ catch (AMQException amqe)
+ {
+ if (!(amqe instanceof AMQConnectionFailureException))
+ {
+ fail("Correct exception not thrown. Excpected 'AMQConnectionFailureException' got: " + amqe);
+ }
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
+ public void testClientIdCannotBeChanged() throws Exception
+ {
+ Connection connection = new AMQConnection(getBroker().toString(), "guest", "guest",
+ "fred", "test");
+ try
+ {
+ connection.setClientID("someClientId");
+ fail("No IllegalStateException thrown when resetting clientid");
+ }
+ catch (javax.jms.IllegalStateException e)
+ {
+ // PASS
+ }
+ finally
+ {
+ if (connection != null)
+ {
+ connection.close();
+ }
+ }
+ }
+
+ public void testClientIdIsPopulatedAutomatically() throws Exception
+ {
+ Connection connection = new AMQConnection(getBroker().toString(), "guest", "guest",
+ null, "test");
+ try
+ {
+ assertNotNull(connection.getClientID());
+ }
+ finally
+ {
+ connection.close();
+ }
+ connection.close();
+ }
+
+ public void testUnsupportedSASLMechanism() throws Exception
+ {
+ BrokerDetails broker = getBroker();
+ broker.setProperty(BrokerDetails.OPTIONS_SASL_MECHS, "MY_MECH");
+
+ try
+ {
+ Connection connection = new AMQConnection(broker.toString(), "guest", "guest",
+ null, "test");
+ connection.close();
+ fail("The client should throw a ConnectionException stating the" +
+ " broker does not support the SASL mech specified by the client");
+ }
+ catch (Exception e)
+ {
+ assertTrue("Unexpected exception message : " + e.getMessage(),
+ e.getMessage().contains("Client and broker have no SASL mechanisms in common."));
+ assertTrue("Unexpected exception message : " + e.getMessage(),
+ e.getMessage().contains("Client restricted itself to : MY_MECH"));
+
+ }
+ }
+
+ /**
+ * Tests that when the same user connects twice with same clientid, the second connection
+ * fails if the clientid verification feature is enabled (which uses a dummy 0-10 Session
+ * with the clientid as its name to detect the previous usage of the clientid by the user)
+ */
+ public void testClientIDVerificationForSameUser() throws Exception
+ {
+ setTestSystemProperty(ClientProperties.QPID_VERIFY_CLIENT_ID, "true");
+
+ BrokerDetails broker = getBroker();
+ try
+ {
+ Connection con = new AMQConnection(broker.toString(), "guest", "guest",
+ "client_id", "test");
+
+ Connection con2 = new AMQConnection(broker.toString(), "guest", "guest",
+ "client_id", "test");
+
+ fail("The client should throw a ConnectionException stating the" +
+ " client ID is not unique");
+ }
+ catch (Exception e)
+ {
+ assertTrue("Incorrect exception thrown: " + e.getMessage(),
+ e.getMessage().contains("ClientID must be unique"));
+ }
+ }
+
+ /**
+ * Tests that when different users connects with same clientid, the second connection
+ * succeeds even though the clientid verification feature is enabled (which uses a dummy
+ * 0-10 Session with the clientid as its name; these are only verified unique on a
+ * per-principal basis)
+ */
+ public void testClientIDVerificationForDifferentUsers() throws Exception
+ {
+ setTestSystemProperty(ClientProperties.QPID_VERIFY_CLIENT_ID, "true");
+
+ BrokerDetails broker = getBroker();
+ try
+ {
+ Connection con = new AMQConnection(broker.toString(), "guest", "guest",
+ "client_id", "test");
+
+ Connection con2 = new AMQConnection(broker.toString(), "admin", "admin",
+ "client_id", "test");
+ }
+ catch (Exception e)
+ {
+ fail("Unexpected exception thrown, client id was not unique but usernames were different! " + e.getMessage());
+ }
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(ConnectionTest.class);
+ }
+
+ public void testExceptionWhenUserPassIsRequired() throws Exception
+ {
+ AMQConnection conn = null;
+ try
+ {
+ BrokerDetails broker = getBroker();
+ String url = "amqp:///test?brokerlist='" + broker + "?sasl_mechs='PLAIN%2520CRAM-MD5''";
+ conn = new AMQConnection(url);
+ conn.close();
+ fail("Exception should be thrown as user name and password is required");
+ }
+ catch (Exception e)
+ {
+ if (!e.getMessage().contains("Username and Password is required for the selected mechanism"))
+ {
+ if (conn != null && !conn.isClosed())
+ {
+ conn.close();
+ }
+ fail("Incorrect Exception thrown! The exception thrown is : " + e.getMessage());
+ }
+ }
+ }
+}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java
new file mode 100644
index 0000000000..141de1e5a8
--- /dev/null
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.test.unit.client.connection;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.AMQConnectionClosedException;
+import org.apache.qpid.client.AMQNoRouteException;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.transport.ConnectionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ExceptionListenerTest extends QpidBrokerTestCase
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(ExceptionListenerTest.class);
+
+ private volatile Throwable _lastExceptionListenerException = null;
+
+ public void testExceptionListenerHearsBrokerShutdown() throws Exception
+ {
+ final CountDownLatch exceptionReceivedLatch = new CountDownLatch(1);
+ final AtomicInteger exceptionCounter = new AtomicInteger(0);
+ final ExceptionListener listener = new ExceptionListener()
+ {
+ public void onException(JMSException exception)
+ {
+ exceptionCounter.incrementAndGet();
+ _lastExceptionListenerException = exception;
+ exceptionReceivedLatch.countDown();
+ }
+ };
+
+ Connection connection = getConnection();
+ connection.setExceptionListener(listener);
+
+ stopBroker();
+
+ exceptionReceivedLatch.await(10, TimeUnit.SECONDS);
+
+ assertEquals("Unexpected number of exceptions received", 1, exceptionCounter.intValue());
+ LOGGER.debug("exception was", _lastExceptionListenerException);
+ assertNotNull("Exception should have cause", _lastExceptionListenerException.getCause());
+ Class<? extends Exception> expectedExceptionClass = isBroker010() ? ConnectionException.class : AMQConnectionClosedException.class;
+ assertEquals(expectedExceptionClass, _lastExceptionListenerException.getCause().getClass());
+ }
+
+ /**
+ * It is reasonable for an application to perform Connection#close within the exception
+ * listener. This test verifies that close is allowed, and proceeds without generating
+ * further exceptions.
+ */
+ public void testExceptionListenerClosesConnection_IsAllowed() throws Exception
+ {
+ final CountDownLatch exceptionReceivedLatch = new CountDownLatch(1);
+ final Connection connection = getConnection();
+ final ExceptionListener listener = new ExceptionListener()
+ {
+ public void onException(JMSException exception)
+ {
+ try
+ {
+ connection.close();
+ // PASS
+ }
+ catch (Throwable t)
+ {
+ _lastExceptionListenerException = t;
+ }
+ finally
+ {
+ exceptionReceivedLatch.countDown();
+ }
+ }
+ };
+ connection.setExceptionListener(listener);
+
+
+ stopBroker();
+
+ boolean exceptionReceived = exceptionReceivedLatch.await(10, TimeUnit.SECONDS);
+ assertTrue("Exception listener did not hear exception within timeout", exceptionReceived);
+ assertNull("Connection#close() should not have thrown exception", _lastExceptionListenerException);
+ }
+
+ /**
+ * Spring's SingleConnectionFactory installs an ExceptionListener that calls stop()
+ * and ignores any IllegalStateException that result. This test serves to test this
+ * scenario.
+ */
+ public void testExceptionListenerStopsConnection_ThrowsIllegalStateException() throws Exception
+ {
+ final CountDownLatch exceptionReceivedLatch = new CountDownLatch(1);
+ final Connection connection = getConnection();
+ final ExceptionListener listener = new ExceptionListener()
+ {
+ public void onException(JMSException exception)
+ {
+ try
+ {
+ connection.stop();
+ fail("Exception not thrown");
+ }
+ catch (IllegalStateException ise)
+ {
+ // PASS
+ }
+ catch (Throwable t)
+ {
+ _lastExceptionListenerException = t;
+ }
+ finally
+ {
+ exceptionReceivedLatch.countDown();
+ }
+ }
+ };
+ connection.setExceptionListener(listener);
+
+ stopBroker();
+
+ boolean exceptionReceived = exceptionReceivedLatch.await(10, TimeUnit.SECONDS);
+ assertTrue("Exception listener did not hear exception within timeout", exceptionReceived);
+ assertNull("Connection#stop() should not have thrown unexpected exception", _lastExceptionListenerException);
+ }
+
+ /**
+ * This test reproduces a deadlock that was the subject of a support call. A Spring based
+ * application was using SingleConnectionFactory. It installed an ExceptionListener that
+ * stops and closes the connection in response to any exception. On receipt of a message
+ * the application would create a new session then send a response message (within onMessage).
+ * It appears that a misconfiguration in the application meant that some of these messages
+ * were bounced (no-route). Bounces are treated like connection exceptions and are passed
+ * back to the application via the ExceptionListener. The deadlock occurred between the
+ * ExceptionListener's call to stop() and the MessageListener's attempt to create a new
+ * session.
+ */
+ public void testExceptionListenerConnectionStopDeadlock() throws Exception
+ {
+ Queue messageQueue = getTestQueue();
+
+ Map<String, String> options = new HashMap<String, String>();
+ options.put(ConnectionURL.OPTIONS_CLOSE_WHEN_NO_ROUTE, Boolean.toString(false));
+
+ final Connection connection = getConnectionWithOptions(options);
+
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ session.createConsumer(messageQueue).close(); // Create queue by side-effect
+
+ // Put 10 messages onto messageQueue
+ sendMessage(session, messageQueue, 10);
+
+ // Install an exception listener that stops/closes the connection on receipt of 2nd AMQNoRouteException.
+ // (Triggering on the 2nd (rather than 1st) seems to increase the probability that the test ends in deadlock,
+ // at least on my machine).
+ final CountDownLatch exceptionReceivedLatch = new CountDownLatch(2);
+ final ExceptionListener listener = new ExceptionListener()
+ {
+ public void onException(JMSException exception)
+ {
+ try
+ {
+ assertNotNull("JMS Exception must have cause", exception.getCause() );
+ assertEquals("JMS Exception is of wrong type", AMQNoRouteException.class, exception.getCause().getClass());
+ exceptionReceivedLatch.countDown();
+ if (exceptionReceivedLatch.getCount() == 0)
+ {
+ connection.stop(); // ** Deadlock
+ connection.close();
+ }
+ }
+ catch (Throwable t)
+ {
+ _lastExceptionListenerException = t;
+ }
+ }
+ };
+ connection.setExceptionListener(listener);
+
+ // Create a message listener that receives from testQueue and tries to forward them to unknown queue (thus
+ // provoking AMQNoRouteException exceptions to be delivered to the ExceptionListener).
+ final Queue unknownQueue = session.createQueue(getTestQueueName() + "_unknown");;
+ MessageListener redirectingMessageListener = new MessageListener()
+ {
+ @Override
+ public void onMessage(Message msg)
+ {
+ try
+ {
+ Session mlSession = connection.createSession(true, Session.SESSION_TRANSACTED); // ** Deadlock
+ mlSession.createProducer(unknownQueue).send(msg);
+ mlSession.commit();
+ }
+ catch (JMSException je)
+ {
+ // Connection is closed by the listener, so exceptions here are expected.
+ LOGGER.debug("Expected exception - message listener got exception", je);
+ }
+ }
+ };
+
+ MessageConsumer consumer = session.createConsumer(messageQueue);
+ consumer.setMessageListener(redirectingMessageListener);
+ connection.start();
+
+ // Await the 2nd exception
+ boolean exceptionReceived = exceptionReceivedLatch.await(10, TimeUnit.SECONDS);
+ assertTrue("Exception listener did not hear exception within timeout", exceptionReceived);
+ assertNull("Exception listener should not have had experienced exception", _lastExceptionListenerException);
+ }
+}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java
new file mode 100644
index 0000000000..99dc5ff216
--- /dev/null
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java
@@ -0,0 +1,334 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.client.message;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+
+public class ObjectMessageTest extends QpidBrokerTestCase implements MessageListener
+{
+ private static final Logger _logger = LoggerFactory.getLogger(ObjectMessageTest.class);
+
+ private AMQConnection connection;
+ private AMQDestination destination;
+ private AMQSession session;
+ private MessageProducer producer;
+ private Serializable[] data;
+ private volatile boolean waiting;
+ private int received;
+ private final ArrayList items = new ArrayList();
+
+ private String _broker = "vm://:1";
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ connection = (AMQConnection) getConnection("guest", "guest");
+ destination = new AMQQueue(connection, randomize("LatencyTest"), true);
+ session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+
+ // set up a consumer
+ session.createConsumer(destination).setMessageListener(this);
+ connection.start();
+
+ // create a publisher
+ producer = session.createProducer(destination, false, false);
+ A a1 = new A(1, "A");
+ A a2 = new A(2, "a");
+ B b = new B(1, "B");
+ C c = new C();
+ c.put("A1", a1);
+ c.put("a2", a2);
+ c.put("B", b);
+ c.put("String", "String");
+
+ data = new Serializable[] { a1, a2, b, c, "Hello World!", new Integer(1001) };
+ }
+
+ protected void tearDown() throws Exception
+ {
+ close();
+ super.tearDown();
+ }
+
+ public ObjectMessageTest()
+ { }
+
+ ObjectMessageTest(String broker) throws Exception
+ {
+ _broker = broker;
+ }
+
+ public void testSendAndReceive() throws Exception
+ {
+ try
+ {
+ send();
+ waitUntilReceived(data.length);
+ check();
+ _logger.info("All " + data.length + " items matched.");
+ }
+ catch (Exception e)
+ {
+ _logger.error("This Test should succeed but failed", e);
+ fail("This Test should succeed but failed due to: " + e);
+ }
+ }
+
+ public void testSetObjectPropertyForString() throws Exception
+ {
+ String testStringProperty = "TestStringProperty";
+ ObjectMessage msg = session.createObjectMessage(data[0]);
+ msg.setObjectProperty("TestStringProperty", testStringProperty);
+ assertEquals(testStringProperty, msg.getObjectProperty("TestStringProperty"));
+ }
+
+ public void testSetObjectPropertyForBoolean() throws Exception
+ {
+ ObjectMessage msg = session.createObjectMessage(data[0]);
+ msg.setObjectProperty("TestBooleanProperty", Boolean.TRUE);
+ assertEquals(Boolean.TRUE, msg.getObjectProperty("TestBooleanProperty"));
+ }
+
+ public void testSetObjectPropertyForByte() throws Exception
+ {
+ ObjectMessage msg = session.createObjectMessage(data[0]);
+ msg.setObjectProperty("TestByteProperty", Byte.MAX_VALUE);
+ assertEquals(Byte.MAX_VALUE, msg.getObjectProperty("TestByteProperty"));
+ }
+
+ public void testSetObjectPropertyForShort() throws Exception
+ {
+ ObjectMessage msg = session.createObjectMessage(data[0]);
+ msg.setObjectProperty("TestShortProperty", Short.MAX_VALUE);
+ assertEquals(Short.MAX_VALUE, msg.getObjectProperty("TestShortProperty"));
+ }
+
+ public void testSetObjectPropertyForInteger() throws Exception
+ {
+ ObjectMessage msg = session.createObjectMessage(data[0]);
+ msg.setObjectProperty("TestIntegerProperty", Integer.MAX_VALUE);
+ assertEquals(Integer.MAX_VALUE, msg.getObjectProperty("TestIntegerProperty"));
+ }
+
+ public void testSetObjectPropertyForDouble() throws Exception
+ {
+ ObjectMessage msg = session.createObjectMessage(data[0]);
+ msg.setObjectProperty("TestDoubleProperty", Double.MAX_VALUE);
+ assertEquals(Double.MAX_VALUE, msg.getObjectProperty("TestDoubleProperty"));
+ }
+
+ public void testSetObjectPropertyForFloat() throws Exception
+ {
+ ObjectMessage msg = session.createObjectMessage(data[0]);
+ msg.setObjectProperty("TestFloatProperty", Float.MAX_VALUE);
+ assertEquals(Float.MAX_VALUE, msg.getObjectProperty("TestFloatProperty"));
+ }
+
+ public void testSetObjectPropertyForByteArray() throws Exception
+ {
+ byte[] array = { 1, 2, 3, 4, 5 };
+ ObjectMessage msg = session.createObjectMessage(data[0]);
+ msg.setObjectProperty("TestByteArrayProperty", array);
+ assertTrue(Arrays.equals(array, (byte[]) msg.getObjectProperty("TestByteArrayProperty")));
+ }
+
+ public void testSetObjectForNull() throws Exception
+ {
+ ObjectMessage msg = session.createObjectMessage();
+ msg.setObject(null);
+ assertNull(msg.getObject());
+ }
+
+ private void send() throws Exception
+ {
+ for (int i = 0; i < data.length; i++)
+ {
+ ObjectMessage msg;
+ if ((i % 2) == 0)
+ {
+ msg = session.createObjectMessage(data[i]);
+ }
+ else
+ {
+ msg = session.createObjectMessage();
+ msg.setObject(data[i]);
+ }
+
+ producer.send(msg);
+ }
+ }
+
+ public void check() throws Exception
+ {
+ Object[] actual = (Object[]) items.toArray();
+ if (actual.length != data.length)
+ {
+ throw new Exception("Expected " + data.length + " objects, got " + actual.length);
+ }
+
+ for (int i = 0; i < data.length; i++)
+ {
+ if (actual[i] instanceof Exception)
+ {
+ throw new Exception("Error on receive of " + data[i], ((Exception) actual[i]));
+ }
+
+ if (actual[i] == null)
+ {
+ throw new Exception("Expected " + data[i] + " got null");
+ }
+
+ if (!data[i].equals(actual[i]))
+ {
+ throw new Exception("Expected " + data[i] + " got " + actual[i]);
+ }
+ }
+ }
+
+ private void close() throws Exception
+ {
+ session.close();
+ connection.close();
+ }
+
+ private synchronized void waitUntilReceived(int count) throws InterruptedException
+ {
+ waiting = true;
+ while (received < count)
+ {
+ wait();
+ }
+
+ waiting = false;
+ }
+
+ public void onMessage(Message message)
+ {
+
+ try
+ {
+ if (message instanceof ObjectMessage)
+ {
+ items.add(((ObjectMessage) message).getObject());
+ }
+ else
+ {
+ _logger.error("ERROR: Got " + message.getClass().getName() + " not ObjectMessage");
+ items.add(message);
+ }
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error getting object from message", e);
+ items.add(e);
+ }
+
+ synchronized (this)
+ {
+ received++;
+ notify();
+ }
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ String broker = (argv.length > 0) ? argv[0] : "vm://:1";
+ if ("-help".equals(broker))
+ {
+ System.out.println("Usage: <broker>");
+ }
+
+ new ObjectMessageTest(broker).testSendAndReceive();
+ }
+
+ private static class A implements Serializable
+ {
+ private String sValue;
+ private int iValue;
+
+ A(int i, String s)
+ {
+ sValue = s;
+ iValue = i;
+ }
+
+ public int hashCode()
+ {
+ return iValue;
+ }
+
+ public boolean equals(Object o)
+ {
+ return (o instanceof A) && equals((A) o);
+ }
+
+ protected boolean equals(A a)
+ {
+ return areEqual(a.sValue, sValue) && (a.iValue == iValue);
+ }
+ }
+
+ private static class B extends A
+ {
+ private long time;
+
+ B(int i, String s)
+ {
+ super(i, s);
+ time = System.currentTimeMillis();
+ }
+
+ protected boolean equals(A a)
+ {
+ return super.equals(a) && (a instanceof B) && (time == ((B) a).time);
+ }
+ }
+
+ private static class C extends HashMap implements Serializable
+ { }
+
+ private static boolean areEqual(Object a, Object b)
+ {
+ return (a == null) ? (b == null) : a.equals(b);
+ }
+
+ private static String randomize(String in)
+ {
+ return in + System.currentTimeMillis();
+ }
+}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
new file mode 100644
index 0000000000..3ffa73b9b7
--- /dev/null
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
@@ -0,0 +1,198 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.client.protocol;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.security.Principal;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.network.NetworkConnection;
+
+public class AMQProtocolSessionTest extends QpidBrokerTestCase
+{
+ private static class TestProtocolSession extends AMQProtocolSession
+ {
+
+ public TestProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection)
+ {
+ super(protocolHandler,connection);
+ }
+
+ public TestNetworkConnection getNetworkConnection()
+ {
+ return (TestNetworkConnection) getProtocolHandler().getNetworkConnection();
+ }
+
+ public AMQShortString genQueueName()
+ {
+ return generateQueueName();
+ }
+ }
+
+ private TestProtocolSession _testSession;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ AMQConnection con = (AMQConnection) getConnection("guest", "guest");
+ AMQProtocolHandler protocolHandler = new AMQProtocolHandler(con);
+ protocolHandler.setNetworkConnection(new TestNetworkConnection());
+
+ //don't care about the values set here apart from the dummy IoSession
+ _testSession = new TestProtocolSession(protocolHandler , con);
+ }
+
+ public void testTemporaryQueueWildcard() throws UnknownHostException
+ {
+ checkTempQueueName(new InetSocketAddress(1234), "tmp_0_0_0_0_0_0_0_0_1234_1");
+ }
+
+ public void testTemporaryQueueLocalhostAddr() throws UnknownHostException
+ {
+ checkTempQueueName(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 1234), "tmp_127_0_0_1_1234_1");
+ }
+
+ public void testTemporaryQueueLocalhostName() throws UnknownHostException
+ {
+ checkTempQueueName(new InetSocketAddress(InetAddress.getByName("localhost"), 1234), "tmp_localhost_127_0_0_1_1234_1");
+ }
+
+ public void testTemporaryQueueInet4() throws UnknownHostException
+ {
+ checkTempQueueName(new InetSocketAddress(InetAddress.getByName("192.168.1.2"), 1234), "tmp_192_168_1_2_1234_1");
+ }
+
+ public void testTemporaryQueueInet6() throws UnknownHostException
+ {
+ checkTempQueueName(new InetSocketAddress(InetAddress.getByName("1080:0:0:0:8:800:200C:417A"), 1234), "tmp_1080_0_0_0_8_800_200c_417a_1234_1");
+ }
+
+ private void checkTempQueueName(SocketAddress address, String queueName)
+ {
+ _testSession.getNetworkConnection().setLocalAddress(address);
+ assertEquals("Wrong queue name", queueName, _testSession.genQueueName().asString());
+ }
+
+ private static class TestNetworkConnection implements NetworkConnection
+ {
+ private String _remoteHost = "127.0.0.1";
+ private String _localHost = "127.0.0.1";
+ private int _port = 1;
+ private SocketAddress _localAddress = null;
+ private final Sender<ByteBuffer> _sender;
+
+ public TestNetworkConnection()
+ {
+ _sender = new Sender<ByteBuffer>()
+ {
+
+ public void setIdleTimeout(int i)
+ {
+
+ }
+
+ public void send(ByteBuffer msg)
+ {
+
+ }
+
+ public void flush()
+ {
+
+ }
+
+ public void close()
+ {
+
+ }
+ };
+ }
+
+ @Override
+ public SocketAddress getLocalAddress()
+ {
+ return (_localAddress != null) ? _localAddress : new InetSocketAddress(_localHost, _port);
+ }
+
+ @Override
+ public SocketAddress getRemoteAddress()
+ {
+ return new InetSocketAddress(_remoteHost, _port);
+ }
+
+ @Override
+ public void setMaxReadIdle(int idleTime)
+ {
+ }
+
+ @Override
+ public Principal getPeerPrincipal()
+ {
+ return null;
+ }
+
+ @Override
+ public int getMaxReadIdle()
+ {
+ return 0;
+ }
+
+ @Override
+ public int getMaxWriteIdle()
+ {
+ return 0;
+ }
+
+ @Override
+ public void setMaxWriteIdle(int idleTime)
+ {
+ }
+
+ @Override
+ public void close()
+ {
+ }
+
+ public void setLocalAddress(SocketAddress address)
+ {
+ _localAddress = address;
+ }
+
+ public Sender<ByteBuffer> getSender()
+ {
+ return _sender;
+ }
+
+ public void start()
+ {
+ }
+ }
+}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java
new file mode 100644
index 0000000000..41ab35f233
--- /dev/null
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java
@@ -0,0 +1,166 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.test.unit.client.temporaryqueue;
+
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TextMessage;
+
+/**
+ * Tests the behaviour of {@link TemporaryQueue}.
+ */
+public class TemporaryQueueTest extends QpidBrokerTestCase
+{
+ /**
+ * Tests the basic produce/consume behaviour of a temporary queue.
+ */
+ public void testMessageDeliveryUsingTemporaryQueue() throws Exception
+ {
+ final Connection conn = getConnection();
+ final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final TemporaryQueue queue = session.createTemporaryQueue();
+ assertNotNull(queue);
+ final MessageProducer producer = session.createProducer(queue);
+ final MessageConsumer consumer = session.createConsumer(queue);
+ conn.start();
+ producer.send(session.createTextMessage("hello"));
+ TextMessage tm = (TextMessage) consumer.receive(2000);
+ assertNotNull("Message not received", tm);
+ assertEquals("hello", tm.getText());
+ }
+
+ /**
+ * Tests that a temporary queue cannot be used by another {@link Session}.
+ */
+ public void testUseFromAnotherSessionProhibited() throws Exception
+ {
+ final Connection conn = getConnection();
+ final Session session1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final Session session2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final TemporaryQueue queue = session1.createTemporaryQueue();
+ assertNotNull(queue);
+
+ try
+ {
+ session2.createConsumer(queue);
+ fail("Expected a JMSException when subscribing to a temporary queue created on a different session");
+ }
+ catch (JMSException je)
+ {
+ //pass
+ assertEquals("Cannot consume from a temporary destination created on another session", je.getMessage());
+ }
+ }
+
+ /**
+ * Tests that the client is able to explicitly delete a temporary queue using
+ * {@link TemporaryQueue#delete()} and is prevented from deleting one that
+ * still has consumers.
+ *
+ * Note: Under < 0-10 {@link TemporaryQueue#delete()} only marks the queue as deleted
+ * on the client. 0-10 causes the queue to be deleted from the Broker.
+ */
+ public void testExplictTemporaryQueueDeletion() throws Exception
+ {
+ final Connection conn = getConnection();
+ final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final AMQSession<?, ?> amqSession = (AMQSession<?, ?>)session; // Required to observe the queue binding on the Broker
+ final TemporaryQueue queue = session.createTemporaryQueue();
+ assertNotNull(queue);
+ final MessageConsumer consumer = session.createConsumer(queue);
+ conn.start();
+
+ assertTrue("Queue should be bound", amqSession.isQueueBound((AMQDestination)queue));
+
+ try
+ {
+ queue.delete();
+ fail("Expected JMSException : should not be able to delete while there are active consumers");
+ }
+ catch (JMSException je)
+ {
+ //pass
+ assertEquals("Temporary Queue has consumers so cannot be deleted", je.getMessage());
+ }
+ consumer.close();
+
+ // Now deletion should succeed.
+ queue.delete();
+
+ try
+ {
+ session.createConsumer(queue);
+ fail("Exception not thrown");
+ }
+ catch (JMSException je)
+ {
+ //pass
+ assertEquals("Cannot consume from a deleted destination", je.getMessage());
+ }
+
+ if (isBroker010())
+ {
+ assertFalse("Queue should no longer be bound", amqSession.isQueueBound((AMQDestination)queue));
+ }
+ }
+
+ /**
+ * Tests that a temporary queue remains available for reuse even after its initial
+ * consumer has disconnected.
+ *
+ * This test would fail under < 0-10 as their temporary queues are deleted automatically
+ * (broker side) after the last consumer disconnects (so message2 would be lost). For this
+ * reason this test is excluded from those profiles.
+ */
+ public void testTemporaryQueueReused() throws Exception
+ {
+ final Connection conn = getConnection();
+ final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final TemporaryQueue queue = session.createTemporaryQueue();
+ assertNotNull(queue);
+
+ final MessageProducer producer1 = session.createProducer(queue);
+ final MessageConsumer consumer1 = session.createConsumer(queue);
+ conn.start();
+ producer1.send(session.createTextMessage("message1"));
+ producer1.send(session.createTextMessage("message2"));
+ TextMessage tm = (TextMessage) consumer1.receive(2000);
+ assertNotNull("Message not received by first consumer", tm);
+ assertEquals("message1", tm.getText());
+ consumer1.close();
+
+ final MessageConsumer consumer2 = session.createConsumer(queue);
+ conn.start();
+ tm = (TextMessage) consumer2.receive(2000);
+ assertNotNull("Message not received by second consumer", tm);
+ assertEquals("message2", tm.getText());
+ consumer2.close();
+ }
+}