diff options
3 files changed, 105 insertions, 36 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 1f940b62f0..5c2949960c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -1043,7 +1043,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic throws JMSException { checkNotClosed(); - AMQTopic origTopic = checkValidTopic(topic, true); + Topic origTopic = checkValidTopic(topic, true); AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection); String messageSelector = ((selector == null) || (selector.trim().length() == 0)) ? null : selector; @@ -1307,8 +1307,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public QueueReceiver createQueueReceiver(Destination destination) throws JMSException { checkValidDestination(destination); - AMQQueue dest = (AMQQueue) destination; - C consumer = (C) createConsumer(destination); + Queue dest = validateQueue(destination); + C consumer = (C) createConsumer(dest); return new QueueReceiverAdaptor(dest, consumer); } @@ -1326,8 +1326,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException { checkValidDestination(destination); - AMQQueue dest = (AMQQueue) destination; - C consumer = (C) createConsumer(destination, messageSelector); + Queue dest = validateQueue(destination); + C consumer = (C) createConsumer(dest, messageSelector); return new QueueReceiverAdaptor(dest, consumer); } @@ -1344,7 +1344,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public QueueReceiver createReceiver(Queue queue) throws JMSException { checkNotClosed(); - AMQQueue dest = (AMQQueue) queue; + Queue dest = validateQueue(queue); C consumer = (C) createConsumer(dest); return new QueueReceiverAdaptor(dest, consumer); @@ -1363,11 +1363,23 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException { checkNotClosed(); - AMQQueue dest = (AMQQueue) queue; + Queue dest = validateQueue(queue); C consumer = (C) createConsumer(dest, messageSelector); return new QueueReceiverAdaptor(dest, consumer); } + + private Queue validateQueue(Destination dest) throws InvalidDestinationException + { + if (dest instanceof AMQDestination && dest instanceof javax.jms.Queue) + { + return (Queue)dest; + } + else + { + throw new InvalidDestinationException("The destination object used is not from this provider or of type javax.jms.Queue"); + } + } public QueueSender createSender(Queue queue) throws JMSException { @@ -1408,7 +1420,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public TopicSubscriber createSubscriber(Topic topic) throws JMSException { checkNotClosed(); - AMQTopic dest = checkValidTopic(topic); + Topic dest = checkValidTopic(topic); // AMQTopic dest = new AMQTopic(topic.getTopicName()); return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest)); @@ -1428,7 +1440,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException { checkNotClosed(); - AMQTopic dest = checkValidTopic(topic); + Topic dest = checkValidTopic(topic); // AMQTopic dest = new AMQTopic(topic.getTopicName()); return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest, messageSelector, noLocal)); @@ -2395,7 +2407,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic /* * I could have combined the last 3 methods, but this way it improves readability */ - protected AMQTopic checkValidTopic(Topic topic, boolean durable) throws JMSException + protected Topic checkValidTopic(Topic topic, boolean durable) throws JMSException { if (topic == null) { @@ -2414,17 +2426,17 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic ("Cannot create a durable subscription with a temporary topic: " + topic); } - if (!(topic instanceof AMQTopic)) + if (!(topic instanceof AMQDestination && topic instanceof javax.jms.Topic)) { throw new javax.jms.InvalidDestinationException( "Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: " + topic.getClass().getName()); } - return (AMQTopic) topic; + return topic; } - protected AMQTopic checkValidTopic(Topic topic) throws JMSException + protected Topic checkValidTopic(Topic topic) throws JMSException { return checkValidTopic(topic, false); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java index 6217cb534a..c573da9def 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java @@ -22,6 +22,7 @@ package org.apache.qpid.client; import java.net.URISyntaxException; +import javax.jms.InvalidDestinationException; import javax.jms.JMSException; import javax.jms.Topic; @@ -95,39 +96,47 @@ public class AMQTopic extends AMQDestination implements Topic super(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, isDurable,bindingKeys); } - public static AMQTopic createDurableTopic(AMQTopic topic, String subscriptionName, AMQConnection connection) + public static AMQTopic createDurableTopic(Topic topic, String subscriptionName, AMQConnection connection) throws JMSException { - if (topic.getDestSyntax() == DestSyntax.ADDR) + if (topic instanceof AMQDestination && topic instanceof javax.jms.Topic) { - try + AMQDestination qpidTopic = (AMQDestination)topic; + if (qpidTopic.getDestSyntax() == DestSyntax.ADDR) { - AMQTopic t = new AMQTopic(topic.getAddress()); - AMQShortString queueName = getDurableTopicQueueName(subscriptionName, connection); - // link is never null if dest was created using an address string. - t.getLink().setName(queueName.asString()); - t.getSourceNode().setAutoDelete(false); - t.getSourceNode().setDurable(true); - - // The legacy fields are also populated just in case. - t.setQueueName(queueName); - t.setAutoDelete(false); - t.setDurable(true); - return t; + try + { + AMQTopic t = new AMQTopic(qpidTopic.getAddress()); + AMQShortString queueName = getDurableTopicQueueName(subscriptionName, connection); + // link is never null if dest was created using an address string. + t.getLink().setName(queueName.asString()); + t.getSourceNode().setAutoDelete(false); + t.getSourceNode().setDurable(true); + + // The legacy fields are also populated just in case. + t.setQueueName(queueName); + t.setAutoDelete(false); + t.setDurable(true); + return t; + } + catch(Exception e) + { + JMSException ex = new JMSException("Error creating durable topic"); + ex.initCause(e); + ex.setLinkedException(e); + throw ex; + } } - catch(Exception e) + else { - JMSException ex = new JMSException("Error creating durable topic"); - ex.initCause(e); - ex.setLinkedException(e); - throw ex; + return new AMQTopic(qpidTopic.getExchangeName(), qpidTopic.getRoutingKey(), false, + getDurableTopicQueueName(subscriptionName, connection), + true); } } else { - return new AMQTopic(topic.getExchangeName(), topic.getRoutingKey(), false, - getDurableTopicQueueName(subscriptionName, connection), - true); + throw new InvalidDestinationException("The destination object used is not from this provider or of type javax.jms.Topic"); } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java index 51589c705f..dd86ffc4da 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java @@ -31,12 +31,18 @@ import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.QueueReceiver; +import javax.jms.QueueSession; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; +import javax.jms.TopicSession; +import javax.jms.TopicSubscriber; import javax.naming.Context; import org.apache.qpid.client.AMQAnyDestination; +import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession_0_10; import org.apache.qpid.client.messaging.address.Node.ExchangeNode; @@ -796,4 +802,46 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase { } } + + public void testQueueReceiversAndTopicSubscriber() throws Exception + { + Queue queue = new AMQAnyDestination("ADDR:my-queue; {create: always}"); + Topic topic = new AMQAnyDestination("ADDR:amq.topic/test"); + + QueueSession qSession = ((AMQConnection)_connection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE); + QueueReceiver receiver = qSession.createReceiver(queue); + + TopicSession tSession = ((AMQConnection)_connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + TopicSubscriber sub = tSession.createSubscriber(topic); + + Session ssn = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer prod1 = ssn.createProducer(ssn.createQueue("ADDR:my-queue")); + prod1.send(ssn.createTextMessage("test1")); + + MessageProducer prod2 = ssn.createProducer(ssn.createTopic("ADDR:amq.topic/test")); + prod2.send(ssn.createTextMessage("test2")); + + Message msg1 = receiver.receive(); + assertNotNull(msg1); + assertEquals("test1",((TextMessage)msg1).getText()); + + Message msg2 = sub.receive(); + assertNotNull(msg2); + assertEquals("test2",((TextMessage)msg2).getText()); + } + + public void testDurableSubscriber() throws Exception + { + Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); + Topic topic = ssn.createTopic("news.us"); + + MessageConsumer cons = ssn.createDurableSubscriber(topic, "my-sub"); + MessageProducer prod = ssn.createProducer(topic); + + Message m = ssn.createTextMessage("A"); + prod.send(m); + Message msg = cons.receive(1000); + assertNotNull(msg); + assertEquals("A",((TextMessage)msg).getText()); + } } |