diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 188 |
1 files changed, 104 insertions, 84 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index d34290e007..c058ceb624 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -98,6 +98,7 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.SessionException; import org.apache.qpid.transport.TransportException; +import org.apache.qpid.url.AMQBindingURL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -976,7 +977,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { checkValidDestination(destination); - return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic), null, null, + return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, isTopic(destination), null, null, isBrowseOnlyDestination(destination), false); } @@ -984,7 +985,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { checkValidDestination(destination); - return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic), + return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, isTopic(destination), messageSelector, null, isBrowseOnlyDestination(destination), false); } @@ -993,7 +994,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { checkValidDestination(destination); - return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, (destination instanceof Topic), + return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, isTopic(destination), messageSelector, null, isBrowseOnlyDestination(destination), false); } @@ -1034,30 +1035,16 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic checkNotClosed(); Topic origTopic = checkValidTopic(topic, true); - AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection); - if (dest.getDestSyntax() == DestSyntax.ADDR && - !dest.isAddressResolved()) + // The check valid Topic will throw an exception if topic is not an instanceof + // AMQTopic or AddressBasedTopc. + Topic dest; + if (topic instanceof AMQTopic) { - try - { - handleAddressBasedDestination(dest,false,true); - if (dest.getAddressType() != AMQDestination.TOPIC_TYPE) - { - throw new JMSException("Durable subscribers can only be created for Topics"); - } - dest.getSourceNode().setDurable(true); - } - catch(AMQException e) - { - JMSException ex = new JMSException("Error when verifying destination"); - ex.initCause(e); - ex.setLinkedException(e); - throw ex; - } - catch(TransportException e) - { - throw toJMSException("Error when verifying destination", e); - } + dest = AMQTopic.createDurableTopic(origTopic, name, _connection); + } + else + { + dest = AddressBasedTopic.createDurableTopic(origTopic, name, _connection, this); } String messageSelector = ((selector == null) || (selector.trim().length() == 0)) ? null : selector; @@ -1071,7 +1058,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic if (subscriber == null) { // After the address is resolved routing key will not be null. - AMQShortString topicName = dest.getRoutingKey(); + AMQShortString topicName = new AMQShortString(dest.getTopicName()); if (_strictAMQP) { @@ -1085,7 +1072,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic + "' for creation durableSubscriber. Requesting queue deletion regardless."); } - deleteQueue(dest.getAMQQueueName()); + if (topic instanceof AMQTopic) + { + deleteQueue(((AMQTopic)dest).getAMQQueueName()); + } + else + { + ((AddressBasedTopic)topic).deleteSubscription(this); + } } else { @@ -1099,13 +1093,34 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // if the queue is bound to the exchange but NOT for this topic and selector, then the JMS spec // says we must trash the subscription. - boolean isQueueBound = isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()); + AMQShortString exchangeName; + AMQShortString queueName; + + if (topic instanceof AMQTopic) + { + exchangeName = ((AMQTopic)dest).getExchangeName(); + queueName = ((AMQTopic)dest).getAMQQueueName(); + + } + else + { + exchangeName = new AMQShortString(((AddressBasedTopic)topic).getAddress().getName()); + queueName = new AMQShortString(((AddressBasedTopic)topic).getAddress().getSubject()); + } + boolean isQueueBound = isQueueBound(exchangeName, queueName); boolean isQueueBoundForTopicAndSelector = - isQueueBound(dest.getExchangeName().asString(), dest.getAMQQueueName().asString(), topicName.asString(), args); + isQueueBound(exchangeName.asString(), queueName.asString(), topicName.asString(), args); if (isQueueBound && !isQueueBoundForTopicAndSelector) { - deleteQueue(dest.getAMQQueueName()); + if (topic instanceof AMQTopic) + { + deleteQueue(((AMQTopic)dest).getAMQQueueName()); + } + else + { + ((AddressBasedTopic)topic).deleteSubscription(this); + } } } } @@ -1217,36 +1232,34 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public Queue createQueue(String queueName) throws JMSException { checkNotClosed(); - try + DestSyntax syntax = AMQDestination.getDestType(queueName); + queueName = AMQDestination.stripSyntaxPrefix(queueName); + if (syntax == AMQDestination.DestSyntax.BURL) { - if (queueName.indexOf('/') == -1 && queueName.indexOf(';') == -1) + if (queueName.indexOf('/') == -1) { - DestSyntax syntax = AMQDestination.getDestType(queueName); - if (syntax == AMQDestination.DestSyntax.BURL) - { - // For testing we may want to use the prefix - return new AMQQueue(getDefaultQueueExchangeName(), - new AMQShortString(AMQDestination.stripSyntaxPrefix(queueName))); - } - else - { - AMQQueue queue = new AMQQueue(queueName); - return queue; - - } + return new AMQQueue(getDefaultQueueExchangeName(), + new AMQShortString(queueName)); } else { - return new AMQQueue(queueName); + try + { + return new AMQQueue(new AMQBindingURL(queueName)); + } + catch (URISyntaxException urlse) + { + _logger.error("", urlse); + JMSException jmse = new JMSException(urlse.getReason()); + jmse.setLinkedException(urlse); + jmse.initCause(urlse); + throw jmse; + } } } - catch (URISyntaxException urlse) + else { - _logger.error("", urlse); - JMSException jmse = new JMSException(urlse.getReason()); - jmse.setLinkedException(urlse); - jmse.initCause(urlse); - throw jmse; + return new AddressBasedQueue(queueName); } } @@ -1511,36 +1524,39 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public Topic createTopic(String topicName) throws JMSException { checkNotClosed(); - try + DestSyntax syntax = AMQDestination.getDestType(topicName); + topicName = AMQDestination.stripSyntaxPrefix(topicName); + if (syntax == AMQDestination.DestSyntax.BURL) { - if (topicName.indexOf('/') == -1 && topicName.indexOf(';') == -1) + if (topicName.indexOf('/') == -1) + { + return new AMQTopic(getDefaultTopicExchangeName(), new AMQShortString(topicName)); + } + else { - DestSyntax syntax = AMQDestination.getDestType(topicName); - // for testing we may want to use the prefix to indicate our choice. - topicName = AMQDestination.stripSyntaxPrefix(topicName); - if (syntax == AMQDestination.DestSyntax.BURL) + try { - return new AMQTopic(getDefaultTopicExchangeName(), new AMQShortString(topicName)); + return new AMQTopic(new AMQBindingURL(topicName)); } - else + catch (URISyntaxException urlse) { - return new AMQTopic("ADDR:" + getDefaultTopicExchangeName() + "/" + topicName); + _logger.error("", urlse); + JMSException jmse = new JMSException(urlse.getReason()); + jmse.setLinkedException(urlse); + jmse.initCause(urlse); + throw jmse; } } - else - { - return new AMQTopic(topicName); - } - } - catch (URISyntaxException urlse) + else { - _logger.error("", urlse); - JMSException jmse = new JMSException(urlse.getReason()); - jmse.setLinkedException(urlse); - jmse.initCause(urlse); - throw jmse; - } + if (topicName.indexOf('/') == -1 && topicName.indexOf(';') == -1) + { + topicName = getDefaultTopicExchangeName() + "/" + topicName; + } + + return new AddressBasedTopic(topicName); + } } public void declareExchange(AMQShortString name, AMQShortString type, boolean nowait) throws AMQException @@ -2463,7 +2479,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic ("Cannot create a durable subscription with a temporary topic: " + topic); } - if (!(topic instanceof AMQDestination && topic instanceof javax.jms.Topic)) + if (!(topic instanceof AMQTopic || topic instanceof AddressBasedTopic)) { throw new javax.jms.InvalidDestinationException( "Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: " @@ -2872,11 +2888,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic AMQProtocolHandler protocolHandler = getProtocolHandler(); - if (amqd.getDestSyntax() == DestSyntax.ADDR) - { - handleAddressBasedDestination(amqd,true,nowait); - } - else + if (amqd.getDestSyntax() == DestSyntax.BURL) { if (DECLARE_EXCHANGES) { @@ -2932,10 +2944,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic throw new AMQException(null, "Fail-over exception interrupted basic consume.", e); } } - - public abstract void handleAddressBasedDestination(AMQDestination dest, - boolean isConsumer, - boolean noWait) throws AMQException; private void registerProducer(long producerId, MessageProducer producer) { @@ -3565,4 +3573,16 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic _logger.debug("Rollback mark is set to " + _rollbackMark.get()); } } + + private boolean isTopic(Destination dest) + { + if (dest instanceof AMQDestination) + { + return ((AMQDestination)dest).isTopic(); + } + else + { + return dest instanceof Topic; + } + } } |