diff options
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java')
-rw-r--r-- | qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java | 118 |
1 files changed, 111 insertions, 7 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java index 8c3c247e2b..b70b2f90e4 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java @@ -1,4 +1,3 @@ -package org.apache.qpid.test.client.destination; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -19,11 +18,13 @@ package org.apache.qpid.test.client.destination; * under the License. * */ - +package org.apache.qpid.test.client.destination; import java.util.Collections; +import java.util.Enumeration; import java.util.HashMap; import java.util.Hashtable; +import java.util.List; import java.util.Map; import java.util.Properties; @@ -34,6 +35,7 @@ import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; +import javax.jms.QueueBrowser; import javax.jms.QueueReceiver; import javax.jms.QueueSession; import javax.jms.Session; @@ -475,13 +477,13 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase { prod.send(jmsSession.createTextMessage("msg" + i) ); } - - for (int i=0; i< 9; i++) + Message msg = null; + for (int i=0; i< 10; i++) { - cons.receive(); + msg = cons.receive(RECEIVE_TIMEOUT); + assertNotNull("Should have received " + i + " message", msg); + assertEquals("Unexpected message received", "msg" + i, ((TextMessage)msg).getText()); } - Message msg = cons.receive(RECEIVE_TIMEOUT); - assertNotNull("Should have received the 10th message",msg); assertNull("Shouldn't have received the 11th message as capacity is 10",cons.receive(RECEIVE_TIMEOUT)); msg.acknowledge(); for (int i=11; i<16; i++) @@ -1182,4 +1184,106 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase assertNotNull("The consumer on the queue bound to the alt-exchange should receive the message",cons.receive(1000)); cons.close(); } + + public void testQueueBrowserWithSelectorAutoAcknowledgement() throws Exception + { + assertQueueBrowserWithSelector(Session.AUTO_ACKNOWLEDGE); + } + + public void testQueueBrowserWithSelectorClientAcknowldgement() throws Exception + { + assertQueueBrowserWithSelector(Session.CLIENT_ACKNOWLEDGE); + } + + public void testQueueBrowserWithSelectorTransactedSession() throws Exception + { + assertQueueBrowserWithSelector(Session.SESSION_TRANSACTED); + } + + public void testConsumerWithSelectorAutoAcknowledgement() throws Exception + { + assertConsumerWithSelector(Session.AUTO_ACKNOWLEDGE); + } + + public void testConsumerWithSelectorClientAcknowldgement() throws Exception + { + assertConsumerWithSelector(Session.CLIENT_ACKNOWLEDGE); + } + + public void testConsumerWithSelectorTransactedSession() throws Exception + { + assertConsumerWithSelector(Session.SESSION_TRANSACTED); + } + + private void assertQueueBrowserWithSelector(int acknowledgement) throws Exception + { + String queueAddress = "ADDR:" + getTestQueueName() + ";{create: always}"; + + boolean transacted = acknowledgement == Session.SESSION_TRANSACTED; + Session session = _connection.createSession(transacted, acknowledgement); + + Queue queue = session.createQueue(queueAddress); + + final int numberOfMessages = 10; + List<Message> sentMessages = sendMessage(session, queue, numberOfMessages); + assertNotNull("Messages were not sent", sentMessages); + assertEquals("Unexpected number of messages were sent", numberOfMessages, sentMessages.size()); + + QueueBrowser browser = session.createBrowser(queue, INDEX + "%2=0"); + _connection.start(); + + Enumeration<Message> enumaration = browser.getEnumeration(); + + int counter = 0; + int expectedIndex = 0; + while (enumaration.hasMoreElements()) + { + Message m = enumaration.nextElement(); + assertNotNull("Expected not null message at step " + counter, m); + int messageIndex = m.getIntProperty(INDEX); + assertEquals("Unexpected index", expectedIndex, messageIndex); + expectedIndex += 2; + counter++; + } + assertEquals("Unexpected number of messsages received", 5, counter); + } + + private void assertConsumerWithSelector(int acknowledgement) throws Exception + { + String queueAddress = "ADDR:" + getTestQueueName() + ";{create: always}"; + + boolean transacted = acknowledgement == Session.SESSION_TRANSACTED; + Session session = _connection.createSession(transacted, acknowledgement); + + Queue queue = session.createQueue(queueAddress); + + final int numberOfMessages = 10; + List<Message> sentMessages = sendMessage(session, queue, numberOfMessages); + assertNotNull("Messages were not sent", sentMessages); + assertEquals("Unexpected number of messages were sent", numberOfMessages, sentMessages.size()); + + MessageConsumer consumer = session.createConsumer(queue, INDEX + "%2=0"); + + int expectedIndex = 0; + for (int i = 0; i < 5; i++) + { + Message m = consumer.receive(RECEIVE_TIMEOUT); + assertNotNull("Expected not null message at step " + i, m); + int messageIndex = m.getIntProperty(INDEX); + assertEquals("Unexpected index", expectedIndex, messageIndex); + expectedIndex += 2; + + if (transacted) + { + session.commit(); + } + else if (acknowledgement == Session.CLIENT_ACKNOWLEDGE) + { + m.acknowledge(); + } + } + + Message m = consumer.receive(RECEIVE_TIMEOUT); + assertNull("Unexpected message received", m); + } } |