summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src/test/java/org/apache/qpid/server/queue
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/systests/src/test/java/org/apache/qpid/server/queue')
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java159
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java574
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java604
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ModelTest.java342
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java250
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java303
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java494
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/server/queue/QueueBindTest.java130
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java175
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java216
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SortedQueueTest.java538
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/server/queue/TimeToLiveTest.java397
12 files changed, 4182 insertions, 0 deletions
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java
new file mode 100644
index 0000000000..21e3bfa055
--- /dev/null
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java
@@ -0,0 +1,159 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test DeapQueueConsumerWithSelector
+ * Summary:
+ * Prior to M4 the broker had a different queue model which pre-processed the
+ * messages on the queue for any connecting subscription that had a selector.
+ *
+ * If the queue had a lot of data then this may take a long time to process
+ * to such an extent that the subscription creation may time out. During this
+ * pre-process phase the virtualhost would be come unresposive.
+ *
+ * Our solution was to allow the timeout to be adjusted QPID-1119, which allowed
+ * the subscription to connect but did not address the unresponsiveness.
+ *
+ * The new queue model introduced in M4 resolved this.
+ *
+ * This test is to validate that the new queueing model does indeed remove the
+ * long pre-processing phase and allow immediate subscription so that there is
+ * no unresponsive period.
+ *
+ * Test Strategy:
+ *
+ * Add 100k messages to the queue with a numberic header property that will
+ * allow later subscribers to use as in a selector.
+ *
+ * Connect the subscriber and time how long it takes to connect.
+ *
+ * Finally consume all the messages from the queue to clean up.
+ */
+public class DeepQueueConsumeWithSelector extends QpidBrokerTestCase implements MessageListener
+{
+
+ private static final int MESSAGE_COUNT = 10000;
+ private static final int BATCH_SIZE = MESSAGE_COUNT / 10;
+
+ private CountDownLatch _receviedLatch = new CountDownLatch(MESSAGE_COUNT);
+
+ protected long SYNC_WRITE_TIMEOUT = 120000L;
+
+
+ public void setUp() throws Exception
+ {
+ //Set the syncWrite timeout to be just larger than the delay on the commitTran.
+ setSystemProperty("amqj.default_syncwrite_timeout", String.valueOf(SYNC_WRITE_TIMEOUT));
+
+ super.setUp();
+ }
+
+ public void test() throws Exception
+ {
+ // Create Connection
+ Connection connection = getConnection();
+ Session session = ((AMQConnection)connection).createSession(true, Session.SESSION_TRANSACTED, 100000);
+
+ Queue queue = (Queue) getInitialContext().lookup("queue");
+
+ // Validate that the destination exists
+ session.createConsumer(queue).close();
+
+ // Send Messages
+ sendMessage(session, queue, MESSAGE_COUNT, BATCH_SIZE);
+
+ session.close();
+
+ session = ((AMQConnection) connection).createSession(false, Session.AUTO_ACKNOWLEDGE);//, 100000);
+
+
+ // Setup Selector to perform a few calculations which will slow it down
+ String selector = "((\"" + INDEX + "\" % 1) = 0) AND ('" + INDEX + "' IS NOT NULL) AND ('" + INDEX + "' <> -1)";
+
+ // Setup timing
+ long start = System.nanoTime();
+
+ System.err.println("Create Consumer");
+ // Connect Consumer
+ MessageConsumer consumer = session.createConsumer(queue, selector);
+ consumer.setMessageListener(this);
+
+ // Validate timing details
+ long end = System.nanoTime();
+
+ System.err.println("Subscription time took:" + (end - start));
+
+ // Consume Messages
+ connection.start();
+
+
+
+ assertTrue("Messages took to long to be received :"+_receviedLatch.getCount(),
+ _receviedLatch.await(SYNC_WRITE_TIMEOUT, TimeUnit.MILLISECONDS ));
+
+ }
+
+ @Override
+ public Message createNextMessage(Session session, int msgCount) throws JMSException
+ {
+ Message message = super.createNextMessage(session,msgCount);
+
+ if ((msgCount % BATCH_SIZE) == 0 )
+ {
+ System.err.println("Sent:"+msgCount);
+ }
+
+ return message;
+ }
+
+ public void onMessage(Message message)
+ {
+ _receviedLatch.countDown();
+ int msgCount = 0;
+ try
+ {
+ msgCount = message.getIntProperty(INDEX);
+ }
+ catch (JMSException e)
+ {
+ //ignore
+ }
+ if ((msgCount % BATCH_SIZE) == 0 )
+ {
+ System.err.println("Received:"+msgCount);
+ }
+
+ }
+}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java
new file mode 100644
index 0000000000..dc30c02951
--- /dev/null
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java
@@ -0,0 +1,574 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.queue;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.url.AMQBindingURL;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class LastValueQueueTest extends QpidBrokerTestCase
+{
+ private static final Logger LOGGER = Logger.getLogger(LastValueQueueTest.class);
+
+ private static final String MESSAGE_SEQUENCE_NUMBER_PROPERTY = "msg";
+ private static final String KEY_PROPERTY = "key";
+
+ private static final int MSG_COUNT = 400;
+
+ private String _queueName;
+ private Queue _queue;
+ private Connection _producerConnection;
+ private MessageProducer _producer;
+ private Session _producerSession;
+ private Connection _consumerConnection;
+ private Session _consumerSession;
+ private MessageConsumer _consumer;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ _queueName = getTestQueueName();
+ _producerConnection = getConnection();
+ _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ public void testConflation() throws Exception
+ {
+ _consumerConnection = getConnection();
+ _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ createConflationQueue(_producerSession);
+ _producer = _producerSession.createProducer(_queue);
+
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ _producer.send(nextMessage(msg, _producerSession));
+ }
+
+ _producer.close();
+ _producerSession.close();
+ _producerConnection.close();
+
+ _consumer = _consumerSession.createConsumer(_queue);
+ _consumerConnection.start();
+ Message received;
+
+ List<Message> messages = new ArrayList<Message>();
+ while((received = _consumer.receive(1000))!=null)
+ {
+ messages.add(received);
+ }
+
+ assertEquals("Unexpected number of messages received",10,messages.size());
+
+ for(int i = 0 ; i < 10; i++)
+ {
+ Message msg = messages.get(i);
+ assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
+ }
+ }
+
+ public void testConflationWithRelease() throws Exception
+ {
+ _consumerConnection = getConnection();
+ _consumerSession = _consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+
+ createConflationQueue(_producerSession);
+ _producer = _producerSession.createProducer(_queue);
+
+ for (int msg = 0; msg < MSG_COUNT/2; msg++)
+ {
+ _producer.send(nextMessage(msg, _producerSession));
+
+ }
+
+ // HACK to do something synchronous
+ ((AMQSession<?,?>)_producerSession).sync();
+
+ _consumer = _consumerSession.createConsumer(_queue);
+ _consumerConnection.start();
+ Message received;
+ List<Message> messages = new ArrayList<Message>();
+ while((received = _consumer.receive(1000))!=null)
+ {
+ messages.add(received);
+ }
+
+ assertEquals("Unexpected number of messages received",10,messages.size());
+
+ for(int i = 0 ; i < 10; i++)
+ {
+ Message msg = messages.get(i);
+ assertEquals("Unexpected message number received", MSG_COUNT/2 - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
+ }
+
+ _consumerSession.close();
+ _consumerConnection.close();
+
+
+ _consumerConnection = getConnection();
+ _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+
+ for (int msg = MSG_COUNT/2; msg < MSG_COUNT; msg++)
+ {
+ _producer.send(nextMessage(msg, _producerSession));
+ }
+
+
+ // HACK to do something synchronous
+ ((AMQSession<?,?>)_producerSession).sync();
+
+ _consumer = _consumerSession.createConsumer(_queue);
+ _consumerConnection.start();
+
+ messages = new ArrayList<Message>();
+ while((received = _consumer.receive(1000))!=null)
+ {
+ messages.add(received);
+ }
+
+ assertEquals("Unexpected number of messages received",10,messages.size());
+
+ for(int i = 0 ; i < 10; i++)
+ {
+ Message msg = messages.get(i);
+ assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
+ }
+
+ }
+
+
+ public void testConflationWithReleaseAfterNewPublish() throws Exception
+ {
+ _consumerConnection = getConnection();
+ _consumerSession = _consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+
+ createConflationQueue(_producerSession);
+ _producer = _producerSession.createProducer(_queue);
+
+ for (int msg = 0; msg < MSG_COUNT/2; msg++)
+ {
+ _producer.send(nextMessage(msg, _producerSession));
+ }
+
+ // HACK to do something synchronous
+ ((AMQSession<?,?>)_producerSession).sync();
+
+ _consumer = _consumerSession.createConsumer(_queue);
+ _consumerConnection.start();
+ Message received;
+ List<Message> messages = new ArrayList<Message>();
+ while((received = _consumer.receive(1000))!=null)
+ {
+ messages.add(received);
+ }
+
+ assertEquals("Unexpected number of messages received",10,messages.size());
+
+ for(int i = 0 ; i < 10; i++)
+ {
+ Message msg = messages.get(i);
+ assertEquals("Unexpected message number received", MSG_COUNT/2 - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
+ }
+
+ _consumer.close();
+
+ for (int msg = MSG_COUNT/2; msg < MSG_COUNT; msg++)
+ {
+ _producer.send(nextMessage(msg, _producerSession));
+ }
+
+ // HACK to do something synchronous
+ ((AMQSession<?,?>)_producerSession).sync();
+
+
+ // this causes the "old" messages to be released
+ _consumerSession.close();
+ _consumerConnection.close();
+
+
+ _consumerConnection = getConnection();
+ _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+
+
+ _consumer = _consumerSession.createConsumer(_queue);
+ _consumerConnection.start();
+
+ messages = new ArrayList<Message>();
+ while((received = _consumer.receive(1000))!=null)
+ {
+ messages.add(received);
+ }
+
+ assertEquals("Unexpected number of messages received",10,messages.size());
+
+ for(int i = 0 ; i < 10; i++)
+ {
+ Message msg = messages.get(i);
+ assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
+ }
+
+ }
+
+ public void testConflatedQueueDepth() throws Exception
+ {
+ _consumerConnection = getConnection();
+ _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ createConflationQueue(_producerSession);
+ _producer = _producerSession.createProducer(_queue);
+
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ _producer.send(nextMessage(msg, _producerSession));
+ }
+
+ final long queueDepth = ((AMQSession<?, ?>)_producerSession).getQueueDepth((AMQDestination)_queue, true);
+
+ assertEquals(10, queueDepth);
+ }
+
+ public void testConflationBrowser() throws Exception
+ {
+ _consumerConnection = getConnection();
+ _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+
+ createConflationQueue(_producerSession);
+ _producer = _producerSession.createProducer(_queue);
+
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ _producer.send(nextMessage(msg, _producerSession));
+
+ }
+
+ ((AMQSession<?,?>)_producerSession).sync();
+
+ AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+_queueName+"?browse='true'&durable='true'");
+ AMQQueue browseQueue = new AMQQueue(url);
+
+ _consumer = _consumerSession.createConsumer(browseQueue);
+ _consumerConnection.start();
+ Message received;
+ List<Message> messages = new ArrayList<Message>();
+ while((received = _consumer.receive(1000))!=null)
+ {
+ messages.add(received);
+ }
+
+ assertEquals("Unexpected number of messages received",10,messages.size());
+
+ for(int i = 0 ; i < 10; i++)
+ {
+ Message msg = messages.get(i);
+ assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
+ }
+
+ messages.clear();
+
+ _producer.send(nextMessage(MSG_COUNT, _producerSession));
+
+ ((AMQSession<?,?>)_producerSession).sync();
+
+ while((received = _consumer.receive(1000))!=null)
+ {
+ messages.add(received);
+ }
+ assertEquals("Unexpected number of messages received",1,messages.size());
+ assertEquals("Unexpected message number received", MSG_COUNT, messages.get(0).getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
+
+
+ _producer.close();
+ _producerSession.close();
+ _producerConnection.close();
+ }
+
+ public void testConflation2Browsers() throws Exception
+ {
+ _consumerConnection = getConnection();
+ _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+
+ createConflationQueue(_producerSession);
+ _producer = _producerSession.createProducer(_queue);
+
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ _producer.send(nextMessage(msg, _producerSession));
+ }
+
+ ((AMQSession<?,?>)_producerSession).sync();
+
+ AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+_queueName+"?browse='true'&durable='true'");
+ AMQQueue browseQueue = new AMQQueue(url);
+
+ _consumer = _consumerSession.createConsumer(browseQueue);
+ MessageConsumer consumer2 = _consumerSession.createConsumer(browseQueue);
+ _consumerConnection.start();
+ List<Message> messages = new ArrayList<Message>();
+ List<Message> messages2 = new ArrayList<Message>();
+ Message received = _consumer.receive(1000);
+ Message received2 = consumer2.receive(1000);
+
+ while(received!=null || received2!=null)
+ {
+ if(received != null)
+ {
+ messages.add(received);
+ }
+ if(received2 != null)
+ {
+ messages2.add(received2);
+ }
+
+
+ received = _consumer.receive(1000);
+ received2 = consumer2.receive(1000);
+
+ }
+
+ assertEquals("Unexpected number of messages received on first browser",10,messages.size());
+ assertEquals("Unexpected number of messages received on second browser",10,messages2.size());
+
+ for(int i = 0 ; i < 10; i++)
+ {
+ Message msg = messages.get(i);
+ assertEquals("Unexpected message number received on first browser", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
+ msg = messages2.get(i);
+ assertEquals("Unexpected message number received on second browser", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
+ }
+
+
+ _producer.close();
+ _producerSession.close();
+ _producerConnection.close();
+ }
+
+ public void testParallelProductionAndConsumption() throws Exception
+ {
+ createConflationQueue(_producerSession);
+
+ // Start producing threads that send messages
+ BackgroundMessageProducer messageProducer1 = new BackgroundMessageProducer("Message sender1");
+ messageProducer1.startSendingMessages();
+ BackgroundMessageProducer messageProducer2 = new BackgroundMessageProducer("Message sender2");
+ messageProducer2.startSendingMessages();
+
+ Map<String, Integer> lastReceivedMessages = receiveMessages(messageProducer1);
+
+ messageProducer1.join();
+ messageProducer2.join();
+
+ final Map<String, Integer> lastSentMessages1 = messageProducer1.getMessageSequenceNumbersByKey();
+ assertEquals("Unexpected number of last sent messages sent by producer1", 2, lastSentMessages1.size());
+ final Map<String, Integer> lastSentMessages2 = messageProducer2.getMessageSequenceNumbersByKey();
+ assertEquals(lastSentMessages1, lastSentMessages2);
+
+ assertEquals("The last message sent for each key should match the last message received for that key",
+ lastSentMessages1, lastReceivedMessages);
+
+ assertNull("Unexpected exception from background producer thread", messageProducer1.getException());
+ }
+
+ private Map<String, Integer> receiveMessages(BackgroundMessageProducer producer) throws Exception
+ {
+ producer.waitUntilQuarterOfMessagesSentToEncourageConflation();
+
+ _consumerConnection = getConnection();
+ int smallPrefetchToEncourageConflation = 1;
+ _consumerSession = ((AMQConnection)_consumerConnection).createSession(false, Session.AUTO_ACKNOWLEDGE, smallPrefetchToEncourageConflation);
+
+ LOGGER.info("Starting to receive");
+
+ _consumer = _consumerSession.createConsumer(_queue);
+ _consumerConnection.start();
+
+ Map<String, Integer> messageSequenceNumbersByKey = new HashMap<String, Integer>();
+
+ Message message;
+ int numberOfShutdownsReceived = 0;
+ int numberOfMessagesReceived = 0;
+ while(numberOfShutdownsReceived < 2)
+ {
+ message = _consumer.receive(10000);
+ assertNotNull(message);
+
+ if (message.propertyExists(BackgroundMessageProducer.SHUTDOWN))
+ {
+ numberOfShutdownsReceived++;
+ }
+ else
+ {
+ numberOfMessagesReceived++;
+ putMessageInMap(message, messageSequenceNumbersByKey);
+ }
+ }
+
+ LOGGER.info("Finished receiving. Received " + numberOfMessagesReceived + " message(s) in total");
+
+ return messageSequenceNumbersByKey;
+ }
+
+ private void putMessageInMap(Message message, Map<String, Integer> messageSequenceNumbersByKey) throws JMSException
+ {
+ String keyValue = message.getStringProperty(KEY_PROPERTY);
+ Integer messageSequenceNumber = message.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY);
+ messageSequenceNumbersByKey.put(keyValue, messageSequenceNumber);
+ }
+
+ private class BackgroundMessageProducer
+ {
+ static final String SHUTDOWN = "SHUTDOWN";
+
+ private final String _threadName;
+
+ private volatile Exception _exception;
+
+ private Thread _thread;
+ private Map<String, Integer> _messageSequenceNumbersByKey = new HashMap<String, Integer>();
+ private CountDownLatch _quarterOfMessagesSentLatch = new CountDownLatch(MSG_COUNT/4);
+
+ public BackgroundMessageProducer(String threadName)
+ {
+ _threadName = threadName;
+ }
+
+ public void waitUntilQuarterOfMessagesSentToEncourageConflation() throws InterruptedException
+ {
+ final long latchTimeout = 60000;
+ boolean success = _quarterOfMessagesSentLatch.await(latchTimeout, TimeUnit.MILLISECONDS);
+ assertTrue("Failed to be notified that 1/4 of the messages have been sent within " + latchTimeout + " ms.", success);
+ LOGGER.info("Quarter of messages sent");
+ }
+
+ public Exception getException()
+ {
+ return _exception;
+ }
+
+ public Map<String, Integer> getMessageSequenceNumbersByKey()
+ {
+ return Collections.unmodifiableMap(_messageSequenceNumbersByKey);
+ }
+
+ public void startSendingMessages()
+ {
+ Runnable messageSender = new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ LOGGER.info("Starting to send in background thread");
+ Connection producerConnection = getConnection();
+ Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer backgroundProducer = producerSession.createProducer(_queue);
+ for (int messageNumber = 0; messageNumber < MSG_COUNT; messageNumber++)
+ {
+ Message message = nextMessage(messageNumber, producerSession, 2);
+ backgroundProducer.send(message);
+
+ putMessageInMap(message, _messageSequenceNumbersByKey);
+ _quarterOfMessagesSentLatch.countDown();
+ }
+
+ Message shutdownMessage = producerSession.createMessage();
+ shutdownMessage.setBooleanProperty(SHUTDOWN, true);
+ backgroundProducer.send(shutdownMessage);
+
+ LOGGER.info("Finished sending in background thread");
+ }
+ catch (Exception e)
+ {
+ _exception = e;
+ throw new RuntimeException(e);
+ }
+ }
+ };
+
+ _thread = new Thread(messageSender);
+ _thread.setName(_threadName);
+ _thread.start();
+ }
+
+ public void join() throws InterruptedException
+ {
+ final int timeoutInMillis = 120000;
+ _thread.join(timeoutInMillis);
+ assertFalse("Expected producer thread to finish within " + timeoutInMillis + "ms", _thread.isAlive());
+ }
+ }
+
+ private void createConflationQueue(Session session) throws AMQException
+ {
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put("qpid.last_value_queue_key",KEY_PROPERTY);
+ ((AMQSession<?,?>) session).createQueue(new AMQShortString(_queueName), false, true, false, arguments);
+ _queue = new AMQQueue("amq.direct", _queueName);
+ ((AMQSession<?,?>) session).declareAndBind((AMQDestination)_queue);
+ }
+
+ private Message nextMessage(int msg, Session producerSession) throws JMSException
+ {
+ return nextMessage(msg, producerSession, 10);
+ }
+
+ private Message nextMessage(int msg, Session producerSession, int numberOfUniqueKeyValues) throws JMSException
+ {
+ Message send = producerSession.createTextMessage("Message: " + msg);
+
+ send.setStringProperty(KEY_PROPERTY, String.valueOf(msg % numberOfUniqueKeyValues));
+ send.setIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY, msg);
+
+ return send;
+ }
+}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java
new file mode 100644
index 0000000000..cb8ced4ddb
--- /dev/null
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java
@@ -0,0 +1,604 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.queue;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+public class MessageGroupQueueTest extends QpidBrokerTestCase
+{
+ protected final String QUEUE = "MessageGroupQueue";
+
+ private Connection producerConnection;
+ private MessageProducer producer;
+ private Session producerSession;
+ private Queue queue;
+ private Connection consumerConnection;
+
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ producerConnection = getConnection();
+ producerSession = producerConnection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+
+ producerConnection.start();
+
+ consumerConnection = getConnection();
+
+ }
+
+ protected void tearDown() throws Exception
+ {
+ producerConnection.close();
+ consumerConnection.close();
+ super.tearDown();
+ }
+
+
+ public void testSimpleGroupAssignment() throws Exception
+ {
+ simpleGroupAssignment(false);
+ }
+
+ public void testSharedGroupSimpleGroupAssignment() throws Exception
+ {
+ simpleGroupAssignment(true);
+ }
+
+
+ /**
+ * Pre populate the queue with messages with groups as follows
+ *
+ * ONE
+ * TWO
+ * ONE
+ * TWO
+ *
+ * Create two consumers with prefetch of 1, the first consumer should then be assigned group ONE, the second
+ * consumer assigned group TWO if they are started in sequence.
+ *
+ * Thus doing
+ *
+ * c1 <--- (ONE)
+ * c2 <--- (TWO)
+ * c2 ack --->
+ *
+ * c2 should now be able to receive a second message from group TWO (skipping over the message from group ONE)
+ *
+ * i.e.
+ *
+ * c2 <--- (TWO)
+ * c2 ack --->
+ * c1 <--- (ONE)
+ * c1 ack --->
+ *
+ */
+ private void simpleGroupAssignment(boolean sharedGroups) throws AMQException, JMSException
+ {
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put(QueueArgumentsConverter.QPID_GROUP_HEADER_KEY,"group");
+ if(sharedGroups)
+ {
+ arguments.put(QueueArgumentsConverter.QPID_SHARED_MSG_GROUP,"1");
+ }
+ ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
+ queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
+
+ ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+ producer = producerSession.createProducer(queue);
+
+ String[] groups = { "ONE", "TWO"};
+
+ for (int msg = 0; msg < 4; msg++)
+ {
+ producer.send(createMessage(msg, groups[msg % groups.length]));
+ }
+ producerSession.commit();
+ producer.close();
+ producerSession.close();
+ producerConnection.close();
+
+ Session cs1 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1);
+ Session cs2 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1);
+
+
+ MessageConsumer consumer1 = cs1.createConsumer(queue);
+ MessageConsumer consumer2 = cs2.createConsumer(queue);
+
+ consumerConnection.start();
+ Message cs1Received = consumer1.receive(1000);
+ assertNotNull("Consumer 1 should have received first message", cs1Received);
+
+ Message cs2Received = consumer2.receive(1000);
+
+ assertNotNull("Consumer 2 should have received first message", cs2Received);
+
+ cs2Received.acknowledge();
+
+ Message cs2Received2 = consumer2.receive(1000);
+
+ assertNotNull("Consumer 2 should have received second message", cs2Received2);
+ assertEquals("Differing groups", cs2Received2.getStringProperty("group"),
+ cs2Received.getStringProperty("group"));
+
+ cs1Received.acknowledge();
+ Message cs1Received2 = consumer1.receive(1000);
+
+ assertNotNull("Consumer 1 should have received second message", cs1Received2);
+ assertEquals("Differing groups", cs1Received2.getStringProperty("group"),
+ cs1Received.getStringProperty("group"));
+
+ cs1Received2.acknowledge();
+ cs2Received2.acknowledge();
+
+ assertNull(consumer1.receive(1000));
+ assertNull(consumer2.receive(1000));
+ }
+
+
+ public void testConsumerCloseGroupAssignment() throws Exception
+ {
+ consumerCloseGroupAssignment(false);
+ }
+
+ public void testSharedGroupConsumerCloseGroupAssignment() throws Exception
+ {
+ consumerCloseGroupAssignment(true);
+ }
+
+ /**
+ *
+ * Tests that upon closing a consumer, groups previously assigned to that consumer are reassigned to a different
+ * consumer.
+ *
+ * Pre-populate the queue as ONE, ONE, TWO, ONE
+ *
+ * create in sequence two consumers
+ *
+ * receive first from c1 then c2 (thus ONE is assigned to c1, TWO to c2)
+ *
+ * Then close c1 before acking.
+ *
+ * If we now attempt to receive from c2, then the remaining messages in group ONE should be available (which
+ * requires c2 to go "backwards" in the queue).
+ *
+ **/
+ private void consumerCloseGroupAssignment(boolean sharedGroups) throws AMQException, JMSException
+ {
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put(QueueArgumentsConverter.QPID_GROUP_HEADER_KEY,"group");
+ if(sharedGroups)
+ {
+ arguments.put(QueueArgumentsConverter.QPID_SHARED_MSG_GROUP,"1");
+ }
+ ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
+ queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
+
+ ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+ producer = producerSession.createProducer(queue);
+
+ producer.send(createMessage(1, "ONE"));
+ producer.send(createMessage(2, "ONE"));
+ producer.send(createMessage(3, "TWO"));
+ producer.send(createMessage(4, "ONE"));
+
+ producerSession.commit();
+ producer.close();
+ producerSession.close();
+ producerConnection.close();
+
+ Session cs1 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1);
+ Session cs2 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1);
+
+ MessageConsumer consumer1 = cs1.createConsumer(queue);
+
+ consumerConnection.start();
+ MessageConsumer consumer2 = cs2.createConsumer(queue);
+
+ Message cs1Received = consumer1.receive(1000);
+ assertNotNull("Consumer 1 should have received first message", cs1Received);
+ assertEquals("incorrect message received", 1, cs1Received.getIntProperty("msg"));
+
+ Message cs2Received = consumer2.receive(1000);
+
+ assertNotNull("Consumer 2 should have received first message", cs2Received);
+ assertEquals("incorrect message received", 3, cs2Received.getIntProperty("msg"));
+ cs2.commit();
+
+ Message cs2Received2 = consumer2.receive(1000);
+
+ assertNull("Consumer 2 should not yet have received a second message", cs2Received2);
+
+ consumer1.close();
+
+ cs1.commit();
+ Message cs2Received3 = consumer2.receive(1000);
+
+ assertNotNull("Consumer 2 should have received second message", cs2Received3);
+ assertEquals("Unexpected group", "ONE", cs2Received3.getStringProperty("group"));
+ assertEquals("incorrect message received", 2, cs2Received3.getIntProperty("msg"));
+
+ cs2.commit();
+
+
+ Message cs2Received4 = consumer2.receive(1000);
+
+ assertNotNull("Consumer 2 should have received third message", cs2Received4);
+ assertEquals("Unexpected group", "ONE", cs2Received4.getStringProperty("group"));
+ assertEquals("incorrect message received", 4, cs2Received4.getIntProperty("msg"));
+ cs2.commit();
+
+ assertNull(consumer2.receive(1000));
+ }
+
+
+
+
+ public void testConsumerCloseWithRelease() throws Exception
+ {
+ consumerCloseWithRelease(false);
+ }
+
+ public void testSharedGroupConsumerCloseWithRelease() throws Exception
+ {
+ consumerCloseWithRelease(true);
+ }
+
+
+ /**
+ *
+ * Tests that upon closing a consumer and its session, groups previously assigned to that consumer are reassigned
+ * toa different consumer, including messages which were previously delivered but have now been released.
+ *
+ * Pre-populate the queue as ONE, ONE, TWO, ONE
+ *
+ * create in sequence two consumers
+ *
+ * receive first from c1 then c2 (thus ONE is assigned to c1, TWO to c2)
+ *
+ * Then close c1 and its session without acking.
+ *
+ * If we now attempt to receive from c2, then the all messages in group ONE should be available (which
+ * requires c2 to go "backwards" in the queue). The first such message should be marked as redelivered
+ *
+ */
+ private void consumerCloseWithRelease(boolean sharedGroups) throws AMQException, JMSException
+ {
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put(QueueArgumentsConverter.QPID_GROUP_HEADER_KEY,"group");
+ if(sharedGroups)
+ {
+ arguments.put(QueueArgumentsConverter.QPID_SHARED_MSG_GROUP,"1");
+ }
+
+ ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
+ queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
+
+ ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+ producer = producerSession.createProducer(queue);
+
+ producer.send(createMessage(1, "ONE"));
+ producer.send(createMessage(2, "ONE"));
+ producer.send(createMessage(3, "TWO"));
+ producer.send(createMessage(4, "ONE"));
+
+ producerSession.commit();
+ producer.close();
+ producerSession.close();
+ producerConnection.close();
+
+ Session cs1 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1);
+ Session cs2 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1);
+
+
+ MessageConsumer consumer1 = cs1.createConsumer(queue);
+
+ consumerConnection.start();
+
+ MessageConsumer consumer2 = cs2.createConsumer(queue);
+
+ Message cs1Received = consumer1.receive(1000);
+ assertNotNull("Consumer 1 should have received its first message", cs1Received);
+ assertEquals("incorrect message received", 1, cs1Received.getIntProperty("msg"));
+
+ Message received = consumer2.receive(1000);
+
+ assertNotNull("Consumer 2 should have received its first message", received);
+ assertEquals("incorrect message received", 3, received.getIntProperty("msg"));
+
+ received = consumer2.receive(1000);
+
+ assertNull("Consumer 2 should not yet have received second message", received);
+
+ consumer1.close();
+ cs1.close();
+ cs2.commit();
+ received = consumer2.receive(1000);
+
+ assertNotNull("Consumer 2 should now have received second message", received);
+ assertEquals("Unexpected group", "ONE", received.getStringProperty("group"));
+ assertEquals("incorrect message received", 1, received.getIntProperty("msg"));
+ assertTrue("Expected second message to be marked as redelivered " + received.getIntProperty("msg"),
+ received.getJMSRedelivered());
+
+ cs2.commit();
+
+
+ received = consumer2.receive(1000);
+
+ assertNotNull("Consumer 2 should have received a third message", received);
+ assertEquals("Unexpected group", "ONE", received.getStringProperty("group"));
+ assertEquals("incorrect message received", 2, received.getIntProperty("msg"));
+
+ cs2.commit();
+
+ received = consumer2.receive(1000);
+
+ assertNotNull("Consumer 2 should have received a fourth message", received);
+ assertEquals("Unexpected group", "ONE", received.getStringProperty("group"));
+ assertEquals("incorrect message received", 4, received.getIntProperty("msg"));
+
+ cs2.commit();
+
+
+ assertNull(consumer2.receive(1000));
+ }
+
+ public void testGroupAssignmentSurvivesEmpty() throws JMSException, AMQException
+ {
+ groupAssignmentOnEmpty(false);
+ }
+
+ public void testSharedGroupAssignmentDoesNotSurviveEmpty() throws JMSException, AMQException
+ {
+ groupAssignmentOnEmpty(true);
+ }
+
+ private void groupAssignmentOnEmpty(boolean sharedGroups) throws AMQException, JMSException
+ {
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put(QueueArgumentsConverter.QPID_GROUP_HEADER_KEY,"group");
+ if(sharedGroups)
+ {
+ arguments.put(QueueArgumentsConverter.QPID_SHARED_MSG_GROUP,"1");
+ }
+
+ ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
+ queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
+
+ ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+ producer = producerSession.createProducer(queue);
+
+ producer.send(createMessage(1, "ONE"));
+ producer.send(createMessage(2, "TWO"));
+ producer.send(createMessage(3, "THREE"));
+ producer.send(createMessage(4, "ONE"));
+
+ producerSession.commit();
+ producer.close();
+ producerSession.close();
+ producerConnection.close();
+
+ Session cs1 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1);
+ Session cs2 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1);
+
+
+ MessageConsumer consumer1 = cs1.createConsumer(queue);
+
+ consumerConnection.start();
+
+ MessageConsumer consumer2 = cs2.createConsumer(queue);
+
+ Message received = consumer1.receive(1000);
+ assertNotNull("Consumer 1 should have received its first message", received);
+ assertEquals("incorrect message received", 1, received.getIntProperty("msg"));
+
+ received = consumer2.receive(1000);
+
+ assertNotNull("Consumer 2 should have received its first message", received);
+ assertEquals("incorrect message received", 2, received.getIntProperty("msg"));
+
+ cs1.commit();
+
+ received = consumer1.receive(1000);
+ assertNotNull("Consumer 1 should have received its second message", received);
+ assertEquals("incorrect message received", 3, received.getIntProperty("msg"));
+
+ // We expect different behaviours from "shared groups": here the assignment of a subscription to a group
+ // is terminated when there are no outstanding delivered but unacknowledged messages. In contrast, with a
+ // standard message grouping queue the assignment will be retained until the subscription is no longer
+ // registered
+ if(sharedGroups)
+ {
+ cs2.commit();
+ received = consumer2.receive(1000);
+
+ assertNotNull("Consumer 2 should have received its second message", received);
+ assertEquals("incorrect message received", 4, received.getIntProperty("msg"));
+
+ cs2.commit();
+ }
+ else
+ {
+ cs2.commit();
+ received = consumer2.receive(1000);
+
+ assertNull("Consumer 2 should not have received a second message", received);
+
+ cs1.commit();
+
+ received = consumer1.receive(1000);
+ assertNotNull("Consumer 1 should have received its third message", received);
+ assertEquals("incorrect message received", 4, received.getIntProperty("msg"));
+
+ }
+
+ }
+
+ private Message createMessage(int msg, String group) throws JMSException
+ {
+ Message send = producerSession.createTextMessage("Message: " + msg);
+ send.setIntProperty("msg", msg);
+ send.setStringProperty("group", group);
+
+ return send;
+ }
+
+ /**
+ * Tests that when a number of new messages for a given groupid are arriving while the delivery group
+ * state is also in the process of being emptied (due to acking a message while using prefetch=1), that only
+ * 1 of a number of existing consumers is ever receiving messages for the shared group at a time.
+ */
+ public void testSingleSharedGroupWithMultipleConsumers() throws Exception
+ {
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put(QueueArgumentsConverter.QPID_GROUP_HEADER_KEY,"group");
+ arguments.put(QueueArgumentsConverter.QPID_SHARED_MSG_GROUP,"1");
+
+ ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
+ queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
+
+ ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+ producer = producerSession.createProducer(queue);
+
+
+ consumerConnection.close();
+ Map<String, String> options = new HashMap<String, String>();
+ options.put(ConnectionURL.OPTIONS_MAXPREFETCH, "1");
+ consumerConnection = getConnectionWithOptions(options);
+
+ int numMessages = 100;
+ SharedGroupTestMessageListener groupingTestMessageListener = new SharedGroupTestMessageListener(numMessages);
+
+ Session cs1 = ((AMQConnection)consumerConnection).createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session cs2 = ((AMQConnection)consumerConnection).createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session cs3 = ((AMQConnection)consumerConnection).createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session cs4 = ((AMQConnection)consumerConnection).createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer consumer1 = cs1.createConsumer(queue);
+ consumer1.setMessageListener(groupingTestMessageListener);
+ MessageConsumer consumer2 = cs2.createConsumer(queue);
+ consumer2.setMessageListener(groupingTestMessageListener);
+ MessageConsumer consumer3 = cs3.createConsumer(queue);
+ consumer3.setMessageListener(groupingTestMessageListener);
+ MessageConsumer consumer4 = cs4.createConsumer(queue);
+ consumer4.setMessageListener(groupingTestMessageListener);
+ consumerConnection.start();
+
+ for(int i = 1; i <= numMessages; i++)
+ {
+ producer.send(createMessage(i, "GROUP"));
+ }
+ producerSession.commit();
+ producer.close();
+ producerSession.close();
+ producerConnection.close();
+
+ assertTrue("Mesages not all recieved in the allowed timeframe", groupingTestMessageListener.waitForLatch(30));
+ assertEquals("Unexpected concurrent processing of messages for the group", 0, groupingTestMessageListener.getConcurrentProcessingCases());
+ assertNull("Unexpecte throwable in message listeners", groupingTestMessageListener.getThrowable());
+ }
+
+ public static class SharedGroupTestMessageListener implements MessageListener
+ {
+ private final CountDownLatch _count;
+ private final AtomicInteger _activeListeners = new AtomicInteger();
+ private final AtomicInteger _concurrentProcessingCases = new AtomicInteger();
+ private Throwable _throwable;
+
+ public SharedGroupTestMessageListener(int numMessages)
+ {
+ _count = new CountDownLatch(numMessages);
+ }
+
+ public void onMessage(Message message)
+ {
+ try
+ {
+ int currentActiveListeners = _activeListeners.incrementAndGet();
+
+ if (currentActiveListeners > 1)
+ {
+ _concurrentProcessingCases.incrementAndGet();
+
+ System.err.println("Concurrent processing when handling message: " + message.getIntProperty("msg"));
+ }
+
+ try
+ {
+ Thread.sleep(25);
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+
+ _activeListeners.decrementAndGet();
+ }
+ catch (Throwable t)
+ {
+ _throwable = t;
+ t.printStackTrace();
+ }
+ finally
+ {
+ _count.countDown();
+ }
+ }
+
+ public boolean waitForLatch(int seconds) throws Exception
+ {
+ return _count.await(seconds, TimeUnit.SECONDS);
+ }
+
+ public int getConcurrentProcessingCases()
+ {
+ return _concurrentProcessingCases.get();
+ }
+
+ public Throwable getThrowable()
+ {
+ return _throwable;
+ }
+ }
+}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ModelTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ModelTest.java
new file mode 100644
index 0000000000..c6b2c9e95c
--- /dev/null
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ModelTest.java
@@ -0,0 +1,342 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.management.common.mbeans.ManagedBroker;
+import org.apache.qpid.management.common.mbeans.ManagedQueue;
+import org.apache.qpid.test.utils.JMXTestUtils;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.management.JMException;
+import javax.management.MBeanException;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.UndeclaredThrowableException;
+
+/**
+ * This Test validates the Queue Model on the broker.
+ * Currently it has some basic queue creation / deletion tests.
+ * However, it should be expanded to include other tests that relate to the
+ * model. i.e.
+ *
+ * The Create and Delete tests should ensure that the requisite logging is
+ * performed.
+ *
+ * Additions to this suite would be to complete testing of creations, validating
+ * fields such as owner/exclusive, autodelete and priority are correctly set.
+ *
+ * Currently this test uses the JMX interface to validate that the queue has
+ * been declared as expected so these tests cannot run against a CPP broker.
+ *
+ *
+ * Tests should ensure that they clean up after themselves.
+ * e,g. Durable queue creation test should perform a queue delete.
+ */
+public class ModelTest extends QpidBrokerTestCase
+{
+
+ private JMXTestUtils _jmxUtils;
+ private static final String VIRTUALHOST_NAME = "test";
+
+ @Override
+ public void setUp() throws Exception
+ {
+ getBrokerConfiguration().addJmxManagementConfiguration();
+
+ // Create a JMX Helper
+ _jmxUtils = new JMXTestUtils(this);
+ super.setUp();
+
+ // Open the JMX Connection
+ _jmxUtils.open();
+ }
+
+ @Override
+ public void tearDown() throws Exception
+ {
+ // Close the JMX Connection
+ _jmxUtils.close();
+ super.tearDown();
+ }
+
+ /**
+ * Test that an exclusive transient queue can be created via AMQP.
+ *
+ * @throws Exception On unexpected error
+ */
+ public void testExclusiveQueueCreationTransientViaAMQP() throws Exception
+ {
+ Connection connection = getConnection();
+
+ String queueName = getTestQueueName();
+ boolean durable = false;
+ boolean autoDelete = false;
+ boolean exclusive = true;
+
+ createViaAMQPandValidateViaJMX(connection, queueName, durable,
+ autoDelete, exclusive);
+ }
+
+
+
+ /**
+ * Test that a transient queue can be created via AMQP.
+ *
+ * @throws Exception On unexpected error
+ */
+ public void testQueueCreationTransientViaAMQP() throws Exception
+ {
+ Connection connection = getConnection();
+
+ String queueName = getTestQueueName();
+ boolean durable = false;
+ boolean autoDelete = false;
+ boolean exclusive = true;
+
+ createViaAMQPandValidateViaJMX(connection, queueName, durable,
+ autoDelete, exclusive);
+ }
+
+ /**
+ * Test that a durable exclusive queue can be created via AMQP.
+ *
+ * @throws Exception On unexpected error
+ */
+
+ public void testExclusiveQueueCreationDurableViaAMQP() throws Exception
+ {
+ Connection connection = getConnection();
+
+ String queueName = getTestQueueName();
+ boolean durable = true;
+ boolean autoDelete = false;
+ boolean exclusive = true;
+
+ createViaAMQPandValidateViaJMX(connection, queueName, durable,
+ autoDelete, exclusive);
+
+ // Clean up
+ ManagedBroker managedBroker =
+ _jmxUtils.getManagedBroker(VIRTUALHOST_NAME);
+ managedBroker.deleteQueue(queueName);
+ }
+
+ /**
+ * Test that a durable queue can be created via AMQP.
+ *
+ * @throws Exception On unexpected error
+ */
+
+ public void testQueueCreationDurableViaAMQP() throws Exception
+ {
+ Connection connection = getConnection();
+
+ String queueName = getTestQueueName();
+ boolean durable = true;
+ boolean autoDelete = false;
+ boolean exclusive = false;
+
+ createViaAMQPandValidateViaJMX(connection, queueName, durable,
+ autoDelete, exclusive);
+
+ // Clean up
+ ManagedBroker managedBroker =
+ _jmxUtils.getManagedBroker(VIRTUALHOST_NAME);
+ managedBroker.deleteQueue(queueName);
+ }
+
+
+ /**
+ * Test that a transient queue can be created via JMX.
+ *
+ * @throws IOException if there is a problem via the JMX connection
+ * @throws javax.management.JMException if there is a problem with the JMX command
+ */
+ public void testCreationTransientViaJMX() throws IOException, JMException
+ {
+ String name = getName();
+ String owner = null;
+ boolean durable = false;
+
+ createViaJMXandValidateViaJMX(name, owner, durable);
+ }
+
+ /**
+ * Test that a durable queue can be created via JMX.
+ *
+ * @throws IOException if there is a problem via the JMX connection
+ * @throws javax.management.JMException if there is a problem with the JMX command
+ */
+ public void testCreationDurableViaJMX() throws IOException, JMException
+ {
+ String name = getName();
+ String owner = null;
+ boolean durable = true;
+
+ createViaJMXandValidateViaJMX(name, owner, durable);
+
+ // Clean up
+ ManagedBroker managedBroker =
+ _jmxUtils.getManagedBroker(VIRTUALHOST_NAME);
+ managedBroker.deleteQueue(name);
+ }
+
+ /**
+ * Test that a transient queue can be deleted via JMX.
+ *
+ * @throws IOException if there is a problem via the JMX connection
+ * @throws javax.management.JMException if there is a problem with the JMX command
+ */
+ public void testDeletionTransientViaJMX() throws IOException, JMException
+ {
+ String name = getName();
+
+ _jmxUtils.createQueue(VIRTUALHOST_NAME, name, null, false);
+
+ ManagedBroker managedBroker = _jmxUtils.
+ getManagedBroker(VIRTUALHOST_NAME);
+
+ try
+ {
+ managedBroker.deleteQueue(name);
+ }
+ catch (UndeclaredThrowableException e)
+ {
+ fail(((MBeanException) ((InvocationTargetException)
+ e.getUndeclaredThrowable()).getTargetException()).getTargetException().getMessage());
+ }
+ }
+
+ /**
+ * Test that a durable queue can be created via JMX.
+ *
+ * @throws IOException if there is a problem via the JMX connection
+ * @throws javax.management.JMException if there is a problem with the JMX command
+ */
+ public void testDeletionDurableViaJMX() throws IOException, JMException
+ {
+ String name = getName();
+
+ _jmxUtils.createQueue(VIRTUALHOST_NAME, name, null, true);
+
+ ManagedBroker managedBroker = _jmxUtils.
+ getManagedBroker(VIRTUALHOST_NAME);
+
+ try
+ {
+ managedBroker.deleteQueue(name);
+ }
+ catch (UndeclaredThrowableException e)
+ {
+ fail(((MBeanException) ((InvocationTargetException)
+ e.getUndeclaredThrowable()).getTargetException()).getTargetException().getMessage());
+ }
+ }
+
+ /*
+ * Helper Methods
+ */
+
+ /**
+ * Using the provided JMS Connection create a queue using the AMQP extension
+ * with the given properties and then validate it was created correctly via
+ * the JMX Connection
+ *
+ * @param connection Qpid JMS Connection
+ * @param queueName String the desired QueueName
+ * @param durable boolean if the queue should be durable
+ * @param autoDelete boolean if the queue is an autoDelete queue
+ * @param exclusive boolean if the queue is exclusive
+ *
+ * @throws AMQException if there is a problem with the createQueue call
+ * @throws JMException if there is a problem with the JMX validatation
+ * @throws IOException if there is a problem with the JMX connection
+ * @throws JMSException if there is a problem creating the JMS Session
+ */
+ private void createViaAMQPandValidateViaJMX(Connection connection,
+ String queueName,
+ boolean durable,
+ boolean autoDelete,
+ boolean exclusive)
+ throws AMQException, JMException, IOException, JMSException
+ {
+ AMQSession session = (AMQSession) connection.createSession(false,
+ Session.AUTO_ACKNOWLEDGE);
+
+ session.createQueue(new AMQShortString(queueName),
+ autoDelete, durable, exclusive);
+
+ validateQueueViaJMX(queueName, (exclusive && durable &&!isBroker010()) ? connection.getClientID() : null, durable, autoDelete || (exclusive && !isBroker010() && !durable));
+ }
+
+ /**
+ * Use the JMX Helper to create a queue with the given properties and then
+ * validate it was created correctly via the JMX Connection
+ *
+ * @param queueName String the desired QueueName
+ * @param owner String the owner value that should be set
+ * @param durable boolean if the queue should be durable
+ * @param autoDelete boolean if the queue is an autoDelete queue
+ *
+ * @throws JMException if there is a problem with the JMX validatation
+ * @throws IOException if there is a problem with the JMX connection
+ */
+ private void createViaJMXandValidateViaJMX(String queueName, String owner,
+ boolean durable)
+ throws JMException, IOException
+ {
+ _jmxUtils.createQueue(VIRTUALHOST_NAME, queueName, owner, durable);
+
+ validateQueueViaJMX(queueName, owner, durable, false);
+ }
+
+ /**
+ * Validate that a queue with the given properties exists on the broker
+ *
+ * @param queueName String the desired QueueName
+ * @param owner String the owner value that should be set
+ * @param durable boolean if the queue should be durable
+ * @param autoDelete boolean if the queue is an autoDelete queue
+ *
+ * @throws JMException if there is a problem with the JMX validatation
+ * @throws IOException if there is a problem with the JMX connection
+ */
+ private void validateQueueViaJMX(String queueName, String owner, boolean durable, boolean autoDelete)
+ throws JMException, IOException
+ {
+ ManagedQueue managedQueue = _jmxUtils.
+ getManagedObject(ManagedQueue.class,
+ _jmxUtils.getQueueObjectName(VIRTUALHOST_NAME,
+ queueName));
+
+ assertEquals(queueName, managedQueue.getName());
+ assertEquals(owner, managedQueue.getOwner());
+ assertEquals(durable, managedQueue.isDurable());
+ assertEquals(autoDelete, managedQueue.isAutoDelete());
+ }
+
+}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java
new file mode 100644
index 0000000000..cbf4e032db
--- /dev/null
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java
@@ -0,0 +1,250 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class MultipleTransactedBatchProducerTest extends QpidBrokerTestCase
+{
+ private static final Logger _logger = Logger.getLogger(MultipleTransactedBatchProducerTest.class);
+
+ private static final int MESSAGE_COUNT = 1000;
+ private static final int BATCH_SIZE = 50;
+ private static final int NUM_PRODUCERS = 2;
+ private static final int NUM_CONSUMERS = 3;
+ private static final Random RANDOM = new Random();
+
+ private CountDownLatch _receivedLatch;
+ private String _queueName;
+
+ private volatile String _failMsg;
+
+ public void setUp() throws Exception
+ {
+ //debug level logging often makes this test pass artificially, turn the level down to info.
+ setSystemProperty("amqj.server.logging.level", "INFO");
+ _receivedLatch = new CountDownLatch(MESSAGE_COUNT * NUM_PRODUCERS);
+
+ getBrokerConfiguration().addJmxManagementConfiguration();
+
+ super.setUp();
+ _queueName = getTestQueueName();
+ _failMsg = null;
+ }
+
+ /**
+ * When there are multiple producers submitting batches of messages to a given
+ * queue using transacted sessions, it is highly probable that concurrent
+ * enqueue() activity will occur and attempt delivery of their message to the
+ * same subscription. In this scenario it is likely that one of the attempts
+ * will succeed and the other will result in use of the deliverAsync() method
+ * to start a queue Runner and ensure delivery of the message.
+ *
+ * A defect within the processQueue() method used by the Runner would mean that
+ * delivery of these messages may not occur, should the Runner stop before all
+ * messages have been processed. Such a defect was discovered and found to be
+ * most visible when Selectors are used such that one and only one subscription
+ * can/will accept any given message, but multiple subscriptions are present,
+ * and one of the earlier subscriptions receives more messages than the others.
+ *
+ * This test is to validate that the processQueue() method is able to correctly
+ * deliver all of the messages present for asynchronous delivery to subscriptions,
+ * by utilising multiple batch transacted producers to create the scenario and
+ * ensure all messages are received by a consumer.
+ */
+ public void testMultipleBatchedProducersWithMultipleConsumersUsingSelectors() throws Exception
+ {
+ String selector1 = ("(\"" + _queueName +"\" % " + NUM_CONSUMERS + ") = 0");
+ String selector2 = ("(\"" + _queueName +"\" % " + NUM_CONSUMERS + ") = 1");
+ String selector3 = ("(\"" + _queueName +"\" % " + NUM_CONSUMERS + ") = 2");
+
+ //create consumers
+ Connection conn1 = getConnection();
+ conn1.setExceptionListener(new ExceptionHandler("conn1"));
+ Session sess1 = conn1.createSession(true, Session.SESSION_TRANSACTED);
+ MessageConsumer cons1 = sess1.createConsumer(sess1.createQueue(_queueName), selector1);
+ cons1.setMessageListener(new Cons(sess1,"consumer1"));
+
+ Connection conn2 = getConnection();
+ conn2.setExceptionListener(new ExceptionHandler("conn2"));
+ Session sess2 = conn2.createSession(true, Session.SESSION_TRANSACTED);
+ MessageConsumer cons2 = sess2.createConsumer(sess2.createQueue(_queueName), selector2);
+ cons2.setMessageListener(new Cons(sess2,"consumer2"));
+
+ Connection conn3 = getConnection();
+ conn3.setExceptionListener(new ExceptionHandler("conn3"));
+ Session sess3 = conn3.createSession(true, Session.SESSION_TRANSACTED);
+ MessageConsumer cons3 = sess3.createConsumer(sess3.createQueue(_queueName), selector3);
+ cons3.setMessageListener(new Cons(sess3,"consumer3"));
+
+ conn1.start();
+ conn2.start();
+ conn3.start();
+
+ //create producers
+ Connection connA = getConnection();
+ connA.setExceptionListener(new ExceptionHandler("connA"));
+ Connection connB = getConnection();
+ connB.setExceptionListener(new ExceptionHandler("connB"));
+ Thread producer1 = new Thread(new ProducerThread(connA, _queueName, "producer1"));
+ Thread producer2 = new Thread(new ProducerThread(connB, _queueName, "producer2"));
+
+ producer1.start();
+ Thread.sleep(10);
+ producer2.start();
+
+ //await delivery of the messages
+ int timeout = isBrokerStorePersistent() ? 300 : 75;
+ boolean result = _receivedLatch.await(timeout, TimeUnit.SECONDS);
+
+ assertNull("Test failed because: " + String.valueOf(_failMsg), _failMsg);
+ assertTrue("Some of the messages were not all recieved in the given timeframe, remaining count was: "+_receivedLatch.getCount(),
+ result);
+
+ }
+
+ @Override
+ public Message createNextMessage(Session session, int msgCount) throws JMSException
+ {
+ Message message = super.createNextMessage(session,msgCount);
+
+ //bias at least 50% of the messages to the first consumers selector because
+ //the issue presents itself primarily when an earlier subscription completes
+ //delivery after the later subscriptions
+ int val;
+ if (msgCount % 2 == 0)
+ {
+ val = 0;
+ }
+ else
+ {
+ val = RANDOM.nextInt(Integer.MAX_VALUE);
+ }
+
+ message.setIntProperty(_queueName, val);
+
+ return message;
+ }
+
+ private class Cons implements MessageListener
+ {
+ private Session _sess;
+ private String _desc;
+
+ public Cons(Session sess, String desc)
+ {
+ _sess = sess;
+ _desc = desc;
+ }
+
+ public void onMessage(Message message)
+ {
+ _receivedLatch.countDown();
+ int msgCount = 0;
+ int msgID = 0;
+ try
+ {
+ msgCount = message.getIntProperty(INDEX);
+ msgID = message.getIntProperty(_queueName);
+ }
+ catch (JMSException e)
+ {
+ _logger.error(_desc + " received exception: " + e.getMessage(), e);
+ failAsyncTest(e.getMessage());
+ }
+
+ _logger.info("Consumer received message:"+ msgCount + " with ID: " + msgID);
+
+ try
+ {
+ _sess.commit();
+ }
+ catch (JMSException e)
+ {
+ _logger.error(_desc + " received exception: " + e.getMessage(), e);
+ failAsyncTest(e.getMessage());
+ }
+ }
+ }
+
+ private class ProducerThread implements Runnable
+ {
+ private Connection _conn;
+ private String _dest;
+ private String _desc;
+
+ public ProducerThread(Connection conn, String dest, String desc)
+ {
+ _conn = conn;
+ _dest = dest;
+ _desc = desc;
+ }
+
+ public void run()
+ {
+ try
+ {
+ Session session = _conn.createSession(true, Session.SESSION_TRANSACTED);
+ sendMessage(session, session.createQueue(_dest), MESSAGE_COUNT, BATCH_SIZE);
+ }
+ catch (Exception e)
+ {
+ _logger.error(_desc + " received exception: " + e.getMessage(), e);
+ failAsyncTest(e.getMessage());
+ }
+ }
+ }
+
+ private class ExceptionHandler implements javax.jms.ExceptionListener
+ {
+ private String _desc;
+
+ public ExceptionHandler(String description)
+ {
+ _desc = description;
+ }
+
+ public void onException(JMSException e)
+ {
+ _logger.error(_desc + " received exception: " + e.getMessage(), e);
+ failAsyncTest(e.getMessage());
+ }
+ }
+
+ private void failAsyncTest(String msg)
+ {
+ _logger.error("Failing test because: " + msg);
+ _failMsg = msg;
+ }
+} \ No newline at end of file
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java
new file mode 100644
index 0000000000..7b2dd3239d
--- /dev/null
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java
@@ -0,0 +1,303 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.queue;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.naming.NamingException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class PriorityQueueTest extends QpidBrokerTestCase
+{
+ private static final int TIMEOUT = 1500;
+
+ protected final String QUEUE = "PriorityQueue";
+
+ private static final int MSG_COUNT = 50;
+
+ private Connection producerConnection;
+ private MessageProducer producer;
+ private Session producerSession;
+ private Queue queue;
+ private Connection consumerConnection;
+ private Session consumerSession;
+
+ private MessageConsumer consumer;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ producerConnection = getConnection();
+ producerSession = producerConnection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+
+ producerConnection.start();
+
+ consumerConnection = getConnection();
+ consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ }
+
+ protected void tearDown() throws Exception
+ {
+ producerConnection.close();
+ consumerConnection.close();
+ super.tearDown();
+ }
+
+ public void testPriority() throws JMSException, NamingException, AMQException
+ {
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put("x-qpid-priorities",10);
+ ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
+ queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
+
+ ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+ producer = producerSession.createProducer(queue);
+
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ producer.setPriority(msg % 10);
+ producer.send(nextMessage(msg, false, producerSession, producer));
+ }
+ producerSession.commit();
+ producer.close();
+ producerSession.close();
+ producerConnection.close();
+
+ consumer = consumerSession.createConsumer(queue);
+ consumerConnection.start();
+ Message received;
+ int receivedCount = 0;
+ Message previous = null;
+ int messageCount = 0;
+ while((received = consumer.receive(1000))!=null)
+ {
+ messageCount++;
+ if(previous != null)
+ {
+ assertTrue("Messages arrived in unexpected order " + messageCount + " " + previous.getIntProperty("msg") + " " + received.getIntProperty("msg") + " " + previous.getJMSPriority() + " " + received.getJMSPriority(), (previous.getJMSPriority() > received.getJMSPriority()) || ((previous.getJMSPriority() == received.getJMSPriority()) && previous.getIntProperty("msg") < received.getIntProperty("msg")) );
+ }
+
+ previous = received;
+ receivedCount++;
+ }
+
+ assertEquals("Incorrect number of message received", 50, receivedCount);
+ }
+
+ public void testOddOrdering() throws AMQException, JMSException
+ {
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put("x-qpid-priorities",3);
+ ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
+ queue = producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
+
+ ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+ producer = producerSession.createProducer(queue);
+
+ // In order ABC
+ producer.setPriority(9);
+ producer.send(nextMessage(1, false, producerSession, producer));
+ producer.setPriority(4);
+ producer.send(nextMessage(2, false, producerSession, producer));
+ producer.setPriority(1);
+ producer.send(nextMessage(3, false, producerSession, producer));
+
+ // Out of order BAC
+ producer.setPriority(4);
+ producer.send(nextMessage(4, false, producerSession, producer));
+ producer.setPriority(9);
+ producer.send(nextMessage(5, false, producerSession, producer));
+ producer.setPriority(1);
+ producer.send(nextMessage(6, false, producerSession, producer));
+
+ // Out of order BCA
+ producer.setPriority(4);
+ producer.send(nextMessage(7, false, producerSession, producer));
+ producer.setPriority(1);
+ producer.send(nextMessage(8, false, producerSession, producer));
+ producer.setPriority(9);
+ producer.send(nextMessage(9, false, producerSession, producer));
+
+ // Reverse order CBA
+ producer.setPriority(1);
+ producer.send(nextMessage(10, false, producerSession, producer));
+ producer.setPriority(4);
+ producer.send(nextMessage(11, false, producerSession, producer));
+ producer.setPriority(9);
+ producer.send(nextMessage(12, false, producerSession, producer));
+ producerSession.commit();
+
+ consumer = consumerSession.createConsumer(queue);
+ consumerConnection.start();
+
+ Message msg = consumer.receive(TIMEOUT);
+ assertEquals(1, msg.getIntProperty("msg"));
+ msg = consumer.receive(TIMEOUT);
+ assertEquals(5, msg.getIntProperty("msg"));
+ msg = consumer.receive(TIMEOUT);
+ assertEquals(9, msg.getIntProperty("msg"));
+ msg = consumer.receive(TIMEOUT);
+ assertEquals(12, msg.getIntProperty("msg"));
+
+ msg = consumer.receive(TIMEOUT);
+ assertEquals(2, msg.getIntProperty("msg"));
+ msg = consumer.receive(TIMEOUT);
+ assertEquals(4, msg.getIntProperty("msg"));
+ msg = consumer.receive(TIMEOUT);
+ assertEquals(7, msg.getIntProperty("msg"));
+ msg = consumer.receive(TIMEOUT);
+ assertEquals(11, msg.getIntProperty("msg"));
+
+ msg = consumer.receive(TIMEOUT);
+ assertEquals(3, msg.getIntProperty("msg"));
+ msg = consumer.receive(TIMEOUT);
+ assertEquals(6, msg.getIntProperty("msg"));
+ msg = consumer.receive(TIMEOUT);
+ assertEquals(8, msg.getIntProperty("msg"));
+ msg = consumer.receive(TIMEOUT);
+ assertEquals(10, msg.getIntProperty("msg"));
+ }
+
+ private Message nextMessage(int msg, boolean first, Session producerSession, MessageProducer producer) throws JMSException
+ {
+ Message send = producerSession.createTextMessage("Message: " + msg);
+ send.setIntProperty("msg", msg);
+
+ return send;
+ }
+
+ /**
+ * Test that after sending an initial message with priority 0, it is able to be repeatedly reflected back to the queue using
+ * default priority and then consumed again, with separate transacted sessions with prefetch 1 for producer and consumer.
+ *
+ * Highlighted defect with PriorityQueues resolved in QPID-3927.
+ */
+ public void testMessageReflectionWithPriorityIncreaseOnTransactedSessionsWithPrefetch1() throws Exception
+ {
+ setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, "1");
+ Connection conn = getConnection();
+ conn.start();
+ assertEquals("Prefetch not reset", 1, ((AMQConnection) conn).getMaxPrefetch());
+
+ final Session producerSess = conn.createSession(true, Session.SESSION_TRANSACTED);
+ final Session consumerSess = conn.createSession(true, Session.SESSION_TRANSACTED);
+
+ //declare a priority queue with 10 priorities
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put("x-qpid-priorities",10);
+ ((AMQSession<?,?>) producerSess).createQueue(new AMQShortString(getTestQueueName()), false, true, false, arguments);
+
+ Queue queue = producerSess.createQueue(getTestQueueName());
+
+ //create the consumer, producer, add message listener
+ CountDownLatch latch = new CountDownLatch(5);
+ MessageConsumer cons = producerSess.createConsumer(queue);
+ MessageProducer producer = producerSess.createProducer(queue);
+
+ ReflectingMessageListener listener = new ReflectingMessageListener(producerSess,producer,consumerSess,latch);
+ cons.setMessageListener(listener);
+
+ //Send low priority 0 message to kick start the asynchronous reflection process
+ producer.setPriority(0);
+ producer.send(nextMessage(1, true, producerSess, producer));
+ producerSess.commit();
+
+ //wait for the reflection process to complete
+ assertTrue("Test process failed to complete in allowed time", latch.await(10, TimeUnit.SECONDS));
+ assertNull("Unexpected throwable encountered", listener.getThrown());
+ }
+
+ private static class ReflectingMessageListener implements MessageListener
+ {
+ private static final Logger _logger = Logger.getLogger(PriorityQueueTest.ReflectingMessageListener.class);
+
+ private Session _prodSess;
+ private Session _consSess;
+ private CountDownLatch _latch;
+ private MessageProducer _prod;
+ private long _origCount;
+ private Throwable _lastThrown;
+
+ public ReflectingMessageListener(final Session prodSess, final MessageProducer prod,
+ final Session consSess, final CountDownLatch latch)
+ {
+ _latch = latch;
+ _origCount = _latch.getCount();
+ _prodSess = prodSess;
+ _consSess = consSess;
+ _prod = prod;
+ }
+
+ @Override
+ public void onMessage(final Message message)
+ {
+ try
+ {
+ _latch.countDown();
+ long msgNum = _origCount - _latch.getCount();
+ _logger.info("Received message " + msgNum + " with ID: " + message.getIntProperty("msg"));
+
+ if(_latch.getCount() > 0)
+ {
+ //reflect the message, updating its ID and using default priority
+ message.clearProperties();
+ message.setIntProperty("msg", (int) msgNum + 1);
+ _prod.setPriority(Message.DEFAULT_PRIORITY);
+ _prod.send(message);
+ _prodSess.commit();
+ }
+
+ //commit the consumer session to consume the message
+ _consSess.commit();
+ }
+ catch(Throwable t)
+ {
+ _logger.error(t.getMessage(), t);
+ _lastThrown = t;
+ }
+ }
+
+ public Throwable getThrown()
+ {
+ return _lastThrown;
+ }
+ }
+}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
new file mode 100644
index 0000000000..427508954d
--- /dev/null
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
@@ -0,0 +1,494 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.queue;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.management.common.mbeans.ManagedQueue;
+import org.apache.qpid.server.logging.AbstractTestLogging;
+import org.apache.qpid.test.utils.JMXTestUtils;
+
+public class ProducerFlowControlTest extends AbstractTestLogging
+{
+ private static final Logger _logger = Logger.getLogger(ProducerFlowControlTest.class);
+
+ private static final int TIMEOUT = 10000;
+
+ private Connection producerConnection;
+ private Connection consumerConnection;
+ private Session producerSession;
+ private Session consumerSession;
+ private MessageProducer producer;
+ private MessageConsumer consumer;
+ private Queue queue;
+
+ private final AtomicInteger _sentMessages = new AtomicInteger(0);
+
+ private JMXTestUtils _jmxUtils;
+ private boolean _jmxUtilConnected;
+
+ public void setUp() throws Exception
+ {
+ getBrokerConfiguration().addJmxManagementConfiguration();
+
+ _jmxUtils = new JMXTestUtils(this);
+ _jmxUtilConnected=false;
+ super.setUp();
+
+ _monitor.markDiscardPoint();
+
+ producerConnection = getConnection();
+ producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ producerConnection.start();
+
+ consumerConnection = getConnection();
+ consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ }
+
+ public void tearDown() throws Exception
+ {
+ try
+ {
+ if(_jmxUtilConnected)
+ {
+ try
+ {
+ _jmxUtils.close();
+ }
+ catch (IOException e)
+ {
+ _logger.error("Error closing jmxUtils", e);
+ }
+ }
+ producerConnection.close();
+ consumerConnection.close();
+ }
+ finally
+ {
+ super.tearDown();
+ }
+ }
+
+ public void testCapacityExceededCausesBlock() throws Exception
+ {
+ String queueName = getTestQueueName();
+
+ createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 1000, 800);
+ producer = producerSession.createProducer(queue);
+
+ // try to send 5 messages (should block after 4)
+ sendMessagesAsync(producer, producerSession, 5, 50L);
+
+ Thread.sleep(5000);
+
+ assertEquals("Incorrect number of message sent before blocking", 4, _sentMessages.get());
+
+ consumer = consumerSession.createConsumer(queue);
+ consumerConnection.start();
+
+
+ consumer.receive();
+
+ Thread.sleep(1000);
+
+ assertEquals("Message incorrectly sent after one message received", 4, _sentMessages.get());
+
+
+ consumer.receive();
+
+ Thread.sleep(1000);
+
+ assertEquals("Message not sent after two messages received", 5, _sentMessages.get());
+
+ }
+
+
+ public void testBrokerLogMessages() throws Exception
+ {
+ String queueName = getTestQueueName();
+
+ createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 1000, 800);
+ producer = producerSession.createProducer(queue);
+
+ // try to send 5 messages (should block after 4)
+ sendMessagesAsync(producer, producerSession, 5, 50L);
+
+ List<String> results = waitAndFindMatches("QUE-1003", 7000);
+
+ assertEquals("Did not find correct number of QUE-1003 queue overfull messages", 1, results.size());
+
+ consumer = consumerSession.createConsumer(queue);
+ consumerConnection.start();
+
+
+ while(consumer.receive(1000) != null) {};
+
+ results = waitAndFindMatches("QUE-1004");
+
+ assertEquals("Did not find correct number of UNDERFULL queue underfull messages", 1, results.size());
+ }
+
+
+ public void testClientLogMessages() throws Exception
+ {
+ String queueName = getTestQueueName();
+
+ setTestClientSystemProperty("qpid.flow_control_wait_failure","3000");
+ setTestClientSystemProperty("qpid.flow_control_wait_notify_period","1000");
+
+ Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ createAndBindQueueWithFlowControlEnabled(session, queueName, 1000, 800);
+ producer = session.createProducer(queue);
+
+ // try to send 5 messages (should block after 4)
+ MessageSender sender = sendMessagesAsync(producer, session, 5, 50L);
+
+ List<String> results = waitAndFindMatches("Message send delayed by", TIMEOUT);
+ assertTrue("No delay messages logged by client",results.size()!=0);
+
+ List<String> failedMessages = waitAndFindMatches("Message send failed due to timeout waiting on broker enforced"
+ + " flow control", TIMEOUT);
+ assertEquals("Incorrect number of send failure messages logged by client (got " + results.size() + " delay "
+ + "messages)",1,failedMessages.size());
+ }
+
+
+ public void testFlowControlOnCapacityResumeEqual() throws Exception
+ {
+ String queueName = getTestQueueName();
+
+ createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 1000, 1000);
+ producer = producerSession.createProducer(queue);
+
+
+ // try to send 5 messages (should block after 4)
+ sendMessagesAsync(producer, producerSession, 5, 50L);
+
+ Thread.sleep(5000);
+
+ assertEquals("Incorrect number of message sent before blocking", 4, _sentMessages.get());
+
+ consumer = consumerSession.createConsumer(queue);
+ consumerConnection.start();
+
+ consumer.receive();
+
+ Thread.sleep(1000);
+
+ assertEquals("Message incorrectly sent after one message received", 5, _sentMessages.get());
+
+
+ }
+
+
+ public void testFlowControlSoak() throws Exception
+ {
+ String queueName = getTestQueueName();
+
+
+ final int numProducers = 10;
+ final int numMessages = 100;
+
+ createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 6000, 3000);
+
+ consumerConnection.start();
+
+ Connection[] producers = new Connection[numProducers];
+ for(int i = 0 ; i < numProducers; i ++)
+ {
+
+ producers[i] = getConnection();
+ producers[i].start();
+ Session session = producers[i].createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer myproducer = session.createProducer(queue);
+ MessageSender sender = sendMessagesAsync(myproducer, session, numMessages, 50L);
+ }
+
+ consumer = consumerSession.createConsumer(queue);
+ consumerConnection.start();
+
+ for(int j = 0; j < numProducers * numMessages; j++)
+ {
+
+ Message msg = consumer.receive(5000);
+ Thread.sleep(50L);
+ assertNotNull("Message not received("+j+"), sent: "+_sentMessages.get(), msg);
+
+ }
+
+
+
+ Message msg = consumer.receive(500);
+ assertNull("extra message received", msg);
+
+
+ for(int i = 0; i < numProducers; i++)
+ {
+ producers[i].close();
+ }
+
+ }
+
+ public void testSendTimeout() throws Exception
+ {
+ String queueName = getTestQueueName();
+ final String expectedMsg = isBroker010() ? "Exception when sending message:timed out waiting for message credit"
+ : "Unable to send message for 3 seconds due to broker enforced flow control";
+
+ setTestClientSystemProperty("qpid.flow_control_wait_failure","3000");
+ Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 1000, 800);
+ producer = session.createProducer(queue);
+
+ // try to send 5 messages (should block after 4)
+ MessageSender sender = sendMessagesAsync(producer, session, 5, 100L);
+
+ Exception e = sender.awaitSenderException(10000);
+
+ assertNotNull("No timeout exception on sending", e);
+
+
+ assertEquals("Unexpected exception reason", expectedMsg, e.getMessage());
+
+ }
+
+ public void testFlowControlAttributeModificationViaJMX() throws Exception
+ {
+ _jmxUtils.open();
+ _jmxUtilConnected = true;
+
+ String queueName = getTestQueueName();
+
+ createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 0, 0);
+ producer = producerSession.createProducer(queue);
+
+ Thread.sleep(1000);
+
+ //Create a JMX MBean proxy for the queue
+ ManagedQueue queueMBean = _jmxUtils.getManagedObject(ManagedQueue.class, _jmxUtils.getQueueObjectName("test", queueName));
+ assertNotNull(queueMBean);
+
+ //check current attribute values are 0 as expected
+ assertTrue("Capacity was not the expected value", queueMBean.getCapacity() == 0L);
+ assertTrue("FlowResumeCapacity was not the expected value", queueMBean.getFlowResumeCapacity() == 0L);
+
+ //set new values that will cause flow control to be active, and the queue to become overfull after 1 message is sent
+ queueMBean.setCapacity(250L);
+ queueMBean.setFlowResumeCapacity(250L);
+ assertTrue("Capacity was not the expected value", queueMBean.getCapacity() == 250L);
+ assertTrue("FlowResumeCapacity was not the expected value", queueMBean.getFlowResumeCapacity() == 250L);
+ assertFalse("Queue should not be overfull", queueMBean.isFlowOverfull());
+
+ // try to send 2 messages (should block after 1)
+
+ sendMessagesAsync(producer, producerSession, 2, 50L);
+
+ Thread.sleep(2000);
+
+ //check only 1 message was sent, and queue is overfull
+ assertEquals("Incorrect number of message sent before blocking", 1, _sentMessages.get());
+ assertTrue("Queue should be overfull", queueMBean.isFlowOverfull());
+
+ //raise the attribute values, causing the queue to become underfull and allow the second message to be sent.
+ queueMBean.setCapacity(300L);
+ queueMBean.setFlowResumeCapacity(300L);
+
+ Thread.sleep(2000);
+
+ //check second message was sent, and caused the queue to become overfull again
+ assertEquals("Second message was not sent after lifting FlowResumeCapacity", 2, _sentMessages.get());
+ assertTrue("Queue should be overfull", queueMBean.isFlowOverfull());
+
+ //raise capacity above queue depth, check queue remains overfull as FlowResumeCapacity still exceeded
+ queueMBean.setCapacity(700L);
+ assertTrue("Queue should be overfull", queueMBean.isFlowOverfull());
+
+ //receive a message, check queue becomes underfull
+
+ consumer = consumerSession.createConsumer(queue);
+ consumerConnection.start();
+
+ consumer.receive();
+
+ //perform a synchronous op on the connection
+ ((AMQSession<?,?>) consumerSession).sync();
+
+ assertFalse("Queue should not be overfull", queueMBean.isFlowOverfull());
+
+ consumer.receive();
+ }
+
+ public void testQueueDeleteWithBlockedFlow() throws Exception
+ {
+ String queueName = getTestQueueName();
+ createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 1000, 800, true, false);
+
+ producer = producerSession.createProducer(queue);
+
+ // try to send 5 messages (should block after 4)
+ sendMessagesAsync(producer, producerSession, 5, 50L);
+
+ Thread.sleep(5000);
+
+ assertEquals("Incorrect number of message sent before blocking", 4, _sentMessages.get());
+
+ // close blocked producer session and connection
+ producerConnection.close();
+
+ // delete queue with a consumer session
+ ((AMQSession<?,?>) consumerSession).sendQueueDelete(new AMQShortString(queueName));
+
+ consumer = consumerSession.createConsumer(queue);
+ consumerConnection.start();
+
+ Message message = consumer.receive(1000l);
+ assertNull("Unexpected message", message);
+ }
+
+ private void createAndBindQueueWithFlowControlEnabled(Session session, String queueName, int capacity, int resumeCapacity) throws Exception
+ {
+ createAndBindQueueWithFlowControlEnabled(session, queueName, capacity, resumeCapacity, false, true);
+ }
+
+ private void createAndBindQueueWithFlowControlEnabled(Session session, String queueName, int capacity, int resumeCapacity, boolean durable, boolean autoDelete) throws Exception
+ {
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put("x-qpid-capacity",capacity);
+ arguments.put("x-qpid-flow-resume-capacity",resumeCapacity);
+ ((AMQSession<?,?>) session).createQueue(new AMQShortString(queueName), autoDelete, durable, false, arguments);
+ queue = session.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='" + durable + "'&autodelete='" + autoDelete + "'");
+ ((AMQSession<?,?>) session).declareAndBind((AMQDestination)queue);
+ }
+
+ private MessageSender sendMessagesAsync(final MessageProducer producer,
+ final Session producerSession,
+ final int numMessages,
+ long sleepPeriod)
+ {
+ MessageSender sender = new MessageSender(producer, producerSession, numMessages,sleepPeriod);
+ new Thread(sender).start();
+ return sender;
+ }
+
+ private void sendMessages(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod)
+ throws JMSException
+ {
+
+ for (int msg = 0; msg < numMessages; msg++)
+ {
+ producer.send(nextMessage(msg, producerSession));
+ _sentMessages.incrementAndGet();
+
+
+ try
+ {
+ ((AMQSession<?,?>)producerSession).sync();
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Error performing sync", e);
+ throw new RuntimeException(e);
+ }
+
+ try
+ {
+ Thread.sleep(sleepPeriod);
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private static final byte[] BYTE_300 = new byte[300];
+
+ private Message nextMessage(int msg, Session producerSession) throws JMSException
+ {
+ BytesMessage send = producerSession.createBytesMessage();
+ send.writeBytes(BYTE_300);
+ send.setIntProperty("msg", msg);
+
+ return send;
+ }
+
+ private class MessageSender implements Runnable
+ {
+ private final MessageProducer _senderProducer;
+ private final Session _senderSession;
+ private final int _numMessages;
+ private volatile JMSException _exception;
+ private CountDownLatch _exceptionThrownLatch = new CountDownLatch(1);
+ private long _sleepPeriod;
+
+ public MessageSender(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod)
+ {
+ _senderProducer = producer;
+ _senderSession = producerSession;
+ _numMessages = numMessages;
+ _sleepPeriod = sleepPeriod;
+ }
+
+ public void run()
+ {
+ try
+ {
+ sendMessages(_senderProducer, _senderSession, _numMessages, _sleepPeriod);
+ }
+ catch (JMSException e)
+ {
+ _exception = e;
+ _exceptionThrownLatch.countDown();
+ }
+ }
+
+ public Exception awaitSenderException(long timeout) throws InterruptedException
+ {
+ _exceptionThrownLatch.await(timeout, TimeUnit.MILLISECONDS);
+ return _exception;
+ }
+ }
+}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/QueueBindTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/QueueBindTest.java
new file mode 100644
index 0000000000..64ba0156e6
--- /dev/null
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/QueueBindTest.java
@@ -0,0 +1,130 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.url.AMQBindingURL;
+
+public class QueueBindTest extends QpidBrokerTestCase
+{
+ private Connection _connection;
+ private AMQSession<?, ?> _session;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ _connection = getConnection();
+ _session = (AMQSession<?, ?>) _connection.createSession(true, Session.SESSION_TRANSACTED);
+ }
+
+ public void testQueueCannotBeReboundOnNonTopicExchange() throws Exception
+ {
+ runTestForNonTopicExhange(new AMQQueue(new AMQBindingURL("direct://amq.direct//" + getTestQueueName())));
+ runTestForNonTopicExhange(new AMQQueue(new AMQBindingURL("fanout://amq.fanout//" + getTestQueueName()) + "?routingkey='"
+ + getTestQueueName() + "'"));
+ }
+
+ public void testQueueCanBeReboundOnTopicExchange() throws Exception
+ {
+ AMQQueue destination = new AMQQueue(new AMQBindingURL("topic://amq.topic//" + getTestQueueName() + "?routingkey='"
+ + getTestQueueName() + "'"));
+ setTestClientSystemProperty("qpid.default_mandatory", "false");
+ runTestForTopicExchange(destination);
+
+ }
+
+ private void runTestForTopicExchange(AMQDestination destination) throws AMQException, JMSException, Exception
+ {
+ // binding queue with empty arguments
+ _session.declareAndBind(destination, FieldTable.convertToFieldTable(Collections.<String, Object> emptyMap()));
+
+ // try to re-bind queue with a selector
+ Map<String, Object> bindArguments = new HashMap<String, Object>();
+ bindArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), INDEX + "=0");
+ _session.bindQueue(destination.getAMQQueueName(), destination.getRoutingKey(),
+ FieldTable.convertToFieldTable(bindArguments), destination.getExchangeName(), destination);
+
+ _connection.start();
+
+ // repeat send/receive twice to make sure that selector is working
+ for (int i = 0; i < 2; i++)
+ {
+ int numberOfMesssages = 2;
+ sendMessage(_session, destination, numberOfMesssages);
+
+ MessageConsumer consumer = _session.createConsumer(destination);
+ Message m = consumer.receive(1000);
+ assertNotNull("Message not received", m);
+ assertEquals("Unexpected index", 0, m.getIntProperty(INDEX));
+ _session.commit();
+
+ m = consumer.receive(1000);
+ assertNull("Message received", m);
+
+ consumer.close();
+ }
+ }
+
+ private void runTestForNonTopicExhange(AMQQueue destination) throws AMQException, Exception, JMSException
+ {
+ // binding queue with empty arguments
+ _session.declareAndBind(destination, FieldTable.convertToFieldTable(Collections.<String, Object> emptyMap()));
+
+ // try to re-bind queue with a selector
+ Map<String, Object> bindArguments = new HashMap<String, Object>();
+ bindArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), INDEX + "=0");
+ _session.bindQueue(destination.getAMQQueueName(), destination.getRoutingKey(),
+ FieldTable.convertToFieldTable(bindArguments), destination.getExchangeName(), destination);
+
+ // send and receive to prove that selector is not used
+ int numberOfMesssages = 2;
+ sendMessage(_session, destination, numberOfMesssages);
+
+ MessageConsumer consumer = _session.createConsumer(destination);
+ _connection.start();
+
+ for (int i = 0; i < numberOfMesssages; i++)
+ {
+ Message m = consumer.receive(1000l);
+ assertNotNull("Message [" + i + "] not received with exchange " + destination.getExchangeName(), m);
+ assertEquals("Unexpected index", i, m.getIntProperty(INDEX));
+ _session.commit();
+ }
+ consumer.close();
+ }
+}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java
new file mode 100644
index 0000000000..dd57c1e3f7
--- /dev/null
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java
@@ -0,0 +1,175 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+/**
+ * Test Case to ensure that messages are correctly returned.
+ * This includes checking:
+ * - The message is returned.
+ * - The broker doesn't leak memory.
+ * - The broker's state is correct after test.
+ */
+public class QueueDepthWithSelectorTest extends QpidBrokerTestCase
+{
+ protected final String VHOST = "test";
+ protected final String QUEUE = this.getClass().getName();
+
+ protected Connection _clientConnection;
+ protected Connection _producerConnection;
+ private Session _clientSession;
+ protected Session _producerSession;
+ protected MessageProducer _producer;
+ private MessageConsumer _consumer;
+
+ protected static int MSG_COUNT = 50;
+
+ protected Message[] _messages = new Message[MSG_COUNT];
+
+ protected Queue _queue;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ _messages = new Message[MSG_COUNT];
+ _queue = getTestQueue();
+
+ //Create Producer
+ _producerConnection = getConnection();
+ _producerConnection.start();
+ _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _producer = _producerSession.createProducer(_queue);
+
+ // Create consumer
+ _clientConnection = getConnection();
+ _clientConnection.start();
+ _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _consumer = _clientSession.createConsumer(_queue, "key = 23");
+ }
+
+ public void test() throws Exception
+ {
+ //Send messages
+ _logger.info("Starting to send messages");
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ _producer.send(nextMessage(msg));
+ }
+ _logger.info("Closing connection");
+ //Close the connection.. .giving the broker time to clean up its state.
+ _producerConnection.close();
+
+ //Verify we get all the messages.
+ _logger.info("Verifying messages");
+ verifyAllMessagesRecevied(50);
+ verifyBrokerState(0);
+
+ //Close the connection.. .giving the broker time to clean up its state.
+ _clientConnection.close();
+
+ //Verify Broker state
+ _logger.info("Verifying broker state");
+ verifyBrokerState(0);
+ }
+
+ protected void verifyBrokerState(int expectedDepth)
+ {
+ try
+ {
+ Connection connection = getConnection();
+ AMQSession session = (AMQSession)connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ long queueDepth = session.getQueueDepth((AMQDestination) _queue);
+ assertEquals("Session reports Queue depth not as expected", expectedDepth, queueDepth);
+
+ connection.close();
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+ catch (Exception e)
+ {
+ fail(e.getMessage());
+ }
+ }
+
+ protected void verifyAllMessagesRecevied(int expectedDepth) throws Exception
+ {
+ boolean[] msgIdRecevied = new boolean[MSG_COUNT];
+
+ for (int i = 0; i < expectedDepth; i++)
+ {
+ _messages[i] = _consumer.receive(1000);
+ assertNotNull("should have received a message but didn't", _messages[i]);
+ }
+
+ //Check received messages
+ int msgId = 0;
+ for (Message msg : _messages)
+ {
+ assertNotNull("Message should not be null", msg);
+ assertEquals("msgId was wrong", msgId, msg.getIntProperty("ID"));
+ assertFalse("Already received msg id " + msgId, msgIdRecevied[msgId]);
+ msgIdRecevied[msgId] = true;
+ msgId++;
+ }
+
+ //Check all received
+ for (msgId = 0; msgId < expectedDepth; msgId++)
+ {
+ assertTrue("Message " + msgId + " not received.", msgIdRecevied[msgId]);
+ }
+
+ //do a synchronous op to ensure the acks are processed
+ //on the broker before proceeding
+ ((AMQSession)_clientSession).sync();
+ }
+
+ /**
+ * Get the next message putting the given count into the intProperties as ID.
+ *
+ * @param msgNo the message count to store as ID.
+ * @throws JMSException
+ */
+ protected Message nextMessage(int msgNo) throws JMSException
+ {
+ Message send = _producerSession.createTextMessage("MessageReturnTest");
+ send.setIntProperty("ID", msgNo);
+ send.setIntProperty("key", 23);
+ return send;
+ }
+}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java
new file mode 100644
index 0000000000..fe86e9d41f
--- /dev/null
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java
@@ -0,0 +1,216 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.store.MessageDurability;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+public class QueueMessageDurabilityTest extends QpidBrokerTestCase
+{
+
+ private static final String QPID_MESSAGE_DURABILITY = "qpid.message_durability";
+ private static final String DURABLE_ALWAYS_PERSIST_NAME = "DURABLE_QUEUE_ALWAYS_PERSIST";
+ private static final String DURABLE_NEVER_PERSIST_NAME = "DURABLE_QUEUE_NEVER_PERSIST";
+ private static final String DURABLE_DEFAULT_PERSIST_NAME = "DURABLE_QUEUE_DEFAULT_PERSIST";
+ private static final String NONDURABLE_ALWAYS_PERSIST_NAME = "NONDURABLE_QUEUE_ALWAYS_PERSIST";
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ Connection conn = getConnection();
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ AMQSession amqSession = (AMQSession) session;
+
+ Map<String,Object> arguments = new HashMap<>();
+ arguments.put(QPID_MESSAGE_DURABILITY, MessageDurability.ALWAYS.name());
+ amqSession.createQueue(new AMQShortString(DURABLE_ALWAYS_PERSIST_NAME), false, true, false, arguments);
+
+ arguments = new HashMap<>();
+ arguments.put(QPID_MESSAGE_DURABILITY, MessageDurability.NEVER.name());
+ amqSession.createQueue(new AMQShortString(DURABLE_NEVER_PERSIST_NAME), false, true, false, arguments);
+
+ arguments = new HashMap<>();
+ arguments.put(QPID_MESSAGE_DURABILITY, MessageDurability.DEFAULT.name());
+ amqSession.createQueue(new AMQShortString(DURABLE_DEFAULT_PERSIST_NAME), false, true, false, arguments);
+
+ arguments = new HashMap<>();
+ arguments.put(QPID_MESSAGE_DURABILITY,MessageDurability.ALWAYS.name());
+ amqSession.createQueue(new AMQShortString(NONDURABLE_ALWAYS_PERSIST_NAME), false, false, false, arguments);
+
+ amqSession.bindQueue(AMQShortString.valueOf(DURABLE_ALWAYS_PERSIST_NAME),
+ AMQShortString.valueOf("Y.*.*.*"),
+ null,
+ AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME),
+ null);
+
+ amqSession.bindQueue(AMQShortString.valueOf(DURABLE_NEVER_PERSIST_NAME),
+ AMQShortString.valueOf("*.Y.*.*"),
+ null,
+ AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME),
+ null);
+
+ amqSession.bindQueue(AMQShortString.valueOf(DURABLE_DEFAULT_PERSIST_NAME),
+ AMQShortString.valueOf("*.*.Y.*"),
+ null,
+ AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME),
+ null);
+
+ amqSession.bindQueue(AMQShortString.valueOf(NONDURABLE_ALWAYS_PERSIST_NAME),
+ AMQShortString.valueOf("*.*.*.Y"),
+ null,
+ AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME),
+ null);
+ }
+
+ public void testSendPersistentMessageToAll() throws Exception
+ {
+ Connection conn = getConnection();
+ Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ MessageProducer producer = session.createProducer(null);
+ conn.start();
+ producer.send(session.createTopic("Y.Y.Y.Y"), session.createTextMessage("test"));
+ session.commit();
+
+ AMQSession amqSession = (AMQSession) session;
+ assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)));
+ assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME)));
+ assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)));
+ assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME)));
+
+ restartBroker();
+
+ conn = getConnection();
+ session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ amqSession = (AMQSession) session;
+ assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)));
+ assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME)));
+ assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)));
+
+ assertFalse(amqSession.isQueueBound((AMQDestination) session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME)));
+
+ }
+
+
+ public void testSendNonPersistentMessageToAll() throws Exception
+ {
+ Connection conn = getConnection();
+ Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ MessageProducer producer = session.createProducer(null);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ conn.start();
+ producer.send(session.createTopic("Y.Y.Y.Y"), session.createTextMessage("test"));
+ session.commit();
+
+ AMQSession amqSession = (AMQSession) session;
+ assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)));
+ assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME)));
+ assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)));
+ assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME)));
+
+ restartBroker();
+
+ conn = getConnection();
+ session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ amqSession = (AMQSession) session;
+ assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)));
+ assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME)));
+ assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)));
+
+ assertFalse(amqSession.isQueueBound((AMQDestination)session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME)));
+
+ }
+
+ public void testNonPersistentContentRetained() throws Exception
+ {
+ Connection conn = getConnection();
+ Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ MessageProducer producer = session.createProducer(null);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ conn.start();
+ producer.send(session.createTopic("N.N.Y.Y"), session.createTextMessage("test1"));
+ producer.send(session.createTopic("Y.N.Y.Y"), session.createTextMessage("test2"));
+ session.commit();
+ MessageConsumer consumer = session.createConsumer(session.createQueue(DURABLE_ALWAYS_PERSIST_NAME));
+ Message msg = consumer.receive(1000l);
+ assertNotNull(msg);
+ assertTrue(msg instanceof TextMessage);
+ assertEquals("test2", ((TextMessage) msg).getText());
+ session.rollback();
+ restartBroker();
+ conn = getConnection();
+ conn.start();
+ session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ AMQSession amqSession = (AMQSession) session;
+ assertEquals(1, amqSession.getQueueDepth((AMQDestination) session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)));
+ assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME)));
+ assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)));
+ consumer = session.createConsumer(session.createQueue(DURABLE_ALWAYS_PERSIST_NAME));
+ msg = consumer.receive(1000l);
+ assertNotNull(msg);
+ assertTrue(msg instanceof TextMessage);
+ assertEquals("test2", ((TextMessage)msg).getText());
+ session.commit();
+ }
+
+ public void testPersistentContentRetainedOnTransientQueue() throws Exception
+ {
+ setTestClientSystemProperty(ClientProperties.QPID_DECLARE_QUEUES_PROP_NAME, "false");
+ Connection conn = getConnection();
+ Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ MessageProducer producer = session.createProducer(null);
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+ conn.start();
+ producer.send(session.createTopic("N.N.Y.Y"), session.createTextMessage("test1"));
+ session.commit();
+ MessageConsumer consumer = session.createConsumer(session.createQueue(DURABLE_DEFAULT_PERSIST_NAME));
+ Message msg = consumer.receive(1000l);
+ assertNotNull(msg);
+ assertTrue(msg instanceof TextMessage);
+ assertEquals("test1", ((TextMessage)msg).getText());
+ session.commit();
+ System.gc();
+ consumer = session.createConsumer(session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME));
+ msg = consumer.receive(1000l);
+ assertNotNull(msg);
+ assertTrue(msg instanceof TextMessage);
+ assertEquals("test1", ((TextMessage)msg).getText());
+ session.commit();
+ }
+
+
+}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SortedQueueTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SortedQueueTest.java
new file mode 100644
index 0000000000..340ae4a1ae
--- /dev/null
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SortedQueueTest.java
@@ -0,0 +1,538 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.naming.NamingException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class SortedQueueTest extends QpidBrokerTestCase
+{
+ private static final Logger LOGGER = Logger.getLogger(SortedQueueTest.class);
+ public static final String TEST_SORT_KEY = "testSortKey";
+ private static final String[] VALUES = SortedQueueEntryListTest.keys.clone();
+ private static final String[] VALUES_SORTED = SortedQueueEntryListTest.keys.clone();
+ private final String[] SUBSET_KEYS = { "000", "100", "200", "300", "400", "500", "600", "700", "800", "900" };
+
+ private Connection _producerConnection;
+ private Session _producerSession;
+ private Connection _consumerConnection;
+ private long _receiveInterval;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, "1");
+ // Sort value array to generated "expected" order of messages.
+ Arrays.sort(VALUES_SORTED);
+ _producerConnection = getConnection();
+ _consumerConnection = getConnection();
+ _producerSession = _producerConnection.createSession(true, Session.SESSION_TRANSACTED);
+ _receiveInterval = isBrokerStorePersistent() ? 3000l : 1500l;
+ }
+
+ protected void tearDown() throws Exception
+ {
+ _producerSession.close();
+ _producerConnection.close();
+ _consumerConnection.close();
+ super.tearDown();
+ }
+
+ public void testSortOrder() throws JMSException, NamingException, AMQException
+ {
+ final Queue queue = createQueue();
+ final MessageProducer producer = _producerSession.createProducer(queue);
+
+ for(String value : VALUES)
+ {
+ final Message msg = _producerSession.createTextMessage("Message Text:" + value);
+ msg.setStringProperty(TEST_SORT_KEY, value);
+ producer.send(msg);
+ }
+
+ _producerSession.commit();
+ producer.close();
+
+ final Session consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final MessageConsumer consumer = consumerSession.createConsumer(queue);
+ _consumerConnection.start();
+ TextMessage received;
+ int messageCount = 0;
+ while((received = (TextMessage) consumer.receive(_receiveInterval)) != null)
+ {
+ assertEquals("Received message with unexpected sorted key value", VALUES_SORTED[messageCount],
+ received.getStringProperty(TEST_SORT_KEY));
+ assertEquals("Received message with unexpected message value",
+ "Message Text:" + VALUES_SORTED[messageCount], received.getText());
+ messageCount++;
+ }
+
+ assertEquals("Incorrect number of messages received", VALUES.length, messageCount);
+ }
+
+ public void testAutoAckSortedQueue() throws JMSException, NamingException, AMQException
+ {
+ runThroughSortedQueueForSessionMode(Session.AUTO_ACKNOWLEDGE);
+ }
+
+ public void testTransactedSortedQueue() throws JMSException, NamingException, AMQException
+ {
+ runThroughSortedQueueForSessionMode(Session.SESSION_TRANSACTED);
+ }
+
+ public void testClientAckSortedQueue() throws JMSException, NamingException, AMQException
+ {
+ runThroughSortedQueueForSessionMode(Session.CLIENT_ACKNOWLEDGE);
+ }
+
+ private void runThroughSortedQueueForSessionMode(final int sessionMode) throws JMSException, NamingException,
+ AMQException
+ {
+ final Queue queue = createQueue();
+ final MessageProducer producer = _producerSession.createProducer(queue);
+
+ final TestConsumerThread consumerThread = new TestConsumerThread(sessionMode, queue);
+ consumerThread.start();
+
+ for(String value : VALUES)
+ {
+ final Message msg = _producerSession.createMessage();
+ msg.setStringProperty(TEST_SORT_KEY, value);
+ producer.send(msg);
+ _producerSession.commit();
+ }
+
+ try
+ {
+ consumerThread.join(getConsumerThreadJoinInterval());
+ }
+ catch(InterruptedException e)
+ {
+ fail("Test failed waiting for consumer to complete");
+ }
+
+ assertTrue("Consumer timed out", consumerThread.isStopped());
+ assertEquals("Incorrect number of messages received", VALUES.length, consumerThread.getConsumed());
+
+ producer.close();
+ }
+
+ public void testSortedQueueWithAscendingSortedKeys() throws JMSException, NamingException, AMQException
+ {
+ final Queue queue = createQueue();
+ final MessageProducer producer = _producerSession.createProducer(queue);
+
+ final TestConsumerThread consumerThread = new TestConsumerThread(Session.AUTO_ACKNOWLEDGE, queue);
+ consumerThread.start();
+
+ for(int i = 0; i < 200; i++)
+ {
+ final String ascendingKey = AscendingSortedKeys.getNextKey();
+ final Message msg = _producerSession.createTextMessage("Message Text:" + ascendingKey);
+ msg.setStringProperty(TEST_SORT_KEY, ascendingKey);
+ producer.send(msg);
+ _producerSession.commit();
+ }
+
+ try
+ {
+ consumerThread.join(getConsumerThreadJoinInterval());
+ }
+ catch(InterruptedException e)
+ {
+ fail("Test failed waiting for consumer to complete");
+ }
+
+ assertTrue("Consumer timed out", consumerThread.isStopped());
+ assertEquals("Incorrect number of messages received", 200, consumerThread.getConsumed());
+
+ producer.close();
+ }
+
+ private long getConsumerThreadJoinInterval()
+ {
+ return isBrokerStorePersistent() ? 50000L: 5000L;
+ }
+
+ public void testSortOrderWithNonUniqueKeys() throws JMSException, NamingException, AMQException
+ {
+ final Queue queue = createQueue();
+ final MessageProducer producer = _producerSession.createProducer(queue);
+
+ int count = 0;
+ while(count < 200)
+ {
+ final Message msg = _producerSession.createTextMessage("Message Text:" + count++);
+ msg.setStringProperty(TEST_SORT_KEY, "samesortkeyvalue");
+ producer.send(msg);
+ }
+
+ _producerSession.commit();
+ producer.close();
+
+ final Session consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final MessageConsumer consumer = consumerSession.createConsumer(queue);
+ _consumerConnection.start();
+ TextMessage received = null;
+ int messageCount = 0;
+
+ while((received = (TextMessage) consumer.receive(_receiveInterval)) != null)
+ {
+ assertEquals("Received message with unexpected sorted key value", "samesortkeyvalue",
+ received.getStringProperty(TEST_SORT_KEY));
+ assertEquals("Received message with unexpected message value", "Message Text:" + messageCount,
+ received.getText());
+ messageCount++;
+ }
+
+ assertEquals("Incorrect number of messages received", 200, messageCount);
+ }
+
+ public void testSortOrderWithUniqueKeySubset() throws JMSException, NamingException, AMQException
+ {
+ final Queue queue = createQueue();
+ final MessageProducer producer = _producerSession.createProducer(queue);
+
+ int count = 0;
+ while(count < 100)
+ {
+ int keyValueIndex = count % 10;
+ final Message msg = _producerSession.createTextMessage("Message Text:" + count);
+ msg.setStringProperty(TEST_SORT_KEY, SUBSET_KEYS[keyValueIndex]);
+ producer.send(msg);
+ count++;
+ }
+
+ _producerSession.commit();
+ producer.close();
+
+ final Session consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final MessageConsumer consumer = consumerSession.createConsumer(queue);
+ _consumerConnection.start();
+ TextMessage received;
+ int messageCount = 0;
+
+ while((received = (TextMessage) consumer.receive(_receiveInterval)) != null)
+ {
+ assertEquals("Received message with unexpected sorted key value", SUBSET_KEYS[messageCount / 10],
+ received.getStringProperty(TEST_SORT_KEY));
+ messageCount++;
+ }
+
+ assertEquals("Incorrect number of messages received", 100, messageCount);
+ }
+
+ public void testGetNextWithAck() throws JMSException, NamingException, AMQException
+ {
+ Queue _queue = createQueue();
+ MessageProducer producer = _producerSession.createProducer(_queue);
+ Message received = null;
+
+ //Send 3 out of order
+ sendAndCommitMessage(producer,"2");
+ sendAndCommitMessage(producer,"3");
+ sendAndCommitMessage(producer,"1");
+
+ final Session consumerSession = _consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ final MessageConsumer consumer = consumerSession.createConsumer(_queue);
+ _consumerConnection.start();
+
+ //Receive 3 in sorted order
+ received = assertReceiveAndValidateMessage(consumer, "1");
+ received.acknowledge();
+ received = assertReceiveAndValidateMessage(consumer, "2");
+ received.acknowledge();
+ received = assertReceiveAndValidateMessage(consumer, "3");
+ received.acknowledge();
+
+ //Send 1
+ sendAndCommitMessage(producer,"4");
+
+ //Receive 1 and recover
+ received = assertReceiveAndValidateMessage(consumer, "4");
+ consumerSession.recover();
+
+ //Receive same 1
+ received = assertReceiveAndValidateMessage(consumer, "4");
+ received.acknowledge();
+
+ //Send 3 out of order
+ sendAndCommitMessage(producer,"7");
+ sendAndCommitMessage(producer,"6");
+ sendAndCommitMessage(producer,"5");
+
+ //Receive 1 of 3 (possibly out of order due to pre-fetch)
+ final Message messageBeforeRollback = assertReceiveMessage(consumer);
+ consumerSession.recover();
+
+ if (isBroker010())
+ {
+ //Receive 3 in sorted order (not as per JMS recover)
+ received = assertReceiveAndValidateMessage(consumer, "5");
+ received.acknowledge();
+ received = assertReceiveAndValidateMessage(consumer, "6");
+ received.acknowledge();
+ received = assertReceiveAndValidateMessage(consumer, "7");
+ received.acknowledge();
+ }
+ else
+ {
+ //First message will be the one rolled-back (as per JMS spec).
+ final String messageKeyDeliveredBeforeRollback = messageBeforeRollback.getStringProperty(TEST_SORT_KEY);
+ received = assertReceiveAndValidateMessage(consumer, messageKeyDeliveredBeforeRollback);
+ received.acknowledge();
+
+ //Remaining two messages will be sorted
+ final SortedSet<String> keys = new TreeSet<String>(Arrays.asList("5", "6", "7"));
+ keys.remove(messageKeyDeliveredBeforeRollback);
+
+ received = assertReceiveAndValidateMessage(consumer, keys.first());
+ received.acknowledge();
+ received = assertReceiveAndValidateMessage(consumer, keys.last());
+ received.acknowledge();
+ }
+ }
+
+ private Queue createQueue() throws AMQException, JMSException
+ {
+ final Map<String, Object> arguments = new HashMap<String, Object>();
+ arguments.put(QueueArgumentsConverter.QPID_QUEUE_SORT_KEY, TEST_SORT_KEY);
+ ((AMQSession<?,?>) _producerSession).createQueue(new AMQShortString(getTestQueueName()), false, true, false, arguments);
+ final Queue queue = new AMQQueue("amq.direct", getTestQueueName());
+ ((AMQSession<?,?>) _producerSession).declareAndBind((AMQDestination) queue);
+ return queue;
+ }
+
+ private Message getSortableTestMesssage(final String key) throws JMSException
+ {
+ final Message msg = _producerSession.createTextMessage("Message Text: Key Value" + key);
+ msg.setStringProperty(TEST_SORT_KEY, key);
+ return msg;
+ }
+
+ private void sendAndCommitMessage(final MessageProducer producer, final String keyValue) throws JMSException
+ {
+ producer.send(getSortableTestMesssage(keyValue));
+ _producerSession.commit();
+ }
+
+ private Message assertReceiveAndValidateMessage(final MessageConsumer consumer, final String expectedKey) throws JMSException
+ {
+ final Message received = assertReceiveMessage(consumer);
+ assertEquals("Received message with unexpected sorted key value", expectedKey,
+ received.getStringProperty(TEST_SORT_KEY));
+ return received;
+ }
+
+ private Message assertReceiveMessage(final MessageConsumer consumer)
+ throws JMSException
+ {
+ final Message received = (TextMessage) consumer.receive(_receiveInterval);
+ assertNotNull("Received message is unexpectedly null", received);
+ return received;
+ }
+
+ private class TestConsumerThread extends Thread
+ {
+ private final AtomicInteger _consumed = new AtomicInteger(0);
+ private volatile boolean _stopped = false;
+ private int _count = 0;
+ private int _sessionType = Session.AUTO_ACKNOWLEDGE;
+ private Queue _queue;
+
+ public TestConsumerThread(final int sessionType, final Queue queue)
+ {
+ _sessionType = sessionType;
+ _queue = queue;
+ }
+
+ public void run()
+ {
+ try
+ {
+ Connection conn = null;
+ try
+ {
+ conn = getConnection();
+ }
+ catch(Exception e)
+ {
+ throw new RuntimeException("Could not get connection");
+ }
+
+ final Session session = conn.createSession((_sessionType == Session.SESSION_TRANSACTED ? true : false),
+ _sessionType);
+ final MessageConsumer consumer = session.createConsumer(_queue);
+
+ conn.start();
+
+ Message msg;
+ while((msg = consumer.receive(_receiveInterval)) != null)
+ {
+ if(_sessionType == Session.SESSION_TRANSACTED)
+ {
+ if (_count%10 == 0)
+ {
+ LOGGER.debug("transacted session rollback");
+ session.rollback();
+ }
+ else
+ {
+ LOGGER.debug("transacted session commit");
+ session.commit();
+ _consumed.incrementAndGet();
+ }
+ }
+ else if(_sessionType == Session.CLIENT_ACKNOWLEDGE)
+ {
+ if (_count%10 == 0)
+ {
+ LOGGER.debug("client ack session recover");
+ session.recover();
+ }
+ else
+ {
+ LOGGER.debug("client ack session acknowledge");
+ msg.acknowledge();
+ _consumed.incrementAndGet();
+ }
+ }
+ else
+ {
+ LOGGER.debug("auto ack session");
+ _consumed.incrementAndGet();
+ }
+
+ _count++;
+ LOGGER.debug("Message consumed with key: " + msg.getStringProperty(TEST_SORT_KEY));
+ LOGGER.debug("Message consumed with consumed index: " + _consumed.get());
+ }
+
+ _stopped = true;
+ session.close();
+ conn.close();
+ }
+ catch(JMSException e)
+ {
+ LOGGER.error("Exception in listener", e);
+ }
+ }
+
+ public boolean isStopped()
+ {
+ return _stopped;
+ }
+
+ public int getConsumed()
+ {
+ return _consumed.get();
+ }
+ }
+
+ private static class AscendingSortedKeys
+ {
+ public static final String[] KEYS = { "Ul4a1", "WaWsv", "2Yz7E", "ix74r", "okgRi", "HlUbF", "LewvM", "lweGy",
+ "TXQ0Z", "0Kyfs", "s7Mxk", "dmoS7", "8RCUA", "W3VFH", "aez9y", "uQIcz", "0h1b1", "cmXIX",
+ "4dEz6", "zHF1q", "D6rBy", "5drc6", "0BmCy", "BCxeC", "t59lR", "aL6AJ", "OHaBz", "WmadA",
+ "B3qem", "CxVEf", "AIYUu", "uJScX", "uoStw", "ogLgc", "AgJHQ", "hUTw7", "Rxrsm", "9GXkX",
+ "7hyVv", "y94nw", "Twano", "TCgPp", "pFrrl", "POUYS", "L7cGc", "0ao3l", "CNHmv", "MaJQs",
+ "OUqFM", "jeskS", "FPfSE", "v1Hln", "14FLR", "KZamH", "G1RhS", "FVMxo", "rKDLJ", "nnP8o",
+ "nFqik", "zmLYD", "1j5L8", "e6e4z", "WDVWJ", "aDGtS", "fcwDa", "nlaBy", "JJs5m", "vLsmS",
+ "No0Qb", "JXljW", "Waim6", "MezSW", "l83Ud", "SjskQ", "uPX7G", "5nmWv", "ZhwG1", "uTacx",
+ "t98iW", "JkzUn", "fmIK1", "i7WMQ", "bgJAz", "n1pmO", "jS1aj", "4W0Tl", "Yf2Ec", "sqVrf",
+ "MojnP", "qQxHP", "pWiOs", "yToGW", "kB5nP", "BpYhV", "Cfgr3", "zbIYY", "VLTy6", "he9IA",
+ "lm0pD", "WreyP", "8hJdt", "QnJ1S", "n8pJ9", "iqv4k", "OUYuF", "8cVD3", "sx5Gl", "cQOnv",
+ "wiHrZ", "oGu6x", "7fsYM", "gf8rI", "7fKYU", "pT8wu", "lCMxy", "prNT6", "5Drn0", "guMb8",
+ "OxWIH", "uZPqg", "SbRYy", "In3NS", "uvf7A", "FLsph", "pmeCd", "BbwgA", "ru4UG", "YOfrY",
+ "W7cTs", "K4GS8", "AOgEe", "618Di", "dpe1v", "3otm6", "oVQp6", "5Mg9r", "Y1mC0", "VIlwP",
+ "aFFss", "Mkgy8", "pv0i7", "S77LH", "XyPZN", "QYxC0", "vkCHH", "MGlTF", "24ARF", "v2eC3",
+ "ZUnqt", "HfyNQ", "FjHXR", "45cIH", "1LB1L", "zqH0W", "fLNg8", "oQ87r", "Cp3mZ", "Zv7z0",
+ "O3iyQ", "EOE1o", "5ZaEz", "tlILt", "MmsIo", "lXFOB", "gtCA5", "yEfy9", "7X3uy", "d7vjM",
+ "XflUq", "Fhtgl", "NOHsz", "GWqqX", "xciqp", "BFkb8", "P6bcg", "lViBv", "2TRI7", "2hEEU",
+ "9XyT9", "29QAz", "U3yw5", "FxX9q", "C2Irc", "8U2nU", "m4bxU", "5iGN5", "mX2GE", "cShY2",
+ "JRJQB", "yvOMI", "4QMc9", "NAFuw", "RmDcr", "faHir", "2ZHdk", "zY1GY", "a00b5", "ZuDtD",
+ "JIqXi", "K20wK", "gdQsS", "5Namm", "lkMUA", "IBe8k", "FcWrW", "FFDui", "tuDyS", "ZJTXH",
+ "AkKTk", "zQt6Q", "FNYIM", "RpBQm", "RsQUq", "Mm8si", "gjUTu", "zz4ZU", "jiVBP", "ReKEW",
+ "5VZjS", "YjB9t", "zFgtB", "8TxD7", "euZA5", "MK07Y", "CK5W7", "16lHc", "6q6L9", "Z4I1v",
+ "UlU3M", "SWfou", "0PktI", "55rfB", "jfREu", "580YD", "Uvlv4", "KASQ8", "AmdQd", "piJSk",
+ "hE1Ql", "LDk6f", "NcICA", "IKxdL", "iwzGk", "uN6r3", "lsQGo", "QClRL", "iKqhr", "FGzgp",
+ "RkQke", "b29RJ", "CIShG", "9eoRc", "F6PT2", "LbRTH", "M3zXL", "GXdoH", "IjTwP", "RBhp0",
+ "yluBx", "mz8gx", "MmKGJ", "Q6Lix", "uupzk", "RACuj", "d85a9", "qaofN", "kZANm", "jtn0X",
+ "lpF6W", "suY4x", "rz7Ut", "wDajX", "1v5hH", "Yw2oU", "ksJby", "WMiS3", "lj07Q", "EdBKc",
+ "6AFT0", "0YAGH", "ThjNn", "JKWYR", "9iGoT", "UmaEv", "3weIF", "CdyBV", "pAhR1", "djsrv",
+ "xReec", "8FmFH", "Dz1R3", "Ta8f6", "DG4sT", "VjCZq", "jSjS3", "Pb1pa", "VNCPd", "Kr8ys",
+ "AXpwE", "ZzJHW", "Nxx9V", "jzUqR", "dhSuH", "DQimp", "07n1c", "HP433", "RzaZA", "cL0aE",
+ "Ss0Zu", "FnPFB", "7lUXZ", "9rlg9", "lH1kt", "ni2v1", "48cHL", "soy9t", "WPmlx", "2Nslm",
+ "hSSvQ", "9y4lw", "ulk41", "ECMvU", "DLhzM", "GrDg7", "x3LDe", "QChxs", "xXTI4", "Gv3Fq",
+ "rhl0J", "QssNC", "brhlQ", "s93Ml", "tl72W", "pvgjS", "Qworu", "DcpWB", "X6Pin", "J2mQi",
+ "BGaQY", "CqqaD", "NhXdu", "dQ586", "Yh1hF", "HRxd8", "PYBf4", "64s8N", "tvdkD", "azIWp",
+ "tAOsr", "v8yFN", "h1zcH", "SmGzv", "bZLvS", "fFDrJ", "Oz8yZ", "0Wr5y", "fcJOy", "7ku1p",
+ "QbxXc", "VerEA", "QWxoT", "hYBCK", "o8Uyd", "FwEJz", "hi5X7", "uAWyp", "I7p2a", "M6qcG",
+ "gIYvE", "HzZT8", "iB08l", "StlDJ", "tjQxs", "k85Ae", "taOXK", "s4786", "2DREs", "atef2",
+ "Vprf2", "VBjhz", "EoToP", "blLA9", "qUJMd", "ydG8U", "8xEKz", "uLtKs", "GSQwj", "S2Dfu",
+ "ciuWz", "i3pyd", "7Ow5C", "IRh48", "vOqCE", "Q6hMC", "yofH3", "KsjRK", "5IhmG", "fqypy",
+ "0MR5X", "Chuy3" };
+
+ private static int _i = 0;
+ private static int _j = 0;
+
+ static
+ {
+ Arrays.sort(KEYS);
+ }
+
+ public static String getNextKey()
+ {
+ if(_j == KEYS.length)
+ {
+ _j = 0;
+ _i++;
+ if(_i == KEYS.length)
+ {
+ _i = 0;
+ }
+ }
+ return new StringBuffer().append(KEYS[_i]).append("-").append(KEYS[_j++]).toString();
+ }
+ }
+}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/TimeToLiveTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/TimeToLiveTest.java
new file mode 100644
index 0000000000..e606df3f7d
--- /dev/null
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/TimeToLiveTest.java
@@ -0,0 +1,397 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.queue;
+
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TopicSubscriber;
+import javax.naming.NamingException;
+
+import org.apache.log4j.Logger;
+import org.junit.Assert;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+public class TimeToLiveTest extends QpidBrokerTestCase
+{
+ private static final Logger _logger = Logger.getLogger(TimeToLiveTest.class);
+
+ protected final String QUEUE = "TimeToLiveQueue";
+
+ private final long TIME_TO_LIVE = 100L;
+
+ private static final int MSG_COUNT = 50;
+ private static final long SERVER_TTL_TIMEOUT = 60000L;
+
+ public void testPassiveTTLWithPrefetch() throws Exception
+ {
+ doTestPassiveTTL(true);
+ }
+
+ public void testPassiveTTL() throws Exception
+ {
+ doTestPassiveTTL(false);
+
+ }
+
+ private void doTestPassiveTTL(boolean prefetchMessages) throws JMSException, NamingException
+ {
+ //Create Client 1
+ Connection clientConnection = getConnection();
+
+ Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = clientSession.createQueue(QUEUE);
+
+ // Create then close the consumer so the queue is actually created
+ // Closing it then reopening it ensures that the consumer shouldn't get messages
+ // which should have expired and allows a shorter sleep period. See QPID-1418
+
+ MessageConsumer consumer = clientSession.createConsumer(queue);
+ consumer.close();
+
+ //Create Producer
+ Connection producerConnection = getConnection();
+
+ producerConnection.start();
+
+ // Move to a Transacted session to ensure that all messages have been delivered to broker before
+ // we start waiting for TTL
+ Session producerSession = producerConnection.createSession(true, Session.SESSION_TRANSACTED);
+
+ MessageProducer producer = producerSession.createProducer(queue);
+
+ consumer = clientSession.createConsumer(queue);
+ if(prefetchMessages)
+ {
+ clientConnection.start();
+ }
+
+ //Set TTL
+ int msg = 0;
+ producer.send(nextMessage(String.valueOf(msg), true, producerSession, producer));
+
+ producer.setTimeToLive(TIME_TO_LIVE);
+
+ for (; msg < MSG_COUNT - 2; msg++)
+ {
+ producer.send(nextMessage(String.valueOf(msg), false, producerSession, producer));
+ }
+
+ //Reset TTL
+ producer.setTimeToLive(0L);
+ producer.send(nextMessage(String.valueOf(msg), false, producerSession, producer));
+
+ producerSession.commit();
+
+
+ // Ensure we sleep the required amount of time.
+ ReentrantLock waitLock = new ReentrantLock();
+ Condition wait = waitLock.newCondition();
+ final long MILLIS = 1000000L;
+
+ long waitTime = TIME_TO_LIVE * MILLIS;
+ while (waitTime > 0)
+ {
+ try
+ {
+ waitLock.lock();
+
+ waitTime = wait.awaitNanos(waitTime);
+ }
+ catch (InterruptedException e)
+ {
+ //Stop if we are interrupted
+ fail(e.getMessage());
+ }
+ finally
+ {
+ waitLock.unlock();
+ }
+
+ }
+
+ if(prefetchMessages)
+ {
+ clientConnection.close();
+ clientConnection = getConnection();
+
+ clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ queue = clientSession.createQueue(QUEUE);
+ consumer = clientSession.createConsumer(queue);
+ }
+
+ clientConnection.start();
+
+ //Receive Message 0
+ // Set 5s receive time for messages we expect to receive.
+ Message receivedFirst = consumer.receive(5000);
+ Message receivedSecond = consumer.receive(5000);
+ Message receivedThird = consumer.receive(1000);
+
+ // Log the messages to help diagnosis incase of failure
+ _logger.info("First:"+receivedFirst);
+ _logger.info("Second:"+receivedSecond);
+ _logger.info("Third:"+receivedThird);
+
+ // Only first and last messages sent should survive expiry
+ Assert.assertNull("More messages received", receivedThird);
+
+ Assert.assertNotNull("First message not received", receivedFirst);
+ Assert.assertTrue("First message doesn't have first set.", receivedFirst.getBooleanProperty("first"));
+ Assert.assertEquals("First message has incorrect TTL.", 0L, receivedFirst.getLongProperty("TTL"));
+
+ Assert.assertNotNull("Final message not received", receivedSecond);
+ Assert.assertFalse("Final message has first set.", receivedSecond.getBooleanProperty("first"));
+ Assert.assertEquals("Final message has incorrect TTL.", 0L, receivedSecond.getLongProperty("TTL"));
+
+ clientConnection.close();
+
+ producerConnection.close();
+ }
+
+ private Message nextMessage(String msg, boolean first, Session producerSession, MessageProducer producer) throws JMSException
+ {
+ Message send = producerSession.createTextMessage("Message " + msg);
+ send.setBooleanProperty("first", first);
+ send.setStringProperty("testprop", "TimeToLiveTest");
+ send.setLongProperty("TTL", producer.getTimeToLive());
+ return send;
+ }
+
+
+ /**
+ * Tests the expired messages get actively deleted even on queues which have no consumers
+ * @throws Exception
+ */
+ public void testActiveTTL() throws Exception
+ {
+ Connection producerConnection = getConnection();
+ AMQSession producerSession = (AMQSession) producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = producerSession.createTemporaryQueue();
+ producerSession.declareAndBind((AMQDestination) queue);
+ MessageProducer producer = producerSession.createProducer(queue);
+ producer.setTimeToLive(1000L);
+
+ // send Messages
+ for(int i = 0; i < MSG_COUNT; i++)
+ {
+ producer.send(producerSession.createTextMessage("Message: "+i));
+ }
+ long failureTime = System.currentTimeMillis() + 2 * SERVER_TTL_TIMEOUT;
+
+ // check Queue depth for up to TIMEOUT seconds after the Queue Depth hasn't changed for 100ms.
+ long messageCount = MSG_COUNT;
+ long lastPass;
+
+ do
+ {
+ lastPass = messageCount;
+ Thread.sleep(100);
+ messageCount = producerSession.getQueueDepth((AMQDestination) queue);
+
+ // If we have received messages in the last loop then extend the timeout time.
+ // if we get messages stuck that are not expiring then the failureTime will occur
+ // failing the test. This will help with the scenario when the broker does not
+ // have enough CPU cycles to process the TTLs.
+ if (lastPass != messageCount)
+ {
+ failureTime = System.currentTimeMillis() + 2 * SERVER_TTL_TIMEOUT;
+ }
+ }
+ while(messageCount > 0L && System.currentTimeMillis() < failureTime);
+
+ assertEquals("Messages not automatically expired: ", 0L, messageCount);
+
+ producer.close();
+ producerSession.close();
+ producerConnection.close();
+ }
+
+ public void testPassiveTTLwithDurableSubscription() throws Exception
+ {
+ //Create Client 1
+ Connection clientConnection = getConnection();
+
+ Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Create and close the durable subscriber
+ AMQTopic topic = new AMQTopic((AMQConnection) clientConnection, getTestQueueName());
+ TopicSubscriber durableSubscriber = clientSession.createDurableSubscriber(topic, getTestQueueName(),"testprop='TimeToLiveTest'", false);
+ durableSubscriber.close();
+
+ //Create Producer
+ Connection producerConnection = getConnection();
+
+ producerConnection.start();
+
+ // Move to a Transacted session to ensure that all messages have been delivered to broker before
+ // we start waiting for TTL
+ Session producerSession = producerConnection.createSession(true, Session.SESSION_TRANSACTED);
+
+ MessageProducer producer = producerSession.createProducer(topic);
+
+ //Set TTL
+ int msg = 0;
+ producer.send(nextMessage(String.valueOf(msg), true, producerSession, producer));
+
+ producer.setTimeToLive(TIME_TO_LIVE);
+
+ for (; msg < MSG_COUNT - 2; msg++)
+ {
+ producer.send(nextMessage(String.valueOf(msg), false, producerSession, producer));
+ }
+
+ //Reset TTL
+ producer.setTimeToLive(0L);
+ producer.send(nextMessage(String.valueOf(msg), false, producerSession, producer));
+
+ producerSession.commit();
+
+ //resubscribe
+ durableSubscriber = clientSession.createDurableSubscriber(topic, getTestQueueName(),"testprop='TimeToLiveTest'", false);
+
+ // Ensure we sleep the required amount of time.
+ ReentrantLock waitLock = new ReentrantLock();
+ Condition wait = waitLock.newCondition();
+ final long MILLIS = 1000000L;
+
+ long waitTime = TIME_TO_LIVE * MILLIS;
+ while (waitTime > 0)
+ {
+ try
+ {
+ waitLock.lock();
+
+ waitTime = wait.awaitNanos(waitTime);
+ }
+ catch (InterruptedException e)
+ {
+ //Stop if we are interrupted
+ fail(e.getMessage());
+ }
+ finally
+ {
+ waitLock.unlock();
+ }
+
+ }
+
+ clientConnection.start();
+
+ //Receive Message 0
+ // Set 5s receive time for messages we expect to receive.
+ Message receivedFirst = durableSubscriber.receive(5000);
+ Message receivedSecond = durableSubscriber.receive(5000);
+ Message receivedThird = durableSubscriber.receive(1000);
+
+ // Log the messages to help diagnosis incase of failure
+ _logger.info("First:"+receivedFirst);
+ _logger.info("Second:"+receivedSecond);
+ _logger.info("Third:"+receivedThird);
+
+ // Only first and last messages sent should survive expiry
+ Assert.assertNull("More messages received", receivedThird);
+
+ Assert.assertNotNull("First message not received", receivedFirst);
+ Assert.assertTrue("First message doesn't have first set.", receivedFirst.getBooleanProperty("first"));
+ Assert.assertEquals("First message has incorrect TTL.", 0L, receivedFirst.getLongProperty("TTL"));
+
+ Assert.assertNotNull("Final message not received", receivedSecond);
+ Assert.assertFalse("Final message has first set.", receivedSecond.getBooleanProperty("first"));
+ Assert.assertEquals("Final message has incorrect TTL.", 0L, receivedSecond.getLongProperty("TTL"));
+
+ clientSession.unsubscribe(getTestQueueName());
+ clientConnection.close();
+
+ producerConnection.close();
+ }
+
+ public void testActiveTTLwithDurableSubscription() throws Exception
+ {
+ //Create Client 1
+ Connection clientConnection = getConnection();
+ Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Create and close the durable subscriber
+ AMQTopic topic = new AMQTopic((AMQConnection) clientConnection, getTestQueueName());
+ TopicSubscriber durableSubscriber = clientSession.createDurableSubscriber(topic, "MyDurableTTLSubscription","testprop='TimeToLiveTest'", false);
+ durableSubscriber.close();
+
+ //Create Producer
+ Connection producerConnection = getConnection();
+ AMQSession producerSession = (AMQSession) producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = producerSession.createProducer(topic);
+ producer.setTimeToLive(1000L);
+
+ // send Messages
+ for(int i = 0; i < MSG_COUNT; i++)
+ {
+ producer.send(producerSession.createTextMessage("Message: "+i));
+ }
+ long failureTime = System.currentTimeMillis() + 2 * SERVER_TTL_TIMEOUT;
+
+ // check Queue depth for up to TIMEOUT seconds after the Queue Depth hasn't changed for 100ms.
+ long messageCount = MSG_COUNT;
+ long lastPass;
+ AMQQueue subcriptionQueue = new AMQQueue("amq.topic","clientid" + ":" + "MyDurableTTLSubscription");
+ do
+ {
+ lastPass = messageCount;
+ Thread.sleep(100);
+ messageCount = producerSession.getQueueDepth((AMQDestination) subcriptionQueue);
+
+ // If we have received messages in the last loop then extend the timeout time.
+ // if we get messages stuck that are not expiring then the failureTime will occur
+ // failing the test. This will help with the scenario when the broker does not
+ // have enough CPU cycles to process the TTLs.
+ if (lastPass != messageCount)
+ {
+ failureTime = System.currentTimeMillis() + 2 * SERVER_TTL_TIMEOUT;
+ }
+ }
+ while(messageCount > 0L && System.currentTimeMillis() < failureTime);
+
+ assertEquals("Messages not automatically expired: ", 0L, messageCount);
+
+ producer.close();
+ producerSession.close();
+ producerConnection.close();
+
+ clientSession.unsubscribe("MyDurableTTLSubscription");
+ clientSession.close();
+ clientConnection.close();
+ }
+
+}