summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java118
1 files changed, 111 insertions, 7 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
index 8c3c247e2b..b70b2f90e4 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
@@ -1,4 +1,3 @@
-package org.apache.qpid.test.client.destination;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -19,11 +18,13 @@ package org.apache.qpid.test.client.destination;
* under the License.
*
*/
-
+package org.apache.qpid.test.client.destination;
import java.util.Collections;
+import java.util.Enumeration;
import java.util.HashMap;
import java.util.Hashtable;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -34,6 +35,7 @@ import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
+import javax.jms.QueueBrowser;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
@@ -475,13 +477,13 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
{
prod.send(jmsSession.createTextMessage("msg" + i) );
}
-
- for (int i=0; i< 9; i++)
+ Message msg = null;
+ for (int i=0; i< 10; i++)
{
- cons.receive();
+ msg = cons.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Should have received " + i + " message", msg);
+ assertEquals("Unexpected message received", "msg" + i, ((TextMessage)msg).getText());
}
- Message msg = cons.receive(RECEIVE_TIMEOUT);
- assertNotNull("Should have received the 10th message",msg);
assertNull("Shouldn't have received the 11th message as capacity is 10",cons.receive(RECEIVE_TIMEOUT));
msg.acknowledge();
for (int i=11; i<16; i++)
@@ -1182,4 +1184,106 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
assertNotNull("The consumer on the queue bound to the alt-exchange should receive the message",cons.receive(1000));
cons.close();
}
+
+ public void testQueueBrowserWithSelectorAutoAcknowledgement() throws Exception
+ {
+ assertQueueBrowserWithSelector(Session.AUTO_ACKNOWLEDGE);
+ }
+
+ public void testQueueBrowserWithSelectorClientAcknowldgement() throws Exception
+ {
+ assertQueueBrowserWithSelector(Session.CLIENT_ACKNOWLEDGE);
+ }
+
+ public void testQueueBrowserWithSelectorTransactedSession() throws Exception
+ {
+ assertQueueBrowserWithSelector(Session.SESSION_TRANSACTED);
+ }
+
+ public void testConsumerWithSelectorAutoAcknowledgement() throws Exception
+ {
+ assertConsumerWithSelector(Session.AUTO_ACKNOWLEDGE);
+ }
+
+ public void testConsumerWithSelectorClientAcknowldgement() throws Exception
+ {
+ assertConsumerWithSelector(Session.CLIENT_ACKNOWLEDGE);
+ }
+
+ public void testConsumerWithSelectorTransactedSession() throws Exception
+ {
+ assertConsumerWithSelector(Session.SESSION_TRANSACTED);
+ }
+
+ private void assertQueueBrowserWithSelector(int acknowledgement) throws Exception
+ {
+ String queueAddress = "ADDR:" + getTestQueueName() + ";{create: always}";
+
+ boolean transacted = acknowledgement == Session.SESSION_TRANSACTED;
+ Session session = _connection.createSession(transacted, acknowledgement);
+
+ Queue queue = session.createQueue(queueAddress);
+
+ final int numberOfMessages = 10;
+ List<Message> sentMessages = sendMessage(session, queue, numberOfMessages);
+ assertNotNull("Messages were not sent", sentMessages);
+ assertEquals("Unexpected number of messages were sent", numberOfMessages, sentMessages.size());
+
+ QueueBrowser browser = session.createBrowser(queue, INDEX + "%2=0");
+ _connection.start();
+
+ Enumeration<Message> enumaration = browser.getEnumeration();
+
+ int counter = 0;
+ int expectedIndex = 0;
+ while (enumaration.hasMoreElements())
+ {
+ Message m = enumaration.nextElement();
+ assertNotNull("Expected not null message at step " + counter, m);
+ int messageIndex = m.getIntProperty(INDEX);
+ assertEquals("Unexpected index", expectedIndex, messageIndex);
+ expectedIndex += 2;
+ counter++;
+ }
+ assertEquals("Unexpected number of messsages received", 5, counter);
+ }
+
+ private void assertConsumerWithSelector(int acknowledgement) throws Exception
+ {
+ String queueAddress = "ADDR:" + getTestQueueName() + ";{create: always}";
+
+ boolean transacted = acknowledgement == Session.SESSION_TRANSACTED;
+ Session session = _connection.createSession(transacted, acknowledgement);
+
+ Queue queue = session.createQueue(queueAddress);
+
+ final int numberOfMessages = 10;
+ List<Message> sentMessages = sendMessage(session, queue, numberOfMessages);
+ assertNotNull("Messages were not sent", sentMessages);
+ assertEquals("Unexpected number of messages were sent", numberOfMessages, sentMessages.size());
+
+ MessageConsumer consumer = session.createConsumer(queue, INDEX + "%2=0");
+
+ int expectedIndex = 0;
+ for (int i = 0; i < 5; i++)
+ {
+ Message m = consumer.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Expected not null message at step " + i, m);
+ int messageIndex = m.getIntProperty(INDEX);
+ assertEquals("Unexpected index", expectedIndex, messageIndex);
+ expectedIndex += 2;
+
+ if (transacted)
+ {
+ session.commit();
+ }
+ else if (acknowledgement == Session.CLIENT_ACKNOWLEDGE)
+ {
+ m.acknowledge();
+ }
+ }
+
+ Message m = consumer.receive(RECEIVE_TIMEOUT);
+ assertNull("Unexpected message received", m);
+ }
}