summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java405
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQSessionTest.java110
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java90
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java239
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java394
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java81
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/CloseAfterConnectionFailureTest.java124
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java111
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java158
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java301
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java62
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Client.java143
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/CombinedTest.java69
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Service.java95
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/ServiceCreator.java112
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java46
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java335
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java106
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java258
19 files changed, 3239 insertions, 0 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java
new file mode 100644
index 0000000000..292bcd6039
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java
@@ -0,0 +1,405 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.client;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.InputStreamReader;
+import java.io.LineNumberReader;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.TopicSession;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionDelegate_0_10;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQConnectionTest extends QpidBrokerTestCase
+{
+ private static AMQConnection _connection;
+ private static AMQTopic _topic;
+ private static AMQQueue _queue;
+ private static QueueSession _queueSession;
+ private static TopicSession _topicSession;
+ protected static final Logger _logger = LoggerFactory.getLogger(AMQConnectionTest.class);
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ _connection = (AMQConnection) getConnection("guest", "guest");
+ _topic = new AMQTopic(_connection.getDefaultTopicExchangeName(), new AMQShortString("mytopic"));
+ _queue = new AMQQueue(_connection.getDefaultQueueExchangeName(), new AMQShortString("myqueue"));
+ }
+
+ protected void tearDown() throws Exception
+ {
+ _connection.close();
+ super.tearDown();
+ }
+
+ /**
+ * Simple tests to check we can create TopicSession and QueueSession ok
+ * And that they throw exceptions where appropriate as per JMS spec
+ */
+
+ public void testCreateQueueSession() throws JMSException
+ {
+ _queueSession = _connection.createQueueSession(false, AMQSession.NO_ACKNOWLEDGE);
+ }
+
+ public void testCreateTopicSession() throws JMSException
+ {
+ _topicSession = _connection.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+ }
+
+ public void testTopicSessionCreateBrowser() throws JMSException
+ {
+ try
+ {
+ _topicSession.createBrowser(_queue);
+ fail("expected exception did not occur");
+ }
+ catch (javax.jms.IllegalStateException s)
+ {
+ // ok
+ }
+ catch (Exception e)
+ {
+ fail("expected javax.jms.IllegalStateException, got " + e);
+ }
+ }
+
+ public void testTopicSessionCreateQueue() throws JMSException
+ {
+ try
+ {
+ _topicSession.createQueue("abc");
+ fail("expected exception did not occur");
+ }
+ catch (javax.jms.IllegalStateException s)
+ {
+ // ok
+ }
+ catch (Exception e)
+ {
+ fail("expected javax.jms.IllegalStateException, got " + e);
+ }
+ }
+
+ public void testTopicSessionCreateTemporaryQueue() throws JMSException
+ {
+ try
+ {
+ _topicSession.createTemporaryQueue();
+ fail("expected exception did not occur");
+ }
+ catch (javax.jms.IllegalStateException s)
+ {
+ // ok
+ }
+ catch (Exception e)
+ {
+ fail("expected javax.jms.IllegalStateException, got " + e);
+ }
+ }
+
+ public void testQueueSessionCreateTemporaryTopic() throws JMSException
+ {
+ try
+ {
+ _queueSession.createTemporaryTopic();
+ fail("expected exception did not occur");
+ }
+ catch (javax.jms.IllegalStateException s)
+ {
+ // ok
+ }
+ catch (Exception e)
+ {
+ fail("expected javax.jms.IllegalStateException, got " + e);
+ }
+ }
+
+ public void testQueueSessionCreateTopic() throws JMSException
+ {
+ try
+ {
+ _queueSession.createTopic("abc");
+ fail("expected exception did not occur");
+ }
+ catch (javax.jms.IllegalStateException s)
+ {
+ // ok
+ }
+ catch (Exception e)
+ {
+ fail("expected javax.jms.IllegalStateException, got " + e);
+ }
+ }
+
+ public void testQueueSessionDurableSubscriber() throws JMSException
+ {
+ try
+ {
+ _queueSession.createDurableSubscriber(_topic, "abc");
+ fail("expected exception did not occur");
+ }
+ catch (javax.jms.IllegalStateException s)
+ {
+ // ok
+ }
+ catch (Exception e)
+ {
+ fail("expected javax.jms.IllegalStateException, got " + e);
+ }
+ }
+
+ public void testQueueSessionUnsubscribe() throws JMSException
+ {
+ try
+ {
+ _queueSession.unsubscribe("abc");
+ fail("expected exception did not occur");
+ }
+ catch (javax.jms.IllegalStateException s)
+ {
+ // ok
+ }
+ catch (Exception e)
+ {
+ fail("expected javax.jms.IllegalStateException, got " + e);
+ }
+ }
+
+ public void testPrefetchSystemProperty() throws Exception
+ {
+ String oldPrefetch = System.getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME);
+ try
+ {
+ _connection.close();
+ System.setProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(2).toString());
+ _connection = (AMQConnection) getConnection();
+ _connection.start();
+ // Create two consumers on different sessions
+ Session consSessA = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumerA = consSessA.createConsumer(_queue);
+
+ Session producerSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = producerSession.createProducer(_queue);
+
+ // Send 3 messages
+ for (int i = 0; i < 3; i++)
+ {
+ producer.send(producerSession.createTextMessage("test"));
+ }
+
+ MessageConsumer consumerB = null;
+ if (isBroker08())
+ {
+ Session consSessB = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ consumerB = consSessB.createConsumer(_queue);
+ }
+ else
+ {
+ consumerB = consSessA.createConsumer(_queue);
+ }
+
+ Message msg;
+ // Check that consumer A has 2 messages
+ for (int i = 0; i < 2; i++)
+ {
+ msg = consumerA.receive(1500);
+ assertNotNull("Consumer A should receive 2 messages",msg);
+ }
+
+ msg = consumerA.receive(1500);
+ assertNull("Consumer A should not have received a 3rd message",msg);
+
+ // Check that consumer B has the last message
+ msg = consumerB.receive(1500);
+ assertNotNull("Consumer B should have received the message",msg);
+ }
+ finally
+ {
+ if (oldPrefetch == null)
+ {
+ oldPrefetch = ClientProperties.MAX_PREFETCH_DEFAULT;
+ }
+ System.setProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, oldPrefetch);
+ }
+ }
+
+ public void testGetChannelID() throws Exception
+ {
+ long maxChannelID = _connection.getMaximumChannelCount();
+ if (isBroker010())
+ {
+ //Usable numbers are 0 to N-1 when using 0-10
+ //and 1 to N for 0-8/0-9
+ maxChannelID = maxChannelID-1;
+ }
+ for (int j = 0; j < 3; j++)
+ {
+ int i = isBroker010() ? 0 : 1;
+ for ( ; i <= maxChannelID; i++)
+ {
+ int id = _connection.getNextChannelID();
+ assertEquals("Unexpected number on iteration "+j, i, id);
+ _connection.deregisterSession(id);
+ }
+ }
+ }
+
+ /**
+ * Test Strategy : Kill -STOP the broker and see
+ * if the client terminates the connection with a
+ * read timeout.
+ * The broker process is cleaned up in the test itself
+ * and avoids using process.waitFor() as it hangs.
+ */
+ public void testHeartBeat() throws Exception
+ {
+ boolean windows =
+ ((String) System.getProperties().get("os.name")).matches("(?i).*windows.*");
+
+ if (!isCppBroker() || windows)
+ {
+ return;
+ }
+
+ Process process = null;
+ int port = getPort(0);
+ String pid = null;
+ try
+ {
+ // close the connection and shutdown the broker started by QpidTest
+ _connection.close();
+ stopBroker(port);
+
+ System.setProperty("qpid.heartbeat", "1");
+
+ // in case this broker gets stuck, atleast the rest of the tests will not fail.
+ port = port + 200;
+ String startCmd = getBrokerCommand(port);
+
+ // start a broker using a script
+ ProcessBuilder pb = new ProcessBuilder(System.getProperty("broker.start"));
+ pb.redirectErrorStream(true);
+
+ Map<String, String> env = pb.environment();
+ env.put("BROKER_CMD",startCmd);
+ env.put("BROKER_READY",System.getProperty(BROKER_READY));
+
+ Process startScript = pb.start();
+ startScript.waitFor();
+ startScript.destroy();
+
+ Connection con =
+ new AMQConnection("amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:" + port + "'");
+ final AtomicBoolean lock = new AtomicBoolean(false);
+
+ String cmd = "/usr/bin/pgrep -f " + port;
+ process = Runtime.getRuntime().exec("/bin/bash");
+ LineNumberReader reader = new LineNumberReader(new InputStreamReader(process.getInputStream()));
+ PrintWriter out = new PrintWriter(new BufferedWriter(new OutputStreamWriter(process.getOutputStream())), true);
+ out.println(cmd);
+ pid = reader.readLine();
+ try
+ {
+ Integer.parseInt(pid);
+ }
+ catch (NumberFormatException e)
+ {
+ // Error! try to read further to gather the error msg.
+ String line;
+ _logger.debug(pid);
+ while ((line = reader.readLine()) != null )
+ {
+ _logger.debug(line);
+ }
+ throw new Exception( "Unable to get the brokers pid " + pid);
+ }
+ _logger.debug("pid : " + pid);
+
+ con.setExceptionListener(new ExceptionListener(){
+
+ public void onException(JMSException e)
+ {
+ synchronized(lock) {
+ lock.set(true);
+ lock.notifyAll();
+ }
+ }
+ });
+
+ out.println("kill -STOP " + pid);
+
+ synchronized(lock){
+ lock.wait(2500);
+ }
+ out.close();
+ reader.close();
+ assertTrue("Client did not terminate the connection, check log for details",lock.get());
+ }
+ catch(Exception e)
+ {
+ throw e;
+ }
+ finally
+ {
+ System.setProperty("qpid.heartbeat", "");
+
+ if (process != null)
+ {
+ process.destroy();
+ }
+
+ Process killScript = Runtime.getRuntime().exec(System.getProperty("broker.kill") + " " + pid);
+ killScript.waitFor();
+ killScript.destroy();
+ cleanBroker();
+ }
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(AMQConnectionTest.class);
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQSessionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQSessionTest.java
new file mode 100644
index 0000000000..93cceb1048
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQSessionTest.java
@@ -0,0 +1,110 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.client;
+
+import javax.jms.JMSException;
+import javax.jms.QueueReceiver;
+import javax.jms.TopicSubscriber;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+/**
+ * Tests for QueueReceiver and TopicSubscriber creation methods on AMQSession
+ */
+public class AMQSessionTest extends QpidBrokerTestCase
+{
+
+ private static AMQSession _session;
+ private static AMQTopic _topic;
+ private static AMQQueue _queue;
+ private static AMQConnection _connection;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ _connection = (AMQConnection) getConnection("guest", "guest");
+ _topic = new AMQTopic(_connection,"mytopic");
+ _queue = new AMQQueue(_connection,"myqueue");
+ _session = (AMQSession) _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ _connection.close();
+ }
+ catch (JMSException e)
+ {
+ //just close
+ }
+ super.tearDown();
+ }
+
+ public void testCreateSubscriber() throws JMSException
+ {
+ TopicSubscriber subscriber = _session.createSubscriber(_topic);
+ assertEquals("Topic names should match from TopicSubscriber", _topic.getTopicName(), subscriber.getTopic().getTopicName());
+
+ subscriber = _session.createSubscriber(_topic, "abc", false);
+ assertEquals("Topic names should match from TopicSubscriber with selector", _topic.getTopicName(), subscriber.getTopic().getTopicName());
+ }
+
+ public void testCreateDurableSubscriber() throws JMSException
+ {
+ TopicSubscriber subscriber = _session.createDurableSubscriber(_topic, "mysubname");
+ assertEquals("Topic names should match from durable TopicSubscriber", _topic.getTopicName(), subscriber.getTopic().getTopicName());
+
+ subscriber = _session.createDurableSubscriber(_topic, "mysubname2", "abc", false);
+ assertEquals("Topic names should match from durable TopicSubscriber with selector", _topic.getTopicName(), subscriber.getTopic().getTopicName());
+ _session.unsubscribe("mysubname");
+ _session.unsubscribe("mysubname2");
+ }
+
+ public void testCreateQueueReceiver() throws JMSException
+ {
+ QueueReceiver receiver = _session.createQueueReceiver(_queue);
+ assertEquals("Queue names should match from QueueReceiver", _queue.getQueueName(), receiver.getQueue().getQueueName());
+
+ receiver = _session.createQueueReceiver(_queue, "abc");
+ assertEquals("Queue names should match from QueueReceiver with selector", _queue.getQueueName(), receiver.getQueue().getQueueName());
+ }
+
+ public void testCreateReceiver() throws JMSException
+ {
+ QueueReceiver receiver = _session.createReceiver(_queue);
+ assertEquals("Queue names should match from QueueReceiver", _queue.getQueueName(), receiver.getQueue().getQueueName());
+
+ receiver = _session.createReceiver(_queue, "abc");
+ assertEquals("Queue names should match from QueueReceiver with selector", _queue.getQueueName(), receiver.getQueue().getQueueName());
+ }
+
+ public static void stopVmBrokers()
+ {
+ _queue = null;
+ _topic = null;
+ _session = null;
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java
new file mode 100644
index 0000000000..8577fb5b6a
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.client;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+/**
+ * QPID-155
+ *
+ * Test to validate that setting the respective qpid.declare_queues,
+ * qpid.declare_exchanges system properties functions as expected.
+ */
+public class DynamicQueueExchangeCreateTest extends QpidBrokerTestCase
+{
+ public void testQueueDeclare() throws Exception
+ {
+ setSystemProperty("qpid.declare_queues", "false");
+
+ Connection connection = getConnection();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Queue queue = session.createQueue(getTestQueueName());
+
+ try
+ {
+ session.createConsumer(queue);
+ fail("JMSException should be thrown as the queue does not exist");
+ }
+ catch (JMSException e)
+ {
+ checkExceptionErrorCode(e, AMQConstant.NOT_FOUND);
+ }
+ }
+
+ public void testExchangeDeclare() throws Exception
+ {
+ setSystemProperty("qpid.declare_exchanges", "false");
+
+ Connection connection = getConnection();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ String EXCHANGE_TYPE = "test.direct";
+ Queue queue = session.createQueue("direct://" + EXCHANGE_TYPE + "/queue/queue");
+
+ try
+ {
+ session.createConsumer(queue);
+ fail("JMSException should be thrown as the exchange does not exist");
+ }
+ catch (JMSException e)
+ {
+ checkExceptionErrorCode(e, AMQConstant.NOT_FOUND);
+ }
+ }
+
+ private void checkExceptionErrorCode(JMSException original, AMQConstant code)
+ {
+ Exception linked = original.getLinkedException();
+ assertNotNull("Linked exception should have been set", linked);
+ assertTrue("Linked exception should be an AMQException", linked instanceof AMQException);
+ assertEquals("Error code should be " + code.getCode(), code, ((AMQException) linked).getErrorCode());
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
new file mode 100644
index 0000000000..79e2ff8148
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
@@ -0,0 +1,239 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.client.channelclose;
+
+import junit.textui.TestRunner;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Due to bizarre exception handling all sessions are closed if you get
+ * a channel close request and no exception listener is registered.
+ * <p/>
+ * JIRA issue IBTBLZ-10.
+ * <p/>
+ * Simulate by:
+ * <p/>
+ * 0. Create two sessions with no exception listener.
+ * 1. Publish message to queue/topic that does not exist (wrong routing key).
+ * 2. This will cause a channel close.
+ * 3. Since client does not have an exception listener, currently all sessions are
+ * closed.
+ */
+public class ChannelCloseOkTest extends QpidBrokerTestCase
+{
+ private AMQConnection _connection;
+ private Destination _destination1;
+ private Destination _destination2;
+ private Session _session1;
+ private Session _session2;
+ private final List<Message> _received1 = new ArrayList<Message>();
+ private final List<Message> _received2 = new ArrayList<Message>();
+
+ private static final Logger _log = LoggerFactory.getLogger(ChannelCloseOkTest.class);
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ _connection = (AMQConnection) getConnection("guest", "guest");
+
+ _destination1 = new AMQQueue(_connection, "q1", true);
+ _destination2 = new AMQQueue(_connection, "q2", true);
+ _session1 = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _session1.createConsumer(_destination1).setMessageListener(new MessageListener()
+ {
+ public void onMessage(Message message)
+ {
+ _log.debug("consumer 1 got message [" + getTextMessage(message) + "]");
+ synchronized (_received1)
+ {
+ _received1.add(message);
+ _received1.notify();
+ }
+ }
+ });
+ _session2 = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _session2.createConsumer(_destination2).setMessageListener(new MessageListener()
+ {
+ public void onMessage(Message message)
+ {
+ _log.debug("consumer 2 got message [" + getTextMessage(message) + "]");
+ synchronized (_received2)
+ {
+ _received2.add(message);
+ _received2.notify();
+ }
+ }
+ });
+
+ _connection.start();
+ }
+
+ private String getTextMessage(Message message)
+ {
+ TextMessage tm = (TextMessage) message;
+ try
+ {
+ return tm.getText();
+ }
+ catch (JMSException e)
+ {
+ return "oops " + e;
+ }
+ }
+
+ protected void tearDown() throws Exception
+ {
+ closeConnection();
+ super.tearDown();
+ }
+
+ public void closeConnection() throws JMSException
+ {
+ if (_connection != null)
+ {
+ _log.info(">>>>>>>>>>>>>>.. closing");
+ _connection.close();
+ }
+ }
+
+ public void testWithoutExceptionListener() throws Exception
+ {
+ doTest();
+ }
+
+ public void testWithExceptionListener() throws Exception
+ {
+ _connection.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(JMSException jmsException)
+ {
+ _log.warn("onException - " + jmsException.getMessage());
+ }
+ });
+
+ doTest();
+ }
+
+ public void doTest() throws Exception
+ {
+ // Check both sessions are ok.
+ sendAndWait(_session1, _destination1, "first", _received1, 1);
+ sendAndWait(_session2, _destination2, "second", _received2, 1);
+ assertEquals(1, _received1.size());
+ assertEquals(1, _received2.size());
+
+ // Now send message to incorrect destination on session 1.
+ Destination destination = new AMQQueue(_connection, "incorrect");
+ send(_session1, destination, "third"); // no point waiting as message will never be received.
+
+ // Ensure both sessions are still ok.
+ // Send a bunch of messages as this give time for the sessions to be erroneously closed.
+ final int num = 300;
+ for (int i = 0; i < num; ++i)
+ {
+ send(_session1, _destination1, "" + i);
+ send(_session2, _destination2, "" + i);
+ }
+
+ waitFor(_received1, num + 1);
+ waitFor(_received2, num + 1);
+
+ // Note that the third message is never received as it is sent to an incorrect destination.
+ assertEquals(num + 1, _received1.size());
+ assertEquals(num + 1, _received2.size());
+ }
+
+ private void sendAndWait(Session session, Destination destination, String message, List<Message> received, int count)
+ throws JMSException, InterruptedException
+ {
+ send(session, destination, message);
+ waitFor(received, count);
+ }
+
+ private void send(Session session, Destination destination, String message) throws JMSException
+ {
+ _log.debug("sending message " + message);
+ MessageProducer producer1 = session.createProducer(destination);
+ producer1.send(session.createTextMessage(message));
+ }
+
+ private void waitFor(List<Message> received, int count) throws InterruptedException
+ {
+ long timeout = 20000;
+
+ synchronized (received)
+ {
+ long start = System.currentTimeMillis();
+ while (received.size() < count)
+ {
+ if (System.currentTimeMillis() - start > timeout)
+ {
+ fail("timeout expired waiting for messages");
+ }
+ try
+ {
+ received.wait(timeout);
+ }
+ catch (InterruptedException e)
+ {
+ _log.info("Interrupted: " + e);
+ throw e;
+ }
+
+ }
+ }
+ }
+
+ private static String randomize(String in)
+ {
+ return in + System.currentTimeMillis();
+ }
+
+ public static void main(String[] args)
+ {
+ TestRunner.run(ChannelCloseOkTest.class);
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(ChannelCloseOkTest.class);
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
new file mode 100644
index 0000000000..b6232b1734
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.test.unit.client.channelclose;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.protocol.AMQConstant;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+public class ChannelCloseTest extends QpidBrokerTestCase implements ExceptionListener, ConnectionListener
+{
+ private static final Logger _logger = LoggerFactory.getLogger(ChannelCloseTest.class);
+
+ Connection _connection;
+ private Session _session;
+ private static final long SYNC_TIMEOUT = 500;
+ private int TEST = 0;
+
+ /**
+ * Close channel, use chanel with same id ensure error.
+ *
+ * This test is only valid for non 0-10 connection .
+ */
+ public void testReusingChannelAfterFullClosure() throws Exception
+ {
+ _connection=newConnection();
+
+ // Create Producer
+ try
+ {
+ _connection.start();
+
+ createChannelAndTest(1);
+
+ // Cause it to close
+ try
+ {
+ _logger.info("Testing invalid exchange");
+ declareExchange(1, "", "name_that_will_lookup_to_null", false);
+ fail("Exchange name is empty so this should fail ");
+ }
+ catch (AMQException e)
+ {
+ assertEquals("Exchange should not be found", AMQConstant.NOT_FOUND, e.getErrorCode());
+ }
+
+ // Check that
+ try
+ {
+ _logger.info("Testing valid exchange should fail");
+ declareExchange(1, "topic", "amq.topic", false);
+ fail("This should not succeed as the channel should be closed ");
+ }
+ catch (AMQException e)
+ {
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Exception occured was:" + e.getErrorCode());
+ }
+
+ assertEquals("Connection should be closed", AMQConstant.CHANNEL_ERROR, e.getErrorCode());
+
+ _connection=newConnection();
+ }
+
+ checkSendingMessage();
+
+ _session.close();
+ _connection.close();
+
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ /*
+ close channel and send guff then send ok no errors
+ REMOVE TEST - The behaviour after server has sent close is undefined.
+ the server should be free to fail as it may wish to reclaim its resources
+ immediately after close.
+ */
+ /*public void testSendingMethodsAfterClose() throws Exception
+ {
+ // this is testing an 0.8 connection
+ if(isBroker08())
+ {
+ try
+ {
+ _connection=new AMQConnection("amqp://guest:guest@CCTTest/test?brokerlist='" + _brokerlist + "'");
+
+ ((AMQConnection) _connection).setConnectionListener(this);
+
+ _connection.setExceptionListener(this);
+
+ // Change the StateManager for one that doesn't respond with Close-OKs
+ AMQStateManager oldStateManager=((AMQConnection) _connection).getProtocolHandler().getStateManager();
+
+ _session=_connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ _connection.start();
+
+ // Test connection
+ checkSendingMessage();
+
+ // Set StateManager to manager that ignores Close-oks
+ AMQProtocolSession protocolSession=
+ ((AMQConnection) _connection).getProtocolHandler().getProtocolSession();
+
+ MethodDispatcher d = protocolSession.getMethodDispatcher();
+
+ MethodDispatcher wrappedDispatcher = (MethodDispatcher)
+ Proxy.newProxyInstance(d.getClass().getClassLoader(),
+ d.getClass().getInterfaces(),
+ new MethodDispatcherProxyHandler(
+ (ClientMethodDispatcherImpl) d));
+
+ protocolSession.setMethodDispatcher(wrappedDispatcher);
+
+
+ AMQStateManager newStateManager=new NoCloseOKStateManager(protocolSession);
+ newStateManager.changeState(oldStateManager.getCurrentState());
+
+ ((AMQConnection) _connection).getProtocolHandler().setStateManager(newStateManager);
+
+ final int TEST_CHANNEL=1;
+ _logger.info("Testing Channel(" + TEST_CHANNEL + ") Creation");
+
+ createChannelAndTest(TEST_CHANNEL);
+
+ // Cause it to close
+ try
+ {
+ _logger.info("Closing Channel - invalid exchange");
+ declareExchange(TEST_CHANNEL, "", "name_that_will_lookup_to_null", false);
+ fail("Exchange name is empty so this should fail ");
+ }
+ catch (AMQException e)
+ {
+ assertEquals("Exchange should not be found", AMQConstant.NOT_FOUND, e.getErrorCode());
+ }
+
+ try
+ {
+ // Send other methods that should be ignored
+ // send them no wait as server will ignore them
+ _logger.info("Tested known exchange - should ignore");
+ declareExchange(TEST_CHANNEL, "topic", "amq.topic", true);
+
+ _logger.info("Tested known invalid exchange - should ignore");
+ declareExchange(TEST_CHANNEL, "", "name_that_will_lookup_to_null", true);
+
+ _logger.info("Tested known invalid exchange - should ignore");
+ declareExchange(TEST_CHANNEL, "", "name_that_will_lookup_to_null", true);
+
+ // Send sync .. server will igore and timy oue
+ _logger.info("Tested known invalid exchange - should ignore");
+ declareExchange(TEST_CHANNEL, "", "name_that_will_lookup_to_null", false);
+ }
+ catch (AMQTimeoutException te)
+ {
+ assertEquals("Request should timeout", AMQConstant.REQUEST_TIMEOUT, te.getErrorCode());
+ }
+ catch (AMQException e)
+ {
+ fail("This should not fail as all requests should be ignored");
+ }
+
+ _logger.info("Sending Close");
+ // Send Close-ok
+ sendClose(TEST_CHANNEL);
+
+ _logger.info("Re-opening channel");
+
+ createChannelAndTest(TEST_CHANNEL);
+
+ // Test connection is still ok
+
+ checkSendingMessage();
+
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+
+ }
+ catch (URLSyntaxException e)
+ {
+ fail(e.getMessage());
+ }
+ finally
+ {
+ try
+ {
+ _session.close();
+ _connection.close();
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+ }
+ }
+*/
+ private void createChannelAndTest(int channel) throws FailoverException
+ {
+ // Create A channel
+ try
+ {
+ createChannel(channel);
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+
+ // Test it is ok
+ try
+ {
+ declareExchange(channel, "topic", "amq.topic", false);
+ _logger.info("Tested known exchange");
+ }
+ catch (AMQException e)
+ {
+ fail("This should not fail as this is the default exchange details");
+ }
+ }
+
+ private void sendClose(int channel)
+ {
+ ChannelCloseOkBody body =
+ ((AMQConnection) _connection).getProtocolHandler().getMethodRegistry().createChannelCloseOkBody();
+ AMQFrame frame = body.generateFrame(channel);
+
+ ((AMQConnection) _connection).getProtocolHandler().writeFrame(frame);
+ }
+
+ private void checkSendingMessage() throws JMSException
+ {
+ TEST++;
+ _logger.info("Test creating producer which will use channel id 1");
+
+ Queue queue = _session.createQueue("CCT_test_validation_queue" + TEST);
+
+ MessageConsumer consumer = _session.createConsumer(queue);
+
+ MessageProducer producer = _session.createProducer(queue);
+
+ final String MESSAGE = "CCT_Test_Message";
+ producer.send(_session.createTextMessage(MESSAGE));
+
+ Message msg = consumer.receive(2000);
+
+ assertNotNull("Received messages should not be null.", msg);
+ assertEquals("Message received not what we sent", MESSAGE, ((TextMessage) msg).getText());
+ }
+
+ private Connection newConnection()
+ {
+ Connection connection = null;
+ try
+ {
+ connection = getConnection();
+
+ ((AMQConnection) connection).setConnectionListener(this);
+
+ _session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ connection.start();
+
+ }
+ catch (Exception e)
+ {
+ fail("Creating new connection when:" + e.getMessage());
+ }
+
+ return connection;
+ }
+
+ private void declareExchange(int channelId, String _type, String _name, boolean nowait)
+ throws AMQException, FailoverException
+ {
+ ExchangeDeclareBody body =
+ ((AMQConnection) _connection).getProtocolHandler()
+ .getMethodRegistry()
+ .createExchangeDeclareBody(0,
+ new AMQShortString(_name),
+ new AMQShortString(_type),
+ true,
+ false,
+ false,
+ false,
+ nowait,
+ null);
+ AMQFrame exchangeDeclare = body.generateFrame(channelId);
+ AMQProtocolHandler protocolHandler = ((AMQConnection) _connection).getProtocolHandler();
+
+
+ if (nowait)
+ {
+ protocolHandler.writeFrame(exchangeDeclare);
+ }
+ else
+ {
+ protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class, SYNC_TIMEOUT);
+ }
+
+// return null;
+// }
+// }, (AMQConnection)_connection).execute();
+
+ }
+
+ private void createChannel(int channelId) throws AMQException, FailoverException
+ {
+ ChannelOpenBody body =
+ ((AMQConnection) _connection).getProtocolHandler().getMethodRegistry().createChannelOpenBody(null);
+
+ ((AMQConnection) _connection).getProtocolHandler().syncWrite(body.generateFrame(channelId), // outOfBand
+ ChannelOpenOkBody.class);
+
+ }
+
+ public void onException(JMSException jmsException)
+ {
+ // _logger.info("CCT" + jmsException);
+ fail(jmsException.getMessage());
+ }
+
+ public void bytesSent(long count)
+ { }
+
+ public void bytesReceived(long count)
+ { }
+
+ public boolean preFailover(boolean redirect)
+ {
+ return false;
+ }
+
+ public boolean preResubscribe()
+ {
+ return false;
+ }
+
+ public void failoverComplete()
+ { }
+
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java
new file mode 100644
index 0000000000..56d03dc4a7
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java
@@ -0,0 +1,81 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.client.channelclose;
+
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class CloseWithBlockingReceiveTest extends QpidBrokerTestCase
+{
+
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+
+ public void testReceiveReturnsNull() throws Exception
+ {
+ final AMQConnection connection = (AMQConnection) getConnection("guest", "guest");
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(new AMQTopic(connection, "banana"));
+ connection.start();
+
+ Runnable r = new Runnable()
+ {
+
+ public void run()
+ {
+ try
+ {
+ Thread.sleep(1000);
+ connection.close();
+ }
+ catch (Exception e)
+ {
+ }
+ }
+ };
+ long startTime = System.currentTimeMillis();
+ new Thread(r).start();
+ consumer.receive(10000);
+ assertTrue(System.currentTimeMillis() - startTime < 10000);
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(CloseWithBlockingReceiveTest.class);
+ }
+
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/CloseAfterConnectionFailureTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/CloseAfterConnectionFailureTest.java
new file mode 100644
index 0000000000..dc2f59c384
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/CloseAfterConnectionFailureTest.java
@@ -0,0 +1,124 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.client.connection;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionURL;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.url.URLSyntaxException;
+
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import java.util.concurrent.CountDownLatch;
+
+public class CloseAfterConnectionFailureTest extends QpidBrokerTestCase implements ExceptionListener
+{
+ private int sessionCount = 0;
+ AMQConnection connection;
+ Session session;
+ MessageConsumer consumer;
+ private CountDownLatch _latch = new CountDownLatch(1);
+ private JMSException _fail;
+
+ public void testNoFailover() throws URLSyntaxException, Exception,
+ InterruptedException, JMSException
+ {
+ //This test uses hard coded connection string so only runs on InVM case
+ if (!isExternalBroker())
+ {
+ String connectionString = "amqp://guest:guest@/test?brokerlist='vm://:1?connectdelay='500',retries='3'',failover='nofailover'";
+
+ AMQConnectionURL url = new AMQConnectionURL(connectionString);
+
+ try
+ {
+ //Start the connection so it will use the retries
+ connection = new AMQConnection(url, null);
+
+ connection.setExceptionListener(this);
+
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ consumer = session.createConsumer(session.createQueue(this.getName()));
+
+ //Kill connection
+ stopBroker();
+
+ _latch.await();
+
+ if (_fail != null)
+ {
+ _fail.printStackTrace(System.out);
+ fail("Exception thrown:" + _fail.getMessage());
+ }
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+ }
+ }
+
+ public void onException(JMSException e)
+ {
+ System.out.println("Connection isClosed after connection Falure?:" + connection.isClosed());
+ try
+ {
+ consumer.close();
+ }
+ catch (JMSException jmse)
+ {
+ System.out.println("Consumer close failed with:" + jmse.getMessage());
+ _fail = jmse;
+ }
+ System.out.println("Connection isClosed after connection Falure?:" + connection.isClosed());
+ try
+ {
+ //Note that if we actually do session.close() we will lock up as the session will never receive a frame
+ // from the
+ ((AMQSession) session).close(10);
+ }
+ catch (JMSException jmse)
+ {
+ System.out.println("Session close failed with:" + jmse.getMessage());
+ _fail = jmse;
+ }
+ System.out.println("Connection isClosed after connection Falure?:" + connection.isClosed());
+
+ try
+ {
+ connection.close();
+ }
+ catch (JMSException jmse)
+ {
+ System.out.println("Session close failed with:" + jmse.getMessage());
+ _fail = jmse;
+ }
+ System.out.println("Connection isClosed after connection Falure?:" + connection.isClosed());
+
+ _latch.countDown();
+
+ }
+
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java
new file mode 100644
index 0000000000..6d1b6de238
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java
@@ -0,0 +1,111 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.client.connection;
+
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.transport.util.Logger;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+/**
+ * ConnectionCloseTest
+ *
+ */
+
+public class ConnectionCloseTest extends QpidBrokerTestCase
+{
+
+ private static final Logger log = Logger.get(ConnectionCloseTest.class);
+
+ public void testSendReceiveClose() throws Exception
+ {
+ Map<Thread,StackTraceElement[]> before = Thread.getAllStackTraces();
+
+ for (int i = 0; i < 50; i++)
+ {
+ if ((i % 10) == 0)
+ {
+ log.warn("%d messages sent and received", i);
+ }
+
+ Connection receiver = getConnection();
+ receiver.start();
+ Session rssn = receiver.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = rssn.createQueue("connection-close-test-queue");
+ MessageConsumer cons = rssn.createConsumer(queue);
+
+ Connection sender = getConnection();
+ sender.start();
+ Session sssn = sender.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer prod = sssn.createProducer(queue);
+ prod.send(sssn.createTextMessage("test"));
+ sender.close();
+
+ TextMessage m = (TextMessage) cons.receive(2000);
+ assertNotNull("message was lost", m);
+ assertEquals(m.getText(), "test");
+ receiver.close();
+ }
+
+ // The finalizer is notifying connector thread waiting on a selector key.
+ // This should leave the finalizer enough time to notify those threads
+ synchronized (this)
+ {
+ this.wait(10000);
+ }
+
+ Map<Thread,StackTraceElement[]> after = Thread.getAllStackTraces();
+
+ Map<Thread,StackTraceElement[]> delta = new HashMap<Thread,StackTraceElement[]>(after);
+ for (Thread t : before.keySet())
+ {
+ delta.remove(t);
+ }
+
+ dumpStacks(delta);
+
+ int deltaThreshold = (isExternalBroker()? 1 : 2) //InVM creates more thread pools in the same VM
+ * (Runtime.getRuntime().availableProcessors() + 1) + 5;
+
+ assertTrue("Spurious thread creation exceeded threshold, " +
+ delta.size() + " threads created.",
+ delta.size() < deltaThreshold);
+ }
+
+ private void dumpStacks(Map<Thread,StackTraceElement[]> map)
+ {
+ for (Map.Entry<Thread,StackTraceElement[]> entry : map.entrySet())
+ {
+ Throwable t = new Throwable();
+ t.setStackTrace(entry.getValue());
+ log.warn(t, entry.getKey().toString());
+ }
+ }
+
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java
new file mode 100644
index 0000000000..ac14f8e50e
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java
@@ -0,0 +1,158 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.client.connection;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+public class ConnectionStartTest extends QpidBrokerTestCase
+{
+
+ String _broker = "vm://:1";
+
+ AMQConnection _connection;
+ private Session _consumerSess;
+ private MessageConsumer _consumer;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ try
+ {
+
+
+ AMQConnection pubCon = (AMQConnection) getConnection("guest", "guest");
+
+ AMQQueue queue = new AMQQueue(pubCon,"ConnectionStartTest");
+
+ Session pubSess = pubCon.createSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+
+ MessageProducer pub = pubSess.createProducer(queue);
+
+ _connection = (AMQConnection) getConnection("guest", "guest");
+
+ _consumerSess = _connection.createSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+
+ _consumer = _consumerSess.createConsumer(queue);
+
+ //publish after queue is created to ensure it can be routed as expected
+ pub.send(pubSess.createTextMessage("Initial Message"));
+
+ pubCon.close();
+
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ fail("Connection to " + _broker + " should succeed. Reason: " + e);
+ }
+ }
+
+ protected void tearDown() throws Exception
+ {
+ _connection.close();
+ super.tearDown();
+ }
+
+ public void testSimpleReceiveConnection()
+ {
+ try
+ {
+ assertTrue("Connection should not be started", !_connection.started());
+ //Note that this next line will start the dispatcher in the session
+ // should really not be called before _connection start
+ //assertTrue("There should not be messages waiting for the consumer", _consumer.receiveNoWait() == null);
+ _connection.start();
+ assertTrue("There should be messages waiting for the consumer", _consumer.receive(10*1000) != null);
+ assertTrue("Connection should be started", _connection.started());
+
+ }
+ catch (JMSException e)
+ {
+ fail("An error occured during test because:" + e);
+ }
+
+ }
+
+ public void testMessageListenerConnection()
+ {
+ final CountDownLatch _gotMessage = new CountDownLatch(1);
+
+ try
+ {
+ assertTrue("Connection should not be started", !_connection.started());
+ _consumer.setMessageListener(new MessageListener()
+ {
+ public void onMessage(Message message)
+ {
+ try
+ {
+ assertTrue("Connection should be started", _connection.started());
+ assertEquals("Mesage Received", "Initial Message", ((TextMessage) message).getText());
+ _gotMessage.countDown();
+ }
+ catch (JMSException e)
+ {
+ fail("Couldn't get message text because:" + e.getCause());
+ }
+ }
+ });
+
+ assertTrue("Connection should not be started", !_connection.started());
+ _connection.start();
+ assertTrue("Connection should be started", _connection.started());
+
+ try
+ {
+ assertTrue("Listener was never called", _gotMessage.await(10 * 1000, TimeUnit.MILLISECONDS));
+ }
+ catch (InterruptedException e)
+ {
+ fail("Timed out awaiting message via onMessage");
+ }
+
+ }
+ catch (JMSException e)
+ {
+ fail("Failed because:" + e.getCause());
+ }
+
+ }
+
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(ConnectionStartTest.class);
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
new file mode 100644
index 0000000000..04fc611cd1
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
@@ -0,0 +1,301 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.client.connection;
+
+import org.apache.qpid.AMQConnectionFailureException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQUnresolvedAddressException;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.client.AMQAuthenticationException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.AMQConnectionURL;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.jms.Session;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.jms.BrokerDetails;
+
+import javax.jms.Connection;
+import javax.jms.QueueSession;
+import javax.jms.TopicSession;
+import javax.naming.NamingException;
+
+public class ConnectionTest extends QpidBrokerTestCase
+{
+
+ String _broker_NotRunning = "vm://:2";
+ String _broker_BadDNS = "tcp://hg3sgaaw4lgihjs";
+
+ public void testSimpleConnection() throws Exception
+ {
+ AMQConnection conn = null;
+ try
+ {
+ conn = new AMQConnection(getBroker().toString(), "guest", "guest", "fred", "test");
+ }
+ catch (Exception e)
+ {
+ fail("Connection to " + getBroker() + " should succeed. Reason: " + e);
+ }
+ finally
+ {
+ if(conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
+ public void testDefaultExchanges() throws Exception
+ {
+ AMQConnection conn = null;
+ try
+ {
+ BrokerDetails broker = getBroker();
+ broker.setProperty(BrokerDetails.OPTIONS_RETRY, "1");
+ ConnectionURL url = new AMQConnectionURL("amqp://guest:guest@clientid/test?brokerlist='"
+ + broker
+ + "'&defaultQueueExchange='test.direct'"
+ + "&defaultTopicExchange='test.topic'"
+ + "&temporaryQueueExchange='tmp.direct'"
+ + "&temporaryTopicExchange='tmp.topic'");
+
+ System.err.println(url.toString());
+ conn = new AMQConnection(url, null);
+
+
+ AMQSession sess = (AMQSession) conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ sess.declareExchange(new AMQShortString("test.direct"),
+ ExchangeDefaults.DIRECT_EXCHANGE_CLASS, false);
+
+ sess.declareExchange(new AMQShortString("tmp.direct"),
+ ExchangeDefaults.DIRECT_EXCHANGE_CLASS, false);
+
+ sess.declareExchange(new AMQShortString("tmp.topic"),
+ ExchangeDefaults.TOPIC_EXCHANGE_CLASS, false);
+
+ sess.declareExchange(new AMQShortString("test.topic"),
+ ExchangeDefaults.TOPIC_EXCHANGE_CLASS, false);
+
+ QueueSession queueSession = conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ AMQQueue queue = (AMQQueue) queueSession.createQueue("MyQueue");
+
+ assertEquals(queue.getExchangeName().toString(), "test.direct");
+
+ AMQQueue tempQueue = (AMQQueue) queueSession.createTemporaryQueue();
+
+ assertEquals(tempQueue.getExchangeName().toString(), "tmp.direct");
+
+ queueSession.close();
+
+ TopicSession topicSession = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ AMQTopic topic = (AMQTopic) topicSession.createTopic("silly.topic");
+
+ assertEquals(topic.getExchangeName().toString(), "test.topic");
+
+ AMQTopic tempTopic = (AMQTopic) topicSession.createTemporaryTopic();
+
+ assertEquals(tempTopic.getExchangeName().toString(), "tmp.topic");
+
+ topicSession.close();
+
+ }
+ catch (Exception e)
+ {
+ fail("Connection to " + getBroker() + " should succeed. Reason: " + e);
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+
+ public void testPasswordFailureConnection() throws Exception
+ {
+ AMQConnection conn = null;
+ try
+ {
+ BrokerDetails broker = getBroker();
+ broker.setProperty(BrokerDetails.OPTIONS_RETRY, "0");
+ conn = new AMQConnection("amqp://guest:rubbishpassword@clientid/test?brokerlist='" + broker + "'");
+ fail("Connection should not be established password is wrong.");
+ }
+ catch (AMQConnectionFailureException amqe)
+ {
+ assertNotNull("No cause set:" + amqe.getMessage(), amqe.getCause());
+ assertTrue("Exception was wrong type", amqe.getCause() instanceof AMQException);
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
+ public void testConnectionFailure() throws Exception
+ {
+ AMQConnection conn = null;
+ try
+ {
+ conn = new AMQConnection("amqp://guest:guest@clientid/testpath?brokerlist='" + _broker_NotRunning + "?retries='0''");
+ fail("Connection should not be established");
+ }
+ catch (AMQException amqe)
+ {
+ if (!(amqe instanceof AMQConnectionFailureException))
+ {
+ fail("Correct exception not thrown. Excpected 'AMQConnectionException' got: " + amqe);
+ }
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+
+ }
+
+ public void testUnresolvedHostFailure() throws Exception
+ {
+ AMQConnection conn = null;
+ try
+ {
+ conn = new AMQConnection("amqp://guest:guest@clientid/testpath?brokerlist='" + _broker_BadDNS + "?retries='0''");
+ fail("Connection should not be established");
+ }
+ catch (AMQException amqe)
+ {
+ if (!(amqe instanceof AMQUnresolvedAddressException))
+ {
+ fail("Correct exception not thrown. Excpected 'AMQUnresolvedAddressException' got: " + amqe);
+ }
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+
+ }
+
+ public void testUnresolvedVirtualHostFailure() throws Exception
+ {
+ AMQConnection conn = null;
+ try
+ {
+ BrokerDetails broker = getBroker();
+ broker.setProperty(BrokerDetails.OPTIONS_RETRY, "0");
+ conn = new AMQConnection("amqp://guest:guest@clientid/rubbishhost?brokerlist='" + broker + "'");
+ fail("Connection should not be established");
+ }
+ catch (AMQException amqe)
+ {
+ if (!(amqe instanceof AMQConnectionFailureException))
+ {
+ fail("Correct exception not thrown. Excpected 'AMQConnectionFailureException' got: " + amqe);
+ }
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
+ public void testClientIdCannotBeChanged() throws Exception
+ {
+ Connection connection = new AMQConnection(getBroker().toString(), "guest", "guest",
+ "fred", "test");
+ try
+ {
+ connection.setClientID("someClientId");
+ fail("No IllegalStateException thrown when resetting clientid");
+ }
+ catch (javax.jms.IllegalStateException e)
+ {
+ // PASS
+ }
+ finally
+ {
+ if (connection != null)
+ {
+ connection.close();
+ }
+ }
+ }
+
+ public void testClientIdIsPopulatedAutomatically() throws Exception
+ {
+ Connection connection = new AMQConnection(getBroker().toString(), "guest", "guest",
+ null, "test");
+ try
+ {
+ assertNotNull(connection.getClientID());
+ }
+ finally
+ {
+ connection.close();
+ }
+ connection.close();
+ }
+
+ public void testUnsupportedSASLMechanism() throws Exception
+ {
+ BrokerDetails broker = getBroker();
+ broker.setProperty(BrokerDetails.OPTIONS_SASL_MECHS, "MY_MECH");
+
+ try
+ {
+ Connection connection = new AMQConnection(broker.toString(), "guest", "guest",
+ null, "test");
+ connection.close();
+ fail("The client should throw a ConnectionException stating the" +
+ " broker does not support the SASL mech specified by the client");
+ }
+ catch (Exception e)
+ {
+ assertTrue("Incorrect exception thrown",
+ e.getMessage().contains("The following SASL mechanisms " +
+ "[MY_MECH]" +
+ " specified by the client are not supported by the broker"));
+ }
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(ConnectionTest.class);
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java
new file mode 100644
index 0000000000..cec9d292cf
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.client.connection;
+
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+
+/**
+ * ExceptionListenerTest
+ *
+ */
+
+public class ExceptionListenerTest extends QpidBrokerTestCase
+{
+
+ public void testBrokerDeath() throws Exception
+ {
+ Connection conn = getConnection("guest", "guest");
+
+ conn.start();
+
+ final CountDownLatch fired = new CountDownLatch(1);
+ conn.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(JMSException e)
+ {
+ fired.countDown();
+ }
+ });
+
+ stopBroker();
+
+ if (!fired.await(3, TimeUnit.SECONDS))
+ {
+ fail("exception listener was not fired");
+ }
+ }
+
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Client.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Client.java
new file mode 100644
index 0000000000..b60fe76b76
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Client.java
@@ -0,0 +1,143 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.client.forwardall;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+
+/**
+ * Declare a private temporary response queue,
+ * send a message to amq.direct with a well known routing key with the
+ * private response queue as the reply-to destination
+ * consume responses.
+ */
+public class Client implements MessageListener
+{
+ private static final Logger _logger = LoggerFactory.getLogger(Client.class);
+
+ private final AMQConnection _connection;
+ private final AMQSession _session;
+ private final int _expected;
+ private int _count;
+ private static QpidBrokerTestCase _qct;
+
+ Client(String broker, int expected) throws Exception
+ {
+ this(connect(broker), expected);
+ }
+
+ public static void setQTC(QpidBrokerTestCase qtc)
+ {
+ _qct = qtc;
+ }
+ Client(AMQConnection connection, int expected) throws Exception
+ {
+ _connection = connection;
+ _expected = expected;
+ _session = (AMQSession) _connection.createSession(true, AMQSession.NO_ACKNOWLEDGE);
+ AMQQueue response =
+ new AMQQueue(_connection.getDefaultQueueExchangeName(), new AMQShortString("ResponseQueue"), true);
+ _session.createConsumer(response).setMessageListener(this);
+ _connection.start();
+ // AMQQueue service = new SpecialQueue(_connection, "ServiceQueue");
+ AMQQueue service = (AMQQueue) _session.createQueue("ServiceQueue") ;
+ Message request = _session.createTextMessage("Request!");
+ request.setJMSReplyTo(response);
+ MessageProducer prod = _session.createProducer(service);
+ prod.send(request);
+ _session.commit();
+ }
+
+ void shutdownWhenComplete() throws Exception
+ {
+ waitUntilComplete();
+ _connection.close();
+ }
+
+ public synchronized void onMessage(Message response)
+ {
+
+ _logger.info("Received " + (++_count) + " of " + _expected + " responses.");
+ if (_count == _expected)
+ {
+
+ notifyAll();
+ }
+ try
+ {
+ _session.commit();
+ }
+ catch (JMSException e)
+ {
+
+ }
+
+ }
+
+ synchronized void waitUntilComplete() throws Exception
+ {
+
+ if (_count < _expected)
+ {
+ wait(60000);
+ }
+
+ if (_count < _expected)
+ {
+ throw new Exception("Didn't receive all messages... got " + _count + " expected " + _expected);
+ }
+ }
+
+ static AMQConnection connect(String broker) throws Exception
+ {
+ //return new AMQConnection(broker, "guest", "guest", "Client" + System.currentTimeMillis(), "test");
+ return (AMQConnection) _qct.getConnection("guest", "guest") ;
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ final String connectionString;
+ final int expected;
+ if (argv.length == 0)
+ {
+ connectionString = "localhost:5672";
+ expected = 100;
+ }
+ else
+ {
+ connectionString = argv[0];
+ expected = Integer.parseInt(argv[1]);
+ }
+
+ new Client(connect(connectionString), expected).shutdownWhenComplete();
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/CombinedTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/CombinedTest.java
new file mode 100644
index 0000000000..45945eb8fc
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/CombinedTest.java
@@ -0,0 +1,69 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.client.forwardall;
+
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Runs the Service's and Client parts of the test in the same process
+ * as the broker
+ */
+public class CombinedTest extends QpidBrokerTestCase
+{
+ private static final Logger _logger = LoggerFactory.getLogger(CombinedTest.class);
+ private int run = 0;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ Service.setQTC(this);
+ Client.setQTC(this);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ ServiceCreator.closeAll();
+ super.tearDown();
+ }
+
+ public void testForwardAll() throws Exception
+ {
+ while (run < 10)
+ {
+ int services =1;
+ ServiceCreator.start("vm://:1", services);
+
+ _logger.info("Starting " + ++run + " client...");
+
+ new Client("vm://:1", services).shutdownWhenComplete();
+
+
+ _logger.info("Completed " + run + " successfully!");
+ }
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(CombinedTest.class);
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Service.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Service.java
new file mode 100644
index 0000000000..160700bdda
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Service.java
@@ -0,0 +1,95 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.client.forwardall;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+/**
+ * Declare a queue and bind it to amq.direct with a 'well known' routing key,
+ * register a consumer for this queue and send a response to every message received.
+ */
+public class Service implements MessageListener
+{
+ private final AMQConnection _connection;
+ private final AMQSession _session;
+
+ private static QpidBrokerTestCase _qct;
+
+
+ public static void setQTC(QpidBrokerTestCase qtc)
+ {
+ _qct = qtc;
+ }
+ Service(String broker) throws Exception
+ {
+ this(connect(broker));
+ }
+
+ Service(AMQConnection connection) throws Exception
+ {
+ _connection = connection;
+ //AMQQueue queue = new SpecialQueue(connection, "ServiceQueue");
+ _session = (AMQSession) _connection.createSession(true, AMQSession.NO_ACKNOWLEDGE);
+ AMQQueue queue = (AMQQueue) _session.createQueue("ServiceQueue") ;
+ _session.createConsumer(queue).setMessageListener(this);
+ _connection.start();
+ }
+
+ public void onMessage(Message request)
+ {
+ try
+ {
+ Message response = _session.createTextMessage("Response!");
+ Destination replyTo = request.getJMSReplyTo();
+ _session.createProducer(replyTo).send(response);
+ _session.commit();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace(System.out);
+ }
+ }
+
+ public void close() throws JMSException
+ {
+ _connection.close();
+ }
+
+ static AMQConnection connect(String broker) throws Exception
+ {
+ //return new AMQConnection(broker, "guest", "guest", "Client" + System.currentTimeMillis(), "test");
+ return (AMQConnection) _qct.getConnection("guest", "guest") ;
+ }
+
+// public static void main(String[] argv) throws Exception
+// {
+// String broker = argv.length == 0? "localhost:5672" : argv[0];
+// new Service(broker);
+// }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/ServiceCreator.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/ServiceCreator.java
new file mode 100644
index 0000000000..be16f6b7ae
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/ServiceCreator.java
@@ -0,0 +1,112 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.client.forwardall;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+
+public class ServiceCreator implements Runnable
+{
+ private static final Logger _logger = LoggerFactory.getLogger(ServiceCreator.class);
+
+ private static Thread[] threads;
+ private static ServiceCreator[] _services;
+
+ private final String broker;
+ private Service service;
+
+ ServiceCreator(String broker)
+ {
+ this.broker = broker;
+ }
+
+ public void run()
+ {
+ try
+ {
+ service = new Service(broker);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace(System.out);
+ }
+ }
+
+ public void closeSC() throws JMSException
+ {
+ service.close();
+ }
+
+ static void closeAll()
+ {
+ for (int i = 0; i < _services.length; i++)
+ {
+ try
+ {
+ _services[i].closeSC();
+ }
+ catch (JMSException e)
+ {
+ // ignore
+ }
+ }
+ }
+
+ static void start(String broker, int services) throws InterruptedException
+ {
+ threads = new Thread[services];
+ _services = new ServiceCreator[services];
+ ServiceCreator runner = new ServiceCreator(broker);
+ // start services
+ _logger.info("Starting " + services + " services...");
+ for (int i = 0; i < services; i++)
+ {
+ threads[i] = new Thread(runner);
+ _services[i] = runner;
+ threads[i].start();
+ }
+
+ for (int i = 0; i < threads.length; i++)
+ {
+ threads[i].join();
+ }
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ final String connectionString;
+ final int services;
+ if (argv.length == 0)
+ {
+ connectionString = "localhost:5672";
+ services = 100;
+ }
+ else
+ {
+ connectionString = argv[0];
+ services = Integer.parseInt(argv[1]);
+ }
+
+ start(connectionString, services);
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java
new file mode 100644
index 0000000000..27371b0397
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.client.forwardall;
+
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.framing.AMQShortString;
+
+/**
+ * Queue that allows several private queues to be registered and bound
+ * to an exchange with the same routing key.
+ *
+ */
+class SpecialQueue extends AMQQueue
+{
+ private final AMQShortString name;
+
+ SpecialQueue(AMQConnection con, String name)
+ {
+ super(con, name, true);
+ this.name = new AMQShortString(name);
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return name;
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java
new file mode 100644
index 0000000000..fd28b86762
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java
@@ -0,0 +1,335 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.client.message;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+
+public class ObjectMessageTest extends QpidBrokerTestCase implements MessageListener
+{
+ private static final Logger _logger = LoggerFactory.getLogger(ObjectMessageTest.class);
+
+ private AMQConnection connection;
+ private AMQDestination destination;
+ private AMQSession session;
+ private MessageProducer producer;
+ private Serializable[] data;
+ private volatile boolean waiting;
+ private int received;
+ private final ArrayList items = new ArrayList();
+
+ private String _broker = "vm://:1";
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ connection = (AMQConnection) getConnection("guest", "guest");
+ destination = new AMQQueue(connection, randomize("LatencyTest"), true);
+ session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+
+ // set up a consumer
+ session.createConsumer(destination).setMessageListener(this);
+ connection.start();
+
+ // create a publisher
+ producer = session.createProducer(destination, false, false, true);
+ A a1 = new A(1, "A");
+ A a2 = new A(2, "a");
+ B b = new B(1, "B");
+ C c = new C();
+ c.put("A1", a1);
+ c.put("a2", a2);
+ c.put("B", b);
+ c.put("String", "String");
+
+ data = new Serializable[] { a1, a2, b, c, "Hello World!", new Integer(1001) };
+ }
+
+ protected void tearDown() throws Exception
+ {
+ close();
+ super.tearDown();
+ }
+
+ public ObjectMessageTest()
+ { }
+
+ ObjectMessageTest(String broker) throws Exception
+ {
+ _broker = broker;
+ }
+
+ public void testSendAndReceive() throws Exception
+ {
+ try
+ {
+ send();
+ waitUntilReceived(data.length);
+ check();
+ _logger.info("All " + data.length + " items matched.");
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ fail("This Test should succeed but failed due to: " + e);
+ }
+ }
+
+ public void testSetObjectPropertyForString() throws Exception
+ {
+ String testStringProperty = "TestStringProperty";
+ ObjectMessage msg = session.createObjectMessage(data[0]);
+ msg.setObjectProperty("TestStringProperty", testStringProperty);
+ assertEquals(testStringProperty, msg.getObjectProperty("TestStringProperty"));
+ }
+
+ public void testSetObjectPropertyForBoolean() throws Exception
+ {
+ ObjectMessage msg = session.createObjectMessage(data[0]);
+ msg.setObjectProperty("TestBooleanProperty", Boolean.TRUE);
+ assertEquals(Boolean.TRUE, msg.getObjectProperty("TestBooleanProperty"));
+ }
+
+ public void testSetObjectPropertyForByte() throws Exception
+ {
+ ObjectMessage msg = session.createObjectMessage(data[0]);
+ msg.setObjectProperty("TestByteProperty", Byte.MAX_VALUE);
+ assertEquals(Byte.MAX_VALUE, msg.getObjectProperty("TestByteProperty"));
+ }
+
+ public void testSetObjectPropertyForShort() throws Exception
+ {
+ ObjectMessage msg = session.createObjectMessage(data[0]);
+ msg.setObjectProperty("TestShortProperty", Short.MAX_VALUE);
+ assertEquals(Short.MAX_VALUE, msg.getObjectProperty("TestShortProperty"));
+ }
+
+ public void testSetObjectPropertyForInteger() throws Exception
+ {
+ ObjectMessage msg = session.createObjectMessage(data[0]);
+ msg.setObjectProperty("TestIntegerProperty", Integer.MAX_VALUE);
+ assertEquals(Integer.MAX_VALUE, msg.getObjectProperty("TestIntegerProperty"));
+ }
+
+ public void testSetObjectPropertyForDouble() throws Exception
+ {
+ ObjectMessage msg = session.createObjectMessage(data[0]);
+ msg.setObjectProperty("TestDoubleProperty", Double.MAX_VALUE);
+ assertEquals(Double.MAX_VALUE, msg.getObjectProperty("TestDoubleProperty"));
+ }
+
+ public void testSetObjectPropertyForFloat() throws Exception
+ {
+ ObjectMessage msg = session.createObjectMessage(data[0]);
+ msg.setObjectProperty("TestFloatProperty", Float.MAX_VALUE);
+ assertEquals(Float.MAX_VALUE, msg.getObjectProperty("TestFloatProperty"));
+ }
+
+ public void testSetObjectPropertyForByteArray() throws Exception
+ {
+ byte[] array = { 1, 2, 3, 4, 5 };
+ ObjectMessage msg = session.createObjectMessage(data[0]);
+ msg.setObjectProperty("TestByteArrayProperty", array);
+ assertTrue(Arrays.equals(array, (byte[]) msg.getObjectProperty("TestByteArrayProperty")));
+ }
+
+ public void testSetObjectForNull() throws Exception
+ {
+ ObjectMessage msg = session.createObjectMessage();
+ msg.setObject(null);
+ assertNull(msg.getObject());
+ }
+
+ private void send() throws Exception
+ {
+ for (int i = 0; i < data.length; i++)
+ {
+ ObjectMessage msg;
+ if ((i % 2) == 0)
+ {
+ msg = session.createObjectMessage(data[i]);
+ }
+ else
+ {
+ msg = session.createObjectMessage();
+ msg.setObject(data[i]);
+ }
+
+ producer.send(msg);
+ }
+ }
+
+ public void check() throws Exception
+ {
+ Object[] actual = (Object[]) items.toArray();
+ if (actual.length != data.length)
+ {
+ throw new Exception("Expected " + data.length + " objects, got " + actual.length);
+ }
+
+ for (int i = 0; i < data.length; i++)
+ {
+ if (actual[i] instanceof Exception)
+ {
+ throw new Exception("Error on receive of " + data[i], ((Exception) actual[i]));
+ }
+
+ if (actual[i] == null)
+ {
+ throw new Exception("Expected " + data[i] + " got null");
+ }
+
+ if (!data[i].equals(actual[i]))
+ {
+ throw new Exception("Expected " + data[i] + " got " + actual[i]);
+ }
+ }
+ }
+
+ private void close() throws Exception
+ {
+ session.close();
+ connection.close();
+ }
+
+ private synchronized void waitUntilReceived(int count) throws InterruptedException
+ {
+ waiting = true;
+ while (received < count)
+ {
+ wait();
+ }
+
+ waiting = false;
+ }
+
+ public void onMessage(Message message)
+ {
+
+ try
+ {
+ if (message instanceof ObjectMessage)
+ {
+ items.add(((ObjectMessage) message).getObject());
+ }
+ else
+ {
+ _logger.error("ERROR: Got " + message.getClass().getName() + " not ObjectMessage");
+ items.add(message);
+ }
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace();
+ items.add(e);
+ }
+
+ synchronized (this)
+ {
+ received++;
+ notify();
+ }
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ String broker = (argv.length > 0) ? argv[0] : "vm://:1";
+ if ("-help".equals(broker))
+ {
+ System.out.println("Usage: <broker>");
+ }
+
+ new ObjectMessageTest(broker).testSendAndReceive();
+ }
+
+ private static class A implements Serializable
+ {
+ private String sValue;
+ private int iValue;
+
+ A(int i, String s)
+ {
+ sValue = s;
+ iValue = i;
+ }
+
+ public int hashCode()
+ {
+ return iValue;
+ }
+
+ public boolean equals(Object o)
+ {
+ return (o instanceof A) && equals((A) o);
+ }
+
+ protected boolean equals(A a)
+ {
+ return areEqual(a.sValue, sValue) && (a.iValue == iValue);
+ }
+ }
+
+ private static class B extends A
+ {
+ private long time;
+
+ B(int i, String s)
+ {
+ super(i, s);
+ time = System.currentTimeMillis();
+ }
+
+ protected boolean equals(A a)
+ {
+ return super.equals(a) && (a instanceof B) && (time == ((B) a).time);
+ }
+ }
+
+ private static class C extends HashMap implements Serializable
+ { }
+
+ private static boolean areEqual(Object a, Object b)
+ {
+ return (a == null) ? (b == null) : a.equals(b);
+ }
+
+ private static String randomize(String in)
+ {
+ return in + System.currentTimeMillis();
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
new file mode 100644
index 0000000000..278b9e9c04
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
@@ -0,0 +1,106 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.client.protocol;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+
+import org.apache.mina.transport.vmpipe.VmPipeAddress;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.transport.TestNetworkDriver;
+
+public class AMQProtocolSessionTest extends QpidBrokerTestCase
+{
+ private static class AMQProtSession extends AMQProtocolSession
+ {
+
+ public AMQProtSession(AMQProtocolHandler protocolHandler, AMQConnection connection)
+ {
+ super(protocolHandler,connection);
+ }
+
+ public TestNetworkDriver getNetworkDriver()
+ {
+ return (TestNetworkDriver) _protocolHandler.getNetworkDriver();
+ }
+
+ public AMQShortString genQueueName()
+ {
+ return generateQueueName();
+ }
+ }
+
+ private AMQProtSession _testSession;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ AMQConnection con = (AMQConnection) getConnection("guest", "guest");
+ AMQProtocolHandler protocolHandler = new AMQProtocolHandler(con);
+ protocolHandler.setNetworkDriver(new TestNetworkDriver());
+
+ //don't care about the values set here apart from the dummy IoSession
+ _testSession = new AMQProtSession(protocolHandler , con);
+ }
+
+ public void testTemporaryQueueWildcard() throws UnknownHostException
+ {
+ checkTempQueueName(new InetSocketAddress(1234), "tmp_0_0_0_0_0_0_0_0_1234_1");
+ }
+
+ public void testTemporaryQueueLocalhostAddr() throws UnknownHostException
+ {
+ checkTempQueueName(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 1234), "tmp_127_0_0_1_1234_1");
+ }
+
+ public void testTemporaryQueueLocalhostName() throws UnknownHostException
+ {
+ checkTempQueueName(new InetSocketAddress(InetAddress.getByName("localhost"), 1234), "tmp_localhost_127_0_0_1_1234_1");
+ }
+
+ public void testTemporaryQueueInet4() throws UnknownHostException
+ {
+ checkTempQueueName(new InetSocketAddress(InetAddress.getByName("192.168.1.2"), 1234), "tmp_192_168_1_2_1234_1");
+ }
+
+ public void testTemporaryQueueInet6() throws UnknownHostException
+ {
+ checkTempQueueName(new InetSocketAddress(InetAddress.getByName("1080:0:0:0:8:800:200C:417A"), 1234), "tmp_1080_0_0_0_8_800_200c_417a_1234_1");
+ }
+
+ public void testTemporaryQueuePipe() throws UnknownHostException
+ {
+ checkTempQueueName(new VmPipeAddress(1), "tmp_vm_1_1");
+ }
+
+ private void checkTempQueueName(SocketAddress address, String queueName)
+ {
+ _testSession.getNetworkDriver().setLocalAddress(address);
+ assertEquals("Wrong queue name", queueName, _testSession.genQueueName().asString());
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java
new file mode 100644
index 0000000000..8c806fa2da
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java
@@ -0,0 +1,258 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.test.unit.client.temporaryqueue;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TextMessage;
+import junit.framework.Assert;
+
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.jms.ConnectionListener;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.LinkedList;
+
+public class TemporaryQueueTest extends QpidBrokerTestCase implements ExceptionListener
+{
+ private List<Exception> _exceptions = new ArrayList<Exception>();
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ protected Connection createConnection() throws Exception
+ {
+ return getConnection("guest", "guest");
+ }
+
+ public void testTemporaryQueue() throws Exception
+ {
+ Connection conn = createConnection();
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ TemporaryQueue queue = session.createTemporaryQueue();
+ assertNotNull(queue);
+ MessageProducer producer = session.createProducer(queue);
+ MessageConsumer consumer = session.createConsumer(queue);
+ conn.start();
+ producer.send(session.createTextMessage("hello"));
+ TextMessage tm = (TextMessage) consumer.receive(2000);
+ assertNotNull(tm);
+ assertEquals("hello", tm.getText());
+
+ try
+ {
+ queue.delete();
+ fail("Expected JMSException : should not be able to delete while there are active consumers");
+ }
+ catch (JMSException je)
+ {
+ ; //pass
+ }
+
+ consumer.close();
+
+ try
+ {
+ queue.delete();
+ }
+ catch (JMSException je)
+ {
+ fail("Unexpected Exception: " + je.getMessage());
+ }
+
+ conn.close();
+ }
+
+ public void tUniqueness() throws Exception
+ {
+ int numProcs = Runtime.getRuntime().availableProcessors();
+ final int threadsProc = 5;
+
+ runUniqueness(1, 10);
+ runUniqueness(numProcs * threadsProc, 10);
+ runUniqueness(numProcs * threadsProc, 100);
+ runUniqueness(numProcs * threadsProc, 500);
+ }
+
+ void runUniqueness(int makers, int queues) throws Exception
+ {
+ Connection connection = createConnection();
+
+ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ List<TempQueueMaker> tqList = new LinkedList<TempQueueMaker>();
+
+ //Create Makers
+ for (int m = 0; m < makers; m++)
+ {
+ tqList.add(new TempQueueMaker(session, queues));
+ }
+
+
+ List<Thread> threadList = new LinkedList<Thread>();
+
+ //Create Makers
+ for (TempQueueMaker maker : tqList)
+ {
+ threadList.add(new Thread(maker));
+ }
+
+ //Start threads
+ for (Thread thread : threadList)
+ {
+ thread.start();
+ }
+
+ // Join Threads
+ for (Thread thread : threadList)
+ {
+ try
+ {
+ thread.join();
+ }
+ catch (InterruptedException e)
+ {
+ fail("Couldn't correctly join threads");
+ }
+ }
+
+
+ List<AMQQueue> list = new LinkedList<AMQQueue>();
+
+ // Test values
+ for (TempQueueMaker maker : tqList)
+ {
+ check(maker, list);
+ }
+
+ Assert.assertEquals("Not enough queues made.", makers * queues, list.size());
+
+ connection.close();
+ }
+
+ private void check(TempQueueMaker tq, List<AMQQueue> list)
+ {
+ for (AMQQueue q : tq.getList())
+ {
+ if (list.contains(q))
+ {
+ fail(q + " already exists.");
+ }
+ else
+ {
+ list.add(q);
+ }
+ }
+ }
+
+
+ class TempQueueMaker implements Runnable
+ {
+ List<AMQQueue> _queues;
+ Session _session;
+ private int _count;
+
+
+ TempQueueMaker(Session session, int queues) throws JMSException
+ {
+ _queues = new LinkedList<AMQQueue>();
+
+ _count = queues;
+
+ _session = session;
+ }
+
+ public void run()
+ {
+ int i = 0;
+ try
+ {
+ for (; i < _count; i++)
+ {
+ _queues.add((AMQQueue) _session.createTemporaryQueue());
+ }
+ }
+ catch (JMSException jmse)
+ {
+ //stop
+ }
+ }
+
+ List<AMQQueue> getList()
+ {
+ return _queues;
+ }
+ }
+
+ public void testQPID1217() throws Exception
+ {
+ Connection conA = getConnection();
+ conA.setExceptionListener(this);
+ Session sessA = conA.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ TemporaryQueue temp = sessA.createTemporaryQueue();
+
+ MessageProducer prod = sessA.createProducer(temp);
+ prod.send(sessA.createTextMessage("hi"));
+
+ Thread.sleep(500);
+ assertTrue("Exception received", _exceptions.isEmpty());
+
+ Connection conB = getConnection();
+ Session sessB = conB.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ JMSException ex = null;
+ try
+ {
+ MessageConsumer consB = sessB.createConsumer(temp);
+ }
+ catch (JMSException e)
+ {
+ ex = e;
+ }
+ assertNotNull(ex);
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(TemporaryQueueTest.class);
+ }
+
+ public void onException(JMSException arg0)
+ {
+ _exceptions.add(arg0);
+ }
+
+}