diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2007-04-12 10:31:51 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2007-04-12 10:31:51 +0000 |
commit | aebd8f2b8e14d2a05363d1a3ec160d06d0151e10 (patch) | |
tree | 661a0427c5b0aceb07169011888d3ae878fad25f | |
parent | 8db39dc54b357436697fe17da5676c14a2d21486 (diff) | |
download | qpid-python-aebd8f2b8e14d2a05363d1a3ec160d06d0151e10.tar.gz |
QPID-451 Throw InvalidDestinationException on attempt to publish to a Queue which does not exist
Changed QueueSenderAdapter to check if the routing key is bound to a queue on the given exchange.
The checking can be turned off by setting the system property org.apache.qpid.client.verifyQueueBindingBeforePublish to anything but true
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@527876 13f79535-47bb-0310-9956-ffa450edef68
6 files changed, 170 insertions, 29 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index 661372845a..585991d905 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -53,6 +53,8 @@ public abstract class AMQDestination implements Destination, Referenceable private String _url; private AMQShortString _urlAsShortString; + private boolean _validated; + private byte[] _byteEncoding; private static final int IS_DURABLE_MASK = 0x1; private static final int IS_EXCLUSIVE_MASK = 0x2; @@ -198,12 +200,16 @@ public abstract class AMQDestination implements Destination, Referenceable { return toURL(); - /* - return "Destination: " + _destinationName + ", " + - "Queue Name: " + _queueName + ", Exchange: " + _exchangeName + - ", Exchange class: " + _exchangeClass + ", Exclusive: " + _isExclusive + - ", AutoDelete: " + _isAutoDelete + ", Routing Key: " + getRoutingKey(); - */ + } + + public boolean isValidated() + { + return _validated; + } + + public void setValidated(boolean validated) + { + _validated = validated; } public String toURL() @@ -348,15 +354,7 @@ public abstract class AMQDestination implements Destination, Referenceable { return false; } - /* if (_isExclusive != that._isExclusive) - { - return false; - } - if (_isAutoDelete != that._isAutoDelete) - { - return false; - } - */ + return true; } @@ -370,8 +368,7 @@ public abstract class AMQDestination implements Destination, Referenceable { result = 29 * result + _queueName.hashCode(); } -// result = result * (_isExclusive ? 13 : 7); -// result = result * (_isAutoDelete ? 13 : 7); + return result; } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 39fc7e9c0d..d8d15d22c5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -998,42 +998,42 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi throw new java.lang.UnsupportedOperationException(); } - public MessageProducer createProducer(Destination destination, boolean mandatory, + public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate, boolean waitUntilSent) throws JMSException { return createProducerImpl(destination, mandatory, immediate, waitUntilSent); } - public MessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate) + public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate) throws JMSException { return createProducerImpl(destination, mandatory, immediate); } - public MessageProducer createProducer(Destination destination, boolean immediate) + public BasicMessageProducer createProducer(Destination destination, boolean immediate) throws JMSException { return createProducerImpl(destination, DEFAULT_MANDATORY, immediate); } - public MessageProducer createProducer(Destination destination) throws JMSException + public BasicMessageProducer createProducer(Destination destination) throws JMSException { return createProducerImpl(destination, DEFAULT_MANDATORY, DEFAULT_IMMEDIATE); } - private org.apache.qpid.jms.MessageProducer createProducerImpl(Destination destination, boolean mandatory, + private BasicMessageProducer createProducerImpl(Destination destination, boolean mandatory, boolean immediate) throws JMSException { return createProducerImpl(destination, mandatory, immediate, false); } - private org.apache.qpid.jms.MessageProducer createProducerImpl(final Destination destination, final boolean mandatory, + private BasicMessageProducer createProducerImpl(final Destination destination, final boolean mandatory, final boolean immediate, final boolean waitUntilSent) throws JMSException { - return (org.apache.qpid.jms.MessageProducer) new FailoverSupport() + return (BasicMessageProducer) new FailoverSupport() { public Object operation() throws JMSException { diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 5b6945e259..bd7cc94582 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -682,4 +682,9 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j { return _session; } + + public boolean isBound(AMQDestination destination) throws JMSException + { + return _session.isQueueBound(destination.getExchangeName(),null,destination.getRoutingKey()); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java b/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java index c9d29d8077..e0c4b61333 100644 --- a/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java +++ b/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java @@ -7,14 +7,15 @@ import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.QueueSender; +import javax.jms.InvalidDestinationException; public class QueueSenderAdapter implements QueueSender { - private MessageProducer _delegate; + private BasicMessageProducer _delegate; private Queue _queue; private boolean closed = false; - public QueueSenderAdapter(MessageProducer msgProducer, Queue queue){ + public QueueSenderAdapter(BasicMessageProducer msgProducer, Queue queue){ _delegate = msgProducer; _queue = queue; } @@ -122,12 +123,13 @@ public class QueueSenderAdapter implements QueueSender { _delegate.setTimeToLive(timeToLive); } - private void checkPreConditions() throws IllegalStateException, IllegalStateException + private void checkPreConditions() throws JMSException { checkPreConditions(_queue); } - private void checkPreConditions(Queue queue) throws IllegalStateException, IllegalStateException { + private void checkPreConditions(Queue queue) throws JMSException + { if (closed){ throw new javax.jms.IllegalStateException("Publisher is closed"); } @@ -137,5 +139,28 @@ public class QueueSenderAdapter implements QueueSender { if(session == null || session.isClosed()){ throw new javax.jms.IllegalStateException("Invalid Session"); } - } + + if(!(queue instanceof AMQDestination)) + { + throw new InvalidDestinationException("Queue: " + queue + " is not a valid Qpid queue"); + } + AMQDestination destination = (AMQDestination) queue; + if(!destination.isValidated() && checkQueueBeforePublish()) + { + + if (_delegate.isBound(destination)) + { + destination.setValidated(true); + } + else + { + throw new InvalidDestinationException("Queue: " + queue + " is not a valid destination (no bindings on server"); + } + } + } + + private boolean checkQueueBeforePublish() + { + return "true".equalsIgnoreCase(System.getProperty("org.apache.qpid.client.verifyQueueBindingBeforePublish", "true")); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java b/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java index f67b984658..02a408465b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java +++ b/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java @@ -175,5 +175,10 @@ public class TopicPublisherAdapter implements TopicPublisher { throw new InvalidDestinationException("Destination " + topic + " is not a topic"); } + if(!(topic instanceof AMQDestination)) + { + throw new InvalidDestinationException("Destination " + topic + " is not a Qpid topic"); + } + } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java new file mode 100644 index 0000000000..1b5da2631d --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java @@ -0,0 +1,109 @@ +package org.apache.qpid.test.unit.basic;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
+import org.apache.qpid.client.transport.TransportConnection;
+
+import junit.framework.TestCase;
+
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.QueueSession;
+import javax.jms.Queue;
+import javax.jms.QueueSender;
+import javax.jms.TextMessage;
+import javax.jms.InvalidDestinationException;
+
+public class InvalidDestinationTest extends TestCase
+{
+ private AMQConnection _connection;
+ private AMQDestination _destination;
+ private AMQSession _session;
+ private MessageConsumer _consumer;
+
+ private static final String VM_BROKER = "vm://:1";
+
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ createVMBroker();
+ _connection = new AMQConnection(VM_BROKER, "guest", "guest", "ReceiveTestClient", "test");
+ }
+
+ public void createVMBroker()
+ {
+ try
+ {
+ TransportConnection.createVMBroker(1);
+ }
+ catch (AMQVMBrokerCreationException e)
+ {
+ fail("Unable to create broker: " + e);
+ }
+ }
+
+ protected void tearDown() throws Exception
+ {
+ _connection.close();
+ TransportConnection.killVMBroker(1);
+ super.tearDown();
+ }
+
+
+
+ public void testInvalidDestination() throws Exception
+ {
+ Queue invalidDestination = new AMQQueue("amq.direct","unknownQ");
+ AMQQueue validDestination = new AMQQueue("amq.direct","knownQ");
+ QueueSession queueSession = _connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // This is the only easy way to create and bind a queue from the API :-(
+ MessageConsumer consumer = queueSession.createConsumer(validDestination);
+
+ QueueSender sender = queueSession.createSender(invalidDestination);
+ TextMessage msg = queueSession.createTextMessage("Hello");
+ try
+ {
+ sender.send(msg);
+ fail("Expected InvalidDestinationException");
+ }
+ catch (InvalidDestinationException ex)
+ {
+ // pass
+ }
+ sender.close();
+
+ sender = queueSession.createSender(null);
+ invalidDestination = new AMQQueue("amq.direct","unknownQ");
+
+ try
+ {
+ sender.send(invalidDestination,msg);
+ fail("Expected InvalidDestinationException");
+ }
+ catch (InvalidDestinationException ex)
+ {
+ // pass
+ }
+ sender.send(validDestination,msg);
+ sender.close();
+ validDestination = new AMQQueue("amq.direct","knownQ");
+ sender = queueSession.createSender(validDestination);
+ sender.send(msg);
+
+
+
+
+ }
+
+
+ public static junit.framework.Test suite()
+ {
+
+ return new junit.framework.TestSuite(InvalidDestinationTest.class);
+ }
+}
|