summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2007-04-12 10:31:51 +0000
committerRobert Godfrey <rgodfrey@apache.org>2007-04-12 10:31:51 +0000
commitaebd8f2b8e14d2a05363d1a3ec160d06d0151e10 (patch)
tree661a0427c5b0aceb07169011888d3ae878fad25f
parent8db39dc54b357436697fe17da5676c14a2d21486 (diff)
downloadqpid-python-aebd8f2b8e14d2a05363d1a3ec160d06d0151e10.tar.gz
QPID-451 Throw InvalidDestinationException on attempt to publish to a Queue which does not exist
Changed QueueSenderAdapter to check if the routing key is bound to a queue on the given exchange. The checking can be turned off by setting the system property org.apache.qpid.client.verifyQueueBindingBeforePublish to anything but true git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@527876 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQDestination.java31
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java14
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java35
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java5
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java109
6 files changed, 170 insertions, 29 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
index 661372845a..585991d905 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
@@ -53,6 +53,8 @@ public abstract class AMQDestination implements Destination, Referenceable
private String _url;
private AMQShortString _urlAsShortString;
+ private boolean _validated;
+
private byte[] _byteEncoding;
private static final int IS_DURABLE_MASK = 0x1;
private static final int IS_EXCLUSIVE_MASK = 0x2;
@@ -198,12 +200,16 @@ public abstract class AMQDestination implements Destination, Referenceable
{
return toURL();
- /*
- return "Destination: " + _destinationName + ", " +
- "Queue Name: " + _queueName + ", Exchange: " + _exchangeName +
- ", Exchange class: " + _exchangeClass + ", Exclusive: " + _isExclusive +
- ", AutoDelete: " + _isAutoDelete + ", Routing Key: " + getRoutingKey();
- */
+ }
+
+ public boolean isValidated()
+ {
+ return _validated;
+ }
+
+ public void setValidated(boolean validated)
+ {
+ _validated = validated;
}
public String toURL()
@@ -348,15 +354,7 @@ public abstract class AMQDestination implements Destination, Referenceable
{
return false;
}
- /* if (_isExclusive != that._isExclusive)
- {
- return false;
- }
- if (_isAutoDelete != that._isAutoDelete)
- {
- return false;
- }
- */
+
return true;
}
@@ -370,8 +368,7 @@ public abstract class AMQDestination implements Destination, Referenceable
{
result = 29 * result + _queueName.hashCode();
}
-// result = result * (_isExclusive ? 13 : 7);
-// result = result * (_isAutoDelete ? 13 : 7);
+
return result;
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 39fc7e9c0d..d8d15d22c5 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -998,42 +998,42 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
throw new java.lang.UnsupportedOperationException();
}
- public MessageProducer createProducer(Destination destination, boolean mandatory,
+ public BasicMessageProducer createProducer(Destination destination, boolean mandatory,
boolean immediate, boolean waitUntilSent)
throws JMSException
{
return createProducerImpl(destination, mandatory, immediate, waitUntilSent);
}
- public MessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate)
+ public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate)
throws JMSException
{
return createProducerImpl(destination, mandatory, immediate);
}
- public MessageProducer createProducer(Destination destination, boolean immediate)
+ public BasicMessageProducer createProducer(Destination destination, boolean immediate)
throws JMSException
{
return createProducerImpl(destination, DEFAULT_MANDATORY, immediate);
}
- public MessageProducer createProducer(Destination destination) throws JMSException
+ public BasicMessageProducer createProducer(Destination destination) throws JMSException
{
return createProducerImpl(destination, DEFAULT_MANDATORY, DEFAULT_IMMEDIATE);
}
- private org.apache.qpid.jms.MessageProducer createProducerImpl(Destination destination, boolean mandatory,
+ private BasicMessageProducer createProducerImpl(Destination destination, boolean mandatory,
boolean immediate)
throws JMSException
{
return createProducerImpl(destination, mandatory, immediate, false);
}
- private org.apache.qpid.jms.MessageProducer createProducerImpl(final Destination destination, final boolean mandatory,
+ private BasicMessageProducer createProducerImpl(final Destination destination, final boolean mandatory,
final boolean immediate, final boolean waitUntilSent)
throws JMSException
{
- return (org.apache.qpid.jms.MessageProducer) new FailoverSupport()
+ return (BasicMessageProducer) new FailoverSupport()
{
public Object operation() throws JMSException
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index 5b6945e259..bd7cc94582 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -682,4 +682,9 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
{
return _session;
}
+
+ public boolean isBound(AMQDestination destination) throws JMSException
+ {
+ return _session.isQueueBound(destination.getExchangeName(),null,destination.getRoutingKey());
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java b/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
index c9d29d8077..e0c4b61333 100644
--- a/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
+++ b/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
@@ -7,14 +7,15 @@ import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueSender;
+import javax.jms.InvalidDestinationException;
public class QueueSenderAdapter implements QueueSender {
- private MessageProducer _delegate;
+ private BasicMessageProducer _delegate;
private Queue _queue;
private boolean closed = false;
- public QueueSenderAdapter(MessageProducer msgProducer, Queue queue){
+ public QueueSenderAdapter(BasicMessageProducer msgProducer, Queue queue){
_delegate = msgProducer;
_queue = queue;
}
@@ -122,12 +123,13 @@ public class QueueSenderAdapter implements QueueSender {
_delegate.setTimeToLive(timeToLive);
}
- private void checkPreConditions() throws IllegalStateException, IllegalStateException
+ private void checkPreConditions() throws JMSException
{
checkPreConditions(_queue);
}
- private void checkPreConditions(Queue queue) throws IllegalStateException, IllegalStateException {
+ private void checkPreConditions(Queue queue) throws JMSException
+ {
if (closed){
throw new javax.jms.IllegalStateException("Publisher is closed");
}
@@ -137,5 +139,28 @@ public class QueueSenderAdapter implements QueueSender {
if(session == null || session.isClosed()){
throw new javax.jms.IllegalStateException("Invalid Session");
}
- }
+
+ if(!(queue instanceof AMQDestination))
+ {
+ throw new InvalidDestinationException("Queue: " + queue + " is not a valid Qpid queue");
+ }
+ AMQDestination destination = (AMQDestination) queue;
+ if(!destination.isValidated() && checkQueueBeforePublish())
+ {
+
+ if (_delegate.isBound(destination))
+ {
+ destination.setValidated(true);
+ }
+ else
+ {
+ throw new InvalidDestinationException("Queue: " + queue + " is not a valid destination (no bindings on server");
+ }
+ }
+ }
+
+ private boolean checkQueueBeforePublish()
+ {
+ return "true".equalsIgnoreCase(System.getProperty("org.apache.qpid.client.verifyQueueBindingBeforePublish", "true"));
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java b/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java
index f67b984658..02a408465b 100644
--- a/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java
+++ b/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java
@@ -175,5 +175,10 @@ public class TopicPublisherAdapter implements TopicPublisher
{
throw new InvalidDestinationException("Destination " + topic + " is not a topic");
}
+ if(!(topic instanceof AMQDestination))
+ {
+ throw new InvalidDestinationException("Destination " + topic + " is not a Qpid topic");
+ }
+
}
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java
new file mode 100644
index 0000000000..1b5da2631d
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java
@@ -0,0 +1,109 @@
+package org.apache.qpid.test.unit.basic;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
+import org.apache.qpid.client.transport.TransportConnection;
+
+import junit.framework.TestCase;
+
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.QueueSession;
+import javax.jms.Queue;
+import javax.jms.QueueSender;
+import javax.jms.TextMessage;
+import javax.jms.InvalidDestinationException;
+
+public class InvalidDestinationTest extends TestCase
+{
+ private AMQConnection _connection;
+ private AMQDestination _destination;
+ private AMQSession _session;
+ private MessageConsumer _consumer;
+
+ private static final String VM_BROKER = "vm://:1";
+
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ createVMBroker();
+ _connection = new AMQConnection(VM_BROKER, "guest", "guest", "ReceiveTestClient", "test");
+ }
+
+ public void createVMBroker()
+ {
+ try
+ {
+ TransportConnection.createVMBroker(1);
+ }
+ catch (AMQVMBrokerCreationException e)
+ {
+ fail("Unable to create broker: " + e);
+ }
+ }
+
+ protected void tearDown() throws Exception
+ {
+ _connection.close();
+ TransportConnection.killVMBroker(1);
+ super.tearDown();
+ }
+
+
+
+ public void testInvalidDestination() throws Exception
+ {
+ Queue invalidDestination = new AMQQueue("amq.direct","unknownQ");
+ AMQQueue validDestination = new AMQQueue("amq.direct","knownQ");
+ QueueSession queueSession = _connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // This is the only easy way to create and bind a queue from the API :-(
+ MessageConsumer consumer = queueSession.createConsumer(validDestination);
+
+ QueueSender sender = queueSession.createSender(invalidDestination);
+ TextMessage msg = queueSession.createTextMessage("Hello");
+ try
+ {
+ sender.send(msg);
+ fail("Expected InvalidDestinationException");
+ }
+ catch (InvalidDestinationException ex)
+ {
+ // pass
+ }
+ sender.close();
+
+ sender = queueSession.createSender(null);
+ invalidDestination = new AMQQueue("amq.direct","unknownQ");
+
+ try
+ {
+ sender.send(invalidDestination,msg);
+ fail("Expected InvalidDestinationException");
+ }
+ catch (InvalidDestinationException ex)
+ {
+ // pass
+ }
+ sender.send(validDestination,msg);
+ sender.close();
+ validDestination = new AMQQueue("amq.direct","knownQ");
+ sender = queueSession.createSender(validDestination);
+ sender.send(msg);
+
+
+
+
+ }
+
+
+ public static junit.framework.Test suite()
+ {
+
+ return new junit.framework.TestSuite(InvalidDestinationTest.class);
+ }
+}