diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 131 |
1 files changed, 73 insertions, 58 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 5a16a148cb..8e93b19eea 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -25,9 +25,10 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQUndeliveredException; import org.apache.qpid.client.failover.FailoverSupport; import org.apache.qpid.client.message.AbstractJMSMessage; +import org.apache.qpid.client.message.JMSStreamMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; -import org.apache.qpid.client.message.JMSStreamMessage; +import org.apache.qpid.client.protocol.AMQMethodEvent; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.util.FlowControllingBlockingQueue; import org.apache.qpid.framing.*; @@ -38,7 +39,6 @@ import org.apache.qpid.url.URLSyntaxException; import javax.jms.*; import javax.jms.IllegalStateException; - import java.io.Serializable; import java.text.MessageFormat; import java.util.ArrayList; @@ -287,7 +287,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public BytesMessage createBytesMessage() throws JMSException { - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { checkNotClosed(); try @@ -303,7 +303,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public MapMessage createMapMessage() throws JMSException { - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { checkNotClosed(); try @@ -319,7 +319,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public javax.jms.Message createMessage() throws JMSException { - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { checkNotClosed(); try @@ -335,7 +335,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public ObjectMessage createObjectMessage() throws JMSException { - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { checkNotClosed(); try @@ -351,7 +351,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public ObjectMessage createObjectMessage(Serializable object) throws JMSException { - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { checkNotClosed(); try @@ -403,7 +403,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public TextMessage createTextMessage(String text) throws JMSException { - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { checkNotClosed(); try @@ -473,7 +473,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { // We must close down all producers and consumers in an orderly fashion. This is the only method // that can be called from a different thread of control from the one controlling the session - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { //Ensure we only try and close an open session. if (!_closed.getAndSet(true)) @@ -493,7 +493,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } catch (AMQException e) { - throw new JMSException("Error closing session: " + e); + JMSException jmse = new JMSException("Error closing session: " + e); + jmse.setLinkedException(e); + throw jmse; } finally { @@ -536,7 +538,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public void closed(Throwable e) { - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { // An AMQException has an error code and message already and will be passed in when closure occurs as a // result of a channel close request @@ -747,7 +749,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public QueueReceiver createQueueReceiver(Destination destination) throws JMSException { - checkValidDestination(destination); + checkValidDestination(destination); AMQQueue dest = (AMQQueue) destination; BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination); return new QueueReceiverAdaptor(dest, consumer); @@ -763,7 +765,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException { - checkValidDestination(destination); + checkValidDestination(destination); AMQQueue dest = (AMQQueue) destination; BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination, messageSelector); @@ -772,20 +774,20 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public MessageConsumer createConsumer(Destination destination) throws JMSException { - checkValidDestination(destination); + checkValidDestination(destination); return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, null); } public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { - checkValidDestination(destination); + checkValidDestination(destination); return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, messageSelector); } public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException { - checkValidDestination(destination); + checkValidDestination(destination); return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false, messageSelector); } @@ -795,7 +797,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi boolean exclusive, String selector) throws JMSException { - checkValidDestination(destination); + checkValidDestination(destination); return createConsumer(destination, prefetch, prefetch, noLocal, exclusive, selector, null); } @@ -807,7 +809,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi boolean exclusive, String selector) throws JMSException { - checkValidDestination(destination); + checkValidDestination(destination); return createConsumer(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null); } @@ -818,7 +820,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi String selector, FieldTable rawSelector) throws JMSException { - checkValidDestination(destination); + checkValidDestination(destination); return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, rawSelector); } @@ -831,7 +833,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi String selector, FieldTable rawSelector) throws JMSException { - checkValidDestination(destination); + checkValidDestination(destination); return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector); } @@ -963,7 +965,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public Queue createQueue(String queueName) throws JMSException { - checkNotClosed(); + checkNotClosed(); if (queueName.indexOf('/') == -1) { return new AMQQueue(queueName); @@ -993,7 +995,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public QueueReceiver createReceiver(Queue queue) throws JMSException { - checkNotClosed(); + checkNotClosed(); AMQQueue dest = (AMQQueue) queue; BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest); return new QueueReceiverAdaptor(dest, consumer); @@ -1009,7 +1011,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException { - checkNotClosed(); + checkNotClosed(); AMQQueue dest = (AMQQueue) queue; BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector); @@ -1018,14 +1020,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public QueueSender createSender(Queue queue) throws JMSException { - checkNotClosed(); + checkNotClosed(); //return (QueueSender) createProducer(queue); return new QueueSenderAdapter(createProducer(queue), queue); } public Topic createTopic(String topicName) throws JMSException { - checkNotClosed(); + checkNotClosed(); if (topicName.indexOf('/') == -1) { @@ -1056,8 +1058,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public TopicSubscriber createSubscriber(Topic topic) throws JMSException { - checkNotClosed(); - checkValidTopic(topic); + checkNotClosed(); + checkValidTopic(topic); AMQTopic dest = new AMQTopic(topic.getTopicName()); return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); } @@ -1073,8 +1075,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException { - checkNotClosed(); - checkValidTopic(topic); + checkNotClosed(); + checkValidTopic(topic); AMQTopic dest = new AMQTopic(topic.getTopicName()); return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal)); } @@ -1088,8 +1090,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { - checkNotClosed(); - checkValidTopic(topic); + checkNotClosed(); + checkValidTopic(topic); AMQTopic dest = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name); return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); } @@ -1100,8 +1102,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException { - checkNotClosed(); - checkValidTopic(topic); + checkNotClosed(); + checkValidTopic(topic); AMQTopic dest = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name); BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal); return new TopicSubscriberAdaptor(dest, consumer); @@ -1109,44 +1111,51 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public TopicPublisher createPublisher(Topic topic) throws JMSException { - checkNotClosed(); - checkValidTopic(topic); - //return (TopicPublisher) createProducer(topic); - return new TopicPublisherAdapter(createProducer(topic), topic); + checkNotClosed(); + return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic), topic); } public QueueBrowser createBrowser(Queue queue) throws JMSException { - checkNotClosed(); - checkValidQueue(queue); + checkNotClosed(); + checkValidQueue(queue); throw new UnsupportedOperationException("Queue browsing not supported"); } public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException { - checkNotClosed(); - checkValidQueue(queue); + checkNotClosed(); + checkValidQueue(queue); throw new UnsupportedOperationException("Queue browsing not supported"); } public TemporaryQueue createTemporaryQueue() throws JMSException { - checkNotClosed(); + checkNotClosed(); return new AMQTemporaryQueue(); } public TemporaryTopic createTemporaryTopic() throws JMSException { - checkNotClosed(); + checkNotClosed(); return new AMQTemporaryTopic(); } public void unsubscribe(String name) throws JMSException { - checkNotClosed(); - - //send a queue.delete for the subscription + checkNotClosed(); + String queue = _connection.getClientID() + ":" + name; + + AMQFrame queueDeclareFrame = QueueDeclareBody.createAMQFrame(_channelId,0,queue,true,false, false, false, true, null); + + try { + AMQMethodEvent event = _connection.getProtocolHandler().syncWrite(queueDeclareFrame,QueueDeclareOkBody.class); + // if this method doen't throw an exception means we have received a queue declare ok. + } catch (AMQException e) { + throw new javax.jms.InvalidDestinationException("This destination doesn't exist"); + } + //send a queue.delete for the subscription AMQFrame frame = QueueDeleteBody.createAMQFrame(_channelId, 0, queue, false, false, true); _connection.getProtocolHandler().writeFrame(frame); } @@ -1350,21 +1359,27 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /* * I could have combined the last 3 methods, but this way it improves readability */ - private void checkValidTopic(Topic topic) throws InvalidDestinationException{ - if (topic == null){ - throw new javax.jms.InvalidDestinationException("Invalid Topic"); - } + private void checkValidTopic(Topic topic) throws InvalidDestinationException + { + if (topic == null) + { + throw new javax.jms.InvalidDestinationException("Invalid Topic"); + } } - private void checkValidQueue(Queue queue) throws InvalidDestinationException{ - if (queue == null){ - throw new javax.jms.InvalidDestinationException("Invalid Queue"); - } + private void checkValidQueue(Queue queue) throws InvalidDestinationException + { + if (queue == null) + { + throw new javax.jms.InvalidDestinationException("Invalid Queue"); + } } - private void checkValidDestination(Destination destination) throws InvalidDestinationException{ - if (destination == null){ - throw new javax.jms.InvalidDestinationException("Invalid Queue"); - } + private void checkValidDestination(Destination destination) throws InvalidDestinationException + { + if (destination == null) + { + throw new javax.jms.InvalidDestinationException("Invalid Queue"); + } } } |