diff options
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java')
-rw-r--r-- | qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java | 176 |
1 files changed, 176 insertions, 0 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java new file mode 100644 index 0000000000..74f50e8659 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java @@ -0,0 +1,176 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.queue; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +/** + * Test Case to ensure that messages are correctly returned. + * This includes checking: + * - The message is returned. + * - The broker doesn't leak memory. + * - The broker's state is correct after test. + */ +public class QueueDepthWithSelectorTest extends QpidBrokerTestCase +{ + protected final String VHOST = "test"; + protected final String QUEUE = this.getClass().getName(); + + protected Connection _clientConnection; + protected Connection _producerConnection; + private Session _clientSession; + protected Session _producerSession; + protected MessageProducer _producer; + private MessageConsumer _consumer; + + protected static int MSG_COUNT = 50; + + protected Message[] _messages = new Message[MSG_COUNT]; + + protected Queue _queue; + + @Override + public void setUp() throws Exception + { + super.setUp(); + + _messages = new Message[MSG_COUNT]; + _queue = getTestQueue(); + + //Create Producer + _producerConnection = getConnection(); + _producerConnection.start(); + _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _producer = _producerSession.createProducer(_queue); + + // Create consumer + _clientConnection = getConnection(); + _clientConnection.start(); + _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _consumer = _clientSession.createConsumer(_queue, "key = 23"); + } + + public void test() throws Exception + { + //Send messages + _logger.info("Starting to send messages"); + for (int msg = 0; msg < MSG_COUNT; msg++) + { + _producer.send(nextMessage(msg)); + } + _logger.info("Closing connection"); + //Close the connection.. .giving the broker time to clean up its state. + _producerConnection.close(); + + //Verify we get all the messages. + _logger.info("Verifying messages"); + verifyAllMessagesRecevied(50); + verifyBrokerState(0); + + //Close the connection.. .giving the broker time to clean up its state. + _clientConnection.close(); + + //Verify Broker state + _logger.info("Verifying broker state"); + verifyBrokerState(0); + } + + protected void verifyBrokerState(int expectedDepth) + { + try + { + Connection connection = getConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Thread.sleep(2000); + long queueDepth = ((AMQSession) session).getQueueDepth((AMQDestination) _queue); + assertEquals("Session reports Queue depth not as expected", expectedDepth, queueDepth); + + connection.close(); + } + catch (InterruptedException e) + { + fail(e.getMessage()); + } + catch (AMQException e) + { + fail(e.getMessage()); + } + catch (Exception e) + { + fail(e.getMessage()); + } + } + + protected void verifyAllMessagesRecevied(int expectedDepth) throws Exception + { + boolean[] msgIdRecevied = new boolean[MSG_COUNT]; + + for (int i = 0; i < expectedDepth; i++) + { + _messages[i] = _consumer.receive(1000); + assertNotNull("should have received a message but didn't", _messages[i]); + } + + //Check received messages + int msgId = 0; + for (Message msg : _messages) + { + assertNotNull("Message should not be null", msg); + assertEquals("msgId was wrong", msgId, msg.getIntProperty("ID")); + assertFalse("Already received msg id " + msgId, msgIdRecevied[msgId]); + msgIdRecevied[msgId] = true; + msgId++; + } + + //Check all received + for (msgId = 0; msgId < expectedDepth; msgId++) + { + assertTrue("Message " + msgId + " not received.", msgIdRecevied[msgId]); + } + } + + /** + * Get the next message putting the given count into the intProperties as ID. + * + * @param msgNo the message count to store as ID. + * @throws JMSException + */ + protected Message nextMessage(int msgNo) throws JMSException + { + Message send = _producerSession.createTextMessage("MessageReturnTest"); + send.setIntProperty("ID", msgNo); + send.setIntProperty("key", 23); + return send; + } +} |