diff options
author | Robert Gemmell <robbie@apache.org> | 2011-03-01 09:34:44 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2011-03-01 09:34:44 +0000 |
commit | 4b586d3c7098bc8daaf08192132429de576b5ad4 (patch) | |
tree | aba83db183533916e41d8c2a16748a7dc5d7487f | |
parent | 518323260617ed5ba52c950763a7a3d222ddb0a3 (diff) | |
download | qpid-python-4b586d3c7098bc8daaf08192132429de576b5ad4.tar.gz |
QPID-2973: enable management during tests, and add a test using MaxDeliveryCount with a DurableSubscription
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.5.x-dev@1075743 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java | 148 |
1 files changed, 110 insertions, 38 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java index 576bb3c90c..47bb5173a2 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java @@ -21,33 +21,29 @@ package org.apache.qpid.test.unit.client; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import javax.jms.Connection; +import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; -import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; +import javax.jms.Topic; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.CustomJMSXProperty; -import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.client.configuration.ClientProperties; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.test.utils.QpidTestCase; -import org.apache.qpid.url.AMQBindingURL; -import org.apache.qpid.url.BindingURL; /** * Test that the MaxRedelivery feature works as expected, allowing the client to reject @@ -65,7 +61,6 @@ import org.apache.qpid.url.BindingURL; public class MaxDeliveryCountTest extends QpidTestCase { private static final Logger _logger = Logger.getLogger(MaxDeliveryCountTest.class); - private Queue _queue; private boolean _failed; private String _failMsg; private static final int MSG_COUNT = 15; @@ -74,28 +69,41 @@ public class MaxDeliveryCountTest extends QpidTestCase public void setUp() throws Exception { + //enable DLQ support for all queues at the vhost level + setConfigurationProperty("virtualhosts.virtualhost.test.queues.deadLetterQueues", + String.valueOf(true)); + + //Ensure management is on + setConfigurationProperty("management.enabled", "true"); + setConfigurationProperty("management.ssl.enabled", "false"); + + //enable max delivery count on all client connections + setTestClientSystemProperty(ClientProperties.MAX_DELIVERY_COUNT_PROP_NAME, "2"); + super.setUp(); - String queueName = getTestQueueName(); - - //create an AMQQueue object using a BindingURL to set the Max Delivery Count for the consumer - BindingURL burl = new AMQBindingURL("direct://amq.direct//" + queueName + "?maxdeliverycount='" + MAX_DELIVERY_COUNT + "'"); - _queue = new AMQQueue(burl); - //declare the test queue, using some AMQSession hackery to enable DLQing + boolean durableSub = isDurSubTest(); + + //declare the test queue Connection consumerConnection = getConnection(); Session consumerSession = consumerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE); - final Map<String,Object> arguments = new HashMap<String, Object>(); - arguments.put(AMQQueueFactory.X_QPID_DLQ_ENABLED.asString(), true); - ((AMQSession<?,?>) consumerSession).createQueue(new AMQShortString(queueName), false, false, false, arguments); - ((AMQSession<?,?>) consumerSession).declareAndBind((AMQDestination) new AMQQueue("amq.direct",queueName)); + Destination destination = getDestination(consumerSession, durableSub); + if(durableSub) + { + consumerSession.createDurableSubscriber((Topic)destination, getName()).close(); + } + else + { + consumerSession.createConsumer(destination).close(); + } + consumerConnection.close(); //Create Producer put some messages on the queue Connection producerConnection = getConnection(); producerConnection.start(); - Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = producerSession.createProducer(_queue); + MessageProducer producer = producerSession.createProducer(getDestination(producerSession, durableSub)); for (int count = 1; count <= MSG_COUNT; count++) { @@ -110,6 +118,18 @@ public class MaxDeliveryCountTest extends QpidTestCase _awaitCompletion = new CountDownLatch(1); } + private Destination getDestination(Session consumerSession, boolean durableSub) throws JMSException + { + if(durableSub) + { + return consumerSession.createTopic(getTestQueueName()); + } + else + { + return consumerSession.createQueue(getTestQueueName()); + } + } + private String generateContent(int count) { return "Message " + count + " content."; @@ -127,7 +147,7 @@ public class MaxDeliveryCountTest extends QpidTestCase redeliverMsgs.add(5); redeliverMsgs.add(14); - doTest(Session.CLIENT_ACKNOWLEDGE, redeliverMsgs, false); + doTest(Session.CLIENT_ACKNOWLEDGE, redeliverMsgs, false, false); } /** @@ -142,7 +162,7 @@ public class MaxDeliveryCountTest extends QpidTestCase redeliverMsgs.add(5); redeliverMsgs.add(14); - doTest(Session.SESSION_TRANSACTED, redeliverMsgs, false); + doTest(Session.SESSION_TRANSACTED, redeliverMsgs, false, false); } /** @@ -157,7 +177,7 @@ public class MaxDeliveryCountTest extends QpidTestCase redeliverMsgs.add(5); redeliverMsgs.add(14); - doTest(Session.AUTO_ACKNOWLEDGE, redeliverMsgs, false); + doTest(Session.AUTO_ACKNOWLEDGE, redeliverMsgs, false, false); } /** @@ -172,7 +192,7 @@ public class MaxDeliveryCountTest extends QpidTestCase redeliverMsgs.add(5); redeliverMsgs.add(14); - doTest(Session.DUPS_OK_ACKNOWLEDGE, redeliverMsgs, false); + doTest(Session.DUPS_OK_ACKNOWLEDGE, redeliverMsgs, false, false); } /** @@ -187,7 +207,7 @@ public class MaxDeliveryCountTest extends QpidTestCase redeliverMsgs.add(5); redeliverMsgs.add(14); - doTest(Session.CLIENT_ACKNOWLEDGE, redeliverMsgs, true); + doTest(Session.CLIENT_ACKNOWLEDGE, redeliverMsgs, true, false); } /** @@ -202,20 +222,43 @@ public class MaxDeliveryCountTest extends QpidTestCase redeliverMsgs.add(5); redeliverMsgs.add(14); - doTest(Session.SESSION_TRANSACTED, redeliverMsgs, true); + doTest(Session.SESSION_TRANSACTED, redeliverMsgs, true, false); } - - public void doTest(int deliveryMode, ArrayList<Integer> redeliverMsgs, boolean synchronous) throws Exception + + public void testDurableSubscription() throws Exception + { + final ArrayList<Integer> redeliverMsgs = new ArrayList<Integer>(); + redeliverMsgs.add(1); + redeliverMsgs.add(2); + redeliverMsgs.add(5); + redeliverMsgs.add(14); + + doTest(Session.SESSION_TRANSACTED, redeliverMsgs, false, true); + } + + public void doTest(int deliveryMode, ArrayList<Integer> redeliverMsgs, boolean synchronous, boolean durableSub) throws Exception { Connection clientConnection = getConnection(); boolean transacted = deliveryMode == Session.SESSION_TRANSACTED ? true : false; Session clientSession = clientConnection.createSession(transacted, deliveryMode); - MessageConsumer consumer = clientSession.createConsumer(_queue); + MessageConsumer consumer; + Destination dest = getDestination(clientSession, durableSub); + AMQQueue checkQueue; + if(durableSub) + { + consumer = clientSession.createDurableSubscriber((Topic)dest, getName()); + checkQueue = new AMQQueue("amq.topic", "clientid" + ":" + getName()); + } + else + { + consumer = clientSession.createConsumer(dest); + checkQueue = (AMQQueue) dest; + } assertEquals("The queue should have " + MSG_COUNT + " msgs at start", - MSG_COUNT, ((AMQSession<?,?>) clientSession).getQueueDepth((AMQDestination) _queue)); + MSG_COUNT, ((AMQSession<?,?>) clientSession).getQueueDepth(checkQueue)); clientConnection.start(); @@ -252,13 +295,10 @@ public class MaxDeliveryCountTest extends QpidTestCase consumer.close(); //check the source queue is now empty - assertEquals("The queue should have 0 msgs left", 0, ((AMQSession<?,?>) clientSession).getQueueDepth((AMQDestination) _queue)); + assertEquals("The queue should have 0 msgs left", 0, ((AMQSession<?,?>) clientSession).getQueueDepth(checkQueue)); //check the DLQ has the required number of rejected-without-requeue messages - String dlQueueName = getTestQueueName() + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX; - AMQDestination queue = new AMQQueue("amq.direct", dlQueueName); - assertEquals("The DLQ should have " + redeliverMsgs.size() + " msgs on it", redeliverMsgs.size(), - ((AMQSession<?,?>) clientSession).getQueueDepth(queue)); + verifyDLQdepth(redeliverMsgs.size(), clientSession, durableSub); if(isBrokerStorePersistent()) { @@ -273,15 +313,42 @@ public class MaxDeliveryCountTest extends QpidTestCase } //verify the messages on the DLQ - verifyDLQcontent(clientConnection, redeliverMsgs, queue); + verifyDLQcontent(clientConnection, redeliverMsgs, getTestQueueName(), durableSub); clientConnection.close(); } - private void verifyDLQcontent(Connection clientConnection, List<Integer> redeliverMsgs, AMQDestination queue) throws JMSException + private void verifyDLQdepth(int expected, Session clientSession, boolean durableSub) throws AMQException + { + AMQDestination checkQueueDLQ; + if(durableSub) + { + checkQueueDLQ = new AMQQueue("amq.topic", "clientid" + ":" + getName() + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX); + } + else + { + checkQueueDLQ = new AMQQueue("amq.direct", getTestQueueName() + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX); + } + + assertEquals("The DLQ should have " + expected + " msgs on it", expected, + ((AMQSession<?,?>) clientSession).getQueueDepth(checkQueueDLQ)); + } + + private void verifyDLQcontent(Connection clientConnection, List<Integer> redeliverMsgs, String destName, boolean durableSub) throws JMSException { Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = clientSession.createConsumer(queue); + + MessageConsumer consumer; + if(durableSub) + { + consumer = clientSession.createDurableSubscriber( + clientSession.createTopic(destName), getName() + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX); + } + else + { + consumer = clientSession.createConsumer( + clientSession.createQueue(destName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX)); + } //keep track of the message we expect to still be on the DLQ List<Integer> outstandingMessages = new ArrayList<Integer>(redeliverMsgs); @@ -611,4 +678,9 @@ public class MaxDeliveryCountTest extends QpidTestCase } } } + + private boolean isDurSubTest() + { + return getTestQueueName().contains("DurableSubscription"); + } } |