From 1ed818ed1d90626d232f340dc48d55d54ee51d6c Mon Sep 17 00:00:00 2001 From: Robert Greig Date: Tue, 12 Dec 2006 13:22:45 +0000 Subject: QPID-95 : Patch supplied by Rob Godfrey - throws correct exception when encountering a non-routable mandatory message git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@486132 13f79535-47bb-0310-9956-ffa450edef68 --- .../exchange/AbstractHeadersExchangeTestBase.java | 42 ++++-- .../qpid/server/exchange/HeadersExchangeTest.java | 28 ++++ .../ReturnUnroutableMandatoryMessageTest.java | 151 +++++++++++++++++++++ 3 files changed, 210 insertions(+), 11 deletions(-) create mode 100644 java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java (limited to 'java/systests/src') diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 93fbab682a..35aab78f73 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -81,25 +81,45 @@ public class AbstractHeadersExchangeTestBase extends TestCase protected void routeAndTest(Message m, TestQueue... expected) throws AMQException { - routeAndTest(m, Arrays.asList(expected)); + routeAndTest(m, false, Arrays.asList(expected)); + } + + protected void routeAndTest(Message m, boolean expectReturn, TestQueue... expected) throws AMQException + { + routeAndTest(m, expectReturn, Arrays.asList(expected)); } protected void routeAndTest(Message m, List expected) throws AMQException { - route(m); - for (TestQueue q : queues) + routeAndTest(m, false, expected); + } + + protected void routeAndTest(Message m, boolean expectReturn, List expected) throws AMQException + { + try { - if (expected.contains(q)) + route(m); + assertFalse("Expected "+m+" to be returned due to manadatory flag, and lack of routing",expectReturn); + for (TestQueue q : queues) { - assertTrue("Expected " + m + " to be delivered to " + q, m.isInQueue(q)); - //assert m.isInQueue(q) : "Expected " + m + " to be delivered to " + q; - } - else - { - assertFalse("Did not expect " + m + " to be delivered to " + q, m.isInQueue(q)); - //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q; + if (expected.contains(q)) + { + assertTrue("Expected " + m + " to be delivered to " + q, m.isInQueue(q)); + //assert m.isInQueue(q) : "Expected " + m + " to be delivered to " + q; + } + else + { + assertFalse("Did not expect " + m + " to be delivered to " + q, m.isInQueue(q)); + //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q; + } } } + + catch (NoRouteException ex) + { + assertTrue("Expected "+m+" not to be returned",expectReturn); + } + } static FieldTable getHeaders(String... entries) diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java index 91520df3bf..c220442a78 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java +++ b/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.exchange; import org.apache.qpid.AMQException; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.util.TestApplicationRegistry; +import org.apache.qpid.framing.BasicPublishBody; public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase { @@ -52,6 +53,19 @@ public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase routeAndTest(new Message("Message5", "F0000=Aardvark", "F0001=Bear"), q1, q2, q3, q4, q5, q6, q7, q8); routeAndTest(new Message("Message6", "F0002")); + + Message m7 = new Message("Message7", "XXXXX"); + + BasicPublishBody pb7 = m7.getPublishBody(); + pb7.mandatory = true; + routeAndTest(m7,true); + + Message m8 = new Message("Message8", "F0000"); + BasicPublishBody pb8 = m8.getPublishBody(); + pb8.mandatory = true; + routeAndTest(m8,false,q1); + + } public void testAny() throws AMQException @@ -71,6 +85,20 @@ public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase routeAndTest(new Message("Message6", "F0002")); } + public void testMandatory() throws AMQException + { + TestQueue q1 = bindDefault("F0000"); + Message m1 = new Message("Message1", "XXXXX"); + Message m2 = new Message("Message2", "F0000"); + BasicPublishBody pb1 = m1.getPublishBody(); + pb1.mandatory = true; + BasicPublishBody pb2 = m1.getPublishBody(); + pb2.mandatory = true; + routeAndTest(m1,true); + + + } + public static junit.framework.Test suite() { return new junit.framework.TestSuite(HeadersExchangeTest.class); diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java new file mode 100644 index 0000000000..4dffe3e75f --- /dev/null +++ b/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java @@ -0,0 +1,151 @@ +package org.apache.qpid.server.exchange; + +import junit.framework.TestCase; +import org.apache.log4j.Logger; +import org.apache.qpid.test.VMBrokerSetup; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.util.TestApplicationRegistry; +import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.client.*; +import org.apache.qpid.url.AMQBindingURL; +import org.apache.qpid.url.BindingURL; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.PropertyFieldTable; + +import javax.jms.*; +import java.util.List; +import java.util.Collections; +import java.util.ArrayList; + +public class ReturnUnroutableMandatoryMessageTest extends TestCase implements ExceptionListener +{ + private static final Logger _logger = Logger.getLogger(ReturnUnroutableMandatoryMessageTest.class); + + private final List _bouncedMessageList = Collections.synchronizedList(new ArrayList()); + + static + { + String workdir = System.getProperty("QPID_WORK"); + if (workdir == null || workdir.equals("")) + { + String tempdir = System.getProperty("java.io.tmpdir"); + System.out.println("QPID_WORK not set using tmp directory: " + tempdir); + System.setProperty("QPID_WORK", tempdir); + } +// DOMConfigurator.configure("../broker/etc/log4j.xml"); + } + + protected void setUp() throws Exception + { + super.setUp(); + ApplicationRegistry.initialise(new TestApplicationRegistry(), 1); + } + + protected void tearDown() throws Exception + { + super.tearDown(); + } + + /** + * Tests that mandatory message which are not routable are returned to the producer + * + * @throws Exception + */ + public void testReturnUnroutableMandatoryMessage() throws Exception + { + _bouncedMessageList.clear(); + Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); + + TestableMemoryMessageStore store = (TestableMemoryMessageStore) ApplicationRegistry.getInstance().getMessageStore(); + + AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + + AMQHeadersExchange queue = new AMQHeadersExchange(new AMQBindingURL(ExchangeDefaults.HEADERS_EXCHANGE_CLASS+"://"+ExchangeDefaults.HEADERS_EXCHANGE_NAME+"/test/queue1?"+ BindingURL.OPTION_ROUTING_KEY+"='F0000=1'")); + FieldTable ft = new PropertyFieldTable(); + ft.setString("F1000","1"); + MessageConsumer consumer = consumerSession.createConsumer(queue, AMQSession.DEFAULT_PREFETCH_LOW_MARK, AMQSession.DEFAULT_PREFETCH_HIGH_MARK, false, false, (String)null, ft); + + + //force synch to ensure the consumer has resulted in a bound queue + ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS); + + Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test"); + + con2.setExceptionListener(this); + AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + // Need to start the "producer" connection in order to receive bounced messages + _logger.info("Starting producer connection"); + con2.start(); + + + MessageProducer nonMandatoryProducer = producerSession.createProducer(queue,false,false); + MessageProducer mandatoryProducer = producerSession.createProducer(queue); + + + // First test - should neither be bounced nor routed + _logger.info("Sending non-routable non-mandatory message"); + TextMessage msg1 = producerSession.createTextMessage("msg1"); + nonMandatoryProducer.send(msg1); + + // Second test - should be bounced + _logger.info("Sending non-routable mandatory message"); + TextMessage msg2 = producerSession.createTextMessage("msg2"); + mandatoryProducer.send(msg2); + + // Third test - should be routed + _logger.info("Sending routable message"); + TextMessage msg3 = producerSession.createTextMessage("msg3"); + msg3.setStringProperty("F1000","1"); + mandatoryProducer.send(msg3); + + + + _logger.info("Starting consumer connection"); + con.start(); + TextMessage tm = (TextMessage) consumer.receive(1000L); + + assertTrue("No message routed to receiver",tm != null); + assertTrue("Wrong message routed to receiver: "+tm.getText(),"msg3".equals(tm.getText())); + + try + { + Thread.sleep(1000L); + } + catch(InterruptedException e) + { + ; + } + + assertTrue("Wrong number of messages bounced (expect 1): "+_bouncedMessageList.size(),_bouncedMessageList.size()==1); + Message m = _bouncedMessageList.get(0); + assertTrue("Wrong message bounced: "+m.toString(),m.toString().contains("msg2")); + + + + + con.close(); + con2.close(); + + + } + + public static junit.framework.Test suite() + { + return new VMBrokerSetup(new junit.framework.TestSuite(ReturnUnroutableMandatoryMessageTest.class)); + } + + public void onException(JMSException jmsException) + { + _logger.warn("Caught exception on producer: ",jmsException); + Exception linkedException = jmsException.getLinkedException(); + if(linkedException instanceof AMQNoRouteException) + { + AMQNoRouteException noRoute = (AMQNoRouteException) linkedException; + Message bounced = (Message) noRoute.getUndeliveredMessage(); + _bouncedMessageList.add(bounced); + } + } +} -- cgit v1.2.1