diff options
author | Keith Wall <kwall@apache.org> | 2014-01-07 17:58:41 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2014-01-07 17:58:41 +0000 |
commit | fc2867bccfeab4126352e2ac5bec0a6e63b46b53 (patch) | |
tree | d0774c95aad6ee1a8049d17e80b1839a9b519807 | |
parent | f7386e281f61e6921254f6da2c9aa6850a5bf722 (diff) | |
download | qpid-python-fc2867bccfeab4126352e2ac5bec0a6e63b46b53.tar.gz |
QPID-5420: Restore ability to consume using BURLs specifying default exchange.
* Java Broker: Changed AbstractVirtualHost so that createExchange throws ExchangeExistException before checking for a reserved exchnage name. The effect will be that the Java Broker will again accept active declaration of the built in exchanges (amq.*, qpid.* and default).
* Java Broker: Changed the 0-8..0-9-1 ExchangeBoundHandler so that a null exchnage name is treated to mean the default exchange. This matches the behaviour of ServerSessionDelegate#exchangeBound() on the 0-10 path. This allows the Java client to query bindings on the default exchange.
* Client: Changed AbstractAMQMessageDelegate.java so that 0-10 knows the type of the default exchange when populating the JMSDestination on received messages.
* Client: Introduced system property qpid.bind_queues system property so that the exchange/queue bind side effect can be suppressed on consumer creation. Like qid.declare_exchanges and declare_queues, this system propery has effect when using BURLs. Might be useful if using a new client with older broker.
* Added new system tests.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1556292 13f79535-47bb-0310-9956-ffa450edef68
8 files changed, 107 insertions, 86 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index 0cd4f0b6b2..5859ce3c68 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -563,11 +563,6 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg String alternateExchangeName) throws AMQException { - - if(_exchangeRegistry.isReservedExchangeName(name)) - { - throw new ReservedExchangeNameException(name); - } synchronized (_exchangeRegistry) { Exchange existing; @@ -575,6 +570,11 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg { throw new ExchangeExistsException(name,existing); } + if(_exchangeRegistry.isReservedExchangeName(name)) + { + throw new ReservedExchangeNameException(name); + } + Exchange alternateExchange; if(alternateExchangeName != null) diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java index 0535236f94..4ebddb0f68 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java @@ -29,7 +29,6 @@ import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -80,13 +79,9 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo channel.sync(); - AMQShortString exchangeName = body.getExchange(); + AMQShortString exchangeName = body.getExchange() == null ? AMQShortString.EMPTY_STRING : body.getExchange(); AMQShortString queueName = body.getQueue(); AMQShortString routingKey = body.getRoutingKey(); - if (exchangeName == null) - { - throw new AMQException("Exchange exchange must not be null"); - } Exchange exchange = virtualHost.getExchange(exchangeName.toString()); ExchangeBoundOkBody response; if (exchange == null) @@ -94,7 +89,7 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo response = methodRegistry.createExchangeBoundOkBody(EXCHANGE_NOT_FOUND, - new AMQShortString("Exchange " + exchangeName + " not found")); + new AMQShortString("Exchange '" + exchangeName + "' not found")); } else if (routingKey == null) { @@ -119,7 +114,7 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo { response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode - new AMQShortString("Queue " + queueName + " not found")); // replyText + new AMQShortString("Queue '" + queueName + "' not found")); // replyText } else { @@ -133,7 +128,7 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo { response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_BOUND, // replyCode - new AMQShortString("Queue " + queueName + " not bound to exchange " + exchangeName)); // replyText + new AMQShortString("Queue '" + queueName + "' not bound to exchange '" + exchangeName + "'")); // replyText } } } @@ -145,7 +140,7 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo { response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode - new AMQShortString("Queue " + queueName + " not found")); // replyText + new AMQShortString("Queue '" + queueName + "' not found")); // replyText } else { @@ -159,8 +154,8 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo else { - String message = "Queue " + queueName + " not bound with routing key " + - body.getRoutingKey() + " to exchange " + exchangeName; + String message = "Queue '" + queueName + "' not bound with routing key '" + + body.getRoutingKey() + "' to exchange '" + exchangeName + "'"; if(message.length()>255) { @@ -183,8 +178,8 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo { response = methodRegistry.createExchangeBoundOkBody(NO_QUEUE_BOUND_WITH_RK, // replyCode - new AMQShortString("No queue bound with routing key " + body.getRoutingKey() + - " to exchange " + exchangeName)); // replyText + new AMQShortString("No queue bound with routing key '" + body.getRoutingKey() + + "' to exchange '" + exchangeName + "'")); // replyText } } session.writeFrame(response.generateFrame(channelId)); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 018a1ec851..8224c77ba9 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -109,7 +109,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic /** System property to enable failure if strict AMQP compliance is violated. */ public static final String STRICT_AMQP_FATAL = "STRICT_AMQP_FATAL"; - /** Strickt AMQP failure default. */ + /** Strict AMQP failure default. */ public static final String STRICT_AMQP_FATAL_DEFAULT = "true"; /** System property to enable immediate message prefetching. */ @@ -124,6 +124,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic private final boolean _declareExchanges = Boolean.parseBoolean(System.getProperty(ClientProperties.QPID_DECLARE_EXCHANGES_PROP_NAME, "true")); + private final boolean _bindQueues = + Boolean.parseBoolean(System.getProperty(ClientProperties.QPID_BIND_QUEUES_PROP_NAME, "true")); + private final boolean _useAMQPEncodedMapMessage; private final boolean _useAMQPEncodedStreamMessage; @@ -2870,10 +2873,13 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { declareQueue(amqd, consumer.isNoLocal(), nowait); } - if(!isBound(amqd.getExchangeName(), amqd.getAMQQueueName(), amqd.getRoutingKey())) + if (_bindQueues) { - bindQueue(amqd.getAMQQueueName(), amqd.getRoutingKey(), - amqd instanceof AMQTopic ? consumer.getArguments() : null, amqd.getExchangeName(), amqd, nowait); + if(!isBound(amqd.getExchangeName(), amqd.getAMQQueueName(), amqd.getRoutingKey())) + { + bindQueue(amqd.getAMQQueueName(), amqd.getRoutingKey(), + amqd instanceof AMQTopic ? consumer.getArguments() : null, amqd.getExchangeName(), amqd, nowait); + } } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index f733e6bbca..f735895c81 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -116,7 +116,7 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe { return getMessageFactory().createMessage(messageFrame.getDeliveryTag(), - messageFrame.isRedelivered(), messageFrame.getExchange(), + messageFrame.isRedelivered(), messageFrame.getExchange() == null ? AMQShortString.EMPTY_STRING : messageFrame.getExchange(), messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies(), _queueDestinationCache, _topicDestinationCache); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java index ad19b0e620..784c33cf02 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java @@ -60,7 +60,10 @@ public abstract class AbstractAMQMessageDelegate implements AMQMessageDelegate _exchangeTypeToDestinationType.put(ExchangeDefaults.FANOUT_EXCHANGE_CLASS, AMQDestination.TOPIC_TYPE); _exchangeTypeToDestinationType.put(ExchangeDefaults.HEADERS_EXCHANGE_CLASS, AMQDestination.QUEUE_TYPE); - _exchangeMap.put("", new ExchangeInfo("","",AMQDestination.QUEUE_TYPE)); + _exchangeMap.put(ExchangeDefaults.DEFAULT_EXCHANGE_NAME, + new ExchangeInfo(ExchangeDefaults.DEFAULT_EXCHANGE_NAME, + ExchangeDefaults.DIRECT_EXCHANGE_CLASS, + AMQDestination.QUEUE_TYPE)); _exchangeMap.put(ExchangeDefaults.DIRECT_EXCHANGE_NAME, new ExchangeInfo(ExchangeDefaults.DIRECT_EXCHANGE_NAME, diff --git a/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java index b43b9d450b..0e7d061ba7 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java @@ -230,6 +230,12 @@ public class ClientProperties * producer/consumer creation when using BindingURLs. */ public static final String QPID_DECLARE_EXCHANGES_PROP_NAME = "qpid.declare_exchanges"; + /** + * System property to control whether the client will bind queues during + * consumer creation when using BindingURLs. + */ + public static final String QPID_BIND_QUEUES_PROP_NAME = "qpid.bind_queues"; + public static final String VERIFY_QUEUE_ON_SEND = "qpid.verify_queue_on_send"; public static final String QPID_MAX_CACHED_ADDR_OPTION_STRINGS = "qpid.max_cached_address_option_strings"; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java index 1ee5b997f2..760884e654 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java @@ -20,18 +20,17 @@ */ package org.apache.qpid.test.client.message; -import org.apache.qpid.client.AMQAnyDestination; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQTopic; import org.apache.qpid.client.CustomJMSXProperty; import org.apache.qpid.configuration.ClientProperties; -import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.management.common.mbeans.ManagedQueue; import org.apache.qpid.test.utils.JMXTestUtils; import org.apache.qpid.test.utils.QpidBrokerTestCase; import javax.jms.Connection; import javax.jms.Destination; +import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; @@ -41,7 +40,7 @@ import javax.jms.Session; import javax.jms.Topic; import javax.management.openmbean.CompositeDataSupport; import javax.management.openmbean.TabularData; -import java.nio.BufferOverflowException; + import java.util.Iterator; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -88,15 +87,15 @@ public class JMSDestinationTest extends QpidBrokerTestCase _connection.start(); - Message message = consumer.receive(10000); + Message receivedMessage = consumer.receive(10000); - assertNotNull("Message should not be null", message); + assertNotNull("Message should not be null", receivedMessage); - Destination destination = message.getJMSDestination(); + Destination receivedDestination = receivedMessage.getJMSDestination(); - assertNotNull("JMSDestination should not be null", destination); + assertNotNull("JMSDestination should not be null", receivedDestination); - assertEquals("Incorrect Destination type", queue.getClass(), destination.getClass()); + assertEquals("Incorrect Destination type", queue.getClass(), receivedDestination.getClass()); } /** @@ -115,15 +114,14 @@ public class JMSDestinationTest extends QpidBrokerTestCase _connection.start(); - Message message = consumer.receive(10000); - - assertNotNull("Message should not be null", message); + Message receivedMessage = consumer.receive(10000); - Destination destination = message.getJMSDestination(); + assertNotNull("Message should not be null", receivedMessage); - assertNotNull("JMSDestination should not be null", destination); + Destination receivedDestination = receivedMessage.getJMSDestination(); - assertEquals("Incorrect Destination type", topic.getClass(), destination.getClass()); + assertNotNull("JMSDestination should not be null", receivedDestination); + assertEquals("Incorrect Destination type", topic.getClass(), receivedDestination.getClass()); } /** @@ -191,11 +189,11 @@ public class JMSDestinationTest extends QpidBrokerTestCase assertNotNull("Message should not be null", message); - Destination destination = message.getJMSDestination(); + Destination receivedDestination = message.getJMSDestination(); - assertNotNull("JMSDestination should not be null", destination); + assertNotNull("JMSDestination should not be null", receivedDestination); - assertEquals("Incorrect Destination type", queue.getClass(), destination.getClass()); + assertEquals("Incorrect Destination type", queue.getClass(), receivedDestination.getClass()); } finally @@ -238,11 +236,11 @@ public class JMSDestinationTest extends QpidBrokerTestCase assertNotNull("Message should not be null", _message); - Destination destination = _message.getJMSDestination(); + Destination receivedDestination = _message.getJMSDestination(); - assertNotNull("JMSDestination should not be null", destination); + assertNotNull("JMSDestination should not be null", receivedDestination); - assertEquals("Incorrect Destination type", queue.getClass(), destination.getClass()); + assertEquals("Incorrect Destination type", queue.getClass(), receivedDestination.getClass()); } /** @@ -305,17 +303,7 @@ public class JMSDestinationTest extends QpidBrokerTestCase // b) we can actually send without a BufferOverFlow. MessageProducer producer = session08.createProducer(queue); - - try - { - producer.send(message); - } - catch (BufferOverflowException bofe) - { - // Print the stack trace so we can validate where the execption occured. - bofe.printStackTrace(); - fail("BufferOverflowException thrown during send"); - } + producer.send(message); message = consumer.receive(1000); @@ -327,45 +315,45 @@ public class JMSDestinationTest extends QpidBrokerTestCase } - /** - * Send a message to a custom exchange and then verify - * the message received has the proper destination set - * - * @throws Exception - */ - public void testGetDestinationWithCustomExchange() throws Exception + public void testQueueWithBindingUrlUsingCustomExchange() throws Exception { - - AMQDestination dest = new AMQAnyDestination(new AMQShortString("my-exchange"), - new AMQShortString("direct"), - new AMQShortString("test"), - false, - false, - new AMQShortString("test"), - false, - new AMQShortString[]{new AMQShortString("test")}); - - // to force the creation of my-exchange. - sendMessage(_session, dest, 1); - - MessageProducer prod = _session.createProducer(dest); + String exchangeName = "exch_" + getTestQueueName(); + String queueName = "queue_" + getTestQueueName(); + String address = String.format("direct://%s/%s/%s?routingkey='%s'", exchangeName, queueName, queueName, queueName); + sendReceive(address); + } + + public void testQueueWithBindingUrlUsingAmqDirectExchange() throws Exception + { + String queueName = getTestQueueName(); + String address = String.format("direct://amq.direct/%s/%s?routingkey='%s'", queueName, queueName, queueName); + sendReceive(address); + } + + public void testQueueWithBindingUrlUsingDefaultExchange() throws Exception + { + String queueName = getTestQueueName(); + String address = String.format("direct:///%s/%s?routingkey='%s'", queueName, queueName, queueName); + sendReceive(address); + } + + private void sendReceive(String address) throws JMSException, Exception + { + Destination dest = _session.createQueue(address); MessageConsumer consumer = _session.createConsumer(dest); - + _connection.start(); sendMessage(_session, dest, 1); - - Message message = consumer.receive(10000); - assertNotNull("Message should not be null", message); + Message receivedMessage = consumer.receive(10000); - Destination destination = message.getJMSDestination(); + assertNotNull("Message should not be null", receivedMessage); - assertNotNull("JMSDestination should not be null", destination); + Destination receivedDestination = receivedMessage.getJMSDestination(); - assertEquals("Incorrect Destination name", "my-exchange", dest.getExchangeName().asString()); - assertEquals("Incorrect Destination type", "direct", dest.getExchangeClass().asString()); - assertEquals("Incorrect Routing Key", "test", dest.getRoutingKey().asString()); + assertNotNull("JMSDestination should not be null", receivedDestination); + assertEquals("JMSDestination should match that sent", address, receivedDestination.toString()); } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java index 734e3f2268..77df6c58d9 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java @@ -31,6 +31,7 @@ import org.apache.qpid.test.utils.QpidBrokerTestCase; import org.apache.qpid.url.BindingURL; import javax.jms.Connection; +import javax.jms.InvalidDestinationException; import javax.jms.JMSException; import javax.jms.Queue; import javax.jms.Session; @@ -158,6 +159,28 @@ public class DynamicQueueExchangeCreateTest extends QpidBrokerTestCase assertFalse("exchange should not exist", _jmxUtils.doesManagedObjectExist(exchangeObjectName2)); } + public void testQueueNotBoundDuringConsumerCreation() throws Exception + { + setSystemProperty(ClientProperties.QPID_BIND_QUEUES_PROP_NAME, "false"); + setSystemProperty(ClientProperties.VERIFY_QUEUE_ON_SEND, "true"); + + Connection connection = getConnection(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Queue queue = session.createQueue(getTestQueueName()); + session.createConsumer(queue); + + try + { + session.createProducer(queue).send(session.createMessage()); + fail("JMSException should be thrown as the queue does not exist"); + } + catch (InvalidDestinationException ide) + { + //PASS + } + } private void checkExceptionErrorCode(JMSException original, AMQConstant code) { Exception linked = original.getLinkedException(); |