summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java284
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java163
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java104
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java190
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MapMessageTest.java1271
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java218
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java278
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java408
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java75
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/ReceiveTest.java82
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/SessionStartTest.java115
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/TextMessageTest.java248
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/close/CloseTest.java58
13 files changed, 3494 insertions, 0 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java
new file mode 100644
index 0000000000..59ce64eb4f
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.basic;
+
+import junit.framework.Assert;
+
+import org.apache.mina.common.ByteBuffer;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.message.JMSBytesMessage;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.transport.util.Waiter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageNotReadableException;
+import javax.jms.MessageNotWriteableException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class BytesMessageTest extends QpidBrokerTestCase implements MessageListener
+{
+ private static final Logger _logger = LoggerFactory.getLogger(BytesMessageTest.class);
+
+ private Connection _connection;
+ private Destination _destination;
+ private Session _session;
+ private final List<JMSBytesMessage> received = new ArrayList<JMSBytesMessage>();
+ private final List<byte[]> messages = new ArrayList<byte[]>();
+ private int _count = 100;
+ public String _connectionString = "vm://:1";
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ init((AMQConnection) getConnection("guest", "guest"));
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ void init(AMQConnection connection) throws Exception
+ {
+ init(connection, new AMQQueue(connection, randomize("BytesMessageTest"), true));
+ }
+
+ void init(AMQConnection connection, AMQDestination destination) throws Exception
+ {
+ _connection = connection;
+ _destination = destination;
+ _session = connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+
+ // Set up a slow consumer.
+ _session.createConsumer(destination).setMessageListener(this);
+ connection.start();
+ }
+
+ public void test() throws Exception
+ {
+ try
+ {
+ send(_count);
+ waitFor(_count);
+ check();
+ _logger.info("Completed without failure");
+ }
+ finally
+ {
+ _connection.close();
+ }
+ }
+
+ void send(int count) throws JMSException
+ {
+ // create a publisher
+ MessageProducer producer = _session.createProducer(_destination);
+ for (int i = 0; i < count; i++)
+ {
+ BytesMessage msg = _session.createBytesMessage();
+
+ try
+ {
+ msg.readFloat();
+ Assert.fail("Message should not be readable");
+ }
+ catch (MessageNotReadableException mnwe)
+ {
+ // normal execution
+ }
+
+ byte[] data = ("Message " + i).getBytes();
+ msg.writeBytes(data);
+ messages.add(data);
+ producer.send(msg);
+ }
+ }
+
+ void waitFor(int count) throws InterruptedException
+ {
+ synchronized (received)
+ {
+ Waiter w = new Waiter(received, 30000);
+ while (received.size() < count && w.hasTime())
+ {
+ w.await();
+ }
+ }
+ }
+
+ void check() throws JMSException
+ {
+ List<byte[]> actual = new ArrayList<byte[]>();
+ for (JMSBytesMessage m : received)
+ {
+ ByteBuffer buffer = m.getData();
+ byte[] data = new byte[buffer.remaining()];
+ buffer.get(data);
+ actual.add(data);
+
+ // Check Body Write Status
+ try
+ {
+ m.writeBoolean(true);
+ Assert.fail("Message should not be writeable");
+ }
+ catch (MessageNotWriteableException mnwe)
+ {
+ // normal execution
+ }
+
+ m.clearBody();
+
+ try
+ {
+ m.writeBoolean(true);
+ }
+ catch (MessageNotWriteableException mnwe)
+ {
+ Assert.fail("Message should be writeable");
+ }
+
+ // Check property write status
+ try
+ {
+ m.setStringProperty("test", "test");
+ Assert.fail("Message should not be writeable");
+ }
+ catch (MessageNotWriteableException mnwe)
+ {
+ // normal execution
+ }
+
+ m.clearProperties();
+
+ try
+ {
+ m.setStringProperty("test", "test");
+ }
+ catch (MessageNotWriteableException mnwe)
+ {
+ Assert.fail("Message should be writeable");
+ }
+
+ }
+
+ assertEqual(messages.iterator(), actual.iterator());
+ }
+
+ private static void assertEqual(Iterator expected, Iterator actual)
+ {
+ List<String> errors = new ArrayList<String>();
+ while (expected.hasNext() && actual.hasNext())
+ {
+ try
+ {
+ assertEquivalent((byte[]) expected.next(), (byte[]) actual.next());
+ }
+ catch (Exception e)
+ {
+ errors.add(e.getMessage());
+ }
+ }
+ while (expected.hasNext())
+ {
+ errors.add("Expected " + expected.next() + " but no more actual values.");
+ }
+ while (actual.hasNext())
+ {
+ errors.add("Found " + actual.next() + " but no more expected values.");
+ }
+
+ if (!errors.isEmpty())
+ {
+ throw new RuntimeException(errors.toString());
+ }
+ }
+
+ private static void assertEquivalent(byte[] expected, byte[] actual)
+ {
+ if (expected.length != actual.length)
+ {
+ throw new RuntimeException("Expected length " + expected.length + " got " + actual.length);
+ }
+
+ for (int i = 0; i < expected.length; i++)
+ {
+ if (expected[i] != actual[i])
+ {
+ throw new RuntimeException("Failed on byte " + i + " of " + expected.length);
+ }
+ }
+ }
+
+ public void onMessage(Message message)
+ {
+ synchronized (received)
+ {
+ received.add((JMSBytesMessage) message);
+ received.notify();
+ }
+ }
+
+ private static String randomize(String in)
+ {
+ return in + System.currentTimeMillis();
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ final String connectionString;
+ final int count;
+ if (argv.length == 0)
+ {
+ connectionString = "vm://:1";
+ count = 100;
+ }
+ else
+ {
+ connectionString = argv[0];
+ count = Integer.parseInt(argv[1]);
+ }
+
+ System.out.println("connectionString = " + connectionString);
+ System.out.println("count = " + count);
+
+ BytesMessageTest test = new BytesMessageTest();
+ test._connectionString = connectionString;
+ test._count = count;
+ test.test();
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java
new file mode 100644
index 0000000000..abf8da799c
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java
@@ -0,0 +1,163 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.basic;
+
+import org.apache.mina.common.ByteBuffer;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.message.JMSBytesMessage;
+import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FieldTableFactory;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class FieldTableMessageTest extends QpidBrokerTestCase implements MessageListener
+{
+ private static final Logger _logger = LoggerFactory.getLogger(FieldTableMessageTest.class);
+
+ private AMQConnection _connection;
+ private AMQDestination _destination;
+ private AMQSession _session;
+ private final ArrayList<JMSBytesMessage> received = new ArrayList<JMSBytesMessage>();
+ private FieldTable _expected;
+ private int _count = 10;
+ public String _connectionString = "vm://:1";
+ private CountDownLatch _waitForCompletion;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ init( (AMQConnection) getConnection("guest", "guest"));
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ private void init(AMQConnection connection) throws Exception
+ {
+ init(connection, new AMQQueue(connection, randomize("FieldTableMessageTest"), true));
+ }
+
+ private void init(AMQConnection connection, AMQDestination destination) throws Exception
+ {
+ _connection = connection;
+ _destination = destination;
+ _session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+
+ // set up a slow consumer
+ _session.createConsumer(destination).setMessageListener(this);
+ connection.start();
+
+ // _expected = new FieldTableTest().load("FieldTableTest2.properties");
+ _expected = load();
+ }
+
+ private FieldTable load() throws IOException
+ {
+ FieldTable result = FieldTableFactory.newFieldTable();
+ result.setLong("one", 1L);
+ result.setLong("two", 2L);
+ result.setLong("three", 3L);
+ result.setLong("four", 4L);
+ result.setLong("five", 5L);
+
+ return result;
+ }
+
+ public void test() throws Exception
+ {
+ int count = _count;
+ _waitForCompletion = new CountDownLatch(_count);
+ send(count);
+ _waitForCompletion.await(20, TimeUnit.SECONDS);
+ check();
+ _logger.info("Completed without failure");
+ _connection.close();
+ }
+
+ void send(int count) throws JMSException, IOException
+ {
+ // create a publisher
+ MessageProducer producer = _session.createProducer(_destination);
+ for (int i = 0; i < count; i++)
+ {
+ BytesMessage msg = _session.createBytesMessage();
+ msg.writeBytes(_expected.getDataAsBytes());
+ producer.send(msg);
+ }
+ }
+
+
+ void check() throws JMSException, AMQFrameDecodingException
+ {
+ for (Object m : received)
+ {
+ ByteBuffer buffer = ((JMSBytesMessage) m).getData();
+ FieldTable actual = FieldTableFactory.newFieldTable(buffer, buffer.remaining());
+ for (String key : _expected.keys())
+ {
+ assertEquals("Values for " + key + " did not match", _expected.getObject(key), actual.getObject(key));
+ }
+ }
+ }
+
+ public void onMessage(Message message)
+ {
+ synchronized (received)
+ {
+ received.add((JMSBytesMessage) message);
+ _waitForCompletion.countDown();
+ }
+ }
+
+ private static String randomize(String in)
+ {
+ return in + System.currentTimeMillis();
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ FieldTableMessageTest test = new FieldTableMessageTest();
+ test._connectionString = (argv.length == 0) ? "vm://:1" : argv[0];
+ test.setUp();
+ test._count = (argv.length > 1) ? Integer.parseInt(argv[1]) : 5;
+ test.test();
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java
new file mode 100644
index 0000000000..c9f6a22500
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java
@@ -0,0 +1,104 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.test.unit.basic;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import javax.jms.Session;
+import javax.jms.QueueSession;
+import javax.jms.Queue;
+import javax.jms.QueueSender;
+import javax.jms.TextMessage;
+import javax.jms.InvalidDestinationException;
+
+public class InvalidDestinationTest extends QpidBrokerTestCase
+{
+ private AMQConnection _connection;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ _connection = (AMQConnection) getConnection("guest", "guest");
+ }
+
+ protected void tearDown() throws Exception
+ {
+ _connection.close();
+ super.tearDown();
+ }
+
+
+
+ public void testInvalidDestination() throws Exception
+ {
+ Queue invalidDestination = new AMQQueue("amq.direct","unknownQ");
+ AMQQueue validDestination = new AMQQueue("amq.direct","knownQ");
+ QueueSession queueSession = _connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // This is the only easy way to create and bind a queue from the API :-(
+ queueSession.createConsumer(validDestination);
+
+ QueueSender sender = queueSession.createSender(invalidDestination);
+ TextMessage msg = queueSession.createTextMessage("Hello");
+ try
+ {
+ sender.send(msg);
+ fail("Expected InvalidDestinationException");
+ }
+ catch (InvalidDestinationException ex)
+ {
+ // pass
+ }
+ sender.close();
+
+ sender = queueSession.createSender(null);
+ invalidDestination = new AMQQueue("amq.direct","unknownQ");
+
+ try
+ {
+ sender.send(invalidDestination,msg);
+ fail("Expected InvalidDestinationException");
+ }
+ catch (InvalidDestinationException ex)
+ {
+ // pass
+ }
+ sender.send(validDestination,msg);
+ sender.close();
+ validDestination = new AMQQueue("amq.direct","knownQ");
+ sender = queueSession.createSender(validDestination);
+ sender.send(msg);
+
+
+
+
+ }
+
+
+ public static junit.framework.Test suite()
+ {
+
+ return new junit.framework.TestSuite(InvalidDestinationTest.class);
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java
new file mode 100644
index 0000000000..d97e22e024
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.test.unit.basic;
+
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+public class LargeMessageTest extends QpidBrokerTestCase
+{
+ private static final Logger _logger = LoggerFactory.getLogger(LargeMessageTest.class);
+
+ private Destination _destination;
+ private AMQSession _session;
+ private AMQConnection _connection;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ try
+ {
+ _connection = (AMQConnection) getConnection("guest", "guest");
+ init( _connection );
+ }
+ catch (Exception e)
+ {
+ fail("Unable to initialilse connection: " + e);
+ }
+ }
+
+ protected void tearDown() throws Exception
+ {
+ _connection.close();
+ super.tearDown();
+ }
+
+ private void init(AMQConnection connection) throws Exception
+ {
+ Destination destination = new AMQQueue(connection, "LargeMessageTest", true);
+ init(connection, destination);
+ }
+
+ private void init(AMQConnection connection, Destination destination) throws Exception
+ {
+ _destination = destination;
+ _session = (AMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ connection.start();
+ }
+
+ // Test boundary of 1 packet to 2 packets
+ public void test64kminus9()
+ {
+ checkLargeMessage((64 * 1024) - 9);
+ }
+
+ public void test64kminus8()
+ {
+ checkLargeMessage((64 * 1024)-8);
+ }
+
+ public void test64kminus7()
+ {
+ checkLargeMessage((64 * 1024)-7);
+ }
+
+
+ public void test64kplus1()
+ {
+ checkLargeMessage((64 * 1024) + 1);
+ }
+
+ // Test packet boundary of 3 packtes
+ public void test128kminus1()
+ {
+ checkLargeMessage((128 * 1024) - 1);
+ }
+
+ public void test128k()
+ {
+ checkLargeMessage(128 * 1024);
+ }
+
+ public void test128kplus1()
+ {
+ checkLargeMessage((128 * 1024) + 1);
+ }
+
+ // Testing larger messages
+
+ public void test256k()
+ {
+ checkLargeMessage(256 * 1024);
+ }
+
+ public void test512k()
+ {
+ checkLargeMessage(512 * 1024);
+ }
+
+ public void test1024k()
+ {
+ checkLargeMessage(1024 * 1024);
+ }
+
+ protected void checkLargeMessage(int messageSize)
+ {
+ try
+ {
+ MessageConsumer consumer = _session.createConsumer(_destination);
+ MessageProducer producer = _session.createProducer(_destination);
+ _logger.info("Testing message of size:" + messageSize);
+
+ String _messageText = buildLargeMessage(messageSize);
+
+ _logger.debug("Message built");
+
+ producer.send(_session.createTextMessage(_messageText));
+
+ TextMessage result = (TextMessage) consumer.receive(10000);
+
+ assertNotNull("Null message recevied", result);
+ assertEquals("Message Size", _messageText.length(), result.getText().length());
+ assertEquals("Message content differes", _messageText, result.getText());
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace();
+ fail("Excpetion occured:" + e.getCause());
+ }
+ }
+
+ private String buildLargeMessage(int size)
+ {
+ StringBuilder builder = new StringBuilder(size);
+
+ char ch = 'a';
+
+ for (int i = 1; i <= size; i++)
+ {
+ builder.append(ch);
+
+ if ((i % 1000) == 0)
+ {
+ ch++;
+ if (ch == ('z' + 1))
+ {
+ ch = 'a';
+ }
+ }
+ }
+
+ return builder.toString();
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(LargeMessageTest.class);
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MapMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MapMessageTest.java
new file mode 100644
index 0000000000..9f13ddcfdb
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MapMessageTest.java
@@ -0,0 +1,1271 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.basic;
+
+import junit.framework.Assert;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.message.JMSMapMessage;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageFormatException;
+import javax.jms.MessageListener;
+import javax.jms.MessageNotWriteableException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class MapMessageTest extends QpidBrokerTestCase implements MessageListener
+{
+ private static final Logger _logger = LoggerFactory.getLogger(MapMessageTest.class);
+
+ private AMQConnection _connection;
+ private Destination _destination;
+ private AMQSession _session;
+ private final List<JMSMapMessage> received = new ArrayList<JMSMapMessage>();
+
+ private static final String MESSAGE = "Message ";
+ private int _count = 100;
+ public String _connectionString = "vm://:1";
+ private byte[] _bytes = { 99, 98, 97, 96, 95 };
+ private static final float _smallfloat = 100.0f;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ try
+ {
+ init((AMQConnection) getConnection("guest", "guest"));
+ }
+ catch (Exception e)
+ {
+ fail("Unable to initialilse connection: " + e);
+ }
+ }
+
+ protected void tearDown() throws Exception
+ {
+ _logger.info("Tearing Down unit.basic.MapMessageTest");
+ super.tearDown();
+ }
+
+ private void init(AMQConnection connection) throws Exception
+ {
+ Destination destination = new AMQQueue(connection, randomize("MapMessageTest"), true);
+ init(connection, destination);
+ }
+
+ private void init(AMQConnection connection, Destination destination) throws Exception
+ {
+ _connection = connection;
+ _destination = destination;
+ _session = (AMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // set up a slow consumer
+ _session.createConsumer(destination).setMessageListener(this);
+ connection.start();
+ }
+
+ public void test() throws Exception
+ {
+ int count = _count;
+ send(count);
+ waitFor(count);
+ check();
+ _connection.close();
+ }
+
+ void send(int count) throws JMSException
+ {
+ // create a publisher
+ MessageProducer producer = _session.createProducer(_destination);
+ for (int i = 0; i < count; i++)
+ {
+ MapMessage message = _session.createMapMessage();
+
+ setMapValues(message, i);
+
+ producer.send(message);
+ }
+ }
+
+ private void setMapValues(MapMessage message, int i) throws JMSException
+ {
+ message.setBoolean("odd", (i / 2) == 0);
+ message.setByte("byte",Byte.MAX_VALUE);
+ message.setBytes("bytes", _bytes);
+ message.setChar("char",'c');
+ message.setDouble("double", Double.MAX_VALUE);
+ message.setFloat("float", Float.MAX_VALUE);
+ message.setFloat("smallfloat", 100);
+ message.setInt("messageNumber", i);
+ message.setInt("int", Integer.MAX_VALUE);
+ message.setLong("long", Long.MAX_VALUE);
+ message.setShort("short", Short.MAX_VALUE);
+ message.setString("message", MESSAGE + i);
+
+ // Test Setting Object Values
+ message.setObject("object-bool", true);
+ message.setObject("object-byte", Byte.MAX_VALUE);
+ message.setObject("object-bytes", _bytes);
+ message.setObject("object-char", 'c');
+ message.setObject("object-double", Double.MAX_VALUE);
+ message.setObject("object-float", Float.MAX_VALUE);
+ message.setObject("object-int", Integer.MAX_VALUE);
+ message.setObject("object-long", Long.MAX_VALUE);
+ message.setObject("object-short", Short.MAX_VALUE);
+
+ // Set a null String value
+ message.setString("nullString", null);
+ // Highlight protocol problem
+ message.setString("emptyString", "");
+
+ }
+
+ void waitFor(int count) throws Exception
+ {
+ long waitTime = 30000L;
+ long waitUntilTime = System.currentTimeMillis() + 30000L;
+
+ synchronized (received)
+ {
+ while ((received.size() < count) && (waitTime > 0))
+ {
+ if (received.size() < count)
+ {
+ received.wait(waitTime);
+ }
+
+ if (received.size() < count)
+ {
+ waitTime = waitUntilTime - System.currentTimeMillis();
+ }
+ }
+
+ if (received.size() < count)
+ {
+ throw new Exception("Timed-out. Waiting for " + count + " only got " + received.size());
+ }
+ }
+ }
+
+ void check() throws JMSException
+ {
+ int count = 0;
+ for (JMSMapMessage m : received)
+ {
+ testMapValues(m, count);
+
+ testCorrectExceptions(m);
+
+ testMessageWriteStatus(m);
+
+ testPropertyWriteStatus(m);
+
+ count++;
+ }
+ }
+
+ private void testCorrectExceptions(JMSMapMessage m) throws JMSException
+ {
+ testBoolean(m);
+
+ testByte(m);
+
+ testBytes(m);
+
+ testChar(m);
+
+ testDouble(m);
+
+ testFloat(m);
+
+ testInt(m);
+
+ testLong(m);
+
+ testShort(m);
+
+ testString(m);
+ }
+
+ private void testString(JMSMapMessage m) throws JMSException
+ {
+
+ Assert.assertFalse(m.getBoolean("message"));
+
+ try
+ {
+ m.getByte("message");
+ fail("Exception Expected.");
+ }
+ catch (NumberFormatException nfe)
+ {
+ // normal execution
+ }
+
+ try
+ {
+ m.getShort("message");
+ fail("Exception Expected.");
+ }
+ catch (NumberFormatException nfe)
+ {
+ // normal execution
+ }
+
+ // Try bad reads
+ try
+ {
+ m.getChar("message");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException npe)
+ {
+ // normal execution
+ }
+
+ try
+ {
+ m.getInt("message");
+ fail("Exception Expected.");
+ }
+ catch (NumberFormatException nfe)
+ {
+ // normal execution
+ }
+
+ try
+ {
+ m.getLong("message");
+ fail("Exception Expected.");
+ }
+ catch (NumberFormatException nfe)
+ {
+ // normal execution
+ }
+
+ // Try bad reads
+ try
+ {
+ m.getFloat("message");
+ fail("Exception Expected.");
+ }
+ catch (NumberFormatException nfe)
+ {
+ // normal execution
+ }
+ // Try bad reads
+ try
+ {
+ m.getDouble("message");
+ fail("Exception Expected.");
+ }
+ catch (NumberFormatException nfe)
+ {
+ // normal execution
+ }
+ // Try bad reads
+ try
+ {
+ m.getBytes("message");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+
+ Assert.assertEquals(MESSAGE + m.getInt("messageNumber"), m.getString("message"));
+ }
+
+ private void testShort(JMSMapMessage m) throws JMSException
+ {
+
+ // Try bad reads
+ try
+ {
+ m.getBoolean("short");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+
+ try
+ {
+ m.getByte("short");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+
+ Assert.assertEquals(Short.MAX_VALUE, m.getShort("short"));
+
+ // Try bad reads
+ try
+ {
+ m.getChar("short");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException npe)
+ {
+ // normal execution
+ }
+
+ Assert.assertEquals(Short.MAX_VALUE, m.getInt("short"));
+
+ Assert.assertEquals(Short.MAX_VALUE, m.getLong("short"));
+
+ // Try bad reads
+ try
+ {
+ m.getFloat("short");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+ // Try bad reads
+ try
+ {
+ m.getDouble("short");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+ // Try bad reads
+ try
+ {
+ m.getBytes("short");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+
+ Assert.assertEquals("" + Short.MAX_VALUE, m.getString("short"));
+ }
+
+ private void testLong(JMSMapMessage m) throws JMSException
+ {
+
+ // Try bad reads
+ try
+ {
+ m.getBoolean("long");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+
+ try
+ {
+ m.getByte("long");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+
+ try
+ {
+ m.getShort("long");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+
+ // Try bad reads
+ try
+ {
+ m.getChar("long");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException npe)
+ {
+ // normal execution
+ }
+
+ try
+ {
+ m.getInt("long");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+
+ Assert.assertEquals(Long.MAX_VALUE, m.getLong("long"));
+
+ // Try bad reads
+ try
+ {
+ m.getFloat("long");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+ // Try bad reads
+ try
+ {
+ m.getDouble("long");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+ // Try bad reads
+ try
+ {
+ m.getBytes("long");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+
+ Assert.assertEquals("" + Long.MAX_VALUE, m.getString("long"));
+ }
+
+ private void testDouble(JMSMapMessage m) throws JMSException
+ {
+
+ // Try bad reads
+ try
+ {
+ m.getBoolean("double");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+
+ try
+ {
+ m.getByte("double");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+
+ try
+ {
+ m.getShort("double");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+
+ // Try bad reads
+ try
+ {
+ m.getChar("double");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException npe)
+ {
+ // normal execution
+ }
+
+ try
+ {
+ m.getInt("double");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+
+ try
+ {
+ m.getLong("double");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+
+ // Try bad reads
+ try
+ {
+ m.getFloat("double");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+
+ Assert.assertEquals(Double.MAX_VALUE, m.getDouble("double"));
+
+ // Try bad reads
+ try
+ {
+ m.getBytes("double");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+
+ Assert.assertEquals("" + Double.MAX_VALUE, m.getString("double"));
+ }
+
+ private void testFloat(JMSMapMessage m) throws JMSException
+ {
+
+ // Try bad reads
+ try
+ {
+ m.getBoolean("float");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+
+ try
+ {
+ m.getByte("float");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+
+ try
+ {
+ m.getShort("float");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+
+ // Try bad reads
+ try
+ {
+ m.getChar("float");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException npe)
+ {
+ // normal execution
+ }
+
+ try
+ {
+ m.getInt("float");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+
+ try
+ {
+ m.getLong("float");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+
+ Assert.assertEquals(Float.MAX_VALUE, m.getFloat("float"));
+
+ Assert.assertEquals(_smallfloat, (float) m.getDouble("smallfloat"));
+
+ // Try bad reads
+ try
+ {
+ m.getBytes("float");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+
+ Assert.assertEquals("" + Float.MAX_VALUE, m.getString("float"));
+ }
+
+ private void testInt(JMSMapMessage m) throws JMSException
+ {
+
+ // Try bad reads
+ try
+ {
+ m.getBoolean("int");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+
+ try
+ {
+ m.getByte("int");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+
+ try
+ {
+ m.getShort("int");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+
+ // Try bad reads
+ try
+ {
+ m.getChar("int");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException npe)
+ {
+ // normal execution
+ }
+
+ Assert.assertEquals(Integer.MAX_VALUE, m.getInt("int"));
+
+ Assert.assertEquals(Integer.MAX_VALUE, (int) m.getLong("int"));
+
+ // Try bad reads
+ try
+ {
+ m.getFloat("int");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+ // Try bad reads
+ try
+ {
+ m.getDouble("int");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+ // Try bad reads
+ try
+ {
+ m.getBytes("int");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+
+ Assert.assertEquals("" + Integer.MAX_VALUE, m.getString("int"));
+ }
+
+ private void testChar(JMSMapMessage m) throws JMSException
+ {
+
+ // Try bad reads
+ try
+ {
+ m.getBoolean("char");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+
+ try
+ {
+ m.getByte("char");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+
+ try
+ {
+ m.getShort("char");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+
+ Assert.assertEquals('c', m.getChar("char"));
+
+ try
+ {
+ m.getInt("char");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+
+ try
+ {
+ m.getLong("char");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+
+ // Try bad reads
+ try
+ {
+ m.getFloat("char");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+ // Try bad reads
+ try
+ {
+ m.getDouble("char");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+ // Try bad reads
+ try
+ {
+ m.getBytes("char");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+
+ Assert.assertEquals("" + 'c', m.getString("char"));
+ }
+
+ private void testBytes(JMSMapMessage m) throws JMSException
+ {
+ // Try bad reads
+ try
+ {
+ m.getBoolean("bytes");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+
+ try
+ {
+ m.getByte("bytes");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+
+ try
+ {
+ m.getShort("bytes");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+
+ // Try bad reads
+ try
+ {
+ m.getChar("bytes");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException npe)
+ {
+ // normal execution
+ }
+
+ try
+ {
+ m.getInt("bytes");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+
+ try
+ {
+ m.getLong("bytes");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+
+ // Try bad reads
+ try
+ {
+ m.getFloat("bytes");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+ // Try bad reads
+ try
+ {
+ m.getDouble("bytes");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+
+ assertBytesEqual(_bytes, m.getBytes("bytes"));
+
+ try
+ {
+ m.getString("bytes");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+
+ }
+
+ private void testByte(JMSMapMessage m) throws JMSException
+ {
+ // Try bad reads
+ try
+ {
+ m.getBoolean("byte");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+
+ Assert.assertEquals(Byte.MAX_VALUE, m.getByte("byte"));
+
+ Assert.assertEquals((short) Byte.MAX_VALUE, m.getShort("byte"));
+
+ // Try bad reads
+ try
+ {
+ m.getChar("byte");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException npe)
+ {
+ // normal execution
+ }
+
+ // Reading a byte as an int is ok
+ Assert.assertEquals((short) Byte.MAX_VALUE, m.getInt("byte"));
+
+ Assert.assertEquals((short) Byte.MAX_VALUE, m.getLong("byte"));
+
+ // Try bad reads
+ try
+ {
+ m.getFloat("byte");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+ // Try bad reads
+ try
+ {
+ m.getDouble("byte");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+ // Try bad reads
+ try
+ {
+ m.getBytes("byte");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+
+ Assert.assertEquals("" + Byte.MAX_VALUE, m.getString("byte"));
+
+ }
+
+ private void testBoolean(JMSMapMessage m) throws JMSException
+ {
+
+ Assert.assertEquals((m.getInt("messageNumber") / 2) == 0, m.getBoolean("odd"));
+
+ // Try bad reads
+ try
+ {
+ m.getByte("odd");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+
+ // Try bad reads
+ try
+ {
+ m.getShort("odd");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+ // Try bad reads
+ try
+ {
+ m.getChar("odd");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException npe)
+ {
+ // normal execution
+ }
+ // Try bad reads
+ try
+ {
+ m.getInt("odd");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+ // Try bad reads
+ try
+ {
+ m.getLong("odd");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+ // Try bad reads
+ try
+ {
+ m.getFloat("odd");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+ // Try bad reads
+ try
+ {
+ m.getDouble("odd");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+ // Try bad reads
+ try
+ {
+ m.getBytes("odd");
+ fail("Exception Expected.");
+ }
+ catch (MessageFormatException nfe)
+ {
+ // normal execution
+ }
+
+ Assert.assertEquals("" + ((m.getInt("messageNumber") / 2) == 0), m.getString("odd"));
+ }
+
+ private void testPropertyWriteStatus(JMSMapMessage m) throws JMSException
+ {
+ // Check property write status
+ try
+ {
+ m.setStringProperty("test", "test");
+ Assert.fail("Message should not be writeable");
+ }
+ catch (MessageNotWriteableException mnwe)
+ {
+ // normal execution
+ }
+
+ m.clearProperties();
+
+ try
+ {
+ m.setStringProperty("test", "test");
+ }
+ catch (MessageNotWriteableException mnwe)
+ {
+ Assert.fail("Message should be writeable");
+ }
+ }
+
+ private void testMessageWriteStatus(JMSMapMessage m) throws JMSException
+ {
+ try
+ {
+ m.setInt("testint", 3);
+ fail("Message should not be writeable");
+ }
+ catch (MessageNotWriteableException mnwe)
+ {
+ // normal execution
+ }
+
+ m.clearBody();
+
+ try
+ {
+ m.setInt("testint", 3);
+ }
+ catch (MessageNotWriteableException mnwe)
+ {
+ Assert.fail("Message should be writeable");
+ }
+ }
+
+ private void testMapValues(JMSMapMessage m, int count) throws JMSException
+ {
+ // Test get<Primiative>
+
+ // Boolean
+ assertEqual((count / 2) == 0, m.getBoolean("odd"));
+ assertEqual("" + ((count / 2) == 0), m.getString("odd"));
+
+ // Byte
+ assertEqual(Byte.MAX_VALUE, m.getByte("byte"));
+ assertEqual("" + Byte.MAX_VALUE, m.getString("byte"));
+
+ // Bytes
+ assertBytesEqual(_bytes, m.getBytes("bytes"));
+
+ // Char
+ assertEqual('c', m.getChar("char"));
+
+ // Double
+ assertEqual(Double.MAX_VALUE, m.getDouble("double"));
+ assertEqual("" + Double.MAX_VALUE, m.getString("double"));
+
+ // Float
+ assertEqual(Float.MAX_VALUE, m.getFloat("float"));
+ assertEqual(_smallfloat, (float) m.getDouble("smallfloat"));
+ assertEqual("" + Float.MAX_VALUE, m.getString("float"));
+
+ // Integer
+ assertEqual(Integer.MAX_VALUE, m.getInt("int"));
+ assertEqual("" + Integer.MAX_VALUE, m.getString("int"));
+ assertEqual(count, m.getInt("messageNumber"));
+
+ // long
+ assertEqual(Long.MAX_VALUE, m.getLong("long"));
+ assertEqual("" + Long.MAX_VALUE, m.getString("long"));
+
+ // Short
+ assertEqual(Short.MAX_VALUE, m.getShort("short"));
+ assertEqual("" + Short.MAX_VALUE, m.getString("short"));
+ assertEqual((int) Short.MAX_VALUE, m.getInt("short"));
+
+ // String
+ assertEqual(MESSAGE + count, m.getString("message"));
+
+ // Test getObjects
+ assertEqual(true, m.getObject("object-bool"));
+ assertEqual(Byte.MAX_VALUE, m.getObject("object-byte"));
+ assertBytesEqual(_bytes, (byte[]) m.getObject("object-bytes"));
+ assertEqual('c', m.getObject("object-char"));
+ assertEqual(Double.MAX_VALUE, m.getObject("object-double"));
+ assertEqual(Float.MAX_VALUE, m.getObject("object-float"));
+ assertEqual(Integer.MAX_VALUE, m.getObject("object-int"));
+ assertEqual(Long.MAX_VALUE, m.getObject("object-long"));
+ assertEqual(Short.MAX_VALUE, m.getObject("object-short"));
+
+ // Check Special values
+ assertTrue(m.getString("nullString") == null);
+ assertEqual("", m.getString("emptyString"));
+ }
+
+ private void assertBytesEqual(byte[] expected, byte[] actual)
+ {
+ Assert.assertEquals(expected.length, actual.length);
+
+ for (int index = 0; index < expected.length; index++)
+ {
+ Assert.assertEquals(expected[index], actual[index]);
+ }
+ }
+
+ private static void assertEqual(Iterator expected, Iterator actual)
+ {
+ List<String> errors = new ArrayList<String>();
+ while (expected.hasNext() && actual.hasNext())
+ {
+ try
+ {
+ assertEqual(expected.next(), actual.next());
+ }
+ catch (Exception e)
+ {
+ errors.add(e.getMessage());
+ }
+ }
+ while (expected.hasNext())
+ {
+ errors.add("Expected " + expected.next() + " but no more actual values.");
+ }
+ while (actual.hasNext())
+ {
+ errors.add("Found " + actual.next() + " but no more expected values.");
+ }
+
+ if (!errors.isEmpty())
+ {
+ throw new RuntimeException(errors.toString());
+ }
+ }
+
+ private static void assertEqual(Object expected, Object actual)
+ {
+ if (!expected.equals(actual))
+ {
+ throw new RuntimeException("Expected '" + expected + "' found '" + actual + "'");
+ }
+ }
+
+ public void onMessage(Message message)
+ {
+ synchronized (received)
+ {
+ _logger.info("****************** Recevied Messgage:" + message);
+ received.add((JMSMapMessage) message);
+ received.notify();
+ }
+ }
+
+ private static String randomize(String in)
+ {
+ return in + System.currentTimeMillis();
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ MapMessageTest test = new MapMessageTest();
+ test._connectionString = (argv.length == 0) ? "vm://:1" : argv[0];
+ test.setUp();
+ if (argv.length > 1)
+ {
+ test._count = Integer.parseInt(argv[1]);
+ }
+
+ test.test();
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(MapMessageTest.class);
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java
new file mode 100644
index 0000000000..3a5f676ca6
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.basic;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+public class MultipleConnectionTest extends QpidBrokerTestCase
+{
+ private static final Logger _logger = LoggerFactory.getLogger(MultipleConnectionTest.class);
+
+ public static final String _defaultBroker = "vm://:1";
+ public String _connectionString = _defaultBroker;
+
+ private class Receiver
+ {
+ private AMQConnection _connection;
+ private Session[] _sessions;
+ private MessageCounter[] _counters;
+
+ Receiver(String broker, AMQDestination dest, int sessions) throws Exception
+ {
+ this((AMQConnection) getConnection("guest", "guest"), dest, sessions);
+ }
+
+ Receiver(AMQConnection connection, AMQDestination dest, int sessions) throws Exception
+ {
+ _connection = connection;
+ _sessions = new AMQSession[sessions];
+ _counters = new MessageCounter[sessions];
+ for (int i = 0; i < sessions; i++)
+ {
+ _sessions[i] = _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ _counters[i] = new MessageCounter(_sessions[i].toString());
+ _sessions[i].createConsumer(dest).setMessageListener(_counters[i]);
+ }
+
+ _connection.start();
+ }
+
+ void close() throws JMSException
+ {
+ _connection.close();
+ }
+ }
+
+ private class Publisher
+ {
+ private AMQConnection _connection;
+ private Session _session;
+ private MessageProducer _producer;
+
+ Publisher(String broker, AMQDestination dest) throws Exception
+ {
+ this((AMQConnection) getConnection("guest", "guest"), dest);
+ }
+
+ Publisher(AMQConnection connection, AMQDestination dest) throws Exception
+ {
+ _connection = connection;
+ _session = _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ _producer = _session.createProducer(dest);
+ }
+
+ void send(String msg) throws JMSException
+ {
+ _producer.send(_session.createTextMessage(msg));
+ }
+
+ void close() throws JMSException
+ {
+ _connection.close();
+ }
+ }
+
+ private static class MessageCounter implements MessageListener
+ {
+ private final String _name;
+ private int _count;
+
+ MessageCounter(String name)
+ {
+ _name = name;
+ }
+
+ public synchronized void onMessage(Message message)
+ {
+ _count++;
+ notify();
+ }
+
+ synchronized boolean waitUntil(int expected, long maxWait) throws InterruptedException
+ {
+ long start = System.currentTimeMillis();
+ while (expected > _count)
+ {
+ long timeLeft = maxWait - timeSince(start);
+ if (timeLeft <= 0)
+ {
+ break;
+ }
+
+ wait(timeLeft);
+ }
+
+ return expected <= _count;
+ }
+
+ private long timeSince(long start)
+ {
+ return System.currentTimeMillis() - start;
+ }
+
+ public synchronized String toString()
+ {
+ return _name + ": " + _count;
+ }
+ }
+
+ private static void waitForCompletion(int expected, long wait, Receiver[] receivers) throws InterruptedException
+ {
+ for (int i = 0; i < receivers.length; i++)
+ {
+ waitForCompletion(expected, wait, receivers[i]._counters);
+ }
+ }
+
+ private static void waitForCompletion(int expected, long wait, MessageCounter[] counters) throws InterruptedException
+ {
+ for (int i = 0; i < counters.length; i++)
+ {
+ if (!counters[i].waitUntil(expected, wait))
+ {
+ throw new RuntimeException("Expected: " + expected + " got " + counters[i]);
+ }
+ }
+ }
+
+ private static String randomize(String in)
+ {
+ return in + System.currentTimeMillis();
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ String broker = (argv.length > 0) ? argv[0] : _defaultBroker;
+
+ MultipleConnectionTest test = new MultipleConnectionTest();
+ test._connectionString = broker;
+ test.test();
+ }
+
+ public void test() throws Exception
+ {
+ String broker = _connectionString;
+ int messages = 10;
+
+ AMQTopic topic = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, "amq.topic");
+
+ Receiver[] receivers = new Receiver[] { new Receiver(broker, topic, 2), new Receiver(broker, topic, 14) };
+
+ Publisher publisher = new Publisher(broker, topic);
+ for (int i = 0; i < messages; i++)
+ {
+ publisher.send("Message " + (i + 1));
+ }
+
+ try
+ {
+ waitForCompletion(messages, 5000, receivers);
+ _logger.info("All receivers received all expected messages");
+ }
+ finally
+ {
+ publisher.close();
+ for (int i = 0; i < receivers.length; i++)
+ {
+ receivers[i].close();
+ }
+ }
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(MultipleConnectionTest.class);
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java
new file mode 100644
index 0000000000..c8e7368092
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.basic;
+
+import junit.framework.Assert;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.message.JMSObjectMessage;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageNotWriteableException;
+import javax.jms.MessageProducer;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class ObjectMessageTest extends QpidBrokerTestCase implements MessageListener
+{
+ private static final Logger _logger = LoggerFactory.getLogger(ObjectMessageTest.class);
+
+ private AMQConnection _connection;
+ private AMQDestination _destination;
+ private AMQSession _session;
+ private final List<JMSObjectMessage> received = new ArrayList<JMSObjectMessage>();
+ private final List<Payload> messages = new ArrayList<Payload>();
+ private int _count = 100;
+ public String _connectionString = "vm://:1";
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ try
+ {
+ init( (AMQConnection) getConnection("guest", "guest"));
+ }
+ catch (Exception e)
+ {
+ fail("Uable to initialise: " + e);
+ }
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ private void init(AMQConnection connection) throws Exception
+ {
+ init(connection, new AMQQueue(connection, randomize("ObjectMessageTest"), true));
+ }
+
+ private void init(AMQConnection connection, AMQDestination destination) throws Exception
+ {
+ _connection = connection;
+ _destination = destination;
+ _session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+
+ // set up a slow consumer
+ _session.createConsumer(destination).setMessageListener(this);
+ connection.start();
+ }
+
+ public void test() throws Exception
+ {
+ int count = _count;
+ send(count);
+ waitFor(count);
+ check();
+ _logger.info("Completed without failure");
+ _connection.close();
+ }
+
+ void send(int count) throws JMSException
+ {
+ // create a publisher
+ MessageProducer producer = _session.createProducer(_destination);
+ for (int i = 0; i < count; i++)
+ {
+ Payload payload = new Payload("Message " + i);
+ messages.add(payload);
+ producer.send(_session.createObjectMessage(payload));
+ }
+ }
+
+ void waitFor(int count) throws InterruptedException
+ {
+ synchronized (received)
+ {
+ long endTime = System.currentTimeMillis() + 30000L;
+ while (received.size() < count)
+ {
+ received.wait(30000);
+ if(received.size() < count && System.currentTimeMillis() > endTime)
+ {
+ throw new RuntimeException("Only received " + received.size() + " messages, was expecting " + count);
+ }
+ }
+ }
+ }
+
+ void check() throws JMSException
+ {
+ List<Object> actual = new ArrayList<Object>();
+ for (JMSObjectMessage m : received)
+ {
+ actual.add(m.getObject());
+
+ try
+ {
+ m.setObject("Test text");
+ Assert.fail("Message should not be writeable");
+ }
+ catch (MessageNotWriteableException mnwe)
+ {
+ // normal execution
+ }
+
+ m.clearBody();
+
+ try
+ {
+ m.setObject("Test text");
+ }
+ catch (MessageNotWriteableException mnwe)
+ {
+ Assert.fail("Message should be writeable");
+ }
+
+ // Check property write status
+ try
+ {
+ m.setStringProperty("test", "test");
+ Assert.fail("Message should not be writeable");
+ }
+ catch (MessageNotWriteableException mnwe)
+ {
+ // normal execution
+ }
+
+ m.clearProperties();
+
+ try
+ {
+ m.setStringProperty("test", "test");
+ }
+ catch (MessageNotWriteableException mnwe)
+ {
+ Assert.fail("Message should be writeable");
+ }
+
+ }
+
+ assertEqual(messages.iterator(), actual.iterator());
+
+ }
+
+ private static void assertEqual(Iterator expected, Iterator actual)
+ {
+ List<String> errors = new ArrayList<String>();
+ while (expected.hasNext() && actual.hasNext())
+ {
+ try
+ {
+ assertEqual(expected.next(), actual.next());
+ }
+ catch (Exception e)
+ {
+ errors.add(e.getMessage());
+ }
+ }
+ while (expected.hasNext())
+ {
+ errors.add("Expected " + expected.next() + " but no more actual values.");
+ }
+ while (actual.hasNext())
+ {
+ errors.add("Found " + actual.next() + " but no more expected values.");
+ }
+
+ if (!errors.isEmpty())
+ {
+ throw new RuntimeException(errors.toString());
+ }
+ }
+
+ private static void assertEqual(Object expected, Object actual)
+ {
+ if (!expected.equals(actual))
+ {
+ throw new RuntimeException("Expected '" + expected + "' found '" + actual + "'");
+ }
+ }
+
+ public void onMessage(Message message)
+ {
+ synchronized (received)
+ {
+ received.add((JMSObjectMessage) message);
+ received.notify();
+ }
+ }
+
+ private static String randomize(String in)
+ {
+ return in + System.currentTimeMillis();
+ }
+
+ private static class Payload implements Serializable
+ {
+ private final String data;
+
+ Payload(String data)
+ {
+ this.data = data;
+ }
+
+ public int hashCode()
+ {
+ return data.hashCode();
+ }
+
+ public boolean equals(Object o)
+ {
+ return (o instanceof Payload) && ((Payload) o).data.equals(data);
+ }
+
+ public String toString()
+ {
+ return "Payload[" + data + "]";
+ }
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ ObjectMessageTest test = new ObjectMessageTest();
+ test._connectionString = (argv.length == 0) ? "vm://:1" : argv[0];
+ test.setUp();
+ if (argv.length > 1)
+ {
+ test._count = Integer.parseInt(argv[1]);
+ }
+
+ test.test();
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(ObjectMessageTest.class);
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
new file mode 100644
index 0000000000..3b8b4946da
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
@@ -0,0 +1,408 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.basic;
+
+import junit.framework.Assert;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.message.JMSTextMessage;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.url.BindingURL;
+import org.apache.qpid.url.AMQBindingURL;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageFormatException;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.net.URISyntaxException;
+
+import java.lang.reflect.*;
+
+public class PropertyValueTest extends QpidBrokerTestCase implements MessageListener
+{
+ private static final Logger _logger = LoggerFactory.getLogger(PropertyValueTest.class);
+
+ private int count = 0;
+ private AMQConnection _connection;
+ private Destination _destination;
+ private AMQSession _session;
+ private final List<JMSTextMessage> received = new ArrayList<JMSTextMessage>();
+ private final List<String> messages = new ArrayList<String>();
+ private int _count = 1;
+ public String _connectionString = "vm://:1";
+ private static final String USERNAME = "guest";
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ private void init(AMQConnection connection) throws Exception
+ {
+ Destination destination = new AMQQueue(connection, randomize("PropertyValueTest"), true);
+ init(connection, destination);
+ }
+
+ private void init(AMQConnection connection, Destination destination) throws Exception
+ {
+ _connection = connection;
+ _destination = destination;
+ _session = (AMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // set up a slow consumer
+ _session.createConsumer(destination).setMessageListener(this);
+ connection.start();
+ }
+
+ private Message getTestMessage() throws Exception
+ {
+ Connection conn = getConnection();
+ Session ssn = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ return ssn.createTextMessage();
+ }
+
+ public void testGetNonexistent() throws Exception
+ {
+ Message m = getTestMessage();
+ String s = m.getStringProperty("nonexistent");
+ assertNull(s);
+ }
+
+ private static final String[] NAMES = {
+ "setBooleanProperty", "setByteProperty", "setShortProperty",
+ "setIntProperty", "setLongProperty", "setFloatProperty",
+ "setDoubleProperty", "setObjectProperty"
+ };
+
+ private static final Class[] TYPES = {
+ boolean.class, byte.class, short.class, int.class, long.class,
+ float.class, double.class, Object.class
+ };
+
+ private static final Object[] VALUES = {
+ true, (byte) 0, (short) 0, 0, (long) 0, (float) 0, (double) 0,
+ new Object()
+ };
+
+ public void testSetEmptyPropertyName() throws Exception
+ {
+ Message m = getTestMessage();
+
+ for (int i = 0; i < NAMES.length; i++)
+ {
+ Method meth = m.getClass().getMethod(NAMES[i], String.class, TYPES[i]);
+ try
+ {
+ meth.invoke(m, "", VALUES[i]);
+ fail("expected illegal argument exception");
+ }
+ catch (InvocationTargetException e)
+ {
+ assertEquals(e.getCause().getClass(), IllegalArgumentException.class);
+ }
+ }
+ }
+
+ public void testSetDisallowedClass() throws Exception
+ {
+ Message m = getTestMessage();
+ try
+ {
+ m.setObjectProperty("foo", new Object());
+ fail("expected a MessageFormatException");
+ }
+ catch (MessageFormatException e)
+ {
+ // pass
+ }
+ }
+
+ public void testOnce()
+ {
+ runBatch(1);
+ }
+
+ public void test50()
+ {
+ runBatch(50);
+ }
+
+ private void runBatch(int runSize)
+ {
+ try
+ {
+ int run = 0;
+ while (run < runSize)
+ {
+ _logger.error("Run Number:" + run++);
+ try
+ {
+ init( (AMQConnection) getConnection("guest", "guest"));
+ }
+ catch (Exception e)
+ {
+ _logger.error("exception:", e);
+ fail("Unable to initialilse connection: " + e);
+ }
+
+ int count = _count;
+ send(count);
+ waitFor(count);
+ check();
+ _logger.info("Completed without failure");
+
+ Thread.sleep(10);
+ _connection.close();
+
+ _logger.error("End Run Number:" + (run - 1));
+ }
+ }
+ catch (Exception e)
+ {
+ _logger.error(e.getMessage(), e);
+ e.printStackTrace();
+ }
+ }
+
+ void send(int count) throws JMSException
+ {
+ // create a publisher
+ MessageProducer producer = _session.createProducer(_destination);
+ for (int i = 0; i < count; i++)
+ {
+ String text = "Message " + i;
+ messages.add(text);
+ Message m = _session.createTextMessage(text);
+
+ m.setBooleanProperty("Bool", true);
+
+ m.setByteProperty("Byte", (byte) Byte.MAX_VALUE);
+ m.setDoubleProperty("Double", (double) Double.MAX_VALUE);
+ m.setFloatProperty("Float", (float) Float.MAX_VALUE);
+ m.setIntProperty("Int", (int) Integer.MAX_VALUE);
+
+ m.setJMSCorrelationID("Correlation");
+ // fixme the m.setJMSMessage has no effect
+ producer.setPriority(8);
+ m.setJMSPriority(3);
+
+ // Queue
+ Queue q;
+
+ if ((i / 2) == 0)
+ {
+ q = _session.createTemporaryQueue();
+ }
+ else
+ {
+ q = new AMQQueue(_connection, "TestReply");
+ }
+
+ m.setJMSReplyTo(q);
+ m.setStringProperty("TempQueue", q.toString());
+
+ _logger.debug("Message:" + m);
+
+ Assert.assertEquals("Check temp queue has been set correctly", m.getJMSReplyTo().toString(),
+ m.getStringProperty("TempQueue"));
+
+ m.setJMSType("Test");
+ m.setLongProperty("UnsignedInt", (long) 4294967295L);
+ m.setLongProperty("Long", (long) Long.MAX_VALUE);
+
+ m.setShortProperty("Short", (short) Short.MAX_VALUE);
+ m.setStringProperty("String", "Test");
+
+ _logger.debug("Sending Msg:" + m);
+ producer.send(m);
+ }
+ }
+
+ void waitFor(int count) throws InterruptedException
+ {
+ synchronized (received)
+ {
+ while (received.size() < count)
+ {
+ received.wait();
+ }
+ }
+ }
+
+ void check() throws JMSException, URISyntaxException
+ {
+ List<String> actual = new ArrayList<String>();
+ for (JMSTextMessage m : received)
+ {
+ actual.add(m.getText());
+
+ // Check Properties
+
+ Assert.assertEquals("Check Boolean properties are correctly transported", true, m.getBooleanProperty("Bool"));
+ Assert.assertEquals("Check Byte properties are correctly transported", (byte) Byte.MAX_VALUE,
+ m.getByteProperty("Byte"));
+ Assert.assertEquals("Check Double properties are correctly transported", (double) Double.MAX_VALUE,
+ m.getDoubleProperty("Double"));
+ Assert.assertEquals("Check Float properties are correctly transported", (float) Float.MAX_VALUE,
+ m.getFloatProperty("Float"));
+ Assert.assertEquals("Check Int properties are correctly transported", (int) Integer.MAX_VALUE,
+ m.getIntProperty("Int"));
+ Assert.assertEquals("Check CorrelationID properties are correctly transported", "Correlation",
+ m.getJMSCorrelationID());
+ Assert.assertEquals("Check Priority properties are correctly transported", 8, m.getJMSPriority());
+
+ // Queue
+ Assert.assertEquals("Check ReplyTo properties are correctly transported", AMQDestination.createDestination(new AMQBindingURL(m.getStringProperty("TempQueue"))),
+ m.getJMSReplyTo());
+
+ Assert.assertEquals("Check Type properties are correctly transported", "Test", m.getJMSType());
+
+ Assert.assertEquals("Check Short properties are correctly transported", (short) Short.MAX_VALUE,
+ m.getShortProperty("Short"));
+ Assert.assertEquals("Check UnsignedInt properties are correctly transported", (long) 4294967295L,
+ m.getLongProperty("UnsignedInt"));
+ Assert.assertEquals("Check Long properties are correctly transported", (long) Long.MAX_VALUE,
+ m.getLongProperty("Long"));
+ Assert.assertEquals("Check String properties are correctly transported", "Test", m.getStringProperty("String"));
+/*
+ // AMQP Tests Specific values
+
+ Assert.assertEquals("Check Timestamp properties are correctly transported", m.getStringProperty("time-str"),
+ ((AMQMessage) m).getTimestampProperty(new AMQShortString("time")).toString());
+
+ // Decimal
+ BigDecimal bd = new BigDecimal(Integer.MAX_VALUE);
+
+ Assert.assertEquals("Check decimal properties are correctly transported", bd.setScale(Byte.MAX_VALUE),
+ ((AMQMessage) m).getDecimalProperty(new AMQShortString("decimal")));
+
+ // Void
+ ((AMQMessage) m).setVoidProperty(new AMQShortString("void"));
+
+ Assert.assertTrue("Check void properties are correctly transported",
+ ((AMQMessage) m).getPropertyHeaders().containsKey("void"));
+*/
+ //JMSXUserID
+ if (m.getStringProperty("JMSXUserID") != null)
+ {
+ Assert.assertEquals("Check 'JMSXUserID' is supported ", USERNAME,
+ m.getStringProperty("JMSXUserID"));
+ }
+ }
+
+ received.clear();
+
+ assertEqual(messages.iterator(), actual.iterator());
+
+ messages.clear();
+ }
+
+ private static void assertEqual(Iterator expected, Iterator actual)
+ {
+ List<String> errors = new ArrayList<String>();
+ while (expected.hasNext() && actual.hasNext())
+ {
+ try
+ {
+ assertEqual(expected.next(), actual.next());
+ }
+ catch (Exception e)
+ {
+ errors.add(e.getMessage());
+ }
+ }
+ while (expected.hasNext())
+ {
+ errors.add("Expected " + expected.next() + " but no more actual values.");
+ }
+ while (actual.hasNext())
+ {
+ errors.add("Found " + actual.next() + " but no more expected values.");
+ }
+
+ if (!errors.isEmpty())
+ {
+ throw new RuntimeException(errors.toString());
+ }
+ }
+
+ private static void assertEqual(Object expected, Object actual)
+ {
+ if (!expected.equals(actual))
+ {
+ throw new RuntimeException("Expected '" + expected + "' found '" + actual + "'");
+ }
+ }
+
+ public void onMessage(Message message)
+ {
+ synchronized (received)
+ {
+ received.add((JMSTextMessage) message);
+ received.notify();
+ }
+ }
+
+ private static String randomize(String in)
+ {
+ return in + System.currentTimeMillis();
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ PropertyValueTest test = new PropertyValueTest();
+ test._connectionString = (argv.length == 0) ? "vm://:1" : argv[0];
+ test.setUp();
+ if (argv.length > 1)
+ {
+ test._count = Integer.parseInt(argv[1]);
+ }
+
+ test.testOnce();
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(PropertyValueTest.class);
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java
new file mode 100644
index 0000000000..c257dacf76
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java
@@ -0,0 +1,75 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.basic;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class PubSubTwoConnectionTest extends QpidBrokerTestCase
+{
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ /**
+ * This tests that a consumer is set up synchronously
+ * @throws Exception
+ */
+ public void testTwoConnections() throws Exception
+ {
+
+ AMQConnection con1 = (AMQConnection) getConnection("guest", "guest");
+
+ Topic topic = new AMQTopic(con1, "MyTopic");
+
+ Session session1 = con1.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ MessageProducer producer = session1.createProducer(topic);
+
+ Connection con2 = (AMQConnection) getConnection("guest", "guest") ;
+ Session session2 = con2.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ MessageConsumer consumer = session2.createConsumer(topic);
+ con2.start();
+ producer.send(session1.createTextMessage("Hello"));
+ TextMessage tm1 = (TextMessage) consumer.receive(2000);
+ assertNotNull(tm1);
+ assertEquals("Hello", tm1.getText());
+ con1.close();
+ con2.close();
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/ReceiveTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/ReceiveTest.java
new file mode 100644
index 0000000000..bc44617620
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/ReceiveTest.java
@@ -0,0 +1,82 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.basic;
+
+import javax.jms.MessageConsumer;
+import javax.jms.Message;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+public class ReceiveTest extends QpidBrokerTestCase
+{
+ private AMQConnection _connection;
+ private AMQDestination _destination;
+ private AMQSession _session;
+ private MessageConsumer _consumer;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ init((AMQConnection) getConnection("guest", "guest"));
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ private void init(AMQConnection connection) throws Exception
+ {
+ init(connection, new AMQQueue(connection,"ReceiveTest", true));
+ }
+
+ private void init(AMQConnection connection, AMQDestination destination) throws Exception
+ {
+ _connection = connection;
+ _destination = destination;
+ _session = (AMQSession) connection.createSession(true, AMQSession.NO_ACKNOWLEDGE);
+ _consumer = _session.createConsumer(_destination);
+ _connection.start();
+ }
+
+ public void test() throws Exception
+ {
+ Message m = _consumer.receive(5000);
+ assertNull("should not have received a message", m);
+ _connection.close();
+ }
+
+
+ public static junit.framework.Test suite()
+ {
+ // TODO: note that this test doesn't use the VMBrokerSetup
+ // test helper class to create and tear down its
+ // VMBroker. This is because the main() above seems to
+ // indicate that it's also used outside of the surefire test
+ // framework. If it isn't, then this test should also be
+ // changed to use VMBrokerSetup here.
+ return new junit.framework.TestSuite(ReceiveTest.class);
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/SessionStartTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/SessionStartTest.java
new file mode 100644
index 0000000000..ee837fd41a
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/SessionStartTest.java
@@ -0,0 +1,115 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.basic;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
+public class SessionStartTest extends QpidBrokerTestCase implements MessageListener
+{
+ private static final Logger _logger = LoggerFactory.getLogger(SessionStartTest.class);
+
+ private AMQConnection _connection;
+ private AMQDestination _destination;
+ private AMQSession _session;
+ private int count;
+ public String _connectionString = "vm://:1";
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ init((AMQConnection) getConnection("guest", "guest"));
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ private void init(AMQConnection connection) throws Exception
+ {
+ init(connection,
+ new AMQQueue(connection.getDefaultQueueExchangeName(), new AMQShortString(randomize("SessionStartTest")), true));
+ }
+
+ private void init(AMQConnection connection, AMQDestination destination) throws Exception
+ {
+ _connection = connection;
+ _destination = destination;
+ connection.start();
+
+ _session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ _session.createConsumer(destination).setMessageListener(this);
+ }
+
+ public synchronized void test() throws JMSException, InterruptedException
+ {
+ try
+ {
+ _session.createProducer(_destination).send(_session.createTextMessage("Message"));
+ _logger.info("Message sent, waiting for response...");
+ wait(1000);
+ if (count > 0)
+ {
+ _logger.info("Got message");
+ }
+ else
+ {
+ throw new RuntimeException("Did not get message!");
+ }
+ }
+ finally
+ {
+ _session.close();
+ _connection.close();
+ }
+ }
+
+ public synchronized void onMessage(Message message)
+ {
+ count++;
+ notify();
+ }
+
+ private static String randomize(String in)
+ {
+ return in + System.currentTimeMillis();
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ SessionStartTest test = new SessionStartTest();
+ test._connectionString = (argv.length == 0) ? "localhost:5672" : argv[0];
+ test.setUp();
+ test.test();
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/TextMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/TextMessageTest.java
new file mode 100644
index 0000000000..a87de8ac0c
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/TextMessageTest.java
@@ -0,0 +1,248 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.basic;
+
+import junit.framework.Assert;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.message.JMSTextMessage;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageNotWriteableException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class TextMessageTest extends QpidBrokerTestCase implements MessageListener
+{
+ private static final Logger _logger = LoggerFactory.getLogger(TextMessageTest.class);
+
+ private AMQConnection _connection;
+ private Destination _destination;
+ private AMQSession _session;
+ private final List<JMSTextMessage> received = new ArrayList<JMSTextMessage>();
+ private final List<String> messages = new ArrayList<String>();
+ private int _count = 100;
+ public String _connectionString = "vm://:1";
+ private CountDownLatch _waitForCompletion;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ try
+ {
+ init((AMQConnection) getConnection("guest", "guest"));
+ }
+ catch (Exception e)
+ {
+ fail("Unable to initialilse connection: " + e);
+ }
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ private void init(AMQConnection connection) throws Exception
+ {
+ Destination destination =
+ new AMQQueue(connection.getDefaultQueueExchangeName(), new AMQShortString(randomize("TextMessageTest")), true);
+ init(connection, destination);
+ }
+
+ private void init(AMQConnection connection, Destination destination) throws Exception
+ {
+ _connection = connection;
+ _destination = destination;
+ _session = (AMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // set up a slow consumer
+ try
+ {
+ _session.createConsumer(destination).setMessageListener(this);
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ }
+ connection.start();
+ }
+
+ public void test() throws Exception
+ {
+ int count = _count;
+ _waitForCompletion = new CountDownLatch(_count);
+ send(count);
+ _waitForCompletion.await(20, TimeUnit.SECONDS);
+ check();
+ _logger.info("Completed without failure");
+ _connection.close();
+ }
+
+ void send(int count) throws JMSException
+ {
+ // create a publisher
+ MessageProducer producer = _session.createProducer(_destination);
+ for (int i = 0; i < count; i++)
+ {
+ String text = "Message " + i;
+ messages.add(text);
+ Message m = _session.createTextMessage(text);
+ //m.setStringProperty("String", "hello");
+
+ _logger.info("Sending Msg:" + m);
+ producer.send(m);
+ }
+ _logger.info("sent " + count + " mesages");
+ }
+
+
+ void check() throws JMSException
+ {
+ List<String> actual = new ArrayList<String>();
+ for (JMSTextMessage m : received)
+ {
+ actual.add(m.getText());
+
+ // Check body write status
+ try
+ {
+ m.setText("Test text");
+ Assert.fail("Message should not be writeable");
+ }
+ catch (MessageNotWriteableException mnwe)
+ {
+ // normal execution
+ }
+
+ m.clearBody();
+
+ try
+ {
+ m.setText("Test text");
+ }
+ catch (MessageNotWriteableException mnwe)
+ {
+ Assert.fail("Message should be writeable");
+ }
+
+ // Check property write status
+ try
+ {
+ m.setStringProperty("test", "test");
+ Assert.fail("Message should not be writeable");
+ }
+ catch (MessageNotWriteableException mnwe)
+ {
+ // normal execution
+ }
+
+ m.clearProperties();
+
+ try
+ {
+ m.setStringProperty("test", "test");
+ }
+ catch (MessageNotWriteableException mnwe)
+ {
+ Assert.fail("Message should be writeable");
+ }
+
+ }
+
+ assertEqual(messages.iterator(), actual.iterator());
+ }
+
+ private static void assertEqual(Iterator expected, Iterator actual)
+ {
+ List<String> errors = new ArrayList<String>();
+ while (expected.hasNext() && actual.hasNext())
+ {
+ try
+ {
+ assertEqual(expected.next(), actual.next());
+ }
+ catch (Exception e)
+ {
+ errors.add(e.getMessage());
+ }
+ }
+ while (expected.hasNext())
+ {
+ errors.add("Expected " + expected.next() + " but no more actual values.");
+ }
+ while (actual.hasNext())
+ {
+ errors.add("Found " + actual.next() + " but no more expected values.");
+ }
+
+ if (!errors.isEmpty())
+ {
+ throw new RuntimeException(errors.toString());
+ }
+ }
+
+ private static void assertEqual(Object expected, Object actual)
+ {
+ if (!expected.equals(actual))
+ {
+ throw new RuntimeException("Expected '" + expected + "' found '" + actual + "'");
+ }
+ }
+
+ public void onMessage(Message message)
+ {
+ synchronized (received)
+ {
+ _logger.info("===== received one message");
+ received.add((JMSTextMessage) message);
+ _waitForCompletion.countDown();
+ }
+ }
+
+ private static String randomize(String in)
+ {
+ return in + System.currentTimeMillis();
+ }
+
+
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(TextMessageTest.class);
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/close/CloseTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/close/CloseTest.java
new file mode 100644
index 0000000000..c6b8069300
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/close/CloseTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.test.unit.basic.close;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.url.AMQBindingURL;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+public class CloseTest extends QpidBrokerTestCase
+{
+ private static final Logger _logger = LoggerFactory.getLogger(CloseTest.class);
+
+ public void testCloseQueueReceiver() throws Exception
+ {
+ AMQConnection connection = (AMQConnection) getConnection("guest", "guest");
+
+ Session session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ AMQQueue queue = new AMQQueue(new AMQBindingURL("test-queue"));
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ MessageProducer producer_not_used_but_created_for_testing = session.createProducer(queue);
+
+ connection.start();
+
+ _logger.info("About to close consumer");
+
+ consumer.close();
+
+ _logger.info("Closed Consumer");
+ connection.close();
+ }
+}