diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 100 |
1 files changed, 32 insertions, 68 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 25562cfff7..1f940b62f0 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -567,8 +567,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic close(-1); } - public abstract AMQException getLastException(); - public void checkNotClosed() throws JMSException { try @@ -577,20 +575,16 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } catch (IllegalStateException ise) { - AMQException ex = getLastException(); - if (ex != null) - { - IllegalStateException ssnClosed = new IllegalStateException( - "Session has been closed", ex.getErrorCode().toString()); + // if the Connection has closed then we should throw any exception that has occurred that we were not waiting for + AMQStateManager manager = _connection.getProtocolHandler().getStateManager(); - ssnClosed.setLinkedException(ex); - ssnClosed.initCause(ex); - throw ssnClosed; - } - else + if (manager.getCurrentState().equals(AMQState.CONNECTION_CLOSED) && manager.getLastException() != null) { - throw ise; + ise.setLinkedException(manager.getLastException()); + ise.initCause(ise.getLinkedException()); } + + throw ise; } } @@ -1049,29 +1043,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic throws JMSException { checkNotClosed(); - Topic origTopic = checkValidTopic(topic, true); - + AMQTopic origTopic = checkValidTopic(topic, true); AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection); - if (dest.getDestSyntax() == DestSyntax.ADDR && - !dest.isAddressResolved()) - { - try - { - handleAddressBasedDestination(dest,false,true); - if (dest.getAddressType() != AMQDestination.TOPIC_TYPE) - { - throw new JMSException("Durable subscribers can only be created for Topics"); - } - dest.getSourceNode().setDurable(true); - } - catch(AMQException e) - { - JMSException ex = new JMSException("Error when verifying destination"); - ex.initCause(e); - ex.setLinkedException(e); - throw ex; - } - } String messageSelector = ((selector == null) || (selector.trim().length() == 0)) ? null : selector; @@ -1083,9 +1056,15 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // Not subscribed to this name in the current session if (subscriber == null) { - // After the address is resolved routing key will not be null. - AMQShortString topicName = dest.getRoutingKey(); - + AMQShortString topicName; + if (topic instanceof AMQTopic) + { + topicName = ((AMQTopic) topic).getRoutingKey(); + } else + { + topicName = new AMQShortString(topic.getTopicName()); + } + if (_strictAMQP) { if (_strictAMQPFATAL) @@ -1246,6 +1225,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic else { AMQQueue queue = new AMQQueue(queueName); + queue.setCreate(AddressOption.ALWAYS); return queue; } @@ -1327,8 +1307,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public QueueReceiver createQueueReceiver(Destination destination) throws JMSException { checkValidDestination(destination); - Queue dest = validateQueue(destination); - C consumer = (C) createConsumer(dest); + AMQQueue dest = (AMQQueue) destination; + C consumer = (C) createConsumer(destination); return new QueueReceiverAdaptor(dest, consumer); } @@ -1346,8 +1326,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException { checkValidDestination(destination); - Queue dest = validateQueue(destination); - C consumer = (C) createConsumer(dest, messageSelector); + AMQQueue dest = (AMQQueue) destination; + C consumer = (C) createConsumer(destination, messageSelector); return new QueueReceiverAdaptor(dest, consumer); } @@ -1364,7 +1344,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public QueueReceiver createReceiver(Queue queue) throws JMSException { checkNotClosed(); - Queue dest = validateQueue(queue); + AMQQueue dest = (AMQQueue) queue; C consumer = (C) createConsumer(dest); return new QueueReceiverAdaptor(dest, consumer); @@ -1383,23 +1363,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException { checkNotClosed(); - Queue dest = validateQueue(queue); + AMQQueue dest = (AMQQueue) queue; C consumer = (C) createConsumer(dest, messageSelector); return new QueueReceiverAdaptor(dest, consumer); } - - private Queue validateQueue(Destination dest) throws InvalidDestinationException - { - if (dest instanceof AMQDestination && dest instanceof javax.jms.Queue) - { - return (Queue)dest; - } - else - { - throw new InvalidDestinationException("The destination object used is not from this provider or of type javax.jms.Queue"); - } - } public QueueSender createSender(Queue queue) throws JMSException { @@ -1440,7 +1408,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public TopicSubscriber createSubscriber(Topic topic) throws JMSException { checkNotClosed(); - Topic dest = checkValidTopic(topic); + AMQTopic dest = checkValidTopic(topic); // AMQTopic dest = new AMQTopic(topic.getTopicName()); return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest)); @@ -1460,7 +1428,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException { checkNotClosed(); - Topic dest = checkValidTopic(topic); + AMQTopic dest = checkValidTopic(topic); // AMQTopic dest = new AMQTopic(topic.getTopicName()); return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest, messageSelector, noLocal)); @@ -2427,7 +2395,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic /* * I could have combined the last 3 methods, but this way it improves readability */ - protected Topic checkValidTopic(Topic topic, boolean durable) throws JMSException + protected AMQTopic checkValidTopic(Topic topic, boolean durable) throws JMSException { if (topic == null) { @@ -2446,17 +2414,17 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic ("Cannot create a durable subscription with a temporary topic: " + topic); } - if (!(topic instanceof AMQDestination && topic instanceof javax.jms.Topic)) + if (!(topic instanceof AMQTopic)) { throw new javax.jms.InvalidDestinationException( "Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: " + topic.getClass().getName()); } - return topic; + return (AMQTopic) topic; } - protected Topic checkValidTopic(Topic topic) throws JMSException + protected AMQTopic checkValidTopic(Topic topic) throws JMSException { return checkValidTopic(topic, false); } @@ -2851,7 +2819,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { declareQueue(amqd, protocolHandler, consumer.isNoLocal(), nowait); } - bindQueue(amqd.getAMQQueueName(), amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd, nowait); } AMQShortString queueName = amqd.getAMQQueueName(); @@ -2859,6 +2826,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // store the consumer queue name consumer.setQueuename(queueName); + bindQueue(queueName, amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd, nowait); + // If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch if (!_immediatePrefetch) { @@ -3481,9 +3450,4 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { return _closing.get()|| _connection.isClosing(); } - - public boolean isDeclareExchanges() - { - return DECLARE_EXCHANGES; - } } |