diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 61 |
1 files changed, 44 insertions, 17 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 3973b5dd71..7ab26f3b47 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -1312,7 +1312,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi checkNotClosed(); if (queueName.indexOf('/') == -1) { - return new AMQQueue(queueName); + return new AMQQueue(getDefaultQueueExchangeName(), new AMQShortString(queueName)); } else { @@ -1330,6 +1330,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } + public AMQShortString getDefaultQueueExchangeName() + { + return _connection.getDefaultQueueExchangeName(); + } + /** * Creates a QueueReceiver wrapping a MessageConsumer * @@ -1379,7 +1384,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (topicName.indexOf('/') == -1) { - return new AMQTopic(new AMQShortString(topicName)); + return new AMQTopic(getDefaultTopicExchangeName(),new AMQShortString(topicName)); } else { @@ -1397,6 +1402,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } + public AMQShortString getDefaultTopicExchangeName() + { + return _connection.getDefaultTopicExchangeName(); + } + /** * Creates a non-durable subscriber * @@ -1409,8 +1419,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public TopicSubscriber createSubscriber(Topic topic) throws JMSException { checkNotClosed(); - checkValidTopic(topic); - AMQTopic dest = new AMQTopic(topic.getTopicName()); + AMQTopic dest = checkValidTopic(topic); + //AMQTopic dest = new AMQTopic(topic.getTopicName()); return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); } @@ -1428,16 +1438,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException { checkNotClosed(); - checkValidTopic(topic); - AMQTopic dest = new AMQTopic(topic.getTopicName()); + AMQTopic dest = checkValidTopic(topic); + //AMQTopic dest = new AMQTopic(topic.getTopicName()); return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal)); } public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { checkNotClosed(); - checkValidTopic(topic); - AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection); + AMQTopic origTopic = checkValidTopic(topic); + AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection); TopicSubscriberAdaptor subscriber = _subscriptions.get(name); if (subscriber != null) { @@ -1464,8 +1474,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } // if the queue is bound to the exchange but NOT for this topic, then the JMS spec // says we must trash the subscription. - if (isQueueBound(dest.getAMQQueueName()) && - !isQueueBound(dest.getAMQQueueName(), topicName)) + if (isQueueBound(dest.getExchangeName(),dest.getAMQQueueName()) && + !isQueueBound(dest.getExchangeName(),dest.getAMQQueueName(), topicName)) { deleteQueue(dest.getAMQQueueName()); } @@ -1556,7 +1566,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } else { - if (isQueueBound(AMQTopic.getDurableTopicQueueName(name, _connection))) + if (isQueueBound(getDefaultTopicExchangeName(), AMQTopic.getDurableTopicQueueName(name, _connection))) { deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection)); } @@ -1567,17 +1577,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - boolean isQueueBound(AMQShortString queueName) throws JMSException + boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName) throws JMSException { - return isQueueBound(queueName, null); + return isQueueBound(exchangeName, queueName, null); } - boolean isQueueBound(AMQShortString queueName, AMQShortString routingKey) throws JMSException + boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey) throws JMSException { // TODO: Be aware of possible changes to parameter order as versions change. AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - ExchangeDefaults.TOPIC_EXCHANGE_NAME, // exchange + exchangeName, // exchange queueName, // queue routingKey); // routingKey AMQMethodEvent response = null; @@ -1858,7 +1868,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /* * I could have combined the last 3 methods, but this way it improves readability */ - private void checkValidTopic(Topic topic) throws JMSException + private AMQTopic checkValidTopic(Topic topic) throws JMSException { if (topic == null) { @@ -1866,8 +1876,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } if ((topic instanceof TemporaryDestination) && ((TemporaryDestination) topic).getSession() != this) { - throw new JMSException("Cannot create a subscription on a temporary topic created in another session"); + throw new javax.jms.InvalidDestinationException("Cannot create a subscription on a temporary topic created in another session"); } + if(!(topic instanceof AMQTopic)) + { + throw new javax.jms.InvalidDestinationException("Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: " + topic.getClass().getName()); + } + return (AMQTopic) topic; } private void checkValidQueue(Queue queue) throws InvalidDestinationException @@ -1887,6 +1902,18 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } + public AMQShortString getTemporaryTopicExchangeName() + { + return _connection.getTemporaryTopicExchangeName(); + } + + public AMQShortString getTemporaryQueueExchangeName() + { + return _connection.getTemporaryQueueExchangeName(); + } + + + public int getTicket() { return _ticket; |