diff options
Diffstat (limited to 'qpid/java/systests/src/test/java/org/apache/qpid/server/queue')
12 files changed, 4182 insertions, 0 deletions
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java new file mode 100644 index 0000000000..21e3bfa055 --- /dev/null +++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java @@ -0,0 +1,159 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Queue; +import javax.jms.Session; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * Test DeapQueueConsumerWithSelector + * Summary: + * Prior to M4 the broker had a different queue model which pre-processed the + * messages on the queue for any connecting subscription that had a selector. + * + * If the queue had a lot of data then this may take a long time to process + * to such an extent that the subscription creation may time out. During this + * pre-process phase the virtualhost would be come unresposive. + * + * Our solution was to allow the timeout to be adjusted QPID-1119, which allowed + * the subscription to connect but did not address the unresponsiveness. + * + * The new queue model introduced in M4 resolved this. + * + * This test is to validate that the new queueing model does indeed remove the + * long pre-processing phase and allow immediate subscription so that there is + * no unresponsive period. + * + * Test Strategy: + * + * Add 100k messages to the queue with a numberic header property that will + * allow later subscribers to use as in a selector. + * + * Connect the subscriber and time how long it takes to connect. + * + * Finally consume all the messages from the queue to clean up. + */ +public class DeepQueueConsumeWithSelector extends QpidBrokerTestCase implements MessageListener +{ + + private static final int MESSAGE_COUNT = 10000; + private static final int BATCH_SIZE = MESSAGE_COUNT / 10; + + private CountDownLatch _receviedLatch = new CountDownLatch(MESSAGE_COUNT); + + protected long SYNC_WRITE_TIMEOUT = 120000L; + + + public void setUp() throws Exception + { + //Set the syncWrite timeout to be just larger than the delay on the commitTran. + setSystemProperty("amqj.default_syncwrite_timeout", String.valueOf(SYNC_WRITE_TIMEOUT)); + + super.setUp(); + } + + public void test() throws Exception + { + // Create Connection + Connection connection = getConnection(); + Session session = ((AMQConnection)connection).createSession(true, Session.SESSION_TRANSACTED, 100000); + + Queue queue = (Queue) getInitialContext().lookup("queue"); + + // Validate that the destination exists + session.createConsumer(queue).close(); + + // Send Messages + sendMessage(session, queue, MESSAGE_COUNT, BATCH_SIZE); + + session.close(); + + session = ((AMQConnection) connection).createSession(false, Session.AUTO_ACKNOWLEDGE);//, 100000); + + + // Setup Selector to perform a few calculations which will slow it down + String selector = "((\"" + INDEX + "\" % 1) = 0) AND ('" + INDEX + "' IS NOT NULL) AND ('" + INDEX + "' <> -1)"; + + // Setup timing + long start = System.nanoTime(); + + System.err.println("Create Consumer"); + // Connect Consumer + MessageConsumer consumer = session.createConsumer(queue, selector); + consumer.setMessageListener(this); + + // Validate timing details + long end = System.nanoTime(); + + System.err.println("Subscription time took:" + (end - start)); + + // Consume Messages + connection.start(); + + + + assertTrue("Messages took to long to be received :"+_receviedLatch.getCount(), + _receviedLatch.await(SYNC_WRITE_TIMEOUT, TimeUnit.MILLISECONDS )); + + } + + @Override + public Message createNextMessage(Session session, int msgCount) throws JMSException + { + Message message = super.createNextMessage(session,msgCount); + + if ((msgCount % BATCH_SIZE) == 0 ) + { + System.err.println("Sent:"+msgCount); + } + + return message; + } + + public void onMessage(Message message) + { + _receviedLatch.countDown(); + int msgCount = 0; + try + { + msgCount = message.getIntProperty(INDEX); + } + catch (JMSException e) + { + //ignore + } + if ((msgCount % BATCH_SIZE) == 0 ) + { + System.err.println("Received:"+msgCount); + } + + } +} diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java new file mode 100644 index 0000000000..dc30c02951 --- /dev/null +++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java @@ -0,0 +1,574 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.queue; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.url.AMQBindingURL; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class LastValueQueueTest extends QpidBrokerTestCase +{ + private static final Logger LOGGER = Logger.getLogger(LastValueQueueTest.class); + + private static final String MESSAGE_SEQUENCE_NUMBER_PROPERTY = "msg"; + private static final String KEY_PROPERTY = "key"; + + private static final int MSG_COUNT = 400; + + private String _queueName; + private Queue _queue; + private Connection _producerConnection; + private MessageProducer _producer; + private Session _producerSession; + private Connection _consumerConnection; + private Session _consumerSession; + private MessageConsumer _consumer; + + protected void setUp() throws Exception + { + super.setUp(); + + _queueName = getTestQueueName(); + _producerConnection = getConnection(); + _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + public void testConflation() throws Exception + { + _consumerConnection = getConnection(); + _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + createConflationQueue(_producerSession); + _producer = _producerSession.createProducer(_queue); + + for (int msg = 0; msg < MSG_COUNT; msg++) + { + _producer.send(nextMessage(msg, _producerSession)); + } + + _producer.close(); + _producerSession.close(); + _producerConnection.close(); + + _consumer = _consumerSession.createConsumer(_queue); + _consumerConnection.start(); + Message received; + + List<Message> messages = new ArrayList<Message>(); + while((received = _consumer.receive(1000))!=null) + { + messages.add(received); + } + + assertEquals("Unexpected number of messages received",10,messages.size()); + + for(int i = 0 ; i < 10; i++) + { + Message msg = messages.get(i); + assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); + } + } + + public void testConflationWithRelease() throws Exception + { + _consumerConnection = getConnection(); + _consumerSession = _consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + + createConflationQueue(_producerSession); + _producer = _producerSession.createProducer(_queue); + + for (int msg = 0; msg < MSG_COUNT/2; msg++) + { + _producer.send(nextMessage(msg, _producerSession)); + + } + + // HACK to do something synchronous + ((AMQSession<?,?>)_producerSession).sync(); + + _consumer = _consumerSession.createConsumer(_queue); + _consumerConnection.start(); + Message received; + List<Message> messages = new ArrayList<Message>(); + while((received = _consumer.receive(1000))!=null) + { + messages.add(received); + } + + assertEquals("Unexpected number of messages received",10,messages.size()); + + for(int i = 0 ; i < 10; i++) + { + Message msg = messages.get(i); + assertEquals("Unexpected message number received", MSG_COUNT/2 - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); + } + + _consumerSession.close(); + _consumerConnection.close(); + + + _consumerConnection = getConnection(); + _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + + for (int msg = MSG_COUNT/2; msg < MSG_COUNT; msg++) + { + _producer.send(nextMessage(msg, _producerSession)); + } + + + // HACK to do something synchronous + ((AMQSession<?,?>)_producerSession).sync(); + + _consumer = _consumerSession.createConsumer(_queue); + _consumerConnection.start(); + + messages = new ArrayList<Message>(); + while((received = _consumer.receive(1000))!=null) + { + messages.add(received); + } + + assertEquals("Unexpected number of messages received",10,messages.size()); + + for(int i = 0 ; i < 10; i++) + { + Message msg = messages.get(i); + assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); + } + + } + + + public void testConflationWithReleaseAfterNewPublish() throws Exception + { + _consumerConnection = getConnection(); + _consumerSession = _consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + + createConflationQueue(_producerSession); + _producer = _producerSession.createProducer(_queue); + + for (int msg = 0; msg < MSG_COUNT/2; msg++) + { + _producer.send(nextMessage(msg, _producerSession)); + } + + // HACK to do something synchronous + ((AMQSession<?,?>)_producerSession).sync(); + + _consumer = _consumerSession.createConsumer(_queue); + _consumerConnection.start(); + Message received; + List<Message> messages = new ArrayList<Message>(); + while((received = _consumer.receive(1000))!=null) + { + messages.add(received); + } + + assertEquals("Unexpected number of messages received",10,messages.size()); + + for(int i = 0 ; i < 10; i++) + { + Message msg = messages.get(i); + assertEquals("Unexpected message number received", MSG_COUNT/2 - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); + } + + _consumer.close(); + + for (int msg = MSG_COUNT/2; msg < MSG_COUNT; msg++) + { + _producer.send(nextMessage(msg, _producerSession)); + } + + // HACK to do something synchronous + ((AMQSession<?,?>)_producerSession).sync(); + + + // this causes the "old" messages to be released + _consumerSession.close(); + _consumerConnection.close(); + + + _consumerConnection = getConnection(); + _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + + + _consumer = _consumerSession.createConsumer(_queue); + _consumerConnection.start(); + + messages = new ArrayList<Message>(); + while((received = _consumer.receive(1000))!=null) + { + messages.add(received); + } + + assertEquals("Unexpected number of messages received",10,messages.size()); + + for(int i = 0 ; i < 10; i++) + { + Message msg = messages.get(i); + assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); + } + + } + + public void testConflatedQueueDepth() throws Exception + { + _consumerConnection = getConnection(); + _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + createConflationQueue(_producerSession); + _producer = _producerSession.createProducer(_queue); + + for (int msg = 0; msg < MSG_COUNT; msg++) + { + _producer.send(nextMessage(msg, _producerSession)); + } + + final long queueDepth = ((AMQSession<?, ?>)_producerSession).getQueueDepth((AMQDestination)_queue, true); + + assertEquals(10, queueDepth); + } + + public void testConflationBrowser() throws Exception + { + _consumerConnection = getConnection(); + _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + + createConflationQueue(_producerSession); + _producer = _producerSession.createProducer(_queue); + + for (int msg = 0; msg < MSG_COUNT; msg++) + { + _producer.send(nextMessage(msg, _producerSession)); + + } + + ((AMQSession<?,?>)_producerSession).sync(); + + AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+_queueName+"?browse='true'&durable='true'"); + AMQQueue browseQueue = new AMQQueue(url); + + _consumer = _consumerSession.createConsumer(browseQueue); + _consumerConnection.start(); + Message received; + List<Message> messages = new ArrayList<Message>(); + while((received = _consumer.receive(1000))!=null) + { + messages.add(received); + } + + assertEquals("Unexpected number of messages received",10,messages.size()); + + for(int i = 0 ; i < 10; i++) + { + Message msg = messages.get(i); + assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); + } + + messages.clear(); + + _producer.send(nextMessage(MSG_COUNT, _producerSession)); + + ((AMQSession<?,?>)_producerSession).sync(); + + while((received = _consumer.receive(1000))!=null) + { + messages.add(received); + } + assertEquals("Unexpected number of messages received",1,messages.size()); + assertEquals("Unexpected message number received", MSG_COUNT, messages.get(0).getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); + + + _producer.close(); + _producerSession.close(); + _producerConnection.close(); + } + + public void testConflation2Browsers() throws Exception + { + _consumerConnection = getConnection(); + _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + + createConflationQueue(_producerSession); + _producer = _producerSession.createProducer(_queue); + + for (int msg = 0; msg < MSG_COUNT; msg++) + { + _producer.send(nextMessage(msg, _producerSession)); + } + + ((AMQSession<?,?>)_producerSession).sync(); + + AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+_queueName+"?browse='true'&durable='true'"); + AMQQueue browseQueue = new AMQQueue(url); + + _consumer = _consumerSession.createConsumer(browseQueue); + MessageConsumer consumer2 = _consumerSession.createConsumer(browseQueue); + _consumerConnection.start(); + List<Message> messages = new ArrayList<Message>(); + List<Message> messages2 = new ArrayList<Message>(); + Message received = _consumer.receive(1000); + Message received2 = consumer2.receive(1000); + + while(received!=null || received2!=null) + { + if(received != null) + { + messages.add(received); + } + if(received2 != null) + { + messages2.add(received2); + } + + + received = _consumer.receive(1000); + received2 = consumer2.receive(1000); + + } + + assertEquals("Unexpected number of messages received on first browser",10,messages.size()); + assertEquals("Unexpected number of messages received on second browser",10,messages2.size()); + + for(int i = 0 ; i < 10; i++) + { + Message msg = messages.get(i); + assertEquals("Unexpected message number received on first browser", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); + msg = messages2.get(i); + assertEquals("Unexpected message number received on second browser", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); + } + + + _producer.close(); + _producerSession.close(); + _producerConnection.close(); + } + + public void testParallelProductionAndConsumption() throws Exception + { + createConflationQueue(_producerSession); + + // Start producing threads that send messages + BackgroundMessageProducer messageProducer1 = new BackgroundMessageProducer("Message sender1"); + messageProducer1.startSendingMessages(); + BackgroundMessageProducer messageProducer2 = new BackgroundMessageProducer("Message sender2"); + messageProducer2.startSendingMessages(); + + Map<String, Integer> lastReceivedMessages = receiveMessages(messageProducer1); + + messageProducer1.join(); + messageProducer2.join(); + + final Map<String, Integer> lastSentMessages1 = messageProducer1.getMessageSequenceNumbersByKey(); + assertEquals("Unexpected number of last sent messages sent by producer1", 2, lastSentMessages1.size()); + final Map<String, Integer> lastSentMessages2 = messageProducer2.getMessageSequenceNumbersByKey(); + assertEquals(lastSentMessages1, lastSentMessages2); + + assertEquals("The last message sent for each key should match the last message received for that key", + lastSentMessages1, lastReceivedMessages); + + assertNull("Unexpected exception from background producer thread", messageProducer1.getException()); + } + + private Map<String, Integer> receiveMessages(BackgroundMessageProducer producer) throws Exception + { + producer.waitUntilQuarterOfMessagesSentToEncourageConflation(); + + _consumerConnection = getConnection(); + int smallPrefetchToEncourageConflation = 1; + _consumerSession = ((AMQConnection)_consumerConnection).createSession(false, Session.AUTO_ACKNOWLEDGE, smallPrefetchToEncourageConflation); + + LOGGER.info("Starting to receive"); + + _consumer = _consumerSession.createConsumer(_queue); + _consumerConnection.start(); + + Map<String, Integer> messageSequenceNumbersByKey = new HashMap<String, Integer>(); + + Message message; + int numberOfShutdownsReceived = 0; + int numberOfMessagesReceived = 0; + while(numberOfShutdownsReceived < 2) + { + message = _consumer.receive(10000); + assertNotNull(message); + + if (message.propertyExists(BackgroundMessageProducer.SHUTDOWN)) + { + numberOfShutdownsReceived++; + } + else + { + numberOfMessagesReceived++; + putMessageInMap(message, messageSequenceNumbersByKey); + } + } + + LOGGER.info("Finished receiving. Received " + numberOfMessagesReceived + " message(s) in total"); + + return messageSequenceNumbersByKey; + } + + private void putMessageInMap(Message message, Map<String, Integer> messageSequenceNumbersByKey) throws JMSException + { + String keyValue = message.getStringProperty(KEY_PROPERTY); + Integer messageSequenceNumber = message.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY); + messageSequenceNumbersByKey.put(keyValue, messageSequenceNumber); + } + + private class BackgroundMessageProducer + { + static final String SHUTDOWN = "SHUTDOWN"; + + private final String _threadName; + + private volatile Exception _exception; + + private Thread _thread; + private Map<String, Integer> _messageSequenceNumbersByKey = new HashMap<String, Integer>(); + private CountDownLatch _quarterOfMessagesSentLatch = new CountDownLatch(MSG_COUNT/4); + + public BackgroundMessageProducer(String threadName) + { + _threadName = threadName; + } + + public void waitUntilQuarterOfMessagesSentToEncourageConflation() throws InterruptedException + { + final long latchTimeout = 60000; + boolean success = _quarterOfMessagesSentLatch.await(latchTimeout, TimeUnit.MILLISECONDS); + assertTrue("Failed to be notified that 1/4 of the messages have been sent within " + latchTimeout + " ms.", success); + LOGGER.info("Quarter of messages sent"); + } + + public Exception getException() + { + return _exception; + } + + public Map<String, Integer> getMessageSequenceNumbersByKey() + { + return Collections.unmodifiableMap(_messageSequenceNumbersByKey); + } + + public void startSendingMessages() + { + Runnable messageSender = new Runnable() + { + @Override + public void run() + { + try + { + LOGGER.info("Starting to send in background thread"); + Connection producerConnection = getConnection(); + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer backgroundProducer = producerSession.createProducer(_queue); + for (int messageNumber = 0; messageNumber < MSG_COUNT; messageNumber++) + { + Message message = nextMessage(messageNumber, producerSession, 2); + backgroundProducer.send(message); + + putMessageInMap(message, _messageSequenceNumbersByKey); + _quarterOfMessagesSentLatch.countDown(); + } + + Message shutdownMessage = producerSession.createMessage(); + shutdownMessage.setBooleanProperty(SHUTDOWN, true); + backgroundProducer.send(shutdownMessage); + + LOGGER.info("Finished sending in background thread"); + } + catch (Exception e) + { + _exception = e; + throw new RuntimeException(e); + } + } + }; + + _thread = new Thread(messageSender); + _thread.setName(_threadName); + _thread.start(); + } + + public void join() throws InterruptedException + { + final int timeoutInMillis = 120000; + _thread.join(timeoutInMillis); + assertFalse("Expected producer thread to finish within " + timeoutInMillis + "ms", _thread.isAlive()); + } + } + + private void createConflationQueue(Session session) throws AMQException + { + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put("qpid.last_value_queue_key",KEY_PROPERTY); + ((AMQSession<?,?>) session).createQueue(new AMQShortString(_queueName), false, true, false, arguments); + _queue = new AMQQueue("amq.direct", _queueName); + ((AMQSession<?,?>) session).declareAndBind((AMQDestination)_queue); + } + + private Message nextMessage(int msg, Session producerSession) throws JMSException + { + return nextMessage(msg, producerSession, 10); + } + + private Message nextMessage(int msg, Session producerSession, int numberOfUniqueKeyValues) throws JMSException + { + Message send = producerSession.createTextMessage("Message: " + msg); + + send.setStringProperty(KEY_PROPERTY, String.valueOf(msg % numberOfUniqueKeyValues)); + send.setIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY, msg); + + return send; + } +} diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java new file mode 100644 index 0000000000..cb8ced4ddb --- /dev/null +++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java @@ -0,0 +1,604 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.queue; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +public class MessageGroupQueueTest extends QpidBrokerTestCase +{ + protected final String QUEUE = "MessageGroupQueue"; + + private Connection producerConnection; + private MessageProducer producer; + private Session producerSession; + private Queue queue; + private Connection consumerConnection; + + + protected void setUp() throws Exception + { + super.setUp(); + + producerConnection = getConnection(); + producerSession = producerConnection.createSession(true, Session.AUTO_ACKNOWLEDGE); + + producerConnection.start(); + + consumerConnection = getConnection(); + + } + + protected void tearDown() throws Exception + { + producerConnection.close(); + consumerConnection.close(); + super.tearDown(); + } + + + public void testSimpleGroupAssignment() throws Exception + { + simpleGroupAssignment(false); + } + + public void testSharedGroupSimpleGroupAssignment() throws Exception + { + simpleGroupAssignment(true); + } + + + /** + * Pre populate the queue with messages with groups as follows + * + * ONE + * TWO + * ONE + * TWO + * + * Create two consumers with prefetch of 1, the first consumer should then be assigned group ONE, the second + * consumer assigned group TWO if they are started in sequence. + * + * Thus doing + * + * c1 <--- (ONE) + * c2 <--- (TWO) + * c2 ack ---> + * + * c2 should now be able to receive a second message from group TWO (skipping over the message from group ONE) + * + * i.e. + * + * c2 <--- (TWO) + * c2 ack ---> + * c1 <--- (ONE) + * c1 ack ---> + * + */ + private void simpleGroupAssignment(boolean sharedGroups) throws AMQException, JMSException + { + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put(QueueArgumentsConverter.QPID_GROUP_HEADER_KEY,"group"); + if(sharedGroups) + { + arguments.put(QueueArgumentsConverter.QPID_SHARED_MSG_GROUP,"1"); + } + ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); + queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'"); + + ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); + producer = producerSession.createProducer(queue); + + String[] groups = { "ONE", "TWO"}; + + for (int msg = 0; msg < 4; msg++) + { + producer.send(createMessage(msg, groups[msg % groups.length])); + } + producerSession.commit(); + producer.close(); + producerSession.close(); + producerConnection.close(); + + Session cs1 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1); + Session cs2 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1); + + + MessageConsumer consumer1 = cs1.createConsumer(queue); + MessageConsumer consumer2 = cs2.createConsumer(queue); + + consumerConnection.start(); + Message cs1Received = consumer1.receive(1000); + assertNotNull("Consumer 1 should have received first message", cs1Received); + + Message cs2Received = consumer2.receive(1000); + + assertNotNull("Consumer 2 should have received first message", cs2Received); + + cs2Received.acknowledge(); + + Message cs2Received2 = consumer2.receive(1000); + + assertNotNull("Consumer 2 should have received second message", cs2Received2); + assertEquals("Differing groups", cs2Received2.getStringProperty("group"), + cs2Received.getStringProperty("group")); + + cs1Received.acknowledge(); + Message cs1Received2 = consumer1.receive(1000); + + assertNotNull("Consumer 1 should have received second message", cs1Received2); + assertEquals("Differing groups", cs1Received2.getStringProperty("group"), + cs1Received.getStringProperty("group")); + + cs1Received2.acknowledge(); + cs2Received2.acknowledge(); + + assertNull(consumer1.receive(1000)); + assertNull(consumer2.receive(1000)); + } + + + public void testConsumerCloseGroupAssignment() throws Exception + { + consumerCloseGroupAssignment(false); + } + + public void testSharedGroupConsumerCloseGroupAssignment() throws Exception + { + consumerCloseGroupAssignment(true); + } + + /** + * + * Tests that upon closing a consumer, groups previously assigned to that consumer are reassigned to a different + * consumer. + * + * Pre-populate the queue as ONE, ONE, TWO, ONE + * + * create in sequence two consumers + * + * receive first from c1 then c2 (thus ONE is assigned to c1, TWO to c2) + * + * Then close c1 before acking. + * + * If we now attempt to receive from c2, then the remaining messages in group ONE should be available (which + * requires c2 to go "backwards" in the queue). + * + **/ + private void consumerCloseGroupAssignment(boolean sharedGroups) throws AMQException, JMSException + { + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put(QueueArgumentsConverter.QPID_GROUP_HEADER_KEY,"group"); + if(sharedGroups) + { + arguments.put(QueueArgumentsConverter.QPID_SHARED_MSG_GROUP,"1"); + } + ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); + queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'"); + + ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); + producer = producerSession.createProducer(queue); + + producer.send(createMessage(1, "ONE")); + producer.send(createMessage(2, "ONE")); + producer.send(createMessage(3, "TWO")); + producer.send(createMessage(4, "ONE")); + + producerSession.commit(); + producer.close(); + producerSession.close(); + producerConnection.close(); + + Session cs1 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1); + Session cs2 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1); + + MessageConsumer consumer1 = cs1.createConsumer(queue); + + consumerConnection.start(); + MessageConsumer consumer2 = cs2.createConsumer(queue); + + Message cs1Received = consumer1.receive(1000); + assertNotNull("Consumer 1 should have received first message", cs1Received); + assertEquals("incorrect message received", 1, cs1Received.getIntProperty("msg")); + + Message cs2Received = consumer2.receive(1000); + + assertNotNull("Consumer 2 should have received first message", cs2Received); + assertEquals("incorrect message received", 3, cs2Received.getIntProperty("msg")); + cs2.commit(); + + Message cs2Received2 = consumer2.receive(1000); + + assertNull("Consumer 2 should not yet have received a second message", cs2Received2); + + consumer1.close(); + + cs1.commit(); + Message cs2Received3 = consumer2.receive(1000); + + assertNotNull("Consumer 2 should have received second message", cs2Received3); + assertEquals("Unexpected group", "ONE", cs2Received3.getStringProperty("group")); + assertEquals("incorrect message received", 2, cs2Received3.getIntProperty("msg")); + + cs2.commit(); + + + Message cs2Received4 = consumer2.receive(1000); + + assertNotNull("Consumer 2 should have received third message", cs2Received4); + assertEquals("Unexpected group", "ONE", cs2Received4.getStringProperty("group")); + assertEquals("incorrect message received", 4, cs2Received4.getIntProperty("msg")); + cs2.commit(); + + assertNull(consumer2.receive(1000)); + } + + + + + public void testConsumerCloseWithRelease() throws Exception + { + consumerCloseWithRelease(false); + } + + public void testSharedGroupConsumerCloseWithRelease() throws Exception + { + consumerCloseWithRelease(true); + } + + + /** + * + * Tests that upon closing a consumer and its session, groups previously assigned to that consumer are reassigned + * toa different consumer, including messages which were previously delivered but have now been released. + * + * Pre-populate the queue as ONE, ONE, TWO, ONE + * + * create in sequence two consumers + * + * receive first from c1 then c2 (thus ONE is assigned to c1, TWO to c2) + * + * Then close c1 and its session without acking. + * + * If we now attempt to receive from c2, then the all messages in group ONE should be available (which + * requires c2 to go "backwards" in the queue). The first such message should be marked as redelivered + * + */ + private void consumerCloseWithRelease(boolean sharedGroups) throws AMQException, JMSException + { + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put(QueueArgumentsConverter.QPID_GROUP_HEADER_KEY,"group"); + if(sharedGroups) + { + arguments.put(QueueArgumentsConverter.QPID_SHARED_MSG_GROUP,"1"); + } + + ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); + queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'"); + + ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); + producer = producerSession.createProducer(queue); + + producer.send(createMessage(1, "ONE")); + producer.send(createMessage(2, "ONE")); + producer.send(createMessage(3, "TWO")); + producer.send(createMessage(4, "ONE")); + + producerSession.commit(); + producer.close(); + producerSession.close(); + producerConnection.close(); + + Session cs1 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1); + Session cs2 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1); + + + MessageConsumer consumer1 = cs1.createConsumer(queue); + + consumerConnection.start(); + + MessageConsumer consumer2 = cs2.createConsumer(queue); + + Message cs1Received = consumer1.receive(1000); + assertNotNull("Consumer 1 should have received its first message", cs1Received); + assertEquals("incorrect message received", 1, cs1Received.getIntProperty("msg")); + + Message received = consumer2.receive(1000); + + assertNotNull("Consumer 2 should have received its first message", received); + assertEquals("incorrect message received", 3, received.getIntProperty("msg")); + + received = consumer2.receive(1000); + + assertNull("Consumer 2 should not yet have received second message", received); + + consumer1.close(); + cs1.close(); + cs2.commit(); + received = consumer2.receive(1000); + + assertNotNull("Consumer 2 should now have received second message", received); + assertEquals("Unexpected group", "ONE", received.getStringProperty("group")); + assertEquals("incorrect message received", 1, received.getIntProperty("msg")); + assertTrue("Expected second message to be marked as redelivered " + received.getIntProperty("msg"), + received.getJMSRedelivered()); + + cs2.commit(); + + + received = consumer2.receive(1000); + + assertNotNull("Consumer 2 should have received a third message", received); + assertEquals("Unexpected group", "ONE", received.getStringProperty("group")); + assertEquals("incorrect message received", 2, received.getIntProperty("msg")); + + cs2.commit(); + + received = consumer2.receive(1000); + + assertNotNull("Consumer 2 should have received a fourth message", received); + assertEquals("Unexpected group", "ONE", received.getStringProperty("group")); + assertEquals("incorrect message received", 4, received.getIntProperty("msg")); + + cs2.commit(); + + + assertNull(consumer2.receive(1000)); + } + + public void testGroupAssignmentSurvivesEmpty() throws JMSException, AMQException + { + groupAssignmentOnEmpty(false); + } + + public void testSharedGroupAssignmentDoesNotSurviveEmpty() throws JMSException, AMQException + { + groupAssignmentOnEmpty(true); + } + + private void groupAssignmentOnEmpty(boolean sharedGroups) throws AMQException, JMSException + { + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put(QueueArgumentsConverter.QPID_GROUP_HEADER_KEY,"group"); + if(sharedGroups) + { + arguments.put(QueueArgumentsConverter.QPID_SHARED_MSG_GROUP,"1"); + } + + ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); + queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'"); + + ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); + producer = producerSession.createProducer(queue); + + producer.send(createMessage(1, "ONE")); + producer.send(createMessage(2, "TWO")); + producer.send(createMessage(3, "THREE")); + producer.send(createMessage(4, "ONE")); + + producerSession.commit(); + producer.close(); + producerSession.close(); + producerConnection.close(); + + Session cs1 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1); + Session cs2 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1); + + + MessageConsumer consumer1 = cs1.createConsumer(queue); + + consumerConnection.start(); + + MessageConsumer consumer2 = cs2.createConsumer(queue); + + Message received = consumer1.receive(1000); + assertNotNull("Consumer 1 should have received its first message", received); + assertEquals("incorrect message received", 1, received.getIntProperty("msg")); + + received = consumer2.receive(1000); + + assertNotNull("Consumer 2 should have received its first message", received); + assertEquals("incorrect message received", 2, received.getIntProperty("msg")); + + cs1.commit(); + + received = consumer1.receive(1000); + assertNotNull("Consumer 1 should have received its second message", received); + assertEquals("incorrect message received", 3, received.getIntProperty("msg")); + + // We expect different behaviours from "shared groups": here the assignment of a subscription to a group + // is terminated when there are no outstanding delivered but unacknowledged messages. In contrast, with a + // standard message grouping queue the assignment will be retained until the subscription is no longer + // registered + if(sharedGroups) + { + cs2.commit(); + received = consumer2.receive(1000); + + assertNotNull("Consumer 2 should have received its second message", received); + assertEquals("incorrect message received", 4, received.getIntProperty("msg")); + + cs2.commit(); + } + else + { + cs2.commit(); + received = consumer2.receive(1000); + + assertNull("Consumer 2 should not have received a second message", received); + + cs1.commit(); + + received = consumer1.receive(1000); + assertNotNull("Consumer 1 should have received its third message", received); + assertEquals("incorrect message received", 4, received.getIntProperty("msg")); + + } + + } + + private Message createMessage(int msg, String group) throws JMSException + { + Message send = producerSession.createTextMessage("Message: " + msg); + send.setIntProperty("msg", msg); + send.setStringProperty("group", group); + + return send; + } + + /** + * Tests that when a number of new messages for a given groupid are arriving while the delivery group + * state is also in the process of being emptied (due to acking a message while using prefetch=1), that only + * 1 of a number of existing consumers is ever receiving messages for the shared group at a time. + */ + public void testSingleSharedGroupWithMultipleConsumers() throws Exception + { + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put(QueueArgumentsConverter.QPID_GROUP_HEADER_KEY,"group"); + arguments.put(QueueArgumentsConverter.QPID_SHARED_MSG_GROUP,"1"); + + ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); + queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'"); + + ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); + producer = producerSession.createProducer(queue); + + + consumerConnection.close(); + Map<String, String> options = new HashMap<String, String>(); + options.put(ConnectionURL.OPTIONS_MAXPREFETCH, "1"); + consumerConnection = getConnectionWithOptions(options); + + int numMessages = 100; + SharedGroupTestMessageListener groupingTestMessageListener = new SharedGroupTestMessageListener(numMessages); + + Session cs1 = ((AMQConnection)consumerConnection).createSession(false, Session.AUTO_ACKNOWLEDGE); + Session cs2 = ((AMQConnection)consumerConnection).createSession(false, Session.AUTO_ACKNOWLEDGE); + Session cs3 = ((AMQConnection)consumerConnection).createSession(false, Session.AUTO_ACKNOWLEDGE); + Session cs4 = ((AMQConnection)consumerConnection).createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageConsumer consumer1 = cs1.createConsumer(queue); + consumer1.setMessageListener(groupingTestMessageListener); + MessageConsumer consumer2 = cs2.createConsumer(queue); + consumer2.setMessageListener(groupingTestMessageListener); + MessageConsumer consumer3 = cs3.createConsumer(queue); + consumer3.setMessageListener(groupingTestMessageListener); + MessageConsumer consumer4 = cs4.createConsumer(queue); + consumer4.setMessageListener(groupingTestMessageListener); + consumerConnection.start(); + + for(int i = 1; i <= numMessages; i++) + { + producer.send(createMessage(i, "GROUP")); + } + producerSession.commit(); + producer.close(); + producerSession.close(); + producerConnection.close(); + + assertTrue("Mesages not all recieved in the allowed timeframe", groupingTestMessageListener.waitForLatch(30)); + assertEquals("Unexpected concurrent processing of messages for the group", 0, groupingTestMessageListener.getConcurrentProcessingCases()); + assertNull("Unexpecte throwable in message listeners", groupingTestMessageListener.getThrowable()); + } + + public static class SharedGroupTestMessageListener implements MessageListener + { + private final CountDownLatch _count; + private final AtomicInteger _activeListeners = new AtomicInteger(); + private final AtomicInteger _concurrentProcessingCases = new AtomicInteger(); + private Throwable _throwable; + + public SharedGroupTestMessageListener(int numMessages) + { + _count = new CountDownLatch(numMessages); + } + + public void onMessage(Message message) + { + try + { + int currentActiveListeners = _activeListeners.incrementAndGet(); + + if (currentActiveListeners > 1) + { + _concurrentProcessingCases.incrementAndGet(); + + System.err.println("Concurrent processing when handling message: " + message.getIntProperty("msg")); + } + + try + { + Thread.sleep(25); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + } + + _activeListeners.decrementAndGet(); + } + catch (Throwable t) + { + _throwable = t; + t.printStackTrace(); + } + finally + { + _count.countDown(); + } + } + + public boolean waitForLatch(int seconds) throws Exception + { + return _count.await(seconds, TimeUnit.SECONDS); + } + + public int getConcurrentProcessingCases() + { + return _concurrentProcessingCases.get(); + } + + public Throwable getThrowable() + { + return _throwable; + } + } +} diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ModelTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ModelTest.java new file mode 100644 index 0000000000..c6b2c9e95c --- /dev/null +++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ModelTest.java @@ -0,0 +1,342 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.management.common.mbeans.ManagedBroker; +import org.apache.qpid.management.common.mbeans.ManagedQueue; +import org.apache.qpid.test.utils.JMXTestUtils; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Session; +import javax.management.JMException; +import javax.management.MBeanException; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.UndeclaredThrowableException; + +/** + * This Test validates the Queue Model on the broker. + * Currently it has some basic queue creation / deletion tests. + * However, it should be expanded to include other tests that relate to the + * model. i.e. + * + * The Create and Delete tests should ensure that the requisite logging is + * performed. + * + * Additions to this suite would be to complete testing of creations, validating + * fields such as owner/exclusive, autodelete and priority are correctly set. + * + * Currently this test uses the JMX interface to validate that the queue has + * been declared as expected so these tests cannot run against a CPP broker. + * + * + * Tests should ensure that they clean up after themselves. + * e,g. Durable queue creation test should perform a queue delete. + */ +public class ModelTest extends QpidBrokerTestCase +{ + + private JMXTestUtils _jmxUtils; + private static final String VIRTUALHOST_NAME = "test"; + + @Override + public void setUp() throws Exception + { + getBrokerConfiguration().addJmxManagementConfiguration(); + + // Create a JMX Helper + _jmxUtils = new JMXTestUtils(this); + super.setUp(); + + // Open the JMX Connection + _jmxUtils.open(); + } + + @Override + public void tearDown() throws Exception + { + // Close the JMX Connection + _jmxUtils.close(); + super.tearDown(); + } + + /** + * Test that an exclusive transient queue can be created via AMQP. + * + * @throws Exception On unexpected error + */ + public void testExclusiveQueueCreationTransientViaAMQP() throws Exception + { + Connection connection = getConnection(); + + String queueName = getTestQueueName(); + boolean durable = false; + boolean autoDelete = false; + boolean exclusive = true; + + createViaAMQPandValidateViaJMX(connection, queueName, durable, + autoDelete, exclusive); + } + + + + /** + * Test that a transient queue can be created via AMQP. + * + * @throws Exception On unexpected error + */ + public void testQueueCreationTransientViaAMQP() throws Exception + { + Connection connection = getConnection(); + + String queueName = getTestQueueName(); + boolean durable = false; + boolean autoDelete = false; + boolean exclusive = true; + + createViaAMQPandValidateViaJMX(connection, queueName, durable, + autoDelete, exclusive); + } + + /** + * Test that a durable exclusive queue can be created via AMQP. + * + * @throws Exception On unexpected error + */ + + public void testExclusiveQueueCreationDurableViaAMQP() throws Exception + { + Connection connection = getConnection(); + + String queueName = getTestQueueName(); + boolean durable = true; + boolean autoDelete = false; + boolean exclusive = true; + + createViaAMQPandValidateViaJMX(connection, queueName, durable, + autoDelete, exclusive); + + // Clean up + ManagedBroker managedBroker = + _jmxUtils.getManagedBroker(VIRTUALHOST_NAME); + managedBroker.deleteQueue(queueName); + } + + /** + * Test that a durable queue can be created via AMQP. + * + * @throws Exception On unexpected error + */ + + public void testQueueCreationDurableViaAMQP() throws Exception + { + Connection connection = getConnection(); + + String queueName = getTestQueueName(); + boolean durable = true; + boolean autoDelete = false; + boolean exclusive = false; + + createViaAMQPandValidateViaJMX(connection, queueName, durable, + autoDelete, exclusive); + + // Clean up + ManagedBroker managedBroker = + _jmxUtils.getManagedBroker(VIRTUALHOST_NAME); + managedBroker.deleteQueue(queueName); + } + + + /** + * Test that a transient queue can be created via JMX. + * + * @throws IOException if there is a problem via the JMX connection + * @throws javax.management.JMException if there is a problem with the JMX command + */ + public void testCreationTransientViaJMX() throws IOException, JMException + { + String name = getName(); + String owner = null; + boolean durable = false; + + createViaJMXandValidateViaJMX(name, owner, durable); + } + + /** + * Test that a durable queue can be created via JMX. + * + * @throws IOException if there is a problem via the JMX connection + * @throws javax.management.JMException if there is a problem with the JMX command + */ + public void testCreationDurableViaJMX() throws IOException, JMException + { + String name = getName(); + String owner = null; + boolean durable = true; + + createViaJMXandValidateViaJMX(name, owner, durable); + + // Clean up + ManagedBroker managedBroker = + _jmxUtils.getManagedBroker(VIRTUALHOST_NAME); + managedBroker.deleteQueue(name); + } + + /** + * Test that a transient queue can be deleted via JMX. + * + * @throws IOException if there is a problem via the JMX connection + * @throws javax.management.JMException if there is a problem with the JMX command + */ + public void testDeletionTransientViaJMX() throws IOException, JMException + { + String name = getName(); + + _jmxUtils.createQueue(VIRTUALHOST_NAME, name, null, false); + + ManagedBroker managedBroker = _jmxUtils. + getManagedBroker(VIRTUALHOST_NAME); + + try + { + managedBroker.deleteQueue(name); + } + catch (UndeclaredThrowableException e) + { + fail(((MBeanException) ((InvocationTargetException) + e.getUndeclaredThrowable()).getTargetException()).getTargetException().getMessage()); + } + } + + /** + * Test that a durable queue can be created via JMX. + * + * @throws IOException if there is a problem via the JMX connection + * @throws javax.management.JMException if there is a problem with the JMX command + */ + public void testDeletionDurableViaJMX() throws IOException, JMException + { + String name = getName(); + + _jmxUtils.createQueue(VIRTUALHOST_NAME, name, null, true); + + ManagedBroker managedBroker = _jmxUtils. + getManagedBroker(VIRTUALHOST_NAME); + + try + { + managedBroker.deleteQueue(name); + } + catch (UndeclaredThrowableException e) + { + fail(((MBeanException) ((InvocationTargetException) + e.getUndeclaredThrowable()).getTargetException()).getTargetException().getMessage()); + } + } + + /* + * Helper Methods + */ + + /** + * Using the provided JMS Connection create a queue using the AMQP extension + * with the given properties and then validate it was created correctly via + * the JMX Connection + * + * @param connection Qpid JMS Connection + * @param queueName String the desired QueueName + * @param durable boolean if the queue should be durable + * @param autoDelete boolean if the queue is an autoDelete queue + * @param exclusive boolean if the queue is exclusive + * + * @throws AMQException if there is a problem with the createQueue call + * @throws JMException if there is a problem with the JMX validatation + * @throws IOException if there is a problem with the JMX connection + * @throws JMSException if there is a problem creating the JMS Session + */ + private void createViaAMQPandValidateViaJMX(Connection connection, + String queueName, + boolean durable, + boolean autoDelete, + boolean exclusive) + throws AMQException, JMException, IOException, JMSException + { + AMQSession session = (AMQSession) connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + + session.createQueue(new AMQShortString(queueName), + autoDelete, durable, exclusive); + + validateQueueViaJMX(queueName, (exclusive && durable &&!isBroker010()) ? connection.getClientID() : null, durable, autoDelete || (exclusive && !isBroker010() && !durable)); + } + + /** + * Use the JMX Helper to create a queue with the given properties and then + * validate it was created correctly via the JMX Connection + * + * @param queueName String the desired QueueName + * @param owner String the owner value that should be set + * @param durable boolean if the queue should be durable + * @param autoDelete boolean if the queue is an autoDelete queue + * + * @throws JMException if there is a problem with the JMX validatation + * @throws IOException if there is a problem with the JMX connection + */ + private void createViaJMXandValidateViaJMX(String queueName, String owner, + boolean durable) + throws JMException, IOException + { + _jmxUtils.createQueue(VIRTUALHOST_NAME, queueName, owner, durable); + + validateQueueViaJMX(queueName, owner, durable, false); + } + + /** + * Validate that a queue with the given properties exists on the broker + * + * @param queueName String the desired QueueName + * @param owner String the owner value that should be set + * @param durable boolean if the queue should be durable + * @param autoDelete boolean if the queue is an autoDelete queue + * + * @throws JMException if there is a problem with the JMX validatation + * @throws IOException if there is a problem with the JMX connection + */ + private void validateQueueViaJMX(String queueName, String owner, boolean durable, boolean autoDelete) + throws JMException, IOException + { + ManagedQueue managedQueue = _jmxUtils. + getManagedObject(ManagedQueue.class, + _jmxUtils.getQueueObjectName(VIRTUALHOST_NAME, + queueName)); + + assertEquals(queueName, managedQueue.getName()); + assertEquals(owner, managedQueue.getOwner()); + assertEquals(durable, managedQueue.isDurable()); + assertEquals(autoDelete, managedQueue.isAutoDelete()); + } + +} diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java new file mode 100644 index 0000000000..cbf4e032db --- /dev/null +++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java @@ -0,0 +1,250 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.log4j.Logger; + +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; + +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class MultipleTransactedBatchProducerTest extends QpidBrokerTestCase +{ + private static final Logger _logger = Logger.getLogger(MultipleTransactedBatchProducerTest.class); + + private static final int MESSAGE_COUNT = 1000; + private static final int BATCH_SIZE = 50; + private static final int NUM_PRODUCERS = 2; + private static final int NUM_CONSUMERS = 3; + private static final Random RANDOM = new Random(); + + private CountDownLatch _receivedLatch; + private String _queueName; + + private volatile String _failMsg; + + public void setUp() throws Exception + { + //debug level logging often makes this test pass artificially, turn the level down to info. + setSystemProperty("amqj.server.logging.level", "INFO"); + _receivedLatch = new CountDownLatch(MESSAGE_COUNT * NUM_PRODUCERS); + + getBrokerConfiguration().addJmxManagementConfiguration(); + + super.setUp(); + _queueName = getTestQueueName(); + _failMsg = null; + } + + /** + * When there are multiple producers submitting batches of messages to a given + * queue using transacted sessions, it is highly probable that concurrent + * enqueue() activity will occur and attempt delivery of their message to the + * same subscription. In this scenario it is likely that one of the attempts + * will succeed and the other will result in use of the deliverAsync() method + * to start a queue Runner and ensure delivery of the message. + * + * A defect within the processQueue() method used by the Runner would mean that + * delivery of these messages may not occur, should the Runner stop before all + * messages have been processed. Such a defect was discovered and found to be + * most visible when Selectors are used such that one and only one subscription + * can/will accept any given message, but multiple subscriptions are present, + * and one of the earlier subscriptions receives more messages than the others. + * + * This test is to validate that the processQueue() method is able to correctly + * deliver all of the messages present for asynchronous delivery to subscriptions, + * by utilising multiple batch transacted producers to create the scenario and + * ensure all messages are received by a consumer. + */ + public void testMultipleBatchedProducersWithMultipleConsumersUsingSelectors() throws Exception + { + String selector1 = ("(\"" + _queueName +"\" % " + NUM_CONSUMERS + ") = 0"); + String selector2 = ("(\"" + _queueName +"\" % " + NUM_CONSUMERS + ") = 1"); + String selector3 = ("(\"" + _queueName +"\" % " + NUM_CONSUMERS + ") = 2"); + + //create consumers + Connection conn1 = getConnection(); + conn1.setExceptionListener(new ExceptionHandler("conn1")); + Session sess1 = conn1.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer cons1 = sess1.createConsumer(sess1.createQueue(_queueName), selector1); + cons1.setMessageListener(new Cons(sess1,"consumer1")); + + Connection conn2 = getConnection(); + conn2.setExceptionListener(new ExceptionHandler("conn2")); + Session sess2 = conn2.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer cons2 = sess2.createConsumer(sess2.createQueue(_queueName), selector2); + cons2.setMessageListener(new Cons(sess2,"consumer2")); + + Connection conn3 = getConnection(); + conn3.setExceptionListener(new ExceptionHandler("conn3")); + Session sess3 = conn3.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer cons3 = sess3.createConsumer(sess3.createQueue(_queueName), selector3); + cons3.setMessageListener(new Cons(sess3,"consumer3")); + + conn1.start(); + conn2.start(); + conn3.start(); + + //create producers + Connection connA = getConnection(); + connA.setExceptionListener(new ExceptionHandler("connA")); + Connection connB = getConnection(); + connB.setExceptionListener(new ExceptionHandler("connB")); + Thread producer1 = new Thread(new ProducerThread(connA, _queueName, "producer1")); + Thread producer2 = new Thread(new ProducerThread(connB, _queueName, "producer2")); + + producer1.start(); + Thread.sleep(10); + producer2.start(); + + //await delivery of the messages + int timeout = isBrokerStorePersistent() ? 300 : 75; + boolean result = _receivedLatch.await(timeout, TimeUnit.SECONDS); + + assertNull("Test failed because: " + String.valueOf(_failMsg), _failMsg); + assertTrue("Some of the messages were not all recieved in the given timeframe, remaining count was: "+_receivedLatch.getCount(), + result); + + } + + @Override + public Message createNextMessage(Session session, int msgCount) throws JMSException + { + Message message = super.createNextMessage(session,msgCount); + + //bias at least 50% of the messages to the first consumers selector because + //the issue presents itself primarily when an earlier subscription completes + //delivery after the later subscriptions + int val; + if (msgCount % 2 == 0) + { + val = 0; + } + else + { + val = RANDOM.nextInt(Integer.MAX_VALUE); + } + + message.setIntProperty(_queueName, val); + + return message; + } + + private class Cons implements MessageListener + { + private Session _sess; + private String _desc; + + public Cons(Session sess, String desc) + { + _sess = sess; + _desc = desc; + } + + public void onMessage(Message message) + { + _receivedLatch.countDown(); + int msgCount = 0; + int msgID = 0; + try + { + msgCount = message.getIntProperty(INDEX); + msgID = message.getIntProperty(_queueName); + } + catch (JMSException e) + { + _logger.error(_desc + " received exception: " + e.getMessage(), e); + failAsyncTest(e.getMessage()); + } + + _logger.info("Consumer received message:"+ msgCount + " with ID: " + msgID); + + try + { + _sess.commit(); + } + catch (JMSException e) + { + _logger.error(_desc + " received exception: " + e.getMessage(), e); + failAsyncTest(e.getMessage()); + } + } + } + + private class ProducerThread implements Runnable + { + private Connection _conn; + private String _dest; + private String _desc; + + public ProducerThread(Connection conn, String dest, String desc) + { + _conn = conn; + _dest = dest; + _desc = desc; + } + + public void run() + { + try + { + Session session = _conn.createSession(true, Session.SESSION_TRANSACTED); + sendMessage(session, session.createQueue(_dest), MESSAGE_COUNT, BATCH_SIZE); + } + catch (Exception e) + { + _logger.error(_desc + " received exception: " + e.getMessage(), e); + failAsyncTest(e.getMessage()); + } + } + } + + private class ExceptionHandler implements javax.jms.ExceptionListener + { + private String _desc; + + public ExceptionHandler(String description) + { + _desc = description; + } + + public void onException(JMSException e) + { + _logger.error(_desc + " received exception: " + e.getMessage(), e); + failAsyncTest(e.getMessage()); + } + } + + private void failAsyncTest(String msg) + { + _logger.error("Failing test because: " + msg); + _failMsg = msg; + } +}
\ No newline at end of file diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java new file mode 100644 index 0000000000..7b2dd3239d --- /dev/null +++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java @@ -0,0 +1,303 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.queue; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.naming.NamingException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class PriorityQueueTest extends QpidBrokerTestCase +{ + private static final int TIMEOUT = 1500; + + protected final String QUEUE = "PriorityQueue"; + + private static final int MSG_COUNT = 50; + + private Connection producerConnection; + private MessageProducer producer; + private Session producerSession; + private Queue queue; + private Connection consumerConnection; + private Session consumerSession; + + private MessageConsumer consumer; + + protected void setUp() throws Exception + { + super.setUp(); + + producerConnection = getConnection(); + producerSession = producerConnection.createSession(true, Session.AUTO_ACKNOWLEDGE); + + producerConnection.start(); + + consumerConnection = getConnection(); + consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + } + + protected void tearDown() throws Exception + { + producerConnection.close(); + consumerConnection.close(); + super.tearDown(); + } + + public void testPriority() throws JMSException, NamingException, AMQException + { + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put("x-qpid-priorities",10); + ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); + queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'"); + + ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); + producer = producerSession.createProducer(queue); + + for (int msg = 0; msg < MSG_COUNT; msg++) + { + producer.setPriority(msg % 10); + producer.send(nextMessage(msg, false, producerSession, producer)); + } + producerSession.commit(); + producer.close(); + producerSession.close(); + producerConnection.close(); + + consumer = consumerSession.createConsumer(queue); + consumerConnection.start(); + Message received; + int receivedCount = 0; + Message previous = null; + int messageCount = 0; + while((received = consumer.receive(1000))!=null) + { + messageCount++; + if(previous != null) + { + assertTrue("Messages arrived in unexpected order " + messageCount + " " + previous.getIntProperty("msg") + " " + received.getIntProperty("msg") + " " + previous.getJMSPriority() + " " + received.getJMSPriority(), (previous.getJMSPriority() > received.getJMSPriority()) || ((previous.getJMSPriority() == received.getJMSPriority()) && previous.getIntProperty("msg") < received.getIntProperty("msg")) ); + } + + previous = received; + receivedCount++; + } + + assertEquals("Incorrect number of message received", 50, receivedCount); + } + + public void testOddOrdering() throws AMQException, JMSException + { + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put("x-qpid-priorities",3); + ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); + queue = producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'"); + + ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); + producer = producerSession.createProducer(queue); + + // In order ABC + producer.setPriority(9); + producer.send(nextMessage(1, false, producerSession, producer)); + producer.setPriority(4); + producer.send(nextMessage(2, false, producerSession, producer)); + producer.setPriority(1); + producer.send(nextMessage(3, false, producerSession, producer)); + + // Out of order BAC + producer.setPriority(4); + producer.send(nextMessage(4, false, producerSession, producer)); + producer.setPriority(9); + producer.send(nextMessage(5, false, producerSession, producer)); + producer.setPriority(1); + producer.send(nextMessage(6, false, producerSession, producer)); + + // Out of order BCA + producer.setPriority(4); + producer.send(nextMessage(7, false, producerSession, producer)); + producer.setPriority(1); + producer.send(nextMessage(8, false, producerSession, producer)); + producer.setPriority(9); + producer.send(nextMessage(9, false, producerSession, producer)); + + // Reverse order CBA + producer.setPriority(1); + producer.send(nextMessage(10, false, producerSession, producer)); + producer.setPriority(4); + producer.send(nextMessage(11, false, producerSession, producer)); + producer.setPriority(9); + producer.send(nextMessage(12, false, producerSession, producer)); + producerSession.commit(); + + consumer = consumerSession.createConsumer(queue); + consumerConnection.start(); + + Message msg = consumer.receive(TIMEOUT); + assertEquals(1, msg.getIntProperty("msg")); + msg = consumer.receive(TIMEOUT); + assertEquals(5, msg.getIntProperty("msg")); + msg = consumer.receive(TIMEOUT); + assertEquals(9, msg.getIntProperty("msg")); + msg = consumer.receive(TIMEOUT); + assertEquals(12, msg.getIntProperty("msg")); + + msg = consumer.receive(TIMEOUT); + assertEquals(2, msg.getIntProperty("msg")); + msg = consumer.receive(TIMEOUT); + assertEquals(4, msg.getIntProperty("msg")); + msg = consumer.receive(TIMEOUT); + assertEquals(7, msg.getIntProperty("msg")); + msg = consumer.receive(TIMEOUT); + assertEquals(11, msg.getIntProperty("msg")); + + msg = consumer.receive(TIMEOUT); + assertEquals(3, msg.getIntProperty("msg")); + msg = consumer.receive(TIMEOUT); + assertEquals(6, msg.getIntProperty("msg")); + msg = consumer.receive(TIMEOUT); + assertEquals(8, msg.getIntProperty("msg")); + msg = consumer.receive(TIMEOUT); + assertEquals(10, msg.getIntProperty("msg")); + } + + private Message nextMessage(int msg, boolean first, Session producerSession, MessageProducer producer) throws JMSException + { + Message send = producerSession.createTextMessage("Message: " + msg); + send.setIntProperty("msg", msg); + + return send; + } + + /** + * Test that after sending an initial message with priority 0, it is able to be repeatedly reflected back to the queue using + * default priority and then consumed again, with separate transacted sessions with prefetch 1 for producer and consumer. + * + * Highlighted defect with PriorityQueues resolved in QPID-3927. + */ + public void testMessageReflectionWithPriorityIncreaseOnTransactedSessionsWithPrefetch1() throws Exception + { + setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, "1"); + Connection conn = getConnection(); + conn.start(); + assertEquals("Prefetch not reset", 1, ((AMQConnection) conn).getMaxPrefetch()); + + final Session producerSess = conn.createSession(true, Session.SESSION_TRANSACTED); + final Session consumerSess = conn.createSession(true, Session.SESSION_TRANSACTED); + + //declare a priority queue with 10 priorities + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put("x-qpid-priorities",10); + ((AMQSession<?,?>) producerSess).createQueue(new AMQShortString(getTestQueueName()), false, true, false, arguments); + + Queue queue = producerSess.createQueue(getTestQueueName()); + + //create the consumer, producer, add message listener + CountDownLatch latch = new CountDownLatch(5); + MessageConsumer cons = producerSess.createConsumer(queue); + MessageProducer producer = producerSess.createProducer(queue); + + ReflectingMessageListener listener = new ReflectingMessageListener(producerSess,producer,consumerSess,latch); + cons.setMessageListener(listener); + + //Send low priority 0 message to kick start the asynchronous reflection process + producer.setPriority(0); + producer.send(nextMessage(1, true, producerSess, producer)); + producerSess.commit(); + + //wait for the reflection process to complete + assertTrue("Test process failed to complete in allowed time", latch.await(10, TimeUnit.SECONDS)); + assertNull("Unexpected throwable encountered", listener.getThrown()); + } + + private static class ReflectingMessageListener implements MessageListener + { + private static final Logger _logger = Logger.getLogger(PriorityQueueTest.ReflectingMessageListener.class); + + private Session _prodSess; + private Session _consSess; + private CountDownLatch _latch; + private MessageProducer _prod; + private long _origCount; + private Throwable _lastThrown; + + public ReflectingMessageListener(final Session prodSess, final MessageProducer prod, + final Session consSess, final CountDownLatch latch) + { + _latch = latch; + _origCount = _latch.getCount(); + _prodSess = prodSess; + _consSess = consSess; + _prod = prod; + } + + @Override + public void onMessage(final Message message) + { + try + { + _latch.countDown(); + long msgNum = _origCount - _latch.getCount(); + _logger.info("Received message " + msgNum + " with ID: " + message.getIntProperty("msg")); + + if(_latch.getCount() > 0) + { + //reflect the message, updating its ID and using default priority + message.clearProperties(); + message.setIntProperty("msg", (int) msgNum + 1); + _prod.setPriority(Message.DEFAULT_PRIORITY); + _prod.send(message); + _prodSess.commit(); + } + + //commit the consumer session to consume the message + _consSess.commit(); + } + catch(Throwable t) + { + _logger.error(t.getMessage(), t); + _lastThrown = t; + } + } + + public Throwable getThrown() + { + return _lastThrown; + } + } +} diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java new file mode 100644 index 0000000000..427508954d --- /dev/null +++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java @@ -0,0 +1,494 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.queue; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.management.common.mbeans.ManagedQueue; +import org.apache.qpid.server.logging.AbstractTestLogging; +import org.apache.qpid.test.utils.JMXTestUtils; + +public class ProducerFlowControlTest extends AbstractTestLogging +{ + private static final Logger _logger = Logger.getLogger(ProducerFlowControlTest.class); + + private static final int TIMEOUT = 10000; + + private Connection producerConnection; + private Connection consumerConnection; + private Session producerSession; + private Session consumerSession; + private MessageProducer producer; + private MessageConsumer consumer; + private Queue queue; + + private final AtomicInteger _sentMessages = new AtomicInteger(0); + + private JMXTestUtils _jmxUtils; + private boolean _jmxUtilConnected; + + public void setUp() throws Exception + { + getBrokerConfiguration().addJmxManagementConfiguration(); + + _jmxUtils = new JMXTestUtils(this); + _jmxUtilConnected=false; + super.setUp(); + + _monitor.markDiscardPoint(); + + producerConnection = getConnection(); + producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + producerConnection.start(); + + consumerConnection = getConnection(); + consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + } + + public void tearDown() throws Exception + { + try + { + if(_jmxUtilConnected) + { + try + { + _jmxUtils.close(); + } + catch (IOException e) + { + _logger.error("Error closing jmxUtils", e); + } + } + producerConnection.close(); + consumerConnection.close(); + } + finally + { + super.tearDown(); + } + } + + public void testCapacityExceededCausesBlock() throws Exception + { + String queueName = getTestQueueName(); + + createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 1000, 800); + producer = producerSession.createProducer(queue); + + // try to send 5 messages (should block after 4) + sendMessagesAsync(producer, producerSession, 5, 50L); + + Thread.sleep(5000); + + assertEquals("Incorrect number of message sent before blocking", 4, _sentMessages.get()); + + consumer = consumerSession.createConsumer(queue); + consumerConnection.start(); + + + consumer.receive(); + + Thread.sleep(1000); + + assertEquals("Message incorrectly sent after one message received", 4, _sentMessages.get()); + + + consumer.receive(); + + Thread.sleep(1000); + + assertEquals("Message not sent after two messages received", 5, _sentMessages.get()); + + } + + + public void testBrokerLogMessages() throws Exception + { + String queueName = getTestQueueName(); + + createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 1000, 800); + producer = producerSession.createProducer(queue); + + // try to send 5 messages (should block after 4) + sendMessagesAsync(producer, producerSession, 5, 50L); + + List<String> results = waitAndFindMatches("QUE-1003", 7000); + + assertEquals("Did not find correct number of QUE-1003 queue overfull messages", 1, results.size()); + + consumer = consumerSession.createConsumer(queue); + consumerConnection.start(); + + + while(consumer.receive(1000) != null) {}; + + results = waitAndFindMatches("QUE-1004"); + + assertEquals("Did not find correct number of UNDERFULL queue underfull messages", 1, results.size()); + } + + + public void testClientLogMessages() throws Exception + { + String queueName = getTestQueueName(); + + setTestClientSystemProperty("qpid.flow_control_wait_failure","3000"); + setTestClientSystemProperty("qpid.flow_control_wait_notify_period","1000"); + + Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + createAndBindQueueWithFlowControlEnabled(session, queueName, 1000, 800); + producer = session.createProducer(queue); + + // try to send 5 messages (should block after 4) + MessageSender sender = sendMessagesAsync(producer, session, 5, 50L); + + List<String> results = waitAndFindMatches("Message send delayed by", TIMEOUT); + assertTrue("No delay messages logged by client",results.size()!=0); + + List<String> failedMessages = waitAndFindMatches("Message send failed due to timeout waiting on broker enforced" + + " flow control", TIMEOUT); + assertEquals("Incorrect number of send failure messages logged by client (got " + results.size() + " delay " + + "messages)",1,failedMessages.size()); + } + + + public void testFlowControlOnCapacityResumeEqual() throws Exception + { + String queueName = getTestQueueName(); + + createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 1000, 1000); + producer = producerSession.createProducer(queue); + + + // try to send 5 messages (should block after 4) + sendMessagesAsync(producer, producerSession, 5, 50L); + + Thread.sleep(5000); + + assertEquals("Incorrect number of message sent before blocking", 4, _sentMessages.get()); + + consumer = consumerSession.createConsumer(queue); + consumerConnection.start(); + + consumer.receive(); + + Thread.sleep(1000); + + assertEquals("Message incorrectly sent after one message received", 5, _sentMessages.get()); + + + } + + + public void testFlowControlSoak() throws Exception + { + String queueName = getTestQueueName(); + + + final int numProducers = 10; + final int numMessages = 100; + + createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 6000, 3000); + + consumerConnection.start(); + + Connection[] producers = new Connection[numProducers]; + for(int i = 0 ; i < numProducers; i ++) + { + + producers[i] = getConnection(); + producers[i].start(); + Session session = producers[i].createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer myproducer = session.createProducer(queue); + MessageSender sender = sendMessagesAsync(myproducer, session, numMessages, 50L); + } + + consumer = consumerSession.createConsumer(queue); + consumerConnection.start(); + + for(int j = 0; j < numProducers * numMessages; j++) + { + + Message msg = consumer.receive(5000); + Thread.sleep(50L); + assertNotNull("Message not received("+j+"), sent: "+_sentMessages.get(), msg); + + } + + + + Message msg = consumer.receive(500); + assertNull("extra message received", msg); + + + for(int i = 0; i < numProducers; i++) + { + producers[i].close(); + } + + } + + public void testSendTimeout() throws Exception + { + String queueName = getTestQueueName(); + final String expectedMsg = isBroker010() ? "Exception when sending message:timed out waiting for message credit" + : "Unable to send message for 3 seconds due to broker enforced flow control"; + + setTestClientSystemProperty("qpid.flow_control_wait_failure","3000"); + Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 1000, 800); + producer = session.createProducer(queue); + + // try to send 5 messages (should block after 4) + MessageSender sender = sendMessagesAsync(producer, session, 5, 100L); + + Exception e = sender.awaitSenderException(10000); + + assertNotNull("No timeout exception on sending", e); + + + assertEquals("Unexpected exception reason", expectedMsg, e.getMessage()); + + } + + public void testFlowControlAttributeModificationViaJMX() throws Exception + { + _jmxUtils.open(); + _jmxUtilConnected = true; + + String queueName = getTestQueueName(); + + createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 0, 0); + producer = producerSession.createProducer(queue); + + Thread.sleep(1000); + + //Create a JMX MBean proxy for the queue + ManagedQueue queueMBean = _jmxUtils.getManagedObject(ManagedQueue.class, _jmxUtils.getQueueObjectName("test", queueName)); + assertNotNull(queueMBean); + + //check current attribute values are 0 as expected + assertTrue("Capacity was not the expected value", queueMBean.getCapacity() == 0L); + assertTrue("FlowResumeCapacity was not the expected value", queueMBean.getFlowResumeCapacity() == 0L); + + //set new values that will cause flow control to be active, and the queue to become overfull after 1 message is sent + queueMBean.setCapacity(250L); + queueMBean.setFlowResumeCapacity(250L); + assertTrue("Capacity was not the expected value", queueMBean.getCapacity() == 250L); + assertTrue("FlowResumeCapacity was not the expected value", queueMBean.getFlowResumeCapacity() == 250L); + assertFalse("Queue should not be overfull", queueMBean.isFlowOverfull()); + + // try to send 2 messages (should block after 1) + + sendMessagesAsync(producer, producerSession, 2, 50L); + + Thread.sleep(2000); + + //check only 1 message was sent, and queue is overfull + assertEquals("Incorrect number of message sent before blocking", 1, _sentMessages.get()); + assertTrue("Queue should be overfull", queueMBean.isFlowOverfull()); + + //raise the attribute values, causing the queue to become underfull and allow the second message to be sent. + queueMBean.setCapacity(300L); + queueMBean.setFlowResumeCapacity(300L); + + Thread.sleep(2000); + + //check second message was sent, and caused the queue to become overfull again + assertEquals("Second message was not sent after lifting FlowResumeCapacity", 2, _sentMessages.get()); + assertTrue("Queue should be overfull", queueMBean.isFlowOverfull()); + + //raise capacity above queue depth, check queue remains overfull as FlowResumeCapacity still exceeded + queueMBean.setCapacity(700L); + assertTrue("Queue should be overfull", queueMBean.isFlowOverfull()); + + //receive a message, check queue becomes underfull + + consumer = consumerSession.createConsumer(queue); + consumerConnection.start(); + + consumer.receive(); + + //perform a synchronous op on the connection + ((AMQSession<?,?>) consumerSession).sync(); + + assertFalse("Queue should not be overfull", queueMBean.isFlowOverfull()); + + consumer.receive(); + } + + public void testQueueDeleteWithBlockedFlow() throws Exception + { + String queueName = getTestQueueName(); + createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 1000, 800, true, false); + + producer = producerSession.createProducer(queue); + + // try to send 5 messages (should block after 4) + sendMessagesAsync(producer, producerSession, 5, 50L); + + Thread.sleep(5000); + + assertEquals("Incorrect number of message sent before blocking", 4, _sentMessages.get()); + + // close blocked producer session and connection + producerConnection.close(); + + // delete queue with a consumer session + ((AMQSession<?,?>) consumerSession).sendQueueDelete(new AMQShortString(queueName)); + + consumer = consumerSession.createConsumer(queue); + consumerConnection.start(); + + Message message = consumer.receive(1000l); + assertNull("Unexpected message", message); + } + + private void createAndBindQueueWithFlowControlEnabled(Session session, String queueName, int capacity, int resumeCapacity) throws Exception + { + createAndBindQueueWithFlowControlEnabled(session, queueName, capacity, resumeCapacity, false, true); + } + + private void createAndBindQueueWithFlowControlEnabled(Session session, String queueName, int capacity, int resumeCapacity, boolean durable, boolean autoDelete) throws Exception + { + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put("x-qpid-capacity",capacity); + arguments.put("x-qpid-flow-resume-capacity",resumeCapacity); + ((AMQSession<?,?>) session).createQueue(new AMQShortString(queueName), autoDelete, durable, false, arguments); + queue = session.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='" + durable + "'&autodelete='" + autoDelete + "'"); + ((AMQSession<?,?>) session).declareAndBind((AMQDestination)queue); + } + + private MessageSender sendMessagesAsync(final MessageProducer producer, + final Session producerSession, + final int numMessages, + long sleepPeriod) + { + MessageSender sender = new MessageSender(producer, producerSession, numMessages,sleepPeriod); + new Thread(sender).start(); + return sender; + } + + private void sendMessages(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod) + throws JMSException + { + + for (int msg = 0; msg < numMessages; msg++) + { + producer.send(nextMessage(msg, producerSession)); + _sentMessages.incrementAndGet(); + + + try + { + ((AMQSession<?,?>)producerSession).sync(); + } + catch (AMQException e) + { + _logger.error("Error performing sync", e); + throw new RuntimeException(e); + } + + try + { + Thread.sleep(sleepPeriod); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } + } + + private static final byte[] BYTE_300 = new byte[300]; + + private Message nextMessage(int msg, Session producerSession) throws JMSException + { + BytesMessage send = producerSession.createBytesMessage(); + send.writeBytes(BYTE_300); + send.setIntProperty("msg", msg); + + return send; + } + + private class MessageSender implements Runnable + { + private final MessageProducer _senderProducer; + private final Session _senderSession; + private final int _numMessages; + private volatile JMSException _exception; + private CountDownLatch _exceptionThrownLatch = new CountDownLatch(1); + private long _sleepPeriod; + + public MessageSender(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod) + { + _senderProducer = producer; + _senderSession = producerSession; + _numMessages = numMessages; + _sleepPeriod = sleepPeriod; + } + + public void run() + { + try + { + sendMessages(_senderProducer, _senderSession, _numMessages, _sleepPeriod); + } + catch (JMSException e) + { + _exception = e; + _exceptionThrownLatch.countDown(); + } + } + + public Exception awaitSenderException(long timeout) throws InterruptedException + { + _exceptionThrownLatch.await(timeout, TimeUnit.MILLISECONDS); + return _exception; + } + } +} diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/QueueBindTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/QueueBindTest.java new file mode 100644 index 0000000000..64ba0156e6 --- /dev/null +++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/QueueBindTest.java @@ -0,0 +1,130 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.common.AMQPFilterTypes; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.url.AMQBindingURL; + +public class QueueBindTest extends QpidBrokerTestCase +{ + private Connection _connection; + private AMQSession<?, ?> _session; + + protected void setUp() throws Exception + { + super.setUp(); + + _connection = getConnection(); + _session = (AMQSession<?, ?>) _connection.createSession(true, Session.SESSION_TRANSACTED); + } + + public void testQueueCannotBeReboundOnNonTopicExchange() throws Exception + { + runTestForNonTopicExhange(new AMQQueue(new AMQBindingURL("direct://amq.direct//" + getTestQueueName()))); + runTestForNonTopicExhange(new AMQQueue(new AMQBindingURL("fanout://amq.fanout//" + getTestQueueName()) + "?routingkey='" + + getTestQueueName() + "'")); + } + + public void testQueueCanBeReboundOnTopicExchange() throws Exception + { + AMQQueue destination = new AMQQueue(new AMQBindingURL("topic://amq.topic//" + getTestQueueName() + "?routingkey='" + + getTestQueueName() + "'")); + setTestClientSystemProperty("qpid.default_mandatory", "false"); + runTestForTopicExchange(destination); + + } + + private void runTestForTopicExchange(AMQDestination destination) throws AMQException, JMSException, Exception + { + // binding queue with empty arguments + _session.declareAndBind(destination, FieldTable.convertToFieldTable(Collections.<String, Object> emptyMap())); + + // try to re-bind queue with a selector + Map<String, Object> bindArguments = new HashMap<String, Object>(); + bindArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), INDEX + "=0"); + _session.bindQueue(destination.getAMQQueueName(), destination.getRoutingKey(), + FieldTable.convertToFieldTable(bindArguments), destination.getExchangeName(), destination); + + _connection.start(); + + // repeat send/receive twice to make sure that selector is working + for (int i = 0; i < 2; i++) + { + int numberOfMesssages = 2; + sendMessage(_session, destination, numberOfMesssages); + + MessageConsumer consumer = _session.createConsumer(destination); + Message m = consumer.receive(1000); + assertNotNull("Message not received", m); + assertEquals("Unexpected index", 0, m.getIntProperty(INDEX)); + _session.commit(); + + m = consumer.receive(1000); + assertNull("Message received", m); + + consumer.close(); + } + } + + private void runTestForNonTopicExhange(AMQQueue destination) throws AMQException, Exception, JMSException + { + // binding queue with empty arguments + _session.declareAndBind(destination, FieldTable.convertToFieldTable(Collections.<String, Object> emptyMap())); + + // try to re-bind queue with a selector + Map<String, Object> bindArguments = new HashMap<String, Object>(); + bindArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), INDEX + "=0"); + _session.bindQueue(destination.getAMQQueueName(), destination.getRoutingKey(), + FieldTable.convertToFieldTable(bindArguments), destination.getExchangeName(), destination); + + // send and receive to prove that selector is not used + int numberOfMesssages = 2; + sendMessage(_session, destination, numberOfMesssages); + + MessageConsumer consumer = _session.createConsumer(destination); + _connection.start(); + + for (int i = 0; i < numberOfMesssages; i++) + { + Message m = consumer.receive(1000l); + assertNotNull("Message [" + i + "] not received with exchange " + destination.getExchangeName(), m); + assertEquals("Unexpected index", i, m.getIntProperty(INDEX)); + _session.commit(); + } + consumer.close(); + } +} diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java new file mode 100644 index 0000000000..dd57c1e3f7 --- /dev/null +++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java @@ -0,0 +1,175 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.queue; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +/** + * Test Case to ensure that messages are correctly returned. + * This includes checking: + * - The message is returned. + * - The broker doesn't leak memory. + * - The broker's state is correct after test. + */ +public class QueueDepthWithSelectorTest extends QpidBrokerTestCase +{ + protected final String VHOST = "test"; + protected final String QUEUE = this.getClass().getName(); + + protected Connection _clientConnection; + protected Connection _producerConnection; + private Session _clientSession; + protected Session _producerSession; + protected MessageProducer _producer; + private MessageConsumer _consumer; + + protected static int MSG_COUNT = 50; + + protected Message[] _messages = new Message[MSG_COUNT]; + + protected Queue _queue; + + @Override + public void setUp() throws Exception + { + super.setUp(); + + _messages = new Message[MSG_COUNT]; + _queue = getTestQueue(); + + //Create Producer + _producerConnection = getConnection(); + _producerConnection.start(); + _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _producer = _producerSession.createProducer(_queue); + + // Create consumer + _clientConnection = getConnection(); + _clientConnection.start(); + _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _consumer = _clientSession.createConsumer(_queue, "key = 23"); + } + + public void test() throws Exception + { + //Send messages + _logger.info("Starting to send messages"); + for (int msg = 0; msg < MSG_COUNT; msg++) + { + _producer.send(nextMessage(msg)); + } + _logger.info("Closing connection"); + //Close the connection.. .giving the broker time to clean up its state. + _producerConnection.close(); + + //Verify we get all the messages. + _logger.info("Verifying messages"); + verifyAllMessagesRecevied(50); + verifyBrokerState(0); + + //Close the connection.. .giving the broker time to clean up its state. + _clientConnection.close(); + + //Verify Broker state + _logger.info("Verifying broker state"); + verifyBrokerState(0); + } + + protected void verifyBrokerState(int expectedDepth) + { + try + { + Connection connection = getConnection(); + AMQSession session = (AMQSession)connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + long queueDepth = session.getQueueDepth((AMQDestination) _queue); + assertEquals("Session reports Queue depth not as expected", expectedDepth, queueDepth); + + connection.close(); + } + catch (AMQException e) + { + fail(e.getMessage()); + } + catch (Exception e) + { + fail(e.getMessage()); + } + } + + protected void verifyAllMessagesRecevied(int expectedDepth) throws Exception + { + boolean[] msgIdRecevied = new boolean[MSG_COUNT]; + + for (int i = 0; i < expectedDepth; i++) + { + _messages[i] = _consumer.receive(1000); + assertNotNull("should have received a message but didn't", _messages[i]); + } + + //Check received messages + int msgId = 0; + for (Message msg : _messages) + { + assertNotNull("Message should not be null", msg); + assertEquals("msgId was wrong", msgId, msg.getIntProperty("ID")); + assertFalse("Already received msg id " + msgId, msgIdRecevied[msgId]); + msgIdRecevied[msgId] = true; + msgId++; + } + + //Check all received + for (msgId = 0; msgId < expectedDepth; msgId++) + { + assertTrue("Message " + msgId + " not received.", msgIdRecevied[msgId]); + } + + //do a synchronous op to ensure the acks are processed + //on the broker before proceeding + ((AMQSession)_clientSession).sync(); + } + + /** + * Get the next message putting the given count into the intProperties as ID. + * + * @param msgNo the message count to store as ID. + * @throws JMSException + */ + protected Message nextMessage(int msgNo) throws JMSException + { + Message send = _producerSession.createTextMessage("MessageReturnTest"); + send.setIntProperty("ID", msgNo); + send.setIntProperty("key", 23); + return send; + } +} diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java new file mode 100644 index 0000000000..fe86e9d41f --- /dev/null +++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java @@ -0,0 +1,216 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import java.util.HashMap; +import java.util.Map; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.store.MessageDurability; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +public class QueueMessageDurabilityTest extends QpidBrokerTestCase +{ + + private static final String QPID_MESSAGE_DURABILITY = "qpid.message_durability"; + private static final String DURABLE_ALWAYS_PERSIST_NAME = "DURABLE_QUEUE_ALWAYS_PERSIST"; + private static final String DURABLE_NEVER_PERSIST_NAME = "DURABLE_QUEUE_NEVER_PERSIST"; + private static final String DURABLE_DEFAULT_PERSIST_NAME = "DURABLE_QUEUE_DEFAULT_PERSIST"; + private static final String NONDURABLE_ALWAYS_PERSIST_NAME = "NONDURABLE_QUEUE_ALWAYS_PERSIST"; + + @Override + public void setUp() throws Exception + { + super.setUp(); + Connection conn = getConnection(); + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + AMQSession amqSession = (AMQSession) session; + + Map<String,Object> arguments = new HashMap<>(); + arguments.put(QPID_MESSAGE_DURABILITY, MessageDurability.ALWAYS.name()); + amqSession.createQueue(new AMQShortString(DURABLE_ALWAYS_PERSIST_NAME), false, true, false, arguments); + + arguments = new HashMap<>(); + arguments.put(QPID_MESSAGE_DURABILITY, MessageDurability.NEVER.name()); + amqSession.createQueue(new AMQShortString(DURABLE_NEVER_PERSIST_NAME), false, true, false, arguments); + + arguments = new HashMap<>(); + arguments.put(QPID_MESSAGE_DURABILITY, MessageDurability.DEFAULT.name()); + amqSession.createQueue(new AMQShortString(DURABLE_DEFAULT_PERSIST_NAME), false, true, false, arguments); + + arguments = new HashMap<>(); + arguments.put(QPID_MESSAGE_DURABILITY,MessageDurability.ALWAYS.name()); + amqSession.createQueue(new AMQShortString(NONDURABLE_ALWAYS_PERSIST_NAME), false, false, false, arguments); + + amqSession.bindQueue(AMQShortString.valueOf(DURABLE_ALWAYS_PERSIST_NAME), + AMQShortString.valueOf("Y.*.*.*"), + null, + AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME), + null); + + amqSession.bindQueue(AMQShortString.valueOf(DURABLE_NEVER_PERSIST_NAME), + AMQShortString.valueOf("*.Y.*.*"), + null, + AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME), + null); + + amqSession.bindQueue(AMQShortString.valueOf(DURABLE_DEFAULT_PERSIST_NAME), + AMQShortString.valueOf("*.*.Y.*"), + null, + AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME), + null); + + amqSession.bindQueue(AMQShortString.valueOf(NONDURABLE_ALWAYS_PERSIST_NAME), + AMQShortString.valueOf("*.*.*.Y"), + null, + AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME), + null); + } + + public void testSendPersistentMessageToAll() throws Exception + { + Connection conn = getConnection(); + Session session = conn.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(null); + conn.start(); + producer.send(session.createTopic("Y.Y.Y.Y"), session.createTextMessage("test")); + session.commit(); + + AMQSession amqSession = (AMQSession) session; + assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME))); + assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME))); + assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME))); + assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME))); + + restartBroker(); + + conn = getConnection(); + session = conn.createSession(true, Session.SESSION_TRANSACTED); + amqSession = (AMQSession) session; + assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME))); + assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME))); + assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME))); + + assertFalse(amqSession.isQueueBound((AMQDestination) session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME))); + + } + + + public void testSendNonPersistentMessageToAll() throws Exception + { + Connection conn = getConnection(); + Session session = conn.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(null); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + conn.start(); + producer.send(session.createTopic("Y.Y.Y.Y"), session.createTextMessage("test")); + session.commit(); + + AMQSession amqSession = (AMQSession) session; + assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME))); + assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME))); + assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME))); + assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME))); + + restartBroker(); + + conn = getConnection(); + session = conn.createSession(true, Session.SESSION_TRANSACTED); + amqSession = (AMQSession) session; + assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME))); + assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME))); + assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME))); + + assertFalse(amqSession.isQueueBound((AMQDestination)session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME))); + + } + + public void testNonPersistentContentRetained() throws Exception + { + Connection conn = getConnection(); + Session session = conn.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(null); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + conn.start(); + producer.send(session.createTopic("N.N.Y.Y"), session.createTextMessage("test1")); + producer.send(session.createTopic("Y.N.Y.Y"), session.createTextMessage("test2")); + session.commit(); + MessageConsumer consumer = session.createConsumer(session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)); + Message msg = consumer.receive(1000l); + assertNotNull(msg); + assertTrue(msg instanceof TextMessage); + assertEquals("test2", ((TextMessage) msg).getText()); + session.rollback(); + restartBroker(); + conn = getConnection(); + conn.start(); + session = conn.createSession(true, Session.SESSION_TRANSACTED); + AMQSession amqSession = (AMQSession) session; + assertEquals(1, amqSession.getQueueDepth((AMQDestination) session.createQueue(DURABLE_ALWAYS_PERSIST_NAME))); + assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME))); + assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME))); + consumer = session.createConsumer(session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)); + msg = consumer.receive(1000l); + assertNotNull(msg); + assertTrue(msg instanceof TextMessage); + assertEquals("test2", ((TextMessage)msg).getText()); + session.commit(); + } + + public void testPersistentContentRetainedOnTransientQueue() throws Exception + { + setTestClientSystemProperty(ClientProperties.QPID_DECLARE_QUEUES_PROP_NAME, "false"); + Connection conn = getConnection(); + Session session = conn.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(null); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + conn.start(); + producer.send(session.createTopic("N.N.Y.Y"), session.createTextMessage("test1")); + session.commit(); + MessageConsumer consumer = session.createConsumer(session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)); + Message msg = consumer.receive(1000l); + assertNotNull(msg); + assertTrue(msg instanceof TextMessage); + assertEquals("test1", ((TextMessage)msg).getText()); + session.commit(); + System.gc(); + consumer = session.createConsumer(session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME)); + msg = consumer.receive(1000l); + assertNotNull(msg); + assertTrue(msg instanceof TextMessage); + assertEquals("test1", ((TextMessage)msg).getText()); + session.commit(); + } + + +} diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SortedQueueTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SortedQueueTest.java new file mode 100644 index 0000000000..340ae4a1ae --- /dev/null +++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SortedQueueTest.java @@ -0,0 +1,538 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.log4j.Logger; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.naming.NamingException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicInteger; + +public class SortedQueueTest extends QpidBrokerTestCase +{ + private static final Logger LOGGER = Logger.getLogger(SortedQueueTest.class); + public static final String TEST_SORT_KEY = "testSortKey"; + private static final String[] VALUES = SortedQueueEntryListTest.keys.clone(); + private static final String[] VALUES_SORTED = SortedQueueEntryListTest.keys.clone(); + private final String[] SUBSET_KEYS = { "000", "100", "200", "300", "400", "500", "600", "700", "800", "900" }; + + private Connection _producerConnection; + private Session _producerSession; + private Connection _consumerConnection; + private long _receiveInterval; + + protected void setUp() throws Exception + { + super.setUp(); + setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, "1"); + // Sort value array to generated "expected" order of messages. + Arrays.sort(VALUES_SORTED); + _producerConnection = getConnection(); + _consumerConnection = getConnection(); + _producerSession = _producerConnection.createSession(true, Session.SESSION_TRANSACTED); + _receiveInterval = isBrokerStorePersistent() ? 3000l : 1500l; + } + + protected void tearDown() throws Exception + { + _producerSession.close(); + _producerConnection.close(); + _consumerConnection.close(); + super.tearDown(); + } + + public void testSortOrder() throws JMSException, NamingException, AMQException + { + final Queue queue = createQueue(); + final MessageProducer producer = _producerSession.createProducer(queue); + + for(String value : VALUES) + { + final Message msg = _producerSession.createTextMessage("Message Text:" + value); + msg.setStringProperty(TEST_SORT_KEY, value); + producer.send(msg); + } + + _producerSession.commit(); + producer.close(); + + final Session consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final MessageConsumer consumer = consumerSession.createConsumer(queue); + _consumerConnection.start(); + TextMessage received; + int messageCount = 0; + while((received = (TextMessage) consumer.receive(_receiveInterval)) != null) + { + assertEquals("Received message with unexpected sorted key value", VALUES_SORTED[messageCount], + received.getStringProperty(TEST_SORT_KEY)); + assertEquals("Received message with unexpected message value", + "Message Text:" + VALUES_SORTED[messageCount], received.getText()); + messageCount++; + } + + assertEquals("Incorrect number of messages received", VALUES.length, messageCount); + } + + public void testAutoAckSortedQueue() throws JMSException, NamingException, AMQException + { + runThroughSortedQueueForSessionMode(Session.AUTO_ACKNOWLEDGE); + } + + public void testTransactedSortedQueue() throws JMSException, NamingException, AMQException + { + runThroughSortedQueueForSessionMode(Session.SESSION_TRANSACTED); + } + + public void testClientAckSortedQueue() throws JMSException, NamingException, AMQException + { + runThroughSortedQueueForSessionMode(Session.CLIENT_ACKNOWLEDGE); + } + + private void runThroughSortedQueueForSessionMode(final int sessionMode) throws JMSException, NamingException, + AMQException + { + final Queue queue = createQueue(); + final MessageProducer producer = _producerSession.createProducer(queue); + + final TestConsumerThread consumerThread = new TestConsumerThread(sessionMode, queue); + consumerThread.start(); + + for(String value : VALUES) + { + final Message msg = _producerSession.createMessage(); + msg.setStringProperty(TEST_SORT_KEY, value); + producer.send(msg); + _producerSession.commit(); + } + + try + { + consumerThread.join(getConsumerThreadJoinInterval()); + } + catch(InterruptedException e) + { + fail("Test failed waiting for consumer to complete"); + } + + assertTrue("Consumer timed out", consumerThread.isStopped()); + assertEquals("Incorrect number of messages received", VALUES.length, consumerThread.getConsumed()); + + producer.close(); + } + + public void testSortedQueueWithAscendingSortedKeys() throws JMSException, NamingException, AMQException + { + final Queue queue = createQueue(); + final MessageProducer producer = _producerSession.createProducer(queue); + + final TestConsumerThread consumerThread = new TestConsumerThread(Session.AUTO_ACKNOWLEDGE, queue); + consumerThread.start(); + + for(int i = 0; i < 200; i++) + { + final String ascendingKey = AscendingSortedKeys.getNextKey(); + final Message msg = _producerSession.createTextMessage("Message Text:" + ascendingKey); + msg.setStringProperty(TEST_SORT_KEY, ascendingKey); + producer.send(msg); + _producerSession.commit(); + } + + try + { + consumerThread.join(getConsumerThreadJoinInterval()); + } + catch(InterruptedException e) + { + fail("Test failed waiting for consumer to complete"); + } + + assertTrue("Consumer timed out", consumerThread.isStopped()); + assertEquals("Incorrect number of messages received", 200, consumerThread.getConsumed()); + + producer.close(); + } + + private long getConsumerThreadJoinInterval() + { + return isBrokerStorePersistent() ? 50000L: 5000L; + } + + public void testSortOrderWithNonUniqueKeys() throws JMSException, NamingException, AMQException + { + final Queue queue = createQueue(); + final MessageProducer producer = _producerSession.createProducer(queue); + + int count = 0; + while(count < 200) + { + final Message msg = _producerSession.createTextMessage("Message Text:" + count++); + msg.setStringProperty(TEST_SORT_KEY, "samesortkeyvalue"); + producer.send(msg); + } + + _producerSession.commit(); + producer.close(); + + final Session consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final MessageConsumer consumer = consumerSession.createConsumer(queue); + _consumerConnection.start(); + TextMessage received = null; + int messageCount = 0; + + while((received = (TextMessage) consumer.receive(_receiveInterval)) != null) + { + assertEquals("Received message with unexpected sorted key value", "samesortkeyvalue", + received.getStringProperty(TEST_SORT_KEY)); + assertEquals("Received message with unexpected message value", "Message Text:" + messageCount, + received.getText()); + messageCount++; + } + + assertEquals("Incorrect number of messages received", 200, messageCount); + } + + public void testSortOrderWithUniqueKeySubset() throws JMSException, NamingException, AMQException + { + final Queue queue = createQueue(); + final MessageProducer producer = _producerSession.createProducer(queue); + + int count = 0; + while(count < 100) + { + int keyValueIndex = count % 10; + final Message msg = _producerSession.createTextMessage("Message Text:" + count); + msg.setStringProperty(TEST_SORT_KEY, SUBSET_KEYS[keyValueIndex]); + producer.send(msg); + count++; + } + + _producerSession.commit(); + producer.close(); + + final Session consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final MessageConsumer consumer = consumerSession.createConsumer(queue); + _consumerConnection.start(); + TextMessage received; + int messageCount = 0; + + while((received = (TextMessage) consumer.receive(_receiveInterval)) != null) + { + assertEquals("Received message with unexpected sorted key value", SUBSET_KEYS[messageCount / 10], + received.getStringProperty(TEST_SORT_KEY)); + messageCount++; + } + + assertEquals("Incorrect number of messages received", 100, messageCount); + } + + public void testGetNextWithAck() throws JMSException, NamingException, AMQException + { + Queue _queue = createQueue(); + MessageProducer producer = _producerSession.createProducer(_queue); + Message received = null; + + //Send 3 out of order + sendAndCommitMessage(producer,"2"); + sendAndCommitMessage(producer,"3"); + sendAndCommitMessage(producer,"1"); + + final Session consumerSession = _consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + final MessageConsumer consumer = consumerSession.createConsumer(_queue); + _consumerConnection.start(); + + //Receive 3 in sorted order + received = assertReceiveAndValidateMessage(consumer, "1"); + received.acknowledge(); + received = assertReceiveAndValidateMessage(consumer, "2"); + received.acknowledge(); + received = assertReceiveAndValidateMessage(consumer, "3"); + received.acknowledge(); + + //Send 1 + sendAndCommitMessage(producer,"4"); + + //Receive 1 and recover + received = assertReceiveAndValidateMessage(consumer, "4"); + consumerSession.recover(); + + //Receive same 1 + received = assertReceiveAndValidateMessage(consumer, "4"); + received.acknowledge(); + + //Send 3 out of order + sendAndCommitMessage(producer,"7"); + sendAndCommitMessage(producer,"6"); + sendAndCommitMessage(producer,"5"); + + //Receive 1 of 3 (possibly out of order due to pre-fetch) + final Message messageBeforeRollback = assertReceiveMessage(consumer); + consumerSession.recover(); + + if (isBroker010()) + { + //Receive 3 in sorted order (not as per JMS recover) + received = assertReceiveAndValidateMessage(consumer, "5"); + received.acknowledge(); + received = assertReceiveAndValidateMessage(consumer, "6"); + received.acknowledge(); + received = assertReceiveAndValidateMessage(consumer, "7"); + received.acknowledge(); + } + else + { + //First message will be the one rolled-back (as per JMS spec). + final String messageKeyDeliveredBeforeRollback = messageBeforeRollback.getStringProperty(TEST_SORT_KEY); + received = assertReceiveAndValidateMessage(consumer, messageKeyDeliveredBeforeRollback); + received.acknowledge(); + + //Remaining two messages will be sorted + final SortedSet<String> keys = new TreeSet<String>(Arrays.asList("5", "6", "7")); + keys.remove(messageKeyDeliveredBeforeRollback); + + received = assertReceiveAndValidateMessage(consumer, keys.first()); + received.acknowledge(); + received = assertReceiveAndValidateMessage(consumer, keys.last()); + received.acknowledge(); + } + } + + private Queue createQueue() throws AMQException, JMSException + { + final Map<String, Object> arguments = new HashMap<String, Object>(); + arguments.put(QueueArgumentsConverter.QPID_QUEUE_SORT_KEY, TEST_SORT_KEY); + ((AMQSession<?,?>) _producerSession).createQueue(new AMQShortString(getTestQueueName()), false, true, false, arguments); + final Queue queue = new AMQQueue("amq.direct", getTestQueueName()); + ((AMQSession<?,?>) _producerSession).declareAndBind((AMQDestination) queue); + return queue; + } + + private Message getSortableTestMesssage(final String key) throws JMSException + { + final Message msg = _producerSession.createTextMessage("Message Text: Key Value" + key); + msg.setStringProperty(TEST_SORT_KEY, key); + return msg; + } + + private void sendAndCommitMessage(final MessageProducer producer, final String keyValue) throws JMSException + { + producer.send(getSortableTestMesssage(keyValue)); + _producerSession.commit(); + } + + private Message assertReceiveAndValidateMessage(final MessageConsumer consumer, final String expectedKey) throws JMSException + { + final Message received = assertReceiveMessage(consumer); + assertEquals("Received message with unexpected sorted key value", expectedKey, + received.getStringProperty(TEST_SORT_KEY)); + return received; + } + + private Message assertReceiveMessage(final MessageConsumer consumer) + throws JMSException + { + final Message received = (TextMessage) consumer.receive(_receiveInterval); + assertNotNull("Received message is unexpectedly null", received); + return received; + } + + private class TestConsumerThread extends Thread + { + private final AtomicInteger _consumed = new AtomicInteger(0); + private volatile boolean _stopped = false; + private int _count = 0; + private int _sessionType = Session.AUTO_ACKNOWLEDGE; + private Queue _queue; + + public TestConsumerThread(final int sessionType, final Queue queue) + { + _sessionType = sessionType; + _queue = queue; + } + + public void run() + { + try + { + Connection conn = null; + try + { + conn = getConnection(); + } + catch(Exception e) + { + throw new RuntimeException("Could not get connection"); + } + + final Session session = conn.createSession((_sessionType == Session.SESSION_TRANSACTED ? true : false), + _sessionType); + final MessageConsumer consumer = session.createConsumer(_queue); + + conn.start(); + + Message msg; + while((msg = consumer.receive(_receiveInterval)) != null) + { + if(_sessionType == Session.SESSION_TRANSACTED) + { + if (_count%10 == 0) + { + LOGGER.debug("transacted session rollback"); + session.rollback(); + } + else + { + LOGGER.debug("transacted session commit"); + session.commit(); + _consumed.incrementAndGet(); + } + } + else if(_sessionType == Session.CLIENT_ACKNOWLEDGE) + { + if (_count%10 == 0) + { + LOGGER.debug("client ack session recover"); + session.recover(); + } + else + { + LOGGER.debug("client ack session acknowledge"); + msg.acknowledge(); + _consumed.incrementAndGet(); + } + } + else + { + LOGGER.debug("auto ack session"); + _consumed.incrementAndGet(); + } + + _count++; + LOGGER.debug("Message consumed with key: " + msg.getStringProperty(TEST_SORT_KEY)); + LOGGER.debug("Message consumed with consumed index: " + _consumed.get()); + } + + _stopped = true; + session.close(); + conn.close(); + } + catch(JMSException e) + { + LOGGER.error("Exception in listener", e); + } + } + + public boolean isStopped() + { + return _stopped; + } + + public int getConsumed() + { + return _consumed.get(); + } + } + + private static class AscendingSortedKeys + { + public static final String[] KEYS = { "Ul4a1", "WaWsv", "2Yz7E", "ix74r", "okgRi", "HlUbF", "LewvM", "lweGy", + "TXQ0Z", "0Kyfs", "s7Mxk", "dmoS7", "8RCUA", "W3VFH", "aez9y", "uQIcz", "0h1b1", "cmXIX", + "4dEz6", "zHF1q", "D6rBy", "5drc6", "0BmCy", "BCxeC", "t59lR", "aL6AJ", "OHaBz", "WmadA", + "B3qem", "CxVEf", "AIYUu", "uJScX", "uoStw", "ogLgc", "AgJHQ", "hUTw7", "Rxrsm", "9GXkX", + "7hyVv", "y94nw", "Twano", "TCgPp", "pFrrl", "POUYS", "L7cGc", "0ao3l", "CNHmv", "MaJQs", + "OUqFM", "jeskS", "FPfSE", "v1Hln", "14FLR", "KZamH", "G1RhS", "FVMxo", "rKDLJ", "nnP8o", + "nFqik", "zmLYD", "1j5L8", "e6e4z", "WDVWJ", "aDGtS", "fcwDa", "nlaBy", "JJs5m", "vLsmS", + "No0Qb", "JXljW", "Waim6", "MezSW", "l83Ud", "SjskQ", "uPX7G", "5nmWv", "ZhwG1", "uTacx", + "t98iW", "JkzUn", "fmIK1", "i7WMQ", "bgJAz", "n1pmO", "jS1aj", "4W0Tl", "Yf2Ec", "sqVrf", + "MojnP", "qQxHP", "pWiOs", "yToGW", "kB5nP", "BpYhV", "Cfgr3", "zbIYY", "VLTy6", "he9IA", + "lm0pD", "WreyP", "8hJdt", "QnJ1S", "n8pJ9", "iqv4k", "OUYuF", "8cVD3", "sx5Gl", "cQOnv", + "wiHrZ", "oGu6x", "7fsYM", "gf8rI", "7fKYU", "pT8wu", "lCMxy", "prNT6", "5Drn0", "guMb8", + "OxWIH", "uZPqg", "SbRYy", "In3NS", "uvf7A", "FLsph", "pmeCd", "BbwgA", "ru4UG", "YOfrY", + "W7cTs", "K4GS8", "AOgEe", "618Di", "dpe1v", "3otm6", "oVQp6", "5Mg9r", "Y1mC0", "VIlwP", + "aFFss", "Mkgy8", "pv0i7", "S77LH", "XyPZN", "QYxC0", "vkCHH", "MGlTF", "24ARF", "v2eC3", + "ZUnqt", "HfyNQ", "FjHXR", "45cIH", "1LB1L", "zqH0W", "fLNg8", "oQ87r", "Cp3mZ", "Zv7z0", + "O3iyQ", "EOE1o", "5ZaEz", "tlILt", "MmsIo", "lXFOB", "gtCA5", "yEfy9", "7X3uy", "d7vjM", + "XflUq", "Fhtgl", "NOHsz", "GWqqX", "xciqp", "BFkb8", "P6bcg", "lViBv", "2TRI7", "2hEEU", + "9XyT9", "29QAz", "U3yw5", "FxX9q", "C2Irc", "8U2nU", "m4bxU", "5iGN5", "mX2GE", "cShY2", + "JRJQB", "yvOMI", "4QMc9", "NAFuw", "RmDcr", "faHir", "2ZHdk", "zY1GY", "a00b5", "ZuDtD", + "JIqXi", "K20wK", "gdQsS", "5Namm", "lkMUA", "IBe8k", "FcWrW", "FFDui", "tuDyS", "ZJTXH", + "AkKTk", "zQt6Q", "FNYIM", "RpBQm", "RsQUq", "Mm8si", "gjUTu", "zz4ZU", "jiVBP", "ReKEW", + "5VZjS", "YjB9t", "zFgtB", "8TxD7", "euZA5", "MK07Y", "CK5W7", "16lHc", "6q6L9", "Z4I1v", + "UlU3M", "SWfou", "0PktI", "55rfB", "jfREu", "580YD", "Uvlv4", "KASQ8", "AmdQd", "piJSk", + "hE1Ql", "LDk6f", "NcICA", "IKxdL", "iwzGk", "uN6r3", "lsQGo", "QClRL", "iKqhr", "FGzgp", + "RkQke", "b29RJ", "CIShG", "9eoRc", "F6PT2", "LbRTH", "M3zXL", "GXdoH", "IjTwP", "RBhp0", + "yluBx", "mz8gx", "MmKGJ", "Q6Lix", "uupzk", "RACuj", "d85a9", "qaofN", "kZANm", "jtn0X", + "lpF6W", "suY4x", "rz7Ut", "wDajX", "1v5hH", "Yw2oU", "ksJby", "WMiS3", "lj07Q", "EdBKc", + "6AFT0", "0YAGH", "ThjNn", "JKWYR", "9iGoT", "UmaEv", "3weIF", "CdyBV", "pAhR1", "djsrv", + "xReec", "8FmFH", "Dz1R3", "Ta8f6", "DG4sT", "VjCZq", "jSjS3", "Pb1pa", "VNCPd", "Kr8ys", + "AXpwE", "ZzJHW", "Nxx9V", "jzUqR", "dhSuH", "DQimp", "07n1c", "HP433", "RzaZA", "cL0aE", + "Ss0Zu", "FnPFB", "7lUXZ", "9rlg9", "lH1kt", "ni2v1", "48cHL", "soy9t", "WPmlx", "2Nslm", + "hSSvQ", "9y4lw", "ulk41", "ECMvU", "DLhzM", "GrDg7", "x3LDe", "QChxs", "xXTI4", "Gv3Fq", + "rhl0J", "QssNC", "brhlQ", "s93Ml", "tl72W", "pvgjS", "Qworu", "DcpWB", "X6Pin", "J2mQi", + "BGaQY", "CqqaD", "NhXdu", "dQ586", "Yh1hF", "HRxd8", "PYBf4", "64s8N", "tvdkD", "azIWp", + "tAOsr", "v8yFN", "h1zcH", "SmGzv", "bZLvS", "fFDrJ", "Oz8yZ", "0Wr5y", "fcJOy", "7ku1p", + "QbxXc", "VerEA", "QWxoT", "hYBCK", "o8Uyd", "FwEJz", "hi5X7", "uAWyp", "I7p2a", "M6qcG", + "gIYvE", "HzZT8", "iB08l", "StlDJ", "tjQxs", "k85Ae", "taOXK", "s4786", "2DREs", "atef2", + "Vprf2", "VBjhz", "EoToP", "blLA9", "qUJMd", "ydG8U", "8xEKz", "uLtKs", "GSQwj", "S2Dfu", + "ciuWz", "i3pyd", "7Ow5C", "IRh48", "vOqCE", "Q6hMC", "yofH3", "KsjRK", "5IhmG", "fqypy", + "0MR5X", "Chuy3" }; + + private static int _i = 0; + private static int _j = 0; + + static + { + Arrays.sort(KEYS); + } + + public static String getNextKey() + { + if(_j == KEYS.length) + { + _j = 0; + _i++; + if(_i == KEYS.length) + { + _i = 0; + } + } + return new StringBuffer().append(KEYS[_i]).append("-").append(KEYS[_j++]).toString(); + } + } +} diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/TimeToLiveTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/TimeToLiveTest.java new file mode 100644 index 0000000000..e606df3f7d --- /dev/null +++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/TimeToLiveTest.java @@ -0,0 +1,397 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.queue; + +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TopicSubscriber; +import javax.naming.NamingException; + +import org.apache.log4j.Logger; +import org.junit.Assert; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +public class TimeToLiveTest extends QpidBrokerTestCase +{ + private static final Logger _logger = Logger.getLogger(TimeToLiveTest.class); + + protected final String QUEUE = "TimeToLiveQueue"; + + private final long TIME_TO_LIVE = 100L; + + private static final int MSG_COUNT = 50; + private static final long SERVER_TTL_TIMEOUT = 60000L; + + public void testPassiveTTLWithPrefetch() throws Exception + { + doTestPassiveTTL(true); + } + + public void testPassiveTTL() throws Exception + { + doTestPassiveTTL(false); + + } + + private void doTestPassiveTTL(boolean prefetchMessages) throws JMSException, NamingException + { + //Create Client 1 + Connection clientConnection = getConnection(); + + Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = clientSession.createQueue(QUEUE); + + // Create then close the consumer so the queue is actually created + // Closing it then reopening it ensures that the consumer shouldn't get messages + // which should have expired and allows a shorter sleep period. See QPID-1418 + + MessageConsumer consumer = clientSession.createConsumer(queue); + consumer.close(); + + //Create Producer + Connection producerConnection = getConnection(); + + producerConnection.start(); + + // Move to a Transacted session to ensure that all messages have been delivered to broker before + // we start waiting for TTL + Session producerSession = producerConnection.createSession(true, Session.SESSION_TRANSACTED); + + MessageProducer producer = producerSession.createProducer(queue); + + consumer = clientSession.createConsumer(queue); + if(prefetchMessages) + { + clientConnection.start(); + } + + //Set TTL + int msg = 0; + producer.send(nextMessage(String.valueOf(msg), true, producerSession, producer)); + + producer.setTimeToLive(TIME_TO_LIVE); + + for (; msg < MSG_COUNT - 2; msg++) + { + producer.send(nextMessage(String.valueOf(msg), false, producerSession, producer)); + } + + //Reset TTL + producer.setTimeToLive(0L); + producer.send(nextMessage(String.valueOf(msg), false, producerSession, producer)); + + producerSession.commit(); + + + // Ensure we sleep the required amount of time. + ReentrantLock waitLock = new ReentrantLock(); + Condition wait = waitLock.newCondition(); + final long MILLIS = 1000000L; + + long waitTime = TIME_TO_LIVE * MILLIS; + while (waitTime > 0) + { + try + { + waitLock.lock(); + + waitTime = wait.awaitNanos(waitTime); + } + catch (InterruptedException e) + { + //Stop if we are interrupted + fail(e.getMessage()); + } + finally + { + waitLock.unlock(); + } + + } + + if(prefetchMessages) + { + clientConnection.close(); + clientConnection = getConnection(); + + clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + queue = clientSession.createQueue(QUEUE); + consumer = clientSession.createConsumer(queue); + } + + clientConnection.start(); + + //Receive Message 0 + // Set 5s receive time for messages we expect to receive. + Message receivedFirst = consumer.receive(5000); + Message receivedSecond = consumer.receive(5000); + Message receivedThird = consumer.receive(1000); + + // Log the messages to help diagnosis incase of failure + _logger.info("First:"+receivedFirst); + _logger.info("Second:"+receivedSecond); + _logger.info("Third:"+receivedThird); + + // Only first and last messages sent should survive expiry + Assert.assertNull("More messages received", receivedThird); + + Assert.assertNotNull("First message not received", receivedFirst); + Assert.assertTrue("First message doesn't have first set.", receivedFirst.getBooleanProperty("first")); + Assert.assertEquals("First message has incorrect TTL.", 0L, receivedFirst.getLongProperty("TTL")); + + Assert.assertNotNull("Final message not received", receivedSecond); + Assert.assertFalse("Final message has first set.", receivedSecond.getBooleanProperty("first")); + Assert.assertEquals("Final message has incorrect TTL.", 0L, receivedSecond.getLongProperty("TTL")); + + clientConnection.close(); + + producerConnection.close(); + } + + private Message nextMessage(String msg, boolean first, Session producerSession, MessageProducer producer) throws JMSException + { + Message send = producerSession.createTextMessage("Message " + msg); + send.setBooleanProperty("first", first); + send.setStringProperty("testprop", "TimeToLiveTest"); + send.setLongProperty("TTL", producer.getTimeToLive()); + return send; + } + + + /** + * Tests the expired messages get actively deleted even on queues which have no consumers + * @throws Exception + */ + public void testActiveTTL() throws Exception + { + Connection producerConnection = getConnection(); + AMQSession producerSession = (AMQSession) producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = producerSession.createTemporaryQueue(); + producerSession.declareAndBind((AMQDestination) queue); + MessageProducer producer = producerSession.createProducer(queue); + producer.setTimeToLive(1000L); + + // send Messages + for(int i = 0; i < MSG_COUNT; i++) + { + producer.send(producerSession.createTextMessage("Message: "+i)); + } + long failureTime = System.currentTimeMillis() + 2 * SERVER_TTL_TIMEOUT; + + // check Queue depth for up to TIMEOUT seconds after the Queue Depth hasn't changed for 100ms. + long messageCount = MSG_COUNT; + long lastPass; + + do + { + lastPass = messageCount; + Thread.sleep(100); + messageCount = producerSession.getQueueDepth((AMQDestination) queue); + + // If we have received messages in the last loop then extend the timeout time. + // if we get messages stuck that are not expiring then the failureTime will occur + // failing the test. This will help with the scenario when the broker does not + // have enough CPU cycles to process the TTLs. + if (lastPass != messageCount) + { + failureTime = System.currentTimeMillis() + 2 * SERVER_TTL_TIMEOUT; + } + } + while(messageCount > 0L && System.currentTimeMillis() < failureTime); + + assertEquals("Messages not automatically expired: ", 0L, messageCount); + + producer.close(); + producerSession.close(); + producerConnection.close(); + } + + public void testPassiveTTLwithDurableSubscription() throws Exception + { + //Create Client 1 + Connection clientConnection = getConnection(); + + Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Create and close the durable subscriber + AMQTopic topic = new AMQTopic((AMQConnection) clientConnection, getTestQueueName()); + TopicSubscriber durableSubscriber = clientSession.createDurableSubscriber(topic, getTestQueueName(),"testprop='TimeToLiveTest'", false); + durableSubscriber.close(); + + //Create Producer + Connection producerConnection = getConnection(); + + producerConnection.start(); + + // Move to a Transacted session to ensure that all messages have been delivered to broker before + // we start waiting for TTL + Session producerSession = producerConnection.createSession(true, Session.SESSION_TRANSACTED); + + MessageProducer producer = producerSession.createProducer(topic); + + //Set TTL + int msg = 0; + producer.send(nextMessage(String.valueOf(msg), true, producerSession, producer)); + + producer.setTimeToLive(TIME_TO_LIVE); + + for (; msg < MSG_COUNT - 2; msg++) + { + producer.send(nextMessage(String.valueOf(msg), false, producerSession, producer)); + } + + //Reset TTL + producer.setTimeToLive(0L); + producer.send(nextMessage(String.valueOf(msg), false, producerSession, producer)); + + producerSession.commit(); + + //resubscribe + durableSubscriber = clientSession.createDurableSubscriber(topic, getTestQueueName(),"testprop='TimeToLiveTest'", false); + + // Ensure we sleep the required amount of time. + ReentrantLock waitLock = new ReentrantLock(); + Condition wait = waitLock.newCondition(); + final long MILLIS = 1000000L; + + long waitTime = TIME_TO_LIVE * MILLIS; + while (waitTime > 0) + { + try + { + waitLock.lock(); + + waitTime = wait.awaitNanos(waitTime); + } + catch (InterruptedException e) + { + //Stop if we are interrupted + fail(e.getMessage()); + } + finally + { + waitLock.unlock(); + } + + } + + clientConnection.start(); + + //Receive Message 0 + // Set 5s receive time for messages we expect to receive. + Message receivedFirst = durableSubscriber.receive(5000); + Message receivedSecond = durableSubscriber.receive(5000); + Message receivedThird = durableSubscriber.receive(1000); + + // Log the messages to help diagnosis incase of failure + _logger.info("First:"+receivedFirst); + _logger.info("Second:"+receivedSecond); + _logger.info("Third:"+receivedThird); + + // Only first and last messages sent should survive expiry + Assert.assertNull("More messages received", receivedThird); + + Assert.assertNotNull("First message not received", receivedFirst); + Assert.assertTrue("First message doesn't have first set.", receivedFirst.getBooleanProperty("first")); + Assert.assertEquals("First message has incorrect TTL.", 0L, receivedFirst.getLongProperty("TTL")); + + Assert.assertNotNull("Final message not received", receivedSecond); + Assert.assertFalse("Final message has first set.", receivedSecond.getBooleanProperty("first")); + Assert.assertEquals("Final message has incorrect TTL.", 0L, receivedSecond.getLongProperty("TTL")); + + clientSession.unsubscribe(getTestQueueName()); + clientConnection.close(); + + producerConnection.close(); + } + + public void testActiveTTLwithDurableSubscription() throws Exception + { + //Create Client 1 + Connection clientConnection = getConnection(); + Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Create and close the durable subscriber + AMQTopic topic = new AMQTopic((AMQConnection) clientConnection, getTestQueueName()); + TopicSubscriber durableSubscriber = clientSession.createDurableSubscriber(topic, "MyDurableTTLSubscription","testprop='TimeToLiveTest'", false); + durableSubscriber.close(); + + //Create Producer + Connection producerConnection = getConnection(); + AMQSession producerSession = (AMQSession) producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = producerSession.createProducer(topic); + producer.setTimeToLive(1000L); + + // send Messages + for(int i = 0; i < MSG_COUNT; i++) + { + producer.send(producerSession.createTextMessage("Message: "+i)); + } + long failureTime = System.currentTimeMillis() + 2 * SERVER_TTL_TIMEOUT; + + // check Queue depth for up to TIMEOUT seconds after the Queue Depth hasn't changed for 100ms. + long messageCount = MSG_COUNT; + long lastPass; + AMQQueue subcriptionQueue = new AMQQueue("amq.topic","clientid" + ":" + "MyDurableTTLSubscription"); + do + { + lastPass = messageCount; + Thread.sleep(100); + messageCount = producerSession.getQueueDepth((AMQDestination) subcriptionQueue); + + // If we have received messages in the last loop then extend the timeout time. + // if we get messages stuck that are not expiring then the failureTime will occur + // failing the test. This will help with the scenario when the broker does not + // have enough CPU cycles to process the TTLs. + if (lastPass != messageCount) + { + failureTime = System.currentTimeMillis() + 2 * SERVER_TTL_TIMEOUT; + } + } + while(messageCount > 0L && System.currentTimeMillis() < failureTime); + + assertEquals("Messages not automatically expired: ", 0L, messageCount); + + producer.close(); + producerSession.close(); + producerConnection.close(); + + clientSession.unsubscribe("MyDurableTTLSubscription"); + clientSession.close(); + clientConnection.close(); + } + +} |