diff options
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/test')
5 files changed, 245 insertions, 27 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java new file mode 100644 index 0000000000..c9810e7304 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java @@ -0,0 +1,88 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.client; + +import org.apache.qpid.test.utils.QpidTestCase; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Queue; +import javax.jms.Session; + +/** + * QPID-155 + * + * Test to validate that setting the respective qpid.declare_queues, + * qpid.declare_exchanges system properties functions as expected. + * + */ +public class DynamicQueueExchangeCreateTest extends QpidTestCase +{ + + public void testQueueDeclare() throws Exception + { + setSystemProperty("qpid.declare_queues", "false"); + + Connection connection = getConnection(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Queue queue = session.createQueue(getTestQueueName()); + + try + { + session.createConsumer(queue); + fail("JMSException should be thrown as the queue does not exist"); + } + catch (JMSException e) + { + assertTrue("Exception should be that the queue does not exist :" + + e.getMessage(), + e.getMessage().contains("does not exist")); + + } + } + + public void testExchangeDeclare() throws Exception + { + setSystemProperty("qpid.declare_exchanges", "false"); + + Connection connection = getConnection(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + String EXCHANGE_TYPE = "test.direct"; + Queue queue = session.createQueue("direct://" + EXCHANGE_TYPE + "/queue/queue"); + + try + { + session.createConsumer(queue); + fail("JMSException should be thrown as the exchange does not exist"); + } + catch (JMSException e) + { + assertTrue("Exception should be that the exchange does not exist :" + + e.getMessage(), + e.getMessage().contains("Exchange " + EXCHANGE_TYPE + " does not exist")); + } + } + +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java new file mode 100644 index 0000000000..3fb6cd3526 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java @@ -0,0 +1,119 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.close; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ExchangeDeclareBody; +import org.apache.qpid.framing.ExchangeDeclareOkBody; +import org.apache.qpid.test.utils.QpidTestCase; + +import javax.jms.Session; + +/** QPID-1809 + * + * Race condition on error handling and close logic. + * + * See most often with SimpleACLTest as this test is the expects the server to + * shut the connection/channels. This sort of testing is not performed by many, + * if any, of the other system tests. + * + * The problem is that we have two threads + * + * MainThread Exception(Mina)Thread + * | | + * Performs | + * ACtion | + * | Receives Server + * | Close + * Blocks for | + * Response | + * | Starts To Notify + * | client + * | | + * | <----- Notify Main Thread + * Notification | + * wakes client | + * | | + * Client then | + * processes Error. | + * | | + * Potentially Attempting Close Channel/Connection + * Connection Close + * + * The two threads both attempt to close the connection but the main thread does + * so assuming that the connection is open and valid. + * + * The Exception thread must modify the connection so that no furter syncWait + * commands are performed. + * + * This test sends an ExchangeDeclare that is Asynchronous and will fail and + * so cause a ChannelClose error but we perform a syncWait so that we can be + * sure to test that the BlockingWaiter is correctly awoken. + * + */ +public class JavaServerCloseRaceConditionTest extends QpidTestCase +{ + private static final String EXCHANGE_NAME = "NewExchangeNametoFailLookup"; + + public void test() throws Exception + { + + AMQConnection connection = (AMQConnection) getConnection(); + + AMQSession session = (AMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Set no wait true so that we block the connection + // Also set a different exchange class string so the attempt to declare + // the exchange causes an exchange. + ExchangeDeclareBody body = session.getMethodRegistry().createExchangeDeclareBody(session.getTicket(), new AMQShortString(EXCHANGE_NAME), null, + true, false, false, false, true, null); + + AMQFrame exchangeDeclare = body.generateFrame(session.getChannelId()); + + try + { + // block our thread so that can times out + connection.getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); + } + catch (Exception e) + { + assertTrue("Exception should say the exchange is not known.", e.getMessage().contains("Unknown exchange: " + EXCHANGE_NAME)); + } + + try + { + // Depending on if the notification thread has closed the connection + // or not we may get an exception here when we attempt to close the + // connection. If we do get one then it should be the same as above + // an AMQAuthenticationException. + connection.close(); + } + catch (Exception e) + { + assertTrue("Exception should say the exchange is not known.", e.getMessage().contains("Unknown exchange: " + EXCHANGE_NAME)); + } + + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java index c5cdb83bbf..2a44413ac8 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java @@ -51,7 +51,13 @@ import javax.jms.TopicSubscriber; public class DurableSubscriptionTest extends QpidTestCase { private static final Logger _logger = LoggerFactory.getLogger(DurableSubscriptionTest.class); - + + /** Timeout for receive() if we are expecting a message */ + private static final long POSITIVE_RECEIVE_TIMEOUT = 2000; + + /** Timeout for receive() if we are not expecting a message */ + private static final long NEGATIVE_RECEIVE_TIMEOUT = 1000; + public void testUnsubscribe() throws Exception { AMQConnection con = (AMQConnection) getConnection("guest", "guest"); @@ -76,16 +82,18 @@ public class DurableSubscriptionTest extends QpidTestCase Message msg; _logger.info("Receive message on consumer 1:expecting A"); - msg = consumer1.receive(); + msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT); + assertNotNull("Message should have been received",msg); assertEquals("A", ((TextMessage) msg).getText()); _logger.info("Receive message on consumer 1 :expecting null"); - msg = consumer1.receive(1000); + msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT); assertEquals(null, msg); - _logger.info("Receive message on consumer 1:expecting A"); - msg = consumer2.receive(); + _logger.info("Receive message on consumer 2:expecting A"); + msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT); + assertNotNull("Message should have been received",msg); assertEquals("A", ((TextMessage) msg).getText()); - msg = consumer2.receive(1000); + msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT); _logger.info("Receive message on consumer 1 :expecting null"); assertEquals(null, msg); @@ -96,14 +104,15 @@ public class DurableSubscriptionTest extends QpidTestCase producer.send(session1.createTextMessage("B")); _logger.info("Receive message on consumer 1 :expecting B"); - msg = consumer1.receive(); + msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT); + assertNotNull("Message should have been received",msg); assertEquals("B", ((TextMessage) msg).getText()); _logger.info("Receive message on consumer 1 :expecting null"); - msg = consumer1.receive(1000); + msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT); assertEquals(null, msg); _logger.info("Receive message on consumer 2 :expecting null"); - msg = consumer2.receive(1000); + msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT); assertEquals(null, msg); _logger.info("Close connection"); @@ -143,14 +152,16 @@ public class DurableSubscriptionTest extends QpidTestCase producer.send(session1.createTextMessage("A")); Message msg; - msg = consumer1.receive(); + msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT); + assertNotNull("Message should have been received",msg); assertEquals("A", ((TextMessage) msg).getText()); - msg = consumer1.receive(1000); + msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT); assertEquals(null, msg); - msg = consumer2.receive(); + msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT); + assertNotNull("Message should have been received",msg); assertEquals("A", ((TextMessage) msg).getText()); - msg = consumer2.receive(1000); + msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT); assertEquals(null, msg); consumer2.close(); @@ -220,8 +231,8 @@ public class DurableSubscriptionTest extends QpidTestCase msg = consumer1.receive(500); assertNull("There should be no more messages for consumption on consumer1.", msg); - msg = consumer2.receive(); - assertNotNull(msg); + msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT); + assertNotNull("Message should have been received",msg); assertEquals("Consumer 2 should also received the first msg.", "A", ((TextMessage) msg).getText()); msg = consumer2.receive(500); assertNull("There should be no more messages for consumption on consumer2.", msg); @@ -235,10 +246,10 @@ public class DurableSubscriptionTest extends QpidTestCase producer.send(session0.createTextMessage("B")); _logger.info("Receive message on consumer 1 :expecting B"); - msg = consumer1.receive(1000); + msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT); assertEquals("B", ((TextMessage) msg).getText()); _logger.info("Receive message on consumer 1 :expecting null"); - msg = consumer1.receive(1000); + msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT); assertEquals(null, msg); // Re-attach a new consumer to the durable subscription, and check that it gets the message that it missed. @@ -296,7 +307,7 @@ public class DurableSubscriptionTest extends QpidTestCase producer.send(session.createTextMessage("testDurableWithInvalidSelector2")); - Message msg = liveSubscriber.receive(); + Message msg = liveSubscriber.receive(POSITIVE_RECEIVE_TIMEOUT); assertNotNull ("Message should have been received", msg); assertEquals ("testDurableWithInvalidSelector2", ((TextMessage) msg).getText()); assertNull("Should not receive subsequent message", liveSubscriber.receive(200)); @@ -331,7 +342,7 @@ public class DurableSubscriptionTest extends QpidTestCase assertNotNull("Subscriber should have been created", liveSubscriber); producer.send(session.createTextMessage("testDurableWithInvalidSelector2")); - Message msg = liveSubscriber.receive(); + Message msg = liveSubscriber.receive(POSITIVE_RECEIVE_TIMEOUT); assertNotNull ("Message should have been received", msg); assertEquals ("testDurableWithInvalidSelector2", ((TextMessage) msg).getText()); assertNull("Should not receive subsequent message", liveSubscriber.receive(200)); @@ -360,13 +371,13 @@ public class DurableSubscriptionTest extends QpidTestCase // Send 1 matching message and 1 non-matching message sendMatchingAndNonMatchingMessage(session, producer); - Message rMsg = subA.receive(1000); + Message rMsg = subA.receive(NEGATIVE_RECEIVE_TIMEOUT); assertNotNull(rMsg); assertEquals("Content was wrong", "testResubscribeWithChangedSelector1", ((TextMessage) rMsg).getText()); - rMsg = subA.receive(1000); + rMsg = subA.receive(NEGATIVE_RECEIVE_TIMEOUT); assertNull(rMsg); // Disconnect subscriber @@ -379,13 +390,13 @@ public class DurableSubscriptionTest extends QpidTestCase // Check messages are recieved properly sendMatchingAndNonMatchingMessage(session, producer); - rMsg = subB.receive(1000); + rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT); assertNotNull(rMsg); assertEquals("Content was wrong", "testResubscribeWithChangedSelector2", ((TextMessage) rMsg).getText()); - rMsg = subB.receive(1000); + rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT); assertNull(rMsg); session.unsubscribe("testResubscribeWithChangedSelector"); } @@ -429,5 +440,5 @@ public class DurableSubscriptionTest extends QpidTestCase public static junit.framework.Test suite() { return new junit.framework.TestSuite(DurableSubscriptionTest.class); - } + } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java index 9c755fcb41..b603455644 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java @@ -479,7 +479,7 @@ public class CommitRollbackTest extends QpidTestCase _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); _pubSession.commit(); - assertNotNull(_consumer.receive(100)); + assertNotNull(_consumer.receive(1000)); _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java index cc9cfce34b..1bef07fcd5 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java @@ -55,7 +55,7 @@ public class FailoverBaseCase extends QpidTestCase { super.setUp(); setSystemProperty("QPID_WORK", System.getProperty("java.io.tmpdir")+"/"+getFailingPort()); - startBroker(FAILING_PORT); + startBroker(failingPort); } /** @@ -76,7 +76,7 @@ public class FailoverBaseCase extends QpidTestCase public void tearDown() throws Exception { - stopBroker(FAILING_PORT); + stopBroker(_broker.equals(VM)?FAILING_PORT:FAILING_PORT); super.tearDown(); FileUtils.deleteDirectory(System.getProperty("java.io.tmpdir")+"/"+getFailingPort()); } |