diff options
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client')
19 files changed, 3239 insertions, 0 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java new file mode 100644 index 0000000000..292bcd6039 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java @@ -0,0 +1,405 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.client; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.InputStreamReader; +import java.io.LineNumberReader; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.jms.Connection; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.QueueSession; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.TopicSession; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQConnectionDelegate_0_10; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQConnectionTest extends QpidBrokerTestCase +{ + private static AMQConnection _connection; + private static AMQTopic _topic; + private static AMQQueue _queue; + private static QueueSession _queueSession; + private static TopicSession _topicSession; + protected static final Logger _logger = LoggerFactory.getLogger(AMQConnectionTest.class); + + protected void setUp() throws Exception + { + super.setUp(); + _connection = (AMQConnection) getConnection("guest", "guest"); + _topic = new AMQTopic(_connection.getDefaultTopicExchangeName(), new AMQShortString("mytopic")); + _queue = new AMQQueue(_connection.getDefaultQueueExchangeName(), new AMQShortString("myqueue")); + } + + protected void tearDown() throws Exception + { + _connection.close(); + super.tearDown(); + } + + /** + * Simple tests to check we can create TopicSession and QueueSession ok + * And that they throw exceptions where appropriate as per JMS spec + */ + + public void testCreateQueueSession() throws JMSException + { + _queueSession = _connection.createQueueSession(false, AMQSession.NO_ACKNOWLEDGE); + } + + public void testCreateTopicSession() throws JMSException + { + _topicSession = _connection.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); + } + + public void testTopicSessionCreateBrowser() throws JMSException + { + try + { + _topicSession.createBrowser(_queue); + fail("expected exception did not occur"); + } + catch (javax.jms.IllegalStateException s) + { + // ok + } + catch (Exception e) + { + fail("expected javax.jms.IllegalStateException, got " + e); + } + } + + public void testTopicSessionCreateQueue() throws JMSException + { + try + { + _topicSession.createQueue("abc"); + fail("expected exception did not occur"); + } + catch (javax.jms.IllegalStateException s) + { + // ok + } + catch (Exception e) + { + fail("expected javax.jms.IllegalStateException, got " + e); + } + } + + public void testTopicSessionCreateTemporaryQueue() throws JMSException + { + try + { + _topicSession.createTemporaryQueue(); + fail("expected exception did not occur"); + } + catch (javax.jms.IllegalStateException s) + { + // ok + } + catch (Exception e) + { + fail("expected javax.jms.IllegalStateException, got " + e); + } + } + + public void testQueueSessionCreateTemporaryTopic() throws JMSException + { + try + { + _queueSession.createTemporaryTopic(); + fail("expected exception did not occur"); + } + catch (javax.jms.IllegalStateException s) + { + // ok + } + catch (Exception e) + { + fail("expected javax.jms.IllegalStateException, got " + e); + } + } + + public void testQueueSessionCreateTopic() throws JMSException + { + try + { + _queueSession.createTopic("abc"); + fail("expected exception did not occur"); + } + catch (javax.jms.IllegalStateException s) + { + // ok + } + catch (Exception e) + { + fail("expected javax.jms.IllegalStateException, got " + e); + } + } + + public void testQueueSessionDurableSubscriber() throws JMSException + { + try + { + _queueSession.createDurableSubscriber(_topic, "abc"); + fail("expected exception did not occur"); + } + catch (javax.jms.IllegalStateException s) + { + // ok + } + catch (Exception e) + { + fail("expected javax.jms.IllegalStateException, got " + e); + } + } + + public void testQueueSessionUnsubscribe() throws JMSException + { + try + { + _queueSession.unsubscribe("abc"); + fail("expected exception did not occur"); + } + catch (javax.jms.IllegalStateException s) + { + // ok + } + catch (Exception e) + { + fail("expected javax.jms.IllegalStateException, got " + e); + } + } + + public void testPrefetchSystemProperty() throws Exception + { + String oldPrefetch = System.getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME); + try + { + _connection.close(); + System.setProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(2).toString()); + _connection = (AMQConnection) getConnection(); + _connection.start(); + // Create two consumers on different sessions + Session consSessA = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumerA = consSessA.createConsumer(_queue); + + Session producerSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = producerSession.createProducer(_queue); + + // Send 3 messages + for (int i = 0; i < 3; i++) + { + producer.send(producerSession.createTextMessage("test")); + } + + MessageConsumer consumerB = null; + if (isBroker08()) + { + Session consSessB = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + consumerB = consSessB.createConsumer(_queue); + } + else + { + consumerB = consSessA.createConsumer(_queue); + } + + Message msg; + // Check that consumer A has 2 messages + for (int i = 0; i < 2; i++) + { + msg = consumerA.receive(1500); + assertNotNull("Consumer A should receive 2 messages",msg); + } + + msg = consumerA.receive(1500); + assertNull("Consumer A should not have received a 3rd message",msg); + + // Check that consumer B has the last message + msg = consumerB.receive(1500); + assertNotNull("Consumer B should have received the message",msg); + } + finally + { + if (oldPrefetch == null) + { + oldPrefetch = ClientProperties.MAX_PREFETCH_DEFAULT; + } + System.setProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, oldPrefetch); + } + } + + public void testGetChannelID() throws Exception + { + long maxChannelID = _connection.getMaximumChannelCount(); + if (isBroker010()) + { + //Usable numbers are 0 to N-1 when using 0-10 + //and 1 to N for 0-8/0-9 + maxChannelID = maxChannelID-1; + } + for (int j = 0; j < 3; j++) + { + int i = isBroker010() ? 0 : 1; + for ( ; i <= maxChannelID; i++) + { + int id = _connection.getNextChannelID(); + assertEquals("Unexpected number on iteration "+j, i, id); + _connection.deregisterSession(id); + } + } + } + + /** + * Test Strategy : Kill -STOP the broker and see + * if the client terminates the connection with a + * read timeout. + * The broker process is cleaned up in the test itself + * and avoids using process.waitFor() as it hangs. + */ + public void testHeartBeat() throws Exception + { + boolean windows = + ((String) System.getProperties().get("os.name")).matches("(?i).*windows.*"); + + if (!isCppBroker() || windows) + { + return; + } + + Process process = null; + int port = getPort(0); + String pid = null; + try + { + // close the connection and shutdown the broker started by QpidTest + _connection.close(); + stopBroker(port); + + System.setProperty("qpid.heartbeat", "1"); + + // in case this broker gets stuck, atleast the rest of the tests will not fail. + port = port + 200; + String startCmd = getBrokerCommand(port); + + // start a broker using a script + ProcessBuilder pb = new ProcessBuilder(System.getProperty("broker.start")); + pb.redirectErrorStream(true); + + Map<String, String> env = pb.environment(); + env.put("BROKER_CMD",startCmd); + env.put("BROKER_READY",System.getProperty(BROKER_READY)); + + Process startScript = pb.start(); + startScript.waitFor(); + startScript.destroy(); + + Connection con = + new AMQConnection("amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:" + port + "'"); + final AtomicBoolean lock = new AtomicBoolean(false); + + String cmd = "/usr/bin/pgrep -f " + port; + process = Runtime.getRuntime().exec("/bin/bash"); + LineNumberReader reader = new LineNumberReader(new InputStreamReader(process.getInputStream())); + PrintWriter out = new PrintWriter(new BufferedWriter(new OutputStreamWriter(process.getOutputStream())), true); + out.println(cmd); + pid = reader.readLine(); + try + { + Integer.parseInt(pid); + } + catch (NumberFormatException e) + { + // Error! try to read further to gather the error msg. + String line; + _logger.debug(pid); + while ((line = reader.readLine()) != null ) + { + _logger.debug(line); + } + throw new Exception( "Unable to get the brokers pid " + pid); + } + _logger.debug("pid : " + pid); + + con.setExceptionListener(new ExceptionListener(){ + + public void onException(JMSException e) + { + synchronized(lock) { + lock.set(true); + lock.notifyAll(); + } + } + }); + + out.println("kill -STOP " + pid); + + synchronized(lock){ + lock.wait(2500); + } + out.close(); + reader.close(); + assertTrue("Client did not terminate the connection, check log for details",lock.get()); + } + catch(Exception e) + { + throw e; + } + finally + { + System.setProperty("qpid.heartbeat", ""); + + if (process != null) + { + process.destroy(); + } + + Process killScript = Runtime.getRuntime().exec(System.getProperty("broker.kill") + " " + pid); + killScript.waitFor(); + killScript.destroy(); + cleanBroker(); + } + } + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(AMQConnectionTest.class); + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQSessionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQSessionTest.java new file mode 100644 index 0000000000..93cceb1048 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQSessionTest.java @@ -0,0 +1,110 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.client; + +import javax.jms.JMSException; +import javax.jms.QueueReceiver; +import javax.jms.TopicSubscriber; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +/** + * Tests for QueueReceiver and TopicSubscriber creation methods on AMQSession + */ +public class AMQSessionTest extends QpidBrokerTestCase +{ + + private static AMQSession _session; + private static AMQTopic _topic; + private static AMQQueue _queue; + private static AMQConnection _connection; + + protected void setUp() throws Exception + { + super.setUp(); + _connection = (AMQConnection) getConnection("guest", "guest"); + _topic = new AMQTopic(_connection,"mytopic"); + _queue = new AMQQueue(_connection,"myqueue"); + _session = (AMQSession) _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE); + } + + protected void tearDown() throws Exception + { + try + { + _connection.close(); + } + catch (JMSException e) + { + //just close + } + super.tearDown(); + } + + public void testCreateSubscriber() throws JMSException + { + TopicSubscriber subscriber = _session.createSubscriber(_topic); + assertEquals("Topic names should match from TopicSubscriber", _topic.getTopicName(), subscriber.getTopic().getTopicName()); + + subscriber = _session.createSubscriber(_topic, "abc", false); + assertEquals("Topic names should match from TopicSubscriber with selector", _topic.getTopicName(), subscriber.getTopic().getTopicName()); + } + + public void testCreateDurableSubscriber() throws JMSException + { + TopicSubscriber subscriber = _session.createDurableSubscriber(_topic, "mysubname"); + assertEquals("Topic names should match from durable TopicSubscriber", _topic.getTopicName(), subscriber.getTopic().getTopicName()); + + subscriber = _session.createDurableSubscriber(_topic, "mysubname2", "abc", false); + assertEquals("Topic names should match from durable TopicSubscriber with selector", _topic.getTopicName(), subscriber.getTopic().getTopicName()); + _session.unsubscribe("mysubname"); + _session.unsubscribe("mysubname2"); + } + + public void testCreateQueueReceiver() throws JMSException + { + QueueReceiver receiver = _session.createQueueReceiver(_queue); + assertEquals("Queue names should match from QueueReceiver", _queue.getQueueName(), receiver.getQueue().getQueueName()); + + receiver = _session.createQueueReceiver(_queue, "abc"); + assertEquals("Queue names should match from QueueReceiver with selector", _queue.getQueueName(), receiver.getQueue().getQueueName()); + } + + public void testCreateReceiver() throws JMSException + { + QueueReceiver receiver = _session.createReceiver(_queue); + assertEquals("Queue names should match from QueueReceiver", _queue.getQueueName(), receiver.getQueue().getQueueName()); + + receiver = _session.createReceiver(_queue, "abc"); + assertEquals("Queue names should match from QueueReceiver with selector", _queue.getQueueName(), receiver.getQueue().getQueueName()); + } + + public static void stopVmBrokers() + { + _queue = null; + _topic = null; + _session = null; + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java new file mode 100644 index 0000000000..8577fb5b6a --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java @@ -0,0 +1,90 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.client; + +import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Queue; +import javax.jms.Session; + +/** + * QPID-155 + * + * Test to validate that setting the respective qpid.declare_queues, + * qpid.declare_exchanges system properties functions as expected. + */ +public class DynamicQueueExchangeCreateTest extends QpidBrokerTestCase +{ + public void testQueueDeclare() throws Exception + { + setSystemProperty("qpid.declare_queues", "false"); + + Connection connection = getConnection(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Queue queue = session.createQueue(getTestQueueName()); + + try + { + session.createConsumer(queue); + fail("JMSException should be thrown as the queue does not exist"); + } + catch (JMSException e) + { + checkExceptionErrorCode(e, AMQConstant.NOT_FOUND); + } + } + + public void testExchangeDeclare() throws Exception + { + setSystemProperty("qpid.declare_exchanges", "false"); + + Connection connection = getConnection(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + String EXCHANGE_TYPE = "test.direct"; + Queue queue = session.createQueue("direct://" + EXCHANGE_TYPE + "/queue/queue"); + + try + { + session.createConsumer(queue); + fail("JMSException should be thrown as the exchange does not exist"); + } + catch (JMSException e) + { + checkExceptionErrorCode(e, AMQConstant.NOT_FOUND); + } + } + + private void checkExceptionErrorCode(JMSException original, AMQConstant code) + { + Exception linked = original.getLinkedException(); + assertNotNull("Linked exception should have been set", linked); + assertTrue("Linked exception should be an AMQException", linked instanceof AMQException); + assertEquals("Error code should be " + code.getCode(), code, ((AMQException) linked).getErrorCode()); + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java new file mode 100644 index 0000000000..79e2ff8148 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java @@ -0,0 +1,239 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.client.channelclose; + +import junit.textui.TestRunner; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import java.util.ArrayList; +import java.util.List; + +/** + * Due to bizarre exception handling all sessions are closed if you get + * a channel close request and no exception listener is registered. + * <p/> + * JIRA issue IBTBLZ-10. + * <p/> + * Simulate by: + * <p/> + * 0. Create two sessions with no exception listener. + * 1. Publish message to queue/topic that does not exist (wrong routing key). + * 2. This will cause a channel close. + * 3. Since client does not have an exception listener, currently all sessions are + * closed. + */ +public class ChannelCloseOkTest extends QpidBrokerTestCase +{ + private AMQConnection _connection; + private Destination _destination1; + private Destination _destination2; + private Session _session1; + private Session _session2; + private final List<Message> _received1 = new ArrayList<Message>(); + private final List<Message> _received2 = new ArrayList<Message>(); + + private static final Logger _log = LoggerFactory.getLogger(ChannelCloseOkTest.class); + + protected void setUp() throws Exception + { + super.setUp(); + + _connection = (AMQConnection) getConnection("guest", "guest"); + + _destination1 = new AMQQueue(_connection, "q1", true); + _destination2 = new AMQQueue(_connection, "q2", true); + _session1 = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _session1.createConsumer(_destination1).setMessageListener(new MessageListener() + { + public void onMessage(Message message) + { + _log.debug("consumer 1 got message [" + getTextMessage(message) + "]"); + synchronized (_received1) + { + _received1.add(message); + _received1.notify(); + } + } + }); + _session2 = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _session2.createConsumer(_destination2).setMessageListener(new MessageListener() + { + public void onMessage(Message message) + { + _log.debug("consumer 2 got message [" + getTextMessage(message) + "]"); + synchronized (_received2) + { + _received2.add(message); + _received2.notify(); + } + } + }); + + _connection.start(); + } + + private String getTextMessage(Message message) + { + TextMessage tm = (TextMessage) message; + try + { + return tm.getText(); + } + catch (JMSException e) + { + return "oops " + e; + } + } + + protected void tearDown() throws Exception + { + closeConnection(); + super.tearDown(); + } + + public void closeConnection() throws JMSException + { + if (_connection != null) + { + _log.info(">>>>>>>>>>>>>>.. closing"); + _connection.close(); + } + } + + public void testWithoutExceptionListener() throws Exception + { + doTest(); + } + + public void testWithExceptionListener() throws Exception + { + _connection.setExceptionListener(new ExceptionListener() + { + public void onException(JMSException jmsException) + { + _log.warn("onException - " + jmsException.getMessage()); + } + }); + + doTest(); + } + + public void doTest() throws Exception + { + // Check both sessions are ok. + sendAndWait(_session1, _destination1, "first", _received1, 1); + sendAndWait(_session2, _destination2, "second", _received2, 1); + assertEquals(1, _received1.size()); + assertEquals(1, _received2.size()); + + // Now send message to incorrect destination on session 1. + Destination destination = new AMQQueue(_connection, "incorrect"); + send(_session1, destination, "third"); // no point waiting as message will never be received. + + // Ensure both sessions are still ok. + // Send a bunch of messages as this give time for the sessions to be erroneously closed. + final int num = 300; + for (int i = 0; i < num; ++i) + { + send(_session1, _destination1, "" + i); + send(_session2, _destination2, "" + i); + } + + waitFor(_received1, num + 1); + waitFor(_received2, num + 1); + + // Note that the third message is never received as it is sent to an incorrect destination. + assertEquals(num + 1, _received1.size()); + assertEquals(num + 1, _received2.size()); + } + + private void sendAndWait(Session session, Destination destination, String message, List<Message> received, int count) + throws JMSException, InterruptedException + { + send(session, destination, message); + waitFor(received, count); + } + + private void send(Session session, Destination destination, String message) throws JMSException + { + _log.debug("sending message " + message); + MessageProducer producer1 = session.createProducer(destination); + producer1.send(session.createTextMessage(message)); + } + + private void waitFor(List<Message> received, int count) throws InterruptedException + { + long timeout = 20000; + + synchronized (received) + { + long start = System.currentTimeMillis(); + while (received.size() < count) + { + if (System.currentTimeMillis() - start > timeout) + { + fail("timeout expired waiting for messages"); + } + try + { + received.wait(timeout); + } + catch (InterruptedException e) + { + _log.info("Interrupted: " + e); + throw e; + } + + } + } + } + + private static String randomize(String in) + { + return in + System.currentTimeMillis(); + } + + public static void main(String[] args) + { + TestRunner.run(ChannelCloseOkTest.class); + } + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(ChannelCloseOkTest.class); + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java new file mode 100644 index 0000000000..b6232b1734 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java @@ -0,0 +1,394 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.test.unit.client.channelclose; + +import org.apache.qpid.AMQException; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.framing.*; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.protocol.AMQConstant; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.Connection; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +public class ChannelCloseTest extends QpidBrokerTestCase implements ExceptionListener, ConnectionListener +{ + private static final Logger _logger = LoggerFactory.getLogger(ChannelCloseTest.class); + + Connection _connection; + private Session _session; + private static final long SYNC_TIMEOUT = 500; + private int TEST = 0; + + /** + * Close channel, use chanel with same id ensure error. + * + * This test is only valid for non 0-10 connection . + */ + public void testReusingChannelAfterFullClosure() throws Exception + { + _connection=newConnection(); + + // Create Producer + try + { + _connection.start(); + + createChannelAndTest(1); + + // Cause it to close + try + { + _logger.info("Testing invalid exchange"); + declareExchange(1, "", "name_that_will_lookup_to_null", false); + fail("Exchange name is empty so this should fail "); + } + catch (AMQException e) + { + assertEquals("Exchange should not be found", AMQConstant.NOT_FOUND, e.getErrorCode()); + } + + // Check that + try + { + _logger.info("Testing valid exchange should fail"); + declareExchange(1, "topic", "amq.topic", false); + fail("This should not succeed as the channel should be closed "); + } + catch (AMQException e) + { + if (_logger.isInfoEnabled()) + { + _logger.info("Exception occured was:" + e.getErrorCode()); + } + + assertEquals("Connection should be closed", AMQConstant.CHANNEL_ERROR, e.getErrorCode()); + + _connection=newConnection(); + } + + checkSendingMessage(); + + _session.close(); + _connection.close(); + + } + catch (JMSException e) + { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + /* + close channel and send guff then send ok no errors + REMOVE TEST - The behaviour after server has sent close is undefined. + the server should be free to fail as it may wish to reclaim its resources + immediately after close. + */ + /*public void testSendingMethodsAfterClose() throws Exception + { + // this is testing an 0.8 connection + if(isBroker08()) + { + try + { + _connection=new AMQConnection("amqp://guest:guest@CCTTest/test?brokerlist='" + _brokerlist + "'"); + + ((AMQConnection) _connection).setConnectionListener(this); + + _connection.setExceptionListener(this); + + // Change the StateManager for one that doesn't respond with Close-OKs + AMQStateManager oldStateManager=((AMQConnection) _connection).getProtocolHandler().getStateManager(); + + _session=_connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + _connection.start(); + + // Test connection + checkSendingMessage(); + + // Set StateManager to manager that ignores Close-oks + AMQProtocolSession protocolSession= + ((AMQConnection) _connection).getProtocolHandler().getProtocolSession(); + + MethodDispatcher d = protocolSession.getMethodDispatcher(); + + MethodDispatcher wrappedDispatcher = (MethodDispatcher) + Proxy.newProxyInstance(d.getClass().getClassLoader(), + d.getClass().getInterfaces(), + new MethodDispatcherProxyHandler( + (ClientMethodDispatcherImpl) d)); + + protocolSession.setMethodDispatcher(wrappedDispatcher); + + + AMQStateManager newStateManager=new NoCloseOKStateManager(protocolSession); + newStateManager.changeState(oldStateManager.getCurrentState()); + + ((AMQConnection) _connection).getProtocolHandler().setStateManager(newStateManager); + + final int TEST_CHANNEL=1; + _logger.info("Testing Channel(" + TEST_CHANNEL + ") Creation"); + + createChannelAndTest(TEST_CHANNEL); + + // Cause it to close + try + { + _logger.info("Closing Channel - invalid exchange"); + declareExchange(TEST_CHANNEL, "", "name_that_will_lookup_to_null", false); + fail("Exchange name is empty so this should fail "); + } + catch (AMQException e) + { + assertEquals("Exchange should not be found", AMQConstant.NOT_FOUND, e.getErrorCode()); + } + + try + { + // Send other methods that should be ignored + // send them no wait as server will ignore them + _logger.info("Tested known exchange - should ignore"); + declareExchange(TEST_CHANNEL, "topic", "amq.topic", true); + + _logger.info("Tested known invalid exchange - should ignore"); + declareExchange(TEST_CHANNEL, "", "name_that_will_lookup_to_null", true); + + _logger.info("Tested known invalid exchange - should ignore"); + declareExchange(TEST_CHANNEL, "", "name_that_will_lookup_to_null", true); + + // Send sync .. server will igore and timy oue + _logger.info("Tested known invalid exchange - should ignore"); + declareExchange(TEST_CHANNEL, "", "name_that_will_lookup_to_null", false); + } + catch (AMQTimeoutException te) + { + assertEquals("Request should timeout", AMQConstant.REQUEST_TIMEOUT, te.getErrorCode()); + } + catch (AMQException e) + { + fail("This should not fail as all requests should be ignored"); + } + + _logger.info("Sending Close"); + // Send Close-ok + sendClose(TEST_CHANNEL); + + _logger.info("Re-opening channel"); + + createChannelAndTest(TEST_CHANNEL); + + // Test connection is still ok + + checkSendingMessage(); + + } + catch (JMSException e) + { + e.printStackTrace(); + fail(e.getMessage()); + } + catch (AMQException e) + { + fail(e.getMessage()); + + } + catch (URLSyntaxException e) + { + fail(e.getMessage()); + } + finally + { + try + { + _session.close(); + _connection.close(); + } + catch (JMSException e) + { + e.printStackTrace(); + fail(e.getMessage()); + } + } + } + } +*/ + private void createChannelAndTest(int channel) throws FailoverException + { + // Create A channel + try + { + createChannel(channel); + } + catch (AMQException e) + { + fail(e.getMessage()); + } + + // Test it is ok + try + { + declareExchange(channel, "topic", "amq.topic", false); + _logger.info("Tested known exchange"); + } + catch (AMQException e) + { + fail("This should not fail as this is the default exchange details"); + } + } + + private void sendClose(int channel) + { + ChannelCloseOkBody body = + ((AMQConnection) _connection).getProtocolHandler().getMethodRegistry().createChannelCloseOkBody(); + AMQFrame frame = body.generateFrame(channel); + + ((AMQConnection) _connection).getProtocolHandler().writeFrame(frame); + } + + private void checkSendingMessage() throws JMSException + { + TEST++; + _logger.info("Test creating producer which will use channel id 1"); + + Queue queue = _session.createQueue("CCT_test_validation_queue" + TEST); + + MessageConsumer consumer = _session.createConsumer(queue); + + MessageProducer producer = _session.createProducer(queue); + + final String MESSAGE = "CCT_Test_Message"; + producer.send(_session.createTextMessage(MESSAGE)); + + Message msg = consumer.receive(2000); + + assertNotNull("Received messages should not be null.", msg); + assertEquals("Message received not what we sent", MESSAGE, ((TextMessage) msg).getText()); + } + + private Connection newConnection() + { + Connection connection = null; + try + { + connection = getConnection(); + + ((AMQConnection) connection).setConnectionListener(this); + + _session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + connection.start(); + + } + catch (Exception e) + { + fail("Creating new connection when:" + e.getMessage()); + } + + return connection; + } + + private void declareExchange(int channelId, String _type, String _name, boolean nowait) + throws AMQException, FailoverException + { + ExchangeDeclareBody body = + ((AMQConnection) _connection).getProtocolHandler() + .getMethodRegistry() + .createExchangeDeclareBody(0, + new AMQShortString(_name), + new AMQShortString(_type), + true, + false, + false, + false, + nowait, + null); + AMQFrame exchangeDeclare = body.generateFrame(channelId); + AMQProtocolHandler protocolHandler = ((AMQConnection) _connection).getProtocolHandler(); + + + if (nowait) + { + protocolHandler.writeFrame(exchangeDeclare); + } + else + { + protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class, SYNC_TIMEOUT); + } + +// return null; +// } +// }, (AMQConnection)_connection).execute(); + + } + + private void createChannel(int channelId) throws AMQException, FailoverException + { + ChannelOpenBody body = + ((AMQConnection) _connection).getProtocolHandler().getMethodRegistry().createChannelOpenBody(null); + + ((AMQConnection) _connection).getProtocolHandler().syncWrite(body.generateFrame(channelId), // outOfBand + ChannelOpenOkBody.class); + + } + + public void onException(JMSException jmsException) + { + // _logger.info("CCT" + jmsException); + fail(jmsException.getMessage()); + } + + public void bytesSent(long count) + { } + + public void bytesReceived(long count) + { } + + public boolean preFailover(boolean redirect) + { + return false; + } + + public boolean preResubscribe() + { + return false; + } + + public void failoverComplete() + { } + +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java new file mode 100644 index 0000000000..56d03dc4a7 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java @@ -0,0 +1,81 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.client.channelclose; + +import javax.jms.MessageConsumer; +import javax.jms.Session; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +/** + * @author Apache Software Foundation + */ +public class CloseWithBlockingReceiveTest extends QpidBrokerTestCase +{ + + + protected void setUp() throws Exception + { + super.setUp(); + } + + protected void tearDown() throws Exception + { + super.tearDown(); + } + + + public void testReceiveReturnsNull() throws Exception + { + final AMQConnection connection = (AMQConnection) getConnection("guest", "guest"); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(new AMQTopic(connection, "banana")); + connection.start(); + + Runnable r = new Runnable() + { + + public void run() + { + try + { + Thread.sleep(1000); + connection.close(); + } + catch (Exception e) + { + } + } + }; + long startTime = System.currentTimeMillis(); + new Thread(r).start(); + consumer.receive(10000); + assertTrue(System.currentTimeMillis() - startTime < 10000); + } + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(CloseWithBlockingReceiveTest.class); + } + +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/CloseAfterConnectionFailureTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/CloseAfterConnectionFailureTest.java new file mode 100644 index 0000000000..dc2f59c384 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/CloseAfterConnectionFailureTest.java @@ -0,0 +1,124 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.client.connection; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQConnectionURL; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.url.URLSyntaxException; + +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import java.util.concurrent.CountDownLatch; + +public class CloseAfterConnectionFailureTest extends QpidBrokerTestCase implements ExceptionListener +{ + private int sessionCount = 0; + AMQConnection connection; + Session session; + MessageConsumer consumer; + private CountDownLatch _latch = new CountDownLatch(1); + private JMSException _fail; + + public void testNoFailover() throws URLSyntaxException, Exception, + InterruptedException, JMSException + { + //This test uses hard coded connection string so only runs on InVM case + if (!isExternalBroker()) + { + String connectionString = "amqp://guest:guest@/test?brokerlist='vm://:1?connectdelay='500',retries='3'',failover='nofailover'"; + + AMQConnectionURL url = new AMQConnectionURL(connectionString); + + try + { + //Start the connection so it will use the retries + connection = new AMQConnection(url, null); + + connection.setExceptionListener(this); + + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + consumer = session.createConsumer(session.createQueue(this.getName())); + + //Kill connection + stopBroker(); + + _latch.await(); + + if (_fail != null) + { + _fail.printStackTrace(System.out); + fail("Exception thrown:" + _fail.getMessage()); + } + } + catch (AMQException e) + { + fail(e.getMessage()); + } + } + } + + public void onException(JMSException e) + { + System.out.println("Connection isClosed after connection Falure?:" + connection.isClosed()); + try + { + consumer.close(); + } + catch (JMSException jmse) + { + System.out.println("Consumer close failed with:" + jmse.getMessage()); + _fail = jmse; + } + System.out.println("Connection isClosed after connection Falure?:" + connection.isClosed()); + try + { + //Note that if we actually do session.close() we will lock up as the session will never receive a frame + // from the + ((AMQSession) session).close(10); + } + catch (JMSException jmse) + { + System.out.println("Session close failed with:" + jmse.getMessage()); + _fail = jmse; + } + System.out.println("Connection isClosed after connection Falure?:" + connection.isClosed()); + + try + { + connection.close(); + } + catch (JMSException jmse) + { + System.out.println("Session close failed with:" + jmse.getMessage()); + _fail = jmse; + } + System.out.println("Connection isClosed after connection Falure?:" + connection.isClosed()); + + _latch.countDown(); + + } + +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java new file mode 100644 index 0000000000..6d1b6de238 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java @@ -0,0 +1,111 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.client.connection; + +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.transport.util.Logger; + +import java.util.HashMap; +import java.util.Map; + +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +/** + * ConnectionCloseTest + * + */ + +public class ConnectionCloseTest extends QpidBrokerTestCase +{ + + private static final Logger log = Logger.get(ConnectionCloseTest.class); + + public void testSendReceiveClose() throws Exception + { + Map<Thread,StackTraceElement[]> before = Thread.getAllStackTraces(); + + for (int i = 0; i < 50; i++) + { + if ((i % 10) == 0) + { + log.warn("%d messages sent and received", i); + } + + Connection receiver = getConnection(); + receiver.start(); + Session rssn = receiver.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = rssn.createQueue("connection-close-test-queue"); + MessageConsumer cons = rssn.createConsumer(queue); + + Connection sender = getConnection(); + sender.start(); + Session sssn = sender.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer prod = sssn.createProducer(queue); + prod.send(sssn.createTextMessage("test")); + sender.close(); + + TextMessage m = (TextMessage) cons.receive(2000); + assertNotNull("message was lost", m); + assertEquals(m.getText(), "test"); + receiver.close(); + } + + // The finalizer is notifying connector thread waiting on a selector key. + // This should leave the finalizer enough time to notify those threads + synchronized (this) + { + this.wait(10000); + } + + Map<Thread,StackTraceElement[]> after = Thread.getAllStackTraces(); + + Map<Thread,StackTraceElement[]> delta = new HashMap<Thread,StackTraceElement[]>(after); + for (Thread t : before.keySet()) + { + delta.remove(t); + } + + dumpStacks(delta); + + int deltaThreshold = (isExternalBroker()? 1 : 2) //InVM creates more thread pools in the same VM + * (Runtime.getRuntime().availableProcessors() + 1) + 5; + + assertTrue("Spurious thread creation exceeded threshold, " + + delta.size() + " threads created.", + delta.size() < deltaThreshold); + } + + private void dumpStacks(Map<Thread,StackTraceElement[]> map) + { + for (Map.Entry<Thread,StackTraceElement[]> entry : map.entrySet()) + { + Throwable t = new Throwable(); + t.setStackTrace(entry.getValue()); + log.warn(t, entry.getKey().toString()); + } + } + +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java new file mode 100644 index 0000000000..ac14f8e50e --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java @@ -0,0 +1,158 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.client.connection; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +public class ConnectionStartTest extends QpidBrokerTestCase +{ + + String _broker = "vm://:1"; + + AMQConnection _connection; + private Session _consumerSess; + private MessageConsumer _consumer; + + protected void setUp() throws Exception + { + super.setUp(); + try + { + + + AMQConnection pubCon = (AMQConnection) getConnection("guest", "guest"); + + AMQQueue queue = new AMQQueue(pubCon,"ConnectionStartTest"); + + Session pubSess = pubCon.createSession(false, AMQSession.AUTO_ACKNOWLEDGE); + + MessageProducer pub = pubSess.createProducer(queue); + + _connection = (AMQConnection) getConnection("guest", "guest"); + + _consumerSess = _connection.createSession(false, AMQSession.AUTO_ACKNOWLEDGE); + + _consumer = _consumerSess.createConsumer(queue); + + //publish after queue is created to ensure it can be routed as expected + pub.send(pubSess.createTextMessage("Initial Message")); + + pubCon.close(); + + } + catch (Exception e) + { + e.printStackTrace(); + fail("Connection to " + _broker + " should succeed. Reason: " + e); + } + } + + protected void tearDown() throws Exception + { + _connection.close(); + super.tearDown(); + } + + public void testSimpleReceiveConnection() + { + try + { + assertTrue("Connection should not be started", !_connection.started()); + //Note that this next line will start the dispatcher in the session + // should really not be called before _connection start + //assertTrue("There should not be messages waiting for the consumer", _consumer.receiveNoWait() == null); + _connection.start(); + assertTrue("There should be messages waiting for the consumer", _consumer.receive(10*1000) != null); + assertTrue("Connection should be started", _connection.started()); + + } + catch (JMSException e) + { + fail("An error occured during test because:" + e); + } + + } + + public void testMessageListenerConnection() + { + final CountDownLatch _gotMessage = new CountDownLatch(1); + + try + { + assertTrue("Connection should not be started", !_connection.started()); + _consumer.setMessageListener(new MessageListener() + { + public void onMessage(Message message) + { + try + { + assertTrue("Connection should be started", _connection.started()); + assertEquals("Mesage Received", "Initial Message", ((TextMessage) message).getText()); + _gotMessage.countDown(); + } + catch (JMSException e) + { + fail("Couldn't get message text because:" + e.getCause()); + } + } + }); + + assertTrue("Connection should not be started", !_connection.started()); + _connection.start(); + assertTrue("Connection should be started", _connection.started()); + + try + { + assertTrue("Listener was never called", _gotMessage.await(10 * 1000, TimeUnit.MILLISECONDS)); + } + catch (InterruptedException e) + { + fail("Timed out awaiting message via onMessage"); + } + + } + catch (JMSException e) + { + fail("Failed because:" + e.getCause()); + } + + } + + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(ConnectionStartTest.class); + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java new file mode 100644 index 0000000000..04fc611cd1 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java @@ -0,0 +1,301 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.client.connection; + +import org.apache.qpid.AMQConnectionFailureException; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQUnresolvedAddressException; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.client.AMQAuthenticationException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.client.AMQConnectionURL; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.jms.Session; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.jms.BrokerDetails; + +import javax.jms.Connection; +import javax.jms.QueueSession; +import javax.jms.TopicSession; +import javax.naming.NamingException; + +public class ConnectionTest extends QpidBrokerTestCase +{ + + String _broker_NotRunning = "vm://:2"; + String _broker_BadDNS = "tcp://hg3sgaaw4lgihjs"; + + public void testSimpleConnection() throws Exception + { + AMQConnection conn = null; + try + { + conn = new AMQConnection(getBroker().toString(), "guest", "guest", "fred", "test"); + } + catch (Exception e) + { + fail("Connection to " + getBroker() + " should succeed. Reason: " + e); + } + finally + { + if(conn != null) + { + conn.close(); + } + } + } + + public void testDefaultExchanges() throws Exception + { + AMQConnection conn = null; + try + { + BrokerDetails broker = getBroker(); + broker.setProperty(BrokerDetails.OPTIONS_RETRY, "1"); + ConnectionURL url = new AMQConnectionURL("amqp://guest:guest@clientid/test?brokerlist='" + + broker + + "'&defaultQueueExchange='test.direct'" + + "&defaultTopicExchange='test.topic'" + + "&temporaryQueueExchange='tmp.direct'" + + "&temporaryTopicExchange='tmp.topic'"); + + System.err.println(url.toString()); + conn = new AMQConnection(url, null); + + + AMQSession sess = (AMQSession) conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + sess.declareExchange(new AMQShortString("test.direct"), + ExchangeDefaults.DIRECT_EXCHANGE_CLASS, false); + + sess.declareExchange(new AMQShortString("tmp.direct"), + ExchangeDefaults.DIRECT_EXCHANGE_CLASS, false); + + sess.declareExchange(new AMQShortString("tmp.topic"), + ExchangeDefaults.TOPIC_EXCHANGE_CLASS, false); + + sess.declareExchange(new AMQShortString("test.topic"), + ExchangeDefaults.TOPIC_EXCHANGE_CLASS, false); + + QueueSession queueSession = conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); + + AMQQueue queue = (AMQQueue) queueSession.createQueue("MyQueue"); + + assertEquals(queue.getExchangeName().toString(), "test.direct"); + + AMQQueue tempQueue = (AMQQueue) queueSession.createTemporaryQueue(); + + assertEquals(tempQueue.getExchangeName().toString(), "tmp.direct"); + + queueSession.close(); + + TopicSession topicSession = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + + AMQTopic topic = (AMQTopic) topicSession.createTopic("silly.topic"); + + assertEquals(topic.getExchangeName().toString(), "test.topic"); + + AMQTopic tempTopic = (AMQTopic) topicSession.createTemporaryTopic(); + + assertEquals(tempTopic.getExchangeName().toString(), "tmp.topic"); + + topicSession.close(); + + } + catch (Exception e) + { + fail("Connection to " + getBroker() + " should succeed. Reason: " + e); + } + finally + { + conn.close(); + } + } + + public void testPasswordFailureConnection() throws Exception + { + AMQConnection conn = null; + try + { + BrokerDetails broker = getBroker(); + broker.setProperty(BrokerDetails.OPTIONS_RETRY, "0"); + conn = new AMQConnection("amqp://guest:rubbishpassword@clientid/test?brokerlist='" + broker + "'"); + fail("Connection should not be established password is wrong."); + } + catch (AMQConnectionFailureException amqe) + { + assertNotNull("No cause set:" + amqe.getMessage(), amqe.getCause()); + assertTrue("Exception was wrong type", amqe.getCause() instanceof AMQException); + } + finally + { + if (conn != null) + { + conn.close(); + } + } + } + + public void testConnectionFailure() throws Exception + { + AMQConnection conn = null; + try + { + conn = new AMQConnection("amqp://guest:guest@clientid/testpath?brokerlist='" + _broker_NotRunning + "?retries='0''"); + fail("Connection should not be established"); + } + catch (AMQException amqe) + { + if (!(amqe instanceof AMQConnectionFailureException)) + { + fail("Correct exception not thrown. Excpected 'AMQConnectionException' got: " + amqe); + } + } + finally + { + if (conn != null) + { + conn.close(); + } + } + + } + + public void testUnresolvedHostFailure() throws Exception + { + AMQConnection conn = null; + try + { + conn = new AMQConnection("amqp://guest:guest@clientid/testpath?brokerlist='" + _broker_BadDNS + "?retries='0''"); + fail("Connection should not be established"); + } + catch (AMQException amqe) + { + if (!(amqe instanceof AMQUnresolvedAddressException)) + { + fail("Correct exception not thrown. Excpected 'AMQUnresolvedAddressException' got: " + amqe); + } + } + finally + { + if (conn != null) + { + conn.close(); + } + } + + } + + public void testUnresolvedVirtualHostFailure() throws Exception + { + AMQConnection conn = null; + try + { + BrokerDetails broker = getBroker(); + broker.setProperty(BrokerDetails.OPTIONS_RETRY, "0"); + conn = new AMQConnection("amqp://guest:guest@clientid/rubbishhost?brokerlist='" + broker + "'"); + fail("Connection should not be established"); + } + catch (AMQException amqe) + { + if (!(amqe instanceof AMQConnectionFailureException)) + { + fail("Correct exception not thrown. Excpected 'AMQConnectionFailureException' got: " + amqe); + } + } + finally + { + if (conn != null) + { + conn.close(); + } + } + } + + public void testClientIdCannotBeChanged() throws Exception + { + Connection connection = new AMQConnection(getBroker().toString(), "guest", "guest", + "fred", "test"); + try + { + connection.setClientID("someClientId"); + fail("No IllegalStateException thrown when resetting clientid"); + } + catch (javax.jms.IllegalStateException e) + { + // PASS + } + finally + { + if (connection != null) + { + connection.close(); + } + } + } + + public void testClientIdIsPopulatedAutomatically() throws Exception + { + Connection connection = new AMQConnection(getBroker().toString(), "guest", "guest", + null, "test"); + try + { + assertNotNull(connection.getClientID()); + } + finally + { + connection.close(); + } + connection.close(); + } + + public void testUnsupportedSASLMechanism() throws Exception + { + BrokerDetails broker = getBroker(); + broker.setProperty(BrokerDetails.OPTIONS_SASL_MECHS, "MY_MECH"); + + try + { + Connection connection = new AMQConnection(broker.toString(), "guest", "guest", + null, "test"); + connection.close(); + fail("The client should throw a ConnectionException stating the" + + " broker does not support the SASL mech specified by the client"); + } + catch (Exception e) + { + assertTrue("Incorrect exception thrown", + e.getMessage().contains("The following SASL mechanisms " + + "[MY_MECH]" + + " specified by the client are not supported by the broker")); + } + } + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(ConnectionTest.class); + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java new file mode 100644 index 0000000000..cec9d292cf --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java @@ -0,0 +1,62 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.client.connection; + +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import javax.jms.Connection; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; + +/** + * ExceptionListenerTest + * + */ + +public class ExceptionListenerTest extends QpidBrokerTestCase +{ + + public void testBrokerDeath() throws Exception + { + Connection conn = getConnection("guest", "guest"); + + conn.start(); + + final CountDownLatch fired = new CountDownLatch(1); + conn.setExceptionListener(new ExceptionListener() + { + public void onException(JMSException e) + { + fired.countDown(); + } + }); + + stopBroker(); + + if (!fired.await(3, TimeUnit.SECONDS)) + { + fail("exception listener was not fired"); + } + } + +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Client.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Client.java new file mode 100644 index 0000000000..b60fe76b76 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Client.java @@ -0,0 +1,143 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.client.forwardall; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; + +/** + * Declare a private temporary response queue, + * send a message to amq.direct with a well known routing key with the + * private response queue as the reply-to destination + * consume responses. + */ +public class Client implements MessageListener +{ + private static final Logger _logger = LoggerFactory.getLogger(Client.class); + + private final AMQConnection _connection; + private final AMQSession _session; + private final int _expected; + private int _count; + private static QpidBrokerTestCase _qct; + + Client(String broker, int expected) throws Exception + { + this(connect(broker), expected); + } + + public static void setQTC(QpidBrokerTestCase qtc) + { + _qct = qtc; + } + Client(AMQConnection connection, int expected) throws Exception + { + _connection = connection; + _expected = expected; + _session = (AMQSession) _connection.createSession(true, AMQSession.NO_ACKNOWLEDGE); + AMQQueue response = + new AMQQueue(_connection.getDefaultQueueExchangeName(), new AMQShortString("ResponseQueue"), true); + _session.createConsumer(response).setMessageListener(this); + _connection.start(); + // AMQQueue service = new SpecialQueue(_connection, "ServiceQueue"); + AMQQueue service = (AMQQueue) _session.createQueue("ServiceQueue") ; + Message request = _session.createTextMessage("Request!"); + request.setJMSReplyTo(response); + MessageProducer prod = _session.createProducer(service); + prod.send(request); + _session.commit(); + } + + void shutdownWhenComplete() throws Exception + { + waitUntilComplete(); + _connection.close(); + } + + public synchronized void onMessage(Message response) + { + + _logger.info("Received " + (++_count) + " of " + _expected + " responses."); + if (_count == _expected) + { + + notifyAll(); + } + try + { + _session.commit(); + } + catch (JMSException e) + { + + } + + } + + synchronized void waitUntilComplete() throws Exception + { + + if (_count < _expected) + { + wait(60000); + } + + if (_count < _expected) + { + throw new Exception("Didn't receive all messages... got " + _count + " expected " + _expected); + } + } + + static AMQConnection connect(String broker) throws Exception + { + //return new AMQConnection(broker, "guest", "guest", "Client" + System.currentTimeMillis(), "test"); + return (AMQConnection) _qct.getConnection("guest", "guest") ; + } + + public static void main(String[] argv) throws Exception + { + final String connectionString; + final int expected; + if (argv.length == 0) + { + connectionString = "localhost:5672"; + expected = 100; + } + else + { + connectionString = argv[0]; + expected = Integer.parseInt(argv[1]); + } + + new Client(connect(connectionString), expected).shutdownWhenComplete(); + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/CombinedTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/CombinedTest.java new file mode 100644 index 0000000000..45945eb8fc --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/CombinedTest.java @@ -0,0 +1,69 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.client.forwardall; + +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Runs the Service's and Client parts of the test in the same process + * as the broker + */ +public class CombinedTest extends QpidBrokerTestCase +{ + private static final Logger _logger = LoggerFactory.getLogger(CombinedTest.class); + private int run = 0; + + protected void setUp() throws Exception + { + super.setUp(); + Service.setQTC(this); + Client.setQTC(this); + } + + protected void tearDown() throws Exception + { + ServiceCreator.closeAll(); + super.tearDown(); + } + + public void testForwardAll() throws Exception + { + while (run < 10) + { + int services =1; + ServiceCreator.start("vm://:1", services); + + _logger.info("Starting " + ++run + " client..."); + + new Client("vm://:1", services).shutdownWhenComplete(); + + + _logger.info("Completed " + run + " successfully!"); + } + } + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(CombinedTest.class); + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Service.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Service.java new file mode 100644 index 0000000000..160700bdda --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Service.java @@ -0,0 +1,95 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.client.forwardall; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +/** + * Declare a queue and bind it to amq.direct with a 'well known' routing key, + * register a consumer for this queue and send a response to every message received. + */ +public class Service implements MessageListener +{ + private final AMQConnection _connection; + private final AMQSession _session; + + private static QpidBrokerTestCase _qct; + + + public static void setQTC(QpidBrokerTestCase qtc) + { + _qct = qtc; + } + Service(String broker) throws Exception + { + this(connect(broker)); + } + + Service(AMQConnection connection) throws Exception + { + _connection = connection; + //AMQQueue queue = new SpecialQueue(connection, "ServiceQueue"); + _session = (AMQSession) _connection.createSession(true, AMQSession.NO_ACKNOWLEDGE); + AMQQueue queue = (AMQQueue) _session.createQueue("ServiceQueue") ; + _session.createConsumer(queue).setMessageListener(this); + _connection.start(); + } + + public void onMessage(Message request) + { + try + { + Message response = _session.createTextMessage("Response!"); + Destination replyTo = request.getJMSReplyTo(); + _session.createProducer(replyTo).send(response); + _session.commit(); + } + catch (Exception e) + { + e.printStackTrace(System.out); + } + } + + public void close() throws JMSException + { + _connection.close(); + } + + static AMQConnection connect(String broker) throws Exception + { + //return new AMQConnection(broker, "guest", "guest", "Client" + System.currentTimeMillis(), "test"); + return (AMQConnection) _qct.getConnection("guest", "guest") ; + } + +// public static void main(String[] argv) throws Exception +// { +// String broker = argv.length == 0? "localhost:5672" : argv[0]; +// new Service(broker); +// } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/ServiceCreator.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/ServiceCreator.java new file mode 100644 index 0000000000..be16f6b7ae --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/ServiceCreator.java @@ -0,0 +1,112 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.client.forwardall; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.JMSException; + +public class ServiceCreator implements Runnable +{ + private static final Logger _logger = LoggerFactory.getLogger(ServiceCreator.class); + + private static Thread[] threads; + private static ServiceCreator[] _services; + + private final String broker; + private Service service; + + ServiceCreator(String broker) + { + this.broker = broker; + } + + public void run() + { + try + { + service = new Service(broker); + } + catch (Exception e) + { + e.printStackTrace(System.out); + } + } + + public void closeSC() throws JMSException + { + service.close(); + } + + static void closeAll() + { + for (int i = 0; i < _services.length; i++) + { + try + { + _services[i].closeSC(); + } + catch (JMSException e) + { + // ignore + } + } + } + + static void start(String broker, int services) throws InterruptedException + { + threads = new Thread[services]; + _services = new ServiceCreator[services]; + ServiceCreator runner = new ServiceCreator(broker); + // start services + _logger.info("Starting " + services + " services..."); + for (int i = 0; i < services; i++) + { + threads[i] = new Thread(runner); + _services[i] = runner; + threads[i].start(); + } + + for (int i = 0; i < threads.length; i++) + { + threads[i].join(); + } + } + + public static void main(String[] argv) throws Exception + { + final String connectionString; + final int services; + if (argv.length == 0) + { + connectionString = "localhost:5672"; + services = 100; + } + else + { + connectionString = argv[0]; + services = Integer.parseInt(argv[1]); + } + + start(connectionString, services); + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java new file mode 100644 index 0000000000..27371b0397 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.client.forwardall; + +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.framing.AMQShortString; + +/** + * Queue that allows several private queues to be registered and bound + * to an exchange with the same routing key. + * + */ +class SpecialQueue extends AMQQueue +{ + private final AMQShortString name; + + SpecialQueue(AMQConnection con, String name) + { + super(con, name, true); + this.name = new AMQShortString(name); + } + + public AMQShortString getRoutingKey() + { + return name; + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java new file mode 100644 index 0000000000..fd28b86762 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java @@ -0,0 +1,335 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.client.message; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; + +public class ObjectMessageTest extends QpidBrokerTestCase implements MessageListener +{ + private static final Logger _logger = LoggerFactory.getLogger(ObjectMessageTest.class); + + private AMQConnection connection; + private AMQDestination destination; + private AMQSession session; + private MessageProducer producer; + private Serializable[] data; + private volatile boolean waiting; + private int received; + private final ArrayList items = new ArrayList(); + + private String _broker = "vm://:1"; + + protected void setUp() throws Exception + { + super.setUp(); + connection = (AMQConnection) getConnection("guest", "guest"); + destination = new AMQQueue(connection, randomize("LatencyTest"), true); + session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE); + + // set up a consumer + session.createConsumer(destination).setMessageListener(this); + connection.start(); + + // create a publisher + producer = session.createProducer(destination, false, false, true); + A a1 = new A(1, "A"); + A a2 = new A(2, "a"); + B b = new B(1, "B"); + C c = new C(); + c.put("A1", a1); + c.put("a2", a2); + c.put("B", b); + c.put("String", "String"); + + data = new Serializable[] { a1, a2, b, c, "Hello World!", new Integer(1001) }; + } + + protected void tearDown() throws Exception + { + close(); + super.tearDown(); + } + + public ObjectMessageTest() + { } + + ObjectMessageTest(String broker) throws Exception + { + _broker = broker; + } + + public void testSendAndReceive() throws Exception + { + try + { + send(); + waitUntilReceived(data.length); + check(); + _logger.info("All " + data.length + " items matched."); + } + catch (Exception e) + { + e.printStackTrace(); + fail("This Test should succeed but failed due to: " + e); + } + } + + public void testSetObjectPropertyForString() throws Exception + { + String testStringProperty = "TestStringProperty"; + ObjectMessage msg = session.createObjectMessage(data[0]); + msg.setObjectProperty("TestStringProperty", testStringProperty); + assertEquals(testStringProperty, msg.getObjectProperty("TestStringProperty")); + } + + public void testSetObjectPropertyForBoolean() throws Exception + { + ObjectMessage msg = session.createObjectMessage(data[0]); + msg.setObjectProperty("TestBooleanProperty", Boolean.TRUE); + assertEquals(Boolean.TRUE, msg.getObjectProperty("TestBooleanProperty")); + } + + public void testSetObjectPropertyForByte() throws Exception + { + ObjectMessage msg = session.createObjectMessage(data[0]); + msg.setObjectProperty("TestByteProperty", Byte.MAX_VALUE); + assertEquals(Byte.MAX_VALUE, msg.getObjectProperty("TestByteProperty")); + } + + public void testSetObjectPropertyForShort() throws Exception + { + ObjectMessage msg = session.createObjectMessage(data[0]); + msg.setObjectProperty("TestShortProperty", Short.MAX_VALUE); + assertEquals(Short.MAX_VALUE, msg.getObjectProperty("TestShortProperty")); + } + + public void testSetObjectPropertyForInteger() throws Exception + { + ObjectMessage msg = session.createObjectMessage(data[0]); + msg.setObjectProperty("TestIntegerProperty", Integer.MAX_VALUE); + assertEquals(Integer.MAX_VALUE, msg.getObjectProperty("TestIntegerProperty")); + } + + public void testSetObjectPropertyForDouble() throws Exception + { + ObjectMessage msg = session.createObjectMessage(data[0]); + msg.setObjectProperty("TestDoubleProperty", Double.MAX_VALUE); + assertEquals(Double.MAX_VALUE, msg.getObjectProperty("TestDoubleProperty")); + } + + public void testSetObjectPropertyForFloat() throws Exception + { + ObjectMessage msg = session.createObjectMessage(data[0]); + msg.setObjectProperty("TestFloatProperty", Float.MAX_VALUE); + assertEquals(Float.MAX_VALUE, msg.getObjectProperty("TestFloatProperty")); + } + + public void testSetObjectPropertyForByteArray() throws Exception + { + byte[] array = { 1, 2, 3, 4, 5 }; + ObjectMessage msg = session.createObjectMessage(data[0]); + msg.setObjectProperty("TestByteArrayProperty", array); + assertTrue(Arrays.equals(array, (byte[]) msg.getObjectProperty("TestByteArrayProperty"))); + } + + public void testSetObjectForNull() throws Exception + { + ObjectMessage msg = session.createObjectMessage(); + msg.setObject(null); + assertNull(msg.getObject()); + } + + private void send() throws Exception + { + for (int i = 0; i < data.length; i++) + { + ObjectMessage msg; + if ((i % 2) == 0) + { + msg = session.createObjectMessage(data[i]); + } + else + { + msg = session.createObjectMessage(); + msg.setObject(data[i]); + } + + producer.send(msg); + } + } + + public void check() throws Exception + { + Object[] actual = (Object[]) items.toArray(); + if (actual.length != data.length) + { + throw new Exception("Expected " + data.length + " objects, got " + actual.length); + } + + for (int i = 0; i < data.length; i++) + { + if (actual[i] instanceof Exception) + { + throw new Exception("Error on receive of " + data[i], ((Exception) actual[i])); + } + + if (actual[i] == null) + { + throw new Exception("Expected " + data[i] + " got null"); + } + + if (!data[i].equals(actual[i])) + { + throw new Exception("Expected " + data[i] + " got " + actual[i]); + } + } + } + + private void close() throws Exception + { + session.close(); + connection.close(); + } + + private synchronized void waitUntilReceived(int count) throws InterruptedException + { + waiting = true; + while (received < count) + { + wait(); + } + + waiting = false; + } + + public void onMessage(Message message) + { + + try + { + if (message instanceof ObjectMessage) + { + items.add(((ObjectMessage) message).getObject()); + } + else + { + _logger.error("ERROR: Got " + message.getClass().getName() + " not ObjectMessage"); + items.add(message); + } + } + catch (JMSException e) + { + e.printStackTrace(); + items.add(e); + } + + synchronized (this) + { + received++; + notify(); + } + } + + public static void main(String[] argv) throws Exception + { + String broker = (argv.length > 0) ? argv[0] : "vm://:1"; + if ("-help".equals(broker)) + { + System.out.println("Usage: <broker>"); + } + + new ObjectMessageTest(broker).testSendAndReceive(); + } + + private static class A implements Serializable + { + private String sValue; + private int iValue; + + A(int i, String s) + { + sValue = s; + iValue = i; + } + + public int hashCode() + { + return iValue; + } + + public boolean equals(Object o) + { + return (o instanceof A) && equals((A) o); + } + + protected boolean equals(A a) + { + return areEqual(a.sValue, sValue) && (a.iValue == iValue); + } + } + + private static class B extends A + { + private long time; + + B(int i, String s) + { + super(i, s); + time = System.currentTimeMillis(); + } + + protected boolean equals(A a) + { + return super.equals(a) && (a instanceof B) && (time == ((B) a).time); + } + } + + private static class C extends HashMap implements Serializable + { } + + private static boolean areEqual(Object a, Object b) + { + return (a == null) ? (b == null) : a.equals(b); + } + + private static String randomize(String in) + { + return in + System.currentTimeMillis(); + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java new file mode 100644 index 0000000000..278b9e9c04 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java @@ -0,0 +1,106 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.client.protocol; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.UnknownHostException; + +import org.apache.mina.transport.vmpipe.VmPipeAddress; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.transport.TestNetworkDriver; + +public class AMQProtocolSessionTest extends QpidBrokerTestCase +{ + private static class AMQProtSession extends AMQProtocolSession + { + + public AMQProtSession(AMQProtocolHandler protocolHandler, AMQConnection connection) + { + super(protocolHandler,connection); + } + + public TestNetworkDriver getNetworkDriver() + { + return (TestNetworkDriver) _protocolHandler.getNetworkDriver(); + } + + public AMQShortString genQueueName() + { + return generateQueueName(); + } + } + + private AMQProtSession _testSession; + + protected void setUp() throws Exception + { + super.setUp(); + + AMQConnection con = (AMQConnection) getConnection("guest", "guest"); + AMQProtocolHandler protocolHandler = new AMQProtocolHandler(con); + protocolHandler.setNetworkDriver(new TestNetworkDriver()); + + //don't care about the values set here apart from the dummy IoSession + _testSession = new AMQProtSession(protocolHandler , con); + } + + public void testTemporaryQueueWildcard() throws UnknownHostException + { + checkTempQueueName(new InetSocketAddress(1234), "tmp_0_0_0_0_0_0_0_0_1234_1"); + } + + public void testTemporaryQueueLocalhostAddr() throws UnknownHostException + { + checkTempQueueName(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 1234), "tmp_127_0_0_1_1234_1"); + } + + public void testTemporaryQueueLocalhostName() throws UnknownHostException + { + checkTempQueueName(new InetSocketAddress(InetAddress.getByName("localhost"), 1234), "tmp_localhost_127_0_0_1_1234_1"); + } + + public void testTemporaryQueueInet4() throws UnknownHostException + { + checkTempQueueName(new InetSocketAddress(InetAddress.getByName("192.168.1.2"), 1234), "tmp_192_168_1_2_1234_1"); + } + + public void testTemporaryQueueInet6() throws UnknownHostException + { + checkTempQueueName(new InetSocketAddress(InetAddress.getByName("1080:0:0:0:8:800:200C:417A"), 1234), "tmp_1080_0_0_0_8_800_200c_417a_1234_1"); + } + + public void testTemporaryQueuePipe() throws UnknownHostException + { + checkTempQueueName(new VmPipeAddress(1), "tmp_vm_1_1"); + } + + private void checkTempQueueName(SocketAddress address, String queueName) + { + _testSession.getNetworkDriver().setLocalAddress(address); + assertEquals("Wrong queue name", queueName, _testSession.genQueueName().asString()); + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java new file mode 100644 index 0000000000..8c806fa2da --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java @@ -0,0 +1,258 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.test.unit.client.temporaryqueue; + +import javax.jms.Connection; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TemporaryQueue; +import javax.jms.TextMessage; +import junit.framework.Assert; + +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.jms.ConnectionListener; + +import java.util.ArrayList; +import java.util.List; +import java.util.LinkedList; + +public class TemporaryQueueTest extends QpidBrokerTestCase implements ExceptionListener +{ + private List<Exception> _exceptions = new ArrayList<Exception>(); + + protected void setUp() throws Exception + { + super.setUp(); + } + + protected void tearDown() throws Exception + { + super.tearDown(); + } + + protected Connection createConnection() throws Exception + { + return getConnection("guest", "guest"); + } + + public void testTemporaryQueue() throws Exception + { + Connection conn = createConnection(); + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + TemporaryQueue queue = session.createTemporaryQueue(); + assertNotNull(queue); + MessageProducer producer = session.createProducer(queue); + MessageConsumer consumer = session.createConsumer(queue); + conn.start(); + producer.send(session.createTextMessage("hello")); + TextMessage tm = (TextMessage) consumer.receive(2000); + assertNotNull(tm); + assertEquals("hello", tm.getText()); + + try + { + queue.delete(); + fail("Expected JMSException : should not be able to delete while there are active consumers"); + } + catch (JMSException je) + { + ; //pass + } + + consumer.close(); + + try + { + queue.delete(); + } + catch (JMSException je) + { + fail("Unexpected Exception: " + je.getMessage()); + } + + conn.close(); + } + + public void tUniqueness() throws Exception + { + int numProcs = Runtime.getRuntime().availableProcessors(); + final int threadsProc = 5; + + runUniqueness(1, 10); + runUniqueness(numProcs * threadsProc, 10); + runUniqueness(numProcs * threadsProc, 100); + runUniqueness(numProcs * threadsProc, 500); + } + + void runUniqueness(int makers, int queues) throws Exception + { + Connection connection = createConnection(); + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + List<TempQueueMaker> tqList = new LinkedList<TempQueueMaker>(); + + //Create Makers + for (int m = 0; m < makers; m++) + { + tqList.add(new TempQueueMaker(session, queues)); + } + + + List<Thread> threadList = new LinkedList<Thread>(); + + //Create Makers + for (TempQueueMaker maker : tqList) + { + threadList.add(new Thread(maker)); + } + + //Start threads + for (Thread thread : threadList) + { + thread.start(); + } + + // Join Threads + for (Thread thread : threadList) + { + try + { + thread.join(); + } + catch (InterruptedException e) + { + fail("Couldn't correctly join threads"); + } + } + + + List<AMQQueue> list = new LinkedList<AMQQueue>(); + + // Test values + for (TempQueueMaker maker : tqList) + { + check(maker, list); + } + + Assert.assertEquals("Not enough queues made.", makers * queues, list.size()); + + connection.close(); + } + + private void check(TempQueueMaker tq, List<AMQQueue> list) + { + for (AMQQueue q : tq.getList()) + { + if (list.contains(q)) + { + fail(q + " already exists."); + } + else + { + list.add(q); + } + } + } + + + class TempQueueMaker implements Runnable + { + List<AMQQueue> _queues; + Session _session; + private int _count; + + + TempQueueMaker(Session session, int queues) throws JMSException + { + _queues = new LinkedList<AMQQueue>(); + + _count = queues; + + _session = session; + } + + public void run() + { + int i = 0; + try + { + for (; i < _count; i++) + { + _queues.add((AMQQueue) _session.createTemporaryQueue()); + } + } + catch (JMSException jmse) + { + //stop + } + } + + List<AMQQueue> getList() + { + return _queues; + } + } + + public void testQPID1217() throws Exception + { + Connection conA = getConnection(); + conA.setExceptionListener(this); + Session sessA = conA.createSession(false, Session.AUTO_ACKNOWLEDGE); + TemporaryQueue temp = sessA.createTemporaryQueue(); + + MessageProducer prod = sessA.createProducer(temp); + prod.send(sessA.createTextMessage("hi")); + + Thread.sleep(500); + assertTrue("Exception received", _exceptions.isEmpty()); + + Connection conB = getConnection(); + Session sessB = conB.createSession(false, Session.AUTO_ACKNOWLEDGE); + + JMSException ex = null; + try + { + MessageConsumer consB = sessB.createConsumer(temp); + } + catch (JMSException e) + { + ex = e; + } + assertNotNull(ex); + } + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(TemporaryQueueTest.class); + } + + public void onException(JMSException arg0) + { + _exceptions.add(arg0); + } + +} |