diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2012-02-27 16:05:01 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2012-02-27 16:05:01 +0000 |
commit | fd4963007c35cd1c8e3b3cc88366a685920001e1 (patch) | |
tree | 88128615ff5846dcce9ce684230499e407ab431e | |
parent | 1a7e18ae07ed88605daf3c0277632ce9269eef71 (diff) | |
download | qpid-python-fd4963007c35cd1c8e3b3cc88366a685920001e1.tar.gz |
QPID-792 : [Java Client] Validate queue browser selector on client side, not broker
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1294194 13f79535-47bb-0310-9956-ffa450edef68
7 files changed, 110 insertions, 41 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java index 2313bce474..bd83e8b37b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java @@ -20,6 +20,12 @@ */ package org.apache.qpid.client; +import javax.jms.InvalidDestinationException; +import javax.jms.InvalidSelectorException; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQInternalException; +import org.apache.qpid.client.filter.JMSSelectorFilter; +import org.apache.qpid.protocol.AMQConstant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,12 +54,50 @@ public class AMQQueueBrowser implements QueueBrowser _session = session; _queue = queue; _messageSelector = ((messageSelector == null) || (messageSelector.trim().length() == 0)) ? null : messageSelector; - // Create Consumer to verify message selector. - BasicMessageConsumer consumer = - (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false); - // Close this consumer as we are not looking to consume only to establish that, at least for now, - // the QB can be created - consumer.close(); + + + validateQueue((AMQDestination) queue); + + if(_messageSelector != null) + { + validateSelector(_messageSelector); + } + } + + private void validateSelector(String messageSelector) throws InvalidSelectorException + { + try + { + new JMSSelectorFilter(messageSelector); + } + catch (AMQInternalException e) + { + throw new InvalidSelectorException(e.getMessage()); + } + } + + private void validateQueue(AMQDestination queue) throws JMSException + { + try + { + // Essentially just test the connection/session is still active + _session.sync(); + // TODO - should really validate queue exists, but we often rely on creating the consumer to create the queue :( + // _session.declareQueuePassive( queue ); + } + catch (AMQException e) + { + if(e.getErrorCode() == AMQConstant.NOT_FOUND) + { + throw new InvalidDestinationException(e.getMessage()); + } + else + { + final JMSException jmsException = new JMSException(e.getMessage(), String.valueOf(e.getErrorCode().getCode())); + jmsException.setLinkedException(e); + throw jmsException; + } + } } public Queue getQueue() throws JMSException @@ -118,12 +162,12 @@ public class AMQQueueBrowser implements QueueBrowser _consumer = consumer; prefetchMessage(); } - _logger.info("QB:created with first element:" + _nextMessage); + _logger.debug("QB:created with first element:" + _nextMessage); } public boolean hasMoreElements() { - _logger.info("QB:hasMoreElements:" + (_nextMessage != null)); + _logger.debug("QB:hasMoreElements:" + (_nextMessage != null)); return (_nextMessage != null); } @@ -136,9 +180,9 @@ public class AMQQueueBrowser implements QueueBrowser } try { - _logger.info("QB:nextElement about to receive"); + _logger.debug("QB:nextElement about to receive"); prefetchMessage(); - _logger.info("QB:nextElement received:" + _nextMessage); + _logger.debug("QB:nextElement received:" + _nextMessage); } catch (JMSException e) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 30f1dcf8b7..766237006a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -1680,7 +1680,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { AMQProtocolHandler protocolHandler = getProtocolHandler(); declareExchange(amqd, protocolHandler, false); - AMQShortString queueName = declareQueue(amqd, protocolHandler, false); + AMQShortString queueName = declareQueue(amqd, false); bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName(), amqd); } @@ -2714,6 +2714,12 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public abstract void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler, final boolean nowait) throws AMQException, FailoverException; + + void declareQueuePassive(AMQDestination queue) throws AMQException + { + declareQueue(queue,false,false,true); + } + /** * Declares a queue for a JMS destination. * @@ -2723,27 +2729,35 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic * * <p/>Note that this operation automatically retries in the event of fail-over. * - * @param amqd The destination to declare as a queue. - * @param protocolHandler The protocol handler to communicate through. * + * @param amqd The destination to declare as a queue. * @return The name of the decalred queue. This is useful where the broker is generating a queue name on behalf of * the client. * + * + * * @throws AMQException If the queue cannot be declared for any reason. * @todo Verify the destiation is valid or throw an exception. * @todo Be aware of possible changes to parameter order as versions change. */ - protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, + protected AMQShortString declareQueue(final AMQDestination amqd, final boolean noLocal) throws AMQException { - return declareQueue(amqd, protocolHandler, noLocal, false); + return declareQueue(amqd, noLocal, false); } - protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, + protected AMQShortString declareQueue(final AMQDestination amqd, final boolean noLocal, final boolean nowait) + throws AMQException + { + return declareQueue(amqd, noLocal, nowait, false); + } + + protected AMQShortString declareQueue(final AMQDestination amqd, + final boolean noLocal, final boolean nowait, final boolean passive) throws AMQException { - /*return new FailoverRetrySupport<AMQShortString, AMQException>(*/ + final AMQProtocolHandler protocolHandler = getProtocolHandler(); return new FailoverNoopSupport<AMQShortString, AMQException>( new FailoverProtectedOperation<AMQShortString, AMQException>() { @@ -2755,7 +2769,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic amqd.setQueueName(protocolHandler.generateQueueName()); } - sendQueueDeclare(amqd, protocolHandler, nowait); + sendQueueDeclare(amqd, protocolHandler, nowait, passive); return amqd.getAMQQueueName(); } @@ -2763,7 +2777,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, - final boolean nowait) throws AMQException, FailoverException; + final boolean nowait, boolean passive) throws AMQException, FailoverException; /** * Undeclares the specified queue. @@ -2904,7 +2918,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic if (_delareQueues || amqd.isNameRequired()) { - declareQueue(amqd, protocolHandler, consumer.isNoLocal(), nowait); + declareQueue(amqd, consumer.isNoLocal(), nowait); } bindQueue(amqd.getAMQQueueName(), amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd, nowait); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 36dbad1928..3902c726f3 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -721,7 +721,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic * Declare a queue with the given queueName */ public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, - final boolean nowait) + final boolean nowait, boolean passive) throws AMQException, FailoverException { // do nothing this is only used by 0_8 @@ -731,7 +731,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic * Declare a queue with the given queueName */ public AMQShortString send0_10QueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, - final boolean noLocal, final boolean nowait) + final boolean noLocal, final boolean nowait, boolean passive) throws AMQException { AMQShortString queueName; @@ -757,7 +757,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic getQpidSession().queueDeclare(queueName.toString(), "" , arguments, amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, amqd.isDurable() ? Option.DURABLE : Option.NONE, - amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE); + amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE, + passive ? Option.PASSIVE : Option.NONE); } else { @@ -927,11 +928,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic return getCurrentException(); } - protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, - final boolean noLocal, final boolean nowait) + protected AMQShortString declareQueue(final AMQDestination amqd, + final boolean noLocal, final boolean nowait, final boolean passive) throws AMQException { - /*return new FailoverRetrySupport<AMQShortString, AMQException>(*/ + final AMQProtocolHandler protocolHandler = getProtocolHandler(); + return new FailoverNoopSupport<AMQShortString, AMQException>( new FailoverProtectedOperation<AMQShortString, AMQException>() { @@ -948,7 +950,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic amqd.setQueueName(new AMQShortString( binddingKey + "@" + amqd.getExchangeName().toString() + "_" + UUID.randomUUID())); } - return send0_10QueueDeclare(amqd, protocolHandler, noLocal, nowait); + return send0_10QueueDeclare(amqd, protocolHandler, noLocal, nowait, passive); } }, getAMQConnection()).execute(); } @@ -1205,7 +1207,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic else if(createNode) { setLegacyFiledsForQueueType(dest); - send0_10QueueDeclare(dest,null,noLocal,noWait); + send0_10QueueDeclare(dest,null,noLocal,noWait, false); sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(), null,dest.getExchangeName(),dest, false); break; @@ -1311,7 +1313,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } node.setExclusive(true); node.setAutoDelete(!node.isDurable()); - send0_10QueueDeclare(dest,null,noLocal,true); + send0_10QueueDeclare(dest,null,noLocal,true, false); getQpidSession().exchangeBind(dest.getQueueName(), dest.getAddressName(), dest.getSubject(), diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index b56da5c2ec..8ab23a240e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -38,7 +38,6 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; -import org.apache.qpid.client.filter.MessageFilter; import org.apache.qpid.framing.*; import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91; @@ -401,9 +400,17 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe } public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, - final boolean nowait) throws AMQException, FailoverException - { - QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),amqd.getAMQQueueName(),false,amqd.isDurable(),amqd.isExclusive(),amqd.isAutoDelete(),false,null); + final boolean nowait, boolean passive) throws AMQException, FailoverException + { + QueueDeclareBody body = + getMethodRegistry().createQueueDeclareBody(getTicket(), + amqd.getAMQQueueName(), + passive, + amqd.isDurable(), + amqd.isExclusive(), + amqd.isAutoDelete(), + false, + null); AMQFrame queueDeclare = body.generateFrame(getChannelId()); diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java index 84d91ee57e..f199961b6f 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java @@ -145,7 +145,7 @@ public class TestAMQSession extends AMQSession_0_8 } public void sendQueueDeclare(AMQDestination amqd, AMQProtocolHandler protocolHandler, - boolean nowait) throws AMQException, FailoverException + boolean nowait, boolean passive) throws AMQException, FailoverException { } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java index ddcb96b4db..236202f323 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.logging; +import javax.jms.QueueBrowser; import junit.framework.AssertionFailedError; import org.apache.qpid.client.AMQConnection; @@ -166,8 +167,10 @@ public class SubscriptionLoggingTest extends AbstractTestLogging */ public void testSubscriptionCreateQueueBrowser() throws JMSException, IOException { - _session.createBrowser(_queue); + _connection.start(); + QueueBrowser browser = _session.createBrowser(_queue); + browser.getEnumeration(); //Validate //Ensure that we wait for the SUB log message waitAndFindMatches("SUB-1001"); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java index 7a4a45a2c8..f3433adb3f 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java @@ -20,11 +20,7 @@ */ package org.apache.qpid.test.client; -import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.test.utils.QpidBrokerTestCase; - +import java.util.Enumeration; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; @@ -35,7 +31,10 @@ import javax.jms.QueueBrowser; import javax.jms.Session; import javax.jms.TextMessage; import javax.naming.NamingException; -import java.util.Enumeration; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.test.utils.QpidBrokerTestCase; public class QueueBrowserAutoAckTest extends QpidBrokerTestCase { |