summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src/main/java/org/apache/qpid/test/client
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/test/client')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java100
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java167
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java213
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java528
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserClientAckTest.java34
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserDupsOkTest.java31
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserNoAckTest.java33
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserPreAckTest.java32
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTransactedTest.java31
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java194
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java1072
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java389
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java373
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/MessageToStringTest.java257
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/ObjectMessageTest.java141
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java310
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/queue/LVQTest.java84
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/queue/QueuePolicyTest.java110
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java112
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java72
20 files changed, 4283 insertions, 0 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java
new file mode 100644
index 0000000000..13a9dd73b8
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java
@@ -0,0 +1,100 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.test.client;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import java.util.Enumeration;
+
+public class CancelTest extends QpidBrokerTestCase
+{
+ private static final Logger _logger = Logger.getLogger(CancelTest.class);
+
+ private Connection _clientConnection;
+ private Session _clientSession;
+ private Queue _queue;
+
+ public void setUp() throws Exception
+ {
+
+ super.setUp();
+
+ _queue = (Queue) getInitialContext().lookup("queue");
+
+ //Create Client
+ _clientConnection = getConnection();
+
+ _clientConnection.start();
+
+ _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ //Ensure _queue is created
+ _clientSession.createConsumer(_queue).close();
+ }
+
+ /**
+ * Simply
+ * This test originally did not assert anything but was just checking
+ * that a message could be browsed and consumed without throwing an exception.
+ * It now checks that at least a message is browsed and that a message is received.
+ */
+ public void test() throws Exception
+ {
+ Connection producerConnection = getConnection();
+
+ producerConnection.start();
+
+ Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = producerSession.createProducer(_queue);
+ producer.send(producerSession.createTextMessage());
+ producerConnection.close();
+
+
+ QueueBrowser browser = _clientSession.createBrowser(_queue);
+ Enumeration e = browser.getEnumeration();
+
+ assertTrue(e.hasMoreElements());
+
+ int i = 0;
+ while (e.hasMoreElements())
+ {
+ e.nextElement();
+ if(++i > 1)
+ {
+ fail("Two many elemnts to browse!");
+ }
+ }
+
+ browser.close();
+
+ MessageConsumer consumer = _clientSession.createConsumer(_queue);
+ assertNotNull( consumer.receive() );
+ consumer.close();
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java
new file mode 100644
index 0000000000..a94d975a32
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java
@@ -0,0 +1,167 @@
+package org.apache.qpid.test.client;
+
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+public class DupsOkTest extends QpidBrokerTestCase
+{
+
+ private Queue _queue;
+ private static final int MSG_COUNT = 100;
+ private CountDownLatch _awaitCompletion = new CountDownLatch(1);
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ _queue = (Queue) getInitialContext().lookup("queue");
+
+
+ //Declare the queue
+ Connection consumerConnection = getConnection();
+ consumerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE).createConsumer(_queue).close();
+
+ //Create Producer put some messages on the queue
+ Connection producerConnection = getConnection();
+
+ producerConnection.start();
+
+ Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer producer = producerSession.createProducer(_queue);
+
+ for (int count = 1; count <= MSG_COUNT; count++)
+ {
+ Message msg = producerSession.createTextMessage("Message " + count);
+ msg.setIntProperty("count", count);
+ producer.send(msg);
+ }
+
+ producerConnection.close();
+ }
+
+ /**
+ * This test sends x messages and receives them with an async consumer.
+ * Waits for all messages to be received or for 60 s
+ * and checks whether the queue is empty.
+ *
+ * @throws Exception
+ */
+ public void testDupsOK() throws Exception
+ {
+ //Create Client
+ Connection clientConnection = getConnection();
+
+ final Session clientSession = clientConnection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+
+ MessageConsumer consumer = clientSession.createConsumer(_queue);
+
+ assertEquals("The queue should have msgs at start", MSG_COUNT, ((AMQSession) clientSession).getQueueDepth((AMQDestination) _queue));
+
+ clientConnection.start();
+
+ consumer.setMessageListener(new MessageListener()
+ {
+ int _msgCount = 0;
+
+ public void onMessage(Message message)
+ {
+ _msgCount++;
+ if (message == null)
+ {
+ fail("Should not get null messages");
+ }
+
+ if (message instanceof TextMessage)
+ {
+ try
+ {
+ if (message.getIntProperty("count") == MSG_COUNT)
+ {
+ try
+ {
+ if(_msgCount != MSG_COUNT)
+ {
+ assertEquals("Wrong number of messages seen.", MSG_COUNT, _msgCount);
+ }
+ }
+ finally
+ {
+ //This is the last message so release test.
+ _awaitCompletion.countDown();
+ }
+ }
+ }
+ catch (JMSException e)
+ {
+ fail("Unable to get int property 'count'");
+ }
+ }
+ else
+ {
+ fail("Got wrong message type");
+ }
+ }
+ });
+
+ try
+ {
+ if (!_awaitCompletion.await(120, TimeUnit.SECONDS))
+ {
+ fail("Test did not complete in 120 seconds");
+ }
+ }
+ catch (InterruptedException e)
+ {
+ fail("Unable to wait for test completion");
+ throw e;
+ }
+
+ //Close consumer to give broker time to process in bound Acks. As The main thread will be released while
+ // before the dispatcher has sent the ack back to the broker.
+ consumer.close();
+
+ clientSession.close();
+
+ final Session clientSession2 = clientConnection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+
+ assertEquals("The queue should have 0 msgs left", 0, ((AMQSession) clientSession2).getQueueDepth((AMQDestination) _queue));
+
+ clientConnection.close();
+ }
+
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java
new file mode 100644
index 0000000000..e1f639afb6
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java
@@ -0,0 +1,213 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.test.client;
+
+import org.apache.qpid.client.AMQSession_0_8;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.log4j.Logger;
+
+import javax.jms.*;
+
+public class FlowControlTest extends QpidBrokerTestCase
+{
+ private static final Logger _logger = Logger.getLogger(FlowControlTest.class);
+
+ private Connection _clientConnection;
+ private Session _clientSession;
+ private Queue _queue;
+
+ /**
+ * Simply
+ *
+ * @throws Exception
+ */
+ public void testBasicBytesFlowControl() throws Exception
+ {
+ _queue = (Queue) getInitialContext().lookup("queue");
+
+ //Create Client
+ _clientConnection = getConnection();
+
+ _clientConnection.start();
+
+ _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ //Ensure _queue is created
+ _clientSession.createConsumer(_queue).close();
+
+ Connection producerConnection = getConnection();
+
+ producerConnection.start();
+
+ Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = producerSession.createProducer(_queue);
+
+ BytesMessage m1 = producerSession.createBytesMessage();
+ m1.writeBytes(new byte[128]);
+ m1.setIntProperty("msg", 1);
+ producer.send(m1);
+ BytesMessage m2 = producerSession.createBytesMessage();
+ m2.writeBytes(new byte[128]);
+ m2.setIntProperty("msg", 2);
+ producer.send(m2);
+ BytesMessage m3 = producerSession.createBytesMessage();
+ m3.writeBytes(new byte[256]);
+ m3.setIntProperty("msg", 3);
+ producer.send(m3);
+
+ producer.close();
+ producerSession.close();
+ producerConnection.close();
+
+ Connection consumerConnection = getConnection();
+ Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ ((AMQSession_0_8) consumerSession).setPrefetchLimits(0, 256);
+ MessageConsumer recv = consumerSession.createConsumer(_queue);
+ consumerConnection.start();
+
+ Message r1 = recv.receive(RECEIVE_TIMEOUT);
+ assertNotNull("First message not received", r1);
+ assertEquals("Messages in wrong order", 1, r1.getIntProperty("msg"));
+
+ Message r2 = recv.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Second message not received", r2);
+ assertEquals("Messages in wrong order", 2, r2.getIntProperty("msg"));
+
+ Message r3 = recv.receive(RECEIVE_TIMEOUT);
+ assertNull("Third message incorrectly delivered", r3);
+
+ ((AbstractJMSMessage)r1).acknowledgeThis();
+
+ r3 = recv.receive(RECEIVE_TIMEOUT);
+ assertNull("Third message incorrectly delivered", r3);
+
+ ((AbstractJMSMessage)r2).acknowledgeThis();
+
+ r3 = recv.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Third message not received", r3);
+ assertEquals("Messages in wrong order", 3, r3.getIntProperty("msg"));
+
+ ((AbstractJMSMessage)r3).acknowledgeThis();
+ consumerConnection.close();
+ }
+
+ public void testTwoConsumersBytesFlowControl() throws Exception
+ {
+ _queue = (Queue) getInitialContext().lookup("queue");
+
+ //Create Client
+ _clientConnection = getConnection();
+
+ _clientConnection.start();
+
+ _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ //Ensure _queue is created
+ _clientSession.createConsumer(_queue).close();
+
+ Connection producerConnection = getConnection();
+
+ producerConnection.start();
+
+ Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = producerSession.createProducer(_queue);
+
+ BytesMessage m1 = producerSession.createBytesMessage();
+ m1.writeBytes(new byte[128]);
+ m1.setIntProperty("msg", 1);
+ producer.send(m1);
+ BytesMessage m2 = producerSession.createBytesMessage();
+ m2.writeBytes(new byte[256]);
+ m2.setIntProperty("msg", 2);
+ producer.send(m2);
+ BytesMessage m3 = producerSession.createBytesMessage();
+ m3.writeBytes(new byte[128]);
+ m3.setIntProperty("msg", 3);
+ producer.send(m3);
+
+ producer.close();
+ producerSession.close();
+ producerConnection.close();
+
+ Connection consumerConnection = getConnection();
+ Session consumerSession1 = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ ((AMQSession_0_8) consumerSession1).setPrefetchLimits(0, 256);
+ MessageConsumer recv1 = consumerSession1.createConsumer(_queue);
+
+ consumerConnection.start();
+
+ Message r1 = recv1.receive(RECEIVE_TIMEOUT);
+ assertNotNull("First message not received", r1);
+ assertEquals("Messages in wrong order", 1, r1.getIntProperty("msg"));
+
+ Message r2 = recv1.receive(RECEIVE_TIMEOUT);
+ assertNull("Second message incorrectly delivered", r2);
+
+ Session consumerSession2 = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ ((AMQSession_0_8) consumerSession2).setPrefetchLimits(0, 256);
+ MessageConsumer recv2 = consumerSession2.createConsumer(_queue);
+
+ r2 = recv2.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Second message not received", r2);
+ assertEquals("Messages in wrong order", 2, r2.getIntProperty("msg"));
+
+ Message r3 = recv2.receive(RECEIVE_TIMEOUT);
+ assertNull("Third message incorrectly delivered", r3);
+
+ r3 = recv1.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Third message not received", r3);
+ assertEquals("Messages in wrong order", 3, r3.getIntProperty("msg"));
+
+ r2.acknowledge();
+ r3.acknowledge();
+ recv1.close();
+ recv2.close();
+ consumerSession1.close();
+ consumerSession2.close();
+ consumerConnection.close();
+
+ }
+
+ public static void main(String args[]) throws Throwable
+ {
+ FlowControlTest test = new FlowControlTest();
+
+ int run = 0;
+ while (true)
+ {
+ System.err.println("Test Run:" + ++run);
+ Thread.sleep(1000);
+ try
+ {
+ test.startBroker();
+ test.testBasicBytesFlowControl();
+
+ Thread.sleep(1000);
+ }
+ finally
+ {
+ test.stopBroker();
+ }
+ }
+ }
+}
+
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
new file mode 100644
index 0000000000..97d825177c
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
@@ -0,0 +1,528 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.test.client;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.test.utils.FailoverBaseCase;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.naming.NamingException;
+import java.util.Enumeration;
+import java.util.Random;
+
+public class QueueBrowserAutoAckTest extends FailoverBaseCase
+{
+ protected Connection _clientConnection;
+ protected Session _clientSession;
+ protected Queue _queue;
+ protected static final String MESSAGE_ID_PROPERTY = "MessageIDProperty";
+ protected boolean CLUSTERED = Boolean.getBoolean("profile.clustered");
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ //Create Client
+ _clientConnection = getConnection();
+ _clientConnection.start();
+
+ setupSession();
+
+ _queue = _clientSession.createQueue(getTestQueueName());
+ _clientSession.createConsumer(_queue).close();
+
+ //Ensure there are no messages on the queue to start with.
+ checkQueueDepth(0);
+ }
+
+ protected void setupSession() throws Exception
+ {
+ _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ public void tearDown() throws Exception
+ {
+ if (_clientConnection != null)
+ {
+ _clientConnection.close();
+ }
+
+ super.tearDown();
+ }
+
+ protected void sendMessages(int num) throws JMSException
+ {
+ Connection producerConnection = null;
+ try
+ {
+ producerConnection = getConnection();
+ }
+ catch (Exception e)
+ {
+ fail("Unable to lookup connection in JNDI.");
+ }
+
+ sendMessages(producerConnection, num);
+ }
+
+ protected void sendMessages(String connection, int num) throws JMSException
+ {
+ Connection producerConnection = null;
+ try
+ {
+ producerConnection = getConnectionFactory(connection).createConnection("guest", "guest");
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ fail("Unable to lookup connection in JNDI.");
+ }
+ sendMessages(producerConnection, num);
+ }
+
+
+ protected void sendMessages(Connection producerConnection, int messageSendCount) throws JMSException
+ {
+ producerConnection.start();
+
+ Session producerSession = producerConnection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+
+ //Ensure _queue is created
+ producerSession.createConsumer(_queue).close();
+
+ MessageProducer producer = producerSession.createProducer(_queue);
+
+ for (int messsageID = 0; messsageID < messageSendCount; messsageID++)
+ {
+ TextMessage textMsg = producerSession.createTextMessage("Message " + messsageID);
+ textMsg.setIntProperty(MESSAGE_ID_PROPERTY, messsageID);
+ producer.send(textMsg);
+ }
+ producerSession.commit();
+
+ producerConnection.close();
+ }
+
+ /**
+ * Using the Protocol getQueueDepth method ensure that the correct number of messages are on the queue.
+ *
+ * Also uses a QueueBrowser as a second method of validating the message count on the queue.
+ *
+ * @param expectedDepth The expected Queue depth
+ * @throws JMSException on error
+ */
+ protected void checkQueueDepth(int expectedDepth) throws JMSException
+ {
+
+ // create QueueBrowser
+ _logger.info("Creating Queue Browser");
+
+ QueueBrowser queueBrowser = _clientSession.createBrowser(_queue);
+
+ // check for messages
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Checking for " + expectedDepth + " messages with QueueBrowser");
+ }
+
+ //Check what the session believes the queue count to be.
+ long queueDepth = 0;
+
+ try
+ {
+ queueDepth = ((AMQSession) _clientSession).getQueueDepth((AMQDestination) _queue);
+ }
+ catch (AMQException e)
+ {
+ }
+
+ assertEquals("Session reports Queue expectedDepth not as expected", expectedDepth, queueDepth);
+
+
+
+ // Browse the queue to get a second opinion
+ int msgCount = 0;
+ Enumeration msgs = queueBrowser.getEnumeration();
+
+ while (msgs.hasMoreElements())
+ {
+ msgs.nextElement();
+ msgCount++;
+ }
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Found " + msgCount + " messages total in browser");
+ }
+
+ // check to see if all messages found
+ assertEquals("Browser did not find all messages", expectedDepth, msgCount);
+
+ //Close browser
+ queueBrowser.close();
+ }
+
+ protected void closeBrowserBeforeAfterGetNext(int messageCount) throws JMSException
+ {
+ QueueBrowser queueBrowser = _clientSession.createBrowser(_queue);
+
+ Enumeration msgs = queueBrowser.getEnumeration();
+
+ int msgCount = 0;
+
+ while (msgs.hasMoreElements() && msgCount < messageCount)
+ {
+ msgs.nextElement();
+ msgCount++;
+ }
+
+ try
+ {
+ queueBrowser.close();
+ }
+ catch (JMSException e)
+ {
+ fail("Close should happen without error:" + e.getMessage());
+ }
+ }
+
+ /**
+ * This method checks that multiple calls to getEnumeration() on a queueBrowser provide the same behaviour.
+ *
+ * @param sentMessages The number of messages sent
+ * @param browserEnumerationCount The number of times to call getEnumeration()
+ * @throws JMSException
+ */
+ protected void checkMultipleGetEnum(int sentMessages, int browserEnumerationCount) throws JMSException
+ {
+ QueueBrowser queueBrowser = _clientSession.createBrowser(_queue);
+
+ for (int count = 0; count < browserEnumerationCount; count++)
+ {
+ _logger.info("Checking getEnumeration:" + count);
+ Enumeration msgs = queueBrowser.getEnumeration();
+
+ int msgCount = 0;
+
+ while (msgs.hasMoreElements())
+ {
+ msgs.nextElement();
+ msgCount++;
+ }
+
+ // Verify that the browser can see all the messages sent.
+ assertEquals(sentMessages, msgCount);
+ }
+
+ try
+ {
+ queueBrowser.close();
+ }
+ catch (JMSException e)
+ {
+ fail("Close should happen without error:" + e.getMessage());
+ }
+ }
+
+ protected void checkOverlappingMultipleGetEnum(int expectedMessages, int browserEnumerationCount) throws JMSException
+ {
+ checkOverlappingMultipleGetEnum(expectedMessages, browserEnumerationCount, null);
+ }
+
+ protected void checkOverlappingMultipleGetEnum(int expectedMessages, int browserEnumerationCount, String selector) throws JMSException
+ {
+ QueueBrowser queueBrowser = selector == null ?
+ _clientSession.createBrowser(_queue) : _clientSession.createBrowser(_queue);
+// _clientSession.createBrowser(_queue) : _clientSession.createBrowser(_queue, selector);
+
+ Enumeration[] msgs = new Enumeration[browserEnumerationCount];
+ int[] msgCount = new int[browserEnumerationCount];
+
+ //create Enums
+ for (int count = 0; count < browserEnumerationCount; count++)
+ {
+ msgs[count] = queueBrowser.getEnumeration();
+ }
+
+ //interleave reads
+ for (int cnt = 0; cnt < expectedMessages; cnt++)
+ {
+ for (int count = 0; count < browserEnumerationCount; count++)
+ {
+ if (msgs[count].hasMoreElements())
+ {
+ msgs[count].nextElement();
+ msgCount[count]++;
+ }
+ }
+ }
+
+ //validate all browsers get right message count.
+ for (int count = 0; count < browserEnumerationCount; count++)
+ {
+ assertEquals(msgCount[count], expectedMessages);
+ }
+
+ try
+ {
+ queueBrowser.close();
+ }
+ catch (JMSException e)
+ {
+ fail("Close should happen without error:" + e.getMessage());
+ }
+ }
+
+ protected void validate(int messages) throws JMSException
+ {
+ //Create a new connection to validate message content
+ Connection connection = null;
+
+ try
+ {
+ connection = getConnection();
+ }
+ catch (Exception e)
+ {
+ fail("Unable to make validation connection");
+ }
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ connection.start();
+
+ MessageConsumer consumer = session.createConsumer(_queue);
+
+ _logger.info("Verify messages are still on the queue");
+
+ Message tempMsg;
+
+ for (int msgCount = 0; msgCount < messages; msgCount++)
+ {
+ tempMsg = (TextMessage) consumer.receive(RECEIVE_TIMEOUT);
+ if (tempMsg == null)
+ {
+ fail("Message " + msgCount + " not retrieved from queue");
+ }
+ }
+
+ //Close this new connection
+ connection.close();
+
+ _logger.info("All messages recevied from queue");
+
+ //ensure no message left.
+ checkQueueDepth(0);
+ }
+
+ protected void checkQueueDepthWithSelectors(int totalMessages, int clients) throws JMSException
+ {
+
+ String selector = MESSAGE_ID_PROPERTY + " % " + clients;
+
+ checkOverlappingMultipleGetEnum(totalMessages / clients, clients, selector);
+ }
+
+
+ /**
+ * This tests you can browse an empty queue, see QPID-785
+ *
+ * @throws Exception
+ */
+ public void testBrowsingEmptyQueue() throws Exception
+ {
+ checkQueueDepth(0);
+ }
+
+ /*
+ * Test Messages Remain on Queue
+ * Create a queu and send messages to it. Browse them and then receive them all to verify they were still there
+ *
+ */
+ public void testQueueBrowserMsgsRemainOnQueue() throws Exception
+ {
+ int messages = 10;
+
+ sendMessages(messages);
+
+ checkQueueDepth(messages);
+
+ validate(messages);
+ }
+
+
+ public void testClosingBrowserMidReceiving() throws NamingException, JMSException
+ {
+ int messages = 100;
+
+ sendMessages(messages);
+
+ checkQueueDepth(messages);
+
+ closeBrowserBeforeAfterGetNext(10);
+
+ validate(messages);
+ }
+
+ /**
+ * This tests that multiple getEnumerations on a QueueBrowser return the required number of messages.
+ * @throws NamingException
+ * @throws JMSException
+ */
+ public void testMultipleGetEnum() throws NamingException, JMSException
+ {
+ int messages = 10;
+
+ sendMessages(messages);
+
+ checkQueueDepth(messages);
+
+ checkMultipleGetEnum(messages, 5);
+
+ validate(messages);
+ }
+
+ public void testMultipleOverlappingGetEnum() throws NamingException, JMSException
+ {
+ int messages = 25;
+
+ sendMessages(messages);
+
+ checkQueueDepth(messages);
+
+ checkOverlappingMultipleGetEnum(messages, 5);
+
+ validate(messages);
+ }
+
+
+ public void testBrowsingWithSelector() throws JMSException
+ {
+ int messages = 40;
+
+ sendMessages(messages);
+
+ checkQueueDepth(messages);
+
+ for (int clients = 2; clients <= 10; clients++)
+ {
+ checkQueueDepthWithSelectors(messages, clients);
+ }
+
+ validate(messages);
+ }
+
+ /**
+ * Testing that a QueueBrowser doesn't actually consume messages from a broker when it fails over.
+ * @throws JMSException
+ */
+ public void testFailoverWithQueueBrowser() throws JMSException
+ {
+ int messages = 5;
+
+ sendMessages("connection1", messages);
+ if (!CLUSTERED)
+ {
+ sendMessages("connection2", messages);
+ }
+
+ checkQueueDepth(messages);
+
+ _logger.info("Creating Queue Browser");
+ QueueBrowser queueBrowser = _clientSession.createBrowser(_queue);
+
+ long queueDepth = 0;
+
+ try
+ {
+ queueDepth = ((AMQSession) _clientSession).getQueueDepth((AMQDestination) _queue);
+ }
+ catch (AMQException e)
+ {
+ fail("Caught exception getting queue depth: " + e.getMessage());
+ }
+
+ assertEquals("Session reports Queue depth not as expected", messages, queueDepth);
+
+ int msgCount = 0;
+ int failPoint = 0;
+
+ failPoint = new Random().nextInt(messages) + 1;
+
+ Enumeration msgs = queueBrowser.getEnumeration();
+ while (msgs.hasMoreElements())
+ {
+ msgs.nextElement();
+ msgCount++;
+
+ if (msgCount == failPoint)
+ {
+ failBroker(getFailingPort());
+ }
+ }
+
+ assertTrue("We should get atleast " + messages + " msgs.", msgCount >= messages);
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("QBAAT Found " + msgCount + " messages total in browser");
+ }
+
+ //Close browser
+ queueBrowser.close();
+
+ _logger.info("Closed Queue Browser, validating messages on broker.");
+
+ //Validate all messages still on Broker
+ validate(messages);
+ }
+
+ public void testFailoverAsQueueBrowserCreated() throws JMSException
+ {
+ // The IoServiceListenerSupport seems to get stuck in with a managedSession that isn't closing when requested.
+ // So it hangs waiting for the session.
+ int messages = 50;
+
+ sendMessages("connection1", messages);
+ if (!CLUSTERED)
+ {
+ sendMessages("connection2", messages);
+ }
+
+ failBroker(getFailingPort());
+
+ checkQueueDepth(messages);
+
+ //Validate all messages still on Broker 1
+ validate(messages);
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserClientAckTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserClientAckTest.java
new file mode 100644
index 0000000000..f30b8043ad
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserClientAckTest.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.client;
+
+import javax.jms.Session;
+
+public class QueueBrowserClientAckTest extends QueueBrowserAutoAckTest
+{
+
+
+ protected void setupSession() throws Exception
+ {
+ _clientSession = _clientConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ }
+
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserDupsOkTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserDupsOkTest.java
new file mode 100644
index 0000000000..b19809b8f2
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserDupsOkTest.java
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.client;
+
+import javax.jms.Session;
+
+public class QueueBrowserDupsOkTest extends QueueBrowserAutoAckTest
+{
+ protected void setupSession() throws Exception
+ {
+ _clientSession = _clientConnection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserNoAckTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserNoAckTest.java
new file mode 100644
index 0000000000..c97343464c
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserNoAckTest.java
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.client;
+
+import org.apache.qpid.client.AMQSession;
+
+
+public class QueueBrowserNoAckTest extends QueueBrowserAutoAckTest
+{
+
+ protected void setupSession() throws Exception
+ {
+ _clientSession = _clientConnection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserPreAckTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserPreAckTest.java
new file mode 100644
index 0000000000..bb1c0d3698
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserPreAckTest.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.client;
+
+import org.apache.qpid.client.AMQSession;
+
+public class QueueBrowserPreAckTest extends QueueBrowserAutoAckTest
+{
+
+ protected void setupSession() throws Exception
+ {
+ _clientSession = _clientConnection.createSession(false, AMQSession.PRE_ACKNOWLEDGE);
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTransactedTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTransactedTest.java
new file mode 100644
index 0000000000..d79788f017
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTransactedTest.java
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.client;
+
+import javax.jms.Session;
+
+public class QueueBrowserTransactedTest extends QueueBrowserAutoAckTest
+{
+ protected void setupSession() throws Exception
+ {
+ _clientSession = _clientConnection.createSession(true, Session.SESSION_TRANSACTED);
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java
new file mode 100644
index 0000000000..b944f2ddd2
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java
@@ -0,0 +1,194 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.client;
+
+import org.apache.qpid.test.utils.*;
+import javax.jms.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import junit.framework.ComparisonFailure;
+import junit.framework.AssertionFailedError;
+
+/**
+ * RollbackOrderTest, QPID-1864, QPID-1871
+ *
+ * Description:
+ *
+ * The problem that this test is exposing is that the dispatcher used to be capable
+ * of holding on to a message when stopped. This ment that when the rollback was
+ * called and the dispatcher stopped it may have hold of a message. So after all
+ * the local queues(preDeliveryQueue, SynchronousQueue, PostDeliveryTagQueue)
+ * have been cleared the client still had a single message, the one the
+ * dispatcher was holding on to.
+ *
+ * As a result the TxRollback operation would run and then release the dispatcher.
+ * Whilst the dispatcher would then proceed to reject the message it was holiding
+ * the Broker would already have resent that message so the rejection would silently
+ * fail.
+ *
+ * And the client would receieve that single message 'early', depending on the
+ * number of messages already recevied when rollback was called.
+ *
+ *
+ * Aims:
+ *
+ * The tests puts 50 messages on to the queue.
+ *
+ * The test then tries to cause the dispatcher to stop whilst it is in the process
+ * of moving a message from the preDeliveryQueue to a consumers sychronousQueue.
+ *
+ * To exercise this path we have 50 message flowing to the client to give the
+ * dispatcher a bit of work to do moving messages.
+ *
+ * Then we loop - 10 times
+ * - Validating that the first message received is always message 1.
+ * - Receive a few more so that there are a few messages to reject.
+ * - call rollback, to try and catch the dispatcher mid process.
+ *
+ * Outcome:
+ *
+ * The hope is that we catch the dispatcher mid process and cause a BasicReject
+ * to fail. Which will be indicated in the log but will also cause that failed
+ * rejected message to be the next to be delivered which will not be message 1
+ * as expected.
+ *
+ * We are testing a race condition here but we can check through the log file if
+ * the race condition occured. However, performing that check will only validate
+ * the problem exists and will not be suitable as part of a system test.
+ *
+ */
+public class RollbackOrderTest extends QpidBrokerTestCase
+{
+
+ private Connection _connection;
+ private Queue _queue;
+ private Session _session;
+ private MessageConsumer _consumer;
+
+ @Override public void setUp() throws Exception
+ {
+ super.setUp();
+ _connection = getConnection();
+
+ _session = _connection.createSession(true, Session.SESSION_TRANSACTED);
+ _queue = _session.createQueue(getTestQueueName());
+ _consumer = _session.createConsumer(_queue);
+
+ //Send more messages so it is more likely that the dispatcher is
+ // processing on rollback.
+ sendMessage(_session, _queue, 50);
+ _session.commit();
+
+ }
+
+ public void testOrderingAfterRollback() throws Exception
+ {
+ //Start the session now so we
+ _connection.start();
+
+ for (int i = 0; i < 20; i++)
+ {
+ Message msg = _consumer.receive();
+ assertEquals("Incorrect Message Received", 0, msg.getIntProperty(INDEX));
+
+ // Pull additional messages through so we have some reject work to do
+ for (int m=0; m < 5 ; m++)
+ {
+ _consumer.receive();
+ }
+
+ System.err.println("ROT-Rollback");
+ _logger.warn("ROT-Rollback");
+ _session.rollback();
+ }
+ }
+
+ public void testOrderingAfterRollbackOnMessage() throws Exception
+ {
+ final CountDownLatch count= new CountDownLatch(20);
+ final Exception exceptions[] = new Exception[20];
+ final AtomicBoolean failed = new AtomicBoolean(false);
+
+ _consumer.setMessageListener(new MessageListener()
+ {
+
+ public void onMessage(Message message)
+ {
+
+ Message msg = message;
+ try
+ {
+ count.countDown();
+ assertEquals("Incorrect Message Received", 0, msg.getIntProperty(INDEX));
+
+ _session.rollback();
+ }
+ catch (JMSException e)
+ {
+ System.out.println("Error:" + e.getMessage());
+ exceptions[(int)count.getCount()] = e;
+ }
+ catch (AssertionFailedError cf)
+ {
+ // End Test if Equality test fails
+ while (count.getCount() != 0)
+ {
+ count.countDown();
+ }
+
+ System.out.println("Error:" + cf.getMessage());
+ System.err.println(cf.getMessage());
+ cf.printStackTrace();
+ failed.set(true);
+ }
+ }
+ });
+ //Start the session now so we
+ _connection.start();
+
+ count.await();
+
+ for (Exception e : exceptions)
+ {
+ if (e != null)
+ {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ failed.set(true);
+ }
+ }
+
+// _consumer.close();
+ _connection.close();
+
+ assertFalse("Exceptions thrown during test run, Check Std.err.", failed.get());
+ }
+
+ @Override public void tearDown() throws Exception
+ {
+
+ drainQueue(_queue);
+
+ super.tearDown();
+ }
+
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
new file mode 100644
index 0000000000..fb389c5345
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
@@ -0,0 +1,1072 @@
+package org.apache.qpid.test.client.destination;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Map;
+import java.util.Properties;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+
+import org.apache.qpid.client.AMQAnyDestination;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQSession_0_10;
+import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
+import org.apache.qpid.client.messaging.address.Node.QueueNode;
+import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
+import org.apache.qpid.messaging.Address;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AddressBasedDestinationTest extends QpidBrokerTestCase
+{
+ private static final Logger _logger = LoggerFactory.getLogger(AddressBasedDestinationTest.class);
+ private Connection _connection;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ _connection = getConnection() ;
+ _connection.start();
+ }
+
+ @Override
+ public void tearDown() throws Exception
+ {
+ _connection.close();
+ super.tearDown();
+ }
+
+ public void testCreateOptions() throws Exception
+ {
+ Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+ MessageProducer prod;
+ MessageConsumer cons;
+
+ // default (create never, assert never) -------------------
+ // create never --------------------------------------------
+ String addr1 = "ADDR:testQueue1";
+ AMQDestination dest = new AMQAnyDestination(addr1);
+ try
+ {
+ cons = jmsSession.createConsumer(dest);
+ }
+ catch(JMSException e)
+ {
+ assertTrue(e.getMessage().contains("The name 'testQueue1' supplied in the address " +
+ "doesn't resolve to an exchange or a queue"));
+ }
+
+ try
+ {
+ prod = jmsSession.createProducer(dest);
+ }
+ catch(JMSException e)
+ {
+ assertTrue(e.getCause().getCause().getMessage().contains("The name 'testQueue1' supplied in the address " +
+ "doesn't resolve to an exchange or a queue"));
+ }
+
+ assertFalse("Queue should not be created",(
+ (AMQSession_0_10)jmsSession).isQueueExist(dest, (QueueNode)dest.getSourceNode() ,true));
+
+
+ // create always -------------------------------------------
+ addr1 = "ADDR:testQueue1; { create: always }";
+ dest = new AMQAnyDestination(addr1);
+ cons = jmsSession.createConsumer(dest);
+
+ assertTrue("Queue not created as expected",(
+ (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+ assertTrue("Queue not bound as expected",(
+ (AMQSession_0_10)jmsSession).isQueueBound("",
+ dest.getAddressName(),dest.getAddressName(), dest.getSourceNode().getDeclareArgs()));
+
+ // create receiver -----------------------------------------
+ addr1 = "ADDR:testQueue2; { create: receiver }";
+ dest = new AMQAnyDestination(addr1);
+ try
+ {
+ prod = jmsSession.createProducer(dest);
+ }
+ catch(JMSException e)
+ {
+ assertTrue(e.getCause().getCause().getMessage().contains("The name 'testQueue2' supplied in the address " +
+ "doesn't resolve to an exchange or a queue"));
+ }
+
+ assertFalse("Queue should not be created",(
+ (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+
+
+ cons = jmsSession.createConsumer(dest);
+
+ assertTrue("Queue not created as expected",(
+ (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+ assertTrue("Queue not bound as expected",(
+ (AMQSession_0_10)jmsSession).isQueueBound("",
+ dest.getAddressName(),dest.getAddressName(), dest.getSourceNode().getDeclareArgs()));
+
+ // create never --------------------------------------------
+ addr1 = "ADDR:testQueue3; { create: never }";
+ dest = new AMQAnyDestination(addr1);
+ try
+ {
+ cons = jmsSession.createConsumer(dest);
+ }
+ catch(JMSException e)
+ {
+ assertTrue(e.getMessage().contains("The name 'testQueue3' supplied in the address " +
+ "doesn't resolve to an exchange or a queue"));
+ }
+
+ try
+ {
+ prod = jmsSession.createProducer(dest);
+ }
+ catch(JMSException e)
+ {
+ assertTrue(e.getCause().getCause().getMessage().contains("The name 'testQueue3' supplied in the address " +
+ "doesn't resolve to an exchange or a queue"));
+ }
+
+ assertFalse("Queue should not be created",(
+ (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+
+ // create sender ------------------------------------------
+ addr1 = "ADDR:testQueue3; { create: sender }";
+ dest = new AMQAnyDestination(addr1);
+
+ try
+ {
+ cons = jmsSession.createConsumer(dest);
+ }
+ catch(JMSException e)
+ {
+ assertTrue(e.getMessage().contains("The name 'testQueue3' supplied in the address " +
+ "doesn't resolve to an exchange or a queue"));
+ }
+ assertFalse("Queue should not be created",(
+ (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+
+ prod = jmsSession.createProducer(dest);
+ assertTrue("Queue not created as expected",(
+ (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+ assertTrue("Queue not bound as expected",(
+ (AMQSession_0_10)jmsSession).isQueueBound("",
+ dest.getAddressName(),dest.getAddressName(), dest.getSourceNode().getDeclareArgs()));
+
+ }
+
+ public void testCreateQueue() throws Exception
+ {
+ Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+
+ String addr = "ADDR:my-queue/hello; " +
+ "{" +
+ "create: always, " +
+ "node: " +
+ "{" +
+ "durable: true ," +
+ "x-declare: " +
+ "{" +
+ "auto-delete: true," +
+ "arguments: {" +
+ "'qpid.max_size': 1000," +
+ "'qpid.max_count': 100" +
+ "}" +
+ "}, " +
+ "x-bindings: [{exchange : 'amq.direct', key : test}, " +
+ "{exchange : 'amq.fanout'}," +
+ "{exchange: 'amq.match', arguments: {x-match: any, dep: sales, loc: CA}}," +
+ "{exchange : 'amq.topic', key : 'a.#'}" +
+ "]," +
+
+ "}" +
+ "}";
+ AMQDestination dest = new AMQAnyDestination(addr);
+ MessageConsumer cons = jmsSession.createConsumer(dest);
+
+ assertTrue("Queue not created as expected",(
+ (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+
+ assertTrue("Queue not bound as expected",(
+ (AMQSession_0_10)jmsSession).isQueueBound("",
+ dest.getAddressName(),dest.getAddressName(), null));
+
+ assertTrue("Queue not bound as expected",(
+ (AMQSession_0_10)jmsSession).isQueueBound("amq.direct",
+ dest.getAddressName(),"test", null));
+
+ assertTrue("Queue not bound as expected",(
+ (AMQSession_0_10)jmsSession).isQueueBound("amq.fanout",
+ dest.getAddressName(),null, null));
+
+ assertTrue("Queue not bound as expected",(
+ (AMQSession_0_10)jmsSession).isQueueBound("amq.topic",
+ dest.getAddressName(),"a.#", null));
+
+ Map<String,Object> args = new HashMap<String,Object>();
+ args.put("x-match","any");
+ args.put("dep","sales");
+ args.put("loc","CA");
+ assertTrue("Queue not bound as expected",(
+ (AMQSession_0_10)jmsSession).isQueueBound("amq.match",
+ dest.getAddressName(),null, args));
+
+ }
+
+ public void testCreateExchange() throws Exception
+ {
+ Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+
+ String addr = "ADDR:my-exchange/hello; " +
+ "{ " +
+ "create: always, " +
+ "node: " +
+ "{" +
+ "type: topic, " +
+ "x-declare: " +
+ "{ " +
+ "type:direct, " +
+ "auto-delete: true, " +
+ "arguments: {" +
+ "'qpid.msg_sequence': 1, " +
+ "'qpid.ive': 1" +
+ "}" +
+ "}" +
+ "}" +
+ "}";
+
+ AMQDestination dest = new AMQAnyDestination(addr);
+ MessageConsumer cons = jmsSession.createConsumer(dest);
+
+ assertTrue("Exchange not created as expected",(
+ (AMQSession_0_10)jmsSession).isExchangeExist(dest, (ExchangeNode)dest.getTargetNode() , true));
+
+ // The existence of the queue is implicitly tested here
+ assertTrue("Queue not bound as expected",(
+ (AMQSession_0_10)jmsSession).isQueueBound("my-exchange",
+ dest.getQueueName(),"hello", Collections.<String, Object>emptyMap()));
+
+ // The client should be able to query and verify the existence of my-exchange (QPID-2774)
+ dest = new AMQAnyDestination("ADDR:my-exchange; {create: never}");
+ cons = jmsSession.createConsumer(dest);
+ }
+
+ public void checkQueueForBindings(Session jmsSession, AMQDestination dest,String headersBinding) throws Exception
+ {
+ assertTrue("Queue not created as expected",(
+ (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+
+ assertTrue("Queue not bound as expected",(
+ (AMQSession_0_10)jmsSession).isQueueBound("",
+ dest.getAddressName(),dest.getAddressName(), null));
+
+ assertTrue("Queue not bound as expected",(
+ (AMQSession_0_10)jmsSession).isQueueBound("amq.direct",
+ dest.getAddressName(),"test", null));
+
+ assertTrue("Queue not bound as expected",(
+ (AMQSession_0_10)jmsSession).isQueueBound("amq.topic",
+ dest.getAddressName(),"a.#", null));
+
+ Address a = Address.parse(headersBinding);
+ assertTrue("Queue not bound as expected",(
+ (AMQSession_0_10)jmsSession).isQueueBound("amq.match",
+ dest.getAddressName(),null, a.getOptions()));
+ }
+
+ /**
+ * Test goal: Verifies that a producer and consumer creation triggers the correct
+ * behavior for x-bindings specified in node props.
+ */
+ public void testBindQueueWithArgs() throws Exception
+ {
+
+ Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+ String headersBinding = "{exchange: 'amq.match', arguments: {x-match: any, dep: sales, loc: CA}}";
+
+ String addr = "node: " +
+ "{" +
+ "durable: true ," +
+ "x-declare: " +
+ "{ " +
+ "auto-delete: true," +
+ "arguments: {'qpid.max_count': 100}" +
+ "}, " +
+ "x-bindings: [{exchange : 'amq.direct', key : test}, " +
+ "{exchange : 'amq.topic', key : 'a.#'}," +
+ headersBinding +
+ "]" +
+ "}" +
+ "}";
+
+
+ AMQDestination dest1 = new AMQAnyDestination("ADDR:my-queue/hello; {create: receiver, " +addr);
+ MessageConsumer cons = jmsSession.createConsumer(dest1);
+ checkQueueForBindings(jmsSession,dest1,headersBinding);
+
+ AMQDestination dest2 = new AMQAnyDestination("ADDR:my-queue2/hello; {create: sender, " +addr);
+ MessageProducer prod = jmsSession.createProducer(dest2);
+ checkQueueForBindings(jmsSession,dest2,headersBinding);
+ }
+
+ /**
+ * Test goal: Verifies the capacity property in address string is handled properly.
+ * Test strategy:
+ * Creates a destination with capacity 10.
+ * Creates consumer with client ack.
+ * Sends 15 messages to the queue, tries to receive 10.
+ * Tries to receive the 11th message and checks if its null.
+ *
+ * Since capacity is 10 and we haven't acked any messages,
+ * we should not have received the 11th.
+ *
+ * Acks the 10th message and verifies we receive the rest of the msgs.
+ */
+ public void testCapacity() throws Exception
+ {
+ verifyCapacity("ADDR:my-queue; {create: always, link:{capacity: 10}}");
+ }
+
+ public void testSourceAndTargetCapacity() throws Exception
+ {
+ verifyCapacity("ADDR:my-queue; {create: always, link:{capacity: {source:10, target:15} }}");
+ }
+
+ private void verifyCapacity(String address) throws Exception
+ {
+ if (!isCppBroker())
+ {
+ _logger.info("Not C++ broker, exiting test");
+ return;
+ }
+
+ Session jmsSession = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
+
+ AMQDestination dest = new AMQAnyDestination(address);
+ MessageConsumer cons = jmsSession.createConsumer(dest);
+ MessageProducer prod = jmsSession.createProducer(dest);
+
+ for (int i=0; i< 15; i++)
+ {
+ prod.send(jmsSession.createTextMessage("msg" + i) );
+ }
+
+ for (int i=0; i< 9; i++)
+ {
+ cons.receive();
+ }
+ Message msg = cons.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Should have received the 10th message",msg);
+ assertNull("Shouldn't have received the 11th message as capacity is 10",cons.receive(RECEIVE_TIMEOUT));
+ msg.acknowledge();
+ for (int i=11; i<16; i++)
+ {
+ assertNotNull("Should have received the " + i + "th message as we acked the last 10",cons.receive(RECEIVE_TIMEOUT));
+ }
+ }
+
+ /**
+ * Test goal: Verifies if the new address format based destinations
+ * can be specified and loaded correctly from the properties file.
+ *
+ */
+ public void testLoadingFromPropertiesFile() throws Exception
+ {
+ Hashtable<String,String> map = new Hashtable<String,String>();
+ map.put("destination.myQueue1", "ADDR:my-queue/hello; {create: always, node: " +
+ "{x-declare: {auto-delete: true, arguments : {'qpid.max_size': 1000}}}}");
+
+ map.put("destination.myQueue2", "ADDR:my-queue2; { create: receiver }");
+
+ map.put("destination.myQueue3", "BURL:direct://amq.direct/my-queue3?routingkey='test'");
+
+ PropertiesFileInitialContextFactory props = new PropertiesFileInitialContextFactory();
+ Context ctx = props.getInitialContext(map);
+
+ AMQDestination dest1 = (AMQDestination)ctx.lookup("myQueue1");
+ AMQDestination dest2 = (AMQDestination)ctx.lookup("myQueue2");
+ AMQDestination dest3 = (AMQDestination)ctx.lookup("myQueue3");
+
+ Session jmsSession = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer cons1 = jmsSession.createConsumer(dest1);
+ MessageConsumer cons2 = jmsSession.createConsumer(dest2);
+ MessageConsumer cons3 = jmsSession.createConsumer(dest3);
+
+ assertTrue("Destination1 was not created as expected",(
+ (AMQSession_0_10)jmsSession).isQueueExist(dest1,(QueueNode)dest1.getSourceNode(), true));
+
+ assertTrue("Destination1 was not bound as expected",(
+ (AMQSession_0_10)jmsSession).isQueueBound("",
+ dest1.getAddressName(),dest1.getAddressName(), null));
+
+ assertTrue("Destination2 was not created as expected",(
+ (AMQSession_0_10)jmsSession).isQueueExist(dest2,(QueueNode)dest2.getSourceNode(), true));
+
+ assertTrue("Destination2 was not bound as expected",(
+ (AMQSession_0_10)jmsSession).isQueueBound("",
+ dest2.getAddressName(),dest2.getAddressName(), null));
+
+ MessageProducer producer = jmsSession.createProducer(dest3);
+ producer.send(jmsSession.createTextMessage("Hello"));
+ TextMessage msg = (TextMessage)cons3.receive(1000);
+ assertEquals("Destination3 was not created as expected.",msg.getText(),"Hello");
+ }
+
+ /**
+ * Test goal: Verifies the subject can be overridden using "qpid.subject" message property.
+ * Test strategy: Creates and address with a default subject "topic1"
+ * Creates a message with "qpid.subject"="topic2" and sends it.
+ * Verifies that the message goes to "topic2" instead of "topic1".
+ */
+ public void testOverridingSubject() throws Exception
+ {
+ Session jmsSession = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
+
+ AMQDestination topic1 = new AMQAnyDestination("ADDR:amq.topic/topic1; {link:{name: queue1}}");
+
+ MessageProducer prod = jmsSession.createProducer(topic1);
+
+ Message m = jmsSession.createTextMessage("Hello");
+ m.setStringProperty("qpid.subject", "topic2");
+
+ MessageConsumer consForTopic1 = jmsSession.createConsumer(topic1);
+ MessageConsumer consForTopic2 = jmsSession.createConsumer(new AMQAnyDestination("ADDR:amq.topic/topic2; {link:{name: queue2}}"));
+
+ prod.send(m);
+ Message msg = consForTopic1.receive(1000);
+ assertNull("message shouldn't have been sent to topic1",msg);
+
+ msg = consForTopic2.receive(1000);
+ assertNotNull("message should have been sent to topic2",msg);
+
+ }
+
+ /**
+ * Test goal: Verifies that session.createQueue method
+ * works as expected both with the new and old addressing scheme.
+ */
+ public void testSessionCreateQueue() throws Exception
+ {
+ Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+
+ // Using the BURL method
+ Destination queue = ssn.createQueue("my-queue");
+ MessageProducer prod = ssn.createProducer(queue);
+ MessageConsumer cons = ssn.createConsumer(queue);
+ assertTrue("my-queue was not created as expected",(
+ (AMQSession_0_10)ssn).isQueueBound("amq.direct",
+ "my-queue","my-queue", null));
+
+ prod.send(ssn.createTextMessage("test"));
+ assertNotNull("consumer should receive a message",cons.receive(1000));
+ cons.close();
+
+ // Using the ADDR method
+ // default case
+ queue = ssn.createQueue("ADDR:my-queue2");
+ try
+ {
+ prod = ssn.createProducer(queue);
+ fail("The client should throw an exception, since there is no queue present in the broker");
+ }
+ catch(Exception e)
+ {
+ String s = "The name 'my-queue2' supplied in the address " +
+ "doesn't resolve to an exchange or a queue";
+ assertEquals(s,e.getCause().getCause().getMessage());
+ }
+
+ // explicit create case
+ queue = ssn.createQueue("ADDR:my-queue2; {create: sender}");
+ prod = ssn.createProducer(queue);
+ cons = ssn.createConsumer(queue);
+ assertTrue("my-queue2 was not created as expected",(
+ (AMQSession_0_10)ssn).isQueueBound("",
+ "my-queue2","my-queue2", null));
+
+ prod.send(ssn.createTextMessage("test"));
+ assertNotNull("consumer should receive a message",cons.receive(1000));
+ cons.close();
+
+ // Using the ADDR method to create a more complicated queue
+ String addr = "ADDR:amq.direct/x512; {create: receiver, " +
+ "link : {name : 'MY.RESP.QUEUE', " +
+ "x-declare : { auto-delete: true, exclusive: true, " +
+ "arguments : {'qpid.max_size': 1000, 'qpid.policy_type': ring} } } }";
+ queue = ssn.createQueue(addr);
+
+ prod = ssn.createProducer(queue);
+ cons = ssn.createConsumer(queue);
+ assertTrue("MY.RESP.QUEUE was not created as expected",(
+ (AMQSession_0_10)ssn).isQueueBound("amq.direct",
+ "MY.RESP.QUEUE","x512", null));
+ cons.close();
+ }
+
+ /**
+ * Test goal: Verifies that session.creatTopic method
+ * works as expected both with the new and old addressing scheme.
+ */
+ public void testSessionCreateTopic() throws Exception
+ {
+ Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+
+ // Using the BURL method
+ Topic topic = ssn.createTopic("ACME");
+ MessageProducer prod = ssn.createProducer(topic);
+ MessageConsumer cons = ssn.createConsumer(topic);
+
+ prod.send(ssn.createTextMessage("test"));
+ assertNotNull("consumer should receive a message",cons.receive(1000));
+ cons.close();
+
+ // Using the ADDR method
+ topic = ssn.createTopic("ADDR:ACME");
+ prod = ssn.createProducer(topic);
+ cons = ssn.createConsumer(topic);
+
+ prod.send(ssn.createTextMessage("test"));
+ assertNotNull("consumer should receive a message",cons.receive(1000));
+ cons.close();
+
+ String addr = "ADDR:vehicles/bus; " +
+ "{ " +
+ "create: always, " +
+ "node: " +
+ "{" +
+ "type: topic, " +
+ "x-declare: " +
+ "{ " +
+ "type:direct, " +
+ "auto-delete: true, " +
+ "arguments: {" +
+ "'qpid.msg_sequence': 1, " +
+ "'qpid.ive': 1" +
+ "}" +
+ "}" +
+ "}, " +
+ "link: {name : my-topic, " +
+ "x-bindings: [{exchange : 'vehicles', key : car}, " +
+ "{exchange : 'vehicles', key : van}]" +
+ "}" +
+ "}";
+
+ // Using the ADDR method to create a more complicated topic
+ topic = ssn.createTopic(addr);
+ prod = ssn.createProducer(topic);
+ cons = ssn.createConsumer(topic);
+
+ assertTrue("The queue was not bound to vehicle exchange using bus as the binding key",(
+ (AMQSession_0_10)ssn).isQueueBound("vehicles",
+ "my-topic","bus", null));
+
+ assertTrue("The queue was not bound to vehicle exchange using car as the binding key",(
+ (AMQSession_0_10)ssn).isQueueBound("vehicles",
+ "my-topic","car", null));
+
+ assertTrue("The queue was not bound to vehicle exchange using van as the binding key",(
+ (AMQSession_0_10)ssn).isQueueBound("vehicles",
+ "my-topic","van", null));
+
+ Message msg = ssn.createTextMessage("test");
+ msg.setStringProperty("qpid.subject", "van");
+ prod.send(msg);
+ assertNotNull("consumer should receive a message",cons.receive(1000));
+ cons.close();
+ }
+
+ /**
+ * Test Goal : Verify the default subjects used for each exchange type.
+ * The default for amq.topic is "#" and for the rest it's ""
+ */
+ public void testDefaultSubjects() throws Exception
+ {
+ Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer queueCons = ssn.createConsumer(new AMQAnyDestination("ADDR:amq.direct"));
+ MessageConsumer topicCons = ssn.createConsumer(new AMQAnyDestination("ADDR:amq.topic"));
+
+ MessageProducer queueProducer = ssn.createProducer(new AMQAnyDestination("ADDR:amq.direct"));
+ MessageProducer topicProducer1 = ssn.createProducer(new AMQAnyDestination("ADDR:amq.topic/usa.weather"));
+ MessageProducer topicProducer2 = ssn.createProducer(new AMQAnyDestination("ADDR:amq.topic/sales"));
+
+ queueProducer.send(ssn.createBytesMessage());
+ assertNotNull("The consumer subscribed to amq.direct " +
+ "with empty binding key should have received the message ",queueCons.receive(1000));
+
+ topicProducer1.send(ssn.createTextMessage("25c"));
+ assertEquals("The consumer subscribed to amq.topic " +
+ "with '#' binding key should have received the message ",
+ ((TextMessage)topicCons.receive(1000)).getText(),"25c");
+
+ topicProducer2.send(ssn.createTextMessage("1000"));
+ assertEquals("The consumer subscribed to amq.topic " +
+ "with '#' binding key should have received the message ",
+ ((TextMessage)topicCons.receive(1000)).getText(),"1000");
+ }
+
+ /**
+ * Test Goal : Verify that 'mode : browse' works as expected using a regular consumer.
+ * This indirectly tests ring queues as well.
+ */
+ public void testBrowseMode() throws Exception
+ {
+
+ Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+
+ String addr = "ADDR:my-ring-queue; {create: always, mode: browse, " +
+ "node: {x-bindings: [{exchange : 'amq.direct', key : test}], " +
+ "x-declare:{arguments : {'qpid.policy_type':ring, 'qpid.max_count':2}}}}";
+
+ Destination dest = ssn.createQueue(addr);
+ MessageConsumer browseCons = ssn.createConsumer(dest);
+ MessageProducer prod = ssn.createProducer(ssn.createQueue("ADDR:amq.direct/test"));
+
+ prod.send(ssn.createTextMessage("Test1"));
+ prod.send(ssn.createTextMessage("Test2"));
+
+ TextMessage msg = (TextMessage)browseCons.receive(1000);
+ assertEquals("Didn't receive the first message",msg.getText(),"Test1");
+
+ msg = (TextMessage)browseCons.receive(1000);
+ assertEquals("Didn't receive the first message",msg.getText(),"Test2");
+
+ browseCons.close();
+ prod.send(ssn.createTextMessage("Test3"));
+ browseCons = ssn.createConsumer(dest);
+
+ msg = (TextMessage)browseCons.receive(1000);
+ assertEquals("Should receive the second message again",msg.getText(),"Test2");
+
+ msg = (TextMessage)browseCons.receive(1000);
+ assertEquals("Should receive the third message since it's a ring queue",msg.getText(),"Test3");
+
+ assertNull("Should not receive anymore messages",browseCons.receive(500));
+ }
+
+ /**
+ * Test Goal : When the same destination is used when creating two consumers,
+ * If the type == topic, verify that unique subscription queues are created,
+ * unless subscription queue has a name.
+ *
+ * If the type == queue, same queue should be shared.
+ */
+ public void testSubscriptionForSameDestination() throws Exception
+ {
+ Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+ Destination dest = ssn.createTopic("ADDR:amq.topic/foo");
+ MessageConsumer consumer1 = ssn.createConsumer(dest);
+ MessageConsumer consumer2 = ssn.createConsumer(dest);
+ MessageProducer prod = ssn.createProducer(dest);
+
+ prod.send(ssn.createTextMessage("A"));
+ TextMessage m = (TextMessage)consumer1.receive(1000);
+ assertEquals("Consumer1 should recieve message A",m.getText(),"A");
+ m = (TextMessage)consumer2.receive(1000);
+ assertEquals("Consumer2 should recieve message A",m.getText(),"A");
+
+ consumer1.close();
+ consumer2.close();
+
+ dest = ssn.createTopic("ADDR:amq.topic/foo; { link: {name: my-queue}}");
+ consumer1 = ssn.createConsumer(dest);
+ try
+ {
+ consumer2 = ssn.createConsumer(dest);
+ fail("An exception should be thrown as 'my-queue' already have an exclusive subscriber");
+ }
+ catch(Exception e)
+ {
+ }
+ _connection.close();
+
+ _connection = getConnection() ;
+ _connection.start();
+ ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+ dest = ssn.createTopic("ADDR:my_queue; {create: always}");
+ consumer1 = ssn.createConsumer(dest);
+ consumer2 = ssn.createConsumer(dest);
+ prod = ssn.createProducer(dest);
+
+ prod.send(ssn.createTextMessage("A"));
+ Message m1 = consumer1.receive(1000);
+ Message m2 = consumer2.receive(1000);
+
+ if (m1 != null)
+ {
+ assertNull("Only one consumer should receive the message",m2);
+ }
+ else
+ {
+ assertNotNull("Only one consumer should receive the message",m2);
+ }
+ }
+
+ public void testXBindingsWithoutExchangeName() throws Exception
+ {
+ Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+ String addr = "ADDR:MRKT; " +
+ "{" +
+ "create: receiver," +
+ "node : {type: topic, x-declare: {type: topic} }," +
+ "link:{" +
+ "name: my-topic," +
+ "x-bindings:[{key:'NYSE.#'},{key:'NASDAQ.#'},{key:'CNTL.#'}]" +
+ "}" +
+ "}";
+
+ // Using the ADDR method to create a more complicated topic
+ MessageConsumer cons = ssn.createConsumer(new AMQAnyDestination(addr));
+
+ assertTrue("The queue was not bound to MRKT exchange using NYSE.# as the binding key",(
+ (AMQSession_0_10)ssn).isQueueBound("MRKT",
+ "my-topic","NYSE.#", null));
+
+ assertTrue("The queue was not bound to MRKT exchange using NASDAQ.# as the binding key",(
+ (AMQSession_0_10)ssn).isQueueBound("MRKT",
+ "my-topic","NASDAQ.#", null));
+
+ assertTrue("The queue was not bound to MRKT exchange using CNTL.# as the binding key",(
+ (AMQSession_0_10)ssn).isQueueBound("MRKT",
+ "my-topic","CNTL.#", null));
+
+ MessageProducer prod = ssn.createProducer(ssn.createTopic(addr));
+ Message msg = ssn.createTextMessage("test");
+ msg.setStringProperty("qpid.subject", "NASDAQ.ABCD");
+ prod.send(msg);
+ assertNotNull("consumer should receive a message",cons.receive(1000));
+ cons.close();
+ }
+
+ public void testXSubscribeOverrides() throws Exception
+ {
+ Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+ String str = "ADDR:my_queue; {create:always,link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}";
+ Destination dest = ssn.createTopic(str);
+ MessageConsumer consumer1 = ssn.createConsumer(dest);
+ try
+ {
+ MessageConsumer consumer2 = ssn.createConsumer(dest);
+ fail("An exception should be thrown as 'my-queue' already have an exclusive subscriber");
+ }
+ catch(Exception e)
+ {
+ }
+ }
+
+ public void testQueueReceiversAndTopicSubscriber() throws Exception
+ {
+ Queue queue = new AMQAnyDestination("ADDR:my-queue; {create: always}");
+ Topic topic = new AMQAnyDestination("ADDR:amq.topic/test");
+
+ QueueSession qSession = ((AMQConnection)_connection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+ QueueReceiver receiver = qSession.createReceiver(queue);
+
+ TopicSession tSession = ((AMQConnection)_connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ TopicSubscriber sub = tSession.createSubscriber(topic);
+
+ Session ssn = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer prod1 = ssn.createProducer(ssn.createQueue("ADDR:my-queue"));
+ prod1.send(ssn.createTextMessage("test1"));
+
+ MessageProducer prod2 = ssn.createProducer(ssn.createTopic("ADDR:amq.topic/test"));
+ prod2.send(ssn.createTextMessage("test2"));
+
+ Message msg1 = receiver.receive();
+ assertNotNull(msg1);
+ assertEquals("test1",((TextMessage)msg1).getText());
+
+ Message msg2 = sub.receive();
+ assertNotNull(msg2);
+ assertEquals("test2",((TextMessage)msg2).getText());
+ }
+
+ public void testDurableSubscriber() throws Exception
+ {
+ Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+
+ Properties props = new Properties();
+ props.setProperty("java.naming.factory.initial", "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
+ props.setProperty("destination.address1", "ADDR:amq.topic");
+ props.setProperty("destination.address2", "ADDR:amq.direct/test");
+ String addrStr = "ADDR:amq.topic/test; {link:{name: my-topic," +
+ "x-bindings:[{key:'NYSE.#'},{key:'NASDAQ.#'},{key:'CNTL.#'}]}}";
+ props.setProperty("destination.address3", addrStr);
+ props.setProperty("topic.address4", "hello.world");
+ addrStr = "ADDR:my_queue; {create:always,link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}";
+ props.setProperty("destination.address5", addrStr);
+
+ Context ctx = new InitialContext(props);
+
+ for (int i=1; i < 5; i++)
+ {
+ Topic topic = (Topic) ctx.lookup("address"+i);
+ createDurableSubscriber(ctx,ssn,"address"+i,topic);
+ }
+
+ Topic topic = ssn.createTopic("ADDR:news.us");
+ createDurableSubscriber(ctx,ssn,"my-dest",topic);
+
+ Topic namedQueue = (Topic) ctx.lookup("address5");
+ try
+ {
+ createDurableSubscriber(ctx,ssn,"my-queue",namedQueue);
+ fail("Exception should be thrown. Durable subscribers cannot be created for Queues");
+ }
+ catch(JMSException e)
+ {
+ assertEquals("Durable subscribers can only be created for Topics",
+ e.getMessage());
+ }
+ }
+
+ private void createDurableSubscriber(Context ctx,Session ssn,String destName,Topic topic) throws Exception
+ {
+ MessageConsumer cons = ssn.createDurableSubscriber(topic, destName);
+ MessageProducer prod = ssn.createProducer(topic);
+
+ Message m = ssn.createTextMessage(destName);
+ prod.send(m);
+ Message msg = cons.receive(1000);
+ assertNotNull(msg);
+ assertEquals(destName,((TextMessage)msg).getText());
+ ssn.unsubscribe(destName);
+ }
+
+ public void testDeleteOptions() throws Exception
+ {
+ Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer cons;
+
+ // default (create never, assert never) -------------------
+ // create never --------------------------------------------
+ String addr1 = "ADDR:testQueue1;{create: always, delete: always}";
+ AMQDestination dest = new AMQAnyDestination(addr1);
+ try
+ {
+ cons = jmsSession.createConsumer(dest);
+ cons.close();
+ }
+ catch(JMSException e)
+ {
+ fail("Exception should not be thrown. Exception thrown is : " + e);
+ }
+
+ assertFalse("Queue not deleted as expected",(
+ (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+
+
+ String addr2 = "ADDR:testQueue2;{create: always, delete: receiver}";
+ dest = new AMQAnyDestination(addr2);
+ try
+ {
+ cons = jmsSession.createConsumer(dest);
+ cons.close();
+ }
+ catch(JMSException e)
+ {
+ fail("Exception should not be thrown. Exception thrown is : " + e);
+ }
+
+ assertFalse("Queue not deleted as expected",(
+ (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+
+
+ String addr3 = "ADDR:testQueue3;{create: always, delete: sender}";
+ dest = new AMQAnyDestination(addr3);
+ try
+ {
+ cons = jmsSession.createConsumer(dest);
+ MessageProducer prod = jmsSession.createProducer(dest);
+ prod.close();
+ }
+ catch(JMSException e)
+ {
+ fail("Exception should not be thrown. Exception thrown is : " + e);
+ }
+
+ assertFalse("Queue not deleted as expected",(
+ (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+
+
+ }
+
+ /**
+ * Test Goals : 1. Test if the client sets the correct accept mode for unreliable
+ * and at-least-once.
+ * 2. Test default reliability modes for Queues and Topics.
+ * 3. Test if an exception is thrown if exactly-once is used.
+ * 4. Test if an exception is thrown if at-least-once is used with topics.
+ *
+ * Test Strategy: For goal #1 & #2
+ * For unreliable and at-least-once the test tries to receives messages
+ * in client_ack mode but does not ack the messages.
+ * It will then close the session, recreate a new session
+ * and will then try to verify the queue depth.
+ * For unreliable the messages should have been taken off the queue.
+ * For at-least-once the messages should be put back onto the queue.
+ *
+ */
+
+ public void testReliabilityOptions() throws Exception
+ {
+ String addr1 = "ADDR:testQueue1;{create: always, delete : receiver, link : {reliability : unreliable}}";
+ acceptModeTest(addr1,0);
+
+ String addr2 = "ADDR:testQueue2;{create: always, delete : receiver, link : {reliability : at-least-once}}";
+ acceptModeTest(addr2,2);
+
+ // Default accept-mode for topics
+ acceptModeTest("ADDR:amq.topic/test",0);
+
+ // Default accept-mode for queues
+ acceptModeTest("ADDR:testQueue1;{create: always}",2);
+
+ String addr3 = "ADDR:testQueue2;{create: always, delete : receiver, link : {reliability : exactly-once}}";
+ try
+ {
+ AMQAnyDestination dest = new AMQAnyDestination(addr3);
+ fail("An exception should be thrown indicating it's an unsupported type");
+ }
+ catch(Exception e)
+ {
+ assertTrue(e.getCause().getMessage().contains("The reliability mode 'exactly-once' is not yet supported"));
+ }
+
+ String addr4 = "ADDR:amq.topic/test;{link : {reliability : at-least-once}}";
+ try
+ {
+ AMQAnyDestination dest = new AMQAnyDestination(addr4);
+ Session ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer cons = ssn.createConsumer(dest);
+ fail("An exception should be thrown indicating it's an unsupported combination");
+ }
+ catch(Exception e)
+ {
+ assertTrue(e.getCause().getMessage().contains("AT-LEAST-ONCE is not yet supported for Topics"));
+ }
+ }
+
+ private void acceptModeTest(String address, int expectedQueueDepth) throws Exception
+ {
+ Session ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer cons;
+ MessageProducer prod;
+
+ AMQDestination dest = new AMQAnyDestination(address);
+ cons = ssn.createConsumer(dest);
+ prod = ssn.createProducer(dest);
+
+ for (int i=0; i < expectedQueueDepth; i++)
+ {
+ prod.send(ssn.createTextMessage("Msg" + i));
+ }
+
+ for (int i=0; i < expectedQueueDepth; i++)
+ {
+ Message msg = cons.receive(1000);
+ assertNotNull(msg);
+ assertEquals("Msg" + i,((TextMessage)msg).getText());
+ }
+
+ ssn.close();
+ ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
+ long queueDepth = ((AMQSession) ssn).getQueueDepth(dest);
+ assertEquals(expectedQueueDepth,queueDepth);
+ cons.close();
+ prod.close();
+ }
+
+ public void testDestinationOnSend() throws Exception
+ {
+ Session ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer cons = ssn.createConsumer(ssn.createTopic("amq.topic/test"));
+ MessageProducer prod = ssn.createProducer(null);
+
+ Queue queue = ssn.createQueue("amq.topic/test");
+ prod.send(queue,ssn.createTextMessage("A"));
+
+ Message msg = cons.receive(1000);
+ assertNotNull(msg);
+ assertEquals("A",((TextMessage)msg).getText());
+ prod.close();
+ cons.close();
+ }
+
+ public void testReplyToWithNamelessExchange() throws Exception
+ {
+ System.setProperty("qpid.declare_exchanges","false");
+ replyToTest("ADDR:my-queue;{create: always}");
+ System.setProperty("qpid.declare_exchanges","true");
+ }
+
+ public void testReplyToWithCustomExchange() throws Exception
+ {
+ replyToTest("ADDR:hello;{create:always,node:{type:topic}}");
+ }
+
+ private void replyToTest(String replyTo) throws Exception
+ {
+ Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination replyToDest = AMQDestination.createDestination(replyTo);
+ MessageConsumer replyToCons = session.createConsumer(replyToDest);
+
+ Destination dest = session.createQueue("amq.direct/test");
+
+ MessageConsumer cons = session.createConsumer(dest);
+ MessageProducer prod = session.createProducer(dest);
+ Message m = session.createTextMessage("test");
+ m.setJMSReplyTo(replyToDest);
+ prod.send(m);
+
+ Message msg = cons.receive();
+ MessageProducer prodR = session.createProducer(msg.getJMSReplyTo());
+ prodR.send(session.createTextMessage("x"));
+
+ Message m1 = replyToCons.receive();
+ assertNotNull("The reply to consumer should have received the messsage",m1);
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
new file mode 100644
index 0000000000..fcbab273e5
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
@@ -0,0 +1,389 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.test.client.failover;
+
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.naming.NamingException;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.test.utils.FailoverBaseCase;
+
+public class FailoverTest extends FailoverBaseCase implements ConnectionListener
+{
+ private static final Logger _logger = Logger.getLogger(FailoverTest.class);
+
+ private static final String QUEUE = "queue";
+ private static final int DEFAULT_NUM_MESSAGES = 10;
+ private static final int DEFAULT_SEED = 20080921;
+ protected int numMessages = 0;
+ protected Connection connection;
+ private Session producerSession;
+ private Queue queue;
+ private MessageProducer producer;
+ private Session consumerSession;
+ private MessageConsumer consumer;
+
+ private CountDownLatch failoverComplete;
+ private boolean CLUSTERED = Boolean.getBoolean("profile.clustered");
+ private int seed;
+ private Random rand;
+ private int _currentPort = getFailingPort();
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ numMessages = Integer.getInteger("profile.failoverMsgCount",DEFAULT_NUM_MESSAGES);
+ seed = Integer.getInteger("profile.failoverRandomSeed",DEFAULT_SEED);
+ rand = new Random(seed);
+
+ connection = getConnection();
+ ((AMQConnection) connection).setConnectionListener(this);
+ connection.start();
+ failoverComplete = new CountDownLatch(1);
+ }
+
+ protected void init(boolean transacted, int mode) throws JMSException, NamingException
+ {
+ consumerSession = connection.createSession(transacted, mode);
+ queue = consumerSession.createQueue(getName()+System.currentTimeMillis());
+ consumer = consumerSession.createConsumer(queue);
+
+ producerSession = connection.createSession(transacted, mode);
+ producer = producerSession.createProducer(queue);
+ }
+
+ @Override
+ public void tearDown() throws Exception
+ {
+ try
+ {
+ connection.close();
+ }
+ catch (Exception e)
+ {
+
+ }
+
+ super.tearDown();
+ }
+
+ private void consumeMessages(int startIndex,int endIndex, boolean transacted) throws JMSException
+ {
+ Message msg;
+ _logger.debug("**************** Receive (Start: " + startIndex + ", End:" + endIndex + ")***********************");
+
+ for (int i = startIndex; i < endIndex; i++)
+ {
+ msg = consumer.receive(1000);
+ assertNotNull("Message " + i + " was null!", msg);
+
+ _logger.debug("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@");
+ _logger.debug("Received : " + ((TextMessage) msg).getText());
+ _logger.debug("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@");
+
+ assertEquals("Invalid message order","message " + i, ((TextMessage) msg).getText());
+
+ }
+ _logger.debug("***********************************************************");
+
+ if (transacted)
+ {
+ consumerSession.commit();
+ }
+ }
+
+ private void sendMessages(int startIndex,int endIndex, boolean transacted) throws JMSException
+ {
+ _logger.debug("**************** Send (Start: " + startIndex + ", End:" + endIndex + ")***********************");
+
+ for (int i = startIndex; i < endIndex; i++)
+ {
+ producer.send(producerSession.createTextMessage("message " + i));
+
+ _logger.debug("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@");
+ _logger.debug("Sending message"+i);
+ _logger.debug("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@");
+ }
+
+ _logger.debug("***********************************************************");
+
+ if (transacted)
+ {
+ producerSession.commit();
+ }
+ }
+
+ public void testP2PFailover() throws Exception
+ {
+ testP2PFailover(numMessages, true,true, false);
+ }
+
+ public void testP2PFailoverWithMessagesLeftToConsumeAndProduce() throws Exception
+ {
+ if (CLUSTERED)
+ {
+ testP2PFailover(numMessages, false,false, false);
+ }
+ }
+
+ public void testP2PFailoverWithMessagesLeftToConsume() throws Exception
+ {
+ if (CLUSTERED)
+ {
+ testP2PFailover(numMessages, false,true, false);
+ }
+ }
+
+ public void testP2PFailoverTransacted() throws Exception
+ {
+ testP2PFailover(numMessages, true,true, false);
+ }
+
+ public void testP2PFailoverTransactedWithMessagesLeftToConsumeAndProduce() throws Exception
+ {
+ // Currently the cluster does not support transactions that span a failover
+ if (CLUSTERED)
+ {
+ testP2PFailover(numMessages, false,false, false);
+ }
+ }
+
+ private void testP2PFailover(int totalMessages, boolean consumeAll, boolean produceAll , boolean transacted) throws JMSException, NamingException
+ {
+ init(transacted, Session.AUTO_ACKNOWLEDGE);
+ runP2PFailover(totalMessages,consumeAll, produceAll , transacted);
+ }
+
+ protected void runP2PFailover(int totalMessages, boolean consumeAll, boolean produceAll , boolean transacted) throws JMSException, NamingException
+ {
+ Message msg = null;
+ int toProduce = totalMessages;
+
+ _logger.debug("===================================================================");
+ _logger.debug("Total messages used for the test " + totalMessages + " messages");
+ _logger.debug("===================================================================");
+
+ if (!produceAll)
+ {
+ toProduce = totalMessages - rand.nextInt(totalMessages);
+ }
+
+ _logger.debug("==================");
+ _logger.debug("Sending " + toProduce + " messages");
+ _logger.debug("==================");
+
+ sendMessages(0,toProduce, transacted);
+
+ // Consume some messages
+ int toConsume = toProduce;
+ if (!consumeAll)
+ {
+ toConsume = toProduce - rand.nextInt(toProduce);
+ }
+
+ consumeMessages(0,toConsume, transacted);
+
+ _logger.debug("==================");
+ _logger.debug("Consuming " + toConsume + " messages");
+ _logger.debug("==================");
+
+ _logger.info("Failing over");
+
+ causeFailure(_currentPort, DEFAULT_FAILOVER_TIME);
+
+ // Check that you produce and consume the rest of messages.
+ _logger.debug("==================");
+ _logger.debug("Sending " + (totalMessages-toProduce) + " messages");
+ _logger.debug("==================");
+
+ sendMessages(toProduce,totalMessages, transacted);
+ consumeMessages(toConsume,totalMessages, transacted);
+
+ _logger.debug("==================");
+ _logger.debug("Consuming " + (totalMessages-toConsume) + " messages");
+ _logger.debug("==================");
+ }
+
+ private void causeFailure(int port, long delay)
+ {
+
+ failBroker(port);
+
+ _logger.info("Awaiting Failover completion");
+ try
+ {
+ if (!failoverComplete.await(delay, TimeUnit.MILLISECONDS))
+ {
+ fail("failover did not complete");
+ }
+ }
+ catch (InterruptedException e)
+ {
+ //evil ignore IE.
+ }
+ }
+
+ public void testClientAckFailover() throws Exception
+ {
+ init(false, Session.CLIENT_ACKNOWLEDGE);
+ sendMessages(0,1, false);
+ Message msg = consumer.receive();
+ assertNotNull("Expected msgs not received", msg);
+
+ causeFailure(getFailingPort(), DEFAULT_FAILOVER_TIME);
+
+ Exception failure = null;
+ try
+ {
+ msg.acknowledge();
+ }
+ catch (Exception e)
+ {
+ failure = e;
+ }
+ assertNotNull("Exception should be thrown", failure);
+ }
+
+ /**
+ * The client used to have a fixed timeout of 4 minutes after which failover would no longer work.
+ * Check that this code has not regressed
+ *
+ * @throws Exception if something unexpected occurs in the test.
+ */
+
+ public void test4MinuteFailover() throws Exception
+ {
+ ConnectionURL connectionURL = getConnectionFactory().getConnectionURL();
+
+ int RETRIES = 4;
+ int DELAY = 60000;
+
+ //Set up a long delay on and large number of retries
+ BrokerDetails details = connectionURL.getBrokerDetails(1);
+ details.setProperty(BrokerDetails.OPTIONS_RETRY, String.valueOf(RETRIES));
+ details.setProperty(BrokerDetails.OPTIONS_CONNECT_DELAY, String.valueOf(DELAY));
+
+ connection = new AMQConnection(connectionURL, null);
+
+ ((AMQConnection) connection).setConnectionListener(this);
+
+ //Start the connection
+ connection.start();
+
+ long FAILOVER_DELAY = ((long)RETRIES * (long)DELAY);
+
+ // Use Nano seconds as it is more accurate for comparision.
+ long failTime = System.nanoTime() + FAILOVER_DELAY * 1000000;
+
+ //Fail the first broker
+ causeFailure(getFailingPort(), FAILOVER_DELAY + DEFAULT_FAILOVER_TIME);
+
+ //Reconnection should occur
+ assertTrue("Failover did not take long enough", System.nanoTime() > failTime);
+ }
+
+
+ /**
+ * The idea is to run a failover test in a loop by failing over
+ * to the other broker each time.
+ */
+ public void testFailoverInALoop() throws Exception
+ {
+ if (!CLUSTERED)
+ {
+ return;
+ }
+
+ int iterations = Integer.getInteger("profile.failoverIterations",0);
+ boolean useAltPort = false;
+ int altPort = FAILING_PORT;
+ int stdPort = DEFAULT_PORT;
+ init(false, Session.AUTO_ACKNOWLEDGE);
+ for (int i=0; i < iterations; i++)
+ {
+ _logger.debug("===================================================================");
+ _logger.debug("Failover In a loop : iteration number " + i);
+ _logger.debug("===================================================================");
+
+ runP2PFailover(numMessages, false,false, false);
+ startBroker(_currentPort);
+ if (useAltPort)
+ {
+ _currentPort = altPort;
+ useAltPort = false;
+ }
+ else
+ {
+ _currentPort = stdPort;
+ useAltPort = true;
+ }
+
+ }
+ //To prevent any failover logic being initiated when we shutdown the brokers.
+ connection.close();
+
+ // Shutdown the brokers
+ stopBroker(altPort);
+ stopBroker(stdPort);
+
+ }
+
+ public void bytesSent(long count)
+ {
+ }
+
+ public void bytesReceived(long count)
+ {
+ }
+
+ public boolean preFailover(boolean redirect)
+ {
+ return true;
+ }
+
+ public boolean preResubscribe()
+ {
+ return true;
+ }
+
+ public void failoverComplete()
+ {
+ failoverComplete.countDown();
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java
new file mode 100644
index 0000000000..a7efe4922b
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java
@@ -0,0 +1,373 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.client.message;
+
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.client.AMQAnyDestination;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.CustomJMSXProperty;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.management.common.mbeans.ManagedQueue;
+import org.apache.qpid.test.utils.JMXTestUtils;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.TabularData;
+import java.nio.BufferOverflowException;
+import java.util.Iterator;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * From the API Docs getJMSDestination:
+ *
+ * When a message is received, its JMSDestination value must be equivalent to
+ * the value assigned when it was sent.
+ */
+public class JMSDestinationTest extends QpidBrokerTestCase
+{
+
+ private Connection _connection;
+ private Session _session;
+
+ private static final String USER = "admin";
+ private CountDownLatch _receiveMessage;
+ private Message _message;
+
+ public void setUp() throws Exception
+ {
+ //Ensure JMX management is enabled for MovedToQueue test
+ setConfigurationProperty("management.enabled", "true");
+
+ super.setUp();
+
+ _connection = getConnection();
+
+ _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ /**
+ * Test a message sent to a queue comes back with JMSDestination queue
+ *
+ * @throws Exception
+ */
+ public void testQueue() throws Exception
+ {
+
+ Queue queue = _session.createQueue(getTestQueueName());
+
+ MessageConsumer consumer = _session.createConsumer(queue);
+
+ sendMessage(_session, queue, 1);
+
+ _connection.start();
+
+ Message message = consumer.receive(10000);
+
+ assertNotNull("Message should not be null", message);
+
+ Destination destination = message.getJMSDestination();
+
+ assertNotNull("JMSDestination should not be null", destination);
+
+ assertEquals("Incorrect Destination type", queue.getClass(), destination.getClass());
+ }
+
+ /**
+ * Test a message sent to a topic comes back with JMSDestination topic
+ *
+ * @throws Exception
+ */
+ public void testTopic() throws Exception
+ {
+
+ Topic topic = _session.createTopic(getTestQueueName() + "Topic");
+
+ MessageConsumer consumer = _session.createConsumer(topic);
+
+ sendMessage(_session, topic, 1);
+
+ _connection.start();
+
+ Message message = consumer.receive(10000);
+
+ assertNotNull("Message should not be null", message);
+
+ Destination destination = message.getJMSDestination();
+
+ assertNotNull("JMSDestination should not be null", destination);
+
+ assertEquals("Incorrect Destination type", topic.getClass(), destination.getClass());
+ }
+
+ /**
+ * Test a message sent to a topic then moved on the broker
+ * comes back with JMSDestination queue.
+ *
+ * i.e. The client is not just setting the value to be the same as the
+ * current consumer destination.
+ *
+ * This test can only be run against the Java broker as it uses JMX to move
+ * messages between queues.
+ *
+ * @throws Exception
+ */
+ public void testMovedToQueue() throws Exception
+ {
+ // Setup JMXUtils
+ JMXTestUtils jmxUtils = new JMXTestUtils(this, USER, USER);
+ jmxUtils.setUp();
+ // Open the JMX Connection
+ jmxUtils.open();
+ try
+ {
+
+ Queue queue = _session.createQueue(getTestQueueName());
+
+ _session.createConsumer(queue).close();
+
+ sendMessage(_session, queue, 1);
+
+ Topic topic = _session.createTopic(getTestQueueName() + "Topic");
+
+ MessageConsumer consumer = _session.createConsumer(topic);
+
+ // Use Management to move message.
+
+ ManagedQueue managedQueue = jmxUtils.
+ getManagedObject(ManagedQueue.class,
+ jmxUtils.getQueueObjectName(getConnectionFactory().getVirtualPath().substring(1),
+ getTestQueueName()));
+
+ // Find the first message on the queue
+ TabularData data = managedQueue.viewMessages(1L, 2L);
+
+ Iterator values = data.values().iterator();
+ assertTrue("No Messages found via JMX", values.hasNext());
+
+ // Get its message ID
+ Long msgID = (Long) ((CompositeDataSupport) values.next()).get("AMQ MessageId");
+
+ // Start the connection and consume message that has been moved to the
+ // queue
+ _connection.start();
+
+ Message message = consumer.receive(1000);
+
+ //Validate we don't have a message on the queue before we start
+ assertNull("Message should be null", message);
+
+ // Move it to from the topic to the queue
+ managedQueue.moveMessages(msgID, msgID, ((AMQTopic) topic).getQueueName());
+
+ // Retrieve the newly moved message
+ message = consumer.receive(1000);
+
+ assertNotNull("Message should not be null", message);
+
+ Destination destination = message.getJMSDestination();
+
+ assertNotNull("JMSDestination should not be null", destination);
+
+ assertEquals("Incorrect Destination type", queue.getClass(), destination.getClass());
+
+ }
+ finally
+ {
+ jmxUtils.close();
+ }
+
+ }
+
+ /**
+ * Test a message sent to a queue comes back with JMSDestination queue
+ * when received via a message listener
+ *
+ * @throws Exception
+ */
+ public void testQueueAsync() throws Exception
+ {
+
+ Queue queue = _session.createQueue(getTestQueueName());
+
+ MessageConsumer consumer = _session.createConsumer(queue);
+
+ sendMessage(_session, queue, 1);
+
+ _connection.start();
+
+ _message = null;
+ _receiveMessage = new CountDownLatch(1);
+
+ consumer.setMessageListener(new MessageListener()
+ {
+ public void onMessage(Message message)
+ {
+ _message = message;
+ _receiveMessage.countDown();
+ }
+ });
+
+ assertTrue("Timed out waiting for message to be received ", _receiveMessage.await(1, TimeUnit.SECONDS));
+
+ assertNotNull("Message should not be null", _message);
+
+ Destination destination = _message.getJMSDestination();
+
+ assertNotNull("JMSDestination should not be null", destination);
+
+ assertEquals("Incorrect Destination type", queue.getClass(), destination.getClass());
+ }
+
+ /**
+ * Test a message received without the JMS_QPID_DESTTYPE can be resent
+ * and correctly have the property set.
+ *
+ * To do this we need to create a 0-10 connection and send a message
+ * which is then received by a 0-8/9 client.
+ *
+ * @throws Exception
+ */
+ public void testReceiveResend() throws Exception
+ {
+ // Create a 0-10 Connection and send message
+ setSystemProperty(ClientProperties.AMQP_VERSION, "0-10");
+
+ Connection connection010 = getConnection();
+
+ Session session010 = connection010.createSession(true, Session.SESSION_TRANSACTED);
+
+ // Create queue for testing
+ Queue queue = session010.createQueue(getTestQueueName());
+
+ // Ensure queue exists
+ session010.createConsumer(queue).close();
+
+ sendMessage(session010, queue, 1);
+
+ // Close the 010 connection
+ connection010.close();
+
+ // Create a 0-8 Connection and receive message
+ setSystemProperty(ClientProperties.AMQP_VERSION, "0-8");
+
+ Connection connection08 = getConnection();
+
+ Session session08 = connection08.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer consumer = session08.createConsumer(queue);
+
+ connection08.start();
+
+ Message message = consumer.receive(1000);
+
+ assertNotNull("Didn't receive 0-10 message.", message);
+
+ // Validate that JMS_QPID_DESTTYPE is not set
+ try
+ {
+ message.getIntProperty(CustomJMSXProperty.JMS_QPID_DESTTYPE.toString());
+ fail("JMS_QPID_DESTTYPE should not be set, so should throw NumberFormatException");
+ }
+ catch (NumberFormatException nfe)
+ {
+
+ }
+
+ // Resend message back to queue and validate that
+ // a) getJMSDestination works without the JMS_QPID_DESTTYPE
+ // b) we can actually send without a BufferOverFlow.
+
+ MessageProducer producer = session08.createProducer(queue);
+
+ try
+ {
+ producer.send(message);
+ }
+ catch (BufferOverflowException bofe)
+ {
+ // Print the stack trace so we can validate where the execption occured.
+ bofe.printStackTrace();
+ fail("BufferOverflowException thrown during send");
+ }
+
+ message = consumer.receive(1000);
+
+ assertNotNull("Didn't receive recent 0-8 message.", message);
+
+ // Validate that JMS_QPID_DESTTYPE is not set
+ assertEquals("JMS_QPID_DESTTYPE should be set to a Queue", AMQDestination.QUEUE_TYPE,
+ message.getIntProperty(CustomJMSXProperty.JMS_QPID_DESTTYPE.toString()));
+
+ }
+
+ /**
+ * Send a message to a custom exchange and then verify
+ * the message received has the proper destination set
+ *
+ * @throws Exception
+ */
+ public void testGetDestinationWithCustomExchange() throws Exception
+ {
+
+ AMQDestination dest = new AMQAnyDestination(new AMQShortString("my-exchange"),
+ new AMQShortString("direct"),
+ new AMQShortString("test"),
+ false,
+ false,
+ new AMQShortString("test"),
+ false,
+ new AMQShortString[]{new AMQShortString("test")});
+
+ // to force the creation of my-exchange.
+ sendMessage(_session, dest, 1);
+
+ MessageProducer prod = _session.createProducer(dest);
+
+ MessageConsumer consumer = _session.createConsumer(dest);
+
+ _connection.start();
+
+ sendMessage(_session, dest, 1);
+
+ Message message = consumer.receive(10000);
+
+ assertNotNull("Message should not be null", message);
+
+ Destination destination = message.getJMSDestination();
+
+ assertNotNull("JMSDestination should not be null", destination);
+
+ assertEquals("Incorrect Destination name", "my-exchange", dest.getExchangeName().asString());
+ assertEquals("Incorrect Destination type", "direct", dest.getExchangeClass().asString());
+ assertEquals("Incorrect Routing Key", "test", dest.getRoutingKey().asString());
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/MessageToStringTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/MessageToStringTest.java
new file mode 100644
index 0000000000..1071861d47
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/MessageToStringTest.java
@@ -0,0 +1,257 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.client.message;
+
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.framing.AMQShortString;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.util.UUID;
+
+public class MessageToStringTest extends QpidBrokerTestCase
+{
+ private Connection _connection;
+ private Session _session;
+ private Queue _queue;
+ MessageConsumer _consumer;
+ private static final String BYTE_TEST = "MapByteTest";
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ //Create Producer put some messages on the queue
+ _connection = getConnection();
+
+ //Create Consumer
+ _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ String queueName = getTestQueueName();
+
+ //Create Queue
+ ((AMQSession) _session).createQueue(new AMQShortString(queueName), true, false, false);
+ _queue = _session.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
+
+
+ _consumer = _session.createConsumer(_queue);
+
+ _connection.start();
+ }
+
+ public void tearDown() throws Exception
+ {
+ //clean up
+ _connection.close();
+
+ super.tearDown();
+ }
+
+ public void testBytesMessage() throws JMSException
+ {
+ //Create Sample Message using UUIDs
+ UUID test = UUID.randomUUID();
+
+ BytesMessage testMessage = _session.createBytesMessage();
+
+ //Convert UUID into bytes for transit
+ byte[] testBytes = test.toString().getBytes();
+
+ testMessage.writeBytes(testBytes);
+
+ sendAndTest(testMessage, testBytes);
+ }
+
+ public void testMapMessage() throws JMSException, IOException
+ {
+ //Create Sample Message using UUIDs
+ UUID test = UUID.randomUUID();
+
+ MapMessage testMessage = _session.createMapMessage();
+
+ byte[] testBytes = convertToBytes(test);
+
+ testMessage.setBytes(BYTE_TEST, testBytes);
+
+ sendAndTest(testMessage, testBytes);
+ }
+
+ public void testObjectMessage() throws JMSException
+ {
+ MessageProducer producer = _session.createProducer(_queue);
+
+ //Create Sample Message using UUIDs
+ UUID test = UUID.randomUUID();
+
+ Message testMessage = _session.createObjectMessage(test);
+
+ sendAndTest(testMessage, test);
+ }
+
+ public void testStreamMessage() throws JMSException, IOException
+ {
+ //Create Sample Message using UUIDs
+ UUID test = UUID.randomUUID();
+
+ StreamMessage testMessage = _session.createStreamMessage();
+
+ byte[] testBytes = convertToBytes(test);
+
+ testMessage.writeBytes(testBytes);
+
+ sendAndTest(testMessage, testBytes);
+ }
+
+ public void testTextMessage() throws JMSException, IOException
+ {
+ //Create Sample Message using UUIDs
+ UUID test = UUID.randomUUID();
+
+ TextMessage testMessage = _session.createTextMessage();
+
+ String stringValue = String.valueOf(test);
+ byte[] testBytes = stringValue.getBytes();
+
+ testMessage.setText(stringValue);
+
+ sendAndTest(testMessage, testBytes);
+ }
+
+ //***************** Helpers
+
+ private void sendAndTest(Message message, Object testBytes) throws JMSException
+ {
+ MessageProducer producer = _session.createProducer(_queue);
+
+ producer.send(message);
+
+ Message receivedMessage = _consumer.receive(1000);
+
+ assertNotNull("Message was not received.", receivedMessage);
+
+ //Ensure that to calling toString doesn't error and that doing this doesn't break next tests.
+ assertNotNull("Message returned null from toString", receivedMessage.toString());
+
+ byte[] byteResults;
+ UUID result;
+
+ try
+ {
+ if (receivedMessage instanceof ObjectMessage)
+ {
+ result = (UUID) ((ObjectMessage) receivedMessage).getObject();
+ assertEquals("UUIDs were not equal", testBytes, result);
+ }
+ else
+ {
+ byteResults = getBytes(receivedMessage, ((byte[]) testBytes).length);
+ assertBytesEquals("UUIDs were not equal", (byte[]) testBytes, byteResults);
+ }
+ }
+ catch (Exception e)
+ {
+ fail(e.getMessage());
+ }
+
+ }
+
+ private void assertBytesEquals(String message, byte[] expected, byte[] actual)
+ {
+ if (expected.length == actual.length)
+ {
+ int index = 0;
+ boolean failed = false;
+ for (byte b : expected)
+ {
+ if (actual[index++] != b)
+ {
+ failed = true;
+ break;
+ }
+ }
+
+ if (!failed)
+ {
+ return;
+ }
+
+ }
+
+ fail(message);
+ }
+
+ private byte[] getBytes(Message receivedMessage, int testBytesLength) throws JMSException
+ {
+ byte[] byteResults = new byte[testBytesLength];
+
+ if (receivedMessage instanceof BytesMessage)
+ {
+ assertEquals(testBytesLength, ((BytesMessage) receivedMessage).readBytes(byteResults));
+ }
+ else if (receivedMessage instanceof StreamMessage)
+ {
+ assertEquals(testBytesLength, ((StreamMessage) receivedMessage).readBytes(byteResults));
+ }
+ else if (receivedMessage instanceof MapMessage)
+ {
+ byteResults = ((MapMessage) receivedMessage).getBytes(BYTE_TEST);
+ assertEquals(testBytesLength, byteResults.length);
+ }
+ else if (receivedMessage instanceof TextMessage)
+ {
+ byteResults = ((TextMessage) receivedMessage).getText().getBytes();
+ assertEquals(testBytesLength, byteResults.length);
+ }
+
+
+ return byteResults;
+ }
+
+ private byte[] convertToBytes(UUID test) throws IOException
+ {
+ //Convert UUID into bytes for transit
+ ObjectOutput out;
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ out = new ObjectOutputStream(bos);
+ out.writeObject(test);
+ out.close();
+
+ return bos.toByteArray();
+ }
+
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/ObjectMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/ObjectMessageTest.java
new file mode 100644
index 0000000000..147a03be0c
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/ObjectMessageTest.java
@@ -0,0 +1,141 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.client.message;
+
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.framing.AMQShortString;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.util.UUID;
+
+public class ObjectMessageTest extends QpidBrokerTestCase
+{
+ private Connection _connection;
+ private Session _session;
+ MessageConsumer _consumer;
+ MessageProducer _producer;
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ //Create Connection
+ _connection = getConnection();
+
+
+ //Create Session
+ _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ //Create Queue
+ String queueName = getTestQueueName();
+ ((AMQSession) _session).createQueue(new AMQShortString(queueName), true, false, false);
+ Queue queue = _session.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
+
+ //Create Consumer
+ _consumer = _session.createConsumer(queue);
+
+ //Create Producer
+ _producer = _session.createProducer(queue);
+
+ _connection.start();
+ }
+
+ public void tearDown() throws Exception
+ {
+ //clean up
+ _connection.close();
+
+ super.tearDown();
+ }
+
+ public void testGetAndSend() throws JMSException
+ {
+ //Create Sample Message using UUIDs
+ UUID test = UUID.randomUUID();
+
+ ObjectMessage testMessage = _session.createObjectMessage(test);
+
+ Object o = testMessage.getObject();
+
+ assertNotNull("Object was null", o);
+
+ sendAndTest(testMessage, test);
+ }
+
+ public void testSend() throws JMSException
+ {
+ //Create Sample Message using UUIDs
+ UUID test = UUID.randomUUID();
+
+ ObjectMessage testMessage = _session.createObjectMessage(test);
+
+ sendAndTest(testMessage, test);
+ }
+
+ public void testTostringAndSend() throws JMSException
+ {
+ //Create Sample Message using UUIDs
+ UUID test = UUID.randomUUID();
+
+ ObjectMessage testMessage = _session.createObjectMessage(test);
+
+ assertNotNull("Object was null", testMessage.toString());
+
+ sendAndTest(testMessage, test);
+ }
+
+ public void testSendNull() throws JMSException
+ {
+
+ ObjectMessage testMessage = _session.createObjectMessage(null);
+
+ assertNotNull("Object was null", testMessage.toString());
+
+ sendAndTest(testMessage, null);
+ }
+
+ //***************** Helpers
+
+ private void sendAndTest(ObjectMessage message, Object sent) throws JMSException
+ {
+ _producer.send(message);
+
+ ObjectMessage receivedMessage = (ObjectMessage) _consumer.receive(1000);
+
+ assertNotNull("Message was not received.", receivedMessage);
+
+ UUID result = (UUID) receivedMessage.getObject();
+
+ assertEquals("First read: UUIDs were not equal", sent, result);
+
+ result = (UUID) receivedMessage.getObject();
+
+ assertEquals("Second read: UUIDs were not equal", sent, result);
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java
new file mode 100644
index 0000000000..49a608190d
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java
@@ -0,0 +1,310 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.client.message;
+
+import java.util.concurrent.CountDownLatch;
+
+import javax.jms.DeliveryMode;
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.Assert;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.BasicMessageProducer;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SelectorTest extends QpidBrokerTestCase implements MessageListener
+{
+ private static final Logger _logger = LoggerFactory.getLogger(SelectorTest.class);
+
+ private AMQConnection _connection;
+ private AMQDestination _destination;
+ private int count;
+ public String _connectionString = "vm://:1";
+ private static final String INVALID_SELECTOR = "Cost LIKE 5";
+ CountDownLatch _responseLatch = new CountDownLatch(1);
+
+ private static final String BAD_MATHS_SELECTOR = " 1 % 5";
+
+ private static final long RECIEVE_TIMEOUT = 1000;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ init((AMQConnection) getConnection("guest", "guest"));
+ }
+
+ private void init(AMQConnection connection) throws JMSException
+ {
+ init(connection, new AMQQueue(connection, getTestQueueName(), true));
+ }
+
+ private void init(AMQConnection connection, AMQDestination destination) throws JMSException
+ {
+ _connection = connection;
+ _destination = destination;
+ connection.start();
+ }
+
+ public void onMessage(Message message)
+ {
+ count++;
+ _logger.info("Got Message:" + message);
+ _responseLatch.countDown();
+ }
+
+ public void testUsingOnMessage() throws Exception
+ {
+ String selector = "Cost = 2 AND \"property-with-hyphen\" = 'wibble'";
+ // selector = "JMSType = Special AND Cost = 2 AND AMQMessageID > 0 AND JMSDeliveryMode=" + DeliveryMode.NON_PERSISTENT;
+
+ Session session = (AMQSession) _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ // _session.createConsumer(destination).setMessageListener(this);
+ session.createConsumer(_destination, selector).setMessageListener(this);
+
+ try
+ {
+ Message msg = session.createTextMessage("Message");
+ msg.setJMSPriority(1);
+ msg.setIntProperty("Cost", 2);
+ msg.setStringProperty("property-with-hyphen", "wibble");
+ msg.setJMSType("Special");
+
+ _logger.info("Sending Message:" + msg);
+
+ ((BasicMessageProducer) session.createProducer(_destination)).send(msg, DeliveryMode.NON_PERSISTENT);
+ _logger.info("Message sent, waiting for response...");
+
+ _responseLatch.await();
+
+ if (count > 0)
+ {
+ _logger.info("Got message");
+ }
+
+ if (count == 0)
+ {
+ fail("Did not get message!");
+ // throw new RuntimeException("Did not get message!");
+ }
+ }
+ catch (JMSException e)
+ {
+ _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ if (!(e instanceof InvalidSelectorException))
+ {
+ fail("Wrong exception:" + e.getMessage());
+ }
+ else
+ {
+ System.out.println("SUCCESS!!");
+ }
+ }
+ catch (InterruptedException e)
+ {
+ _logger.debug("IE :" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ }
+
+ }
+
+ public void testUnparsableSelectors() throws Exception
+ {
+ AMQSession session = (AMQSession) _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ boolean caught = false;
+
+ //Try Creating a Browser
+ try
+ {
+ session.createBrowser(session.createQueue("Ping"), INVALID_SELECTOR);
+ }
+ catch (JMSException e)
+ {
+ _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ if (!(e instanceof InvalidSelectorException))
+ {
+ fail("Wrong exception:" + e.getMessage());
+ }
+ caught = true;
+ }
+ assertTrue("No exception thrown!", caught);
+ caught = false;
+
+ //Try Creating a Consumer
+ try
+ {
+ session.createConsumer(session.createQueue("Ping"), INVALID_SELECTOR);
+ }
+ catch (JMSException e)
+ {
+ _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ if (!(e instanceof InvalidSelectorException))
+ {
+ fail("Wrong exception:" + e.getMessage());
+ }
+ caught = true;
+ }
+ assertTrue("No exception thrown!", caught);
+ caught = false;
+
+ //Try Creating a Receiever
+ try
+ {
+ session.createReceiver(session.createQueue("Ping"), INVALID_SELECTOR);
+ }
+ catch (JMSException e)
+ {
+ _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ if (!(e instanceof InvalidSelectorException))
+ {
+ fail("Wrong exception:" + e.getMessage());
+ }
+ caught = true;
+ }
+ assertTrue("No exception thrown!", caught);
+ caught = false;
+
+ try
+ {
+ session.createReceiver(session.createQueue("Ping"), BAD_MATHS_SELECTOR);
+ }
+ catch (JMSException e)
+ {
+ _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ if (!(e instanceof InvalidSelectorException))
+ {
+ fail("Wrong exception:" + e.getMessage());
+ }
+ caught = true;
+ }
+ assertTrue("No exception thrown!", caught);
+ caught = false;
+
+ }
+
+ public void testRuntimeSelectorError() throws JMSException
+ {
+ Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(_destination , "testproperty % 5 = 1");
+ MessageProducer producer = session.createProducer(_destination);
+ Message sentMsg = session.createTextMessage();
+
+ sentMsg.setIntProperty("testproperty", 1); // 1 % 5
+ producer.send(sentMsg);
+ Message recvd = consumer.receive(RECIEVE_TIMEOUT);
+ assertNotNull(recvd);
+
+ sentMsg.setStringProperty("testproperty", "hello"); // "hello" % 5 makes no sense
+ producer.send(sentMsg);
+ try
+ {
+ recvd = consumer.receive(RECIEVE_TIMEOUT);
+ assertNull(recvd);
+ }
+ catch (Exception e)
+ {
+
+ }
+ assertTrue("Connection should be closed", _connection.isClosed());
+ }
+
+ public void testSelectorWithJMSMessageID() throws Exception
+ {
+ Session session = _connection.createSession(true, Session.SESSION_TRANSACTED);
+
+ MessageProducer prod = session.createProducer(_destination);
+ MessageConsumer consumer = session.createConsumer(_destination,"JMSMessageID IS NOT NULL");
+
+ for (int i=0; i<2; i++)
+ {
+ Message msg = session.createTextMessage("Msg" + String.valueOf(i));
+ prod.send(msg);
+ }
+ session.commit();
+
+ Message msg1 = consumer.receive(1000);
+ Message msg2 = consumer.receive(1000);
+
+ Assert.assertNotNull("Msg1 should not be null", msg1);
+ Assert.assertNotNull("Msg2 should not be null", msg2);
+
+ session.commit();
+
+ prod.setDisableMessageID(true);
+
+ for (int i=0; i<2; i++)
+ {
+ Message msg = session.createTextMessage("Msg" + String.valueOf(i));
+ prod.send(msg);
+ }
+
+ session.commit();
+ Message msg3 = consumer.receive(1000);
+ Assert.assertNull("Msg3 should be null", msg3);
+ session.commit();
+ consumer = session.createConsumer(_destination,"JMSMessageID IS NULL");
+
+ Message msg4 = consumer.receive(1000);
+ Message msg5 = consumer.receive(1000);
+ session.commit();
+ Assert.assertNotNull("Msg4 should not be null", msg4);
+ Assert.assertNotNull("Msg5 should not be null", msg5);
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ SelectorTest test = new SelectorTest();
+ test._connectionString = (argv.length == 0) ? "localhost:3000" : argv[0];
+
+ try
+ {
+ while (true)
+ {
+ if (test._connectionString.contains("vm://:1"))
+ {
+ test.setUp();
+ }
+ test.testUsingOnMessage();
+
+ if (test._connectionString.contains("vm://:1"))
+ {
+ test.tearDown();
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/queue/LVQTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/queue/LVQTest.java
new file mode 100644
index 0000000000..14fbd1deb6
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/queue/LVQTest.java
@@ -0,0 +1,84 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.client.queue;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.test.client.destination.AddressBasedDestinationTest;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LVQTest extends QpidBrokerTestCase
+{
+ private static final Logger _logger = LoggerFactory.getLogger(LVQTest.class);
+ private Connection _connection;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ _connection = getConnection() ;
+ _connection.start();
+ }
+
+ @Override
+ public void tearDown() throws Exception
+ {
+ _connection.close();
+ super.tearDown();
+ }
+
+ public void testLVQQueue() throws Exception
+ {
+ String addr = "ADDR:my-lvq-queue; {create: always, " +
+ "node: {x-bindings: [{exchange : 'amq.direct', key : test}], " +
+ "x-declare:{arguments : {'qpid.last_value_queue':1}}}}";
+
+ Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+
+ Destination dest = ssn.createQueue(addr);
+ MessageConsumer consumer = ssn.createConsumer(dest);
+ MessageProducer prod = ssn.createProducer(ssn.createQueue("ADDR:amq.direct/test"));
+
+ for (int i=0; i<40; i++)
+ {
+ Message msg = ssn.createTextMessage(String.valueOf(i));
+ msg.setStringProperty("qpid.LVQ_key", String.valueOf(i%10));
+ prod.send(msg);
+ }
+
+ for (int i=0; i<10; i++)
+ {
+ TextMessage msg = (TextMessage)consumer.receive(500);
+ assertEquals("The last value is not reflected","3" + i,msg.getText());
+ }
+
+ assertNull("There should not be anymore messages",consumer.receive(500));
+ }
+
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/queue/QueuePolicyTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/queue/QueuePolicyTest.java
new file mode 100644
index 0000000000..e3557efd97
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/queue/QueuePolicyTest.java
@@ -0,0 +1,110 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.test.client.queue;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class QueuePolicyTest extends QpidBrokerTestCase
+{
+ private static final Logger _logger = LoggerFactory.getLogger(QueuePolicyTest.class);
+ private Connection _connection;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ _connection = getConnection() ;
+ _connection.start();
+ }
+
+ @Override
+ public void tearDown() throws Exception
+ {
+ _connection.close();
+ super.tearDown();
+ }
+
+ public void testRejectPolicy() throws Exception
+ {
+ String addr = "ADDR:queue; {create: always, " +
+ "node: {x-bindings: [{exchange : 'amq.direct', key : test}], " +
+ "x-declare:{ arguments : {'qpid.max_count':5} }}}";
+
+ Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+
+ Destination dest = ssn.createQueue(addr);
+ MessageConsumer consumer = ssn.createConsumer(dest);
+ MessageProducer prod = ssn.createProducer(ssn.createQueue("ADDR:amq.direct/test"));
+
+ for (int i=0; i<6; i++)
+ {
+ prod.send(ssn.createMessage());
+ }
+
+ try
+ {
+ prod.send(ssn.createMessage());
+ ((AMQSession)ssn).sync();
+ fail("The client did not receive an exception after exceeding the queue limit");
+ }
+ catch (AMQException e)
+ {
+ assertTrue("The correct error code is not set",e.getErrorCode().toString().contains("506"));
+ }
+ }
+
+ public void testRingPolicy() throws Exception
+ {
+ Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+
+ String addr = "ADDR:my-ring-queue; {create: always, " +
+ "node: {x-bindings: [{exchange : 'amq.direct', key : test}], " +
+ "x-declare:{arguments : {'qpid.policy_type':ring, 'qpid.max_count':2} }}}";
+
+ Destination dest = ssn.createQueue(addr);
+ MessageConsumer consumer = ssn.createConsumer(dest);
+ MessageProducer prod = ssn.createProducer(ssn.createQueue("ADDR:amq.direct/test"));
+
+ prod.send(ssn.createTextMessage("Test1"));
+ prod.send(ssn.createTextMessage("Test2"));
+ prod.send(ssn.createTextMessage("Test3"));
+
+ TextMessage msg = (TextMessage)consumer.receive(1000);
+ assertEquals("The consumer should receive the msg with body='Test2'",msg.getText(),"Test2");
+
+ msg = (TextMessage)consumer.receive(1000);
+ assertEquals("The consumer should receive the msg with body='Test3'",msg.getText(),"Test3");
+
+ prod.send(ssn.createTextMessage("Test4"));
+ assertEquals("The consumer should receive the msg with body='Test4'",msg.getText(),"Test3");
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java
new file mode 100644
index 0000000000..85565a33b0
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java
@@ -0,0 +1,112 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.client.timeouts;
+
+import java.io.File;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This tests that when the commit takes a long time(due to POST_COMMIT_DELAY) that the commit does not timeout
+ * This test must be run in conjunction with SyncWaiteTimeoutDelay or be run with POST_COMMIT_DELAY > 30s to ensure
+ * that the default value is being replaced.
+ */
+public class SyncWaitDelayTest extends QpidBrokerTestCase
+{
+ protected static final Logger _logger = LoggerFactory.getLogger(SyncWaitDelayTest.class);
+
+ private String VIRTUALHOST = "test";
+ protected long POST_COMMIT_DELAY = 1000L;
+ protected long SYNC_WRITE_TIMEOUT = POST_COMMIT_DELAY + 1000;
+
+ protected Connection _connection;
+ protected Session _session;
+ protected Queue _queue;
+ protected MessageConsumer _consumer;
+
+ public void setUp() throws Exception
+ {
+
+ setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST+".store.class", "org.apache.qpid.server.store.SlowMessageStore");
+ setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST+".store.delays.commitTran.post", String.valueOf(POST_COMMIT_DELAY));
+ setConfigurationProperty("management.enabled", "false");
+
+
+ super.setUp();
+
+ //Set the syncWrite timeout to be just larger than the delay on the commitTran.
+ setSystemProperty("amqj.default_syncwrite_timeout", String.valueOf(SYNC_WRITE_TIMEOUT));
+
+ _connection = getConnection();
+
+ //Create Queue
+ _queue = (Queue) getInitialContext().lookup("queue");
+
+ //Create Consumer
+ _session = _connection.createSession(true, Session.SESSION_TRANSACTED);
+
+ //Ensure Queue exists
+ _session.createConsumer(_queue).close();
+ }
+
+
+ public void test() throws JMSException
+ {
+ MessageProducer producer = _session.createProducer(_queue);
+
+ Message message = _session.createTextMessage("Message");
+
+ producer.send(message);
+
+ long start = System.nanoTime();
+
+ _logger.info("Calling Commit");
+
+ try
+ {
+ _session.commit();
+ long end = System.nanoTime();
+ long time = (end - start);
+ // As we are using Nano time ensure to multiply up the millis.
+ assertTrue("Commit was quickier than the built in delay:" + time, time > 1000000L * POST_COMMIT_DELAY);
+ assertFalse("Commit was slower than the built in default", time > 1000000L * 1000 * 30);
+ }
+ catch (JMSException e)
+ {
+ fail(e.getMessage());
+ }
+
+ }
+
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java
new file mode 100644
index 0000000000..1a23eee8ab
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java
@@ -0,0 +1,72 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.client.timeouts;
+
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQTimeoutException;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+
+/** This tests that when the syncWrite timeout is set that it will timeout on that time rather than the default 30s. */
+public class SyncWaitTimeoutDelayTest extends SyncWaitDelayTest
+{
+ protected static final Logger _logger = Logger.getLogger(SyncWaitTimeoutDelayTest.class);
+
+ public void setUp() throws Exception
+ {
+ POST_COMMIT_DELAY = 1000L;
+
+ //Set the syncWrite timeout to be less than the COMMIT Delay so we can validate that it is being applied
+ SYNC_WRITE_TIMEOUT = 500L;
+
+ super.setUp();
+ }
+
+ @Override
+ public void test() throws JMSException
+ {
+ MessageProducer producer = _session.createProducer(_queue);
+
+ Message message = _session.createTextMessage("Message");
+
+ producer.send(message);
+
+ _logger.info("Calling Commit");
+
+ long start = System.nanoTime();
+ try
+ {
+ _session.commit();
+ fail("Commit occured even though syncWait timeout is shorter than delay in commit");
+ }
+ catch (JMSException e)
+ {
+ assertTrue("Wrong exception type received.", e.getLinkedException() instanceof AMQTimeoutException);
+ assertTrue("Wrong message received on exception.", e.getMessage().startsWith("Failed to commit"));
+ // As we are using Nano time ensure to multiply up the millis.
+ assertTrue("Timeout was more than 30s default", (System.nanoTime() - start) < (1000000L * 1000 * 30));
+ }
+
+ }
+}