summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/test/client/message')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java373
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/MessageToStringTest.java257
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/ObjectMessageTest.java141
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java310
4 files changed, 1081 insertions, 0 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java
new file mode 100644
index 0000000000..a7efe4922b
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java
@@ -0,0 +1,373 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.client.message;
+
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.client.AMQAnyDestination;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.CustomJMSXProperty;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.management.common.mbeans.ManagedQueue;
+import org.apache.qpid.test.utils.JMXTestUtils;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.TabularData;
+import java.nio.BufferOverflowException;
+import java.util.Iterator;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * From the API Docs getJMSDestination:
+ *
+ * When a message is received, its JMSDestination value must be equivalent to
+ * the value assigned when it was sent.
+ */
+public class JMSDestinationTest extends QpidBrokerTestCase
+{
+
+ private Connection _connection;
+ private Session _session;
+
+ private static final String USER = "admin";
+ private CountDownLatch _receiveMessage;
+ private Message _message;
+
+ public void setUp() throws Exception
+ {
+ //Ensure JMX management is enabled for MovedToQueue test
+ setConfigurationProperty("management.enabled", "true");
+
+ super.setUp();
+
+ _connection = getConnection();
+
+ _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ /**
+ * Test a message sent to a queue comes back with JMSDestination queue
+ *
+ * @throws Exception
+ */
+ public void testQueue() throws Exception
+ {
+
+ Queue queue = _session.createQueue(getTestQueueName());
+
+ MessageConsumer consumer = _session.createConsumer(queue);
+
+ sendMessage(_session, queue, 1);
+
+ _connection.start();
+
+ Message message = consumer.receive(10000);
+
+ assertNotNull("Message should not be null", message);
+
+ Destination destination = message.getJMSDestination();
+
+ assertNotNull("JMSDestination should not be null", destination);
+
+ assertEquals("Incorrect Destination type", queue.getClass(), destination.getClass());
+ }
+
+ /**
+ * Test a message sent to a topic comes back with JMSDestination topic
+ *
+ * @throws Exception
+ */
+ public void testTopic() throws Exception
+ {
+
+ Topic topic = _session.createTopic(getTestQueueName() + "Topic");
+
+ MessageConsumer consumer = _session.createConsumer(topic);
+
+ sendMessage(_session, topic, 1);
+
+ _connection.start();
+
+ Message message = consumer.receive(10000);
+
+ assertNotNull("Message should not be null", message);
+
+ Destination destination = message.getJMSDestination();
+
+ assertNotNull("JMSDestination should not be null", destination);
+
+ assertEquals("Incorrect Destination type", topic.getClass(), destination.getClass());
+ }
+
+ /**
+ * Test a message sent to a topic then moved on the broker
+ * comes back with JMSDestination queue.
+ *
+ * i.e. The client is not just setting the value to be the same as the
+ * current consumer destination.
+ *
+ * This test can only be run against the Java broker as it uses JMX to move
+ * messages between queues.
+ *
+ * @throws Exception
+ */
+ public void testMovedToQueue() throws Exception
+ {
+ // Setup JMXUtils
+ JMXTestUtils jmxUtils = new JMXTestUtils(this, USER, USER);
+ jmxUtils.setUp();
+ // Open the JMX Connection
+ jmxUtils.open();
+ try
+ {
+
+ Queue queue = _session.createQueue(getTestQueueName());
+
+ _session.createConsumer(queue).close();
+
+ sendMessage(_session, queue, 1);
+
+ Topic topic = _session.createTopic(getTestQueueName() + "Topic");
+
+ MessageConsumer consumer = _session.createConsumer(topic);
+
+ // Use Management to move message.
+
+ ManagedQueue managedQueue = jmxUtils.
+ getManagedObject(ManagedQueue.class,
+ jmxUtils.getQueueObjectName(getConnectionFactory().getVirtualPath().substring(1),
+ getTestQueueName()));
+
+ // Find the first message on the queue
+ TabularData data = managedQueue.viewMessages(1L, 2L);
+
+ Iterator values = data.values().iterator();
+ assertTrue("No Messages found via JMX", values.hasNext());
+
+ // Get its message ID
+ Long msgID = (Long) ((CompositeDataSupport) values.next()).get("AMQ MessageId");
+
+ // Start the connection and consume message that has been moved to the
+ // queue
+ _connection.start();
+
+ Message message = consumer.receive(1000);
+
+ //Validate we don't have a message on the queue before we start
+ assertNull("Message should be null", message);
+
+ // Move it to from the topic to the queue
+ managedQueue.moveMessages(msgID, msgID, ((AMQTopic) topic).getQueueName());
+
+ // Retrieve the newly moved message
+ message = consumer.receive(1000);
+
+ assertNotNull("Message should not be null", message);
+
+ Destination destination = message.getJMSDestination();
+
+ assertNotNull("JMSDestination should not be null", destination);
+
+ assertEquals("Incorrect Destination type", queue.getClass(), destination.getClass());
+
+ }
+ finally
+ {
+ jmxUtils.close();
+ }
+
+ }
+
+ /**
+ * Test a message sent to a queue comes back with JMSDestination queue
+ * when received via a message listener
+ *
+ * @throws Exception
+ */
+ public void testQueueAsync() throws Exception
+ {
+
+ Queue queue = _session.createQueue(getTestQueueName());
+
+ MessageConsumer consumer = _session.createConsumer(queue);
+
+ sendMessage(_session, queue, 1);
+
+ _connection.start();
+
+ _message = null;
+ _receiveMessage = new CountDownLatch(1);
+
+ consumer.setMessageListener(new MessageListener()
+ {
+ public void onMessage(Message message)
+ {
+ _message = message;
+ _receiveMessage.countDown();
+ }
+ });
+
+ assertTrue("Timed out waiting for message to be received ", _receiveMessage.await(1, TimeUnit.SECONDS));
+
+ assertNotNull("Message should not be null", _message);
+
+ Destination destination = _message.getJMSDestination();
+
+ assertNotNull("JMSDestination should not be null", destination);
+
+ assertEquals("Incorrect Destination type", queue.getClass(), destination.getClass());
+ }
+
+ /**
+ * Test a message received without the JMS_QPID_DESTTYPE can be resent
+ * and correctly have the property set.
+ *
+ * To do this we need to create a 0-10 connection and send a message
+ * which is then received by a 0-8/9 client.
+ *
+ * @throws Exception
+ */
+ public void testReceiveResend() throws Exception
+ {
+ // Create a 0-10 Connection and send message
+ setSystemProperty(ClientProperties.AMQP_VERSION, "0-10");
+
+ Connection connection010 = getConnection();
+
+ Session session010 = connection010.createSession(true, Session.SESSION_TRANSACTED);
+
+ // Create queue for testing
+ Queue queue = session010.createQueue(getTestQueueName());
+
+ // Ensure queue exists
+ session010.createConsumer(queue).close();
+
+ sendMessage(session010, queue, 1);
+
+ // Close the 010 connection
+ connection010.close();
+
+ // Create a 0-8 Connection and receive message
+ setSystemProperty(ClientProperties.AMQP_VERSION, "0-8");
+
+ Connection connection08 = getConnection();
+
+ Session session08 = connection08.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer consumer = session08.createConsumer(queue);
+
+ connection08.start();
+
+ Message message = consumer.receive(1000);
+
+ assertNotNull("Didn't receive 0-10 message.", message);
+
+ // Validate that JMS_QPID_DESTTYPE is not set
+ try
+ {
+ message.getIntProperty(CustomJMSXProperty.JMS_QPID_DESTTYPE.toString());
+ fail("JMS_QPID_DESTTYPE should not be set, so should throw NumberFormatException");
+ }
+ catch (NumberFormatException nfe)
+ {
+
+ }
+
+ // Resend message back to queue and validate that
+ // a) getJMSDestination works without the JMS_QPID_DESTTYPE
+ // b) we can actually send without a BufferOverFlow.
+
+ MessageProducer producer = session08.createProducer(queue);
+
+ try
+ {
+ producer.send(message);
+ }
+ catch (BufferOverflowException bofe)
+ {
+ // Print the stack trace so we can validate where the execption occured.
+ bofe.printStackTrace();
+ fail("BufferOverflowException thrown during send");
+ }
+
+ message = consumer.receive(1000);
+
+ assertNotNull("Didn't receive recent 0-8 message.", message);
+
+ // Validate that JMS_QPID_DESTTYPE is not set
+ assertEquals("JMS_QPID_DESTTYPE should be set to a Queue", AMQDestination.QUEUE_TYPE,
+ message.getIntProperty(CustomJMSXProperty.JMS_QPID_DESTTYPE.toString()));
+
+ }
+
+ /**
+ * Send a message to a custom exchange and then verify
+ * the message received has the proper destination set
+ *
+ * @throws Exception
+ */
+ public void testGetDestinationWithCustomExchange() throws Exception
+ {
+
+ AMQDestination dest = new AMQAnyDestination(new AMQShortString("my-exchange"),
+ new AMQShortString("direct"),
+ new AMQShortString("test"),
+ false,
+ false,
+ new AMQShortString("test"),
+ false,
+ new AMQShortString[]{new AMQShortString("test")});
+
+ // to force the creation of my-exchange.
+ sendMessage(_session, dest, 1);
+
+ MessageProducer prod = _session.createProducer(dest);
+
+ MessageConsumer consumer = _session.createConsumer(dest);
+
+ _connection.start();
+
+ sendMessage(_session, dest, 1);
+
+ Message message = consumer.receive(10000);
+
+ assertNotNull("Message should not be null", message);
+
+ Destination destination = message.getJMSDestination();
+
+ assertNotNull("JMSDestination should not be null", destination);
+
+ assertEquals("Incorrect Destination name", "my-exchange", dest.getExchangeName().asString());
+ assertEquals("Incorrect Destination type", "direct", dest.getExchangeClass().asString());
+ assertEquals("Incorrect Routing Key", "test", dest.getRoutingKey().asString());
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/MessageToStringTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/MessageToStringTest.java
new file mode 100644
index 0000000000..1071861d47
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/MessageToStringTest.java
@@ -0,0 +1,257 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.client.message;
+
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.framing.AMQShortString;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.util.UUID;
+
+public class MessageToStringTest extends QpidBrokerTestCase
+{
+ private Connection _connection;
+ private Session _session;
+ private Queue _queue;
+ MessageConsumer _consumer;
+ private static final String BYTE_TEST = "MapByteTest";
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ //Create Producer put some messages on the queue
+ _connection = getConnection();
+
+ //Create Consumer
+ _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ String queueName = getTestQueueName();
+
+ //Create Queue
+ ((AMQSession) _session).createQueue(new AMQShortString(queueName), true, false, false);
+ _queue = _session.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
+
+
+ _consumer = _session.createConsumer(_queue);
+
+ _connection.start();
+ }
+
+ public void tearDown() throws Exception
+ {
+ //clean up
+ _connection.close();
+
+ super.tearDown();
+ }
+
+ public void testBytesMessage() throws JMSException
+ {
+ //Create Sample Message using UUIDs
+ UUID test = UUID.randomUUID();
+
+ BytesMessage testMessage = _session.createBytesMessage();
+
+ //Convert UUID into bytes for transit
+ byte[] testBytes = test.toString().getBytes();
+
+ testMessage.writeBytes(testBytes);
+
+ sendAndTest(testMessage, testBytes);
+ }
+
+ public void testMapMessage() throws JMSException, IOException
+ {
+ //Create Sample Message using UUIDs
+ UUID test = UUID.randomUUID();
+
+ MapMessage testMessage = _session.createMapMessage();
+
+ byte[] testBytes = convertToBytes(test);
+
+ testMessage.setBytes(BYTE_TEST, testBytes);
+
+ sendAndTest(testMessage, testBytes);
+ }
+
+ public void testObjectMessage() throws JMSException
+ {
+ MessageProducer producer = _session.createProducer(_queue);
+
+ //Create Sample Message using UUIDs
+ UUID test = UUID.randomUUID();
+
+ Message testMessage = _session.createObjectMessage(test);
+
+ sendAndTest(testMessage, test);
+ }
+
+ public void testStreamMessage() throws JMSException, IOException
+ {
+ //Create Sample Message using UUIDs
+ UUID test = UUID.randomUUID();
+
+ StreamMessage testMessage = _session.createStreamMessage();
+
+ byte[] testBytes = convertToBytes(test);
+
+ testMessage.writeBytes(testBytes);
+
+ sendAndTest(testMessage, testBytes);
+ }
+
+ public void testTextMessage() throws JMSException, IOException
+ {
+ //Create Sample Message using UUIDs
+ UUID test = UUID.randomUUID();
+
+ TextMessage testMessage = _session.createTextMessage();
+
+ String stringValue = String.valueOf(test);
+ byte[] testBytes = stringValue.getBytes();
+
+ testMessage.setText(stringValue);
+
+ sendAndTest(testMessage, testBytes);
+ }
+
+ //***************** Helpers
+
+ private void sendAndTest(Message message, Object testBytes) throws JMSException
+ {
+ MessageProducer producer = _session.createProducer(_queue);
+
+ producer.send(message);
+
+ Message receivedMessage = _consumer.receive(1000);
+
+ assertNotNull("Message was not received.", receivedMessage);
+
+ //Ensure that to calling toString doesn't error and that doing this doesn't break next tests.
+ assertNotNull("Message returned null from toString", receivedMessage.toString());
+
+ byte[] byteResults;
+ UUID result;
+
+ try
+ {
+ if (receivedMessage instanceof ObjectMessage)
+ {
+ result = (UUID) ((ObjectMessage) receivedMessage).getObject();
+ assertEquals("UUIDs were not equal", testBytes, result);
+ }
+ else
+ {
+ byteResults = getBytes(receivedMessage, ((byte[]) testBytes).length);
+ assertBytesEquals("UUIDs were not equal", (byte[]) testBytes, byteResults);
+ }
+ }
+ catch (Exception e)
+ {
+ fail(e.getMessage());
+ }
+
+ }
+
+ private void assertBytesEquals(String message, byte[] expected, byte[] actual)
+ {
+ if (expected.length == actual.length)
+ {
+ int index = 0;
+ boolean failed = false;
+ for (byte b : expected)
+ {
+ if (actual[index++] != b)
+ {
+ failed = true;
+ break;
+ }
+ }
+
+ if (!failed)
+ {
+ return;
+ }
+
+ }
+
+ fail(message);
+ }
+
+ private byte[] getBytes(Message receivedMessage, int testBytesLength) throws JMSException
+ {
+ byte[] byteResults = new byte[testBytesLength];
+
+ if (receivedMessage instanceof BytesMessage)
+ {
+ assertEquals(testBytesLength, ((BytesMessage) receivedMessage).readBytes(byteResults));
+ }
+ else if (receivedMessage instanceof StreamMessage)
+ {
+ assertEquals(testBytesLength, ((StreamMessage) receivedMessage).readBytes(byteResults));
+ }
+ else if (receivedMessage instanceof MapMessage)
+ {
+ byteResults = ((MapMessage) receivedMessage).getBytes(BYTE_TEST);
+ assertEquals(testBytesLength, byteResults.length);
+ }
+ else if (receivedMessage instanceof TextMessage)
+ {
+ byteResults = ((TextMessage) receivedMessage).getText().getBytes();
+ assertEquals(testBytesLength, byteResults.length);
+ }
+
+
+ return byteResults;
+ }
+
+ private byte[] convertToBytes(UUID test) throws IOException
+ {
+ //Convert UUID into bytes for transit
+ ObjectOutput out;
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ out = new ObjectOutputStream(bos);
+ out.writeObject(test);
+ out.close();
+
+ return bos.toByteArray();
+ }
+
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/ObjectMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/ObjectMessageTest.java
new file mode 100644
index 0000000000..147a03be0c
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/ObjectMessageTest.java
@@ -0,0 +1,141 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.client.message;
+
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.framing.AMQShortString;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.util.UUID;
+
+public class ObjectMessageTest extends QpidBrokerTestCase
+{
+ private Connection _connection;
+ private Session _session;
+ MessageConsumer _consumer;
+ MessageProducer _producer;
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ //Create Connection
+ _connection = getConnection();
+
+
+ //Create Session
+ _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ //Create Queue
+ String queueName = getTestQueueName();
+ ((AMQSession) _session).createQueue(new AMQShortString(queueName), true, false, false);
+ Queue queue = _session.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
+
+ //Create Consumer
+ _consumer = _session.createConsumer(queue);
+
+ //Create Producer
+ _producer = _session.createProducer(queue);
+
+ _connection.start();
+ }
+
+ public void tearDown() throws Exception
+ {
+ //clean up
+ _connection.close();
+
+ super.tearDown();
+ }
+
+ public void testGetAndSend() throws JMSException
+ {
+ //Create Sample Message using UUIDs
+ UUID test = UUID.randomUUID();
+
+ ObjectMessage testMessage = _session.createObjectMessage(test);
+
+ Object o = testMessage.getObject();
+
+ assertNotNull("Object was null", o);
+
+ sendAndTest(testMessage, test);
+ }
+
+ public void testSend() throws JMSException
+ {
+ //Create Sample Message using UUIDs
+ UUID test = UUID.randomUUID();
+
+ ObjectMessage testMessage = _session.createObjectMessage(test);
+
+ sendAndTest(testMessage, test);
+ }
+
+ public void testTostringAndSend() throws JMSException
+ {
+ //Create Sample Message using UUIDs
+ UUID test = UUID.randomUUID();
+
+ ObjectMessage testMessage = _session.createObjectMessage(test);
+
+ assertNotNull("Object was null", testMessage.toString());
+
+ sendAndTest(testMessage, test);
+ }
+
+ public void testSendNull() throws JMSException
+ {
+
+ ObjectMessage testMessage = _session.createObjectMessage(null);
+
+ assertNotNull("Object was null", testMessage.toString());
+
+ sendAndTest(testMessage, null);
+ }
+
+ //***************** Helpers
+
+ private void sendAndTest(ObjectMessage message, Object sent) throws JMSException
+ {
+ _producer.send(message);
+
+ ObjectMessage receivedMessage = (ObjectMessage) _consumer.receive(1000);
+
+ assertNotNull("Message was not received.", receivedMessage);
+
+ UUID result = (UUID) receivedMessage.getObject();
+
+ assertEquals("First read: UUIDs were not equal", sent, result);
+
+ result = (UUID) receivedMessage.getObject();
+
+ assertEquals("Second read: UUIDs were not equal", sent, result);
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java
new file mode 100644
index 0000000000..49a608190d
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java
@@ -0,0 +1,310 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.client.message;
+
+import java.util.concurrent.CountDownLatch;
+
+import javax.jms.DeliveryMode;
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.Assert;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.BasicMessageProducer;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SelectorTest extends QpidBrokerTestCase implements MessageListener
+{
+ private static final Logger _logger = LoggerFactory.getLogger(SelectorTest.class);
+
+ private AMQConnection _connection;
+ private AMQDestination _destination;
+ private int count;
+ public String _connectionString = "vm://:1";
+ private static final String INVALID_SELECTOR = "Cost LIKE 5";
+ CountDownLatch _responseLatch = new CountDownLatch(1);
+
+ private static final String BAD_MATHS_SELECTOR = " 1 % 5";
+
+ private static final long RECIEVE_TIMEOUT = 1000;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ init((AMQConnection) getConnection("guest", "guest"));
+ }
+
+ private void init(AMQConnection connection) throws JMSException
+ {
+ init(connection, new AMQQueue(connection, getTestQueueName(), true));
+ }
+
+ private void init(AMQConnection connection, AMQDestination destination) throws JMSException
+ {
+ _connection = connection;
+ _destination = destination;
+ connection.start();
+ }
+
+ public void onMessage(Message message)
+ {
+ count++;
+ _logger.info("Got Message:" + message);
+ _responseLatch.countDown();
+ }
+
+ public void testUsingOnMessage() throws Exception
+ {
+ String selector = "Cost = 2 AND \"property-with-hyphen\" = 'wibble'";
+ // selector = "JMSType = Special AND Cost = 2 AND AMQMessageID > 0 AND JMSDeliveryMode=" + DeliveryMode.NON_PERSISTENT;
+
+ Session session = (AMQSession) _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ // _session.createConsumer(destination).setMessageListener(this);
+ session.createConsumer(_destination, selector).setMessageListener(this);
+
+ try
+ {
+ Message msg = session.createTextMessage("Message");
+ msg.setJMSPriority(1);
+ msg.setIntProperty("Cost", 2);
+ msg.setStringProperty("property-with-hyphen", "wibble");
+ msg.setJMSType("Special");
+
+ _logger.info("Sending Message:" + msg);
+
+ ((BasicMessageProducer) session.createProducer(_destination)).send(msg, DeliveryMode.NON_PERSISTENT);
+ _logger.info("Message sent, waiting for response...");
+
+ _responseLatch.await();
+
+ if (count > 0)
+ {
+ _logger.info("Got message");
+ }
+
+ if (count == 0)
+ {
+ fail("Did not get message!");
+ // throw new RuntimeException("Did not get message!");
+ }
+ }
+ catch (JMSException e)
+ {
+ _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ if (!(e instanceof InvalidSelectorException))
+ {
+ fail("Wrong exception:" + e.getMessage());
+ }
+ else
+ {
+ System.out.println("SUCCESS!!");
+ }
+ }
+ catch (InterruptedException e)
+ {
+ _logger.debug("IE :" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ }
+
+ }
+
+ public void testUnparsableSelectors() throws Exception
+ {
+ AMQSession session = (AMQSession) _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ boolean caught = false;
+
+ //Try Creating a Browser
+ try
+ {
+ session.createBrowser(session.createQueue("Ping"), INVALID_SELECTOR);
+ }
+ catch (JMSException e)
+ {
+ _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ if (!(e instanceof InvalidSelectorException))
+ {
+ fail("Wrong exception:" + e.getMessage());
+ }
+ caught = true;
+ }
+ assertTrue("No exception thrown!", caught);
+ caught = false;
+
+ //Try Creating a Consumer
+ try
+ {
+ session.createConsumer(session.createQueue("Ping"), INVALID_SELECTOR);
+ }
+ catch (JMSException e)
+ {
+ _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ if (!(e instanceof InvalidSelectorException))
+ {
+ fail("Wrong exception:" + e.getMessage());
+ }
+ caught = true;
+ }
+ assertTrue("No exception thrown!", caught);
+ caught = false;
+
+ //Try Creating a Receiever
+ try
+ {
+ session.createReceiver(session.createQueue("Ping"), INVALID_SELECTOR);
+ }
+ catch (JMSException e)
+ {
+ _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ if (!(e instanceof InvalidSelectorException))
+ {
+ fail("Wrong exception:" + e.getMessage());
+ }
+ caught = true;
+ }
+ assertTrue("No exception thrown!", caught);
+ caught = false;
+
+ try
+ {
+ session.createReceiver(session.createQueue("Ping"), BAD_MATHS_SELECTOR);
+ }
+ catch (JMSException e)
+ {
+ _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ if (!(e instanceof InvalidSelectorException))
+ {
+ fail("Wrong exception:" + e.getMessage());
+ }
+ caught = true;
+ }
+ assertTrue("No exception thrown!", caught);
+ caught = false;
+
+ }
+
+ public void testRuntimeSelectorError() throws JMSException
+ {
+ Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(_destination , "testproperty % 5 = 1");
+ MessageProducer producer = session.createProducer(_destination);
+ Message sentMsg = session.createTextMessage();
+
+ sentMsg.setIntProperty("testproperty", 1); // 1 % 5
+ producer.send(sentMsg);
+ Message recvd = consumer.receive(RECIEVE_TIMEOUT);
+ assertNotNull(recvd);
+
+ sentMsg.setStringProperty("testproperty", "hello"); // "hello" % 5 makes no sense
+ producer.send(sentMsg);
+ try
+ {
+ recvd = consumer.receive(RECIEVE_TIMEOUT);
+ assertNull(recvd);
+ }
+ catch (Exception e)
+ {
+
+ }
+ assertTrue("Connection should be closed", _connection.isClosed());
+ }
+
+ public void testSelectorWithJMSMessageID() throws Exception
+ {
+ Session session = _connection.createSession(true, Session.SESSION_TRANSACTED);
+
+ MessageProducer prod = session.createProducer(_destination);
+ MessageConsumer consumer = session.createConsumer(_destination,"JMSMessageID IS NOT NULL");
+
+ for (int i=0; i<2; i++)
+ {
+ Message msg = session.createTextMessage("Msg" + String.valueOf(i));
+ prod.send(msg);
+ }
+ session.commit();
+
+ Message msg1 = consumer.receive(1000);
+ Message msg2 = consumer.receive(1000);
+
+ Assert.assertNotNull("Msg1 should not be null", msg1);
+ Assert.assertNotNull("Msg2 should not be null", msg2);
+
+ session.commit();
+
+ prod.setDisableMessageID(true);
+
+ for (int i=0; i<2; i++)
+ {
+ Message msg = session.createTextMessage("Msg" + String.valueOf(i));
+ prod.send(msg);
+ }
+
+ session.commit();
+ Message msg3 = consumer.receive(1000);
+ Assert.assertNull("Msg3 should be null", msg3);
+ session.commit();
+ consumer = session.createConsumer(_destination,"JMSMessageID IS NULL");
+
+ Message msg4 = consumer.receive(1000);
+ Message msg5 = consumer.receive(1000);
+ session.commit();
+ Assert.assertNotNull("Msg4 should not be null", msg4);
+ Assert.assertNotNull("Msg5 should not be null", msg5);
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ SelectorTest test = new SelectorTest();
+ test._connectionString = (argv.length == 0) ? "localhost:3000" : argv[0];
+
+ try
+ {
+ while (true)
+ {
+ if (test._connectionString.contains("vm://:1"))
+ {
+ test.setUp();
+ }
+ test.testUsingOnMessage();
+
+ if (test._connectionString.contains("vm://:1"))
+ {
+ test.tearDown();
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ }
+ }
+}