diff options
author | Robert Greig <rgreig@apache.org> | 2006-12-12 13:22:45 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2006-12-12 13:22:45 +0000 |
commit | 1ed818ed1d90626d232f340dc48d55d54ee51d6c (patch) | |
tree | 69106b6a888d148e0aecd7643802a440df9f661a /java | |
parent | 2970078e8a383be527c6a80c7fdd9b548e3895ca (diff) | |
download | qpid-python-1ed818ed1d90626d232f340dc48d55d54ee51d6c.tar.gz |
QPID-95 : Patch supplied by Rob Godfrey - throws correct exception when encountering a non-routable mandatory message
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@486132 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
4 files changed, 222 insertions, 12 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index ccb2211a55..3fa73aa2e2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -206,7 +206,18 @@ public class HeadersExchange extends AbstractExchange } if (!delivered) { - _logger.warn("Exchange " + getName() + ": message not routable."); + + String msg = "Exchange " + getName() + ": message not routable."; + + if (payload.getPublishBody().mandatory) + { + throw new NoRouteException(msg, payload); + } + else + { + _logger.warn(msg); + } + } } diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 93fbab682a..35aab78f73 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -81,25 +81,45 @@ public class AbstractHeadersExchangeTestBase extends TestCase protected void routeAndTest(Message m, TestQueue... expected) throws AMQException { - routeAndTest(m, Arrays.asList(expected)); + routeAndTest(m, false, Arrays.asList(expected)); + } + + protected void routeAndTest(Message m, boolean expectReturn, TestQueue... expected) throws AMQException + { + routeAndTest(m, expectReturn, Arrays.asList(expected)); } protected void routeAndTest(Message m, List<TestQueue> expected) throws AMQException { - route(m); - for (TestQueue q : queues) + routeAndTest(m, false, expected); + } + + protected void routeAndTest(Message m, boolean expectReturn, List<TestQueue> expected) throws AMQException + { + try { - if (expected.contains(q)) + route(m); + assertFalse("Expected "+m+" to be returned due to manadatory flag, and lack of routing",expectReturn); + for (TestQueue q : queues) { - assertTrue("Expected " + m + " to be delivered to " + q, m.isInQueue(q)); - //assert m.isInQueue(q) : "Expected " + m + " to be delivered to " + q; - } - else - { - assertFalse("Did not expect " + m + " to be delivered to " + q, m.isInQueue(q)); - //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q; + if (expected.contains(q)) + { + assertTrue("Expected " + m + " to be delivered to " + q, m.isInQueue(q)); + //assert m.isInQueue(q) : "Expected " + m + " to be delivered to " + q; + } + else + { + assertFalse("Did not expect " + m + " to be delivered to " + q, m.isInQueue(q)); + //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q; + } } } + + catch (NoRouteException ex) + { + assertTrue("Expected "+m+" not to be returned",expectReturn); + } + } static FieldTable getHeaders(String... entries) diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java index 91520df3bf..c220442a78 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java +++ b/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.exchange; import org.apache.qpid.AMQException; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.util.TestApplicationRegistry; +import org.apache.qpid.framing.BasicPublishBody; public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase { @@ -52,6 +53,19 @@ public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase routeAndTest(new Message("Message5", "F0000=Aardvark", "F0001=Bear"), q1, q2, q3, q4, q5, q6, q7, q8); routeAndTest(new Message("Message6", "F0002")); + + Message m7 = new Message("Message7", "XXXXX"); + + BasicPublishBody pb7 = m7.getPublishBody(); + pb7.mandatory = true; + routeAndTest(m7,true); + + Message m8 = new Message("Message8", "F0000"); + BasicPublishBody pb8 = m8.getPublishBody(); + pb8.mandatory = true; + routeAndTest(m8,false,q1); + + } public void testAny() throws AMQException @@ -71,6 +85,20 @@ public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase routeAndTest(new Message("Message6", "F0002")); } + public void testMandatory() throws AMQException + { + TestQueue q1 = bindDefault("F0000"); + Message m1 = new Message("Message1", "XXXXX"); + Message m2 = new Message("Message2", "F0000"); + BasicPublishBody pb1 = m1.getPublishBody(); + pb1.mandatory = true; + BasicPublishBody pb2 = m1.getPublishBody(); + pb2.mandatory = true; + routeAndTest(m1,true); + + + } + public static junit.framework.Test suite() { return new junit.framework.TestSuite(HeadersExchangeTest.class); diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java new file mode 100644 index 0000000000..4dffe3e75f --- /dev/null +++ b/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java @@ -0,0 +1,151 @@ +package org.apache.qpid.server.exchange;
+
+import junit.framework.TestCase;
+import org.apache.log4j.Logger;
+import org.apache.qpid.test.VMBrokerSetup;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.client.*;
+import org.apache.qpid.url.AMQBindingURL;
+import org.apache.qpid.url.BindingURL;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.PropertyFieldTable;
+
+import javax.jms.*;
+import java.util.List;
+import java.util.Collections;
+import java.util.ArrayList;
+
+public class ReturnUnroutableMandatoryMessageTest extends TestCase implements ExceptionListener
+{
+ private static final Logger _logger = Logger.getLogger(ReturnUnroutableMandatoryMessageTest.class);
+
+ private final List<Message> _bouncedMessageList = Collections.synchronizedList(new ArrayList<Message>());
+
+ static
+ {
+ String workdir = System.getProperty("QPID_WORK");
+ if (workdir == null || workdir.equals(""))
+ {
+ String tempdir = System.getProperty("java.io.tmpdir");
+ System.out.println("QPID_WORK not set using tmp directory: " + tempdir);
+ System.setProperty("QPID_WORK", tempdir);
+ }
+// DOMConfigurator.configure("../broker/etc/log4j.xml");
+ }
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ ApplicationRegistry.initialise(new TestApplicationRegistry(), 1);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ /**
+ * Tests that mandatory message which are not routable are returned to the producer
+ *
+ * @throws Exception
+ */
+ public void testReturnUnroutableMandatoryMessage() throws Exception
+ {
+ _bouncedMessageList.clear();
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+
+ TestableMemoryMessageStore store = (TestableMemoryMessageStore) ApplicationRegistry.getInstance().getMessageStore();
+
+ AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+
+ AMQHeadersExchange queue = new AMQHeadersExchange(new AMQBindingURL(ExchangeDefaults.HEADERS_EXCHANGE_CLASS+"://"+ExchangeDefaults.HEADERS_EXCHANGE_NAME+"/test/queue1?"+ BindingURL.OPTION_ROUTING_KEY+"='F0000=1'"));
+ FieldTable ft = new PropertyFieldTable();
+ ft.setString("F1000","1");
+ MessageConsumer consumer = consumerSession.createConsumer(queue, AMQSession.DEFAULT_PREFETCH_LOW_MARK, AMQSession.DEFAULT_PREFETCH_HIGH_MARK, false, false, (String)null, ft);
+
+
+ //force synch to ensure the consumer has resulted in a bound queue
+ ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
+
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+
+ con2.setExceptionListener(this);
+ AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ // Need to start the "producer" connection in order to receive bounced messages
+ _logger.info("Starting producer connection");
+ con2.start();
+
+
+ MessageProducer nonMandatoryProducer = producerSession.createProducer(queue,false,false);
+ MessageProducer mandatoryProducer = producerSession.createProducer(queue);
+
+
+ // First test - should neither be bounced nor routed
+ _logger.info("Sending non-routable non-mandatory message");
+ TextMessage msg1 = producerSession.createTextMessage("msg1");
+ nonMandatoryProducer.send(msg1);
+
+ // Second test - should be bounced
+ _logger.info("Sending non-routable mandatory message");
+ TextMessage msg2 = producerSession.createTextMessage("msg2");
+ mandatoryProducer.send(msg2);
+
+ // Third test - should be routed
+ _logger.info("Sending routable message");
+ TextMessage msg3 = producerSession.createTextMessage("msg3");
+ msg3.setStringProperty("F1000","1");
+ mandatoryProducer.send(msg3);
+
+
+
+ _logger.info("Starting consumer connection");
+ con.start();
+ TextMessage tm = (TextMessage) consumer.receive(1000L);
+
+ assertTrue("No message routed to receiver",tm != null);
+ assertTrue("Wrong message routed to receiver: "+tm.getText(),"msg3".equals(tm.getText()));
+
+ try
+ {
+ Thread.sleep(1000L);
+ }
+ catch(InterruptedException e)
+ {
+ ;
+ }
+
+ assertTrue("Wrong number of messages bounced (expect 1): "+_bouncedMessageList.size(),_bouncedMessageList.size()==1);
+ Message m = _bouncedMessageList.get(0);
+ assertTrue("Wrong message bounced: "+m.toString(),m.toString().contains("msg2"));
+
+
+
+
+ con.close();
+ con2.close();
+
+
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new VMBrokerSetup(new junit.framework.TestSuite(ReturnUnroutableMandatoryMessageTest.class));
+ }
+
+ public void onException(JMSException jmsException)
+ {
+ _logger.warn("Caught exception on producer: ",jmsException);
+ Exception linkedException = jmsException.getLinkedException();
+ if(linkedException instanceof AMQNoRouteException)
+ {
+ AMQNoRouteException noRoute = (AMQNoRouteException) linkedException;
+ Message bounced = (Message) noRoute.getUndeliveredMessage();
+ _bouncedMessageList.add(bounced);
+ }
+ }
+}
|