summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2006-12-12 13:22:45 +0000
committerRobert Greig <rgreig@apache.org>2006-12-12 13:22:45 +0000
commit1ed818ed1d90626d232f340dc48d55d54ee51d6c (patch)
tree69106b6a888d148e0aecd7643802a440df9f661a /java
parent2970078e8a383be527c6a80c7fdd9b548e3895ca (diff)
downloadqpid-python-1ed818ed1d90626d232f340dc48d55d54ee51d6c.tar.gz
QPID-95 : Patch supplied by Rob Godfrey - throws correct exception when encountering a non-routable mandatory message
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@486132 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java13
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java42
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java28
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java151
4 files changed, 222 insertions, 12 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
index ccb2211a55..3fa73aa2e2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
@@ -206,7 +206,18 @@ public class HeadersExchange extends AbstractExchange
}
if (!delivered)
{
- _logger.warn("Exchange " + getName() + ": message not routable.");
+
+ String msg = "Exchange " + getName() + ": message not routable.";
+
+ if (payload.getPublishBody().mandatory)
+ {
+ throw new NoRouteException(msg, payload);
+ }
+ else
+ {
+ _logger.warn(msg);
+ }
+
}
}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index 93fbab682a..35aab78f73 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -81,25 +81,45 @@ public class AbstractHeadersExchangeTestBase extends TestCase
protected void routeAndTest(Message m, TestQueue... expected) throws AMQException
{
- routeAndTest(m, Arrays.asList(expected));
+ routeAndTest(m, false, Arrays.asList(expected));
+ }
+
+ protected void routeAndTest(Message m, boolean expectReturn, TestQueue... expected) throws AMQException
+ {
+ routeAndTest(m, expectReturn, Arrays.asList(expected));
}
protected void routeAndTest(Message m, List<TestQueue> expected) throws AMQException
{
- route(m);
- for (TestQueue q : queues)
+ routeAndTest(m, false, expected);
+ }
+
+ protected void routeAndTest(Message m, boolean expectReturn, List<TestQueue> expected) throws AMQException
+ {
+ try
{
- if (expected.contains(q))
+ route(m);
+ assertFalse("Expected "+m+" to be returned due to manadatory flag, and lack of routing",expectReturn);
+ for (TestQueue q : queues)
{
- assertTrue("Expected " + m + " to be delivered to " + q, m.isInQueue(q));
- //assert m.isInQueue(q) : "Expected " + m + " to be delivered to " + q;
- }
- else
- {
- assertFalse("Did not expect " + m + " to be delivered to " + q, m.isInQueue(q));
- //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q;
+ if (expected.contains(q))
+ {
+ assertTrue("Expected " + m + " to be delivered to " + q, m.isInQueue(q));
+ //assert m.isInQueue(q) : "Expected " + m + " to be delivered to " + q;
+ }
+ else
+ {
+ assertFalse("Did not expect " + m + " to be delivered to " + q, m.isInQueue(q));
+ //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q;
+ }
}
}
+
+ catch (NoRouteException ex)
+ {
+ assertTrue("Expected "+m+" not to be returned",expectReturn);
+ }
+
}
static FieldTable getHeaders(String... entries)
diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
index 91520df3bf..c220442a78 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
+++ b/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.exchange;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.framing.BasicPublishBody;
public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase
{
@@ -52,6 +53,19 @@ public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase
routeAndTest(new Message("Message5", "F0000=Aardvark", "F0001=Bear"),
q1, q2, q3, q4, q5, q6, q7, q8);
routeAndTest(new Message("Message6", "F0002"));
+
+ Message m7 = new Message("Message7", "XXXXX");
+
+ BasicPublishBody pb7 = m7.getPublishBody();
+ pb7.mandatory = true;
+ routeAndTest(m7,true);
+
+ Message m8 = new Message("Message8", "F0000");
+ BasicPublishBody pb8 = m8.getPublishBody();
+ pb8.mandatory = true;
+ routeAndTest(m8,false,q1);
+
+
}
public void testAny() throws AMQException
@@ -71,6 +85,20 @@ public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase
routeAndTest(new Message("Message6", "F0002"));
}
+ public void testMandatory() throws AMQException
+ {
+ TestQueue q1 = bindDefault("F0000");
+ Message m1 = new Message("Message1", "XXXXX");
+ Message m2 = new Message("Message2", "F0000");
+ BasicPublishBody pb1 = m1.getPublishBody();
+ pb1.mandatory = true;
+ BasicPublishBody pb2 = m1.getPublishBody();
+ pb2.mandatory = true;
+ routeAndTest(m1,true);
+
+
+ }
+
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(HeadersExchangeTest.class);
diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
new file mode 100644
index 0000000000..4dffe3e75f
--- /dev/null
+++ b/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
@@ -0,0 +1,151 @@
+package org.apache.qpid.server.exchange;
+
+import junit.framework.TestCase;
+import org.apache.log4j.Logger;
+import org.apache.qpid.test.VMBrokerSetup;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.client.*;
+import org.apache.qpid.url.AMQBindingURL;
+import org.apache.qpid.url.BindingURL;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.PropertyFieldTable;
+
+import javax.jms.*;
+import java.util.List;
+import java.util.Collections;
+import java.util.ArrayList;
+
+public class ReturnUnroutableMandatoryMessageTest extends TestCase implements ExceptionListener
+{
+ private static final Logger _logger = Logger.getLogger(ReturnUnroutableMandatoryMessageTest.class);
+
+ private final List<Message> _bouncedMessageList = Collections.synchronizedList(new ArrayList<Message>());
+
+ static
+ {
+ String workdir = System.getProperty("QPID_WORK");
+ if (workdir == null || workdir.equals(""))
+ {
+ String tempdir = System.getProperty("java.io.tmpdir");
+ System.out.println("QPID_WORK not set using tmp directory: " + tempdir);
+ System.setProperty("QPID_WORK", tempdir);
+ }
+// DOMConfigurator.configure("../broker/etc/log4j.xml");
+ }
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ ApplicationRegistry.initialise(new TestApplicationRegistry(), 1);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ /**
+ * Tests that mandatory message which are not routable are returned to the producer
+ *
+ * @throws Exception
+ */
+ public void testReturnUnroutableMandatoryMessage() throws Exception
+ {
+ _bouncedMessageList.clear();
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+
+ TestableMemoryMessageStore store = (TestableMemoryMessageStore) ApplicationRegistry.getInstance().getMessageStore();
+
+ AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+
+ AMQHeadersExchange queue = new AMQHeadersExchange(new AMQBindingURL(ExchangeDefaults.HEADERS_EXCHANGE_CLASS+"://"+ExchangeDefaults.HEADERS_EXCHANGE_NAME+"/test/queue1?"+ BindingURL.OPTION_ROUTING_KEY+"='F0000=1'"));
+ FieldTable ft = new PropertyFieldTable();
+ ft.setString("F1000","1");
+ MessageConsumer consumer = consumerSession.createConsumer(queue, AMQSession.DEFAULT_PREFETCH_LOW_MARK, AMQSession.DEFAULT_PREFETCH_HIGH_MARK, false, false, (String)null, ft);
+
+
+ //force synch to ensure the consumer has resulted in a bound queue
+ ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
+
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+
+ con2.setExceptionListener(this);
+ AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ // Need to start the "producer" connection in order to receive bounced messages
+ _logger.info("Starting producer connection");
+ con2.start();
+
+
+ MessageProducer nonMandatoryProducer = producerSession.createProducer(queue,false,false);
+ MessageProducer mandatoryProducer = producerSession.createProducer(queue);
+
+
+ // First test - should neither be bounced nor routed
+ _logger.info("Sending non-routable non-mandatory message");
+ TextMessage msg1 = producerSession.createTextMessage("msg1");
+ nonMandatoryProducer.send(msg1);
+
+ // Second test - should be bounced
+ _logger.info("Sending non-routable mandatory message");
+ TextMessage msg2 = producerSession.createTextMessage("msg2");
+ mandatoryProducer.send(msg2);
+
+ // Third test - should be routed
+ _logger.info("Sending routable message");
+ TextMessage msg3 = producerSession.createTextMessage("msg3");
+ msg3.setStringProperty("F1000","1");
+ mandatoryProducer.send(msg3);
+
+
+
+ _logger.info("Starting consumer connection");
+ con.start();
+ TextMessage tm = (TextMessage) consumer.receive(1000L);
+
+ assertTrue("No message routed to receiver",tm != null);
+ assertTrue("Wrong message routed to receiver: "+tm.getText(),"msg3".equals(tm.getText()));
+
+ try
+ {
+ Thread.sleep(1000L);
+ }
+ catch(InterruptedException e)
+ {
+ ;
+ }
+
+ assertTrue("Wrong number of messages bounced (expect 1): "+_bouncedMessageList.size(),_bouncedMessageList.size()==1);
+ Message m = _bouncedMessageList.get(0);
+ assertTrue("Wrong message bounced: "+m.toString(),m.toString().contains("msg2"));
+
+
+
+
+ con.close();
+ con2.close();
+
+
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new VMBrokerSetup(new junit.framework.TestSuite(ReturnUnroutableMandatoryMessageTest.class));
+ }
+
+ public void onException(JMSException jmsException)
+ {
+ _logger.warn("Caught exception on producer: ",jmsException);
+ Exception linkedException = jmsException.getLinkedException();
+ if(linkedException instanceof AMQNoRouteException)
+ {
+ AMQNoRouteException noRoute = (AMQNoRouteException) linkedException;
+ Message bounced = (Message) noRoute.getUndeliveredMessage();
+ _bouncedMessageList.add(bounced);
+ }
+ }
+}