diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 64 |
1 files changed, 26 insertions, 38 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index a41f2f9b17..a477832892 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -952,7 +952,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic return new AMQQueueBrowser(this, (AMQQueue) queue, messageSelector); } - public MessageConsumer createBrowserConsumer(Destination destination, String messageSelector, boolean noLocal) + protected MessageConsumer createBrowserConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException { checkValidDestination(destination); @@ -966,15 +966,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic checkValidDestination(destination); return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic), null, null, - ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false); - } - - public C createExclusiveConsumer(Destination destination) throws JMSException - { - checkValidDestination(destination); - - return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, true, null, null, - ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false); + isBrowseOnlyDestination(destination), false); } public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException @@ -982,7 +974,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic checkValidDestination(destination); return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic), - messageSelector, null, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false); + messageSelector, null, isBrowseOnlyDestination(destination), false); } public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) @@ -991,16 +983,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic checkValidDestination(destination); return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, (destination instanceof Topic), - messageSelector, null, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false); - } - - public MessageConsumer createExclusiveConsumer(Destination destination, String messageSelector, boolean noLocal) - throws JMSException - { - checkValidDestination(destination); - - return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, true, - messageSelector, null, false, false); + messageSelector, null, isBrowseOnlyDestination(destination), false); } public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive, @@ -1008,23 +991,15 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { checkValidDestination(destination); - return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, null, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false); + return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, null, isBrowseOnlyDestination(destination), false); } public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal, - boolean exclusive, String selector) throws JMSException + boolean exclusive, String selector) throws JMSException { checkValidDestination(destination); - return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false); - } - - public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive, - String selector, FieldTable rawSelector) throws JMSException - { - checkValidDestination(destination); - - return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, rawSelector, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false); + return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, isBrowseOnlyDestination(destination), false); } public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal, @@ -1032,7 +1007,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { checkValidDestination(destination); - return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), + return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector, isBrowseOnlyDestination(destination), false); } @@ -1438,9 +1413,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public TopicSubscriber createSubscriber(Topic topic) throws JMSException { checkNotClosed(); - Topic dest = checkValidTopic(topic); + checkValidTopic(topic); - return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest)); + return new TopicSubscriberAdaptor<C>(topic, + createConsumerImpl(topic, _prefetchHighMark, _prefetchLowMark, false, true, null, null, false, false)); } /** @@ -1457,10 +1433,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException { checkNotClosed(); - Topic dest = checkValidTopic(topic); + checkValidTopic(topic); - // AMQTopic dest = new AMQTopic(topic.getTopicName()); - return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest, messageSelector, noLocal)); + return new TopicSubscriberAdaptor<C>(topic, + createConsumerImpl(topic, _prefetchHighMark, _prefetchLowMark, noLocal, + true, messageSelector, null, false, false)); } public TemporaryQueue createTemporaryQueue() throws JMSException @@ -1985,6 +1962,12 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { checkTemporaryDestination(destination); + if(!noConsume && isBrowseOnlyDestination(destination)) + { + throw new InvalidDestinationException("The consumer being created is not 'noConsume'," + + "but a 'browseOnly' Destination has been supplied."); + } + final String messageSelector; if (_strictAMQP && !((selector == null) || selector.equals(""))) @@ -3526,4 +3509,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } return code; } + + private boolean isBrowseOnlyDestination(Destination destination) + { + return ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()); + } } |