summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src/main/java/org/apache/qpid/client
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2011-05-27 15:44:23 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2011-05-27 15:44:23 +0000
commit66765100f4257159622cefe57bed50125a5ad017 (patch)
treea88ee23bb194eb91f0ebb2d9b23ff423e3ea8e37 /qpid/java/systests/src/main/java/org/apache/qpid/client
parent1aeaa7b16e5ce54f10c901d75c4d40f9f88b9db6 (diff)
parent88b98b2f4152ef59a671fad55a0d08338b6b78ca (diff)
downloadqpid-python-66765100f4257159622cefe57bed50125a5ad017.tar.gz
Creating a branch for experimenting with some ideas for JMS client.rajith_jms_client
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rajith_jms_client@1128369 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/client')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java145
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/client/AMQTestConnection_0_10.java34
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/client/DispatcherTest.java238
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerImmediatePrefetch.java44
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java253
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerTest.java261
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java82
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/client/ResetMessageListenerTest.java229
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/client/SessionCreateTest.java63
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageTest.java283
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/client/message/NonQpidObjectMessage.java236
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/client/ssl/SSLTest.java181
12 files changed, 2049 insertions, 0 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java
new file mode 100644
index 0000000000..ca10126aa7
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java
@@ -0,0 +1,145 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQQueueDeferredOrderingTest extends QpidBrokerTestCase
+{
+
+ private static final int NUM_MESSAGES = 1000;
+
+ private Connection con;
+ private Session session;
+ private AMQQueue queue;
+ private MessageConsumer consumer;
+
+ private static final Logger _logger = LoggerFactory.getLogger(AMQQueueDeferredOrderingTest.class);
+
+ private ASyncProducer producerThread;
+
+ private class ASyncProducer extends Thread
+ {
+
+ private MessageProducer producer;
+ private final Logger _logger = LoggerFactory.getLogger(ASyncProducer.class);
+ private Session session;
+ private int start;
+ private int end;
+
+ public ASyncProducer(AMQQueue q, int start, int end) throws Exception
+ {
+ this.session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ this._logger.info("Create Consumer of Q1");
+ this.producer = this.session.createProducer(q);
+ this.start = start;
+ this.end = end;
+ }
+
+ public void run()
+ {
+ try
+ {
+ this._logger.info("Starting to send messages");
+ for (int i = start; i < end && !interrupted(); i++)
+ {
+ producer.send(session.createTextMessage(Integer.toString(i)));
+ }
+ this._logger.info("Sent " + (end - start) + " messages");
+ }
+ catch (JMSException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ _logger.info("Create Connection");
+ con = getConnection();
+ _logger.info("Create Session");
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _logger.info("Create Q");
+ queue = new AMQQueue(new AMQShortString("amq.direct"), new AMQShortString("Q"), new AMQShortString("Q"),
+ false, true);
+ _logger.info("Create Consumer of Q");
+ consumer = session.createConsumer(queue);
+ _logger.info("Start Connection");
+ con.start();
+ }
+
+ public void testPausedOrder() throws Exception
+ {
+
+ // Setup initial messages
+ _logger.info("Creating first producer thread");
+ producerThread = new ASyncProducer(queue, 0, NUM_MESSAGES / 2);
+ producerThread.start();
+ // Wait for them to be done
+ producerThread.join();
+
+ // Setup second set of messages to produce while we consume
+ _logger.info("Creating second producer thread");
+ producerThread = new ASyncProducer(queue, NUM_MESSAGES / 2, NUM_MESSAGES);
+ producerThread.start();
+
+ // Start consuming and checking they're in order
+ _logger.info("Consuming messages");
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ Message msg = consumer.receive(3000);
+ assertNotNull("Message should not be null", msg);
+ assertTrue("Message should be a text message", msg instanceof TextMessage);
+ assertEquals("Message content does not match expected", Integer.toString(i), ((TextMessage) msg).getText());
+ }
+ }
+
+ protected void tearDown() throws Exception
+ {
+ _logger.info("Interuptting producer thread");
+ producerThread.interrupt();
+ _logger.info("Closing connection");
+ con.close();
+
+ super.tearDown();
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(AMQQueueDeferredOrderingTest.class);
+ }
+
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/AMQTestConnection_0_10.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/AMQTestConnection_0_10.java
new file mode 100644
index 0000000000..09a03a17a0
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/AMQTestConnection_0_10.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.transport.Connection;
+
+public class AMQTestConnection_0_10 extends AMQConnection
+{
+ public AMQTestConnection_0_10(String url) throws Exception
+ {
+ super(url);
+ }
+
+ public Connection getConnection()
+ {
+ return((AMQConnectionDelegate_0_10)_delegate).getQpidConnection();
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/DispatcherTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/DispatcherTest.java
new file mode 100644
index 0000000000..a8a23c2c41
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/DispatcherTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.client;
+
+import java.util.Hashtable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.naming.Context;
+import javax.naming.spi.InitialContextFactory;
+
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery queue
+ * <p/>
+ * The message delivery process:
+ * Mina puts a message on _queue in AMQSession and the dispatcher thread take()s
+ * from here and dispatches to the _consumers. If the _consumer doesn't have a message listener set at connection start
+ * then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple consumers on a
+ * session can run in any order and a synchronous put/poll will block the dispatcher).
+ * <p/>
+ * When setting the message listener later the _synchronousQueue is just poll()'ed and the first message delivered
+ * the remaining messages will be left on the queue and lost, subsequent messages on the session will arrive first.
+ */
+public class DispatcherTest extends QpidBrokerTestCase
+{
+ private static final Logger _logger = LoggerFactory.getLogger(DispatcherTest.class);
+
+ Context _context;
+
+ private static final int MSG_COUNT = 6;
+ private int _receivedCount = 0;
+ private int _receivedCountWhileStopped = 0;
+ private Connection _clientConnection, _producerConnection;
+ private MessageConsumer _consumer;
+ MessageProducer _producer;
+ Session _clientSession, _producerSession;
+
+ private final CountDownLatch _allFirstMessagesSent = new CountDownLatch(1); // all messages Sent Lock
+ private final CountDownLatch _allSecondMessagesSent = new CountDownLatch(1); // all messages Sent Lock
+
+ private volatile boolean _connectionStopped = false;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ // Create Client 1
+ _clientConnection = getConnection();
+
+ _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Queue queue = _clientSession.createQueue(this.getClass().getName());
+ _consumer = _clientSession.createConsumer(queue);
+
+ // Create Producer
+ _producerConnection = getConnection();
+
+ _producerConnection.start();
+
+ _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ _producer = _producerSession.createProducer(queue);
+
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ _producer.send(_producerSession.createTextMessage("Message " + msg));
+ }
+ }
+
+ protected void tearDown() throws Exception
+ {
+
+ _clientConnection.close();
+
+ _producerConnection.close();
+ super.tearDown();
+ }
+
+ public void testAsynchronousRecieve()
+ {
+ _logger.info("Test Start");
+
+ assertTrue(!((AMQConnection) _clientConnection).started());
+
+ // Set default Message Listener
+ try
+ {
+ _consumer.setMessageListener(new MessageListener()
+ {
+ public void onMessage(Message message)
+ {
+ _logger.info("Client 1 ML 1 Received Message(" + _receivedCount + "):" + message);
+
+ _receivedCount++;
+
+ if (_receivedCount == MSG_COUNT)
+ {
+ _allFirstMessagesSent.countDown();
+ }
+
+ if (_connectionStopped)
+ {
+ _logger.info("Running with Message:" + _receivedCount);
+ }
+
+ if (_connectionStopped && (_allFirstMessagesSent.getCount() == 0))
+ {
+ _receivedCountWhileStopped++;
+ }
+
+ if (_allFirstMessagesSent.getCount() == 0)
+ {
+ if (_receivedCount == (MSG_COUNT * 2))
+ {
+ _allSecondMessagesSent.countDown();
+ }
+ }
+ }
+ });
+
+ assertTrue("Connecion should not be started", !((AMQConnection) _clientConnection).started());
+ _clientConnection.start();
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error Setting Default ML on consumer1");
+ }
+
+ try
+ {
+ _allFirstMessagesSent.await(1000, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ // do nothing
+ }
+
+ try
+ {
+ assertTrue("Connecion should be started", ((AMQConnection) _clientConnection).started());
+ _clientConnection.stop();
+ _connectionStopped = true;
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error stopping connection");
+ }
+
+ try
+ {
+ _logger.error("Send additional messages");
+
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ _producer.send(_producerSession.createTextMessage("Message " + msg));
+ }
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Unable to send additional messages", e);
+ }
+
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e)
+ {
+ // ignore
+ }
+
+ try
+ {
+ _logger.info("Restarting connection");
+
+ _connectionStopped = false;
+ _clientConnection.start();
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error Setting Better ML on consumer1", e);
+ }
+
+ _logger.info("Waiting upto 2 seconds for messages");
+
+ try
+ {
+ _allSecondMessagesSent.await(1000, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ // do nothing
+ }
+
+ assertEquals("Messages not received correctly", 0, _allFirstMessagesSent.getCount());
+ assertEquals("Messages not received correctly", 0, _allSecondMessagesSent.getCount());
+ assertEquals("Client didn't get all messages", MSG_COUNT * 2, _receivedCount);
+ assertEquals("Messages received while stopped is not 0", 0, _receivedCountWhileStopped);
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(DispatcherTest.class);
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerImmediatePrefetch.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerImmediatePrefetch.java
new file mode 100644
index 0000000000..7461f6c200
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerImmediatePrefetch.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.client;
+
+/**
+ * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery
+ * queue <p/> The message delivery process: Mina puts a message on _queue in AMQSession and the dispatcher thread
+ * take()s from here and dispatches to the _consumers. If the _consumer1 doesn't have a message listener set at
+ * connection start then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple
+ * consumers on a session can run in any order and a synchronous put/poll will block the dispatcher). <p/> When setting
+ * the message listener later the _synchronousQueue is just poll()'ed and the first message delivered the remaining
+ * messages will be left on the queue and lost, subsequent messages on the session will arrive first.
+ */
+public class MessageListenerMultiConsumerImmediatePrefetch extends MessageListenerMultiConsumerTest
+{
+ protected void setUp() throws Exception
+ {
+ System.setProperty(AMQSession.IMMEDIATE_PREFETCH, "true");
+ super.setUp();
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(MessageListenerMultiConsumerImmediatePrefetch.class);
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
new file mode 100644
index 0000000000..ca83b99120
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.naming.Context;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.UUID;
+
+/**
+ * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery
+ * queue <p/> The message delivery process: Mina puts a message on _queue in AMQSession and the dispatcher thread
+ * take()s from here and dispatches to the _consumers. If the _consumer1 doesn't have a message listener set at
+ * connection start then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple
+ * consumers on a session can run in any order and a synchronous put/poll will block the dispatcher). <p/> When setting
+ * the message listener later the _synchronousQueue is just poll()'ed and the first message delivered the remaining
+ * messages will be left on the queue and lost, subsequent messages on the session will arrive first.
+ */
+public class MessageListenerMultiConsumerTest extends QpidBrokerTestCase
+{
+ private static final Logger _logger = LoggerFactory.getLogger(MessageListenerMultiConsumerTest.class);
+
+ Context _context;
+
+ private static final int MSG_COUNT = 6;
+ private int receivedCount1 = 0;
+ private int receivedCount2 = 0;
+ private Connection _clientConnection;
+ private MessageConsumer _consumer1;
+ private MessageConsumer _consumer2;
+ private Session _clientSession1;
+ private Queue _queue;
+ private final CountDownLatch _allMessagesSent = new CountDownLatch(2); // all messages Sent Lock
+ private static final String QUEUE_NAME = "queue" + UUID.randomUUID().toString();
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ // Create Client 1
+ _clientConnection = getConnection("guest", "guest");
+
+ _clientConnection.start();
+
+ _clientSession1 = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ _queue =_clientSession1.createQueue(QUEUE_NAME);
+
+ _consumer1 = _clientSession1.createConsumer(_queue);
+
+ // Create Client 2
+ Session clientSession2 = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ _consumer2 = clientSession2.createConsumer(_queue);
+
+ // Create Producer
+ Connection producerConnection = getConnection("guest", "guest");
+
+ producerConnection.start();
+
+ Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer producer = producerSession.createProducer(_queue);
+
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ producer.send(producerSession.createTextMessage("Message " + msg));
+ }
+
+ producerConnection.close();
+
+ }
+
+ protected void tearDown() throws Exception
+ {
+ _clientConnection.close();
+ super.tearDown();
+ }
+
+ public void testRecieveInterleaved() throws Exception
+ {
+ int msg = 0;
+ int MAX_LOOPS = MSG_COUNT * 2;
+ for (int loops = 0; (msg < MSG_COUNT) || (loops < MAX_LOOPS); loops++)
+ {
+
+ if (_consumer1.receive(1000) != null)
+ {
+ msg++;
+ }
+
+ if (_consumer2.receive(1000) != null)
+ {
+ msg++;
+ }
+ }
+
+ assertEquals("Not all messages received.", MSG_COUNT, msg);
+ }
+
+ public void testAsynchronousRecieve() throws Exception
+ {
+ _consumer1.setMessageListener(new MessageListener()
+ {
+ public void onMessage(Message message)
+ {
+ _logger.info("Client 1 Received Message(" + receivedCount1 + "):" + message);
+
+ receivedCount1++;
+
+ if (receivedCount1 == (MSG_COUNT / 2))
+ {
+ _allMessagesSent.countDown();
+ }
+
+ }
+ });
+
+ _consumer2.setMessageListener(new MessageListener()
+ {
+ public void onMessage(Message message)
+ {
+ _logger.info("Client 2 Received Message(" + receivedCount2 + "):" + message);
+
+ receivedCount2++;
+ if (receivedCount2 == (MSG_COUNT / 2))
+ {
+ _allMessagesSent.countDown();
+ }
+ }
+ });
+
+ _logger.info("Waiting upto 2 seconds for messages");
+
+ try
+ {
+ _allMessagesSent.await(4000, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ // do nothing
+ }
+
+ assertEquals(MSG_COUNT, receivedCount1 + receivedCount2);
+ }
+
+ public void testRecieveC2Only() throws Exception
+ {
+ if (
+ !Boolean.parseBoolean(
+ System.getProperties().getProperty(AMQSession.IMMEDIATE_PREFETCH,
+ AMQSession.IMMEDIATE_PREFETCH_DEFAULT)))
+ {
+ _logger.info("Performing Receive only on C2");
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ assertTrue(MSG_COUNT + " msg should be received. Only received:" + msg, _consumer2.receive(1000) != null);
+ }
+ }
+ }
+
+ public void testRecieveBoth() throws Exception
+ {
+ if (
+ !Boolean.parseBoolean(
+ System.getProperties().getProperty(AMQSession.IMMEDIATE_PREFETCH,
+ AMQSession.IMMEDIATE_PREFETCH_DEFAULT)))
+ {
+ _logger.info("Performing Receive only with two consumers on one session ");
+
+ //Create a new consumer on session one that we don't use
+ _clientSession1.createConsumer(_queue);
+
+ int msg;
+ for (msg = 0; msg < (MSG_COUNT / 2); msg++)
+ {
+
+ // Attempt to receive up to half the messages
+ // The other half may have gone to the consumer above
+ final Message message = _consumer1.receive(1000);
+ if(message == null)
+ {
+ break;
+ }
+
+ }
+
+ _consumer1.close();
+ // This will close the unused consumer above.
+ _clientSession1.close();
+
+
+ // msg will now have recorded the number received on session 1
+ // attempt to retrieve the rest on session 2
+ for (; msg < MSG_COUNT ; msg++)
+ {
+ assertTrue("Failed at msg id" + msg, _consumer2.receive(1000) != null);
+ }
+
+ }
+ else
+ {
+ _logger.info("Performing Receive only on both C1 and C2");
+
+ for (int msg = 0; msg < (MSG_COUNT / 2); msg++)
+ {
+
+ assertTrue(_consumer1.receive(3000) != null);
+ }
+
+ for (int msg = 0; msg < (MSG_COUNT / 2); msg++)
+ {
+ assertTrue(_consumer2.receive(3000) != null);
+ }
+ }
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(MessageListenerMultiConsumerTest.class);
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerTest.java
new file mode 100644
index 0000000000..e4d1c72208
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerTest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.util.LogMonitor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.naming.Context;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery
+ * queue <p/> The message delivery process: Mina puts a message on _queue in AMQSession and the dispatcher thread
+ * take()s from here and dispatches to the _consumers. If the _consumer doesn't have a message listener set at
+ * connection start then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple
+ * consumers on a session can run in any order and a synchronous put/poll will block the dispatcher). <p/> When setting
+ * the message listener later the _synchronousQueue is just poll()'ed and the first message delivered the remaining
+ * messages will be left on the queue and lost, subsequent messages on the session will arrive first.
+ */
+public class MessageListenerTest extends QpidBrokerTestCase implements MessageListener, ExceptionListener
+{
+ private static final Logger _logger = LoggerFactory.getLogger(MessageListenerTest.class);
+
+ Context _context;
+
+ private static final int MSG_COUNT = 5;
+ private int _receivedCount = 0;
+ private int _errorCount = 0;
+ private MessageConsumer _consumer;
+ private Connection _clientConnection;
+ private CountDownLatch _awaitMessages = new CountDownLatch(MSG_COUNT);
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ // Create Client
+ _clientConnection = getConnection("guest", "guest");
+
+ _clientConnection.start();
+
+ Session clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Queue queue =clientSession.createQueue("message-listener-test-queue");
+
+ _consumer = clientSession.createConsumer(queue);
+
+ // Create Producer
+
+ Connection producerConnection = getConnection("guest", "guest");
+
+ producerConnection.start();
+
+ Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer producer = producerSession.createProducer(queue);
+
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ producer.send(producerSession.createTextMessage("Message " + msg));
+ }
+
+ producerConnection.close();
+
+ }
+
+ protected void tearDown() throws Exception
+ {
+ if (_clientConnection != null)
+ {
+ _clientConnection.close();
+ }
+ super.tearDown();
+ }
+
+ public void testSynchronousReceive() throws Exception
+ {
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ assertTrue(_consumer.receive(2000) != null);
+ }
+ }
+
+ public void testSynchronousReceiveNoWait() throws Exception
+ {
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ assertTrue("Failed to receive message " + msg, _consumer.receiveNoWait() != null);
+ }
+ }
+
+ public void testAsynchronousReceive() throws Exception
+ {
+ _consumer.setMessageListener(this);
+
+ _logger.info("Waiting 3 seconds for messages");
+
+ try
+ {
+ _awaitMessages.await(3000, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ // do nothing
+ }
+ // Should have received all async messages
+ assertEquals(MSG_COUNT, _receivedCount);
+
+ }
+
+ public void testReceiveThenUseMessageListener() throws Exception
+ {
+ _logger.error("Test disabled as initial receive is not called first");
+ // Perform initial receive to start connection
+ assertTrue(_consumer.receive(2000) != null);
+ _receivedCount++;
+
+ // Sleep to ensure remaining 4 msgs end up on _synchronousQueue
+ Thread.sleep(1000);
+
+ // Set the message listener and wait for the messages to come in.
+ _consumer.setMessageListener(this);
+
+ _logger.info("Waiting 3 seconds for messages");
+
+ try
+ {
+ _awaitMessages.await(3000, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ // do nothing
+ }
+ // Should have received all async messages
+ assertEquals(MSG_COUNT, _receivedCount);
+
+ _clientConnection.close();
+
+ Connection conn = getConnection("guest", "guest");
+ Session clientSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = clientSession.createQueue("message-listener-test-queue");
+ MessageConsumer cons = clientSession.createConsumer(queue);
+ conn.start();
+
+ // check that the messages were actually dequeued
+ assertTrue(cons.receive(2000) == null);
+ }
+
+ /**
+ * Tests the case where the message listener throws an java.lang.Error.
+ *
+ */
+ public void testMessageListenerThrowsError() throws Exception
+ {
+ final String javaLangErrorMessageText = "MessageListener failed with java.lang.Error";
+ _clientConnection.setExceptionListener(this);
+
+ _awaitMessages = new CountDownLatch(1);
+
+ _consumer.setMessageListener(new MessageListener()
+ {
+ public void onMessage(Message message)
+ {
+ try
+ {
+ _logger.debug("onMessage called");
+ _receivedCount++;
+
+
+ throw new Error(javaLangErrorMessageText);
+ }
+ finally
+ {
+ _awaitMessages.countDown();
+ }
+ }
+ });
+
+
+ _logger.info("Waiting 3 seconds for message");
+ _awaitMessages.await(3000, TimeUnit.MILLISECONDS);
+
+ assertEquals("onMessage should have been called", 1, _receivedCount);
+ assertEquals("onException should NOT have been called", 0, _errorCount);
+
+ // Check that Error has been written to the application log.
+
+ LogMonitor _monitor = new LogMonitor(_outputFile);
+ assertTrue("The expected message not written to log file.",
+ _monitor.waitForMessage(javaLangErrorMessageText, LOGMONITOR_TIMEOUT));
+
+ if (_clientConnection != null)
+ {
+ try
+ {
+ _clientConnection.close();
+ }
+ catch (JMSException e)
+ {
+ // Ignore connection close errors for this test.
+ }
+ finally
+ {
+ _clientConnection = null;
+ }
+ }
+ }
+
+ public void onMessage(Message message)
+ {
+ _logger.info("Received Message(" + _receivedCount + "):" + message);
+
+ _receivedCount++;
+ _awaitMessages.countDown();
+ }
+
+ @Override
+ public void onException(JMSException e)
+ {
+ _logger.info("Exception received", e);
+ _errorCount++;
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(MessageListenerTest.class);
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java
new file mode 100644
index 0000000000..29b4dd82a7
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java
@@ -0,0 +1,82 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.client.transport.TransportConnection;
+
+import java.io.File;
+import java.security.Provider;
+import java.security.Security;
+import java.util.List;
+import java.util.LinkedList;
+
+/**
+ * QPID-1394 : Test to ensure that the client can register their custom JCAProviders after the broker to ensure that
+ * the Qpid custom authentication SASL plugins are used.
+ */
+public class MultipleJCAProviderRegistrationTest extends QpidBrokerTestCase
+{
+
+ public void setUp() throws Exception
+ {
+ _broker = VM;
+
+ super.setUp();
+ }
+
+ public void test() throws Exception
+ {
+ // Get the providers before connection
+ Provider[] providers = Security.getProviders();
+
+ // Force the client to load the providers
+ getConnection();
+
+ Provider[] afterConnectionCreation = Security.getProviders();
+
+ // Find the additions
+ List additions = new LinkedList();
+ for (Provider afterCreation : afterConnectionCreation)
+ {
+ boolean found = false;
+ for (Provider provider : providers)
+ {
+ if (provider == afterCreation)
+ {
+ found=true;
+ break;
+ }
+ }
+
+ // Record added registies
+ if (!found)
+ {
+ additions.add(afterCreation);
+ }
+ }
+
+ assertTrue("Client did not register any providers", additions.size() > 0);
+ }
+
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/ResetMessageListenerTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/ResetMessageListenerTest.java
new file mode 100644
index 0000000000..303da29389
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/ResetMessageListenerTest.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.naming.Context;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery
+ * queue <p/> The message delivery process: Mina puts a message on _queue in AMQSession and the dispatcher thread
+ * take()s from here and dispatches to the _consumers. If the _consumer1 doesn't have a message listener set at
+ * connection start then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple
+ * consumers on a session can run in any order and a synchronous put/poll will block the dispatcher). <p/> When setting
+ * the message listener later the _synchronousQueue is just poll()'ed and the first message delivered the remaining
+ * messages will be left on the queue and lost, subsequent messages on the session will arrive first.
+ */
+public class ResetMessageListenerTest extends QpidBrokerTestCase
+{
+ private static final Logger _logger = LoggerFactory.getLogger(ResetMessageListenerTest.class);
+
+ Context _context;
+
+ private static final int MSG_COUNT = 6;
+ private Connection _clientConnection, _producerConnection;
+ private MessageConsumer _consumer1;
+ MessageProducer _producer;
+ Session _clientSession, _producerSession;
+
+ private final CountDownLatch _allFirstMessagesSent = new CountDownLatch(MSG_COUNT); // all messages Sent Lock
+ private final CountDownLatch _allSecondMessagesSent = new CountDownLatch(MSG_COUNT); // all messages Sent Lock
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ _clientConnection = getConnection("guest", "guest");
+ _clientConnection.start();
+ // Create Client 1
+
+ _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Queue queue = _clientSession.createQueue("reset-message-listener-test-queue");
+
+ _consumer1 = _clientSession.createConsumer(queue);
+
+ // Create Producer
+ _producerConnection = getConnection("guest", "guest");
+
+ _producerConnection.start();
+
+ _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ _producer = _producerSession.createProducer(queue);
+
+ TextMessage m = _producerSession.createTextMessage();
+ m.setStringProperty("rank", "first");
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ m.setText("Message " + msg);
+ _producer.send(m);
+ }
+ }
+
+ protected void tearDown() throws Exception
+ {
+ _clientConnection.close();
+
+ super.tearDown();
+ }
+
+ public void testAsynchronousRecieve()
+ {
+
+ _logger.info("Test Start");
+
+ try
+ {
+ _consumer1.setMessageListener(new MessageListener()
+ {
+ public void onMessage(Message message)
+ {
+ try
+ {
+ if (message.getStringProperty("rank").equals("first"))
+ {
+ _allFirstMessagesSent.countDown();
+ }
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace();
+ fail("error receiving message");
+ }
+ }
+ });
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error Setting Default ML on consumer1");
+ }
+ try
+ {
+ assertTrue("Did not receive all first batch of messages",
+ _allFirstMessagesSent.await(1000, TimeUnit.MILLISECONDS));
+ _logger.info("Received first batch of messages");
+ }
+ catch (InterruptedException e)
+ {
+ // do nothing
+ }
+
+ try
+ {
+ _clientConnection.stop();
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error stopping connection");
+ }
+
+ _logger.info("Reset Message Listener ");
+ try
+ {
+ _consumer1.setMessageListener(new MessageListener()
+ {
+ public void onMessage(Message message)
+ {
+ try
+ {
+ if (message.getStringProperty("rank").equals("first"))
+ {
+ // Something ugly will happen, it'll probably kill the dispatcher
+ fail("All first set of messages should have been received");
+ }
+ else
+ {
+ _allSecondMessagesSent.countDown();
+ }
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace();
+ // Something ugly will happen, it'll probably kill the dispatcher
+ fail("error receiving message");
+ }
+ }
+ });
+
+ _clientConnection.start();
+ }
+ catch (javax.jms.IllegalStateException e)
+ {
+ _logger.error("Connection not stopped while setting ML", e);
+ fail("Unable to change message listener:" + e.getCause());
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error Setting Better ML on consumer1", e);
+ }
+
+ try
+ {
+ _logger.info("Send additional messages");
+ TextMessage m = _producerSession.createTextMessage();
+ m.setStringProperty("rank", "second");
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ m.setText("Message " + msg);
+ _producer.send(m);
+ }
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Unable to send additional messages", e);
+ }
+
+ _logger.info("Waiting for messages");
+
+ try
+ {
+ assertTrue(_allSecondMessagesSent.await(1000, TimeUnit.MILLISECONDS));
+ }
+ catch (InterruptedException e)
+ {
+ // do nothing
+ }
+ assertEquals("First batch of messages not received correctly", 0, _allFirstMessagesSent.getCount());
+ assertEquals("Second batch of messages not received correctly", 0, _allSecondMessagesSent.getCount());
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(ResetMessageListenerTest.class);
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/SessionCreateTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/SessionCreateTest.java
new file mode 100644
index 0000000000..15900a17fe
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/SessionCreateTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.client;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+import javax.naming.Context;
+
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Class to check that session creation on a connection has no accidental limit
+ */
+public class SessionCreateTest extends QpidBrokerTestCase
+{
+ private static final Logger _logger = LoggerFactory.getLogger(MessageListenerTest.class);
+
+ Context _context;
+
+ private Connection _clientConnection;
+ protected int maxSessions = 65555;
+
+ public void testSessionCreationLimit() throws Exception
+ {
+ // Create Client
+ _clientConnection = getConnection("guest", "guest");
+
+ _clientConnection.start();
+
+ for (int i=0; i < maxSessions; i++)
+ {
+ Session sess = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ assertNotNull(sess);
+ sess.close();
+ System.out.println("created session: " + i);
+ }
+
+ _clientConnection.close();
+
+ }
+
+} \ No newline at end of file
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageTest.java
new file mode 100644
index 0000000000..bf96dae02e
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageTest.java
@@ -0,0 +1,283 @@
+package org.apache.qpid.client.message;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.message.AMQPEncodedMapMessage;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+
+public class AMQPEncodedMapMessageTest extends QpidBrokerTestCase
+{
+ private Connection _connection;
+ private Session _session;
+ MessageConsumer _consumer;
+ MessageProducer _producer;
+ UUID myUUID = UUID.randomUUID();
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ //Create Connection
+ _connection = getConnection();
+
+ //Create Session
+ _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ //Create Queue
+ String queueName = getTestQueueName();
+ Queue queue = _session.createQueue(queueName);
+
+ //Create Consumer
+ _consumer = _session.createConsumer(queue);
+
+ //Create Producer
+ _producer = _session.createProducer(queue);
+
+ _connection.start();
+ }
+
+ public void testEmptyMessage() throws JMSException
+ {
+ MapMessage m = _session.createMapMessage();
+ _producer.send(m);
+ AMQPEncodedMapMessage msg = (AMQPEncodedMapMessage)_consumer.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Message was not received on time",msg);
+ assertEquals("Message content-type is incorrect",
+ AMQPEncodedMapMessage.MIME_TYPE,
+ ((AbstractJMSMessage)msg).getContentType());
+
+ assertEquals("Message content should be an empty map",
+ Collections.EMPTY_MAP,
+ ((AMQPEncodedMapMessage)msg).getMap());
+ }
+
+ public void testNullMessage() throws JMSException
+ {
+ MapMessage m = _session.createMapMessage();
+ ((AMQPEncodedMapMessage)m).setMap(null);
+ _producer.send(m);
+ AMQPEncodedMapMessage msg = (AMQPEncodedMapMessage)_consumer.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Message was not received on time",msg);
+ assertEquals("Message content-type is incorrect",
+ AMQPEncodedMapMessage.MIME_TYPE,
+ ((AbstractJMSMessage)msg).getContentType());
+
+ assertEquals("Message content should be null",
+ null,
+ ((AMQPEncodedMapMessage)msg).getMap());
+
+ }
+
+ public void testMessageWithContent() throws JMSException
+ {
+ MapMessage m = _session.createMapMessage();
+ m.setBoolean("Boolean", true);
+ m.setByte("Byte", (byte)5);
+ byte[] bytes = new byte[]{(byte)5,(byte)8};
+ m.setBytes("Bytes", bytes);
+ m.setChar("Char", 'X');
+ m.setDouble("Double", 56.84);
+ m.setFloat("Float", Integer.MAX_VALUE + 5000);
+ m.setInt("Int", Integer.MAX_VALUE - 5000);
+ m.setShort("Short", (short)58);
+ m.setString("String", "Hello");
+ m.setObject("uuid", myUUID);
+ _producer.send(m);
+
+ AMQPEncodedMapMessage msg = (AMQPEncodedMapMessage)_consumer.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Message was not received on time",msg);
+ assertEquals("Message content-type is incorrect",
+ AMQPEncodedMapMessage.MIME_TYPE,
+ ((AbstractJMSMessage)msg).getContentType());
+
+ assertEquals(true,m.getBoolean("Boolean"));
+ assertEquals((byte)5,m.getByte("Byte"));
+ byte[] bytesRcv = m.getBytes("Bytes");
+ assertNotNull("Byte array is null",bytesRcv);
+ assertEquals((byte)5,bytesRcv[0]);
+ assertEquals((byte)8,bytesRcv[1]);
+ assertEquals('X',m.getChar("Char"));
+ assertEquals(56.84,m.getDouble("Double"));
+ //assertEquals(Integer.MAX_VALUE + 5000,m.getFloat("Float"));
+ assertEquals(Integer.MAX_VALUE - 5000,m.getInt("Int"));
+ assertEquals((short)58,m.getShort("Short"));
+ assertEquals("Hello",m.getString("String"));
+ assertEquals(myUUID,(UUID)m.getObject("uuid"));
+ }
+
+
+ public void testMessageWithListEntries() throws JMSException
+ {
+ MapMessage m = _session.createMapMessage();
+
+ List<Integer> myList = getList();
+
+ m.setObject("List", myList);
+
+ List<UUID> uuidList = new ArrayList<UUID>();
+ uuidList.add(myUUID);
+ m.setObject("uuid-list", uuidList);
+ _producer.send(m);
+
+ AMQPEncodedMapMessage msg = (AMQPEncodedMapMessage)_consumer.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Message was not received on time",msg);
+ assertEquals("Message content-type is incorrect",
+ AMQPEncodedMapMessage.MIME_TYPE,
+ ((AbstractJMSMessage)msg).getContentType());
+
+ List<Integer> list = (List<Integer>)msg.getObject("List");
+ assertNotNull("List not received",list);
+ Collections.sort(list);
+ int i = 1;
+ for (Integer j: list)
+ {
+ assertEquals(i,j.intValue());
+ i++;
+ }
+
+ List<UUID> list2 = (List<UUID>)msg.getObject("uuid-list");
+ assertNotNull("UUID List not received",list2);
+ assertEquals(myUUID,list2.get(0));
+ }
+
+ public void testMessageWithMapEntries() throws JMSException
+ {
+ MapMessage m = _session.createMapMessage();
+
+ Map<String,String> myMap = getMap();
+ m.setObject("Map", myMap);
+
+ Map<String,UUID> uuidMap = new HashMap<String,UUID>();
+ uuidMap.put("uuid", myUUID);
+ m.setObject("uuid-map", uuidMap);
+
+ _producer.send(m);
+
+ AMQPEncodedMapMessage msg = (AMQPEncodedMapMessage)_consumer.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Message was not received on time",msg);
+ assertEquals("Message content-type is incorrect",
+ AMQPEncodedMapMessage.MIME_TYPE,
+ ((AbstractJMSMessage)msg).getContentType());
+
+ Map<String,String> map = (Map<String,String>)msg.getObject("Map");
+ assertNotNull("Map not received",map);
+ for (int i=1; i <4; i++ )
+ {
+ assertEquals("String" + i,map.get("Key" + i));
+ i++;
+ }
+
+ Map<String,UUID> map2 = (Map<String,UUID>)msg.getObject("uuid-map");
+ assertNotNull("Map not received",map2);
+ assertEquals(myUUID,map2.get("uuid"));
+ }
+
+ public void testMessageWithNestedListsAndMaps() throws JMSException
+ {
+ MapMessage m = _session.createMapMessage();
+
+ Map<String,Object> myMap = new HashMap<String,Object>();
+ myMap.put("map", getMap());
+ myMap.put("list", getList());
+
+ m.setObject("Map", myMap);
+ _producer.send(m);
+
+ AMQPEncodedMapMessage msg = (AMQPEncodedMapMessage)_consumer.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Message was not received on time",msg);
+ assertEquals("Message content-type is incorrect",
+ AMQPEncodedMapMessage.MIME_TYPE,
+ ((AbstractJMSMessage)msg).getContentType());
+
+ Map<String,Object> mainMap = (Map<String,Object>)msg.getObject("Map");
+ assertNotNull("Main Map not received",mainMap);
+
+ Map<String,String> map = (Map<String,String>)mainMap.get("map");
+ assertNotNull("Nested Map not received",map);
+ for (int i=1; i <4; i++ )
+ {
+ assertEquals("String" + i,map.get("Key" + i));
+ i++;
+ }
+
+ List<Integer> list = (List<Integer>)mainMap.get("list");
+ assertNotNull("Nested List not received",list);
+ Collections.sort(list);
+
+ int i = 1;
+ for (Integer j: list)
+ {
+ assertEquals(i,j.intValue());
+ i++;
+ }
+ }
+
+ private List<Integer> getList()
+ {
+ List<Integer> myList = new ArrayList<Integer>();
+ myList.add(1);
+ myList.add(2);
+ myList.add(3);
+
+ return myList;
+ }
+
+ private Map<String,String> getMap()
+ {
+ Map<String,String> myMap = new HashMap<String,String>();
+ myMap.put("Key1","String1");
+ myMap.put("Key2","String2");
+ myMap.put("Key3","String3");
+
+ return myMap;
+ }
+
+ public void tearDown() throws Exception
+ {
+ //clean up
+ _connection.close();
+
+ super.tearDown();
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/message/NonQpidObjectMessage.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/message/NonQpidObjectMessage.java
new file mode 100644
index 0000000000..857adaf82c
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/message/NonQpidObjectMessage.java
@@ -0,0 +1,236 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import java.io.Serializable;
+import java.util.Enumeration;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+
+public class NonQpidObjectMessage implements ObjectMessage {
+
+ private ObjectMessage _realMessage;
+ private String _contentString;
+
+ /**
+ * Allows us to construct a JMS message which
+ * does not inherit from the Qpid message superclasses
+ * and expand our unit testing of MessageConverter et al
+ * @param session
+ */
+ public NonQpidObjectMessage(Session session) throws JMSException
+ {
+ _realMessage = session.createObjectMessage();
+ }
+
+ public String getJMSMessageID() throws JMSException {
+ return _realMessage.getJMSMessageID();
+ }
+
+ public void setJMSMessageID(String string) throws JMSException {
+ _realMessage.setJMSMessageID(string);
+ }
+
+ public long getJMSTimestamp() throws JMSException {
+ return _realMessage.getJMSTimestamp();
+ }
+
+ public void setJMSTimestamp(long l) throws JMSException {
+ _realMessage.setJMSTimestamp(l);
+ }
+
+ public byte[] getJMSCorrelationIDAsBytes() throws JMSException {
+ return _realMessage.getJMSCorrelationIDAsBytes();
+ }
+
+ public void setJMSCorrelationIDAsBytes(byte[] bytes) throws JMSException {
+ _realMessage.setJMSCorrelationIDAsBytes(bytes);
+ }
+
+ public void setJMSCorrelationID(String string) throws JMSException {
+ _realMessage.setJMSCorrelationID(string);
+ }
+
+ public String getJMSCorrelationID() throws JMSException {
+ return _realMessage.getJMSCorrelationID();
+ }
+
+ public Destination getJMSReplyTo() throws JMSException {
+ return _realMessage.getJMSReplyTo();
+ }
+
+ public void setJMSReplyTo(Destination destination) throws JMSException {
+ _realMessage.setJMSReplyTo(destination);
+ }
+
+ public Destination getJMSDestination() throws JMSException {
+ return _realMessage.getJMSDestination();
+ }
+
+ public void setJMSDestination(Destination destination) throws JMSException {
+ _realMessage.setJMSDestination(destination);
+ }
+
+ public int getJMSDeliveryMode() throws JMSException {
+ return _realMessage.getJMSDeliveryMode();
+ }
+
+ public void setJMSDeliveryMode(int i) throws JMSException {
+ _realMessage.setJMSDeliveryMode(i);
+ }
+
+ public boolean getJMSRedelivered() throws JMSException {
+ return _realMessage.getJMSRedelivered();
+ }
+
+ public void setJMSRedelivered(boolean b) throws JMSException {
+ _realMessage.setJMSRedelivered(b);
+ }
+
+ public String getJMSType() throws JMSException {
+ return _realMessage.getJMSType();
+ }
+
+ public void setJMSType(String string) throws JMSException {
+ _realMessage.setJMSType(string);
+ }
+
+ public long getJMSExpiration() throws JMSException {
+ return _realMessage.getJMSExpiration();
+ }
+
+ public void setJMSExpiration(long l) throws JMSException {
+ _realMessage.setJMSExpiration(l);
+ }
+
+ public int getJMSPriority() throws JMSException {
+ return _realMessage.getJMSPriority();
+ }
+
+ public void setJMSPriority(int i) throws JMSException {
+ _realMessage.setJMSPriority(i);
+ }
+
+ public void clearProperties() throws JMSException {
+ _realMessage.clearProperties();
+ }
+
+ public boolean propertyExists(String string) throws JMSException {
+ return _realMessage.propertyExists(string);
+ }
+
+ public boolean getBooleanProperty(String string) throws JMSException {
+ return _realMessage.getBooleanProperty(string);
+ }
+
+ public byte getByteProperty(String string) throws JMSException {
+ return _realMessage.getByteProperty(string);
+ }
+
+ public short getShortProperty(String string) throws JMSException {
+ return _realMessage.getShortProperty(string);
+ }
+
+ public int getIntProperty(String string) throws JMSException {
+ return _realMessage.getIntProperty(string);
+ }
+
+ public long getLongProperty(String string) throws JMSException {
+ return _realMessage.getLongProperty(string);
+ }
+
+ public float getFloatProperty(String string) throws JMSException {
+ return _realMessage.getFloatProperty(string);
+ }
+
+ public double getDoubleProperty(String string) throws JMSException {
+ return _realMessage.getDoubleProperty(string);
+ }
+
+ public String getStringProperty(String string) throws JMSException {
+ return _realMessage.getStringProperty(string);
+ }
+
+ public Object getObjectProperty(String string) throws JMSException {
+ return _realMessage.getObjectProperty(string);
+ }
+
+ public Enumeration getPropertyNames() throws JMSException {
+ return _realMessage.getPropertyNames();
+ }
+
+ public void setBooleanProperty(String string, boolean b) throws JMSException {
+ _realMessage.setBooleanProperty(string,b);
+ }
+
+ public void setByteProperty(String string, byte b) throws JMSException {
+ _realMessage.setByteProperty(string,b);
+ }
+
+ public void setShortProperty(String string, short i) throws JMSException {
+ _realMessage.setShortProperty(string,i);
+ }
+
+ public void setIntProperty(String string, int i) throws JMSException {
+ _realMessage.setIntProperty(string,i);
+ }
+
+ public void setLongProperty(String string, long l) throws JMSException {
+ _realMessage.setLongProperty(string,l);
+ }
+
+ public void setFloatProperty(String string, float v) throws JMSException {
+ _realMessage.setFloatProperty(string,v);
+ }
+
+ public void setDoubleProperty(String string, double v) throws JMSException {
+ _realMessage.setDoubleProperty(string,v);
+ }
+
+ public void setStringProperty(String string, String string1) throws JMSException {
+ _realMessage.setStringProperty(string,string1);
+ }
+
+ public void setObjectProperty(String string, Object object) throws JMSException {
+ _realMessage.setObjectProperty(string,object);
+ }
+
+ public void acknowledge() throws JMSException {
+ _realMessage.acknowledge();
+ }
+
+ public void clearBody() throws JMSException {
+ _realMessage.clearBody();
+ }
+
+ public void setObject(Serializable serializable) throws JMSException {
+ if (serializable instanceof String)
+ {
+ _contentString = (String)serializable;
+ }
+ }
+
+ public Serializable getObject() throws JMSException {
+ return _contentString; }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/ssl/SSLTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/ssl/SSLTest.java
new file mode 100644
index 0000000000..8cdf12eaa4
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/ssl/SSLTest.java
@@ -0,0 +1,181 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.ssl;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+
+import javax.jms.Session;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQTestConnection_0_10;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.transport.Connection;
+
+public class SSLTest extends QpidBrokerTestCase
+{
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ System.setProperty("javax.net.debug", "ssl");
+ super.setUp();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ System.setProperty("javax.net.debug", "");
+ super.tearDown();
+ }
+
+ public void testCreateSSLContextFromConnectionURLParams()
+ {
+ if (Boolean.getBoolean("profile.use_ssl"))
+ {
+ String url = "amqp://guest:guest@test/?brokerlist='tcp://localhost:%s" +
+ "?ssl='true'&ssl_verify_hostname='true'" +
+ "&key_store='%s'&key_store_password='%s'" +
+ "&trust_store='%s'&trust_store_password='%s'" +
+ "'";
+
+ String keyStore = System.getProperty("javax.net.ssl.keyStore");
+ String keyStorePass = System.getProperty("javax.net.ssl.keyStorePassword");
+ String trustStore = System.getProperty("javax.net.ssl.trustStore");
+ String trustStorePass = System.getProperty("javax.net.ssl.trustStorePassword");
+
+ url = String.format(url,System.getProperty("test.port.ssl"),
+ keyStore,keyStorePass,trustStore,trustStorePass);
+
+ // temporarily set the trust/key store jvm args to something else
+ // to ensure we only read from the connection URL param.
+ System.setProperty("javax.net.ssl.trustStore","fessgsdgd");
+ System.setProperty("javax.net.ssl.trustStorePassword","fessgsdgd");
+ System.setProperty("javax.net.ssl.keyStore","fessgsdgd");
+ System.setProperty("javax.net.ssl.keyStorePassword","fessgsdgd");
+ try
+ {
+ AMQConnection con = new AMQConnection(url);
+ Session ssn = con.createSession(false,Session.AUTO_ACKNOWLEDGE);
+ }
+ catch (Exception e)
+ {
+ fail("SSL Connection should be successful");
+ }
+ finally
+ {
+ System.setProperty("javax.net.ssl.trustStore",trustStore);
+ System.setProperty("javax.net.ssl.trustStorePassword",trustStorePass);
+ System.setProperty("javax.net.ssl.keyStore",keyStore);
+ System.setProperty("javax.net.ssl.keyStorePassword",keyStorePass);
+ }
+ }
+ }
+
+ public void testMultipleCertsInSingleStore() throws Exception
+ {
+ if (Boolean.getBoolean("profile.use_ssl"))
+ {
+ String url = "amqp://guest:guest@test/?brokerlist='tcp://localhost:" +
+ System.getProperty("test.port.ssl") +
+ "?ssl='true'&ssl_cert_alias='app1''";
+
+ AMQTestConnection_0_10 con = new AMQTestConnection_0_10(url);
+ Connection transportCon = con.getConnection();
+ String userID = transportCon.getSecurityLayer().getUserID();
+ assertEquals("The correct certificate was not choosen","app1@acme.org",userID);
+ con.close();
+
+ url = "amqp://guest:guest@test/?brokerlist='tcp://localhost:" +
+ System.getProperty("test.port.ssl") +
+ "?ssl='true'&ssl_cert_alias='app2''";
+
+ con = new AMQTestConnection_0_10(url);
+ transportCon = con.getConnection();
+ userID = transportCon.getSecurityLayer().getUserID();
+ assertEquals("The correct certificate was not choosen","app2@acme.org",userID);
+ con.close();
+ }
+ }
+
+ public void testVerifyHostName()
+ {
+ if (Boolean.getBoolean("profile.use_ssl"))
+ {
+ String url = "amqp://guest:guest@test/?brokerlist='tcp://127.0.0.1:" +
+ System.getProperty("test.port.ssl") +
+ "?ssl='true'&ssl_verify_hostname='true''";
+
+ try
+ {
+ AMQConnection con = new AMQConnection(url);
+ fail("Hostname verification failed. No exception was thrown");
+ }
+ catch (Exception e)
+ {
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ e.printStackTrace(new PrintStream(bout));
+ String strace = bout.toString();
+ assertTrue("Correct exception not thrown",strace.contains("SSL hostname verification failed"));
+ }
+
+ }
+ }
+
+ public void testVerifyLocalHost()
+ {
+ if (Boolean.getBoolean("profile.use_ssl"))
+ {
+ String url = "amqp://guest:guest@test/?brokerlist='tcp://localhost:" +
+ System.getProperty("test.port.ssl") +
+ "?ssl='true'&ssl_verify_hostname='true''";
+
+ try
+ {
+ AMQConnection con = new AMQConnection(url);
+ }
+ catch (Exception e)
+ {
+ fail("Hostname verification should succeed");
+ }
+ }
+ }
+
+ public void testVerifyLocalHostLocalDomain()
+ {
+ if (Boolean.getBoolean("profile.use_ssl"))
+ {
+ String url = "amqp://guest:guest@test/?brokerlist='tcp://localhost.localdomain:" +
+ System.getProperty("test.port.ssl") +
+ "?ssl='true'&ssl_verify_hostname='true''";
+
+ try
+ {
+ AMQConnection con = new AMQConnection(url);
+ }
+ catch (Exception e)
+ {
+ fail("Hostname verification should succeed");
+ }
+
+ }
+ }
+}