summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java188
1 files changed, 104 insertions, 84 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index d34290e007..c058ceb624 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -98,6 +98,7 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.SessionException;
import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.url.AMQBindingURL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -976,7 +977,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkValidDestination(destination);
- return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic), null, null,
+ return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, isTopic(destination), null, null,
isBrowseOnlyDestination(destination), false);
}
@@ -984,7 +985,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkValidDestination(destination);
- return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic),
+ return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, isTopic(destination),
messageSelector, null, isBrowseOnlyDestination(destination), false);
}
@@ -993,7 +994,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkValidDestination(destination);
- return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, (destination instanceof Topic),
+ return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, isTopic(destination),
messageSelector, null, isBrowseOnlyDestination(destination), false);
}
@@ -1034,30 +1035,16 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
checkNotClosed();
Topic origTopic = checkValidTopic(topic, true);
- AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection);
- if (dest.getDestSyntax() == DestSyntax.ADDR &&
- !dest.isAddressResolved())
+ // The check valid Topic will throw an exception if topic is not an instanceof
+ // AMQTopic or AddressBasedTopc.
+ Topic dest;
+ if (topic instanceof AMQTopic)
{
- try
- {
- handleAddressBasedDestination(dest,false,true);
- if (dest.getAddressType() != AMQDestination.TOPIC_TYPE)
- {
- throw new JMSException("Durable subscribers can only be created for Topics");
- }
- dest.getSourceNode().setDurable(true);
- }
- catch(AMQException e)
- {
- JMSException ex = new JMSException("Error when verifying destination");
- ex.initCause(e);
- ex.setLinkedException(e);
- throw ex;
- }
- catch(TransportException e)
- {
- throw toJMSException("Error when verifying destination", e);
- }
+ dest = AMQTopic.createDurableTopic(origTopic, name, _connection);
+ }
+ else
+ {
+ dest = AddressBasedTopic.createDurableTopic(origTopic, name, _connection, this);
}
String messageSelector = ((selector == null) || (selector.trim().length() == 0)) ? null : selector;
@@ -1071,7 +1058,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
if (subscriber == null)
{
// After the address is resolved routing key will not be null.
- AMQShortString topicName = dest.getRoutingKey();
+ AMQShortString topicName = new AMQShortString(dest.getTopicName());
if (_strictAMQP)
{
@@ -1085,7 +1072,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
+ "' for creation durableSubscriber. Requesting queue deletion regardless.");
}
- deleteQueue(dest.getAMQQueueName());
+ if (topic instanceof AMQTopic)
+ {
+ deleteQueue(((AMQTopic)dest).getAMQQueueName());
+ }
+ else
+ {
+ ((AddressBasedTopic)topic).deleteSubscription(this);
+ }
}
else
{
@@ -1099,13 +1093,34 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
// if the queue is bound to the exchange but NOT for this topic and selector, then the JMS spec
// says we must trash the subscription.
- boolean isQueueBound = isQueueBound(dest.getExchangeName(), dest.getAMQQueueName());
+ AMQShortString exchangeName;
+ AMQShortString queueName;
+
+ if (topic instanceof AMQTopic)
+ {
+ exchangeName = ((AMQTopic)dest).getExchangeName();
+ queueName = ((AMQTopic)dest).getAMQQueueName();
+
+ }
+ else
+ {
+ exchangeName = new AMQShortString(((AddressBasedTopic)topic).getAddress().getName());
+ queueName = new AMQShortString(((AddressBasedTopic)topic).getAddress().getSubject());
+ }
+ boolean isQueueBound = isQueueBound(exchangeName, queueName);
boolean isQueueBoundForTopicAndSelector =
- isQueueBound(dest.getExchangeName().asString(), dest.getAMQQueueName().asString(), topicName.asString(), args);
+ isQueueBound(exchangeName.asString(), queueName.asString(), topicName.asString(), args);
if (isQueueBound && !isQueueBoundForTopicAndSelector)
{
- deleteQueue(dest.getAMQQueueName());
+ if (topic instanceof AMQTopic)
+ {
+ deleteQueue(((AMQTopic)dest).getAMQQueueName());
+ }
+ else
+ {
+ ((AddressBasedTopic)topic).deleteSubscription(this);
+ }
}
}
}
@@ -1217,36 +1232,34 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public Queue createQueue(String queueName) throws JMSException
{
checkNotClosed();
- try
+ DestSyntax syntax = AMQDestination.getDestType(queueName);
+ queueName = AMQDestination.stripSyntaxPrefix(queueName);
+ if (syntax == AMQDestination.DestSyntax.BURL)
{
- if (queueName.indexOf('/') == -1 && queueName.indexOf(';') == -1)
+ if (queueName.indexOf('/') == -1)
{
- DestSyntax syntax = AMQDestination.getDestType(queueName);
- if (syntax == AMQDestination.DestSyntax.BURL)
- {
- // For testing we may want to use the prefix
- return new AMQQueue(getDefaultQueueExchangeName(),
- new AMQShortString(AMQDestination.stripSyntaxPrefix(queueName)));
- }
- else
- {
- AMQQueue queue = new AMQQueue(queueName);
- return queue;
-
- }
+ return new AMQQueue(getDefaultQueueExchangeName(),
+ new AMQShortString(queueName));
}
else
{
- return new AMQQueue(queueName);
+ try
+ {
+ return new AMQQueue(new AMQBindingURL(queueName));
+ }
+ catch (URISyntaxException urlse)
+ {
+ _logger.error("", urlse);
+ JMSException jmse = new JMSException(urlse.getReason());
+ jmse.setLinkedException(urlse);
+ jmse.initCause(urlse);
+ throw jmse;
+ }
}
}
- catch (URISyntaxException urlse)
+ else
{
- _logger.error("", urlse);
- JMSException jmse = new JMSException(urlse.getReason());
- jmse.setLinkedException(urlse);
- jmse.initCause(urlse);
- throw jmse;
+ return new AddressBasedQueue(queueName);
}
}
@@ -1511,36 +1524,39 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public Topic createTopic(String topicName) throws JMSException
{
checkNotClosed();
- try
+ DestSyntax syntax = AMQDestination.getDestType(topicName);
+ topicName = AMQDestination.stripSyntaxPrefix(topicName);
+ if (syntax == AMQDestination.DestSyntax.BURL)
{
- if (topicName.indexOf('/') == -1 && topicName.indexOf(';') == -1)
+ if (topicName.indexOf('/') == -1)
+ {
+ return new AMQTopic(getDefaultTopicExchangeName(), new AMQShortString(topicName));
+ }
+ else
{
- DestSyntax syntax = AMQDestination.getDestType(topicName);
- // for testing we may want to use the prefix to indicate our choice.
- topicName = AMQDestination.stripSyntaxPrefix(topicName);
- if (syntax == AMQDestination.DestSyntax.BURL)
+ try
{
- return new AMQTopic(getDefaultTopicExchangeName(), new AMQShortString(topicName));
+ return new AMQTopic(new AMQBindingURL(topicName));
}
- else
+ catch (URISyntaxException urlse)
{
- return new AMQTopic("ADDR:" + getDefaultTopicExchangeName() + "/" + topicName);
+ _logger.error("", urlse);
+ JMSException jmse = new JMSException(urlse.getReason());
+ jmse.setLinkedException(urlse);
+ jmse.initCause(urlse);
+ throw jmse;
}
}
- else
- {
- return new AMQTopic(topicName);
- }
-
}
- catch (URISyntaxException urlse)
+ else
{
- _logger.error("", urlse);
- JMSException jmse = new JMSException(urlse.getReason());
- jmse.setLinkedException(urlse);
- jmse.initCause(urlse);
- throw jmse;
- }
+ if (topicName.indexOf('/') == -1 && topicName.indexOf(';') == -1)
+ {
+ topicName = getDefaultTopicExchangeName() + "/" + topicName;
+ }
+
+ return new AddressBasedTopic(topicName);
+ }
}
public void declareExchange(AMQShortString name, AMQShortString type, boolean nowait) throws AMQException
@@ -2463,7 +2479,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
("Cannot create a durable subscription with a temporary topic: " + topic);
}
- if (!(topic instanceof AMQDestination && topic instanceof javax.jms.Topic))
+ if (!(topic instanceof AMQTopic || topic instanceof AddressBasedTopic))
{
throw new javax.jms.InvalidDestinationException(
"Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: "
@@ -2872,11 +2888,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
AMQProtocolHandler protocolHandler = getProtocolHandler();
- if (amqd.getDestSyntax() == DestSyntax.ADDR)
- {
- handleAddressBasedDestination(amqd,true,nowait);
- }
- else
+ if (amqd.getDestSyntax() == DestSyntax.BURL)
{
if (DECLARE_EXCHANGES)
{
@@ -2932,10 +2944,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
throw new AMQException(null, "Fail-over exception interrupted basic consume.", e);
}
}
-
- public abstract void handleAddressBasedDestination(AMQDestination dest,
- boolean isConsumer,
- boolean noWait) throws AMQException;
private void registerProducer(long producerId, MessageProducer producer)
{
@@ -3565,4 +3573,16 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
_logger.debug("Rollback mark is set to " + _rollbackMark.get());
}
}
+
+ private boolean isTopic(Destination dest)
+ {
+ if (dest instanceof AMQDestination)
+ {
+ return ((AMQDestination)dest).isTopic();
+ }
+ else
+ {
+ return dest instanceof Topic;
+ }
+ }
}