summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2011-03-03 04:49:50 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2011-03-03 04:49:50 +0000
commitce920ef0dbc6fe2322a14a6849878d8ebe475077 (patch)
treefa84abd7317e3f901b6b12b7a2244d1b856a3fb3
parent66bd94f3609aba8452ea215017915968c5aedba0 (diff)
downloadqpid-python-ce920ef0dbc6fe2322a14a6849878d8ebe475077.tar.gz
QPID-3106
Instead of checking if it's an instance of AMQQueue, the code the now checks if it's an instance of AMQDestination and javax.jms.Queue to cover the AMQAnyDestination case. The same check is done for topics. Added test cases for QueueReceivers, TopicSubscribers and DurableTopicSubscribers using the new addressing scheme. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1076516 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java38
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java55
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java48
3 files changed, 105 insertions, 36 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 1f940b62f0..5c2949960c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -1043,7 +1043,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
throws JMSException
{
checkNotClosed();
- AMQTopic origTopic = checkValidTopic(topic, true);
+ Topic origTopic = checkValidTopic(topic, true);
AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection);
String messageSelector = ((selector == null) || (selector.trim().length() == 0)) ? null : selector;
@@ -1307,8 +1307,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public QueueReceiver createQueueReceiver(Destination destination) throws JMSException
{
checkValidDestination(destination);
- AMQQueue dest = (AMQQueue) destination;
- C consumer = (C) createConsumer(destination);
+ Queue dest = validateQueue(destination);
+ C consumer = (C) createConsumer(dest);
return new QueueReceiverAdaptor(dest, consumer);
}
@@ -1326,8 +1326,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException
{
checkValidDestination(destination);
- AMQQueue dest = (AMQQueue) destination;
- C consumer = (C) createConsumer(destination, messageSelector);
+ Queue dest = validateQueue(destination);
+ C consumer = (C) createConsumer(dest, messageSelector);
return new QueueReceiverAdaptor(dest, consumer);
}
@@ -1344,7 +1344,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public QueueReceiver createReceiver(Queue queue) throws JMSException
{
checkNotClosed();
- AMQQueue dest = (AMQQueue) queue;
+ Queue dest = validateQueue(queue);
C consumer = (C) createConsumer(dest);
return new QueueReceiverAdaptor(dest, consumer);
@@ -1363,11 +1363,23 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException
{
checkNotClosed();
- AMQQueue dest = (AMQQueue) queue;
+ Queue dest = validateQueue(queue);
C consumer = (C) createConsumer(dest, messageSelector);
return new QueueReceiverAdaptor(dest, consumer);
}
+
+ private Queue validateQueue(Destination dest) throws InvalidDestinationException
+ {
+ if (dest instanceof AMQDestination && dest instanceof javax.jms.Queue)
+ {
+ return (Queue)dest;
+ }
+ else
+ {
+ throw new InvalidDestinationException("The destination object used is not from this provider or of type javax.jms.Queue");
+ }
+ }
public QueueSender createSender(Queue queue) throws JMSException
{
@@ -1408,7 +1420,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public TopicSubscriber createSubscriber(Topic topic) throws JMSException
{
checkNotClosed();
- AMQTopic dest = checkValidTopic(topic);
+ Topic dest = checkValidTopic(topic);
// AMQTopic dest = new AMQTopic(topic.getTopicName());
return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest));
@@ -1428,7 +1440,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException
{
checkNotClosed();
- AMQTopic dest = checkValidTopic(topic);
+ Topic dest = checkValidTopic(topic);
// AMQTopic dest = new AMQTopic(topic.getTopicName());
return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest, messageSelector, noLocal));
@@ -2395,7 +2407,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
/*
* I could have combined the last 3 methods, but this way it improves readability
*/
- protected AMQTopic checkValidTopic(Topic topic, boolean durable) throws JMSException
+ protected Topic checkValidTopic(Topic topic, boolean durable) throws JMSException
{
if (topic == null)
{
@@ -2414,17 +2426,17 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
("Cannot create a durable subscription with a temporary topic: " + topic);
}
- if (!(topic instanceof AMQTopic))
+ if (!(topic instanceof AMQDestination && topic instanceof javax.jms.Topic))
{
throw new javax.jms.InvalidDestinationException(
"Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: "
+ topic.getClass().getName());
}
- return (AMQTopic) topic;
+ return topic;
}
- protected AMQTopic checkValidTopic(Topic topic) throws JMSException
+ protected Topic checkValidTopic(Topic topic) throws JMSException
{
return checkValidTopic(topic, false);
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
index 6217cb534a..c573da9def 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
@@ -22,6 +22,7 @@ package org.apache.qpid.client;
import java.net.URISyntaxException;
+import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.Topic;
@@ -95,39 +96,47 @@ public class AMQTopic extends AMQDestination implements Topic
super(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, isDurable,bindingKeys);
}
- public static AMQTopic createDurableTopic(AMQTopic topic, String subscriptionName, AMQConnection connection)
+ public static AMQTopic createDurableTopic(Topic topic, String subscriptionName, AMQConnection connection)
throws JMSException
{
- if (topic.getDestSyntax() == DestSyntax.ADDR)
+ if (topic instanceof AMQDestination && topic instanceof javax.jms.Topic)
{
- try
+ AMQDestination qpidTopic = (AMQDestination)topic;
+ if (qpidTopic.getDestSyntax() == DestSyntax.ADDR)
{
- AMQTopic t = new AMQTopic(topic.getAddress());
- AMQShortString queueName = getDurableTopicQueueName(subscriptionName, connection);
- // link is never null if dest was created using an address string.
- t.getLink().setName(queueName.asString());
- t.getSourceNode().setAutoDelete(false);
- t.getSourceNode().setDurable(true);
-
- // The legacy fields are also populated just in case.
- t.setQueueName(queueName);
- t.setAutoDelete(false);
- t.setDurable(true);
- return t;
+ try
+ {
+ AMQTopic t = new AMQTopic(qpidTopic.getAddress());
+ AMQShortString queueName = getDurableTopicQueueName(subscriptionName, connection);
+ // link is never null if dest was created using an address string.
+ t.getLink().setName(queueName.asString());
+ t.getSourceNode().setAutoDelete(false);
+ t.getSourceNode().setDurable(true);
+
+ // The legacy fields are also populated just in case.
+ t.setQueueName(queueName);
+ t.setAutoDelete(false);
+ t.setDurable(true);
+ return t;
+ }
+ catch(Exception e)
+ {
+ JMSException ex = new JMSException("Error creating durable topic");
+ ex.initCause(e);
+ ex.setLinkedException(e);
+ throw ex;
+ }
}
- catch(Exception e)
+ else
{
- JMSException ex = new JMSException("Error creating durable topic");
- ex.initCause(e);
- ex.setLinkedException(e);
- throw ex;
+ return new AMQTopic(qpidTopic.getExchangeName(), qpidTopic.getRoutingKey(), false,
+ getDurableTopicQueueName(subscriptionName, connection),
+ true);
}
}
else
{
- return new AMQTopic(topic.getExchangeName(), topic.getRoutingKey(), false,
- getDurableTopicQueueName(subscriptionName, connection),
- true);
+ throw new InvalidDestinationException("The destination object used is not from this provider or of type javax.jms.Topic");
}
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
index 51589c705f..dd86ffc4da 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
@@ -31,12 +31,18 @@ import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
import javax.naming.Context;
import org.apache.qpid.client.AMQAnyDestination;
+import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession_0_10;
import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
@@ -796,4 +802,46 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
{
}
}
+
+ public void testQueueReceiversAndTopicSubscriber() throws Exception
+ {
+ Queue queue = new AMQAnyDestination("ADDR:my-queue; {create: always}");
+ Topic topic = new AMQAnyDestination("ADDR:amq.topic/test");
+
+ QueueSession qSession = ((AMQConnection)_connection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+ QueueReceiver receiver = qSession.createReceiver(queue);
+
+ TopicSession tSession = ((AMQConnection)_connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ TopicSubscriber sub = tSession.createSubscriber(topic);
+
+ Session ssn = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer prod1 = ssn.createProducer(ssn.createQueue("ADDR:my-queue"));
+ prod1.send(ssn.createTextMessage("test1"));
+
+ MessageProducer prod2 = ssn.createProducer(ssn.createTopic("ADDR:amq.topic/test"));
+ prod2.send(ssn.createTextMessage("test2"));
+
+ Message msg1 = receiver.receive();
+ assertNotNull(msg1);
+ assertEquals("test1",((TextMessage)msg1).getText());
+
+ Message msg2 = sub.receive();
+ assertNotNull(msg2);
+ assertEquals("test2",((TextMessage)msg2).getText());
+ }
+
+ public void testDurableSubscriber() throws Exception
+ {
+ Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+ Topic topic = ssn.createTopic("news.us");
+
+ MessageConsumer cons = ssn.createDurableSubscriber(topic, "my-sub");
+ MessageProducer prod = ssn.createProducer(topic);
+
+ Message m = ssn.createTextMessage("A");
+ prod.send(m);
+ Message msg = cons.receive(1000);
+ assertNotNull(msg);
+ assertEquals("A",((TextMessage)msg).getText());
+ }
}