summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2011-03-01 09:34:44 +0000
committerRobert Gemmell <robbie@apache.org>2011-03-01 09:34:44 +0000
commit4b586d3c7098bc8daaf08192132429de576b5ad4 (patch)
treeaba83db183533916e41d8c2a16748a7dc5d7487f
parent518323260617ed5ba52c950763a7a3d222ddb0a3 (diff)
downloadqpid-python-4b586d3c7098bc8daaf08192132429de576b5ad4.tar.gz
QPID-2973: enable management during tests, and add a test using MaxDeliveryCount with a DurableSubscription
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.5.x-dev@1075743 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java148
1 files changed, 110 insertions, 38 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
index 576bb3c90c..47bb5173a2 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
@@ -21,33 +21,29 @@
package org.apache.qpid.test.unit.client;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
+import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
-import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
+import javax.jms.Topic;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.CustomJMSXProperty;
-import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.client.configuration.ClientProperties;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.test.utils.QpidTestCase;
-import org.apache.qpid.url.AMQBindingURL;
-import org.apache.qpid.url.BindingURL;
/**
* Test that the MaxRedelivery feature works as expected, allowing the client to reject
@@ -65,7 +61,6 @@ import org.apache.qpid.url.BindingURL;
public class MaxDeliveryCountTest extends QpidTestCase
{
private static final Logger _logger = Logger.getLogger(MaxDeliveryCountTest.class);
- private Queue _queue;
private boolean _failed;
private String _failMsg;
private static final int MSG_COUNT = 15;
@@ -74,28 +69,41 @@ public class MaxDeliveryCountTest extends QpidTestCase
public void setUp() throws Exception
{
+ //enable DLQ support for all queues at the vhost level
+ setConfigurationProperty("virtualhosts.virtualhost.test.queues.deadLetterQueues",
+ String.valueOf(true));
+
+ //Ensure management is on
+ setConfigurationProperty("management.enabled", "true");
+ setConfigurationProperty("management.ssl.enabled", "false");
+
+ //enable max delivery count on all client connections
+ setTestClientSystemProperty(ClientProperties.MAX_DELIVERY_COUNT_PROP_NAME, "2");
+
super.setUp();
- String queueName = getTestQueueName();
-
- //create an AMQQueue object using a BindingURL to set the Max Delivery Count for the consumer
- BindingURL burl = new AMQBindingURL("direct://amq.direct//" + queueName + "?maxdeliverycount='" + MAX_DELIVERY_COUNT + "'");
- _queue = new AMQQueue(burl);
- //declare the test queue, using some AMQSession hackery to enable DLQing
+ boolean durableSub = isDurSubTest();
+
+ //declare the test queue
Connection consumerConnection = getConnection();
Session consumerSession = consumerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
- final Map<String,Object> arguments = new HashMap<String, Object>();
- arguments.put(AMQQueueFactory.X_QPID_DLQ_ENABLED.asString(), true);
- ((AMQSession<?,?>) consumerSession).createQueue(new AMQShortString(queueName), false, false, false, arguments);
- ((AMQSession<?,?>) consumerSession).declareAndBind((AMQDestination) new AMQQueue("amq.direct",queueName));
+ Destination destination = getDestination(consumerSession, durableSub);
+ if(durableSub)
+ {
+ consumerSession.createDurableSubscriber((Topic)destination, getName()).close();
+ }
+ else
+ {
+ consumerSession.createConsumer(destination).close();
+ }
+
consumerConnection.close();
//Create Producer put some messages on the queue
Connection producerConnection = getConnection();
producerConnection.start();
-
Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = producerSession.createProducer(_queue);
+ MessageProducer producer = producerSession.createProducer(getDestination(producerSession, durableSub));
for (int count = 1; count <= MSG_COUNT; count++)
{
@@ -110,6 +118,18 @@ public class MaxDeliveryCountTest extends QpidTestCase
_awaitCompletion = new CountDownLatch(1);
}
+ private Destination getDestination(Session consumerSession, boolean durableSub) throws JMSException
+ {
+ if(durableSub)
+ {
+ return consumerSession.createTopic(getTestQueueName());
+ }
+ else
+ {
+ return consumerSession.createQueue(getTestQueueName());
+ }
+ }
+
private String generateContent(int count)
{
return "Message " + count + " content.";
@@ -127,7 +147,7 @@ public class MaxDeliveryCountTest extends QpidTestCase
redeliverMsgs.add(5);
redeliverMsgs.add(14);
- doTest(Session.CLIENT_ACKNOWLEDGE, redeliverMsgs, false);
+ doTest(Session.CLIENT_ACKNOWLEDGE, redeliverMsgs, false, false);
}
/**
@@ -142,7 +162,7 @@ public class MaxDeliveryCountTest extends QpidTestCase
redeliverMsgs.add(5);
redeliverMsgs.add(14);
- doTest(Session.SESSION_TRANSACTED, redeliverMsgs, false);
+ doTest(Session.SESSION_TRANSACTED, redeliverMsgs, false, false);
}
/**
@@ -157,7 +177,7 @@ public class MaxDeliveryCountTest extends QpidTestCase
redeliverMsgs.add(5);
redeliverMsgs.add(14);
- doTest(Session.AUTO_ACKNOWLEDGE, redeliverMsgs, false);
+ doTest(Session.AUTO_ACKNOWLEDGE, redeliverMsgs, false, false);
}
/**
@@ -172,7 +192,7 @@ public class MaxDeliveryCountTest extends QpidTestCase
redeliverMsgs.add(5);
redeliverMsgs.add(14);
- doTest(Session.DUPS_OK_ACKNOWLEDGE, redeliverMsgs, false);
+ doTest(Session.DUPS_OK_ACKNOWLEDGE, redeliverMsgs, false, false);
}
/**
@@ -187,7 +207,7 @@ public class MaxDeliveryCountTest extends QpidTestCase
redeliverMsgs.add(5);
redeliverMsgs.add(14);
- doTest(Session.CLIENT_ACKNOWLEDGE, redeliverMsgs, true);
+ doTest(Session.CLIENT_ACKNOWLEDGE, redeliverMsgs, true, false);
}
/**
@@ -202,20 +222,43 @@ public class MaxDeliveryCountTest extends QpidTestCase
redeliverMsgs.add(5);
redeliverMsgs.add(14);
- doTest(Session.SESSION_TRANSACTED, redeliverMsgs, true);
+ doTest(Session.SESSION_TRANSACTED, redeliverMsgs, true, false);
}
-
- public void doTest(int deliveryMode, ArrayList<Integer> redeliverMsgs, boolean synchronous) throws Exception
+
+ public void testDurableSubscription() throws Exception
+ {
+ final ArrayList<Integer> redeliverMsgs = new ArrayList<Integer>();
+ redeliverMsgs.add(1);
+ redeliverMsgs.add(2);
+ redeliverMsgs.add(5);
+ redeliverMsgs.add(14);
+
+ doTest(Session.SESSION_TRANSACTED, redeliverMsgs, false, true);
+ }
+
+ public void doTest(int deliveryMode, ArrayList<Integer> redeliverMsgs, boolean synchronous, boolean durableSub) throws Exception
{
Connection clientConnection = getConnection();
boolean transacted = deliveryMode == Session.SESSION_TRANSACTED ? true : false;
Session clientSession = clientConnection.createSession(transacted, deliveryMode);
- MessageConsumer consumer = clientSession.createConsumer(_queue);
+ MessageConsumer consumer;
+ Destination dest = getDestination(clientSession, durableSub);
+ AMQQueue checkQueue;
+ if(durableSub)
+ {
+ consumer = clientSession.createDurableSubscriber((Topic)dest, getName());
+ checkQueue = new AMQQueue("amq.topic", "clientid" + ":" + getName());
+ }
+ else
+ {
+ consumer = clientSession.createConsumer(dest);
+ checkQueue = (AMQQueue) dest;
+ }
assertEquals("The queue should have " + MSG_COUNT + " msgs at start",
- MSG_COUNT, ((AMQSession<?,?>) clientSession).getQueueDepth((AMQDestination) _queue));
+ MSG_COUNT, ((AMQSession<?,?>) clientSession).getQueueDepth(checkQueue));
clientConnection.start();
@@ -252,13 +295,10 @@ public class MaxDeliveryCountTest extends QpidTestCase
consumer.close();
//check the source queue is now empty
- assertEquals("The queue should have 0 msgs left", 0, ((AMQSession<?,?>) clientSession).getQueueDepth((AMQDestination) _queue));
+ assertEquals("The queue should have 0 msgs left", 0, ((AMQSession<?,?>) clientSession).getQueueDepth(checkQueue));
//check the DLQ has the required number of rejected-without-requeue messages
- String dlQueueName = getTestQueueName() + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX;
- AMQDestination queue = new AMQQueue("amq.direct", dlQueueName);
- assertEquals("The DLQ should have " + redeliverMsgs.size() + " msgs on it", redeliverMsgs.size(),
- ((AMQSession<?,?>) clientSession).getQueueDepth(queue));
+ verifyDLQdepth(redeliverMsgs.size(), clientSession, durableSub);
if(isBrokerStorePersistent())
{
@@ -273,15 +313,42 @@ public class MaxDeliveryCountTest extends QpidTestCase
}
//verify the messages on the DLQ
- verifyDLQcontent(clientConnection, redeliverMsgs, queue);
+ verifyDLQcontent(clientConnection, redeliverMsgs, getTestQueueName(), durableSub);
clientConnection.close();
}
- private void verifyDLQcontent(Connection clientConnection, List<Integer> redeliverMsgs, AMQDestination queue) throws JMSException
+ private void verifyDLQdepth(int expected, Session clientSession, boolean durableSub) throws AMQException
+ {
+ AMQDestination checkQueueDLQ;
+ if(durableSub)
+ {
+ checkQueueDLQ = new AMQQueue("amq.topic", "clientid" + ":" + getName() + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+ }
+ else
+ {
+ checkQueueDLQ = new AMQQueue("amq.direct", getTestQueueName() + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+ }
+
+ assertEquals("The DLQ should have " + expected + " msgs on it", expected,
+ ((AMQSession<?,?>) clientSession).getQueueDepth(checkQueueDLQ));
+ }
+
+ private void verifyDLQcontent(Connection clientConnection, List<Integer> redeliverMsgs, String destName, boolean durableSub) throws JMSException
{
Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = clientSession.createConsumer(queue);
+
+ MessageConsumer consumer;
+ if(durableSub)
+ {
+ consumer = clientSession.createDurableSubscriber(
+ clientSession.createTopic(destName), getName() + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+ }
+ else
+ {
+ consumer = clientSession.createConsumer(
+ clientSession.createQueue(destName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX));
+ }
//keep track of the message we expect to still be on the DLQ
List<Integer> outstandingMessages = new ArrayList<Integer>(redeliverMsgs);
@@ -611,4 +678,9 @@ public class MaxDeliveryCountTest extends QpidTestCase
}
}
}
+
+ private boolean isDurSubTest()
+ {
+ return getTestQueueName().contains("DurableSubscription");
+ }
}