summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java131
1 files changed, 73 insertions, 58 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 5a16a148cb..8e93b19eea 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -25,9 +25,10 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUndeliveredException;
import org.apache.qpid.client.failover.FailoverSupport;
import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.JMSStreamMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.qpid.client.message.JMSStreamMessage;
+import org.apache.qpid.client.protocol.AMQMethodEvent;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.framing.*;
@@ -38,7 +39,6 @@ import org.apache.qpid.url.URLSyntaxException;
import javax.jms.*;
import javax.jms.IllegalStateException;
-
import java.io.Serializable;
import java.text.MessageFormat;
import java.util.ArrayList;
@@ -287,7 +287,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public BytesMessage createBytesMessage() throws JMSException
{
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -303,7 +303,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public MapMessage createMapMessage() throws JMSException
{
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -319,7 +319,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public javax.jms.Message createMessage() throws JMSException
{
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -335,7 +335,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public ObjectMessage createObjectMessage() throws JMSException
{
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -351,7 +351,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public ObjectMessage createObjectMessage(Serializable object) throws JMSException
{
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -403,7 +403,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public TextMessage createTextMessage(String text) throws JMSException
{
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -473,7 +473,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
// We must close down all producers and consumers in an orderly fashion. This is the only method
// that can be called from a different thread of control from the one controlling the session
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
//Ensure we only try and close an open session.
if (!_closed.getAndSet(true))
@@ -493,7 +493,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
catch (AMQException e)
{
- throw new JMSException("Error closing session: " + e);
+ JMSException jmse = new JMSException("Error closing session: " + e);
+ jmse.setLinkedException(e);
+ throw jmse;
}
finally
{
@@ -536,7 +538,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
public void closed(Throwable e)
{
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
// An AMQException has an error code and message already and will be passed in when closure occurs as a
// result of a channel close request
@@ -747,7 +749,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
public QueueReceiver createQueueReceiver(Destination destination) throws JMSException
{
- checkValidDestination(destination);
+ checkValidDestination(destination);
AMQQueue dest = (AMQQueue) destination;
BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination);
return new QueueReceiverAdaptor(dest, consumer);
@@ -763,7 +765,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException
{
- checkValidDestination(destination);
+ checkValidDestination(destination);
AMQQueue dest = (AMQQueue) destination;
BasicMessageConsumer consumer = (BasicMessageConsumer)
createConsumer(destination, messageSelector);
@@ -772,20 +774,20 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public MessageConsumer createConsumer(Destination destination) throws JMSException
{
- checkValidDestination(destination);
+ checkValidDestination(destination);
return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, null);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
{
- checkValidDestination(destination);
+ checkValidDestination(destination);
return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, messageSelector);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
throws JMSException
{
- checkValidDestination(destination);
+ checkValidDestination(destination);
return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false, messageSelector);
}
@@ -795,7 +797,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
boolean exclusive,
String selector) throws JMSException
{
- checkValidDestination(destination);
+ checkValidDestination(destination);
return createConsumer(destination, prefetch, prefetch, noLocal, exclusive, selector, null);
}
@@ -807,7 +809,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
boolean exclusive,
String selector) throws JMSException
{
- checkValidDestination(destination);
+ checkValidDestination(destination);
return createConsumer(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null);
}
@@ -818,7 +820,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
String selector,
FieldTable rawSelector) throws JMSException
{
- checkValidDestination(destination);
+ checkValidDestination(destination);
return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive,
selector, rawSelector);
}
@@ -831,7 +833,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
String selector,
FieldTable rawSelector) throws JMSException
{
- checkValidDestination(destination);
+ checkValidDestination(destination);
return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive,
selector, rawSelector);
}
@@ -963,7 +965,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public Queue createQueue(String queueName) throws JMSException
{
- checkNotClosed();
+ checkNotClosed();
if (queueName.indexOf('/') == -1)
{
return new AMQQueue(queueName);
@@ -993,7 +995,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
public QueueReceiver createReceiver(Queue queue) throws JMSException
{
- checkNotClosed();
+ checkNotClosed();
AMQQueue dest = (AMQQueue) queue;
BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest);
return new QueueReceiverAdaptor(dest, consumer);
@@ -1009,7 +1011,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException
{
- checkNotClosed();
+ checkNotClosed();
AMQQueue dest = (AMQQueue) queue;
BasicMessageConsumer consumer = (BasicMessageConsumer)
createConsumer(dest, messageSelector);
@@ -1018,14 +1020,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public QueueSender createSender(Queue queue) throws JMSException
{
- checkNotClosed();
+ checkNotClosed();
//return (QueueSender) createProducer(queue);
return new QueueSenderAdapter(createProducer(queue), queue);
}
public Topic createTopic(String topicName) throws JMSException
{
- checkNotClosed();
+ checkNotClosed();
if (topicName.indexOf('/') == -1)
{
@@ -1056,8 +1058,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
public TopicSubscriber createSubscriber(Topic topic) throws JMSException
{
- checkNotClosed();
- checkValidTopic(topic);
+ checkNotClosed();
+ checkValidTopic(topic);
AMQTopic dest = new AMQTopic(topic.getTopicName());
return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
}
@@ -1073,8 +1075,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException
{
- checkNotClosed();
- checkValidTopic(topic);
+ checkNotClosed();
+ checkValidTopic(topic);
AMQTopic dest = new AMQTopic(topic.getTopicName());
return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal));
}
@@ -1088,8 +1090,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
{
- checkNotClosed();
- checkValidTopic(topic);
+ checkNotClosed();
+ checkValidTopic(topic);
AMQTopic dest = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name);
return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
}
@@ -1100,8 +1102,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
throws JMSException
{
- checkNotClosed();
- checkValidTopic(topic);
+ checkNotClosed();
+ checkValidTopic(topic);
AMQTopic dest = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name);
BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal);
return new TopicSubscriberAdaptor(dest, consumer);
@@ -1109,44 +1111,51 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public TopicPublisher createPublisher(Topic topic) throws JMSException
{
- checkNotClosed();
- checkValidTopic(topic);
- //return (TopicPublisher) createProducer(topic);
- return new TopicPublisherAdapter(createProducer(topic), topic);
+ checkNotClosed();
+ return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic), topic);
}
public QueueBrowser createBrowser(Queue queue) throws JMSException
{
- checkNotClosed();
- checkValidQueue(queue);
+ checkNotClosed();
+ checkValidQueue(queue);
throw new UnsupportedOperationException("Queue browsing not supported");
}
public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException
{
- checkNotClosed();
- checkValidQueue(queue);
+ checkNotClosed();
+ checkValidQueue(queue);
throw new UnsupportedOperationException("Queue browsing not supported");
}
public TemporaryQueue createTemporaryQueue() throws JMSException
{
- checkNotClosed();
+ checkNotClosed();
return new AMQTemporaryQueue();
}
public TemporaryTopic createTemporaryTopic() throws JMSException
{
- checkNotClosed();
+ checkNotClosed();
return new AMQTemporaryTopic();
}
public void unsubscribe(String name) throws JMSException
{
- checkNotClosed();
-
- //send a queue.delete for the subscription
+ checkNotClosed();
+
String queue = _connection.getClientID() + ":" + name;
+
+ AMQFrame queueDeclareFrame = QueueDeclareBody.createAMQFrame(_channelId,0,queue,true,false, false, false, true, null);
+
+ try {
+ AMQMethodEvent event = _connection.getProtocolHandler().syncWrite(queueDeclareFrame,QueueDeclareOkBody.class);
+ // if this method doen't throw an exception means we have received a queue declare ok.
+ } catch (AMQException e) {
+ throw new javax.jms.InvalidDestinationException("This destination doesn't exist");
+ }
+ //send a queue.delete for the subscription
AMQFrame frame = QueueDeleteBody.createAMQFrame(_channelId, 0, queue, false, false, true);
_connection.getProtocolHandler().writeFrame(frame);
}
@@ -1350,21 +1359,27 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
/*
* I could have combined the last 3 methods, but this way it improves readability
*/
- private void checkValidTopic(Topic topic) throws InvalidDestinationException{
- if (topic == null){
- throw new javax.jms.InvalidDestinationException("Invalid Topic");
- }
+ private void checkValidTopic(Topic topic) throws InvalidDestinationException
+ {
+ if (topic == null)
+ {
+ throw new javax.jms.InvalidDestinationException("Invalid Topic");
+ }
}
- private void checkValidQueue(Queue queue) throws InvalidDestinationException{
- if (queue == null){
- throw new javax.jms.InvalidDestinationException("Invalid Queue");
- }
+ private void checkValidQueue(Queue queue) throws InvalidDestinationException
+ {
+ if (queue == null)
+ {
+ throw new javax.jms.InvalidDestinationException("Invalid Queue");
+ }
}
- private void checkValidDestination(Destination destination) throws InvalidDestinationException{
- if (destination == null){
- throw new javax.jms.InvalidDestinationException("Invalid Queue");
- }
+ private void checkValidDestination(Destination destination) throws InvalidDestinationException
+ {
+ if (destination == null)
+ {
+ throw new javax.jms.InvalidDestinationException("Invalid Queue");
+ }
}
}