diff options
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/test')
118 files changed, 1677 insertions, 2339 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java deleted file mode 100644 index 107c730a7e..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.test.client; - -import org.apache.log4j.Logger; -import org.apache.qpid.test.utils.QpidBrokerTestCase; - -import javax.jms.Connection; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.QueueBrowser; -import javax.jms.Session; -import java.util.Enumeration; - -public class CancelTest extends QpidBrokerTestCase -{ - private static final Logger _logger = Logger.getLogger(CancelTest.class); - - private Connection _clientConnection; - private Session _clientSession; - private Queue _queue; - - public void setUp() throws Exception - { - - super.setUp(); - - _queue = (Queue) getInitialContext().lookup("queue"); - - //Create Client - _clientConnection = getConnection(); - - _clientConnection.start(); - - _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - //Ensure _queue is created - _clientSession.createConsumer(_queue).close(); - } - - /** - * Simply - * This test originally did not assert anything but was just checking - * that a message could be browsed and consumed without throwing an exception. - * It now checks that at least a message is browsed and that a message is received. - */ - public void test() throws Exception - { - Connection producerConnection = getConnection(); - - producerConnection.start(); - - Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = producerSession.createProducer(_queue); - producer.send(producerSession.createTextMessage()); - producerConnection.close(); - - - QueueBrowser browser = _clientSession.createBrowser(_queue); - Enumeration e = browser.getEnumeration(); - - assertTrue(e.hasMoreElements()); - - int i = 0; - while (e.hasMoreElements()) - { - e.nextElement(); - if(++i > 1) - { - fail("Two many elemnts to browse!"); - } - } - - browser.close(); - - MessageConsumer consumer = _clientSession.createConsumer(_queue); - assertNotNull( consumer.receive(2000l) ); - consumer.close(); - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java index a94d975a32..e06ed6e171 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java @@ -1,8 +1,8 @@ package org.apache.qpid.test.client; -import org.apache.qpid.test.utils.QpidBrokerTestCase; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession; +import org.apache.qpid.test.utils.QpidBrokerTestCase; import javax.jms.Connection; import javax.jms.JMSException; @@ -96,7 +96,7 @@ public class DupsOkTest extends QpidBrokerTestCase consumer.setMessageListener(new MessageListener() { - int _msgCount = 0; + private int _msgCount = 0; public void onMessage(Message message) { diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java index e1f639afb6..f8bc051be7 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java @@ -20,12 +20,19 @@ */ package org.apache.qpid.test.client; +import org.apache.log4j.Logger; + import org.apache.qpid.client.AMQSession_0_8; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.apache.log4j.Logger; -import javax.jms.*; +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; public class FlowControlTest extends QpidBrokerTestCase { diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/ImmediateAndMandatoryPublishingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/ImmediateAndMandatoryPublishingTest.java new file mode 100644 index 0000000000..b746a5b09e --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/ImmediateAndMandatoryPublishingTest.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.test.client; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.Topic; +import org.apache.qpid.client.AMQNoConsumersException; +import org.apache.qpid.client.AMQNoRouteException; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +public class ImmediateAndMandatoryPublishingTest extends QpidBrokerTestCase implements ExceptionListener +{ + private Connection _connection; + private BlockingQueue<JMSException> _exceptions; + + public void setUp() throws Exception + { + super.setUp(); + _exceptions = new ArrayBlockingQueue<JMSException>(1); + _connection = getConnection(); + _connection.setExceptionListener(this); + } + + public void testPublishP2PWithNoConsumerAndImmediateOnAndAutoAck() throws Exception + { + publishIntoExistingDestinationWithNoConsumerAndImmediateOn(Session.AUTO_ACKNOWLEDGE, false); + } + + public void testPublishP2PWithNoConsumerAndImmediateOnAndTx() throws Exception + { + publishIntoExistingDestinationWithNoConsumerAndImmediateOn(Session.SESSION_TRANSACTED, false); + } + + public void testPublishPubSubWithDisconnectedDurableSubscriberAndImmediateOnAndAutoAck() throws Exception + { + publishIntoExistingDestinationWithNoConsumerAndImmediateOn(Session.AUTO_ACKNOWLEDGE, true); + } + + public void testPublishPubSubWithDisconnectedDurableSubscriberAndImmediateOnAndTx() throws Exception + { + publishIntoExistingDestinationWithNoConsumerAndImmediateOn(Session.SESSION_TRANSACTED, true); + } + + public void testPublishP2PIntoNonExistingDesitinationWithMandatoryOnAutoAck() throws Exception + { + publishWithMandatoryOnImmediateOff(Session.AUTO_ACKNOWLEDGE, false); + } + + public void testPublishP2PIntoNonExistingDesitinationWithMandatoryOnAndTx() throws Exception + { + publishWithMandatoryOnImmediateOff(Session.SESSION_TRANSACTED, false); + } + + public void testPubSubMandatoryAutoAck() throws Exception + { + publishWithMandatoryOnImmediateOff(Session.AUTO_ACKNOWLEDGE, true); + } + + public void testPubSubMandatoryTx() throws Exception + { + publishWithMandatoryOnImmediateOff(Session.SESSION_TRANSACTED, true); + } + + public void testP2PNoMandatoryAutoAck() throws Exception + { + publishWithMandatoryOffImmediateOff(Session.AUTO_ACKNOWLEDGE, false); + } + + public void testP2PNoMandatoryTx() throws Exception + { + publishWithMandatoryOffImmediateOff(Session.SESSION_TRANSACTED, false); + } + + public void testPubSubWithImmediateOnAndAutoAck() throws Exception + { + consumerCreateAndClose(true, false); + + Message message = produceMessage(Session.AUTO_ACKNOWLEDGE, true, false, true); + + JMSException exception = _exceptions.poll(10, TimeUnit.SECONDS); + assertNotNull("JMSException is expected", exception); + AMQNoRouteException noRouteException = (AMQNoRouteException) exception.getLinkedException(); + assertNotNull("AMQNoRouteException should be linked to JMSEXception", noRouteException); + Message bounceMessage = (Message) noRouteException.getUndeliveredMessage(); + assertNotNull("Bounced Message is expected", bounceMessage); + assertEquals("Unexpected message is bounced", message.getJMSMessageID(), bounceMessage.getJMSMessageID()); + } + + private void publishIntoExistingDestinationWithNoConsumerAndImmediateOn(int acknowledgeMode, boolean pubSub) + throws JMSException, InterruptedException + { + consumerCreateAndClose(pubSub, true); + + Message message = produceMessage(acknowledgeMode, pubSub, false, true); + + JMSException exception = _exceptions.poll(10, TimeUnit.SECONDS); + assertNotNull("JMSException is expected", exception); + AMQNoConsumersException noConsumerException = (AMQNoConsumersException) exception.getLinkedException(); + assertNotNull("AMQNoConsumersException should be linked to JMSEXception", noConsumerException); + Message bounceMessage = (Message) noConsumerException.getUndeliveredMessage(); + assertNotNull("Bounced Message is expected", bounceMessage); + assertEquals("Unexpected message is bounced", message.getJMSMessageID(), bounceMessage.getJMSMessageID()); + } + + private void publishWithMandatoryOnImmediateOff(int acknowledgeMode, boolean pubSub) throws JMSException, + InterruptedException + { + Message message = produceMessage(acknowledgeMode, pubSub, true, false); + + JMSException exception = _exceptions.poll(10, TimeUnit.SECONDS); + assertNotNull("JMSException is expected", exception); + AMQNoRouteException noRouteException = (AMQNoRouteException) exception.getLinkedException(); + assertNotNull("AMQNoRouteException should be linked to JMSEXception", noRouteException); + Message bounceMessage = (Message) noRouteException.getUndeliveredMessage(); + assertNotNull("Bounced Message is expected", bounceMessage); + assertEquals("Unexpected message is bounced", message.getJMSMessageID(), bounceMessage.getJMSMessageID()); + } + + private void publishWithMandatoryOffImmediateOff(int acknowledgeMode, boolean pubSub) throws JMSException, + InterruptedException + { + produceMessage(acknowledgeMode, pubSub, false, false); + + JMSException exception = _exceptions.poll(1, TimeUnit.SECONDS); + assertNull("Unexpected JMSException", exception); + } + + private void consumerCreateAndClose(boolean pubSub, boolean durable) throws JMSException + { + Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = null; + MessageConsumer consumer = null; + if (pubSub) + { + destination = session.createTopic(getTestQueueName()); + if (durable) + { + consumer = session.createDurableSubscriber((Topic) destination, getTestName()); + } + else + { + consumer = session.createConsumer(destination); + } + } + else + { + destination = session.createQueue(getTestQueueName()); + consumer = session.createConsumer(destination); + } + consumer.close(); + } + + private Message produceMessage(int acknowledgeMode, boolean pubSub, boolean mandatory, boolean immediate) + throws JMSException + { + Session session = _connection.createSession(acknowledgeMode == Session.SESSION_TRANSACTED, acknowledgeMode); + Destination destination = null; + if (pubSub) + { + destination = session.createTopic(getTestQueueName()); + } + else + { + destination = session.createQueue(getTestQueueName()); + } + + MessageProducer producer = ((AMQSession<?, ?>) session).createProducer(destination, mandatory, immediate); + Message message = session.createMessage(); + producer.send(message); + if (session.getTransacted()) + { + session.commit(); + } + return message; + } + + public void testMandatoryAndImmediateDefaults() throws JMSException, InterruptedException + { + Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // publish to non-existent queue - should get mandatory failure + MessageProducer producer = session.createProducer(session.createQueue(getTestQueueName())); + Message message = session.createMessage(); + producer.send(message); + + JMSException exception = _exceptions.poll(10, TimeUnit.SECONDS); + assertNotNull("JMSException is expected", exception); + AMQNoRouteException noRouteException = (AMQNoRouteException) exception.getLinkedException(); + assertNotNull("AMQNoRouteException should be linked to JMSEXception", noRouteException); + Message bounceMessage = (Message) noRouteException.getUndeliveredMessage(); + assertNotNull("Bounced Message is expected", bounceMessage); + assertEquals("Unexpected message is bounced", message.getJMSMessageID(), bounceMessage.getJMSMessageID()); + + producer = session.createProducer(null); + message = session.createMessage(); + producer.send(session.createQueue(getTestQueueName()), message); + + exception = _exceptions.poll(10, TimeUnit.SECONDS); + assertNotNull("JMSException is expected", exception); + noRouteException = (AMQNoRouteException) exception.getLinkedException(); + assertNotNull("AMQNoRouteException should be linked to JMSEXception", noRouteException); + bounceMessage = (Message) noRouteException.getUndeliveredMessage(); + assertNotNull("Bounced Message is expected", bounceMessage); + assertEquals("Unexpected message is bounced", message.getJMSMessageID(), bounceMessage.getJMSMessageID()); + + + // publish to non-existent topic - should get no failure + producer = session.createProducer(session.createTopic(getTestQueueName())); + message = session.createMessage(); + producer.send(message); + + exception = _exceptions.poll(1, TimeUnit.SECONDS); + assertNull("Unexpected JMSException", exception); + + producer = session.createProducer(null); + message = session.createMessage(); + producer.send(session.createTopic(getTestQueueName()), message); + + exception = _exceptions.poll(1, TimeUnit.SECONDS); + assertNull("Unexpected JMSException", exception); + + session.close(); + } + + public void testMandatoryAndImmediateSystemProperties() throws JMSException, InterruptedException + { + setTestClientSystemProperty("qpid.default_mandatory","true"); + Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // publish to non-existent topic - should get mandatory failure + + MessageProducer producer = session.createProducer(session.createTopic(getTestQueueName())); + Message message = session.createMessage(); + producer.send(message); + + JMSException exception = _exceptions.poll(10, TimeUnit.SECONDS); + assertNotNull("JMSException is expected", exception); + AMQNoRouteException noRouteException = (AMQNoRouteException) exception.getLinkedException(); + assertNotNull("AMQNoRouteException should be linked to JMSEXception", noRouteException); + Message bounceMessage = (Message) noRouteException.getUndeliveredMessage(); + assertNotNull("Bounced Message is expected", bounceMessage); + assertEquals("Unexpected message is bounced", message.getJMSMessageID(), bounceMessage.getJMSMessageID()); + + // now set topic specific system property to false - should no longer get mandatory failure on new producer + setTestClientSystemProperty("qpid.default_mandatory_topic","false"); + producer = session.createProducer(null); + message = session.createMessage(); + producer.send(session.createTopic(getTestQueueName()), message); + + exception = _exceptions.poll(1, TimeUnit.SECONDS); + if(exception != null) + { + exception.printStackTrace(); + } + assertNull("Unexpected JMSException", exception); + + } + + public void onException(JMSException exception) + { + _exceptions.add(exception); + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java index f2ac590a3c..6b6b4a7b3c 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java @@ -20,13 +20,7 @@ */ package org.apache.qpid.test.client; -import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.test.utils.FailoverBaseCase; -import org.apache.qpid.test.utils.QpidBrokerTestCase; - +import java.util.Enumeration; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; @@ -37,8 +31,10 @@ import javax.jms.QueueBrowser; import javax.jms.Session; import javax.jms.TextMessage; import javax.naming.NamingException; -import java.util.Enumeration; -import java.util.Random; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.test.utils.QpidBrokerTestCase; public class QueueBrowserAutoAckTest extends QpidBrokerTestCase { @@ -424,4 +420,21 @@ public class QueueBrowserAutoAckTest extends QpidBrokerTestCase validate(messages); } + public void testBrowsingWhileStopped() throws JMSException + { + _clientConnection.stop(); + + try + { + QueueBrowser browser = _clientSession.createBrowser(getTestQueue()); + Enumeration messages = browser.getEnumeration(); + fail("Expected exception when attempting to browse on a stopped connection did not occur"); + } + catch(javax.jms.IllegalStateException e) + { + // pass + } + + } + } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java index b944f2ddd2..a53c3d3ee0 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java @@ -20,14 +20,20 @@ */ package org.apache.qpid.test.client; -import org.apache.qpid.test.utils.*; -import javax.jms.*; +import junit.framework.AssertionFailedError; + +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Queue; +import javax.jms.Session; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; -import junit.framework.ComparisonFailure; -import junit.framework.AssertionFailedError; - /** * RollbackOrderTest, QPID-1864, QPID-1871 * diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java index feae7c9573..e1f93b975b 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java @@ -20,31 +20,8 @@ */ package org.apache.qpid.test.client.destination; -import java.util.Collections; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.Hashtable; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.QueueBrowser; -import javax.jms.QueueReceiver; -import javax.jms.QueueSession; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.Topic; -import javax.jms.TopicSession; -import javax.jms.TopicSubscriber; -import javax.naming.Context; -import javax.naming.InitialContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.qpid.client.AMQAnyDestination; import org.apache.qpid.client.AMQConnection; @@ -58,8 +35,17 @@ import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; import org.apache.qpid.messaging.Address; import org.apache.qpid.test.utils.QpidBrokerTestCase; import org.apache.qpid.transport.ExecutionErrorCode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import javax.jms.*; +import javax.naming.Context; +import javax.naming.InitialContext; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Hashtable; +import java.util.List; +import java.util.Map; +import java.util.Properties; public class AddressBasedDestinationTest extends QpidBrokerTestCase { diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java index 5b01c28fcc..b82c3756f2 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java @@ -21,9 +21,11 @@ package org.apache.qpid.test.client.failover; -import java.util.Random; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import org.apache.log4j.Logger; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.test.utils.FailoverBaseCase; import javax.jms.Connection; import javax.jms.JMSException; @@ -34,11 +36,9 @@ import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import javax.naming.NamingException; - -import org.apache.log4j.Logger; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.jms.ConnectionListener; -import org.apache.qpid.test.utils.FailoverBaseCase; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; public class FailoverTest extends FailoverBaseCase implements ConnectionListener { diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java index 2d326d73b8..5b350d2d89 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java @@ -20,11 +20,11 @@ */ package org.apache.qpid.test.client.message; -import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.client.AMQAnyDestination; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQTopic; import org.apache.qpid.client.CustomJMSXProperty; +import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.management.common.mbeans.ManagedQueue; import org.apache.qpid.test.utils.JMXTestUtils; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSReplyToTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSReplyToTest.java new file mode 100644 index 0000000000..fe8180d6c6 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSReplyToTest.java @@ -0,0 +1,169 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.client.message; + +import java.util.concurrent.atomic.AtomicReference; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.IllegalStateException; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TemporaryQueue; + +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +/** + * Tests that {@link Message#setJMSReplyTo(Destination)} can be used to pass a {@link Destination} between + * messaging clients as is commonly used in request/response messaging pattern implementations. + */ +public class JMSReplyToTest extends QpidBrokerTestCase +{ + private AtomicReference<Throwable> _caughtException = new AtomicReference<Throwable>(); + private Queue _requestQueue; + private Connection _connection; + private Session _session; + + @Override + protected void setUp() throws Exception + { + super.setUp(); + + _requestQueue = startAsyncRespondingJmsConsumerOnSeparateConnection(); + + _connection = getConnection(); + _connection.start(); + _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + public void testRequestResponseUsingJmsReplyTo() throws Exception + { + final String responseQueueName = getTestQueueName() + ".response"; + Queue replyToQueue = _session.createQueue(responseQueueName); + sendRequestAndValidateResponse(replyToQueue); + } + + public void testRequestResponseUsingTemporaryJmsReplyTo() throws Exception + { + TemporaryQueue replyToQueue = _session.createTemporaryQueue(); + + sendRequestAndValidateResponse(replyToQueue); + } + + private void sendRequestAndValidateResponse(Queue replyToQueue) throws JMSException, Exception + { + MessageConsumer replyConsumer = _session.createConsumer(replyToQueue); + + Message requestMessage = createRequestMessageWithJmsReplyTo(_session, replyToQueue); + sendRequest(_requestQueue, _session, requestMessage); + + receiveAndValidateResponse(replyConsumer, requestMessage); + + assertNull("Async responder caught unexpected exception", _caughtException.get()); + } + + private Message createRequestMessageWithJmsReplyTo(Session session, Queue replyToQueue) + throws JMSException + { + Message requestMessage = session.createTextMessage("My request"); + requestMessage.setJMSReplyTo(replyToQueue); + return requestMessage; + } + + private void sendRequest(final Queue requestQueue, Session session, Message requestMessage) throws Exception + { + MessageProducer producer = session.createProducer(requestQueue); + producer.send(requestMessage); + } + + private void receiveAndValidateResponse(MessageConsumer replyConsumer, Message requestMessage) throws JMSException + { + Message responseMessage = replyConsumer.receive(RECEIVE_TIMEOUT); + assertNotNull("Response message not received", responseMessage); + assertEquals("Correlation id of the response should match message id of the request", + responseMessage.getJMSCorrelationID(), requestMessage.getJMSMessageID()); + } + + private Queue startAsyncRespondingJmsConsumerOnSeparateConnection() throws Exception + { + final String requestQueueName = getTestQueueName() + ".request"; + final Connection responderConnection = getConnection(); + responderConnection.start(); + final Session responderSession = responderConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Queue requestQueue = responderSession.createQueue(requestQueueName); + + final MessageConsumer requestConsumer = responderSession.createConsumer(requestQueue); + requestConsumer.setMessageListener(new AsyncResponder(responderSession)); + + return requestQueue; + } + + private final class AsyncResponder implements MessageListener + { + private final Session _responderSession; + + private AsyncResponder(Session responderSession) + { + _responderSession = responderSession; + } + + @Override + public void onMessage(Message requestMessage) + { + try + { + Destination replyTo = getReplyToQueue(requestMessage); + + Message responseMessage = _responderSession.createMessage(); + responseMessage.setJMSCorrelationID(requestMessage.getJMSMessageID()); + + sendResponseToQueue(replyTo, responseMessage); + } + catch (Throwable t) + { + _caughtException.set(t); + } + } + + private Destination getReplyToQueue(Message requestMessage) throws JMSException, IllegalStateException + { + Destination replyTo = requestMessage.getJMSReplyTo(); + if (replyTo == null) + { + throw new IllegalStateException("JMSReplyTo was null on message " + requestMessage); + } + return replyTo; + } + + private void sendResponseToQueue(Destination replyTo, Message responseMessage) + throws JMSException + { + MessageProducer responseProducer = _responderSession.createProducer(replyTo); + responseProducer.send(responseMessage); + } + } + +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/MessageToStringTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/MessageToStringTest.java index 1071861d47..dc1f690b1e 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/MessageToStringTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/MessageToStringTest.java @@ -20,14 +20,12 @@ */ package org.apache.qpid.test.client.message; -import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; -import org.apache.qpid.test.utils.QpidBrokerTestCase; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.test.utils.QpidBrokerTestCase; import javax.jms.BytesMessage; import javax.jms.Connection; -import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; @@ -49,7 +47,7 @@ public class MessageToStringTest extends QpidBrokerTestCase private Connection _connection; private Session _session; private Queue _queue; - MessageConsumer _consumer; + private MessageConsumer _consumer; private static final String BYTE_TEST = "MapByteTest"; public void setUp() throws Exception diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/ObjectMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/ObjectMessageTest.java index fa16152b69..3bd2c4a44e 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/ObjectMessageTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/ObjectMessageTest.java @@ -20,10 +20,9 @@ */ package org.apache.qpid.test.client.message; -import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; -import org.apache.qpid.test.utils.QpidBrokerTestCase; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.test.utils.QpidBrokerTestCase; import javax.jms.Connection; import javax.jms.JMSException; @@ -38,8 +37,8 @@ public class ObjectMessageTest extends QpidBrokerTestCase { private Connection _connection; private Session _session; - MessageConsumer _consumer; - MessageProducer _producer; + private MessageConsumer _consumer; + private MessageProducer _producer; public void setUp() throws Exception { diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java index b1c8b5682f..2c7f426306 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java @@ -20,7 +20,16 @@ */ package org.apache.qpid.test.client.message; -import java.util.concurrent.CountDownLatch; +import junit.framework.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.BasicMessageProducer; +import org.apache.qpid.test.utils.QpidBrokerTestCase; import javax.jms.DeliveryMode; import javax.jms.Destination; @@ -32,18 +41,7 @@ import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; - -import junit.framework.Assert; - -import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.BasicMessageProducer; -import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.util.concurrent.CountDownLatch; public class SelectorTest extends QpidBrokerTestCase implements MessageListener { diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/queue/LVQTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/queue/LVQTest.java index 14fbd1deb6..51566403b3 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/queue/LVQTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/queue/LVQTest.java @@ -20,6 +20,11 @@ */ package org.apache.qpid.test.client.queue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.test.utils.QpidBrokerTestCase; + import javax.jms.Connection; import javax.jms.Destination; import javax.jms.Message; @@ -28,11 +33,6 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; -import org.apache.qpid.test.client.destination.AddressBasedDestinationTest; -import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - public class LVQTest extends QpidBrokerTestCase { private static final Logger _logger = LoggerFactory.getLogger(LVQTest.class); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/queue/QueuePolicyTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/queue/QueuePolicyTest.java index e7c3fad27d..b785326ef2 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/queue/QueuePolicyTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/queue/QueuePolicyTest.java @@ -20,6 +20,13 @@ */ package org.apache.qpid.test.client.queue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + import javax.jms.Connection; import javax.jms.Destination; import javax.jms.MessageConsumer; @@ -27,12 +34,6 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; -import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - public class QueuePolicyTest extends QpidBrokerTestCase { private static final Logger _logger = LoggerFactory.getLogger(QueuePolicyTest.class); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java index 85565a33b0..ee81e7c372 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java @@ -20,7 +20,10 @@ */ package org.apache.qpid.test.client.timeouts; -import java.io.File; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.test.utils.QpidBrokerTestCase; import javax.jms.Connection; import javax.jms.JMSException; @@ -30,13 +33,6 @@ import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; -import org.apache.commons.configuration.XMLConfiguration; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; -import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * This tests that when the commit takes a long time(due to POST_COMMIT_DELAY) that the commit does not timeout * This test must be run in conjunction with SyncWaiteTimeoutDelay or be run with POST_COMMIT_DELAY > 30s to ensure diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java index 6189c37306..28467231ed 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java @@ -20,8 +20,8 @@ */ package org.apache.qpid.test.client.timeouts; -import org.apache.log4j.Level; import org.apache.log4j.Logger; + import org.apache.qpid.AMQTimeoutException; import javax.jms.JMSException; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/CauseFailureUserPrompt.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/CauseFailureUserPrompt.java index 889df4ad07..118ddff48b 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/CauseFailureUserPrompt.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/CauseFailureUserPrompt.java @@ -20,8 +20,6 @@ */ package org.apache.qpid.test.framework; -import org.apache.qpid.test.framework.CauseFailure; - import java.io.IOException; /** diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/CircuitEnd.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/CircuitEnd.java index 824edd7022..ffde385cfc 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/CircuitEnd.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/CircuitEnd.java @@ -20,7 +20,11 @@ */ package org.apache.qpid.test.framework; -import javax.jms.*; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; /** * A CircuitEnd is a pair consisting of one message producer and one message consumer, that represents one end of a diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/CircuitEndBase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/CircuitEndBase.java index d5a33514df..5730752eae 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/CircuitEndBase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/CircuitEndBase.java @@ -20,7 +20,11 @@ */ package org.apache.qpid.test.framework; -import javax.jms.*; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; /** * A CircuitEndBase is a pair consisting of one message producer and one message consumer, that represents one end of a diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/ExceptionMonitor.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/ExceptionMonitor.java index 7d06aba1c0..afb7b5bc5b 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/ExceptionMonitor.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/ExceptionMonitor.java @@ -38,7 +38,6 @@ import org.apache.log4j.Logger; import javax.jms.ExceptionListener; import javax.jms.JMSException; - import java.io.PrintWriter; import java.io.StringWriter; import java.util.ArrayList; @@ -59,7 +58,7 @@ public class ExceptionMonitor implements ExceptionListener private final Logger log = Logger.getLogger(ExceptionMonitor.class); /** Holds the received exceptions. */ - List<Exception> exceptions = new ArrayList<Exception>(); + private List<Exception> exceptions = new ArrayList<Exception>(); /** * Receives incoming exceptions. diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/FrameworkBaseCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/FrameworkBaseCase.java index f866cd572f..ecbb710a6b 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/FrameworkBaseCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/FrameworkBaseCase.java @@ -23,14 +23,12 @@ package org.apache.qpid.test.framework; import org.apache.log4j.Logger; import org.apache.log4j.NDC; -import org.apache.qpid.test.framework.BrokerLifecycleAware; -import org.apache.qpid.test.framework.sequencers.CircuitFactory; -import org.apache.qpid.test.utils.QpidBrokerTestCase; - import org.apache.qpid.junit.extensions.SetupTaskAware; import org.apache.qpid.junit.extensions.SetupTaskHandler; import org.apache.qpid.junit.extensions.util.ParsedProperties; import org.apache.qpid.junit.extensions.util.TestContextProperties; +import org.apache.qpid.test.framework.sequencers.CircuitFactory; +import org.apache.qpid.test.utils.QpidBrokerTestCase; import java.util.ArrayList; import java.util.List; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/FrameworkClientBaseCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/FrameworkClientBaseCase.java deleted file mode 100644 index 2322955253..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/FrameworkClientBaseCase.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.framework; - -/** - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> - * </table> - */ -public class FrameworkClientBaseCase -{ -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/LocalAMQPCircuitFactory.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/LocalAMQPCircuitFactory.java index 4c8f301d1c..899a808bdd 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/LocalAMQPCircuitFactory.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/LocalAMQPCircuitFactory.java @@ -23,12 +23,17 @@ package org.apache.qpid.test.framework; import org.apache.log4j.Logger; import org.apache.qpid.client.AMQSession; +import org.apache.qpid.junit.extensions.util.ParsedProperties; import org.apache.qpid.test.framework.localcircuit.LocalAMQPPublisherImpl; import org.apache.qpid.test.framework.localcircuit.LocalPublisherImpl; -import org.apache.qpid.junit.extensions.util.ParsedProperties; - -import javax.jms.*; +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.Topic; /** * LocalAMQPCircuitFactory is a test sequencer that creates test circuits with publishing and receiving ends rooted diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/LocalCircuitFactory.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/LocalCircuitFactory.java index ec70759cf7..b8fd4cc7e7 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/LocalCircuitFactory.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/LocalCircuitFactory.java @@ -22,16 +22,20 @@ package org.apache.qpid.test.framework; import org.apache.log4j.Logger; +import org.apache.qpid.junit.extensions.util.ParsedProperties; import org.apache.qpid.test.framework.localcircuit.LocalCircuitImpl; import org.apache.qpid.test.framework.localcircuit.LocalPublisherImpl; import org.apache.qpid.test.framework.localcircuit.LocalReceiverImpl; import org.apache.qpid.test.framework.sequencers.CircuitFactory; import org.apache.qpid.test.utils.ConversationFactory; -import org.apache.qpid.junit.extensions.util.ParsedProperties; - -import javax.jms.*; - +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.Topic; import java.util.List; import java.util.Properties; import java.util.concurrent.atomic.AtomicLong; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/MessageMonitor.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/MessageMonitor.java index 3fac969369..5265c0416f 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/MessageMonitor.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/MessageMonitor.java @@ -24,7 +24,6 @@ import org.apache.log4j.Logger; import javax.jms.Message; import javax.jms.MessageListener; - import java.util.concurrent.atomic.AtomicInteger; /** diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/MessagingTestConfigProperties.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/MessagingTestConfigProperties.java index 6d72402018..ceece2dae2 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/MessagingTestConfigProperties.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/MessagingTestConfigProperties.java @@ -23,7 +23,6 @@ package org.apache.qpid.test.framework; import org.apache.qpid.junit.extensions.util.ParsedProperties; import javax.jms.Session; - import java.util.Properties; /** diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/TestUtils.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/TestUtils.java index f1adeead80..919faa4754 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/TestUtils.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/TestUtils.java @@ -22,15 +22,23 @@ package org.apache.qpid.test.framework; import org.apache.log4j.Logger; -import static org.apache.qpid.test.framework.MessagingTestConfigProperties.*; - import org.apache.qpid.junit.extensions.util.ParsedProperties; -import javax.jms.*; +import static org.apache.qpid.test.framework.MessagingTestConfigProperties.BROKER_PROPNAME; +import static org.apache.qpid.test.framework.MessagingTestConfigProperties.CONNECTION_NAME; +import static org.apache.qpid.test.framework.MessagingTestConfigProperties.PASSWORD_PROPNAME; +import static org.apache.qpid.test.framework.MessagingTestConfigProperties.USERNAME_PROPNAME; +import static org.apache.qpid.test.framework.MessagingTestConfigProperties.VIRTUAL_HOST_PROPNAME; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Session; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; - import java.util.Map; /** diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/UDPClockReference.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/UDPClockReference.java index 8bce752f68..8ee8d82636 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/UDPClockReference.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/UDPClockReference.java @@ -23,7 +23,11 @@ package org.apache.qpid.test.framework.clocksynch; import org.apache.qpid.junit.extensions.ShutdownHookable; import java.io.IOException; -import java.net.*; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.net.SocketException; +import java.net.SocketTimeoutException; import java.nio.ByteBuffer; /** diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/UDPClockSynchronizer.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/UDPClockSynchronizer.java index c89112eff8..226c84611d 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/UDPClockSynchronizer.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/UDPClockSynchronizer.java @@ -24,7 +24,12 @@ import org.apache.qpid.junit.extensions.util.CommandLineParser; import org.apache.qpid.junit.extensions.util.ParsedProperties; import java.io.IOException; -import java.net.*; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.net.SocketException; +import java.net.SocketTimeoutException; +import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.Arrays; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedCircuitImpl.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedCircuitImpl.java index f375eda4d1..6c950fc307 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedCircuitImpl.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedCircuitImpl.java @@ -22,18 +22,21 @@ package org.apache.qpid.test.framework.distributedcircuit; import org.apache.log4j.Logger; -import org.apache.qpid.test.framework.*; -import org.apache.qpid.test.utils.ConversationFactory; - import org.apache.qpid.junit.extensions.TimingController; import org.apache.qpid.junit.extensions.TimingControllerAware; import org.apache.qpid.junit.extensions.util.ParsedProperties; +import org.apache.qpid.test.framework.Assertion; +import org.apache.qpid.test.framework.Circuit; +import org.apache.qpid.test.framework.Publisher; +import org.apache.qpid.test.framework.Receiver; +import org.apache.qpid.test.framework.TestClientDetails; +import org.apache.qpid.test.framework.TestUtils; +import org.apache.qpid.test.utils.ConversationFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; - import java.util.LinkedList; import java.util.List; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedPublisherImpl.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedPublisherImpl.java index c51f710494..130e908b0e 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedPublisherImpl.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedPublisherImpl.java @@ -20,11 +20,10 @@ */ package org.apache.qpid.test.framework.distributedcircuit; +import org.apache.qpid.junit.extensions.util.ParsedProperties; import org.apache.qpid.test.framework.Assertion; import org.apache.qpid.test.framework.Publisher; -import org.apache.qpid.junit.extensions.util.ParsedProperties; - /** * DistributedPublisherImpl represents the status of the publishing side of a test circuit. Its main purpose is to * provide assertions that can be applied to verify the behaviour of a non-local publisher. diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedReceiverImpl.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedReceiverImpl.java index 863921e387..4b801e7b66 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedReceiverImpl.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedReceiverImpl.java @@ -20,11 +20,10 @@ */ package org.apache.qpid.test.framework.distributedcircuit; +import org.apache.qpid.junit.extensions.util.ParsedProperties; import org.apache.qpid.test.framework.Assertion; import org.apache.qpid.test.framework.Receiver; -import org.apache.qpid.junit.extensions.util.ParsedProperties; - /** * DistributedReceiverImpl represents the status of the receiving side of a test circuit. Its main purpose is to * provide assertions that can be applied to verify the behaviour of a non-local receiver. diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/TestClientCircuitEnd.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/TestClientCircuitEnd.java index dce2706bc4..09bcf24da5 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/TestClientCircuitEnd.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/TestClientCircuitEnd.java @@ -22,13 +22,22 @@ package org.apache.qpid.test.framework.distributedcircuit; import org.apache.log4j.Logger; -import org.apache.qpid.test.framework.*; -import org.apache.qpid.test.framework.distributedtesting.TestClientControlledTest; - import org.apache.qpid.junit.extensions.util.ParsedProperties; import org.apache.qpid.junit.extensions.util.TestContextProperties; +import org.apache.qpid.test.framework.CircuitEnd; +import org.apache.qpid.test.framework.ExceptionMonitor; +import org.apache.qpid.test.framework.LocalCircuitFactory; +import org.apache.qpid.test.framework.MessageMonitor; +import org.apache.qpid.test.framework.MessagingTestConfigProperties; +import org.apache.qpid.test.framework.TestUtils; +import org.apache.qpid.test.framework.distributedtesting.TestClientControlledTest; -import javax.jms.*; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; /** * A TestClientCircuitEnd is a {@link CircuitEnd} that may be controlled from a diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/Coordinator.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/Coordinator.java index d532109dc3..e07b141cb5 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/Coordinator.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/Coordinator.java @@ -20,24 +20,11 @@ */ package org.apache.qpid.test.framework.distributedtesting; -import java.net.InetAddress; -import java.util.*; -import java.util.concurrent.LinkedBlockingQueue; - -import javax.jms.*; - import junit.framework.Test; import junit.framework.TestResult; import junit.framework.TestSuite; - import org.apache.log4j.Logger; import org.apache.log4j.NDC; -import org.apache.qpid.test.framework.FrameworkBaseCase; -import org.apache.qpid.test.framework.MessagingTestConfigProperties; -import org.apache.qpid.test.framework.TestClientDetails; -import org.apache.qpid.test.framework.TestUtils; -import org.apache.qpid.test.framework.clocksynch.UDPClockReference; -import org.apache.qpid.test.utils.ConversationFactory; import org.apache.qpid.junit.extensions.TKTestRunner; import org.apache.qpid.junit.extensions.WrappedSuiteTestDecorator; @@ -45,6 +32,26 @@ import org.apache.qpid.junit.extensions.util.CommandLineParser; import org.apache.qpid.junit.extensions.util.MathUtils; import org.apache.qpid.junit.extensions.util.ParsedProperties; import org.apache.qpid.junit.extensions.util.TestContextProperties; +import org.apache.qpid.test.framework.FrameworkBaseCase; +import org.apache.qpid.test.framework.MessagingTestConfigProperties; +import org.apache.qpid.test.framework.TestClientDetails; +import org.apache.qpid.test.framework.TestUtils; +import org.apache.qpid.test.framework.clocksynch.UDPClockReference; +import org.apache.qpid.test.utils.ConversationFactory; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Session; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.LinkedBlockingQueue; /** * <p/>Implements the coordinator client described in the interop testing specification diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/DistributedTestDecorator.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/DistributedTestDecorator.java index bdcfc996d6..49a01d3127 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/DistributedTestDecorator.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/DistributedTestDecorator.java @@ -21,22 +21,20 @@ package org.apache.qpid.test.framework.distributedtesting; import junit.framework.TestResult; - import org.apache.log4j.Logger; +import org.apache.qpid.junit.extensions.WrappedSuiteTestDecorator; import org.apache.qpid.test.framework.FrameworkBaseCase; import org.apache.qpid.test.framework.TestClientDetails; import org.apache.qpid.test.framework.sequencers.CircuitFactory; import org.apache.qpid.test.utils.ConversationFactory; -import org.apache.qpid.junit.extensions.WrappedSuiteTestDecorator; - import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; - -import java.util.*; +import java.util.Collection; +import java.util.Set; /** * DistributedTestDecorator is a base class for writing test decorators that invite test clients to participate in diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/FanOutTestDecorator.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/FanOutTestDecorator.java index eed9b1f290..809bb1dd2f 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/FanOutTestDecorator.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/FanOutTestDecorator.java @@ -22,9 +22,9 @@ package org.apache.qpid.test.framework.distributedtesting; import junit.framework.Test; import junit.framework.TestResult; - import org.apache.log4j.Logger; +import org.apache.qpid.junit.extensions.WrappedSuiteTestDecorator; import org.apache.qpid.test.framework.DropInTest; import org.apache.qpid.test.framework.FrameworkBaseCase; import org.apache.qpid.test.framework.TestClientDetails; @@ -32,13 +32,10 @@ import org.apache.qpid.test.framework.sequencers.CircuitFactory; import org.apache.qpid.test.framework.sequencers.FanOutCircuitFactory; import org.apache.qpid.test.utils.ConversationFactory; -import org.apache.qpid.junit.extensions.WrappedSuiteTestDecorator; - import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; - import java.util.Iterator; import java.util.Set; @@ -60,7 +57,7 @@ public class FanOutTestDecorator extends DistributedTestDecorator implements Mes private static final Logger log = Logger.getLogger(FanOutTestDecorator.class); /** Holds the currently running test case. */ - FrameworkBaseCase currentTest = null; + private FrameworkBaseCase currentTest = null; /** * Creates a wrapped suite test decorator from another one. diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/InteropTestDecorator.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/InteropTestDecorator.java index 413d5558f2..dd5007090b 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/InteropTestDecorator.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/InteropTestDecorator.java @@ -22,20 +22,21 @@ package org.apache.qpid.test.framework.distributedtesting; import junit.framework.Test; import junit.framework.TestResult; - import org.apache.log4j.Logger; +import org.apache.qpid.junit.extensions.WrappedSuiteTestDecorator; import org.apache.qpid.test.framework.FrameworkBaseCase; import org.apache.qpid.test.framework.TestClientDetails; import org.apache.qpid.test.framework.sequencers.CircuitFactory; import org.apache.qpid.test.framework.sequencers.InteropCircuitFactory; import org.apache.qpid.test.utils.ConversationFactory; -import org.apache.qpid.junit.extensions.WrappedSuiteTestDecorator; - import javax.jms.Connection; - -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; /** * DistributedTestDecorator is a test decorator, written to implement the interop test specification. Given a list diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/OptOutTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/OptOutTestCase.java index 008b89a981..229c6a34da 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/OptOutTestCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/OptOutTestCase.java @@ -20,8 +20,8 @@ */ package org.apache.qpid.test.framework.distributedtesting; -import org.apache.qpid.test.framework.sequencers.CircuitFactory; import org.apache.qpid.test.framework.FrameworkBaseCase; +import org.apache.qpid.test.framework.sequencers.CircuitFactory; /** * An OptOutTestCase is a test case that automatically fails. It is used when a list of test clients has been generated diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/TestClient.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/TestClient.java index 33770363ce..f9b8cbb898 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/TestClient.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/TestClient.java @@ -23,6 +23,9 @@ package org.apache.qpid.test.framework.distributedtesting; import org.apache.log4j.Logger; import org.apache.log4j.NDC; +import org.apache.qpid.junit.extensions.SleepThrottle; +import org.apache.qpid.junit.extensions.util.ParsedProperties; +import org.apache.qpid.junit.extensions.util.TestContextProperties; import org.apache.qpid.test.framework.MessagingTestConfigProperties; import org.apache.qpid.test.framework.TestUtils; import org.apache.qpid.test.framework.clocksynch.ClockSynchThread; @@ -30,13 +33,21 @@ import org.apache.qpid.test.framework.clocksynch.UDPClockSynchronizer; import org.apache.qpid.test.utils.ReflectionUtils; import org.apache.qpid.test.utils.ReflectionUtilsException; -import org.apache.qpid.junit.extensions.SleepThrottle; -import org.apache.qpid.junit.extensions.util.ParsedProperties; -import org.apache.qpid.junit.extensions.util.TestContextProperties; - -import javax.jms.*; - -import java.util.*; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.Topic; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; /** * Implements a test client as described in the interop testing spec diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/TestClientControlledTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/TestClientControlledTest.java index 30fd382333..2e64c9ca2d 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/TestClientControlledTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/TestClientControlledTest.java @@ -22,7 +22,6 @@ package org.apache.qpid.test.framework.distributedtesting; import javax.jms.JMSException; import javax.jms.Message; -import javax.jms.MessageListener; import javax.jms.Session; /** diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/listeners/XMLTestListener.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/listeners/XMLTestListener.java index c79029c99a..ad2c196adb 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/listeners/XMLTestListener.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/listeners/XMLTestListener.java @@ -23,7 +23,6 @@ package org.apache.qpid.test.framework.listeners; import junit.framework.AssertionFailedError; import junit.framework.Test; import junit.framework.TestCase; - import org.apache.log4j.Logger; import org.apache.qpid.junit.extensions.ShutdownHookable; @@ -32,7 +31,13 @@ import org.apache.qpid.junit.extensions.listeners.TKTestListener; import java.io.IOException; import java.io.PrintWriter; import java.io.Writer; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; /** * Listens for test results for a named test and outputs these in the standard JUnit XML format to the specified diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/localcircuit/LocalAMQPPublisherImpl.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/localcircuit/LocalAMQPPublisherImpl.java index 4388c7fbd8..6a0e8cba4b 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/localcircuit/LocalAMQPPublisherImpl.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/localcircuit/LocalAMQPPublisherImpl.java @@ -22,9 +22,13 @@ package org.apache.qpid.test.framework.localcircuit; import org.apache.qpid.client.AMQNoConsumersException; import org.apache.qpid.client.AMQNoRouteException; -import org.apache.qpid.test.framework.*; - import org.apache.qpid.junit.extensions.util.ParsedProperties; +import org.apache.qpid.test.framework.AMQPPublisher; +import org.apache.qpid.test.framework.Assertion; +import org.apache.qpid.test.framework.AssertionBase; +import org.apache.qpid.test.framework.CircuitEndBase; +import org.apache.qpid.test.framework.ExceptionMonitor; +import org.apache.qpid.test.framework.MessageMonitor; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/localcircuit/LocalCircuitImpl.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/localcircuit/LocalCircuitImpl.java index 391091266c..dc9ee0ac28 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/localcircuit/LocalCircuitImpl.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/localcircuit/LocalCircuitImpl.java @@ -22,12 +22,19 @@ package org.apache.qpid.test.framework.localcircuit; import org.apache.log4j.Logger; -import org.apache.qpid.test.framework.*; - import org.apache.qpid.junit.extensions.util.ParsedProperties; - -import javax.jms.*; - +import org.apache.qpid.test.framework.Assertion; +import org.apache.qpid.test.framework.Circuit; +import org.apache.qpid.test.framework.CircuitEnd; +import org.apache.qpid.test.framework.ExceptionMonitor; +import org.apache.qpid.test.framework.MessagingTestConfigProperties; +import org.apache.qpid.test.framework.Publisher; +import org.apache.qpid.test.framework.Receiver; +import org.apache.qpid.test.framework.TestUtils; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; import java.util.LinkedList; import java.util.List; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/localcircuit/LocalPublisherImpl.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/localcircuit/LocalPublisherImpl.java index 3ec3f62538..9920be003a 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/localcircuit/LocalPublisherImpl.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/localcircuit/LocalPublisherImpl.java @@ -20,9 +20,15 @@ */ package org.apache.qpid.test.framework.localcircuit; -import org.apache.qpid.test.framework.*; - import org.apache.qpid.junit.extensions.util.ParsedProperties; +import org.apache.qpid.test.framework.Assertion; +import org.apache.qpid.test.framework.AssertionBase; +import org.apache.qpid.test.framework.CircuitEnd; +import org.apache.qpid.test.framework.CircuitEndBase; +import org.apache.qpid.test.framework.ExceptionMonitor; +import org.apache.qpid.test.framework.MessageMonitor; +import org.apache.qpid.test.framework.NotApplicableAssertion; +import org.apache.qpid.test.framework.Publisher; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/localcircuit/LocalReceiverImpl.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/localcircuit/LocalReceiverImpl.java index 74f414c974..bb242faf90 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/localcircuit/LocalReceiverImpl.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/localcircuit/LocalReceiverImpl.java @@ -20,9 +20,14 @@ */ package org.apache.qpid.test.framework.localcircuit; -import org.apache.qpid.test.framework.*; - import org.apache.qpid.junit.extensions.util.ParsedProperties; +import org.apache.qpid.test.framework.Assertion; +import org.apache.qpid.test.framework.CircuitEnd; +import org.apache.qpid.test.framework.CircuitEndBase; +import org.apache.qpid.test.framework.ExceptionMonitor; +import org.apache.qpid.test.framework.MessageMonitor; +import org.apache.qpid.test.framework.NotApplicableAssertion; +import org.apache.qpid.test.framework.Receiver; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/sequencers/CircuitFactory.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/sequencers/CircuitFactory.java index e69952918d..9b5d40fd48 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/sequencers/CircuitFactory.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/sequencers/CircuitFactory.java @@ -20,13 +20,12 @@ */ package org.apache.qpid.test.framework.sequencers; +import org.apache.qpid.junit.extensions.util.ParsedProperties; import org.apache.qpid.test.framework.Assertion; import org.apache.qpid.test.framework.Circuit; import org.apache.qpid.test.framework.TestClientDetails; import org.apache.qpid.test.utils.ConversationFactory; -import org.apache.qpid.junit.extensions.util.ParsedProperties; - import javax.jms.Connection; import java.util.List; import java.util.Properties; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/sequencers/FanOutCircuitFactory.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/sequencers/FanOutCircuitFactory.java index 8a9c48d8e7..833f5fb674 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/sequencers/FanOutCircuitFactory.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/sequencers/FanOutCircuitFactory.java @@ -22,6 +22,7 @@ package org.apache.qpid.test.framework.sequencers; import org.apache.log4j.Logger; +import org.apache.qpid.junit.extensions.util.ParsedProperties; import org.apache.qpid.test.framework.Assertion; import org.apache.qpid.test.framework.Circuit; import org.apache.qpid.test.framework.TestClientDetails; @@ -29,10 +30,11 @@ import org.apache.qpid.test.framework.TestUtils; import org.apache.qpid.test.framework.distributedcircuit.DistributedCircuitImpl; import org.apache.qpid.test.utils.ConversationFactory; -import org.apache.qpid.junit.extensions.util.ParsedProperties; - -import javax.jms.*; - +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Session; import java.util.LinkedList; import java.util.List; import java.util.Properties; @@ -63,7 +65,7 @@ import java.util.Properties; public class FanOutCircuitFactory extends BaseCircuitFactory { /** Used for debugging. */ - Logger log = Logger.getLogger(FanOutCircuitFactory.class); + private Logger log = Logger.getLogger(FanOutCircuitFactory.class); /** * Creates a test circuit for the test, configered by the test parameters specified. diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/sequencers/InteropCircuitFactory.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/sequencers/InteropCircuitFactory.java index 7df80bbf10..a4c6888d68 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/sequencers/InteropCircuitFactory.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/sequencers/InteropCircuitFactory.java @@ -22,6 +22,7 @@ package org.apache.qpid.test.framework.sequencers; import org.apache.log4j.Logger; +import org.apache.qpid.junit.extensions.util.ParsedProperties; import org.apache.qpid.test.framework.Assertion; import org.apache.qpid.test.framework.Circuit; import org.apache.qpid.test.framework.TestClientDetails; @@ -29,10 +30,11 @@ import org.apache.qpid.test.framework.TestUtils; import org.apache.qpid.test.framework.distributedcircuit.DistributedCircuitImpl; import org.apache.qpid.test.utils.ConversationFactory; -import org.apache.qpid.junit.extensions.util.ParsedProperties; - -import javax.jms.*; - +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Session; import java.util.LinkedList; import java.util.List; import java.util.Properties; @@ -56,7 +58,7 @@ import java.util.Properties; public class InteropCircuitFactory extends BaseCircuitFactory { /** Used for debugging. */ - Logger log = Logger.getLogger(InteropCircuitFactory.class); + private Logger log = Logger.getLogger(InteropCircuitFactory.class); /** * Creates a test circuit for the test, configered by the test parameters specified. diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/testcases/FailoverTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/testcases/FailoverTest.java deleted file mode 100644 index a5a0d4e41f..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/testcases/FailoverTest.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.testcases; - -import org.apache.qpid.test.framework.*; -import static org.apache.qpid.test.framework.MessagingTestConfigProperties.*; -import org.apache.qpid.test.framework.localcircuit.LocalCircuitImpl; -import org.apache.qpid.test.framework.sequencers.CircuitFactory; - -import javax.jms.JMSException; -import javax.jms.MessageProducer; -import javax.jms.Session; - -/** - * FailoverTest provides testing of fail-over over a local-circuit implementation. The circuit being tested may be - * against an in-vm broker or against an external broker, with the failure mechanism abstracted out of the test case. - * Automatic failures can be simulated against an in-vm broker. Currently the test must interact with the user to - * simulate failures on an external broker. - * - * Things to test: - * In tx, failure duing tx causes tx to error on subsequent sends/receives or commits/rollbacks. - * Outside of tx, reconnection allows msg flow to continue but there may be loss. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> - * </table> - * - * @todo This test is designed to be run over a local circuit only. For in-vm using automatic failures, for external - * brokers by prompting the user (or maybe using a script). Enforce the local-circuit only nature of the tests as - * well as thinking about how other local-circuit tests might be implemented. For example, could add a method - * to the framework base case for local only tests to call, that allows them access to the local-circuit - * implementation and so on. - * - * @todo More. Need to really expand the set of fail-over tests. - */ -public class FailoverTest extends FrameworkBaseCase -{ - /* Used for debugging purposes. */ - // private static final Logger log = Logger.getLogger(FailoverTest.class); - - /** - * Creates a new test case with the specified name. - * - * @param name The test case name. - */ - public FailoverTest(String name) - { - super(name); - } - - /** - * Checks that all messages sent within a transaction are receieved despite a fail-over occuring outside of - * the transaction. - * - * @throws JMSException Allowed to fall through and fail test. - */ - public void testTxP2PFailover() throws Exception - { - // Set up the test properties to match the test cases requirements. - getTestProps().setProperty(TRANSACTED_PUBLISHER_PROPNAME, true); - getTestProps().setProperty(ACK_MODE_PROPNAME, Session.AUTO_ACKNOWLEDGE); - getTestProps().setProperty(PUBSUB_PROPNAME, false); - - // MessagingTestConfigProperties props = this.getTestParameters(); - - // Create the test circuit from the test configuration parameters. - CircuitFactory circuitFactory = getCircuitFactory(); - Circuit testCircuit = circuitFactory.createCircuit(getConnection(), getTestProps()); - - // Create an assertion that all messages are received. - Assertion allMessagesReceived = testCircuit.getReceiver().allMessagesReceivedAssertion(getTestProps()); - - // This test case assumes it is using a local circuit. - LocalCircuitImpl localCircuit = (LocalCircuitImpl) testCircuit; - - Session producerSession = localCircuit.getLocalPublisherCircuitEnd().getSession(); - MessageProducer producer = localCircuit.getLocalPublisherCircuitEnd().getProducer(); - // MessageConsumer consumer = localCircuit.getLocalReceiverCircuitEnd().getConsumer(); - - // Send some test messages. - for (int i = 0; i < 100; i++) - { - producer.send(TestUtils.createTestMessageOfSize(producerSession, 10)); - producerSession.commit(); - - // Cause a failover. - if (i == 50) - { - getFailureMechanism().causeFailure(); - } - - // Wait for the reconnection to complete. - } - - // Check that trying to send within the original transaction fails. - - // Check that all messages sent were received. - assertTrue("All messages sent were not received back again.", allMessagesReceived.apply()); - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/testcases/ImmediateMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/testcases/ImmediateMessageTest.java deleted file mode 100644 index 3001211eae..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/testcases/ImmediateMessageTest.java +++ /dev/null @@ -1,303 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.testcases; - -import org.apache.qpid.test.framework.AMQPPublisher; -import org.apache.qpid.test.framework.Circuit; -import org.apache.qpid.test.framework.FrameworkBaseCase; -import org.apache.qpid.test.framework.MessagingTestConfigProperties; -import static org.apache.qpid.test.framework.MessagingTestConfigProperties.*; -import org.apache.qpid.test.framework.sequencers.CircuitFactory; - -import org.apache.qpid.junit.extensions.util.TestContextProperties; - -/** - * ImmediateMessageTest tests for the desired behaviour of immediate messages. Immediate messages are a non-JMS - * feature. A message may be marked with an immediate delivery flag, which means that a consumer must be connected - * to receive the message, through a valid route, when it is sent, or when its transaction is committed in the case - * of transactional messaging. If this is not the case, the broker should return the message with a NO_CONSUMERS code. - * - * <p><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Check that an immediate message is sent succesfully not using transactions when a consumer is connected. - * <tr><td> Check that an immediate message is committed succesfully in a transaction when a consumer is connected. - * <tr><td> Check that an immediate message results in no consumers code, not using transactions, when a consumer is - * disconnected. - * <tr><td> Check that an immediate message results in no consumers code, in a transaction, when a consumer is - * disconnected. - * <tr><td> Check that an immediate message results in no route code, not using transactions, when no outgoing route is - * connected. - * <tr><td> Check that an immediate message results in no route code, upon transaction commit, when no outgoing route is - * connected. - * <tr><td> Check that an immediate message is sent succesfully not using transactions when a consumer is connected. - * <tr><td> Check that an immediate message is committed succesfully in a transaction when a consumer is connected. - * <tr><td> Check that an immediate message results in no consumers code, not using transactions, when a consumer is - * disconnected. - * <tr><td> Check that an immediate message results in no consumers code, in a transaction, when a consumer is - * disconnected. - * <tr><td> Check that an immediate message results in no route code, not using transactions, when no outgoing route is - * connected. - * <tr><td> Check that an immediate message results in no route code, upon transaction commit, when no outgoing route is - * connected. - * </table> - * - * @todo All of these test cases will be generated by a test generator that thoroughly tests all combinations of test - * circuits. - */ -public class ImmediateMessageTest extends FrameworkBaseCase -{ - /** - * Creates a new test case with the specified name. - * - * @param name The test case name. - */ - public ImmediateMessageTest(String name) - { - super(name); - } - - /** Check that an immediate message is sent succesfully not using transactions when a consumer is connected. */ - public void test_QPID_517_ImmediateOkNoTxP2P() throws Exception - { - // Ensure transactional sessions are off. - getTestProps().setProperty(TRANSACTED_PUBLISHER_PROPNAME, false); - getTestProps().setProperty(PUBSUB_PROPNAME, false); - - // Run the default test sequence over the test circuit checking for no errors. - CircuitFactory circuitFactory = getCircuitFactory(); - Circuit testCircuit = circuitFactory.createCircuit(getConnection(), getTestProps()); - - assertNoFailures(testCircuit.test(1, assertionList(testCircuit.getPublisher().noExceptionsAssertion(getTestProps())))); - } - - /** Check that an immediate message is committed succesfully in a transaction when a consumer is connected. */ - public void test_QPID_517_ImmediateOkTxP2P() throws Exception - { - // Ensure transactional sessions are off. - getTestProps().setProperty(TRANSACTED_PUBLISHER_PROPNAME, true); - getTestProps().setProperty(PUBSUB_PROPNAME, false); - - // Send one message with no errors. - CircuitFactory circuitFactory = getCircuitFactory(); - Circuit testCircuit = circuitFactory.createCircuit(getConnection(), getTestProps()); - - assertNoFailures(testCircuit.test(1, assertionList(testCircuit.getPublisher().noExceptionsAssertion(getTestProps())))); - } - - /** Check that an immediate message results in no consumers code, not using transactions, when a consumer is disconnected. */ - public void test_QPID_517_ImmediateFailsConsumerDisconnectedNoTxP2P() throws Exception - { - // Ensure transactional sessions are off. - getTestProps().setProperty(TRANSACTED_PUBLISHER_PROPNAME, false); - getTestProps().setProperty(PUBSUB_PROPNAME, false); - - // Disconnect the consumer. - getTestProps().setProperty(RECEIVER_CONSUMER_ACTIVE_PROPNAME, false); - - CircuitFactory circuitFactory = getCircuitFactory(); - Circuit testCircuit = circuitFactory.createCircuit(getConnection(), getTestProps()); - - // Send one message and get a linked no consumers exception. - assertNoFailures(testCircuit.test(1, - assertionList(((AMQPPublisher) testCircuit.getPublisher()).noConsumersAssertion(getTestProps())))); - } - - /** Check that an immediate message results in no consumers code, in a transaction, when a consumer is disconnected. */ - public void test_QPID_517_ImmediateFailsConsumerDisconnectedTxP2P() throws Exception - { - // Ensure transactional sessions are on. - getTestProps().setProperty(TRANSACTED_PUBLISHER_PROPNAME, true); - getTestProps().setProperty(PUBSUB_PROPNAME, false); - - // Disconnect the consumer. - getTestProps().setProperty(RECEIVER_CONSUMER_ACTIVE_PROPNAME, false); - - CircuitFactory circuitFactory = getCircuitFactory(); - Circuit testCircuit = circuitFactory.createCircuit(getConnection(), getTestProps()); - - // Send one message and get a linked no consumers exception. - assertNoFailures(testCircuit.test(1, - assertionList(((AMQPPublisher) testCircuit.getPublisher()).noConsumersAssertion(getTestProps())))); - } - - /** Check that an immediate message results in no route code, not using transactions, when no outgoing route is connected. */ - public void test_QPID_517_ImmediateFailsNoRouteNoTxP2P() throws Exception - { - // Ensure transactional sessions are off. - getTestProps().setProperty(TRANSACTED_PUBLISHER_PROPNAME, false); - getTestProps().setProperty(PUBSUB_PROPNAME, false); - - // Set up the messaging topology so that only the publishers producer is bound (do not set up the receivers to - // collect its messages). - getTestProps().setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false); - - // Send one message and get a linked no route exception. - CircuitFactory circuitFactory = getCircuitFactory(); - Circuit testCircuit = circuitFactory.createCircuit(getConnection(), getTestProps()); - - assertNoFailures(testCircuit.test(1, - assertionList(((AMQPPublisher) testCircuit.getPublisher()).noRouteAssertion(getTestProps())))); - } - - /** Check that an immediate message results in no route code, upon transaction commit, when no outgoing route is connected. */ - public void test_QPID_517_ImmediateFailsNoRouteTxP2P() throws Exception - { - // Ensure transactional sessions are on. - getTestProps().setProperty(TRANSACTED_PUBLISHER_PROPNAME, true); - getTestProps().setProperty(PUBSUB_PROPNAME, false); - - // Set up the messaging topology so that only the publishers producer is bound (do not set up the receivers to - // collect its messages). - getTestProps().setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false); - - // Send one message and get a linked no route exception. - CircuitFactory circuitFactory = getCircuitFactory(); - Circuit testCircuit = circuitFactory.createCircuit(getConnection(), getTestProps()); - - assertNoFailures(testCircuit.test(1, - assertionList(((AMQPPublisher) testCircuit.getPublisher()).noRouteAssertion(getTestProps())))); - } - - /** Check that an immediate message is sent succesfully not using transactions when a consumer is connected. */ - public void test_QPID_517_ImmediateOkNoTxPubSub() throws Exception - { - // Ensure transactional sessions are off. - getTestProps().setProperty(TRANSACTED_PUBLISHER_PROPNAME, false); - getTestProps().setProperty(PUBSUB_PROPNAME, true); - - // Send one message with no errors. - CircuitFactory circuitFactory = getCircuitFactory(); - Circuit testCircuit = circuitFactory.createCircuit(getConnection(), getTestProps()); - - assertNoFailures(testCircuit.test(1, - assertionList(((AMQPPublisher) testCircuit.getPublisher()).noExceptionsAssertion(getTestProps())))); - } - - /** Check that an immediate message is committed succesfully in a transaction when a consumer is connected. */ - public void test_QPID_517_ImmediateOkTxPubSub() throws Exception - { - // Ensure transactional sessions are off. - getTestProps().setProperty(TRANSACTED_PUBLISHER_PROPNAME, true); - getTestProps().setProperty(PUBSUB_PROPNAME, true); - - // Send one message with no errors. - CircuitFactory circuitFactory = getCircuitFactory(); - Circuit testCircuit = circuitFactory.createCircuit(getConnection(), getTestProps()); - - assertNoFailures(testCircuit.test(1, - assertionList(((AMQPPublisher) testCircuit.getPublisher()).noExceptionsAssertion(getTestProps())))); - } - - /** Check that an immediate message results in no consumers code, not using transactions, when a consumer is disconnected. */ - public void test_QPID_517_ImmediateFailsConsumerDisconnectedNoTxPubSub() throws Exception - { - // Ensure transactional sessions are off. - getTestProps().setProperty(TRANSACTED_PUBLISHER_PROPNAME, false); - getTestProps().setProperty(PUBSUB_PROPNAME, true); - - // Use durable subscriptions, so that the route remains open with no subscribers. - getTestProps().setProperty(DURABLE_SUBSCRIPTION_PROPNAME, true); - - // Disconnect the consumer. - getTestProps().setProperty(RECEIVER_CONSUMER_ACTIVE_PROPNAME, false); - - CircuitFactory circuitFactory = getCircuitFactory(); - Circuit testCircuit = circuitFactory.createCircuit(getConnection(), getTestProps()); - - // Send one message and get a linked no consumers exception. - assertNoFailures(testCircuit.test(1, - assertionList(((AMQPPublisher) testCircuit.getPublisher()).noConsumersAssertion(getTestProps())))); - } - - /** Check that an immediate message results in no consumers code, in a transaction, when a consumer is disconnected. */ - public void test_QPID_517_ImmediateFailsConsumerDisconnectedTxPubSub() throws Exception - { - // Ensure transactional sessions are on. - getTestProps().setProperty(TRANSACTED_PUBLISHER_PROPNAME, true); - getTestProps().setProperty(PUBSUB_PROPNAME, true); - - // Use durable subscriptions, so that the route remains open with no subscribers. - getTestProps().setProperty(DURABLE_SUBSCRIPTION_PROPNAME, true); - - // Disconnect the consumer. - getTestProps().setProperty(RECEIVER_CONSUMER_ACTIVE_PROPNAME, false); - - CircuitFactory circuitFactory = getCircuitFactory(); - Circuit testCircuit = circuitFactory.createCircuit(getConnection(), getTestProps()); - - // Send one message and get a linked no consumers exception. - assertNoFailures(testCircuit.test(1, - assertionList(((AMQPPublisher) testCircuit.getPublisher()).noConsumersAssertion(getTestProps())))); - } - - /** Check that an immediate message results in no route code, not using transactions, when no outgoing route is connected. */ - public void test_QPID_517_ImmediateFailsNoRouteNoTxPubSub() throws Exception - { - // Ensure transactional sessions are off. - getTestProps().setProperty(TRANSACTED_PUBLISHER_PROPNAME, false); - getTestProps().setProperty(PUBSUB_PROPNAME, true); - - // Set up the messaging topology so that only the publishers producer is bound (do not set up the receivers to - // collect its messages). - getTestProps().setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false); - - // Send one message and get a linked no route exception. - CircuitFactory circuitFactory = getCircuitFactory(); - Circuit testCircuit = circuitFactory.createCircuit(getConnection(), getTestProps()); - - assertNoFailures(testCircuit.test(1, - assertionList(((AMQPPublisher) testCircuit.getPublisher()).noRouteAssertion(getTestProps())))); - } - - /** Check that an immediate message results in no route code, upon transaction commit, when no outgoing route is connected. */ - public void test_QPID_517_ImmediateFailsNoRouteTxPubSub() throws Exception - { - // Ensure transactional sessions are on. - getTestProps().setProperty(TRANSACTED_PUBLISHER_PROPNAME, true); - getTestProps().setProperty(PUBSUB_PROPNAME, true); - - // Set up the messaging topology so that only the publishers producer is bound (do not set up the receivers to - // collect its messages). - getTestProps().setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false); - - // Send one message and get a linked no route exception. - CircuitFactory circuitFactory = getCircuitFactory(); - Circuit testCircuit = circuitFactory.createCircuit(getConnection(), getTestProps()); - - assertNoFailures(testCircuit.test(1, - assertionList(((AMQPPublisher) testCircuit.getPublisher()).noRouteAssertion(getTestProps())))); - } - - protected void setUp() throws Exception - { - super.setUp(); - - setTestProps(TestContextProperties.getInstance(MessagingTestConfigProperties.defaults)); - - /** All these tests should have the immediate flag on. */ - getTestProps().setProperty(IMMEDIATE_PROPNAME, true); - getTestProps().setProperty(MANDATORY_PROPNAME, false); - - /** Bind the receivers consumer by default. */ - getTestProps().setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, true); - getTestProps().setProperty(RECEIVER_CONSUMER_ACTIVE_PROPNAME, true); - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/testcases/MandatoryMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/testcases/MandatoryMessageTest.java deleted file mode 100644 index b4c4eb91b4..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/testcases/MandatoryMessageTest.java +++ /dev/null @@ -1,321 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.testcases; - -import org.apache.qpid.test.framework.AMQPPublisher; -import org.apache.qpid.test.framework.Circuit; -import org.apache.qpid.test.framework.FrameworkBaseCase; -import org.apache.qpid.test.framework.MessagingTestConfigProperties; -import static org.apache.qpid.test.framework.MessagingTestConfigProperties.*; -import org.apache.qpid.test.framework.sequencers.CircuitFactory; - -import org.apache.qpid.junit.extensions.util.ParsedProperties; -import org.apache.qpid.junit.extensions.util.TestContextProperties; - -/** - * MandatoryMessageTest tests for the desired behaviour of mandatory messages. Mandatory messages are a non-JMS - * feature. A message may be marked with a mandatory delivery flag, which means that a valid route for the message - * must exist, when it is sent, or when its transaction is committed in the case of transactional messaging. If this - * is not the case, the broker should return the message with a NO_CONSUMERS code. - * - * <p><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Check that an mandatory message is sent succesfully not using transactions when a consumer is connected. - * <tr><td> Check that an mandatory message is committed succesfully in a transaction when a consumer is connected. - * <tr><td> Check that a mandatory message is sent succesfully, not using transactions, when a consumer is disconnected - * but the route exists. - * <tr><td> Check that a mandatory message is sent succesfully, in a transaction, when a consumer is disconnected but - * the route exists. - * <tr><td> Check that an mandatory message results in no route code, not using transactions, when no consumer is - * connected. - * <tr><td> Check that an mandatory message results in no route code, upon transaction commit, when a consumer is - * connected. - * <tr><td> Check that an mandatory message is sent succesfully not using transactions when a consumer is connected. - * <tr><td> Check that an mandatory message is committed succesfully in a transaction when a consumer is connected. - * <tr><td> Check that a mandatory message is sent succesfully, not using transactions, when a consumer is disconnected - * but the route exists. - * <tr><td> Check that a mandatory message is sent succesfully, in a transaction, when a consumer is disconnected but - * the route exists. - * <tr><td> Check that an mandatory message results in no route code, not using transactions, when no consumer is - * connected. - * <tr><td> Check that an mandatory message results in no route code, upon transaction commit, when a consumer is - * connected. - * </table> - * - * @todo All of these test cases will be generated by a test generator that thoroughly tests all combinations of test - * circuits. - */ -public class MandatoryMessageTest extends FrameworkBaseCase -{ - /** Used to read the tests configurable properties through. */ - ParsedProperties testProps; - - /** - * Creates a new test case with the specified name. - * - * @param name The test case name. - */ - public MandatoryMessageTest(String name) - { - super(name); - } - - /** Check that an mandatory message is sent succesfully not using transactions when a consumer is connected. */ - public void test_QPID_508_MandatoryOkNoTxP2P() throws Exception - { - // Ensure transactional sessions are off. - testProps.setProperty(TRANSACTED_PUBLISHER_PROPNAME, false); - testProps.setProperty(PUBSUB_PROPNAME, false); - - // Run the default test sequence over the test circuit checking for no errors. - CircuitFactory circuitFactory = getCircuitFactory(); - Circuit testCircuit = circuitFactory.createCircuit(getConnection(), testProps); - - assertNoFailures(testCircuit.test(1, - assertionList(((AMQPPublisher) testCircuit.getPublisher()).noExceptionsAssertion(testProps)))); - } - - /** Check that an mandatory message is committed succesfully in a transaction when a consumer is connected. */ - public void test_QPID_508_MandatoryOkTxP2P() throws Exception - { - // Ensure transactional sessions are off. - testProps.setProperty(TRANSACTED_PUBLISHER_PROPNAME, true); - testProps.setProperty(PUBSUB_PROPNAME, false); - - // Run the default test sequence over the test circuit checking for no errors. - CircuitFactory circuitFactory = getCircuitFactory(); - Circuit testCircuit = circuitFactory.createCircuit(getConnection(), testProps); - - assertNoFailures(testCircuit.test(1, - assertionList(((AMQPPublisher) testCircuit.getPublisher()).noExceptionsAssertion(testProps)))); - } - - /** - * Check that a mandatory message is sent succesfully, not using transactions, when a consumer is disconnected but - * the route exists. - */ - public void test_QPID_517_MandatoryOkConsumerDisconnectedNoTxP2P() throws Exception - { - // Ensure transactional sessions are off. - testProps.setProperty(TRANSACTED_PUBLISHER_PROPNAME, false); - testProps.setProperty(PUBSUB_PROPNAME, false); - - // Disconnect the consumer. - testProps.setProperty(RECEIVER_CONSUMER_ACTIVE_PROPNAME, false); - - CircuitFactory circuitFactory = getCircuitFactory(); - Circuit testCircuit = circuitFactory.createCircuit(getConnection(), testProps); - - // Send one message with no errors. - assertNoFailures(testCircuit.test(1, - assertionList(((AMQPPublisher) testCircuit.getPublisher()).noExceptionsAssertion(testProps)))); - } - - /** - * Check that a mandatory message is sent succesfully, in a transaction, when a consumer is disconnected but - * the route exists. - */ - public void test_QPID_517_MandatoryOkConsumerDisconnectedTxP2P() throws Exception - { - // Ensure transactional sessions are on. - testProps.setProperty(TRANSACTED_PUBLISHER_PROPNAME, true); - testProps.setProperty(PUBSUB_PROPNAME, false); - - // Disconnect the consumer. - testProps.setProperty(RECEIVER_CONSUMER_ACTIVE_PROPNAME, false); - - CircuitFactory circuitFactory = getCircuitFactory(); - Circuit testCircuit = circuitFactory.createCircuit(getConnection(), testProps); - - // Send one message with no errors. - assertNoFailures(testCircuit.test(1, - assertionList(((AMQPPublisher) testCircuit.getPublisher()).noExceptionsAssertion(testProps)))); - } - - /** Check that an mandatory message results in no route code, not using transactions, when no consumer is connected. */ - public void test_QPID_508_MandatoryFailsNoRouteNoTxP2P() throws Exception - { - // Ensure transactional sessions are off. - testProps.setProperty(TRANSACTED_PUBLISHER_PROPNAME, false); - testProps.setProperty(PUBSUB_PROPNAME, false); - - // Set up the messaging topology so that only the publishers producer is bound (do not set up the receivers to - // collect its messages). - testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false); - - // Send one message and get a linked no route exception. - CircuitFactory circuitFactory = getCircuitFactory(); - Circuit testCircuit = circuitFactory.createCircuit(getConnection(), testProps); - - assertNoFailures(testCircuit.test(1, - assertionList(((AMQPPublisher) testCircuit.getPublisher()).noRouteAssertion(testProps)))); - } - - /** Check that an mandatory message results in no route code, upon transaction commit, when a consumer is connected. */ - public void test_QPID_508_MandatoryFailsNoRouteTxP2P() throws Exception - { - // Ensure transactional sessions are on. - testProps.setProperty(TRANSACTED_PUBLISHER_PROPNAME, true); - testProps.setProperty(PUBSUB_PROPNAME, false); - - // Set up the messaging topology so that only the publishers producer is bound (do not set up the receivers to - // collect its messages). - testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false); - - // Send one message and get a linked no route exception. - CircuitFactory circuitFactory = getCircuitFactory(); - Circuit testCircuit = circuitFactory.createCircuit(getConnection(), testProps); - - assertNoFailures(testCircuit.test(1, - assertionList(((AMQPPublisher) testCircuit.getPublisher()).noRouteAssertion(testProps)))); - } - - /** Check that an mandatory message is sent succesfully not using transactions when a consumer is connected. */ - public void test_QPID_508_MandatoryOkNoTxPubSub() throws Exception - { - // Ensure transactional sessions are off. - testProps.setProperty(TRANSACTED_PUBLISHER_PROPNAME, false); - testProps.setProperty(PUBSUB_PROPNAME, true); - - // Run the default test sequence over the test circuit checking for no errors. - CircuitFactory circuitFactory = getCircuitFactory(); - Circuit testCircuit = circuitFactory.createCircuit(getConnection(), testProps); - - assertNoFailures(testCircuit.test(1, - assertionList(((AMQPPublisher) testCircuit.getPublisher()).noExceptionsAssertion(testProps)))); - } - - /** Check that an mandatory message is committed succesfully in a transaction when a consumer is connected. */ - public void test_QPID_508_MandatoryOkTxPubSub() throws Exception - { - // Ensure transactional sessions are on. - testProps.setProperty(TRANSACTED_PUBLISHER_PROPNAME, true); - testProps.setProperty(PUBSUB_PROPNAME, true); - - // Run the default test sequence over the test circuit checking for no errors. - CircuitFactory circuitFactory = getCircuitFactory(); - Circuit testCircuit = circuitFactory.createCircuit(getConnection(), testProps); - - assertNoFailures(testCircuit.test(1, - assertionList(((AMQPPublisher) testCircuit.getPublisher()).noExceptionsAssertion(testProps)))); - } - - /** - * Check that a mandatory message is sent succesfully, not using transactions, when a consumer is disconnected but - * the route exists. - */ - public void test_QPID_517_MandatoryOkConsumerDisconnectedNoTxPubSub() throws Exception - { - // Ensure transactional sessions are off. - testProps.setProperty(TRANSACTED_PUBLISHER_PROPNAME, false); - testProps.setProperty(PUBSUB_PROPNAME, true); - - // Use durable subscriptions, so that the route remains open with no subscribers. - testProps.setProperty(DURABLE_SUBSCRIPTION_PROPNAME, true); - - // Disconnect the consumer. - testProps.setProperty(RECEIVER_CONSUMER_ACTIVE_PROPNAME, false); - - CircuitFactory circuitFactory = getCircuitFactory(); - Circuit testCircuit = circuitFactory.createCircuit(getConnection(), testProps); - - // Send one message with no errors. - assertNoFailures(testCircuit.test(1, - assertionList(((AMQPPublisher) testCircuit.getPublisher()).noExceptionsAssertion(testProps)))); - } - - /** - * Check that a mandatory message is sent succesfully, in a transaction, when a consumer is disconnected but - * the route exists. - */ - public void test_QPID_517_MandatoryOkConsumerDisconnectedTxPubSub() throws Exception - { - // Ensure transactional sessions are on. - testProps.setProperty(TRANSACTED_PUBLISHER_PROPNAME, true); - testProps.setProperty(PUBSUB_PROPNAME, true); - - // Use durable subscriptions, so that the route remains open with no subscribers. - testProps.setProperty(DURABLE_SUBSCRIPTION_PROPNAME, true); - - // Disconnect the consumer. - testProps.setProperty(RECEIVER_CONSUMER_ACTIVE_PROPNAME, false); - - CircuitFactory circuitFactory = getCircuitFactory(); - Circuit testCircuit = circuitFactory.createCircuit(getConnection(), testProps); - - // Send one message with no errors. - assertNoFailures(testCircuit.test(1, - assertionList(((AMQPPublisher) testCircuit.getPublisher()).noExceptionsAssertion(testProps)))); - } - - /** Check that an mandatory message results in no route code, not using transactions, when no consumer is connected. */ - public void test_QPID_508_MandatoryFailsNoRouteNoTxPubSub() throws Exception - { - // Ensure transactional sessions are off. - testProps.setProperty(TRANSACTED_PUBLISHER_PROPNAME, false); - testProps.setProperty(PUBSUB_PROPNAME, true); - - // Set up the messaging topology so that only the publishers producer is bound (do not set up the receivers to - // collect its messages). - testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false); - - // Send one message and get a linked no route exception. - CircuitFactory circuitFactory = getCircuitFactory(); - Circuit testCircuit = circuitFactory.createCircuit(getConnection(), testProps); - - assertNoFailures(testCircuit.test(1, - assertionList(((AMQPPublisher) testCircuit.getPublisher()).noRouteAssertion(testProps)))); - } - - /** Check that an mandatory message results in no route code, upon transaction commit, when a consumer is connected. */ - public void test_QPID_508_MandatoryFailsNoRouteTxPubSub() throws Exception - { - // Ensure transactional sessions are on. - testProps.setProperty(TRANSACTED_PUBLISHER_PROPNAME, true); - testProps.setProperty(PUBSUB_PROPNAME, true); - - // Set up the messaging topology so that only the publishers producer is bound (do not set up the receivers to - // collect its messages). - testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false); - - // Send one message and get a linked no route exception. - CircuitFactory circuitFactory = getCircuitFactory(); - Circuit testCircuit = circuitFactory.createCircuit(getConnection(), testProps); - - assertNoFailures(testCircuit.test(1, - assertionList(((AMQPPublisher) testCircuit.getPublisher()).noRouteAssertion(testProps)))); - } - - protected void setUp() throws Exception - { - super.setUp(); - - testProps = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults); - - /** All these tests should have the mandatory flag on. */ - testProps.setProperty(IMMEDIATE_PROPNAME, false); - testProps.setProperty(MANDATORY_PROPNAME, true); - - /** Bind the receivers consumer by default. */ - testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, true); - testProps.setProperty(RECEIVER_CONSUMER_ACTIVE_PROPNAME, true); - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/testcases/RollbackTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/testcases/RollbackTest.java deleted file mode 100644 index edcde796a8..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/testcases/RollbackTest.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.testcases; - -import org.apache.qpid.test.framework.Circuit; -import org.apache.qpid.test.framework.FrameworkBaseCase; -import org.apache.qpid.test.framework.MessagingTestConfigProperties; -import static org.apache.qpid.test.framework.MessagingTestConfigProperties.*; -import org.apache.qpid.test.framework.sequencers.CircuitFactory; - -import org.apache.qpid.junit.extensions.util.ParsedProperties; -import org.apache.qpid.junit.extensions.util.TestContextProperties; - -/** - * RollbackTest tests the rollback ability of transactional messaging. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Check messages sent but rolled back are never received. - * <tr><td> Check messages received but rolled back are redelivered on subsequent receives. - * <tr><td> Attempting to rollback outside of a transaction results in an IllegalStateException. - * </table> - */ -public class RollbackTest extends FrameworkBaseCase -{ - /** Used to read the tests configurable properties through. */ - private ParsedProperties testProps; - - /** - * Creates a new test case with the specified name. - * - * @param name The test case name. - */ - public RollbackTest(String name) - { - super(name); - } - - /** Check messages sent but rolled back are never received. */ - public void testRolledbackMessageNotDelivered() throws Exception - { - // Ensure transactional sessions are on. - testProps.setProperty(TRANSACTED_PUBLISHER_PROPNAME, true); - testProps.setProperty(ROLLBACK_PUBLISHER_PROPNAME, true); - - // Run the default test sequence over the test circuit checking for no errors. - CircuitFactory circuitFactory = getCircuitFactory(); - Circuit testCircuit = circuitFactory.createCircuit(getConnection(), testProps); - - assertNoFailures(testCircuit.test(1, - assertionList(testCircuit.getPublisher().noExceptionsAssertion(testProps), - testCircuit.getReceiver().noMessagesReceivedAssertion(testProps)))); - } - - /** Check messages received but rolled back are redelivered on subsequent receives. */ - public void testRolledbackMessagesSubsequentlyReceived() throws Exception - { - // Ensure transactional sessions are on. - testProps.setProperty(TRANSACTED_RECEIVER_PROPNAME, true); - testProps.setProperty(ROLLBACK_RECEIVER_PROPNAME, true); - - // Run the default test sequence over the test circuit checking for no errors. - CircuitFactory circuitFactory = getCircuitFactory(); - Circuit testCircuit = circuitFactory.createCircuit(getConnection(), testProps); - - assertNoFailures(testCircuit.test(1, - assertionList(testCircuit.getPublisher().noExceptionsAssertion(testProps), - testCircuit.getReceiver().allMessagesReceivedAssertion(testProps)))); - } - - /** Attempting to rollback outside of a transaction results in an IllegalStateException. */ - public void testRollbackUnavailableOutsideTransactionPublisher() throws Exception - { - // Ensure transactional sessions are on. - testProps.setProperty(TRANSACTED_PUBLISHER_PROPNAME, false); - testProps.setProperty(ROLLBACK_PUBLISHER_PROPNAME, true); - - // Run the default test sequence over the test circuit checking for no errors. - CircuitFactory circuitFactory = getCircuitFactory(); - Circuit testCircuit = circuitFactory.createCircuit(getConnection(), testProps); - - assertNoFailures(testCircuit.test(1, assertionList(testCircuit.getPublisher().channelClosedAssertion(testProps)))); - } - - /** Attempting to rollback outside of a transaction results in an IllegalStateException. */ - public void testRollbackUnavailableOutsideTransactionReceiver() throws Exception - { - // Ensure transactional sessions are on. - testProps.setProperty(TRANSACTED_RECEIVER_PROPNAME, false); - testProps.setProperty(ROLLBACK_RECEIVER_PROPNAME, true); - - // Run the default test sequence over the test circuit checking for no errors. - CircuitFactory circuitFactory = getCircuitFactory(); - Circuit testCircuit = circuitFactory.createCircuit(getConnection(), testProps); - - assertNoFailures(testCircuit.test(1, assertionList(testCircuit.getReceiver().channelClosedAssertion(testProps)))); - } - - /** - * Sets up all tests to have an active outward route and consumer by default. - * - * @throws Exception Any exceptions are allowed to fall through and fail the test. - */ - protected void setUp() throws Exception - { - super.setUp(); - - testProps = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults); - - /** Bind the receivers consumer by default. */ - testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, true); - testProps.setProperty(RECEIVER_CONSUMER_ACTIVE_PROPNAME, true); - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java index e6461c8267..841d0ea4ba 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java @@ -29,9 +29,9 @@ import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; -import javax.jms.MessageProducer; /** * Test the various JMS Acknowledge Modes the single testAcking method does all @@ -56,7 +56,6 @@ public class AcknowledgeTest extends QpidBrokerTestCase _queue = getTestQueue(); - _logger.info("AT: setup"); //Create Producer put some messages on the queue _connection = getConnection(); } @@ -70,13 +69,10 @@ public class AcknowledgeTest extends QpidBrokerTestCase // These should all end up being prefetched by session sendMessage(_consumerSession, _queue, 1); - if(!transacted) - { - ((AMQSession)_consumerSession).sync(); - } + syncIfNotTransacted(transacted); assertEquals("Wrong number of messages on queue", 1, - ((AMQSession) _consumerSession).getQueueDepth((AMQDestination) _queue)); + ((AMQSession<?,?>) _consumerSession).getQueueDepth((AMQDestination) _queue)); } /** @@ -114,6 +110,7 @@ public class AcknowledgeTest extends QpidBrokerTestCase { //Send the next message _producer.send(createNextMessage(_consumerSession, count)); + syncIfNotTransacted(transacted); } doAcknowlegement(msg); @@ -128,7 +125,7 @@ public class AcknowledgeTest extends QpidBrokerTestCase } assertEquals("Wrong number of messages on queue", 0, - ((AMQSession) _consumerSession).getQueueDepth((AMQDestination) _queue)); + ((AMQSession<?,?>) _consumerSession).getQueueDepth((AMQDestination) _queue)); } /** @@ -181,4 +178,11 @@ public class AcknowledgeTest extends QpidBrokerTestCase testAcking(false, AMQSession.PRE_ACKNOWLEDGE); } + private void syncIfNotTransacted(boolean transacted) throws Exception + { + if(!transacted) + { + ((AMQSession<?,?>)_consumerSession).sync(); + } + } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/ClientAcknowledgeTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/ClientAcknowledgeTest.java index 06be5cf456..291e1697ca 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/ClientAcknowledgeTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/ClientAcknowledgeTest.java @@ -18,6 +18,8 @@ */ package org.apache.qpid.test.unit.ack; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + import javax.jms.Connection; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -25,8 +27,6 @@ import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; -import org.apache.qpid.test.utils.QpidBrokerTestCase; - public class ClientAcknowledgeTest extends QpidBrokerTestCase { private static final long ONE_DAY_MS = 1000l * 60 * 60 * 24; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java index 2fd3811cb4..23ea4ac258 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java @@ -19,15 +19,15 @@ */ package org.apache.qpid.test.unit.ack; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.jms.Session; -import org.apache.qpid.test.utils.FailoverBaseCase; import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import javax.jms.Connection; import javax.jms.Destination; @@ -38,7 +38,6 @@ import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.TextMessage; - import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -383,8 +382,8 @@ public class RecoverTest extends QpidBrokerTestCase cons.setMessageListener(new MessageListener() { - int messageSeen = 0; - int expectedIndex = 0; + private int messageSeen = 0; + private int expectedIndex = 0; public void onMessage(Message message) { diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java index 0731d56204..a121b39a56 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java @@ -20,7 +20,8 @@ package org.apache.qpid.test.unit.basic; import junit.framework.Assert; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; @@ -30,20 +31,17 @@ import org.apache.qpid.client.message.JMSBytesMessage; import org.apache.qpid.test.utils.QpidBrokerTestCase; import org.apache.qpid.transport.util.Waiter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; +import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageNotReadableException; import javax.jms.MessageNotWriteableException; import javax.jms.MessageProducer; import javax.jms.Session; - import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Iterator; @@ -281,4 +279,46 @@ public class BytesMessageTest extends QpidBrokerTestCase implements MessageListe test._count = count; test.test(); } + + public void testModificationAfterSend() throws Exception + { + Connection connection = getConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + BytesMessage jmsMsg = session.createBytesMessage(); + Destination destination = getTestQueue(); + + /* Set the constant message contents. */ + + jmsMsg.setStringProperty("foo", "test"); + + /* Pre-populate the message body buffer to the target size. */ + byte[] jmsMsgBodyBuffer = new byte[1024]; + + connection.start(); + + /* Send messages. */ + MessageProducer producer = session.createProducer(destination); + + MessageConsumer consumer = session.createConsumer(destination); + + for(int writtenMsgCount = 0; writtenMsgCount < 10; writtenMsgCount++) + { + /* Set the per send message contents. */ + jmsMsgBodyBuffer[0] = (byte) writtenMsgCount; + jmsMsg.writeBytes(jmsMsgBodyBuffer, 0, jmsMsgBodyBuffer.length); + /** Try to write a message. */ + producer.send(jmsMsg); + } + + + for(int writtenMsgCount = 0; writtenMsgCount < 10; writtenMsgCount++) + { + BytesMessage recvdMsg = (BytesMessage) consumer.receive(1000L); + assertNotNull("Expected to receive message " + writtenMsgCount + " but did not", recvdMsg); + assertEquals("Message "+writtenMsgCount+" not of expected size", (long) ((writtenMsgCount + 1)*1024), + recvdMsg.getBodyLength()); + + } + } + } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java index 3af215d1d5..599c8061a7 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.test.unit.basic; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQQueue; @@ -30,15 +33,11 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import javax.jms.BytesMessage; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.MessageProducer; - import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOException; @@ -56,7 +55,7 @@ public class FieldTableMessageTest extends QpidBrokerTestCase implements Message private final ArrayList<JMSBytesMessage> received = new ArrayList<JMSBytesMessage>(); private FieldTable _expected; private int _count = 10; - public String _connectionString = "vm://:1"; + private String _connectionString = "vm://:1"; private CountDownLatch _waitForCompletion; protected void setUp() throws Exception diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java index c9f6a22500..53f37cd915 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java @@ -25,12 +25,12 @@ import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.test.utils.QpidBrokerTestCase; -import javax.jms.Session; -import javax.jms.QueueSession; +import javax.jms.InvalidDestinationException; import javax.jms.Queue; import javax.jms.QueueSender; +import javax.jms.QueueSession; +import javax.jms.Session; import javax.jms.TextMessage; -import javax.jms.InvalidDestinationException; public class InvalidDestinationTest extends QpidBrokerTestCase { diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java index d97e22e024..7bd737ee53 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java @@ -21,14 +21,14 @@ package org.apache.qpid.test.unit.basic; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MapMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MapMessageTest.java index 9f13ddcfdb..3f998938d9 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MapMessageTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MapMessageTest.java @@ -21,6 +21,8 @@ package org.apache.qpid.test.unit.basic; import junit.framework.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; @@ -28,9 +30,6 @@ import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.message.JMSMapMessage; import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; @@ -40,7 +39,6 @@ import javax.jms.MessageListener; import javax.jms.MessageNotWriteableException; import javax.jms.MessageProducer; import javax.jms.Session; - import java.util.ArrayList; import java.util.Iterator; import java.util.List; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java index 87eae32cf8..3c26cbb3c9 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java @@ -19,6 +19,9 @@ */ package org.apache.qpid.test.unit.basic; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession; @@ -26,9 +29,6 @@ import org.apache.qpid.client.AMQTopic; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; @@ -72,6 +72,11 @@ public class MultipleConnectionTest extends QpidBrokerTestCase { _connection.close(); } + + public MessageCounter[] getCounters() + { + return _counters; + } } private class Publisher @@ -151,7 +156,7 @@ public class MultipleConnectionTest extends QpidBrokerTestCase { for (int i = 0; i < receivers.length; i++) { - waitForCompletion(expected, wait, receivers[i]._counters); + waitForCompletion(expected, wait, receivers[i].getCounters()); } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java index c8e7368092..10d53b7487 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java @@ -20,6 +20,8 @@ package org.apache.qpid.test.unit.basic; import junit.framework.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; @@ -28,15 +30,11 @@ import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.message.JMSObjectMessage; import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.MessageNotWriteableException; import javax.jms.MessageProducer; - import java.io.Serializable; import java.util.ArrayList; import java.util.Iterator; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java index 3b8b4946da..52213d15c4 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java @@ -20,20 +20,14 @@ */ package org.apache.qpid.test.unit.basic; -import junit.framework.Assert; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.message.JMSTextMessage; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.apache.qpid.url.BindingURL; -import org.apache.qpid.url.AMQBindingURL; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; import javax.jms.Connection; import javax.jms.Destination; @@ -45,30 +39,33 @@ import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; -import java.math.BigDecimal; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.net.URISyntaxException; +import junit.framework.Assert; -import java.lang.reflect.*; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.message.JMSTextMessage; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class PropertyValueTest extends QpidBrokerTestCase implements MessageListener { private static final Logger _logger = LoggerFactory.getLogger(PropertyValueTest.class); - private int count = 0; private AMQConnection _connection; private Destination _destination; private AMQSession _session; private final List<JMSTextMessage> received = new ArrayList<JMSTextMessage>(); private final List<String> messages = new ArrayList<String>(); + private Map<String, Destination> _replyToDestinations; private int _count = 1; public String _connectionString = "vm://:1"; private static final String USERNAME = "guest"; protected void setUp() throws Exception { + _replyToDestinations = new HashMap<String, Destination>(); super.setUp(); } @@ -239,12 +236,11 @@ public class PropertyValueTest extends QpidBrokerTestCase implements MessageList } m.setJMSReplyTo(q); - m.setStringProperty("TempQueue", q.toString()); - _logger.debug("Message:" + m); + m.setStringProperty("ReplyToIndex", String.valueOf(i)); + _replyToDestinations.put(String.valueOf(i), q); - Assert.assertEquals("Check temp queue has been set correctly", m.getJMSReplyTo().toString(), - m.getStringProperty("TempQueue")); + _logger.debug("Message:" + m); m.setJMSType("Test"); m.setLongProperty("UnsignedInt", (long) 4294967295L); @@ -292,8 +288,8 @@ public class PropertyValueTest extends QpidBrokerTestCase implements MessageList Assert.assertEquals("Check Priority properties are correctly transported", 8, m.getJMSPriority()); // Queue - Assert.assertEquals("Check ReplyTo properties are correctly transported", AMQDestination.createDestination(new AMQBindingURL(m.getStringProperty("TempQueue"))), - m.getJMSReplyTo()); + String replyToIndex = m.getStringProperty("ReplyToIndex"); + Assert.assertEquals("Check ReplyTo properties are correctly transported", _replyToDestinations.get(replyToIndex), m.getJMSReplyTo()); Assert.assertEquals("Check Type properties are correctly transported", "Test", m.getJMSType()); @@ -304,24 +300,7 @@ public class PropertyValueTest extends QpidBrokerTestCase implements MessageList Assert.assertEquals("Check Long properties are correctly transported", (long) Long.MAX_VALUE, m.getLongProperty("Long")); Assert.assertEquals("Check String properties are correctly transported", "Test", m.getStringProperty("String")); -/* - // AMQP Tests Specific values - - Assert.assertEquals("Check Timestamp properties are correctly transported", m.getStringProperty("time-str"), - ((AMQMessage) m).getTimestampProperty(new AMQShortString("time")).toString()); - - // Decimal - BigDecimal bd = new BigDecimal(Integer.MAX_VALUE); - - Assert.assertEquals("Check decimal properties are correctly transported", bd.setScale(Byte.MAX_VALUE), - ((AMQMessage) m).getDecimalProperty(new AMQShortString("decimal"))); - - // Void - ((AMQMessage) m).setVoidProperty(new AMQShortString("void")); - Assert.assertTrue("Check void properties are correctly transported", - ((AMQMessage) m).getPropertyHeaders().containsKey("void")); -*/ //JMSXUserID if (m.getStringProperty("JMSXUserID") != null) { diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java index c257dacf76..3ef8524656 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java @@ -20,6 +20,11 @@ */ package org.apache.qpid.test.unit.basic; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + import javax.jms.Connection; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; @@ -27,11 +32,6 @@ import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.test.utils.QpidBrokerTestCase; - /** * @author Apache Software Foundation */ diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/ReceiveTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/ReceiveTest.java index bc44617620..c764eda799 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/ReceiveTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/ReceiveTest.java @@ -20,15 +20,15 @@ */ package org.apache.qpid.test.unit.basic; -import javax.jms.MessageConsumer; -import javax.jms.Message; - import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.test.utils.QpidBrokerTestCase; +import javax.jms.Message; +import javax.jms.MessageConsumer; + public class ReceiveTest extends QpidBrokerTestCase { private AMQConnection _connection; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/SessionStartTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/SessionStartTest.java index ee837fd41a..cc64dbb125 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/SessionStartTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/SessionStartTest.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.test.unit.basic; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQQueue; @@ -27,9 +30,6 @@ import org.apache.qpid.client.AMQSession; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/TextMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/TextMessageTest.java index a87de8ac0c..0a568d57ad 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/TextMessageTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/TextMessageTest.java @@ -21,6 +21,8 @@ package org.apache.qpid.test.unit.basic; import junit.framework.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; @@ -29,9 +31,6 @@ import org.apache.qpid.client.message.JMSTextMessage; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; @@ -39,7 +38,6 @@ import javax.jms.MessageListener; import javax.jms.MessageNotWriteableException; import javax.jms.MessageProducer; import javax.jms.Session; - import java.util.ArrayList; import java.util.Iterator; import java.util.List; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/close/CloseTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/close/CloseTest.java index 3c7962d873..48d290c986 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/close/CloseTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/close/CloseTest.java @@ -19,12 +19,13 @@ * */ package org.apache.qpid.test.unit.basic.close; -import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.apache.qpid.client.AMQConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQSessionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQSessionTest.java index c33dde53b7..0d81b66be0 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQSessionTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQSessionTest.java @@ -20,16 +20,16 @@ */ package org.apache.qpid.test.unit.client; -import javax.jms.JMSException; -import javax.jms.QueueReceiver; -import javax.jms.TopicSubscriber; - import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQTopic; import org.apache.qpid.test.utils.QpidBrokerTestCase; +import javax.jms.JMSException; +import javax.jms.QueueReceiver; +import javax.jms.TopicSubscriber; + /** * Tests for QueueReceiver and TopicSubscriber creation methods on AMQSession */ diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java index e59dac8c01..fa0fe7e0b5 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java @@ -20,10 +20,15 @@ */ package org.apache.qpid.test.unit.client; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import org.apache.log4j.Logger; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.test.utils.QpidBrokerTestCase; import javax.jms.Connection; import javax.jms.Destination; @@ -35,15 +40,10 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.configuration.ClientProperties; -import org.apache.qpid.server.queue.AMQQueueFactory; -import org.apache.qpid.test.utils.QpidBrokerTestCase; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; /** * Test that the MaxRedelivery feature works as expected, allowing the client to reject @@ -298,7 +298,7 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase consumer.close(); //check the source queue is now empty - assertEquals("The queue should have 0 msgs left", 0, ((AMQSession<?,?>) clientSession).getQueueDepth(checkQueue)); + assertEquals("The queue should have 0 msgs left", 0, ((AMQSession<?,?>) clientSession).getQueueDepth(checkQueue, true)); //check the DLQ has the required number of rejected-without-requeue messages verifyDLQdepth(redeliverMsgs.size(), clientSession, durableSub); @@ -341,7 +341,7 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase } assertEquals("The DLQ should have " + expected + " msgs on it", expected, - ((AMQSession<?,?>) clientSession).getQueueDepth(checkQueueDLQ)); + ((AMQSession<?,?>) clientSession).getQueueDepth(checkQueueDLQ, true)); } private void verifyDLQcontent(Connection clientConnection, List<Integer> redeliverMsgs, String destName, boolean durableSub) throws JMSException diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/QueueSessionFactoryTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/QueueSessionFactoryTest.java index ef90ab8ffe..370e44b3d5 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/QueueSessionFactoryTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/QueueSessionFactoryTest.java @@ -20,14 +20,14 @@ */ package org.apache.qpid.test.unit.client; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + import javax.jms.QueueConnection; import javax.jms.QueueSession; import javax.jms.Session; import javax.jms.Topic; import javax.jms.TopicSession; -import org.apache.qpid.test.utils.QpidBrokerTestCase; - /** * Ensures that queue specific session factory method {@link QueueConnection#createQueueSession()} create sessions * of type {@link QueueSession} and that those sessions correctly restrict the available JMS operations diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/TopicSessionFactoryTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/TopicSessionFactoryTest.java index 6397f15e0a..ce15d452ab 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/TopicSessionFactoryTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/TopicSessionFactoryTest.java @@ -20,14 +20,14 @@ */ package org.apache.qpid.test.unit.client; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + import javax.jms.Queue; import javax.jms.QueueSession; import javax.jms.Session; import javax.jms.TopicConnection; import javax.jms.TopicSession; -import org.apache.qpid.test.utils.QpidBrokerTestCase; - /** * Ensures that topic specific session factory method {@link TopicConnection#createTopicSession()} create sessions * of type {@link TopicSession} and that those sessions correctly restrict the available JMS operations diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java index aae8b1feb9..1c9ee27b94 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java @@ -21,14 +21,13 @@ package org.apache.qpid.test.unit.client.channelclose; import junit.textui.TestRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import javax.jms.Destination; import javax.jms.ExceptionListener; import javax.jms.JMSException; @@ -37,7 +36,6 @@ import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; - import java.util.ArrayList; import java.util.List; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java index 2e8a2d049d..c20eefd987 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java @@ -20,17 +20,23 @@ */ package org.apache.qpid.test.unit.client.channelclose; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.qpid.AMQException; -import org.apache.qpid.test.utils.QpidBrokerTestCase; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.framing.*; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ChannelCloseOkBody; +import org.apache.qpid.framing.ChannelOpenBody; +import org.apache.qpid.framing.ChannelOpenOkBody; +import org.apache.qpid.framing.ExchangeDeclareBody; +import org.apache.qpid.framing.ExchangeDeclareOkBody; import org.apache.qpid.jms.ConnectionListener; import org.apache.qpid.protocol.AMQConstant; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.qpid.test.utils.QpidBrokerTestCase; import javax.jms.Connection; import javax.jms.ExceptionListener; @@ -46,7 +52,7 @@ public class ChannelCloseTest extends QpidBrokerTestCase implements ExceptionLis { private static final Logger _logger = LoggerFactory.getLogger(ChannelCloseTest.class); - Connection _connection; + private Connection _connection; private Session _session; private static final long SYNC_TIMEOUT = 500; private int TEST = 0; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java index 56d03dc4a7..b7874ee85e 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java @@ -20,13 +20,13 @@ */ package org.apache.qpid.test.unit.client.channelclose; -import javax.jms.MessageConsumer; -import javax.jms.Session; - import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQTopic; import org.apache.qpid.test.utils.QpidBrokerTestCase; +import javax.jms.MessageConsumer; +import javax.jms.Session; + /** * @author Apache Software Foundation */ diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java new file mode 100644 index 0000000000..6b83929258 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java @@ -0,0 +1,127 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.client.connection; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.qpid.AMQConnectionClosedException; +import org.apache.qpid.AMQDisconnectedException; +import org.apache.qpid.management.jmx.ManagedConnectionMBeanTest; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.transport.ConnectionException; + +import javax.jms.Connection; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Session; + +/** + * Tests the behaviour of the client when the Broker terminates client connection + * by the Broker being shutdown gracefully or otherwise. + * + * @see ManagedConnectionMBeanTest + */ +public class BrokerClosesClientConnectionTest extends QpidBrokerTestCase +{ + private Connection _connection; + private boolean _isExternalBroker; + private final RecordingExceptionListener _recordingExceptionListener = new RecordingExceptionListener(); + + @Override + protected void setUp() throws Exception + { + super.setUp(); + + _connection = getConnection(); + _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _connection.setExceptionListener(_recordingExceptionListener); + + _isExternalBroker = isExternalBroker(); + } + + public void testClientCloseOnNormalBrokerShutdown() throws Exception + { + final Class<? extends Exception> expectedLinkedException = isBroker010() ? ConnectionException.class : AMQConnectionClosedException.class; + + stopBroker(); + + JMSException exception = _recordingExceptionListener.awaitException(10000); + assertConnectionCloseWasReported(exception, expectedLinkedException); + + ensureCanCloseWithoutException(); + } + + public void testClientCloseOnBrokerKill() throws Exception + { + final Class<? extends Exception> expectedLinkedException = isBroker010() ? ConnectionException.class : AMQDisconnectedException.class; + + if (!_isExternalBroker) + { + return; + } + + killBroker(); + + JMSException exception = _recordingExceptionListener.awaitException(10000); + assertConnectionCloseWasReported(exception, expectedLinkedException); + + ensureCanCloseWithoutException(); + } + + private void ensureCanCloseWithoutException() + { + try + { + _connection.close(); + } + catch (JMSException e) + { + fail("Connection should close without exception" + e.getMessage()); + } + } + + private void assertConnectionCloseWasReported(JMSException exception, Class<? extends Exception> linkedExceptionClass) + { + assertNotNull("Broker shutdown should be reported to the client via the ExceptionListener", exception); + assertNotNull("JMXException should have linked exception", exception.getLinkedException()); + + assertEquals("Unexpected linked exception", linkedExceptionClass, exception.getLinkedException().getClass()); + } + + private final class RecordingExceptionListener implements ExceptionListener + { + private final CountDownLatch _exceptionReceivedLatch = new CountDownLatch(1); + private volatile JMSException _exception; + + @Override + public void onException(JMSException exception) + { + _exception = exception; + } + + public JMSException awaitException(long timeoutInMillis) throws InterruptedException + { + _exceptionReceivedLatch.await(timeoutInMillis, TimeUnit.MILLISECONDS); + return _exception; + } + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java deleted file mode 100644 index 20044b7a14..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.unit.client.connection; - -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.Session; - -import org.apache.qpid.test.utils.QpidBrokerTestCase; - -/** - * ConnectionCloseTest - * - */ - -public class ConnectionCloseTest extends QpidBrokerTestCase -{ - - /** - * This test is added due to QPID-3453 to test connection closing when AMQ - * session is not closed but underlying transport session is in detached - * state and transport connection is closed - */ - public void testConnectionCloseOnOnForcibleBrokerStop() throws Exception - { - Connection connection = getConnection(); - connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - stopBroker(); - - // we need to close connection explicitly in order to verify that - // closing of connection having transport session in DETACHED state and - // transport connection in CLOSED state does not throw an exception - try - { - connection.close(); - } - catch (JMSException e) - { - // session closing should not fail - fail("Cannot close connection:" + e.getMessage()); - } - } - -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java index ac14f8e50e..0650531d2b 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java @@ -20,8 +20,10 @@ */ package org.apache.qpid.test.unit.client.connection; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.test.utils.QpidBrokerTestCase; import javax.jms.JMSException; import javax.jms.Message; @@ -30,18 +32,15 @@ import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.test.utils.QpidBrokerTestCase; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; public class ConnectionStartTest extends QpidBrokerTestCase { - String _broker = "vm://:1"; + private String _broker = "vm://:1"; - AMQConnection _connection; + private AMQConnection _connection; private Session _consumerSess; private MessageConsumer _consumer; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java index f18f365f20..375626a2fa 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java @@ -20,10 +20,6 @@ */ package org.apache.qpid.test.unit.client.connection; -import javax.jms.Connection; -import javax.jms.QueueSession; -import javax.jms.TopicSession; - import org.apache.qpid.AMQConnectionFailureException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQUnresolvedAddressException; @@ -40,12 +36,16 @@ import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.jms.Session; import org.apache.qpid.test.utils.QpidBrokerTestCase; +import javax.jms.Connection; +import javax.jms.QueueSession; +import javax.jms.TopicSession; + public class ConnectionTest extends QpidBrokerTestCase { - String _broker_NotRunning = "tcp://localhost:" + findFreePort(); + private String _broker_NotRunning = "tcp://localhost:" + findFreePort(); - String _broker_BadDNS = "tcp://hg3sgaaw4lgihjs"; + private String _broker_BadDNS = "tcp://hg3sgaaw4lgihjs"; public void testSimpleConnection() throws Exception { diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java deleted file mode 100644 index 0057422c8f..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.unit.client.connection; - -import org.apache.qpid.test.utils.QpidBrokerTestCase; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import javax.jms.Connection; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; - -/** - * ExceptionListenerTest - * - */ - -public class ExceptionListenerTest extends QpidBrokerTestCase -{ - - public void testBrokerDeath() throws Exception - { - Connection conn = getConnection("guest", "guest"); - - conn.start(); - - final CountDownLatch fired = new CountDownLatch(1); - conn.setExceptionListener(new ExceptionListener() - { - public void onException(JMSException e) - { - _logger.debug("&&&&&&&&&&&&&&&&&&&&&&&&&&&& Caught exception &&&&&&&&&&&&&&&&&&&&&&&&&&&& ", e); - fired.countDown(); - } - }); - _logger.debug("%%%%%%%%%%%%%%%% Stopping Broker %%%%%%%%%%%%%%%%%%%%%"); - stopBroker(); - _logger.debug("%%%%%%%%%%%%%%%% Stopped Broker %%%%%%%%%%%%%%%%%%%%%"); - - if (!fired.await(5, TimeUnit.SECONDS)) - { - fail("exception listener was not fired"); - } - } - -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Client.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Client.java deleted file mode 100644 index b60fe76b76..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Client.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.unit.client.forwardall; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.test.utils.QpidBrokerTestCase; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; - -/** - * Declare a private temporary response queue, - * send a message to amq.direct with a well known routing key with the - * private response queue as the reply-to destination - * consume responses. - */ -public class Client implements MessageListener -{ - private static final Logger _logger = LoggerFactory.getLogger(Client.class); - - private final AMQConnection _connection; - private final AMQSession _session; - private final int _expected; - private int _count; - private static QpidBrokerTestCase _qct; - - Client(String broker, int expected) throws Exception - { - this(connect(broker), expected); - } - - public static void setQTC(QpidBrokerTestCase qtc) - { - _qct = qtc; - } - Client(AMQConnection connection, int expected) throws Exception - { - _connection = connection; - _expected = expected; - _session = (AMQSession) _connection.createSession(true, AMQSession.NO_ACKNOWLEDGE); - AMQQueue response = - new AMQQueue(_connection.getDefaultQueueExchangeName(), new AMQShortString("ResponseQueue"), true); - _session.createConsumer(response).setMessageListener(this); - _connection.start(); - // AMQQueue service = new SpecialQueue(_connection, "ServiceQueue"); - AMQQueue service = (AMQQueue) _session.createQueue("ServiceQueue") ; - Message request = _session.createTextMessage("Request!"); - request.setJMSReplyTo(response); - MessageProducer prod = _session.createProducer(service); - prod.send(request); - _session.commit(); - } - - void shutdownWhenComplete() throws Exception - { - waitUntilComplete(); - _connection.close(); - } - - public synchronized void onMessage(Message response) - { - - _logger.info("Received " + (++_count) + " of " + _expected + " responses."); - if (_count == _expected) - { - - notifyAll(); - } - try - { - _session.commit(); - } - catch (JMSException e) - { - - } - - } - - synchronized void waitUntilComplete() throws Exception - { - - if (_count < _expected) - { - wait(60000); - } - - if (_count < _expected) - { - throw new Exception("Didn't receive all messages... got " + _count + " expected " + _expected); - } - } - - static AMQConnection connect(String broker) throws Exception - { - //return new AMQConnection(broker, "guest", "guest", "Client" + System.currentTimeMillis(), "test"); - return (AMQConnection) _qct.getConnection("guest", "guest") ; - } - - public static void main(String[] argv) throws Exception - { - final String connectionString; - final int expected; - if (argv.length == 0) - { - connectionString = "localhost:5672"; - expected = 100; - } - else - { - connectionString = argv[0]; - expected = Integer.parseInt(argv[1]); - } - - new Client(connect(connectionString), expected).shutdownWhenComplete(); - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/CombinedTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/CombinedTest.java deleted file mode 100644 index 45945eb8fc..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/CombinedTest.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.unit.client.forwardall; - -import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Runs the Service's and Client parts of the test in the same process - * as the broker - */ -public class CombinedTest extends QpidBrokerTestCase -{ - private static final Logger _logger = LoggerFactory.getLogger(CombinedTest.class); - private int run = 0; - - protected void setUp() throws Exception - { - super.setUp(); - Service.setQTC(this); - Client.setQTC(this); - } - - protected void tearDown() throws Exception - { - ServiceCreator.closeAll(); - super.tearDown(); - } - - public void testForwardAll() throws Exception - { - while (run < 10) - { - int services =1; - ServiceCreator.start("vm://:1", services); - - _logger.info("Starting " + ++run + " client..."); - - new Client("vm://:1", services).shutdownWhenComplete(); - - - _logger.info("Completed " + run + " successfully!"); - } - } - - public static junit.framework.Test suite() - { - return new junit.framework.TestSuite(CombinedTest.class); - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Service.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Service.java deleted file mode 100644 index 160700bdda..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Service.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.unit.client.forwardall; - -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.test.utils.QpidBrokerTestCase; - -/** - * Declare a queue and bind it to amq.direct with a 'well known' routing key, - * register a consumer for this queue and send a response to every message received. - */ -public class Service implements MessageListener -{ - private final AMQConnection _connection; - private final AMQSession _session; - - private static QpidBrokerTestCase _qct; - - - public static void setQTC(QpidBrokerTestCase qtc) - { - _qct = qtc; - } - Service(String broker) throws Exception - { - this(connect(broker)); - } - - Service(AMQConnection connection) throws Exception - { - _connection = connection; - //AMQQueue queue = new SpecialQueue(connection, "ServiceQueue"); - _session = (AMQSession) _connection.createSession(true, AMQSession.NO_ACKNOWLEDGE); - AMQQueue queue = (AMQQueue) _session.createQueue("ServiceQueue") ; - _session.createConsumer(queue).setMessageListener(this); - _connection.start(); - } - - public void onMessage(Message request) - { - try - { - Message response = _session.createTextMessage("Response!"); - Destination replyTo = request.getJMSReplyTo(); - _session.createProducer(replyTo).send(response); - _session.commit(); - } - catch (Exception e) - { - e.printStackTrace(System.out); - } - } - - public void close() throws JMSException - { - _connection.close(); - } - - static AMQConnection connect(String broker) throws Exception - { - //return new AMQConnection(broker, "guest", "guest", "Client" + System.currentTimeMillis(), "test"); - return (AMQConnection) _qct.getConnection("guest", "guest") ; - } - -// public static void main(String[] argv) throws Exception -// { -// String broker = argv.length == 0? "localhost:5672" : argv[0]; -// new Service(broker); -// } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/ServiceCreator.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/ServiceCreator.java deleted file mode 100644 index be16f6b7ae..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/ServiceCreator.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.unit.client.forwardall; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.jms.JMSException; - -public class ServiceCreator implements Runnable -{ - private static final Logger _logger = LoggerFactory.getLogger(ServiceCreator.class); - - private static Thread[] threads; - private static ServiceCreator[] _services; - - private final String broker; - private Service service; - - ServiceCreator(String broker) - { - this.broker = broker; - } - - public void run() - { - try - { - service = new Service(broker); - } - catch (Exception e) - { - e.printStackTrace(System.out); - } - } - - public void closeSC() throws JMSException - { - service.close(); - } - - static void closeAll() - { - for (int i = 0; i < _services.length; i++) - { - try - { - _services[i].closeSC(); - } - catch (JMSException e) - { - // ignore - } - } - } - - static void start(String broker, int services) throws InterruptedException - { - threads = new Thread[services]; - _services = new ServiceCreator[services]; - ServiceCreator runner = new ServiceCreator(broker); - // start services - _logger.info("Starting " + services + " services..."); - for (int i = 0; i < services; i++) - { - threads[i] = new Thread(runner); - _services[i] = runner; - threads[i].start(); - } - - for (int i = 0; i < threads.length; i++) - { - threads[i].join(); - } - } - - public static void main(String[] argv) throws Exception - { - final String connectionString; - final int services; - if (argv.length == 0) - { - connectionString = "localhost:5672"; - services = 100; - } - else - { - connectionString = argv[0]; - services = Integer.parseInt(argv[1]); - } - - start(connectionString, services); - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java deleted file mode 100644 index 27371b0397..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.unit.client.forwardall; - -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.framing.AMQShortString; - -/** - * Queue that allows several private queues to be registered and bound - * to an exchange with the same routing key. - * - */ -class SpecialQueue extends AMQQueue -{ - private final AMQShortString name; - - SpecialQueue(AMQConnection con, String name) - { - super(con, name, true); - this.name = new AMQShortString(name); - } - - public AMQShortString getRoutingKey() - { - return name; - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java index 8ad8fa77d7..728ef85bd2 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java @@ -20,24 +20,24 @@ */ package org.apache.qpid.test.unit.client.message; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.ObjectMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; public class ObjectMessageTest extends QpidBrokerTestCase implements MessageListener { diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java index 836684c965..1a7e9dfc96 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java @@ -20,11 +20,6 @@ */ package org.apache.qpid.test.unit.client.protocol; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.net.UnknownHostException; - import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.protocol.AMQProtocolSession; @@ -32,6 +27,11 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.test.utils.QpidBrokerTestCase; import org.apache.qpid.transport.TestNetworkConnection; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.UnknownHostException; + public class AMQProtocolSessionTest extends QpidBrokerTestCase { private static class TestProtocolSession extends AMQProtocolSession @@ -44,7 +44,7 @@ public class AMQProtocolSessionTest extends QpidBrokerTestCase public TestNetworkConnection getNetworkConnection() { - return (TestNetworkConnection) _protocolHandler.getNetworkConnection(); + return (TestNetworkConnection) getProtocolHandler().getNetworkConnection(); } public AMQShortString genQueueName() diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java index c98e403671..41ab35f233 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java @@ -21,6 +21,10 @@ package org.apache.qpid.test.unit.client.temporaryqueue; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.MessageConsumer; @@ -29,10 +33,6 @@ import javax.jms.Session; import javax.jms.TemporaryQueue; import javax.jms.TextMessage; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.test.utils.QpidBrokerTestCase; - /** * Tests the behaviour of {@link TemporaryQueue}. */ diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java index 039a172e4d..4da9a1db29 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java @@ -21,14 +21,12 @@ package org.apache.qpid.test.unit.close; import junit.framework.Assert; - -import org.apache.qpid.test.utils.QpidBrokerTestCase; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.junit.concurrency.TestRunnable; import org.apache.qpid.junit.concurrency.ThreadTestCoordinator; +import org.apache.qpid.test.utils.QpidBrokerTestCase; import javax.jms.Connection; import javax.jms.Message; @@ -47,8 +45,8 @@ public class CloseBeforeAckTest extends QpidBrokerTestCase { private static final Logger log = LoggerFactory.getLogger(CloseBeforeAckTest.class); - Connection connection; - Session session; + private Connection connection; + private Session session; public static final String TEST_QUEUE_NAME = "TestQueue"; private int TEST_COUNT = 25; @@ -70,9 +68,9 @@ public class CloseBeforeAckTest extends QpidBrokerTestCase } } - TestThread1 testThread1 = new TestThread1(); + private TestThread1 testThread1 = new TestThread1(); - TestRunnable testThread2 = + private TestRunnable testThread2 = new TestRunnable() { public void runWithExceptions() throws Exception diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java index 6bc6c591ae..f2387fa99b 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java @@ -21,7 +21,6 @@ package org.apache.qpid.test.unit.close; import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQShortString; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java index f5e0ed75d2..a4e9a992b4 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java @@ -20,22 +20,21 @@ */ package org.apache.qpid.test.unit.close; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.qpid.AMQException; +import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.test.utils.QpidBrokerTestCase; import org.apache.qpid.test.utils.QpidClientConnection; -import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.url.URLSyntaxException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Queue; import javax.jms.Session; - import java.util.concurrent.atomic.AtomicInteger; public class MessageRequeueTest extends QpidBrokerTestCase @@ -55,7 +54,7 @@ public class MessageRequeueTest extends QpidBrokerTestCase private long[] receieved = new long[numTestMessages + 1]; private boolean passed = false; - QpidClientConnection conn; + private QpidClientConnection conn; protected void setUp() throws Exception diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java index 8a6dfb86ee..957063b2e1 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java @@ -19,15 +19,15 @@ * */package org.apache.qpid.test.unit.close; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + import javax.jms.Session; import javax.jms.Topic; import javax.jms.TopicPublisher; import javax.jms.TopicSession; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.test.utils.QpidBrokerTestCase; - /** * @author Apache Software Foundation */ diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java index 80422cf3e9..c292c718bb 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java @@ -17,6 +17,12 @@ */ package org.apache.qpid.test.unit.ct; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + import javax.jms.Connection; import javax.jms.Message; import javax.jms.MessageProducer; @@ -29,12 +35,6 @@ import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.test.utils.QpidBrokerTestCase; - /** * Crash Recovery tests for durable subscription * @@ -379,7 +379,7 @@ public class DurableSubscriberTest extends QpidBrokerTestCase // Create durable subscriber that matches A TopicSubscriber subA = session.createDurableSubscriber(topic, - "testResubscribeWithChangedSelector", + "testResubscribeWithChangedSelectorAndRestart", "Match = True", false); // Send 1 matching message and 1 non-matching message @@ -397,8 +397,8 @@ public class DurableSubscriberTest extends QpidBrokerTestCase ((TextMessage) rMsg).getText()); // Queue has no messages left - AMQQueue subQueueTmp = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorAndRestart"); - assertEquals("Msg count should be 0", 0, ((AMQSession<?, ?>) session).getQueueDepth(subQueueTmp)); + AMQQueue subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorAndRestart"); + assertEquals("Msg count should be 0", 0, ((AMQSession<?, ?>) session).getQueueDepth(subQueue, true)); rMsg = subA.receive(1000); assertNull(rMsg); @@ -423,8 +423,7 @@ public class DurableSubscriberTest extends QpidBrokerTestCase //verify no messages are now present on the queue as changing selector should have issued //an unsubscribe and thus deleted the previous durable backing queue for the subscription. //check the dur sub's underlying queue now has msg count 0 - AMQQueue subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorAndRestart"); - assertEquals("Msg count should be 0", 0, ((AMQSession<?, ?>) session).getQueueDepth(subQueue)); + assertEquals("Msg count should be 0", 0, ((AMQSession<?, ?>) session).getQueueDepth(subQueue, true)); // Check that new messages are received properly msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1"); @@ -444,8 +443,8 @@ public class DurableSubscriberTest extends QpidBrokerTestCase assertNull(rMsg); //check the dur sub's underlying queue now has msg count 0 - subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorAndRestart"); - assertEquals("Msg count should be 0", 0, ((AMQSession<?, ?>) session).getQueueDepth(subQueue)); + assertEquals("Msg count should be 0", 0, ((AMQSession<?, ?>) session).getQueueDepth(subQueue, true)); + conn.close(); //now restart the server try @@ -467,8 +466,7 @@ public class DurableSubscriberTest extends QpidBrokerTestCase //verify no messages now present on the queue after we restart the broker //check the dur sub's underlying queue now has msg count 0 - subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorAndRestart"); - assertEquals("Msg count should be 0", 0, ((AMQSession<?, ?>) session).getQueueDepth(subQueue)); + assertEquals("Msg count should be 0", 0, ((AMQSession<?, ?>) session).getQueueDepth(subQueue, true)); // Reconnect with new selector that matches B TopicSubscriber subC = session.createDurableSubscriber(topic, @@ -484,8 +482,7 @@ public class DurableSubscriberTest extends QpidBrokerTestCase producer.send(msg); //check the dur sub's underlying queue now has msg count 1 - subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorAndRestart"); - assertEquals("Msg count should be 1", 1, ((AMQSession<?, ?>) session).getQueueDepth(subQueue)); + assertEquals("Msg count should be 1", 1, ((AMQSession<?, ?>) session).getQueueDepth(subQueue, true)); rMsg = subC.receive(1000); assertNotNull(rMsg); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java index 97452ad1c8..3f2d6f92ab 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.test.unit.message; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.qpid.AMQPInvalidClassException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; @@ -29,9 +32,6 @@ import org.apache.qpid.client.message.QpidMessageProperties; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import javax.jms.Connection; import javax.jms.Destination; import javax.jms.Message; @@ -42,7 +42,6 @@ import javax.jms.ObjectMessage; import javax.jms.Queue; import javax.jms.Session; import javax.jms.Topic; - import java.util.Enumeration; import java.util.HashMap; import java.util.Map; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java index 39691a5c7c..e861b4f4ee 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java @@ -20,9 +20,8 @@ */ package org.apache.qpid.test.unit.message; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQHeadersExchange; @@ -32,15 +31,11 @@ import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.test.utils.QpidBrokerTestCase; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.BindingURL; -import org.apache.qpid.test.utils.QpidBrokerTestCase; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import javax.jms.Connection; -import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageEOFException; @@ -48,6 +43,9 @@ import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.StreamMessage; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; /** * @author Apache Software Foundation diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/UTF8Test.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/UTF8Test.java index 978ebfa93f..e07c0ecbe9 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/UTF8Test.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/UTF8Test.java @@ -20,10 +20,7 @@ */ package org.apache.qpid.test.unit.message; -import java.io.BufferedReader; -import java.io.FileInputStream; -import java.io.InputStreamReader; -import java.util.Properties; +import org.apache.qpid.test.utils.QpidBrokerTestCase; import javax.jms.Connection; import javax.jms.Destination; @@ -32,8 +29,10 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.naming.InitialContext; - -import org.apache.qpid.test.utils.QpidBrokerTestCase; +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.InputStreamReader; +import java.util.Properties; /** diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java index e948aaffb3..a07e531b98 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java @@ -20,8 +20,15 @@ */ package org.apache.qpid.test.unit.topic; -import java.io.IOException; -import java.util.Set; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.management.common.JMXConnnectionFactory; +import org.apache.qpid.test.utils.QpidBrokerTestCase; import javax.jms.Connection; import javax.jms.InvalidDestinationException; @@ -37,15 +44,8 @@ import javax.jms.TopicSubscriber; import javax.management.MBeanServerConnection; import javax.management.ObjectName; import javax.management.remote.JMXConnector; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.management.common.JMXConnnectionFactory; -import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.Set; /** * @todo Code to check that a consumer gets only one particular method could be factored into a re-usable method (as @@ -116,12 +116,10 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase _logger.info("Producer sending message A"); producer.send(session1.createTextMessage("A")); - - ((AMQSession<?, ?>) session1).sync(); - + //check the dur sub's underlying queue now has msg count 1 AMQQueue subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "MySubscription"); - assertEquals("Msg count should be 1", 1, ((AMQSession<?, ?>) session1).getQueueDepth(subQueue)); + assertEquals("Msg count should be 1", 1, ((AMQSession<?, ?>) session1).getQueueDepth(subQueue, true)); Message msg; _logger.info("Receive message on consumer 1:expecting A"); @@ -139,11 +137,9 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT); _logger.info("Receive message on consumer 1 :expecting null"); assertEquals(null, msg); - - ((AMQSession<?, ?>) session2).sync(); - + //check the dur sub's underlying queue now has msg count 0 - assertEquals("Msg count should be 0", 0, ((AMQSession<?, ?>) session2).getQueueDepth(subQueue)); + assertEquals("Msg count should be 0", 0, ((AMQSession<?, ?>) session2).getQueueDepth(subQueue, true)); consumer2.close(); _logger.info("Unsubscribe session2/consumer2"); @@ -151,7 +147,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase ((AMQSession<?, ?>) session2).sync(); - if(isJavaBroker() && isExternalBroker()) + if(isJavaBroker()) { //Verify that the queue was deleted by querying for its JMX MBean _jmxc = JMXConnnectionFactory.getJMXConnection(5000, "127.0.0.1", @@ -635,7 +631,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase // should be 5 or 10 messages on queue now // (5 for the java broker due to use of server side selectors, and 10 for the cpp broker due to client side selectors only) AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "sameMessageSelector"); - assertEquals("Queue depth is wrong", isJavaBroker() ? 5 : 10, ((AMQSession<?, ?>) session).getQueueDepth(queue)); + assertEquals("Queue depth is wrong", isJavaBroker() ? 5 : 10, ((AMQSession<?, ?>) session).getQueueDepth(queue, true)); // now recreate the durable subscriber and check the received messages TopicSubscriber subTwo = session.createDurableSubscriber(topic, "sameMessageSelector", "testprop = TRUE", false); @@ -721,11 +717,11 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2"); msg.setBooleanProperty("Match", false); producer.send(msg); - ((AMQSession)session).sync(); + // should be 1 or 2 messages on queue now // (1 for the java broker due to use of server side selectors, and 2 for the cpp broker due to client side selectors only) AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorNoClose"); - assertEquals("Queue depth is wrong", isJavaBroker() ? 1 : 2, ((AMQSession<?, ?>) session).getQueueDepth(queue)); + assertEquals("Queue depth is wrong", isJavaBroker() ? 1 : 2, ((AMQSession<?, ?>) session).getQueueDepth(queue, true)); conn.start(); @@ -739,7 +735,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase assertNull(rMsg); // Check queue has no messages - assertEquals("Queue should be empty", 0, ((AMQSession<?, ?>) session).getQueueDepth(queue)); + assertEquals("Queue should be empty", 0, ((AMQSession<?, ?>) session).getQueueDepth(queue, true)); conn.close(); } @@ -793,7 +789,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase // should be 1 or 2 messages on queue now // (1 for the java broker due to use of server side selectors, and 2 for the cpp broker due to client side selectors only) AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "subscriptionName"); - assertEquals("Queue depth is wrong", isJavaBroker() ? 1 : 2, ((AMQSession<?, ?>) session).getQueueDepth(queue)); + assertEquals("Queue depth is wrong", isJavaBroker() ? 1 : 2, ((AMQSession<?, ?>) session).getQueueDepth(queue, true)); conn.start(); @@ -807,7 +803,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase assertNull(rMsg); // Check queue has no messages - assertEquals("Queue should be empty", 0, ((AMQSession<?, ?>) session).getQueueDepth(queue)); + assertEquals("Queue should be empty", 0, ((AMQSession<?, ?>) session).getQueueDepth(queue, true)); conn.close(); } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TemporaryTopicTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TemporaryTopicTest.java index c89b13a0f9..a5b9ce8365 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TemporaryTopicTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TemporaryTopicTest.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.test.unit.topic; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.MessageConsumer; @@ -28,8 +30,6 @@ import javax.jms.Session; import javax.jms.TemporaryTopic; import javax.jms.TextMessage; -import org.apache.qpid.test.utils.QpidBrokerTestCase; - /** * Tests the behaviour of {@link TemporaryTopic}. diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicPublisherTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicPublisherTest.java index 5874133ab1..5fbbc7f67f 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicPublisherTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicPublisherTest.java @@ -20,16 +20,16 @@ */ package org.apache.qpid.test.unit.topic; -import javax.jms.MessageConsumer; -import javax.jms.TextMessage; -import javax.jms.TopicPublisher; -import javax.jms.TopicSession; - import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQTopic; import org.apache.qpid.test.utils.QpidBrokerTestCase; +import javax.jms.MessageConsumer; +import javax.jms.TextMessage; +import javax.jms.TopicPublisher; +import javax.jms.TopicSession; + /** * @author Apache Software Foundation */ diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java index 826545a23d..5dae98fe21 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java @@ -20,6 +20,12 @@ */ package org.apache.qpid.test.unit.topic; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + import javax.jms.Connection; import javax.jms.InvalidDestinationException; import javax.jms.Message; @@ -32,12 +38,6 @@ import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.test.utils.QpidBrokerTestCase; - /** @author Apache Software Foundation */ public class TopicSessionTest extends QpidBrokerTestCase diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java index b8b5a29a43..0be4f7ff1d 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java @@ -20,15 +20,22 @@ */ package org.apache.qpid.test.unit.transacted; -import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.configuration.ClientProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.jms.*; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactedTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactedTest.java index 3c0f951c96..4f7d592958 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactedTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactedTest.java @@ -21,6 +21,9 @@ package org.apache.qpid.test.unit.transacted; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; @@ -28,9 +31,6 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.jms.Session; import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import javax.jms.Connection; import javax.jms.IllegalStateException; import javax.jms.JMSException; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java index 2b90d38049..e2b0f00ee4 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java @@ -20,26 +20,13 @@ */ package org.apache.qpid.test.unit.transacted; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import javax.jms.DeliveryMode; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.TextMessage; - import junit.framework.TestCase; import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQConnectionURL; import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.jms.Session; @@ -47,11 +34,25 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.test.utils.QpidBrokerTestCase; import org.apache.qpid.util.LogMonitor; +import javax.jms.DeliveryMode; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.TextMessage; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + /** * The {@link TestCase} for transaction timeout testing. */ public abstract class TransactionTimeoutTestCase extends QpidBrokerTestCase implements ExceptionListener { + private static final int ALERT_MESSAGE_TOLERANCE = 6; public static final String VIRTUALHOST = "test"; public static final String TEXT = "0123456789abcdefghiforgettherest"; public static final String CHN_OPEN_TXN = "CHN-1007"; @@ -138,16 +139,21 @@ public abstract class TransactionTimeoutTestCase extends QpidBrokerTestCase impl /** * Send a number of messages to the queue, optionally pausing after each. + * + * Need to sync to ensure that the Broker has received the message(s) in order + * the test and broker start timing the idle transaction from the same point in time. */ protected void send(int count, float delay) throws Exception { for (int i = 0; i < count; i++) { - sleep(delay); + sleep(delay); Message msg = _psession.createTextMessage(TEXT); msg.setIntProperty("i", i); - _producer.send(msg); + _producer.send(msg); } + + ((AMQSession<?, ?>)_psession).sync(); } /** @@ -184,7 +190,7 @@ public abstract class TransactionTimeoutTestCase extends QpidBrokerTestCase impl } else { - assertTrue(idleErr, idleMsgs.size() >= idle - 2 && idleMsgs.size() <= idle + 2); + assertTrue(idleErr, idleMsgs.size() >= idle - ALERT_MESSAGE_TOLERANCE && idleMsgs.size() <= idle + ALERT_MESSAGE_TOLERANCE); } if (open == 0) @@ -193,7 +199,7 @@ public abstract class TransactionTimeoutTestCase extends QpidBrokerTestCase impl } else { - assertTrue(openErr, openMsgs.size() >= open - 2 && openMsgs.size() <= open + 2); + assertTrue(openErr, openMsgs.size() >= open - ALERT_MESSAGE_TOLERANCE && openMsgs.size() <= open + ALERT_MESSAGE_TOLERANCE); } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/AbstractXATestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/AbstractXATestCase.java index f39f640d04..b5d1bff842 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/AbstractXATestCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/AbstractXATestCase.java @@ -20,9 +20,15 @@ package org.apache.qpid.test.unit.xa; import org.apache.qpid.dtx.XidImpl; import org.apache.qpid.test.utils.QpidBrokerTestCase; -import javax.transaction.xa.Xid; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.TextMessage; +import javax.jms.XASession; import javax.transaction.xa.XAResource; -import javax.jms.*; +import javax.transaction.xa.Xid; import java.util.Random; /** diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/FaultTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/FaultTest.java index 47705f8105..e940a73bbb 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/FaultTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/FaultTest.java @@ -21,15 +21,20 @@ package org.apache.qpid.test.unit.xa; */ +import junit.framework.TestSuite; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.jms.*; -import javax.transaction.xa.Xid; -import javax.transaction.xa.XAResource; +import javax.jms.Queue; +import javax.jms.QueueConnection; +import javax.jms.QueueSession; +import javax.jms.Session; +import javax.jms.XAQueueConnection; +import javax.jms.XAQueueConnectionFactory; +import javax.jms.XAQueueSession; import javax.transaction.xa.XAException; - -import junit.framework.TestSuite; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; public class FaultTest extends AbstractXATestCase @@ -113,7 +118,7 @@ public class FaultTest extends AbstractXATestCase _queueFactory = getConnectionFactory(); _xaqueueConnection = _queueFactory.createXAQueueConnection("guest", "guest"); XAQueueSession session = _xaqueueConnection.createXAQueueSession(); - _queueConnection = _queueFactory.createQueueConnection(); + _queueConnection = _queueFactory.createQueueConnection("guest","guest"); _nonXASession = _queueConnection.createQueueSession(true, Session.AUTO_ACKNOWLEDGE); init(session, _queue); } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/QueueTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/QueueTest.java index d2abc0eac1..3fbe76323a 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/QueueTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/QueueTest.java @@ -17,15 +17,26 @@ */ package org.apache.qpid.test.unit.xa; -import javax.jms.*; -import javax.transaction.xa.XAResource; -import javax.transaction.xa.Xid; -import javax.transaction.xa.XAException; - import junit.framework.TestSuite; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.QueueConnection; +import javax.jms.QueueSession; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.XAQueueConnection; +import javax.jms.XAQueueConnectionFactory; +import javax.jms.XAQueueSession; +import javax.transaction.xa.XAException; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; + public class QueueTest extends AbstractXATestCase { /* this clas logger */ @@ -151,11 +162,12 @@ public class QueueTest extends AbstractXATestCase // create a standard session try { - _queueConnection = _queueFactory.createQueueConnection(); + _queueConnection = _queueFactory.createQueueConnection("guest", "guest"); _nonXASession = _queueConnection.createQueueSession(true, Session.AUTO_ACKNOWLEDGE); } catch (JMSException e) { + e.printStackTrace(); fail("cannot create queue session: " + e.getMessage()); } init(session, _queue); @@ -627,7 +639,8 @@ public class QueueTest extends AbstractXATestCase TextMessage message1 = (TextMessage) nonXAConsumer.receive(1000); if (message1 != null) { - fail("The queue is not empty! "); + + fail("The queue is not empty! " + message1.getLongProperty(_sequenceNumberPropertyName)); } } catch (JMSException e) diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/TopicTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/TopicTest.java index 99d0f0a075..d955979ad6 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/TopicTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/TopicTest.java @@ -17,18 +17,19 @@ */ package org.apache.qpid.test.unit.xa; +import junit.framework.TestSuite; +import org.apache.qpid.configuration.ClientProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.jms.*; +import javax.transaction.xa.XAException; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; -import javax.transaction.xa.XAException; - -import junit.framework.TestSuite; - +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * * @@ -105,7 +106,7 @@ public class TopicTest extends AbstractXATestCase } catch (Exception e) { - fail("Exception thrown when cleaning standard connection: " + e.getStackTrace()); + fail("Exception thrown when cleaning standard connection: " + e); } } super.tearDown(); @@ -118,6 +119,7 @@ public class TopicTest extends AbstractXATestCase { if (!isBroker08()) { + setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, "1"); // lookup test queue try { @@ -652,7 +654,12 @@ public class TopicTest extends AbstractXATestCase { message = (TextMessage) xaDurSub.receive(1000); - _logger.debug(" received message: " + message.getLongProperty(_sequenceNumberPropertyName)); + + if(message != null) + { + _logger.debug(" received message: " + message.getLongProperty(_sequenceNumberPropertyName)); + } + if (message == null) { fail("no message received! expected: " + i); @@ -884,35 +891,40 @@ public class TopicTest extends AbstractXATestCase // receive 3 message within tx1: 3, 4 and 7 _xaResource.start(xid1, XAResource.TMRESUME); // receive messages 3, 4 and 7 + Set<Long> expected = new HashSet<Long>(); + expected.add(3L); + expected.add(4L); + expected.add(7L); message = (TextMessage) xaDurSub.receive(1000); if (message == null) { - fail("no message received! expected: " + 3); + fail("no message received! expected one of: " + expected); } - else if (message.getLongProperty(_sequenceNumberPropertyName) != 3) + else if (!expected.remove(message.getLongProperty(_sequenceNumberPropertyName))) { fail("wrong sequence number: " + message - .getLongProperty(_sequenceNumberPropertyName) + " 3 was expected"); + .getLongProperty(_sequenceNumberPropertyName) + " expected one from " + expected); } message = (TextMessage) xaDurSub.receive(1000); if (message == null) { - fail("no message received! expected: " + 4); + fail("no message received! expected one of: " + expected); } - else if (message.getLongProperty(_sequenceNumberPropertyName) != 4) + else if (!expected.remove(message.getLongProperty(_sequenceNumberPropertyName))) { + fail("wrong sequence number: " + message - .getLongProperty(_sequenceNumberPropertyName) + " 4 was expected"); + .getLongProperty(_sequenceNumberPropertyName) + " expected one from " + expected); } message = (TextMessage) xaDurSub.receive(1000); if (message == null) { - fail("no message received! expected: " + 7); + fail("no message received! expected one of: " + expected); } - else if (message.getLongProperty(_sequenceNumberPropertyName) != 7) + else if (!expected.remove(message.getLongProperty(_sequenceNumberPropertyName))) { fail("wrong sequence number: " + message - .getLongProperty(_sequenceNumberPropertyName) + " 7 was expected"); + .getLongProperty(_sequenceNumberPropertyName) + " expected one from " + expected); } } catch (Exception e) @@ -938,8 +950,18 @@ public class TopicTest extends AbstractXATestCase try { - // consume messages 1 - 4 - //----- start xid1 + // consume messages: could be any from (1 - 4, 7-10) + //----- start xid4 + Set<Long> expected = new HashSet<Long>(); + Set<Long> xid4msgs = new HashSet<Long>(); + for(long l = 1; l <= 4l; l++) + { + expected.add(l); + } + for(long l = 7; l <= 10l; l++) + { + expected.add(l); + } _xaResource.start(xid4, XAResource.TMNOFLAGS); for (int i = 1; i <= 4; i++) { @@ -948,9 +970,14 @@ public class TopicTest extends AbstractXATestCase { fail("no message received! expected: " + i); } - else if (message.getLongProperty(_sequenceNumberPropertyName) != i) + + long seqNo = message.getLongProperty(_sequenceNumberPropertyName); + xid4msgs.add(seqNo); + + if (!expected.remove(seqNo)) { - fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); + fail("wrong sequence number: " + seqNo + + " expected one from " + expected); } } _xaResource.end(xid4, XAResource.TMSUSPEND); @@ -963,15 +990,17 @@ public class TopicTest extends AbstractXATestCase { fail("no message received! expected: " + i); } - else if (message.getLongProperty(_sequenceNumberPropertyName) != i) + else if (!expected.remove(message.getLongProperty(_sequenceNumberPropertyName))) { - fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); + fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName) + + " expected one from " + expected); } } _xaResource.end(xid5, XAResource.TMSUSPEND); // abort tx4 _xaResource.prepare(xid4); _xaResource.rollback(xid4); + expected.addAll(xid4msgs); // consume messages 1-4 with tx5 _xaResource.start(xid5, XAResource.TMRESUME); for (int i = 1; i <= 4; i++) @@ -981,13 +1010,15 @@ public class TopicTest extends AbstractXATestCase { fail("no message received! expected: " + i); } - else if (message.getLongProperty(_sequenceNumberPropertyName) != i) + else if (!expected.remove(message.getLongProperty(_sequenceNumberPropertyName))) { - fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); + fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName) + + " expected one from " + expected); } } _xaResource.end(xid5, XAResource.TMSUSPEND); // commit tx5 + _xaResource.prepare(xid5); _xaResource.commit(xid5, false); } @@ -1604,6 +1635,7 @@ public class TopicTest extends AbstractXATestCase } _xaResource.end(xid2, XAResource.TMSUCCESS); _xaResource.commit(xid2, true); + _session.close(); } catch (Exception e) { diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/BrokerHolder.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/BrokerHolder.java index 8345803d56..66b3fe0c6a 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/BrokerHolder.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/BrokerHolder.java @@ -22,5 +22,7 @@ package org.apache.qpid.test.utils; public interface BrokerHolder { + String getWorkingDirectory(); void shutdown(); + void kill(); } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/ConversationFactory.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/ConversationFactory.java index e153b2e0f5..3a9354d822 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/ConversationFactory.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/ConversationFactory.java @@ -21,11 +21,19 @@ package org.apache.qpid.test.utils; import org.apache.log4j.Logger; -import org.apache.qpid.test.utils.ReflectionUtils; -import javax.jms.*; - -import java.util.*; +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -113,19 +121,19 @@ public class ConversationFactory private Session session; /** The message consumer for incoming messages. */ - MessageConsumer consumer; + private MessageConsumer consumer; /** The message producer for outgoing messages. */ - MessageProducer producer; + private MessageProducer producer; /** The well-known or temporary destination to receive replies on. */ - Destination receiveDestination; + private Destination receiveDestination; /** Holds the queue implementation class for the reply queue. */ - Class<? extends BlockingQueue> queueClass; + private Class<? extends BlockingQueue> queueClass; /** Used to hold any replies that are received outside of the context of a conversation. */ - BlockingQueue<Message> deadLetterBox = new LinkedBlockingQueue<Message>(); + private BlockingQueue<Message> deadLetterBox = new LinkedBlockingQueue<Message>(); /* Used to hold conversation state on a per thread basis. */ /* @@ -143,7 +151,7 @@ public class ConversationFactory */ /** Generates new coversation id's as needed. */ - AtomicLong conversationIdGenerator = new AtomicLong(); + private AtomicLong conversationIdGenerator = new AtomicLong(); /** * Creates a conversation helper on the specified connection with the default sending destination, and listening @@ -238,13 +246,13 @@ public class ConversationFactory public class Conversation { /** Holds the correlation id for the context. */ - long conversationId; + private long conversationId; /** * Holds the send destination for the context. This will automatically be updated to the most recently received * reply-to destination. */ - Destination sendDestination; + private Destination sendDestination; /** * Sends a message to the default sending location. The correlation id of the message will be assigned by this diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java index 10217585c1..f6c481431a 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java @@ -20,15 +20,13 @@ */ package org.apache.qpid.test.utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.client.AMQConnectionFactory; import org.apache.qpid.util.FileUtils; import javax.naming.NamingException; -import javax.jms.JMSException; -import javax.naming.NamingException; - -import org.apache.qpid.client.AMQConnectionFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class FailoverBaseCase extends QpidBrokerTestCase { @@ -36,8 +34,6 @@ public class FailoverBaseCase extends QpidBrokerTestCase public static final long DEFAULT_FAILOVER_TIME = 10000L; - protected int failingPort; - protected void setUp() throws java.lang.Exception { super.setUp(); @@ -68,15 +64,6 @@ public class FailoverBaseCase extends QpidBrokerTestCase return _connectionFactory; } - @Override - public void stopBroker(int port) throws Exception - { - if (isBrokerPresent(port)) - { - super.stopBroker(port); - } - } - public void tearDown() throws Exception { try @@ -92,11 +79,11 @@ public class FailoverBaseCase extends QpidBrokerTestCase } } - public void failBroker(int port) { try { + //TODO: use killBroker instead stopBroker(port); } catch (Exception e) @@ -104,6 +91,4 @@ public class FailoverBaseCase extends QpidBrokerTestCase throw new RuntimeException(e); } } - - } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java index 340f00fed8..adda9ca3ec 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java @@ -21,14 +21,17 @@ package org.apache.qpid.test.utils; import org.apache.log4j.Logger; + import org.apache.qpid.server.Broker; public class InternalBrokerHolder implements BrokerHolder { private static final Logger LOGGER = Logger.getLogger(InternalBrokerHolder.class); + private final Broker _broker; + private final String _workingDirectory; - public InternalBrokerHolder(final Broker broker) + public InternalBrokerHolder(final Broker broker, String workingDirectory) { if(broker == null) { @@ -36,6 +39,13 @@ public class InternalBrokerHolder implements BrokerHolder } _broker = broker; + _workingDirectory = workingDirectory; + } + + @Override + public String getWorkingDirectory() + { + return _workingDirectory; } public void shutdown() @@ -47,4 +57,12 @@ public class InternalBrokerHolder implements BrokerHolder LOGGER.info("Broker instance shutdown"); } + @Override + public void kill() + { + // Can't kill a internal broker as we would also kill ourselves as we share the same JVM. + shutdown(); + } + + } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java index 3a1710671c..2b7c3f2664 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java @@ -20,32 +20,31 @@ */ package org.apache.qpid.test.utils; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Set; - -import javax.management.JMException; -import javax.management.MBeanException; -import javax.management.MBeanServerConnection; -import javax.management.MBeanServerInvocationHandler; -import javax.management.ObjectName; -import javax.management.MalformedObjectNameException; -import javax.management.remote.JMXConnector; - import junit.framework.TestCase; - import org.apache.commons.configuration.ConfigurationException; + import org.apache.qpid.management.common.JMXConnnectionFactory; +import org.apache.qpid.management.common.mbeans.ConfigurationManagement; +import org.apache.qpid.management.common.mbeans.LoggingManagement; import org.apache.qpid.management.common.mbeans.ManagedBroker; import org.apache.qpid.management.common.mbeans.ManagedConnection; import org.apache.qpid.management.common.mbeans.ManagedExchange; -import org.apache.qpid.management.common.mbeans.LoggingManagement; -import org.apache.qpid.management.common.mbeans.ConfigurationManagement; import org.apache.qpid.management.common.mbeans.ManagedQueue; import org.apache.qpid.management.common.mbeans.ServerInformation; import org.apache.qpid.management.common.mbeans.UserManagement; +import javax.management.JMException; +import javax.management.MBeanException; +import javax.management.MBeanServerConnection; +import javax.management.MBeanServerInvocationHandler; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import javax.management.remote.JMXConnector; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + /** * JMX access for tests. */ diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/Piper.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/Piper.java new file mode 100644 index 0000000000..9413e38606 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/Piper.java @@ -0,0 +1,130 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.utils; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.PrintStream; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.log4j.Logger; + +public final class Piper extends Thread +{ + private static final Logger LOGGER = Logger.getLogger(Piper.class); + + private final BufferedReader _in; + private final PrintStream _out; + private final String _ready; + private final CountDownLatch _latch; + private final String _stopped; + private final String _prefix; + private volatile boolean _seenReady; + private volatile String _stopLine; + + public Piper(InputStream in, PrintStream out, String ready, String stopped) + { + this(in, out, ready, stopped, null); + } + + public Piper(InputStream in, PrintStream out, String ready, String stopped, String prefix) + { + _in = new BufferedReader(new InputStreamReader(in)); + _out = out; + _ready = ready; + _stopped = stopped; + _seenReady = false; + _prefix = prefix; + + if (this._ready != null && !this._ready.equals("")) + { + this._latch = new CountDownLatch(1); + } + else + { + this._latch = null; + } + } + + public boolean await(long timeout, TimeUnit unit) throws InterruptedException + { + if (_latch == null) + { + return true; + } + else + { + _latch.await(timeout, unit); + return _seenReady; + } + } + + public void run() + { + try + { + String line; + while ((line = _in.readLine()) != null) + { + if (_prefix != null) + { + line = _prefix + line; + } + _out.println(line); + + if (_latch != null && line.contains(_ready)) + { + _seenReady = true; + _latch.countDown(); + } + + if (!_seenReady && line.contains(_stopped)) + { + _stopLine = line; + } + } + } + catch (IOException e) + { + LOGGER.warn(e.getMessage() + " : Broker stream from unexpectedly closed; last log lines written by Broker may be lost."); + } + finally + { + if (_latch != null) + { + _latch.countDown(); + } + } + } + + public String getStopLine() + { + return _stopLine; + } + + String getReady() + { + return _ready; + } +}
\ No newline at end of file diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java index 9a8da14f83..32c6094adb 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java @@ -17,43 +17,11 @@ */ package org.apache.qpid.test.utils; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.LineNumberReader; -import java.io.PrintStream; -import java.net.MalformedURLException; -import java.net.URL; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import javax.jms.BytesMessage; -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.ObjectMessage; -import javax.jms.Queue; -import javax.jms.Session; -import javax.jms.StreamMessage; -import javax.jms.TextMessage; -import javax.jms.Topic; -import javax.naming.InitialContext; -import javax.naming.NamingException; - import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.XMLConfiguration; import org.apache.commons.lang.StringUtils; import org.apache.log4j.Logger; + import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnectionFactory; import org.apache.qpid.client.AMQQueue; @@ -72,6 +40,34 @@ import org.apache.qpid.url.URLSyntaxException; import org.apache.qpid.util.FileUtils; import org.apache.qpid.util.LogMonitor; +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.StreamMessage; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.naming.InitialContext; +import javax.naming.NamingException; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + /** * Qpid base class for system testing test cases. */ @@ -126,7 +122,6 @@ public class QpidBrokerTestCase extends QpidTestCase private static final String BROKER_LOG_PREFIX = "broker.log.prefix"; private static final String BROKER_PERSITENT = "broker.persistent"; private static final String BROKER_PROTOCOL_EXCLUDES = "broker.protocol.excludes"; - // values protected static final String JAVA = "java"; @@ -154,7 +149,7 @@ public class QpidBrokerTestCase extends QpidTestCase protected File _outputFile; - protected PrintStream _brokerOutputStream; + protected PrintStream _testcaseOutputStream; protected Map<Integer, BrokerHolder> _brokers = new HashMap<Integer, BrokerHolder>(); @@ -195,10 +190,10 @@ public class QpidBrokerTestCase extends QpidTestCase super(); } - public Logger getLogger() - { - return QpidBrokerTestCase._logger; - } + public Logger getLogger() + { + return QpidBrokerTestCase._logger; + } public void runBare() throws Throwable { @@ -228,12 +223,12 @@ public class QpidBrokerTestCase extends QpidTestCase if (_interleaveBrokerLog) { - _brokerOutputStream = out; + _testcaseOutputStream = out; } else { - _brokerOutputStream = new PrintStream(new FileOutputStream(String - .format("%s/TEST-%s.broker.out", _output, qname)), true); + _testcaseOutputStream = new PrintStream(new FileOutputStream(String + .format("%s/TEST-%s.broker.out", _output, qname)), true); } } @@ -278,7 +273,7 @@ public class QpidBrokerTestCase extends QpidTestCase out.close(); if (!_interleaveBrokerLog) { - _brokerOutputStream.close(); + _testcaseOutputStream.close(); } } } @@ -307,103 +302,6 @@ public class QpidBrokerTestCase extends QpidTestCase startBroker(); } - private static final class Piper extends Thread - { - - private LineNumberReader in; - private PrintStream out; - private String ready; - private CountDownLatch latch; - private boolean seenReady; - private String stopped; - private String stopLine; - - public Piper(InputStream in, PrintStream out, String ready) - { - this(in, out, ready, null); - } - - public Piper(InputStream in, PrintStream out, String ready, String stopped) - { - this.in = new LineNumberReader(new InputStreamReader(in)); - this.out = out; - this.ready = ready; - this.stopped = stopped; - this.seenReady = false; - - if (this.ready != null && !this.ready.equals("")) - { - this.latch = new CountDownLatch(1); - } - else - { - this.latch = null; - } - } - - public Piper(InputStream in, PrintStream out) - { - this(in, out, null); - } - - public boolean await(long timeout, TimeUnit unit) throws InterruptedException - { - if (latch == null) - { - return true; - } - else - { - latch.await(timeout, unit); - return seenReady; - } - } - - public void run() - { - try - { - String line; - while ((line = in.readLine()) != null) - { - if (_interleaveBrokerLog) - { - line = _brokerLogPrefix + line; - } - out.println(line); - - if (latch != null && line.contains(ready)) - { - seenReady = true; - latch.countDown(); - } - - if (!seenReady && line.contains(stopped)) - { - stopLine = line; - } - } - } - catch (IOException e) - { - // this seems to happen regularly even when - // exits are normal - } - finally - { - if (latch != null) - { - latch.countDown(); - } - } - } - - public String getStopLine() - { - return stopLine; - } - } - /** * Return the management port in use by the broker on this main port * @@ -489,7 +387,7 @@ public class QpidBrokerTestCase extends QpidTestCase _logger.info("starting internal broker (same JVM)"); broker.startup(options); - _brokers.put(port, new InternalBrokerHolder(broker)); + _brokers.put(port, new InternalBrokerHolder(broker, System.getProperty("QPID_WORK"))); } else if (!_brokerType.equals(BrokerType.EXTERNAL)) { @@ -563,18 +461,19 @@ public class QpidBrokerTestCase extends QpidTestCase // cpp broker requires that the work directory is created createBrokerWork(qpidWork); - Process process = pb.start();; + Process process = pb.start(); Piper p = new Piper(process.getInputStream(), - _brokerOutputStream, + _testcaseOutputStream, System.getProperty(BROKER_READY), - System.getProperty(BROKER_STOPPED)); + System.getProperty(BROKER_STOPPED), + _interleaveBrokerLog ? _brokerLogPrefix : null); p.start(); if (!p.await(30, TimeUnit.SECONDS)) { - _logger.info("broker failed to become ready (" + p.ready + "):" + p.getStopLine()); + _logger.info("broker failed to become ready (" + p.getReady() + "):" + p.getStopLine()); //Ensure broker has stopped process.destroy(); cleanBrokerWork(qpidWork); @@ -595,7 +494,7 @@ public class QpidBrokerTestCase extends QpidTestCase // this is expect if the broker started successfully } - _brokers.put(port, new SpawnedBrokerHolder(process)); + _brokers.put(port, new SpawnedBrokerHolder(process, qpidWork)); } } @@ -742,11 +641,31 @@ public class QpidBrokerTestCase extends QpidTestCase public void stopBroker(int port) throws Exception { - port = getPort(port); + if (isBrokerPresent(port)) + { + port = getPort(port); - _logger.info("stopping broker on port : " + port); - BrokerHolder broker = _brokers.remove(port); - broker.shutdown(); + _logger.info("stopping broker on port : " + port); + BrokerHolder broker = _brokers.remove(port); + broker.shutdown(); + } + } + + public void killBroker() throws Exception + { + killBroker(0); + } + + public void killBroker(int port) throws Exception + { + if (isBrokerPresent(port)) + { + port = getPort(port); + + _logger.info("killing broker on port : " + port); + BrokerHolder broker = _brokers.remove(port); + broker.kill(); + } } public boolean isBrokerPresent(int port) throws Exception @@ -755,7 +674,13 @@ public class QpidBrokerTestCase extends QpidTestCase return _brokers.containsKey(port); } - + + public BrokerHolder getBroker(int port) throws Exception + { + port = getPort(port); + return _brokers.get(port); + } + /** * Attempt to set the Java Broker to use the BDBMessageStore for persistence * Falling back to the DerbyMessageStore if @@ -984,7 +909,6 @@ public class QpidBrokerTestCase extends QpidTestCase /** * we assume that the environment is correctly set * i.e. -Djava.naming.provider.url="..//example010.properties" - * TODO should be a way of setting that through maven * * @return an initial context * @@ -1158,13 +1082,13 @@ public class QpidBrokerTestCase extends QpidTestCase /** * Send messages to the given destination. * - * If session is transacted then messages will be commited before returning + * If session is transacted then messages will be committed before returning * * @param session the session to use for sending * @param destination where to send them to * @param count no. of messages to send * - * @return the sent messges + * @return the sent messages * * @throws Exception */ @@ -1357,6 +1281,6 @@ public class QpidBrokerTestCase extends QpidTestCase protected int getFailingPort() { - return FAILING_PORT; + return FAILING_PORT; } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidClientConnection.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidClientConnection.java index 16f7bfd305..0e0032da64 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidClientConnection.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidClientConnection.java @@ -21,13 +21,12 @@ package org.apache.qpid.test.utils; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.JMSAMQException; -import org.apache.qpid.test.utils.QpidBrokerTestCase; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.JMSAMQException; + import javax.jms.Connection; import javax.jms.ExceptionListener; import javax.jms.JMSException; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/ReflectionUtils.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/ReflectionUtils.java index 7946c6a6d1..83294c13ad 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/ReflectionUtils.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/ReflectionUtils.java @@ -21,6 +21,7 @@ package org.apache.qpid.test.utils; import java.lang.reflect.Constructor; +import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -225,4 +226,34 @@ public class ReflectionUtils throw new ReflectionUtilsException("NoSuchMethodException", e); } } + + @SuppressWarnings("unchecked") + public static <T> T getDeclaredField(final Object obj, final String fieldName) + { + try + { + final Field field = obj.getClass().getDeclaredField(fieldName); + if (!field.isAccessible()) + { + field.setAccessible(true); + } + return (T) field.get(obj); + } + catch (NoSuchFieldException e) + { + throw new ReflectionUtilsException("Unable to read field " + fieldName + "from object " + obj, e); + } + catch (SecurityException e) + { + throw new ReflectionUtilsException("Unable to read field " + fieldName + "from object " + obj, e); + } + catch (IllegalArgumentException e) + { + throw new ReflectionUtilsException("Unable to read field " + fieldName + "from object " + obj, e); + } + catch (IllegalAccessException e) + { + throw new ReflectionUtilsException("Unable to read field " + fieldName + "from object " + obj, e); + } + } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/SpawnedBrokerHolder.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/SpawnedBrokerHolder.java index 65239bbe02..50b1ea7cea 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/SpawnedBrokerHolder.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/SpawnedBrokerHolder.java @@ -20,15 +20,20 @@ */ package org.apache.qpid.test.utils; +import java.io.IOException; + import org.apache.log4j.Logger; public class SpawnedBrokerHolder implements BrokerHolder { private static final Logger LOGGER = Logger.getLogger(SpawnedBrokerHolder.class); + private final boolean _isWindows = String.valueOf(System.getProperty("os.name")).toLowerCase().contains("windows"); private final Process _process; + private final Integer _pid; + private final String _workingDirectory; - public SpawnedBrokerHolder(final Process process) + public SpawnedBrokerHolder(final Process process, final String workingDirectory) { if(process == null) { @@ -36,14 +41,87 @@ public class SpawnedBrokerHolder implements BrokerHolder } _process = process; + _pid = retrieveUnixPidIfPossible(); + _workingDirectory = workingDirectory; + } + + @Override + public String getWorkingDirectory() + { + return _workingDirectory; } public void shutdown() { LOGGER.info("Destroying broker process"); - _process.destroy(); + reapChildProcess(); + } + + @Override + public void kill() + { + if (_pid == null) + { + LOGGER.info("Destroying broker process"); + _process.destroy(); + } + else + { + LOGGER.info("Killing broker process with PID " + _pid); + sendSigkillForImmediateShutdown(_pid); + } + + reapChildProcess(); + } + + private void sendSigkillForImmediateShutdown(Integer pid) + { + boolean killSuccessful = false; + try + { + final Process killProcess = Runtime.getRuntime().exec("kill -KILL " + pid); + killProcess.waitFor(); + killSuccessful = killProcess.exitValue() == 0; + } + catch (IOException e) + { + LOGGER.error("Error whilst killing process " + _pid, e); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + } + finally + { + if (!killSuccessful) + { + _process.destroy(); + } + } + } + + private Integer retrieveUnixPidIfPossible() + { + if(!_isWindows) + { + try + { + Integer pid = ReflectionUtils.getDeclaredField(_process, "pid"); + LOGGER.info("PID " + pid); + return pid; + } + catch (ReflectionUtilsException e) + { + LOGGER.warn("Could not get pid for process, Broker process shutdown will be ungraceful"); + } + } + return null; + } + + private void reapChildProcess() + { try { _process.waitFor(); @@ -51,8 +129,21 @@ public class SpawnedBrokerHolder implements BrokerHolder } catch (InterruptedException e) { - LOGGER.error("Interrupted whilst waiting for process destruction"); + LOGGER.error("Interrupted whilst waiting for process shutdown"); Thread.currentThread().interrupt(); } + finally + { + try + { + _process.getInputStream().close(); + _process.getErrorStream().close(); + _process.getOutputStream().close(); + } + catch (IOException e) + { + } + } } + } |