summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java61
1 files changed, 44 insertions, 17 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 3973b5dd71..7ab26f3b47 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -1312,7 +1312,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
checkNotClosed();
if (queueName.indexOf('/') == -1)
{
- return new AMQQueue(queueName);
+ return new AMQQueue(getDefaultQueueExchangeName(), new AMQShortString(queueName));
}
else
{
@@ -1330,6 +1330,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
+ public AMQShortString getDefaultQueueExchangeName()
+ {
+ return _connection.getDefaultQueueExchangeName();
+ }
+
/**
* Creates a QueueReceiver wrapping a MessageConsumer
*
@@ -1379,7 +1384,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
if (topicName.indexOf('/') == -1)
{
- return new AMQTopic(new AMQShortString(topicName));
+ return new AMQTopic(getDefaultTopicExchangeName(),new AMQShortString(topicName));
}
else
{
@@ -1397,6 +1402,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
+ public AMQShortString getDefaultTopicExchangeName()
+ {
+ return _connection.getDefaultTopicExchangeName();
+ }
+
/**
* Creates a non-durable subscriber
*
@@ -1409,8 +1419,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public TopicSubscriber createSubscriber(Topic topic) throws JMSException
{
checkNotClosed();
- checkValidTopic(topic);
- AMQTopic dest = new AMQTopic(topic.getTopicName());
+ AMQTopic dest = checkValidTopic(topic);
+ //AMQTopic dest = new AMQTopic(topic.getTopicName());
return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
}
@@ -1428,16 +1438,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException
{
checkNotClosed();
- checkValidTopic(topic);
- AMQTopic dest = new AMQTopic(topic.getTopicName());
+ AMQTopic dest = checkValidTopic(topic);
+ //AMQTopic dest = new AMQTopic(topic.getTopicName());
return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal));
}
public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
{
checkNotClosed();
- checkValidTopic(topic);
- AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection);
+ AMQTopic origTopic = checkValidTopic(topic);
+ AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection);
TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
if (subscriber != null)
{
@@ -1464,8 +1474,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
// if the queue is bound to the exchange but NOT for this topic, then the JMS spec
// says we must trash the subscription.
- if (isQueueBound(dest.getAMQQueueName()) &&
- !isQueueBound(dest.getAMQQueueName(), topicName))
+ if (isQueueBound(dest.getExchangeName(),dest.getAMQQueueName()) &&
+ !isQueueBound(dest.getExchangeName(),dest.getAMQQueueName(), topicName))
{
deleteQueue(dest.getAMQQueueName());
}
@@ -1556,7 +1566,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
else
{
- if (isQueueBound(AMQTopic.getDurableTopicQueueName(name, _connection)))
+ if (isQueueBound(getDefaultTopicExchangeName(), AMQTopic.getDurableTopicQueueName(name, _connection)))
{
deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
}
@@ -1567,17 +1577,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
- boolean isQueueBound(AMQShortString queueName) throws JMSException
+ boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName) throws JMSException
{
- return isQueueBound(queueName, null);
+ return isQueueBound(exchangeName, queueName, null);
}
- boolean isQueueBound(AMQShortString queueName, AMQShortString routingKey) throws JMSException
+ boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey) throws JMSException
{
// TODO: Be aware of possible changes to parameter order as versions change.
AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId,
getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- ExchangeDefaults.TOPIC_EXCHANGE_NAME, // exchange
+ exchangeName, // exchange
queueName, // queue
routingKey); // routingKey
AMQMethodEvent response = null;
@@ -1858,7 +1868,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
/*
* I could have combined the last 3 methods, but this way it improves readability
*/
- private void checkValidTopic(Topic topic) throws JMSException
+ private AMQTopic checkValidTopic(Topic topic) throws JMSException
{
if (topic == null)
{
@@ -1866,8 +1876,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
if ((topic instanceof TemporaryDestination) && ((TemporaryDestination) topic).getSession() != this)
{
- throw new JMSException("Cannot create a subscription on a temporary topic created in another session");
+ throw new javax.jms.InvalidDestinationException("Cannot create a subscription on a temporary topic created in another session");
}
+ if(!(topic instanceof AMQTopic))
+ {
+ throw new javax.jms.InvalidDestinationException("Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: " + topic.getClass().getName());
+ }
+ return (AMQTopic) topic;
}
private void checkValidQueue(Queue queue) throws InvalidDestinationException
@@ -1887,6 +1902,18 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
+ public AMQShortString getTemporaryTopicExchangeName()
+ {
+ return _connection.getTemporaryTopicExchangeName();
+ }
+
+ public AMQShortString getTemporaryQueueExchangeName()
+ {
+ return _connection.getTemporaryQueueExchangeName();
+ }
+
+
+
public int getTicket()
{
return _ticket;