diff options
Diffstat (limited to 'qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client')
14 files changed, 3076 insertions, 0 deletions
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java new file mode 100644 index 0000000000..0d81b66be0 --- /dev/null +++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java @@ -0,0 +1,104 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.client; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import javax.jms.JMSException; +import javax.jms.QueueReceiver; +import javax.jms.TopicSubscriber; + +/** + * Tests for QueueReceiver and TopicSubscriber creation methods on AMQSession + */ +public class AMQSessionTest extends QpidBrokerTestCase +{ + + private static AMQSession _session; + private static AMQTopic _topic; + private static AMQQueue _queue; + private static AMQConnection _connection; + + protected void setUp() throws Exception + { + super.setUp(); + _connection = (AMQConnection) getConnection("guest", "guest"); + _topic = new AMQTopic(_connection,"mytopic"); + _queue = new AMQQueue(_connection,"myqueue"); + _session = (AMQSession) _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE); + } + + protected void tearDown() throws Exception + { + try + { + _connection.close(); + } + catch (JMSException e) + { + //just close + } + super.tearDown(); + } + + public void testCreateSubscriber() throws JMSException + { + TopicSubscriber subscriber = _session.createSubscriber(_topic); + assertEquals("Topic names should match from TopicSubscriber", _topic.getTopicName(), subscriber.getTopic().getTopicName()); + + subscriber = _session.createSubscriber(_topic, "abc", false); + assertEquals("Topic names should match from TopicSubscriber with selector", _topic.getTopicName(), subscriber.getTopic().getTopicName()); + } + + public void testCreateDurableSubscriber() throws JMSException + { + TopicSubscriber subscriber = _session.createDurableSubscriber(_topic, "mysubname"); + assertEquals("Topic names should match from durable TopicSubscriber", _topic.getTopicName(), subscriber.getTopic().getTopicName()); + + subscriber = _session.createDurableSubscriber(_topic, "mysubname2", "abc", false); + assertEquals("Topic names should match from durable TopicSubscriber with selector", _topic.getTopicName(), subscriber.getTopic().getTopicName()); + _session.unsubscribe("mysubname"); + _session.unsubscribe("mysubname2"); + } + + public void testCreateQueueReceiver() throws JMSException + { + QueueReceiver receiver = _session.createQueueReceiver(_queue); + assertEquals("Queue names should match from QueueReceiver", _queue.getQueueName(), receiver.getQueue().getQueueName()); + + receiver = _session.createQueueReceiver(_queue, "abc"); + assertEquals("Queue names should match from QueueReceiver with selector", _queue.getQueueName(), receiver.getQueue().getQueueName()); + } + + public void testCreateReceiver() throws JMSException + { + QueueReceiver receiver = _session.createReceiver(_queue); + assertEquals("Queue names should match from QueueReceiver", _queue.getQueueName(), receiver.getQueue().getQueueName()); + + receiver = _session.createReceiver(_queue, "abc"); + assertEquals("Queue names should match from QueueReceiver with selector", _queue.getQueueName(), receiver.getQueue().getQueueName()); + } + +} diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java new file mode 100644 index 0000000000..77df6c58d9 --- /dev/null +++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java @@ -0,0 +1,258 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.client; + +import java.io.IOException; + +import org.apache.qpid.AMQException; +import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.management.common.mbeans.ManagedExchange; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.test.utils.JMXTestUtils; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.url.BindingURL; + +import javax.jms.Connection; +import javax.jms.InvalidDestinationException; +import javax.jms.JMSException; +import javax.jms.Queue; +import javax.jms.Session; + +public class DynamicQueueExchangeCreateTest extends QpidBrokerTestCase +{ + private JMXTestUtils _jmxUtils; + + @Override + public void setUp() throws Exception + { + getBrokerConfiguration().addJmxManagementConfiguration(); + + _jmxUtils = new JMXTestUtils(this); + + super.setUp(); + _jmxUtils.open(); + } + + @Override + public void tearDown() throws Exception + { + try + { + if (_jmxUtils != null) + { + _jmxUtils.close(); + } + } + finally + { + super.tearDown(); + } + } + + /* + * Tests to validate that setting the respective qpid.declare_queues, + * qpid.declare_exchanges system properties functions as expected. + */ + + public void testQueueNotDeclaredDuringConsumerCreation() throws Exception + { + setSystemProperty(ClientProperties.QPID_DECLARE_QUEUES_PROP_NAME, "false"); + + Connection connection = getConnection(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Queue queue = session.createQueue(getTestQueueName()); + + try + { + session.createConsumer(queue); + fail("JMSException should be thrown as the queue does not exist"); + } + catch (JMSException e) + { + checkExceptionErrorCode(e, AMQConstant.NOT_FOUND); + } + } + + public void testExchangeNotDeclaredDuringConsumerCreation() throws Exception + { + setSystemProperty(ClientProperties.QPID_DECLARE_EXCHANGES_PROP_NAME, "false"); + + Connection connection = getConnection(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + String exchangeName = getTestQueueName(); + Queue queue = session.createQueue("direct://" + exchangeName + "/queue/queue"); + + try + { + session.createConsumer(queue); + fail("JMSException should be thrown as the exchange does not exist"); + } + catch (JMSException e) + { + checkExceptionErrorCode(e, AMQConstant.NOT_FOUND); + } + + //verify the exchange was not declared + String exchangeObjectName = _jmxUtils.getExchangeObjectName("test", exchangeName); + assertFalse("exchange should not exist", _jmxUtils.doesManagedObjectExist(exchangeObjectName)); + } + + /** + * Checks that setting {@value ClientProperties#QPID_DECLARE_EXCHANGES_PROP_NAME} false results in + * disabling implicit ExchangeDeclares during producer creation when using a {@link BindingURL} + */ + public void testExchangeNotDeclaredDuringProducerCreation() throws Exception + { + Connection connection = getConnection(); + Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + String exchangeName1 = getTestQueueName() + "1"; + + + Queue queue = session1.createQueue("direct://" + exchangeName1 + "/queue/queue"); + session1.createProducer(queue); + + //close the session to ensure any previous commands were fully processed by + //the broker before observing their effect + session1.close(); + + //verify the exchange was declared + String exchangeObjectName = _jmxUtils.getExchangeObjectName("test", exchangeName1); + assertTrue("exchange should exist", _jmxUtils.doesManagedObjectExist(exchangeObjectName)); + + //Now disable the implicit exchange declares and try again + setSystemProperty(ClientProperties.QPID_DECLARE_EXCHANGES_PROP_NAME, "false"); + + Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + String exchangeName2 = getTestQueueName() + "2"; + + Queue queue2 = session2.createQueue("direct://" + exchangeName2 + "/queue/queue"); + session2.createProducer(queue2); + + //close the session to ensure any previous commands were fully processed by + //the broker before observing their effect + session2.close(); + + //verify the exchange was not declared + String exchangeObjectName2 = _jmxUtils.getExchangeObjectName("test", exchangeName2); + assertFalse("exchange should not exist", _jmxUtils.doesManagedObjectExist(exchangeObjectName2)); + } + + public void testQueueNotBoundDuringConsumerCreation() throws Exception + { + setSystemProperty(ClientProperties.QPID_BIND_QUEUES_PROP_NAME, "false"); + setSystemProperty(ClientProperties.VERIFY_QUEUE_ON_SEND, "true"); + + Connection connection = getConnection(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Queue queue = session.createQueue(getTestQueueName()); + session.createConsumer(queue); + + try + { + session.createProducer(queue).send(session.createMessage()); + fail("JMSException should be thrown as the queue does not exist"); + } + catch (InvalidDestinationException ide) + { + //PASS + } + } + private void checkExceptionErrorCode(JMSException original, AMQConstant code) + { + Exception linked = original.getLinkedException(); + assertNotNull("Linked exception should have been set", linked); + assertTrue("Linked exception should be an AMQException", linked instanceof AMQException); + assertEquals("Error code should be " + code.getCode(), code, ((AMQException) linked).getErrorCode()); + } + + /* + * Tests to validate that the custom exchanges declared by the client during + * consumer and producer creation have the expected properties. + */ + + public void testPropertiesOfCustomExchangeDeclaredDuringProducerCreation() throws Exception + { + implTestPropertiesOfCustomExchange(true, false); + } + + public void testPropertiesOfCustomExchangeDeclaredDuringConsumerCreation() throws Exception + { + implTestPropertiesOfCustomExchange(false, true); + } + + private void implTestPropertiesOfCustomExchange(boolean createProducer, boolean createConsumer) throws Exception + { + Connection connection = getConnection(); + + Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + String exchangeName1 = getTestQueueName() + "1"; + String queueName1 = getTestQueueName() + "1"; + + Queue queue = session1.createQueue("direct://" + exchangeName1 + "/" + queueName1 + "/" + queueName1 + "?" + BindingURL.OPTION_EXCHANGE_AUTODELETE + "='true'"); + if(createProducer) + { + session1.createProducer(queue); + } + + if(createConsumer) + { + session1.createConsumer(queue); + } + session1.close(); + + //verify the exchange was declared to expectation + verifyDeclaredExchange(exchangeName1, true, false); + + Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + String exchangeName2 = getTestQueueName() + "2"; + String queueName2 = getTestQueueName() + "2"; + + Queue queue2 = session2.createQueue("direct://" + exchangeName2 + "/" + queueName2 + "/" + queueName2 + "?" + BindingURL.OPTION_EXCHANGE_DURABLE + "='true'"); + if(createProducer) + { + session2.createProducer(queue2); + } + + if(createConsumer) + { + session2.createConsumer(queue2); + } + session2.close(); + + //verify the exchange was declared to expectation + verifyDeclaredExchange(exchangeName2, false, true); + } + + private void verifyDeclaredExchange(String exchangeName, boolean isAutoDelete, boolean isDurable) throws IOException + { + String exchangeObjectName = _jmxUtils.getExchangeObjectName("test", exchangeName); + assertTrue("exchange should exist", _jmxUtils.doesManagedObjectExist(exchangeObjectName)); + ManagedExchange exchange = _jmxUtils.getManagedExchange(exchangeName); + assertEquals(isAutoDelete, exchange.isAutoDelete()); + assertEquals(isDurable,exchange.isDurable()); + } +} diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java new file mode 100644 index 0000000000..5e1e38106a --- /dev/null +++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java @@ -0,0 +1,660 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.client; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import org.apache.log4j.Logger; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.RejectBehaviour; +import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.server.virtualhost.AbstractVirtualHost; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.test.utils.TestBrokerConfiguration; + +/** + * Test that the MaxRedelivery feature works as expected, allowing the client to reject + * messages during rollback/recover whilst specifying they not be requeued if delivery + * to an application has been attempted a specified number of times. + * + * General approach: specify a set of messages which will cause the test client to then + * deliberately rollback/recover the session after consuming, and monitor that they are + * re-delivered the specified number of times before the client rejects them without requeue + * and then verify that they are not subsequently redelivered. + * + * Additionally, the queue used in the test is configured for DLQ'ing, and the test verifies + * that the messages rejected without requeue are then present on the appropriate DLQ. + */ +public class MaxDeliveryCountTest extends QpidBrokerTestCase +{ + private static final Logger _logger = Logger.getLogger(MaxDeliveryCountTest.class); + private boolean _failed; + private String _failMsg; + private static final int MSG_COUNT = 15; + private static final int MAX_DELIVERY_COUNT = 2; + private CountDownLatch _awaitCompletion; + + /** index numbers of messages to be redelivered */ + private final List<Integer> _redeliverMsgs = Arrays.asList(1, 2, 5, 14); + + public void setUp() throws Exception + { + //enable DLQ/maximumDeliveryCount support for all queues at the vhost level + + TestBrokerConfiguration brokerConfiguration = getBrokerConfiguration(); + setTestSystemProperty("queue.deadLetterQueueEnabled","true"); + setTestSystemProperty("queue.maximumDeliveryAttempts", String.valueOf(MAX_DELIVERY_COUNT)); + + //Ensure management is on + brokerConfiguration.addJmxManagementConfiguration(); + + // Set client-side flag to allow the server to determine if messages + // dead-lettered or requeued. + if (!isBroker010()) + { + setTestClientSystemProperty(ClientProperties.REJECT_BEHAVIOUR_PROP_NAME, RejectBehaviour.SERVER.toString()); + } + super.setUp(); + + boolean durableSub = isDurSubTest(); + + //declare the test queue + Connection consumerConnection = getConnection(); + Session consumerSession = consumerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE); + Destination destination = getDestination(consumerSession, durableSub); + if(durableSub) + { + consumerSession.createDurableSubscriber((Topic)destination, getName()).close(); + } + else + { + consumerSession.createConsumer(destination).close(); + } + + consumerConnection.close(); + + //Create Producer put some messages on the queue + Connection producerConnection = getConnection(); + producerConnection.start(); + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = producerSession.createProducer(getDestination(producerSession, durableSub)); + + for (int count = 1; count <= MSG_COUNT; count++) + { + Message msg = producerSession.createTextMessage(generateContent(count)); + msg.setIntProperty("count", count); + producer.send(msg); + } + + producerConnection.close(); + + _failed = false; + _awaitCompletion = new CountDownLatch(1); + } + + private Destination getDestination(Session consumerSession, boolean durableSub) throws JMSException + { + if(durableSub) + { + return consumerSession.createTopic(getTestQueueName()); + } + else + { + return consumerSession.createQueue(getTestQueueName()); + } + } + + private String generateContent(int count) + { + return "Message " + count + " content."; + } + + /** + * Test that Max Redelivery is enforced when using onMessage() on a + * Client-Ack session. + */ + public void testAsynchronousClientAckSession() throws Exception + { + doTest(Session.CLIENT_ACKNOWLEDGE, _redeliverMsgs, false, false); + } + + /** + * Test that Max Redelivery is enforced when using onMessage() on a + * transacted session. + */ + public void testAsynchronousTransactedSession() throws Exception + { + doTest(Session.SESSION_TRANSACTED, _redeliverMsgs, false, false); + } + + /** + * Test that Max Redelivery is enforced when using onMessage() on an + * Auto-Ack session. + */ + public void testAsynchronousAutoAckSession() throws Exception + { + doTest(Session.AUTO_ACKNOWLEDGE, _redeliverMsgs, false, false); + } + + /** + * Test that Max Redelivery is enforced when using onMessage() on a + * Dups-OK session. + */ + public void testAsynchronousDupsOkSession() throws Exception + { + doTest(Session.DUPS_OK_ACKNOWLEDGE, _redeliverMsgs, false, false); + } + + /** + * Test that Max Redelivery is enforced when using recieve() on a + * Client-Ack session. + */ + public void testSynchronousClientAckSession() throws Exception + { + doTest(Session.CLIENT_ACKNOWLEDGE, _redeliverMsgs, true, false); + } + + /** + * Test that Max Redelivery is enforced when using recieve() on a + * transacted session. + */ + public void testSynchronousTransactedSession() throws Exception + { + doTest(Session.SESSION_TRANSACTED, _redeliverMsgs, true, false); + } + + public void testDurableSubscription() throws Exception + { + doTest(Session.SESSION_TRANSACTED, _redeliverMsgs, false, true); + } + + public void testWhenBrokerIsRestartedAfterEnqeuingMessages() throws Exception + { + restartBroker(); + + doTest(Session.SESSION_TRANSACTED, _redeliverMsgs, true, false); + } + + private void doTest(final int deliveryMode, final List<Integer> redeliverMsgs, final boolean synchronous, final boolean durableSub) throws Exception + { + final Connection clientConnection = getConnection(); + + final boolean transacted = deliveryMode == Session.SESSION_TRANSACTED ? true : false; + final Session clientSession = clientConnection.createSession(transacted, deliveryMode); + + MessageConsumer consumer; + Destination dest = getDestination(clientSession, durableSub); + AMQQueue checkQueue; + if(durableSub) + { + consumer = clientSession.createDurableSubscriber((Topic)dest, getName()); + checkQueue = new AMQQueue("amq.topic", "clientid" + ":" + getName()); + } + else + { + consumer = clientSession.createConsumer(dest); + checkQueue = (AMQQueue) dest; + } + + assertEquals("The queue should have " + MSG_COUNT + " msgs at start", + MSG_COUNT, ((AMQSession<?,?>) clientSession).getQueueDepth(checkQueue)); + + clientConnection.start(); + + int expectedDeliveries = MSG_COUNT + ((MAX_DELIVERY_COUNT -1) * redeliverMsgs.size()); + + if(synchronous) + { + doSynchronousTest(clientSession, consumer, clientSession.getAcknowledgeMode(), + MAX_DELIVERY_COUNT, expectedDeliveries, redeliverMsgs); + } + else + { + addMessageListener(clientSession, consumer, clientSession.getAcknowledgeMode(), + MAX_DELIVERY_COUNT, expectedDeliveries, redeliverMsgs); + + try + { + if (!_awaitCompletion.await(20, TimeUnit.SECONDS)) + { + fail("Test did not complete in 20 seconds."); + } + } + catch (InterruptedException e) + { + fail("Unable to wait for test completion"); + throw e; + } + + if(_failed) + { + fail(_failMsg); + } + } + consumer.close(); + + //check the source queue is now empty + assertEquals("The queue should have 0 msgs left", 0, ((AMQSession<?,?>) clientSession).getQueueDepth(checkQueue, true)); + + //check the DLQ has the required number of rejected-without-requeue messages + verifyDLQdepth(redeliverMsgs.size(), clientSession, durableSub); + + if(isBrokerStorePersistent()) + { + //restart the broker to verify persistence of the DLQ and the messages on it + clientConnection.close(); + + restartBroker(); + + final Connection clientConnection2 = getConnection(); + clientConnection2.start(); + + //verify the messages on the DLQ + verifyDLQcontent(clientConnection2, redeliverMsgs, getTestQueueName(), durableSub); + clientConnection2.close(); + } + else + { + + //verify the messages on the DLQ + verifyDLQcontent(clientConnection, redeliverMsgs, getTestQueueName(), durableSub); + clientConnection.close(); + } + + } + + private void verifyDLQdepth(int expected, Session clientSession, boolean durableSub) throws AMQException + { + AMQDestination checkQueueDLQ; + if(durableSub) + { + checkQueueDLQ = new AMQQueue("amq.topic", "clientid" + ":" + getName() + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX); + } + else + { + checkQueueDLQ = new AMQQueue("amq.direct", getTestQueueName() + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX); + } + + assertEquals("The DLQ should have " + expected + " msgs on it", expected, + ((AMQSession<?,?>) clientSession).getQueueDepth(checkQueueDLQ, true)); + } + + private void verifyDLQcontent(Connection clientConnection, List<Integer> redeliverMsgs, String destName, boolean durableSub) throws JMSException + { + Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageConsumer consumer; + if(durableSub) + { + consumer = clientSession.createConsumer(clientSession.createQueue("clientid:" +getName() + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX)); + } + else + { + consumer = clientSession.createConsumer( + clientSession.createQueue(destName + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX)); + } + + //keep track of the message we expect to still be on the DLQ + List<Integer> outstandingMessages = new ArrayList<Integer>(redeliverMsgs); + int numMsg = outstandingMessages.size(); + + for(int i = 0; i < numMsg; i++) + { + Message message = consumer.receive(250); + + assertNotNull("failed to consume expected message " + i + " from DLQ", message); + assertTrue("message " + i + " was the wrong type", message instanceof TextMessage); + + //using Integer here to allow removing the value from the list, using int + //would instead result in removal of the element at that index + Integer msgId = message.getIntProperty("count"); + + TextMessage txt = (TextMessage) message; + _logger.info("Received message " + msgId + " at " + i + " from the DLQ: " + txt.getText()); + + assertTrue("message " + i + " was not one of those which should have been on the DLQ", + redeliverMsgs.contains(msgId)); + assertTrue("message " + i + " was not one of those expected to still be on the DLQ", + outstandingMessages.contains(msgId)); + assertEquals("Message " + i + " content was not as expected", generateContent(msgId), txt.getText()); + + //remove from the list of outstanding msgs + outstandingMessages.remove(msgId); + } + + if(outstandingMessages.size() > 0) + { + String failures = ""; + for(Integer msg : outstandingMessages) + { + failures = failures.concat(msg + " "); + } + fail("some DLQ'd messages were not found on the DLQ: " + failures); + } + } + + private void addMessageListener(final Session session, final MessageConsumer consumer, final int deliveryMode, final int maxDeliveryCount, + final int expectedTotalNumberOfDeliveries, final List<Integer> redeliverMsgs) throws JMSException + { + if(deliveryMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE + || deliveryMode == org.apache.qpid.jms.Session.PRE_ACKNOWLEDGE) + { + failAsyncTest("Max Delivery feature is not supported with this acknowledgement mode" + + "when using asynchronous message delivery."); + } + + consumer.setMessageListener(new MessageListener() + { + private int _deliveryAttempts = 0; //number of times given message(s) have been seen + private int _numMsgsToBeRedelivered = 0; //number of messages to rollback/recover + private int _totalNumDeliveries = 0; + private int _expectedMessage = 1; + + public void onMessage(Message message) + { + if(_failed || _awaitCompletion.getCount() == 0L) + { + //don't process anything else + return; + } + + _totalNumDeliveries++; + + if (message == null) + { + failAsyncTest("Should not get null messages"); + return; + } + + try + { + int msgId = message.getIntProperty("count"); + + _logger.info("Received message: " + msgId); + + //check the message is the one we expected + if(_expectedMessage != msgId) + { + failAsyncTest("Expected message " + _expectedMessage + " , got message " + msgId); + return; + } + + _expectedMessage++; + + //keep track of the overall deliveries to ensure we don't see more than expected + if(_totalNumDeliveries > expectedTotalNumberOfDeliveries) + { + failAsyncTest("Expected total of " + expectedTotalNumberOfDeliveries + + " message deliveries, reached " + _totalNumDeliveries); + } + + //check if this message is one chosen to be rolled back / recovered + if(redeliverMsgs.contains(msgId)) + { + _numMsgsToBeRedelivered++; + + //check if next message is going to be rolled back / recovered too + if(redeliverMsgs.contains(msgId +1)) + { + switch(deliveryMode) + { + case Session.SESSION_TRANSACTED: + //skip on to next message immediately + return; + case Session.CLIENT_ACKNOWLEDGE: + //skip on to next message immediately + return; + case Session.DUPS_OK_ACKNOWLEDGE: + //fall through + case Session.AUTO_ACKNOWLEDGE: + //must recover session now or onMessage will ack, so + //just fall through the if + break; + } + } + + _deliveryAttempts++; //increment count of times the current rolled back/recovered message(s) have been seen + + _logger.debug("ROLLBACK/RECOVER"); + switch(deliveryMode) + { + case Session.SESSION_TRANSACTED: + session.rollback(); + break; + case Session.CLIENT_ACKNOWLEDGE: + //fall through + case Session.DUPS_OK_ACKNOWLEDGE: + //fall through + case Session.AUTO_ACKNOWLEDGE: + session.recover(); + break; + } + + if( _deliveryAttempts >= maxDeliveryCount) + { + //the client should have rejected the latest messages upon then + //above recover/rollback, adjust counts to compensate + _deliveryAttempts = 0; + } + else + { + //the message(s) should be redelivered, adjust expected message + _expectedMessage -= _numMsgsToBeRedelivered; + } + _logger.debug("XXX _expectedMessage: " + _expectedMessage + " _deliveryAttempts : " + _deliveryAttempts + " _numMsgsToBeRedelivered=" + _numMsgsToBeRedelivered); + //reset count of messages expected to be redelivered + _numMsgsToBeRedelivered = 0; + } + else + { + //consume the message + switch(deliveryMode) + { + case Session.SESSION_TRANSACTED: + session.commit(); + break; + case Session.CLIENT_ACKNOWLEDGE: + message.acknowledge(); + break; + case Session.DUPS_OK_ACKNOWLEDGE: + //fall-through + case Session.AUTO_ACKNOWLEDGE: + //do nothing, onMessage will ack on exit. + break; + } + } + + if (msgId == MSG_COUNT) + { + //if this is the last message let the test complete. + if (expectedTotalNumberOfDeliveries == _totalNumDeliveries) + { + _awaitCompletion.countDown(); + } + else + { + failAsyncTest("Last message received, but we have not had the " + + "expected number of total delivieres. Received " + _totalNumDeliveries + " Expecting : " + expectedTotalNumberOfDeliveries); + } + } + } + catch (JMSException e) + { + failAsyncTest(e.getMessage()); + } + } + }); + } + + private void failAsyncTest(String msg) + { + _logger.error("Failing test because: " + msg); + _failMsg = msg; + _failed = true; + _awaitCompletion.countDown(); + } + + private void doSynchronousTest(final Session session, final MessageConsumer consumer, final int deliveryMode, final int maxDeliveryCount, + final int expectedTotalNumberOfDeliveries, final List<Integer> redeliverMsgs) throws JMSException, AMQException, InterruptedException + { + if(deliveryMode == Session.AUTO_ACKNOWLEDGE + || deliveryMode == Session.DUPS_OK_ACKNOWLEDGE + || deliveryMode == org.apache.qpid.jms.Session.PRE_ACKNOWLEDGE + || deliveryMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE) + { + fail("Max Delivery feature is not supported with this acknowledgement mode" + + "when using synchronous message delivery."); + } + + int _deliveryAttempts = 0; //number of times given message(s) have been seen + int _numMsgsToBeRedelivered = 0; //number of messages to rollback/recover + int _totalNumDeliveries = 0; + int _expectedMessage = 1; + + while(!_failed) + { + Message message = consumer.receive(1000); + + _totalNumDeliveries++; + + if (message == null) + { + fail("Should not get null messages"); + return; + } + + try + { + int msgId = message.getIntProperty("count"); + + _logger.info("Received message: " + msgId); + + //check the message is the one we expected + assertEquals("Unexpected message.", _expectedMessage, msgId); + + _expectedMessage++; + + //keep track of the overall deliveries to ensure we don't see more than expected + assertTrue("Exceeded expected total number of deliveries.", + _totalNumDeliveries <= expectedTotalNumberOfDeliveries ); + + //check if this message is one chosen to be rolled back / recovered + if(redeliverMsgs.contains(msgId)) + { + //keep track of the number of messages we will have redelivered + //upon rollback/recover + _numMsgsToBeRedelivered++; + + if(redeliverMsgs.contains(msgId +1)) + { + //next message is going to be rolled back / recovered too. + //skip ahead to it + continue; + } + + _deliveryAttempts++; //increment count of times the current rolled back/recovered message(s) have been seen + + switch(deliveryMode) + { + case Session.SESSION_TRANSACTED: + session.rollback(); + break; + case Session.CLIENT_ACKNOWLEDGE: + session.recover(); + + //sleep then do a synchronous op to give the broker + //time to resend all the messages + Thread.sleep(500); + ((AMQSession<?,?>) session).sync(); + break; + } + + if( _deliveryAttempts >= maxDeliveryCount) + { + //the client should have rejected the latest messages upon then + //above recover/rollback, adjust counts to compensate + _deliveryAttempts = 0; + } + else + { + //the message(s) should be redelivered, adjust expected message + _expectedMessage -= _numMsgsToBeRedelivered; + } + + //As we just rolled back / recovered, we must reset the + //count of messages expected to be redelivered + _numMsgsToBeRedelivered = 0; + } + else + { + //consume the message + switch(deliveryMode) + { + case Session.SESSION_TRANSACTED: + session.commit(); + break; + case Session.CLIENT_ACKNOWLEDGE: + message.acknowledge(); + break; + } + } + + if (msgId == MSG_COUNT) + { + //if this is the last message let the test complete. + assertTrue("Last message received, but we have not had the " + + "expected number of total delivieres", + expectedTotalNumberOfDeliveries == _totalNumDeliveries); + + break; + } + } + catch (JMSException e) + { + fail(e.getMessage()); + } + } + } + + private boolean isDurSubTest() + { + return getTestQueueName().contains("DurableSubscription"); + } +} diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/QueueSessionFactoryTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/QueueSessionFactoryTest.java new file mode 100644 index 0000000000..370e44b3d5 --- /dev/null +++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/QueueSessionFactoryTest.java @@ -0,0 +1,113 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.client; + +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import javax.jms.QueueConnection; +import javax.jms.QueueSession; +import javax.jms.Session; +import javax.jms.Topic; +import javax.jms.TopicSession; + +/** + * Ensures that queue specific session factory method {@link QueueConnection#createQueueSession()} create sessions + * of type {@link QueueSession} and that those sessions correctly restrict the available JMS operations + * operations to exclude those applicable to only topics. + * + * @see TopicSessionFactoryTest + */ +public class QueueSessionFactoryTest extends QpidBrokerTestCase +{ + public void testQueueSessionIsNotATopicSession() throws Exception + { + QueueSession queueSession = getQueueSession(); + assertFalse(queueSession instanceof TopicSession); + } + + public void testQueueSessionCannotCreateTemporaryTopics() throws Exception + { + QueueSession queueSession = getQueueSession(); + try + { + queueSession.createTemporaryTopic(); + fail("expected exception did not occur"); + } + catch (javax.jms.IllegalStateException s) + { + // PASS + assertEquals("Cannot call createTemporaryTopic from QueueSession", s.getMessage()); + } + } + + public void testQueueSessionCannotCreateTopics() throws Exception + { + QueueSession queueSession = getQueueSession(); + try + { + queueSession.createTopic("abc"); + fail("expected exception did not occur"); + } + catch (javax.jms.IllegalStateException s) + { + // PASS + assertEquals("Cannot call createTopic from QueueSession", s.getMessage()); + } + } + + public void testQueueSessionCannotCreateDurableSubscriber() throws Exception + { + QueueSession queueSession = getQueueSession(); + Topic topic = getTestTopic(); + + try + { + queueSession.createDurableSubscriber(topic, "abc"); + fail("expected exception did not occur"); + } + catch (javax.jms.IllegalStateException s) + { + // PASS + assertEquals("Cannot call createDurableSubscriber from QueueSession", s.getMessage()); + } + } + + public void testQueueSessionCannoutUnsubscribe() throws Exception + { + QueueSession queueSession = getQueueSession(); + try + { + queueSession.unsubscribe("abc"); + fail("expected exception did not occur"); + } + catch (javax.jms.IllegalStateException s) + { + // PASS + assertEquals("Cannot call unsubscribe from QueueSession", s.getMessage()); + } + } + + private QueueSession getQueueSession() throws Exception + { + QueueConnection queueConnection = (QueueConnection)getConnection(); + return queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); + } +} diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/TopicSessionFactoryTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/TopicSessionFactoryTest.java new file mode 100644 index 0000000000..ce15d452ab --- /dev/null +++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/TopicSessionFactoryTest.java @@ -0,0 +1,98 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.client; + +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import javax.jms.Queue; +import javax.jms.QueueSession; +import javax.jms.Session; +import javax.jms.TopicConnection; +import javax.jms.TopicSession; + +/** + * Ensures that topic specific session factory method {@link TopicConnection#createTopicSession()} create sessions + * of type {@link TopicSession} and that those sessions correctly restrict the available JMS operations + * operations to exclude those applicable to only queues. + * + * @see QueueSessionFactoryTest + */ +public class TopicSessionFactoryTest extends QpidBrokerTestCase +{ + public void testTopicSessionIsNotAQueueSession() throws Exception + { + TopicSession topicSession = getTopicSession(); + assertFalse(topicSession instanceof QueueSession); + } + + public void testTopicSessionCannotCreateCreateBrowser() throws Exception + { + TopicSession topicSession = getTopicSession(); + Queue queue = getTestQueue(); + try + { + topicSession.createBrowser(queue); + fail("expected exception did not occur"); + } + catch (javax.jms.IllegalStateException s) + { + // PASS + assertEquals("Cannot call createBrowser from TopicSession", s.getMessage()); + } + } + + public void testTopicSessionCannotCreateQueues() throws Exception + { + TopicSession topicSession = getTopicSession(); + try + { + topicSession.createQueue("abc"); + fail("expected exception did not occur"); + } + catch (javax.jms.IllegalStateException s) + { + // PASS + assertEquals("Cannot call createQueue from TopicSession", s.getMessage()); + } + } + + public void testTopicSessionCannotCreateTemporaryQueues() throws Exception + { + TopicSession topicSession = getTopicSession(); + try + { + topicSession.createTemporaryQueue(); + fail("expected exception did not occur"); + } + catch (javax.jms.IllegalStateException s) + { + // PASS + assertEquals("Cannot call createTemporaryQueue from TopicSession", s.getMessage()); + } + } + + private TopicSession getTopicSession() throws Exception + { + TopicConnection topicConnection = (TopicConnection)getConnection(); + return topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + } + +} diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java new file mode 100644 index 0000000000..58f1bfe372 --- /dev/null +++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.client.channelclose; + +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.MessageConsumer; +import javax.jms.Session; + +/** + * @author Apache Software Foundation + */ +public class CloseWithBlockingReceiveTest extends QpidBrokerTestCase +{ + + + public void testReceiveReturnsNull() throws Exception + { + final Connection connection = getConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(getTestQueueName()); + MessageConsumer consumer = session.createConsumer(destination); + connection.start(); + + Runnable r = new Runnable() + { + + public void run() + { + try + { + Thread.sleep(1000); + connection.close(); + } + catch (Exception e) + { + } + } + }; + long startTime = System.currentTimeMillis(); + Thread thread = new Thread(r); + thread.start(); + try + { + consumer.receive(10000); + assertTrue(System.currentTimeMillis() - startTime < 10000); + } + finally + { + thread.join(); + } + } +} diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java new file mode 100644 index 0000000000..4a92728d82 --- /dev/null +++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java @@ -0,0 +1,215 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.client.connection; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.naming.NamingException; +import org.apache.qpid.AMQConnectionClosedException; +import org.apache.qpid.AMQDisconnectedException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.transport.ConnectionException; + +import javax.jms.Connection; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Session; + +/** + * Tests the behaviour of the client when the Broker terminates client connection + * by the Broker being shutdown gracefully or otherwise. + * + * @see ManagedConnectionMBeanTest + */ +public class BrokerClosesClientConnectionTest extends QpidBrokerTestCase +{ + private Connection _connection; + private boolean _isExternalBroker; + private final RecordingExceptionListener _recordingExceptionListener = new RecordingExceptionListener(); + private Session _session; + + @Override + protected void setUp() throws Exception + { + super.setUp(); + + _connection = getConnection(); + _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _connection.setExceptionListener(_recordingExceptionListener); + + _isExternalBroker = isExternalBroker(); + } + + public void testClientCloseOnNormalBrokerShutdown() throws Exception + { + final Class<? extends Exception> expectedLinkedException = isBroker010() ? ConnectionException.class : AMQConnectionClosedException.class; + + assertConnectionOpen(); + + stopBroker(); + + JMSException exception = _recordingExceptionListener.awaitException(10000); + assertConnectionCloseWasReported(exception, expectedLinkedException); + assertConnectionClosed(); + + ensureCanCloseWithoutException(); + } + + public void testClientCloseOnBrokerKill() throws Exception + { + final Class<? extends Exception> expectedLinkedException = isBroker010() ? ConnectionException.class : AMQDisconnectedException.class; + + if (!_isExternalBroker) + { + return; + } + + assertConnectionOpen(); + + killBroker(); + + JMSException exception = _recordingExceptionListener.awaitException(10000); + assertConnectionCloseWasReported(exception, expectedLinkedException); + assertConnectionClosed(); + + ensureCanCloseWithoutException(); + } + + private void ensureCanCloseWithoutException() + { + try + { + _connection.close(); + } + catch (JMSException e) + { + fail("Connection should close without exception" + e.getMessage()); + } + } + + private void assertConnectionCloseWasReported(JMSException exception, Class<? extends Exception> linkedExceptionClass) + { + assertNotNull("Broker shutdown should be reported to the client via the ExceptionListener", exception); + assertNotNull("JMXException should have linked exception", exception.getLinkedException()); + + assertEquals("Unexpected linked exception", linkedExceptionClass, exception.getLinkedException().getClass()); + } + + private void assertConnectionClosed() + { + assertTrue("Connection should be marked as closed", ((AMQConnection)_connection).isClosed()); + } + + private void assertConnectionOpen() + { + assertFalse("Connection should not be marked as closed", ((AMQConnection)_connection).isClosed()); + } + + private final class RecordingExceptionListener implements ExceptionListener + { + private final CountDownLatch _exceptionReceivedLatch = new CountDownLatch(1); + private volatile JMSException _exception; + + @Override + public void onException(JMSException exception) + { + _exception = exception; + } + + public JMSException awaitException(long timeoutInMillis) throws InterruptedException + { + _exceptionReceivedLatch.await(timeoutInMillis, TimeUnit.MILLISECONDS); + return _exception; + } + } + + + private class Listener implements MessageListener + { + int _messageCount; + + @Override + public synchronized void onMessage(Message message) + { + _messageCount++; + } + + public synchronized int getCount() + { + return _messageCount; + } + } + + public void testNoDeliveryAfterBrokerClose() throws JMSException, NamingException, InterruptedException + { + + Listener listener = new Listener(); + + Session session = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageConsumer consumer1 = session.createConsumer(getTestQueue()); + consumer1.setMessageListener(listener); + + MessageProducer producer = _session.createProducer(getTestQueue()); + producer.send(_session.createTextMessage("test message")); + + _connection.start(); + + + synchronized (listener) + { + long currentTime = System.currentTimeMillis(); + long until = currentTime + 2000l; + while(listener.getCount() == 0 && currentTime < until) + { + listener.wait(until - currentTime); + currentTime = System.currentTimeMillis(); + } + } + assertEquals(1, listener.getCount()); + + Connection connection2 = getConnection(); + Session session2 = connection2.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageConsumer consumer2 = session2.createConsumer(getTestQueue()); + consumer2.setMessageListener(listener); + connection2.start(); + + + Connection connection3 = getConnection(); + Session session3 = connection3.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageConsumer consumer3 = session3.createConsumer(getTestQueue()); + consumer3.setMessageListener(listener); + connection3.start(); + + assertEquals(1, listener.getCount()); + + stopBroker(); + + assertEquals(1, listener.getCount()); + + + } +} diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionFactoryTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionFactoryTest.java new file mode 100644 index 0000000000..bf1fbbf1a3 --- /dev/null +++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionFactoryTest.java @@ -0,0 +1,78 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.client.connection; + +import javax.jms.Connection; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +public class ConnectionFactoryTest extends QpidBrokerTestCase +{ + + /** + * The username & password specified should not override the default + * specified in the URL. + */ + public void testCreateConnectionWithUsernamePassword() throws Exception + { + + String brokerUrl = getBroker().toString(); + String URL = "amqp://guest:guest@clientID/test?brokerlist='" + brokerUrl + "'"; + AMQConnectionFactory factory = new AMQConnectionFactory(URL); + + AMQConnection con = (AMQConnection)factory.createConnection(); + assertEquals("Usernames used is different from the one in URL","guest",con.getConnectionURL().getUsername()); + assertEquals("Password used is different from the one in URL","guest",con.getConnectionURL().getPassword()); + + try + { + AMQConnection con2 = (AMQConnection)factory.createConnection("user","pass"); + assertEquals("Usernames used is different from the one in URL","user",con2.getConnectionURL().getUsername()); + assertEquals("Password used is different from the one in URL","pass",con2.getConnectionURL().getPassword()); + } + catch(Exception e) + { + // ignore + } + + AMQConnection con3 = (AMQConnection)factory.createConnection(); + assertEquals("Usernames used is different from the one in URL","guest",con3.getConnectionURL().getUsername()); + assertEquals("Password used is different from the one in URL","guest",con3.getConnectionURL().getPassword()); + } + + /** + * Verifies that a connection can be made using an instance of AMQConnectionFactory created with the + * default constructor and provided with the connection url via setter. + */ + public void testCreatingConnectionWithInstanceMadeUsingDefaultConstructor() throws Exception + { + String broker = getBroker().toString(); + String url = "amqp://guest:guest@clientID/test?brokerlist='" + broker + "'"; + + AMQConnectionFactory factory = new AMQConnectionFactory(); + factory.setConnectionURLString(url); + + Connection con = factory.createConnection(); + con.close(); + } +} diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java new file mode 100644 index 0000000000..6ea1582bb8 --- /dev/null +++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java @@ -0,0 +1,157 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.client.connection; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class ConnectionStartTest extends QpidBrokerTestCase +{ + + private String _broker = "vm://:1"; + + private AMQConnection _connection; + private Session _consumerSess; + private MessageConsumer _consumer; + + protected void setUp() throws Exception + { + super.setUp(); + try + { + + + AMQConnection pubCon = (AMQConnection) getConnection("guest", "guest"); + + AMQQueue queue = new AMQQueue(pubCon,"ConnectionStartTest"); + + Session pubSess = pubCon.createSession(false, AMQSession.AUTO_ACKNOWLEDGE); + + MessageProducer pub = pubSess.createProducer(queue); + + _connection = (AMQConnection) getConnection("guest", "guest"); + + _consumerSess = _connection.createSession(false, AMQSession.AUTO_ACKNOWLEDGE); + + _consumer = _consumerSess.createConsumer(queue); + + //publish after queue is created to ensure it can be routed as expected + pub.send(pubSess.createTextMessage("Initial Message")); + + pubCon.close(); + + } + catch (Exception e) + { + _logger.error("Connection to " + _broker + " should succeed.", e); + fail("Connection to " + _broker + " should succeed. Reason: " + e); + } + } + + protected void tearDown() throws Exception + { + _connection.close(); + super.tearDown(); + } + + public void testSimpleReceiveConnection() + { + try + { + assertTrue("Connection should not be started", !_connection.started()); + //Note that this next line will start the dispatcher in the session + // should really not be called before _connection start + //assertTrue("There should not be messages waiting for the consumer", _consumer.receiveNoWait() == null); + _connection.start(); + assertTrue("There should be messages waiting for the consumer", _consumer.receive(10*1000) != null); + assertTrue("Connection should be started", _connection.started()); + + } + catch (JMSException e) + { + fail("An error occured during test because:" + e); + } + + } + + public void testMessageListenerConnection() + { + final CountDownLatch _gotMessage = new CountDownLatch(1); + + try + { + assertTrue("Connection should not be started", !_connection.started()); + _consumer.setMessageListener(new MessageListener() + { + public void onMessage(Message message) + { + try + { + assertTrue("Connection should be started", _connection.started()); + assertEquals("Mesage Received", "Initial Message", ((TextMessage) message).getText()); + _gotMessage.countDown(); + } + catch (JMSException e) + { + fail("Couldn't get message text because:" + e.getCause()); + } + } + }); + + assertTrue("Connection should not be started", !_connection.started()); + _connection.start(); + assertTrue("Connection should be started", _connection.started()); + + try + { + assertTrue("Listener was never called", _gotMessage.await(10 * 1000, TimeUnit.MILLISECONDS)); + } + catch (InterruptedException e) + { + fail("Timed out awaiting message via onMessage"); + } + + } + catch (JMSException e) + { + fail("Failed because:" + e.getCause()); + } + + } + + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(ConnectionStartTest.class); + } +} diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java new file mode 100644 index 0000000000..ed03e83292 --- /dev/null +++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java @@ -0,0 +1,378 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.client.connection; + +import javax.jms.Connection; +import javax.jms.QueueSession; +import javax.jms.TopicSession; + +import org.apache.qpid.AMQConnectionFailureException; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQUnresolvedAddressException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQConnectionURL; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.jms.BrokerDetails; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.jms.Session; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +public class ConnectionTest extends QpidBrokerTestCase +{ + + private String _broker_NotRunning = "tcp://localhost:" + findFreePort(); + + private String _broker_BadDNS = "tcp://hg3sgaaw4lgihjs"; + + public void testSimpleConnection() throws Exception + { + AMQConnection conn = null; + try + { + conn = new AMQConnection(getBroker().toString(), "guest", "guest", "fred", "test"); + } + catch (Exception e) + { + fail("Connection to " + getBroker() + " should succeed. Reason: " + e); + } + finally + { + if(conn != null) + { + conn.close(); + } + } + } + + public void testDefaultExchanges() throws Exception + { + AMQConnection conn = null; + try + { + BrokerDetails broker = getBroker(); + broker.setProperty(BrokerDetails.OPTIONS_RETRY, "1"); + ConnectionURL url = new AMQConnectionURL("amqp://guest:guest@clientid/test?brokerlist='" + + broker + + "'&defaultQueueExchange='test.direct'" + + "&defaultTopicExchange='test.topic'" + + "&temporaryQueueExchange='tmp.direct'" + + "&temporaryTopicExchange='tmp.topic'"); + + System.err.println(url.toString()); + conn = new AMQConnection(url); + + + AMQSession sess = (AMQSession) conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + sess.declareExchange(new AMQShortString("test.direct"), + AMQShortString.valueOf(ExchangeDefaults.DIRECT_EXCHANGE_CLASS), false); + + sess.declareExchange(new AMQShortString("tmp.direct"), + AMQShortString.valueOf(ExchangeDefaults.DIRECT_EXCHANGE_CLASS), false); + + sess.declareExchange(new AMQShortString("tmp.topic"), + AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_CLASS), false); + + sess.declareExchange(new AMQShortString("test.topic"), + AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_CLASS), false); + + QueueSession queueSession = conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); + + AMQQueue queue = (AMQQueue) queueSession.createQueue("MyQueue"); + + assertEquals(queue.getExchangeName().toString(), "test.direct"); + + AMQQueue tempQueue = (AMQQueue) queueSession.createTemporaryQueue(); + + assertEquals(tempQueue.getExchangeName().toString(), "tmp.direct"); + + queueSession.close(); + + TopicSession topicSession = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + + AMQTopic topic = (AMQTopic) topicSession.createTopic("silly.topic"); + + assertEquals(topic.getExchangeName().toString(), "test.topic"); + + AMQTopic tempTopic = (AMQTopic) topicSession.createTemporaryTopic(); + + assertEquals(tempTopic.getExchangeName().toString(), "tmp.topic"); + + topicSession.close(); + + } + catch (Exception e) + { + fail("Connection to " + getBroker() + " should succeed. Reason: " + e); + } + finally + { + conn.close(); + } + } + + public void testPasswordFailureConnection() throws Exception + { + AMQConnection conn = null; + try + { + BrokerDetails broker = getBroker(); + broker.setProperty(BrokerDetails.OPTIONS_RETRY, "0"); + conn = new AMQConnection("amqp://guest:rubbishpassword@clientid/test?brokerlist='" + broker + "'"); + fail("Connection should not be established password is wrong."); + } + catch (AMQConnectionFailureException amqe) + { + assertNotNull("No cause set:" + amqe.getMessage(), amqe.getCause()); + assertTrue("Exception was wrong type", amqe.getCause() instanceof AMQException); + } + finally + { + if (conn != null) + { + conn.close(); + } + } + } + + public void testConnectionFailure() throws Exception + { + AMQConnection conn = null; + try + { + conn = new AMQConnection("amqp://guest:guest@clientid/testpath?brokerlist='" + _broker_NotRunning + "?retries='0''"); + fail("Connection should not be established"); + } + catch (AMQException amqe) + { + if (!(amqe instanceof AMQConnectionFailureException)) + { + fail("Correct exception not thrown. Excpected 'AMQConnectionException' got: " + amqe); + } + } + finally + { + if (conn != null) + { + conn.close(); + } + } + + } + + public void testUnresolvedHostFailure() throws Exception + { + AMQConnection conn = null; + try + { + conn = new AMQConnection("amqp://guest:guest@clientid/testpath?brokerlist='" + _broker_BadDNS + "?retries='0''"); + fail("Connection should not be established"); + } + catch (AMQException amqe) + { + if (!(amqe instanceof AMQUnresolvedAddressException)) + { + fail("Correct exception not thrown. Excpected 'AMQUnresolvedAddressException' got: " + amqe); + } + } + finally + { + if (conn != null) + { + conn.close(); + } + } + + } + + public void testUnresolvedVirtualHostFailure() throws Exception + { + AMQConnection conn = null; + try + { + BrokerDetails broker = getBroker(); + broker.setProperty(BrokerDetails.OPTIONS_RETRY, "0"); + conn = new AMQConnection("amqp://guest:guest@clientid/rubbishhost?brokerlist='" + broker + "'"); + fail("Connection should not be established"); + } + catch (AMQException amqe) + { + if (!(amqe instanceof AMQConnectionFailureException)) + { + fail("Correct exception not thrown. Excpected 'AMQConnectionFailureException' got: " + amqe); + } + } + finally + { + if (conn != null) + { + conn.close(); + } + } + } + + public void testClientIdCannotBeChanged() throws Exception + { + Connection connection = new AMQConnection(getBroker().toString(), "guest", "guest", + "fred", "test"); + try + { + connection.setClientID("someClientId"); + fail("No IllegalStateException thrown when resetting clientid"); + } + catch (javax.jms.IllegalStateException e) + { + // PASS + } + finally + { + if (connection != null) + { + connection.close(); + } + } + } + + public void testClientIdIsPopulatedAutomatically() throws Exception + { + Connection connection = new AMQConnection(getBroker().toString(), "guest", "guest", + null, "test"); + try + { + assertNotNull(connection.getClientID()); + } + finally + { + connection.close(); + } + connection.close(); + } + + public void testUnsupportedSASLMechanism() throws Exception + { + BrokerDetails broker = getBroker(); + broker.setProperty(BrokerDetails.OPTIONS_SASL_MECHS, "MY_MECH"); + + try + { + Connection connection = new AMQConnection(broker.toString(), "guest", "guest", + null, "test"); + connection.close(); + fail("The client should throw a ConnectionException stating the" + + " broker does not support the SASL mech specified by the client"); + } + catch (Exception e) + { + assertTrue("Unexpected exception message : " + e.getMessage(), + e.getMessage().contains("Client and broker have no SASL mechanisms in common.")); + assertTrue("Unexpected exception message : " + e.getMessage(), + e.getMessage().contains("Client restricted itself to : MY_MECH")); + + } + } + + /** + * Tests that when the same user connects twice with same clientid, the second connection + * fails if the clientid verification feature is enabled (which uses a dummy 0-10 Session + * with the clientid as its name to detect the previous usage of the clientid by the user) + */ + public void testClientIDVerificationForSameUser() throws Exception + { + setTestSystemProperty(ClientProperties.QPID_VERIFY_CLIENT_ID, "true"); + + BrokerDetails broker = getBroker(); + try + { + Connection con = new AMQConnection(broker.toString(), "guest", "guest", + "client_id", "test"); + + Connection con2 = new AMQConnection(broker.toString(), "guest", "guest", + "client_id", "test"); + + fail("The client should throw a ConnectionException stating the" + + " client ID is not unique"); + } + catch (Exception e) + { + assertTrue("Incorrect exception thrown: " + e.getMessage(), + e.getMessage().contains("ClientID must be unique")); + } + } + + /** + * Tests that when different users connects with same clientid, the second connection + * succeeds even though the clientid verification feature is enabled (which uses a dummy + * 0-10 Session with the clientid as its name; these are only verified unique on a + * per-principal basis) + */ + public void testClientIDVerificationForDifferentUsers() throws Exception + { + setTestSystemProperty(ClientProperties.QPID_VERIFY_CLIENT_ID, "true"); + + BrokerDetails broker = getBroker(); + try + { + Connection con = new AMQConnection(broker.toString(), "guest", "guest", + "client_id", "test"); + + Connection con2 = new AMQConnection(broker.toString(), "admin", "admin", + "client_id", "test"); + } + catch (Exception e) + { + fail("Unexpected exception thrown, client id was not unique but usernames were different! " + e.getMessage()); + } + } + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(ConnectionTest.class); + } + + public void testExceptionWhenUserPassIsRequired() throws Exception + { + AMQConnection conn = null; + try + { + BrokerDetails broker = getBroker(); + String url = "amqp:///test?brokerlist='" + broker + "?sasl_mechs='PLAIN%2520CRAM-MD5''"; + conn = new AMQConnection(url); + conn.close(); + fail("Exception should be thrown as user name and password is required"); + } + catch (Exception e) + { + if (!e.getMessage().contains("Username and Password is required for the selected mechanism")) + { + if (conn != null && !conn.isClosed()) + { + conn.close(); + } + fail("Incorrect Exception thrown! The exception thrown is : " + e.getMessage()); + } + } + } +} diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java new file mode 100644 index 0000000000..141de1e5a8 --- /dev/null +++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.test.unit.client.connection; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.Connection; +import javax.jms.ExceptionListener; +import javax.jms.IllegalStateException; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.qpid.AMQConnectionClosedException; +import org.apache.qpid.client.AMQNoRouteException; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.transport.ConnectionException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ExceptionListenerTest extends QpidBrokerTestCase +{ + private static final Logger LOGGER = LoggerFactory.getLogger(ExceptionListenerTest.class); + + private volatile Throwable _lastExceptionListenerException = null; + + public void testExceptionListenerHearsBrokerShutdown() throws Exception + { + final CountDownLatch exceptionReceivedLatch = new CountDownLatch(1); + final AtomicInteger exceptionCounter = new AtomicInteger(0); + final ExceptionListener listener = new ExceptionListener() + { + public void onException(JMSException exception) + { + exceptionCounter.incrementAndGet(); + _lastExceptionListenerException = exception; + exceptionReceivedLatch.countDown(); + } + }; + + Connection connection = getConnection(); + connection.setExceptionListener(listener); + + stopBroker(); + + exceptionReceivedLatch.await(10, TimeUnit.SECONDS); + + assertEquals("Unexpected number of exceptions received", 1, exceptionCounter.intValue()); + LOGGER.debug("exception was", _lastExceptionListenerException); + assertNotNull("Exception should have cause", _lastExceptionListenerException.getCause()); + Class<? extends Exception> expectedExceptionClass = isBroker010() ? ConnectionException.class : AMQConnectionClosedException.class; + assertEquals(expectedExceptionClass, _lastExceptionListenerException.getCause().getClass()); + } + + /** + * It is reasonable for an application to perform Connection#close within the exception + * listener. This test verifies that close is allowed, and proceeds without generating + * further exceptions. + */ + public void testExceptionListenerClosesConnection_IsAllowed() throws Exception + { + final CountDownLatch exceptionReceivedLatch = new CountDownLatch(1); + final Connection connection = getConnection(); + final ExceptionListener listener = new ExceptionListener() + { + public void onException(JMSException exception) + { + try + { + connection.close(); + // PASS + } + catch (Throwable t) + { + _lastExceptionListenerException = t; + } + finally + { + exceptionReceivedLatch.countDown(); + } + } + }; + connection.setExceptionListener(listener); + + + stopBroker(); + + boolean exceptionReceived = exceptionReceivedLatch.await(10, TimeUnit.SECONDS); + assertTrue("Exception listener did not hear exception within timeout", exceptionReceived); + assertNull("Connection#close() should not have thrown exception", _lastExceptionListenerException); + } + + /** + * Spring's SingleConnectionFactory installs an ExceptionListener that calls stop() + * and ignores any IllegalStateException that result. This test serves to test this + * scenario. + */ + public void testExceptionListenerStopsConnection_ThrowsIllegalStateException() throws Exception + { + final CountDownLatch exceptionReceivedLatch = new CountDownLatch(1); + final Connection connection = getConnection(); + final ExceptionListener listener = new ExceptionListener() + { + public void onException(JMSException exception) + { + try + { + connection.stop(); + fail("Exception not thrown"); + } + catch (IllegalStateException ise) + { + // PASS + } + catch (Throwable t) + { + _lastExceptionListenerException = t; + } + finally + { + exceptionReceivedLatch.countDown(); + } + } + }; + connection.setExceptionListener(listener); + + stopBroker(); + + boolean exceptionReceived = exceptionReceivedLatch.await(10, TimeUnit.SECONDS); + assertTrue("Exception listener did not hear exception within timeout", exceptionReceived); + assertNull("Connection#stop() should not have thrown unexpected exception", _lastExceptionListenerException); + } + + /** + * This test reproduces a deadlock that was the subject of a support call. A Spring based + * application was using SingleConnectionFactory. It installed an ExceptionListener that + * stops and closes the connection in response to any exception. On receipt of a message + * the application would create a new session then send a response message (within onMessage). + * It appears that a misconfiguration in the application meant that some of these messages + * were bounced (no-route). Bounces are treated like connection exceptions and are passed + * back to the application via the ExceptionListener. The deadlock occurred between the + * ExceptionListener's call to stop() and the MessageListener's attempt to create a new + * session. + */ + public void testExceptionListenerConnectionStopDeadlock() throws Exception + { + Queue messageQueue = getTestQueue(); + + Map<String, String> options = new HashMap<String, String>(); + options.put(ConnectionURL.OPTIONS_CLOSE_WHEN_NO_ROUTE, Boolean.toString(false)); + + final Connection connection = getConnectionWithOptions(options); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + session.createConsumer(messageQueue).close(); // Create queue by side-effect + + // Put 10 messages onto messageQueue + sendMessage(session, messageQueue, 10); + + // Install an exception listener that stops/closes the connection on receipt of 2nd AMQNoRouteException. + // (Triggering on the 2nd (rather than 1st) seems to increase the probability that the test ends in deadlock, + // at least on my machine). + final CountDownLatch exceptionReceivedLatch = new CountDownLatch(2); + final ExceptionListener listener = new ExceptionListener() + { + public void onException(JMSException exception) + { + try + { + assertNotNull("JMS Exception must have cause", exception.getCause() ); + assertEquals("JMS Exception is of wrong type", AMQNoRouteException.class, exception.getCause().getClass()); + exceptionReceivedLatch.countDown(); + if (exceptionReceivedLatch.getCount() == 0) + { + connection.stop(); // ** Deadlock + connection.close(); + } + } + catch (Throwable t) + { + _lastExceptionListenerException = t; + } + } + }; + connection.setExceptionListener(listener); + + // Create a message listener that receives from testQueue and tries to forward them to unknown queue (thus + // provoking AMQNoRouteException exceptions to be delivered to the ExceptionListener). + final Queue unknownQueue = session.createQueue(getTestQueueName() + "_unknown");; + MessageListener redirectingMessageListener = new MessageListener() + { + @Override + public void onMessage(Message msg) + { + try + { + Session mlSession = connection.createSession(true, Session.SESSION_TRANSACTED); // ** Deadlock + mlSession.createProducer(unknownQueue).send(msg); + mlSession.commit(); + } + catch (JMSException je) + { + // Connection is closed by the listener, so exceptions here are expected. + LOGGER.debug("Expected exception - message listener got exception", je); + } + } + }; + + MessageConsumer consumer = session.createConsumer(messageQueue); + consumer.setMessageListener(redirectingMessageListener); + connection.start(); + + // Await the 2nd exception + boolean exceptionReceived = exceptionReceivedLatch.await(10, TimeUnit.SECONDS); + assertTrue("Exception listener did not hear exception within timeout", exceptionReceived); + assertNull("Exception listener should not have had experienced exception", _lastExceptionListenerException); + } +} diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java new file mode 100644 index 0000000000..99dc5ff216 --- /dev/null +++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java @@ -0,0 +1,334 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.client.message; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; + +public class ObjectMessageTest extends QpidBrokerTestCase implements MessageListener +{ + private static final Logger _logger = LoggerFactory.getLogger(ObjectMessageTest.class); + + private AMQConnection connection; + private AMQDestination destination; + private AMQSession session; + private MessageProducer producer; + private Serializable[] data; + private volatile boolean waiting; + private int received; + private final ArrayList items = new ArrayList(); + + private String _broker = "vm://:1"; + + protected void setUp() throws Exception + { + super.setUp(); + connection = (AMQConnection) getConnection("guest", "guest"); + destination = new AMQQueue(connection, randomize("LatencyTest"), true); + session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE); + + // set up a consumer + session.createConsumer(destination).setMessageListener(this); + connection.start(); + + // create a publisher + producer = session.createProducer(destination, false, false); + A a1 = new A(1, "A"); + A a2 = new A(2, "a"); + B b = new B(1, "B"); + C c = new C(); + c.put("A1", a1); + c.put("a2", a2); + c.put("B", b); + c.put("String", "String"); + + data = new Serializable[] { a1, a2, b, c, "Hello World!", new Integer(1001) }; + } + + protected void tearDown() throws Exception + { + close(); + super.tearDown(); + } + + public ObjectMessageTest() + { } + + ObjectMessageTest(String broker) throws Exception + { + _broker = broker; + } + + public void testSendAndReceive() throws Exception + { + try + { + send(); + waitUntilReceived(data.length); + check(); + _logger.info("All " + data.length + " items matched."); + } + catch (Exception e) + { + _logger.error("This Test should succeed but failed", e); + fail("This Test should succeed but failed due to: " + e); + } + } + + public void testSetObjectPropertyForString() throws Exception + { + String testStringProperty = "TestStringProperty"; + ObjectMessage msg = session.createObjectMessage(data[0]); + msg.setObjectProperty("TestStringProperty", testStringProperty); + assertEquals(testStringProperty, msg.getObjectProperty("TestStringProperty")); + } + + public void testSetObjectPropertyForBoolean() throws Exception + { + ObjectMessage msg = session.createObjectMessage(data[0]); + msg.setObjectProperty("TestBooleanProperty", Boolean.TRUE); + assertEquals(Boolean.TRUE, msg.getObjectProperty("TestBooleanProperty")); + } + + public void testSetObjectPropertyForByte() throws Exception + { + ObjectMessage msg = session.createObjectMessage(data[0]); + msg.setObjectProperty("TestByteProperty", Byte.MAX_VALUE); + assertEquals(Byte.MAX_VALUE, msg.getObjectProperty("TestByteProperty")); + } + + public void testSetObjectPropertyForShort() throws Exception + { + ObjectMessage msg = session.createObjectMessage(data[0]); + msg.setObjectProperty("TestShortProperty", Short.MAX_VALUE); + assertEquals(Short.MAX_VALUE, msg.getObjectProperty("TestShortProperty")); + } + + public void testSetObjectPropertyForInteger() throws Exception + { + ObjectMessage msg = session.createObjectMessage(data[0]); + msg.setObjectProperty("TestIntegerProperty", Integer.MAX_VALUE); + assertEquals(Integer.MAX_VALUE, msg.getObjectProperty("TestIntegerProperty")); + } + + public void testSetObjectPropertyForDouble() throws Exception + { + ObjectMessage msg = session.createObjectMessage(data[0]); + msg.setObjectProperty("TestDoubleProperty", Double.MAX_VALUE); + assertEquals(Double.MAX_VALUE, msg.getObjectProperty("TestDoubleProperty")); + } + + public void testSetObjectPropertyForFloat() throws Exception + { + ObjectMessage msg = session.createObjectMessage(data[0]); + msg.setObjectProperty("TestFloatProperty", Float.MAX_VALUE); + assertEquals(Float.MAX_VALUE, msg.getObjectProperty("TestFloatProperty")); + } + + public void testSetObjectPropertyForByteArray() throws Exception + { + byte[] array = { 1, 2, 3, 4, 5 }; + ObjectMessage msg = session.createObjectMessage(data[0]); + msg.setObjectProperty("TestByteArrayProperty", array); + assertTrue(Arrays.equals(array, (byte[]) msg.getObjectProperty("TestByteArrayProperty"))); + } + + public void testSetObjectForNull() throws Exception + { + ObjectMessage msg = session.createObjectMessage(); + msg.setObject(null); + assertNull(msg.getObject()); + } + + private void send() throws Exception + { + for (int i = 0; i < data.length; i++) + { + ObjectMessage msg; + if ((i % 2) == 0) + { + msg = session.createObjectMessage(data[i]); + } + else + { + msg = session.createObjectMessage(); + msg.setObject(data[i]); + } + + producer.send(msg); + } + } + + public void check() throws Exception + { + Object[] actual = (Object[]) items.toArray(); + if (actual.length != data.length) + { + throw new Exception("Expected " + data.length + " objects, got " + actual.length); + } + + for (int i = 0; i < data.length; i++) + { + if (actual[i] instanceof Exception) + { + throw new Exception("Error on receive of " + data[i], ((Exception) actual[i])); + } + + if (actual[i] == null) + { + throw new Exception("Expected " + data[i] + " got null"); + } + + if (!data[i].equals(actual[i])) + { + throw new Exception("Expected " + data[i] + " got " + actual[i]); + } + } + } + + private void close() throws Exception + { + session.close(); + connection.close(); + } + + private synchronized void waitUntilReceived(int count) throws InterruptedException + { + waiting = true; + while (received < count) + { + wait(); + } + + waiting = false; + } + + public void onMessage(Message message) + { + + try + { + if (message instanceof ObjectMessage) + { + items.add(((ObjectMessage) message).getObject()); + } + else + { + _logger.error("ERROR: Got " + message.getClass().getName() + " not ObjectMessage"); + items.add(message); + } + } + catch (JMSException e) + { + _logger.error("Error getting object from message", e); + items.add(e); + } + + synchronized (this) + { + received++; + notify(); + } + } + + public static void main(String[] argv) throws Exception + { + String broker = (argv.length > 0) ? argv[0] : "vm://:1"; + if ("-help".equals(broker)) + { + System.out.println("Usage: <broker>"); + } + + new ObjectMessageTest(broker).testSendAndReceive(); + } + + private static class A implements Serializable + { + private String sValue; + private int iValue; + + A(int i, String s) + { + sValue = s; + iValue = i; + } + + public int hashCode() + { + return iValue; + } + + public boolean equals(Object o) + { + return (o instanceof A) && equals((A) o); + } + + protected boolean equals(A a) + { + return areEqual(a.sValue, sValue) && (a.iValue == iValue); + } + } + + private static class B extends A + { + private long time; + + B(int i, String s) + { + super(i, s); + time = System.currentTimeMillis(); + } + + protected boolean equals(A a) + { + return super.equals(a) && (a instanceof B) && (time == ((B) a).time); + } + } + + private static class C extends HashMap implements Serializable + { } + + private static boolean areEqual(Object a, Object b) + { + return (a == null) ? (b == null) : a.equals(b); + } + + private static String randomize(String in) + { + return in + System.currentTimeMillis(); + } +} diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java new file mode 100644 index 0000000000..3ffa73b9b7 --- /dev/null +++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java @@ -0,0 +1,198 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.client.protocol; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.security.Principal; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.network.NetworkConnection; + +public class AMQProtocolSessionTest extends QpidBrokerTestCase +{ + private static class TestProtocolSession extends AMQProtocolSession + { + + public TestProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection) + { + super(protocolHandler,connection); + } + + public TestNetworkConnection getNetworkConnection() + { + return (TestNetworkConnection) getProtocolHandler().getNetworkConnection(); + } + + public AMQShortString genQueueName() + { + return generateQueueName(); + } + } + + private TestProtocolSession _testSession; + + protected void setUp() throws Exception + { + super.setUp(); + + AMQConnection con = (AMQConnection) getConnection("guest", "guest"); + AMQProtocolHandler protocolHandler = new AMQProtocolHandler(con); + protocolHandler.setNetworkConnection(new TestNetworkConnection()); + + //don't care about the values set here apart from the dummy IoSession + _testSession = new TestProtocolSession(protocolHandler , con); + } + + public void testTemporaryQueueWildcard() throws UnknownHostException + { + checkTempQueueName(new InetSocketAddress(1234), "tmp_0_0_0_0_0_0_0_0_1234_1"); + } + + public void testTemporaryQueueLocalhostAddr() throws UnknownHostException + { + checkTempQueueName(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 1234), "tmp_127_0_0_1_1234_1"); + } + + public void testTemporaryQueueLocalhostName() throws UnknownHostException + { + checkTempQueueName(new InetSocketAddress(InetAddress.getByName("localhost"), 1234), "tmp_localhost_127_0_0_1_1234_1"); + } + + public void testTemporaryQueueInet4() throws UnknownHostException + { + checkTempQueueName(new InetSocketAddress(InetAddress.getByName("192.168.1.2"), 1234), "tmp_192_168_1_2_1234_1"); + } + + public void testTemporaryQueueInet6() throws UnknownHostException + { + checkTempQueueName(new InetSocketAddress(InetAddress.getByName("1080:0:0:0:8:800:200C:417A"), 1234), "tmp_1080_0_0_0_8_800_200c_417a_1234_1"); + } + + private void checkTempQueueName(SocketAddress address, String queueName) + { + _testSession.getNetworkConnection().setLocalAddress(address); + assertEquals("Wrong queue name", queueName, _testSession.genQueueName().asString()); + } + + private static class TestNetworkConnection implements NetworkConnection + { + private String _remoteHost = "127.0.0.1"; + private String _localHost = "127.0.0.1"; + private int _port = 1; + private SocketAddress _localAddress = null; + private final Sender<ByteBuffer> _sender; + + public TestNetworkConnection() + { + _sender = new Sender<ByteBuffer>() + { + + public void setIdleTimeout(int i) + { + + } + + public void send(ByteBuffer msg) + { + + } + + public void flush() + { + + } + + public void close() + { + + } + }; + } + + @Override + public SocketAddress getLocalAddress() + { + return (_localAddress != null) ? _localAddress : new InetSocketAddress(_localHost, _port); + } + + @Override + public SocketAddress getRemoteAddress() + { + return new InetSocketAddress(_remoteHost, _port); + } + + @Override + public void setMaxReadIdle(int idleTime) + { + } + + @Override + public Principal getPeerPrincipal() + { + return null; + } + + @Override + public int getMaxReadIdle() + { + return 0; + } + + @Override + public int getMaxWriteIdle() + { + return 0; + } + + @Override + public void setMaxWriteIdle(int idleTime) + { + } + + @Override + public void close() + { + } + + public void setLocalAddress(SocketAddress address) + { + _localAddress = address; + } + + public Sender<ByteBuffer> getSender() + { + return _sender; + } + + public void start() + { + } + } +} diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java new file mode 100644 index 0000000000..41ab35f233 --- /dev/null +++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java @@ -0,0 +1,166 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.test.unit.client.temporaryqueue; + +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TemporaryQueue; +import javax.jms.TextMessage; + +/** + * Tests the behaviour of {@link TemporaryQueue}. + */ +public class TemporaryQueueTest extends QpidBrokerTestCase +{ + /** + * Tests the basic produce/consume behaviour of a temporary queue. + */ + public void testMessageDeliveryUsingTemporaryQueue() throws Exception + { + final Connection conn = getConnection(); + final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + final TemporaryQueue queue = session.createTemporaryQueue(); + assertNotNull(queue); + final MessageProducer producer = session.createProducer(queue); + final MessageConsumer consumer = session.createConsumer(queue); + conn.start(); + producer.send(session.createTextMessage("hello")); + TextMessage tm = (TextMessage) consumer.receive(2000); + assertNotNull("Message not received", tm); + assertEquals("hello", tm.getText()); + } + + /** + * Tests that a temporary queue cannot be used by another {@link Session}. + */ + public void testUseFromAnotherSessionProhibited() throws Exception + { + final Connection conn = getConnection(); + final Session session1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Session session2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + final TemporaryQueue queue = session1.createTemporaryQueue(); + assertNotNull(queue); + + try + { + session2.createConsumer(queue); + fail("Expected a JMSException when subscribing to a temporary queue created on a different session"); + } + catch (JMSException je) + { + //pass + assertEquals("Cannot consume from a temporary destination created on another session", je.getMessage()); + } + } + + /** + * Tests that the client is able to explicitly delete a temporary queue using + * {@link TemporaryQueue#delete()} and is prevented from deleting one that + * still has consumers. + * + * Note: Under < 0-10 {@link TemporaryQueue#delete()} only marks the queue as deleted + * on the client. 0-10 causes the queue to be deleted from the Broker. + */ + public void testExplictTemporaryQueueDeletion() throws Exception + { + final Connection conn = getConnection(); + final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + final AMQSession<?, ?> amqSession = (AMQSession<?, ?>)session; // Required to observe the queue binding on the Broker + final TemporaryQueue queue = session.createTemporaryQueue(); + assertNotNull(queue); + final MessageConsumer consumer = session.createConsumer(queue); + conn.start(); + + assertTrue("Queue should be bound", amqSession.isQueueBound((AMQDestination)queue)); + + try + { + queue.delete(); + fail("Expected JMSException : should not be able to delete while there are active consumers"); + } + catch (JMSException je) + { + //pass + assertEquals("Temporary Queue has consumers so cannot be deleted", je.getMessage()); + } + consumer.close(); + + // Now deletion should succeed. + queue.delete(); + + try + { + session.createConsumer(queue); + fail("Exception not thrown"); + } + catch (JMSException je) + { + //pass + assertEquals("Cannot consume from a deleted destination", je.getMessage()); + } + + if (isBroker010()) + { + assertFalse("Queue should no longer be bound", amqSession.isQueueBound((AMQDestination)queue)); + } + } + + /** + * Tests that a temporary queue remains available for reuse even after its initial + * consumer has disconnected. + * + * This test would fail under < 0-10 as their temporary queues are deleted automatically + * (broker side) after the last consumer disconnects (so message2 would be lost). For this + * reason this test is excluded from those profiles. + */ + public void testTemporaryQueueReused() throws Exception + { + final Connection conn = getConnection(); + final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + final TemporaryQueue queue = session.createTemporaryQueue(); + assertNotNull(queue); + + final MessageProducer producer1 = session.createProducer(queue); + final MessageConsumer consumer1 = session.createConsumer(queue); + conn.start(); + producer1.send(session.createTextMessage("message1")); + producer1.send(session.createTextMessage("message2")); + TextMessage tm = (TextMessage) consumer1.receive(2000); + assertNotNull("Message not received by first consumer", tm); + assertEquals("message1", tm.getText()); + consumer1.close(); + + final MessageConsumer consumer2 = session.createConsumer(queue); + conn.start(); + tm = (TextMessage) consumer2.receive(2000); + assertNotNull("Message not received by second consumer", tm); + assertEquals("message2", tm.getText()); + consumer2.close(); + } +} |