summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java322
1 files changed, 189 insertions, 133 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 25562cfff7..44c4e8987a 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -70,7 +70,6 @@ import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInvalidArgumentException;
import org.apache.qpid.AMQInvalidRoutingKeyException;
-import org.apache.qpid.client.AMQDestination.AddressOption;
import org.apache.qpid.client.AMQDestination.DestSyntax;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverNoopSupport;
@@ -88,8 +87,6 @@ import org.apache.qpid.client.message.JMSTextMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQShortString;
@@ -97,7 +94,10 @@ import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.jms.Session;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.thread.Threading;
+import org.apache.qpid.transport.SessionException;
+import org.apache.qpid.transport.TransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -213,8 +213,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
*/
protected final boolean DEFAULT_MANDATORY = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true"));
- protected final boolean DEFAULT_WAIT_ON_SEND = Boolean.parseBoolean(System.getProperty("qpid.default_wait_on_send", "false"));
-
/**
* The period to wait while flow controlled before sending a log message confirming that the session is still
* waiting on flow control being revoked
@@ -364,7 +362,13 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
* Set when recover is called. This is to handle the case where recover() is called by application code during
* onMessage() processing to ensure that an auto ack is not sent.
*/
- private boolean _inRecovery;
+ private volatile boolean _sessionInRecovery;
+
+ /**
+ * Set when the dispatcher should direct incoming messages straight into the UnackedMessage list instead of
+ * to the syncRecieveQueue or MessageListener. Used during cleanup, e.g. in Session.recover().
+ */
+ private volatile boolean _usingDispatcherForCleanup;
/** Used to indicates that the connection to which this session belongs, has been stopped. */
private boolean _connectionStopped;
@@ -606,29 +610,36 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
* Acknowledges all unacknowledged messages on the session, for all message consumers on the session.
*
* @throws IllegalStateException If the session is closed.
+ * @throws JMSException if there is a problem during acknowledge process.
*/
- public void acknowledge() throws IllegalStateException
+ public void acknowledge() throws IllegalStateException, JMSException
{
if (isClosed())
{
throw new IllegalStateException("Session is already closed");
}
- else if (hasFailedOver())
+ else if (hasFailedOverDirty())
{
+ //perform an implicit recover in this scenario
+ recover();
+
+ //notify the consumer
throw new IllegalStateException("has failed over");
}
- while (true)
+ try
{
- Long tag = _unacknowledgedMessageTags.poll();
- if (tag == null)
- {
- break;
- }
- acknowledgeMessage(tag, false);
+ acknowledgeImpl();
+ markClean();
+ }
+ catch (TransportException e)
+ {
+ throw toJMSException("Exception while acknowledging message(s):" + e.getMessage(), e);
}
}
+ protected abstract void acknowledgeImpl() throws JMSException;
+
/**
* Acknowledge one or many messages.
*
@@ -763,6 +774,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
_logger.debug(
"Got FailoverException during channel close, ignored as channel already marked as closed.");
}
+ catch (TransportException e)
+ {
+ throw toJMSException("Error closing session:" + e.getMessage(), e);
+ }
finally
{
_connection.deregisterSession(_channelId);
@@ -833,51 +848,40 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
* @throws JMSException If the JMS provider fails to commit the transaction due to some internal error. This does
* not mean that the commit is known to have failed, merely that it is not known whether it
* failed or not.
- * @todo Be aware of possible changes to parameter order as versions change.
*/
public void commit() throws JMSException
{
checkTransacted();
- try
+ //Check that we are clean to commit.
+ if (_failedOverDirty)
{
- //Check that we are clean to commit.
- if (_failedOverDirty)
- {
- rollback();
+ rollback();
- throw new TransactionRolledBackException("Connection failover has occured since last send. " +
- "Forced rollback");
- }
-
-
- // Acknowledge all delivered messages
- while (true)
- {
- Long tag = _deliveredMessageTags.poll();
- if (tag == null)
- {
- break;
- }
+ throw new TransactionRolledBackException("Connection failover has occured with uncommitted transaction activity." +
+ "The session transaction was rolled back.");
+ }
- acknowledgeMessage(tag, false);
- }
- // Commits outstanding messages and acknowledgments
- sendCommit();
+ try
+ {
+ commitImpl();
markClean();
}
catch (AMQException e)
{
- throw new JMSAMQException("Failed to commit: " + e.getMessage() + ":" + e.getCause(), e);
+ throw new JMSAMQException("Exception during commit: " + e.getMessage() + ":" + e.getCause(), e);
}
catch (FailoverException e)
{
throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e);
}
+ catch(TransportException e)
+ {
+ throw toJMSException("Session exception occured while trying to commit: " + e.getMessage(), e);
+ }
}
- public abstract void sendCommit() throws AMQException, FailoverException;
-
+ protected abstract void commitImpl() throws AMQException, FailoverException, TransportException;
public void confirmConsumerCancelled(int consumerTag)
{
@@ -955,7 +959,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
return new AMQQueueBrowser(this, (AMQQueue) queue, messageSelector);
}
- public MessageConsumer createBrowserConsumer(Destination destination, String messageSelector, boolean noLocal)
+ protected MessageConsumer createBrowserConsumer(Destination destination, String messageSelector, boolean noLocal)
throws JMSException
{
checkValidDestination(destination);
@@ -969,15 +973,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
checkValidDestination(destination);
return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic), null, null,
- ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
- }
-
- public C createExclusiveConsumer(Destination destination) throws JMSException
- {
- checkValidDestination(destination);
-
- return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, true, null, null,
- ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
+ isBrowseOnlyDestination(destination), false);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
@@ -985,7 +981,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
checkValidDestination(destination);
return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic),
- messageSelector, null, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
+ messageSelector, null, isBrowseOnlyDestination(destination), false);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
@@ -994,16 +990,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
checkValidDestination(destination);
return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, (destination instanceof Topic),
- messageSelector, null, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
- }
-
- public MessageConsumer createExclusiveConsumer(Destination destination, String messageSelector, boolean noLocal)
- throws JMSException
- {
- checkValidDestination(destination);
-
- return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, true,
- messageSelector, null, false, false);
+ messageSelector, null, isBrowseOnlyDestination(destination), false);
}
public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
@@ -1011,23 +998,15 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkValidDestination(destination);
- return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, null, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
+ return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, null, isBrowseOnlyDestination(destination), false);
}
public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
- boolean exclusive, String selector) throws JMSException
+ boolean exclusive, String selector) throws JMSException
{
checkValidDestination(destination);
- return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
- }
-
- public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
- String selector, FieldTable rawSelector) throws JMSException
- {
- checkValidDestination(destination);
-
- return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, rawSelector, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
+ return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, isBrowseOnlyDestination(destination), false);
}
public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
@@ -1035,7 +1014,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkValidDestination(destination);
- return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()),
+ return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector, isBrowseOnlyDestination(destination),
false);
}
@@ -1071,6 +1050,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
ex.setLinkedException(e);
throw ex;
}
+ catch(TransportException e)
+ {
+ throw toJMSException("Error when verifying destination", e);
+ }
}
String messageSelector = ((selector == null) || (selector.trim().length() == 0)) ? null : selector;
@@ -1156,6 +1139,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
return subscriber;
}
+ catch (TransportException e)
+ {
+ throw toJMSException("Exception while creating durable subscriber:" + e.getMessage(), e);
+ }
finally
{
_subscriberDetails.unlock();
@@ -1216,12 +1203,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
return createProducerImpl(destination, mandatory, immediate);
}
- public P createProducer(Destination destination, boolean mandatory, boolean immediate,
- boolean waitUntilSent) throws JMSException
- {
- return createProducerImpl(destination, mandatory, immediate, waitUntilSent);
- }
-
public TopicPublisher createPublisher(Topic topic) throws JMSException
{
checkNotClosed();
@@ -1405,7 +1386,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkNotClosed();
- // return (QueueSender) createProducer(queue);
return new QueueSenderAdapter(createProducer(queue), queue);
}
@@ -1440,10 +1420,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public TopicSubscriber createSubscriber(Topic topic) throws JMSException
{
checkNotClosed();
- Topic dest = checkValidTopic(topic);
+ checkValidTopic(topic);
- // AMQTopic dest = new AMQTopic(topic.getTopicName());
- return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest));
+ return new TopicSubscriberAdaptor<C>(topic,
+ createConsumerImpl(topic, _prefetchHighMark, _prefetchLowMark, false, true, null, null, false, false));
}
/**
@@ -1460,10 +1440,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException
{
checkNotClosed();
- Topic dest = checkValidTopic(topic);
+ checkValidTopic(topic);
- // AMQTopic dest = new AMQTopic(topic.getTopicName());
- return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest, messageSelector, noLocal));
+ return new TopicSubscriberAdaptor<C>(topic,
+ createConsumerImpl(topic, _prefetchHighMark, _prefetchLowMark, noLocal,
+ true, messageSelector, null, false, false));
}
public TemporaryQueue createTemporaryQueue() throws JMSException
@@ -1565,10 +1546,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
abstract public void sync() throws AMQException;
- public int getAcknowledgeMode() throws JMSException
+ public int getAcknowledgeMode()
{
- checkNotClosed();
-
return _acknowledgeMode;
}
@@ -1628,10 +1607,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
return _ticket;
}
- public boolean getTransacted() throws JMSException
+ public boolean getTransacted()
{
- checkNotClosed();
-
return _transacted;
}
@@ -1727,13 +1704,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
// Ensure that the session is not transacted.
checkNotTransacted();
- // flush any acks we are holding in the buffer.
- flushAcknowledgments();
-
- // this is set only here, and the before the consumer's onMessage is called it is set to false
- _inRecovery = true;
+
try
{
+ // flush any acks we are holding in the buffer.
+ flushAcknowledgments();
+
+ // this is only set true here, and only set false when the consumers preDeliver method is called
+ _sessionInRecovery = true;
boolean isSuspended = isSuspended();
@@ -1741,9 +1719,18 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
suspendChannel(true);
}
-
+
+ // Set to true to short circuit delivery of anything currently
+ //in the pre-dispatch queue.
+ _usingDispatcherForCleanup = true;
+
syncDispatchQueue();
-
+
+ // Set to false before sending the recover as 0-8/9/9-1 will
+ //send messages back before the recover completes, and we
+ //probably shouldn't clean those! ;-)
+ _usingDispatcherForCleanup = false;
+
if (_dispatcher != null)
{
_dispatcher.recover();
@@ -1752,10 +1739,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
sendRecover();
markClean();
-
- // Set inRecovery to false before you start message flow again again.
- _inRecovery = false;
-
+
if (!isSuspended)
{
suspendChannel(false);
@@ -1769,7 +1753,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
throw new JMSAMQException("Recovery was interrupted by fail-over. Recovery status is not known.", e);
}
-
+ catch(TransportException e)
+ {
+ throw toJMSException("Recover failed: " + e.getMessage(), e);
+ }
}
protected abstract void sendRecover() throws AMQException, FailoverException;
@@ -1854,6 +1841,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
throw new JMSAMQException("Fail-over interrupted rollback. Status of the rollback is uncertain.", e);
}
+ catch (TransportException e)
+ {
+ throw toJMSException("Failure to rollback:" + e.getMessage(), e);
+ }
}
}
@@ -1900,7 +1891,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
*/
public void unsubscribe(String name) throws JMSException
{
- unsubscribe(name, false);
+ try
+ {
+ unsubscribe(name, false);
+ }
+ catch (TransportException e)
+ {
+ throw toJMSException("Exception while unsubscribing:" + e.getMessage(), e);
+ }
}
/**
@@ -1977,6 +1975,12 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkTemporaryDestination(destination);
+ if(!noConsume && isBrowseOnlyDestination(destination))
+ {
+ throw new InvalidDestinationException("The consumer being created is not 'noConsume'," +
+ "but a 'browseOnly' Destination has been supplied.");
+ }
+
final String messageSelector;
if (_strictAMQP && !((selector == null) || selector.equals("")))
@@ -2021,8 +2025,16 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
// argument, as specifying null for the arguments when querying means they should not be checked at all
ft.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector == null ? "" : messageSelector);
- C consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow,
- noLocal, exclusive, messageSelector, ft, noConsume, autoClose);
+ C consumer;
+ try
+ {
+ consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow,
+ noLocal, exclusive, messageSelector, ft, noConsume, autoClose);
+ }
+ catch(TransportException e)
+ {
+ throw toJMSException("Exception while creating consumer: " + e.getMessage(), e);
+ }
if (_messageListener != null)
{
@@ -2059,7 +2071,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
ex.initCause(e);
throw ex;
}
-
+ catch (TransportException e)
+ {
+ throw toJMSException("Exception while registering consumer:" + e.getMessage(), e);
+ }
return consumer;
}
}, _connection).execute();
@@ -2124,7 +2139,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
boolean isInRecovery()
{
- return _inRecovery;
+ return _sessionInRecovery;
}
boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName) throws JMSException
@@ -2246,7 +2261,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
void setInRecovery(boolean inRecovery)
{
- _inRecovery = inRecovery;
+ _sessionInRecovery = inRecovery;
}
boolean isStarted()
@@ -2585,15 +2600,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public abstract void sendConsume(C consumer, AMQShortString queueName,
AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, int tag) throws AMQException, FailoverException;
- private P createProducerImpl(Destination destination, boolean mandatory, boolean immediate)
+ private P createProducerImpl(final Destination destination, final boolean mandatory, final boolean immediate)
throws JMSException
{
- return createProducerImpl(destination, mandatory, immediate, DEFAULT_WAIT_ON_SEND);
- }
-
- private P createProducerImpl(final Destination destination, final boolean mandatory,
- final boolean immediate, final boolean waitUntilSent) throws JMSException
- {
return new FailoverRetrySupport<P, JMSException>(
new FailoverProtectedOperation<P, JMSException>()
{
@@ -2601,8 +2610,18 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkNotClosed();
long producerId = getNextProducerId();
- P producer = createMessageProducer(destination, mandatory,
- immediate, waitUntilSent, producerId);
+
+ P producer;
+ try
+ {
+ producer = createMessageProducer(destination, mandatory,
+ immediate, producerId);
+ }
+ catch (TransportException e)
+ {
+ throw toJMSException("Exception while creating producer:" + e.getMessage(), e);
+ }
+
registerProducer(producerId, producer);
return producer;
@@ -2611,7 +2630,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
public abstract P createMessageProducer(final Destination destination, final boolean mandatory,
- final boolean immediate, final boolean waitUntilSent, long producerId) throws JMSException;
+ final boolean immediate, final long producerId) throws JMSException;
private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException
{
@@ -2754,6 +2773,21 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
}
+ /**
+ * Undeclares the specified temporary queue/topic.
+ *
+ * <p/>Note that this operation automatically retries in the event of fail-over.
+ *
+ * @param amqQueue The name of the temporary destination to delete.
+ *
+ * @throws JMSException If the queue could not be deleted for any reason.
+ * @todo Be aware of possible changes to parameter order as versions change.
+ */
+ protected void deleteTemporaryDestination(final TemporaryDestination amqQueue) throws JMSException
+ {
+ deleteQueue(amqQueue.getAMQQueueName());
+ }
+
public abstract void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException;
private long getNextProducerId()
@@ -3009,6 +3043,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
throw new AMQException(null, "Fail-over interrupted suspend/unsuspend channel.", e);
}
+ catch (TransportException e)
+ {
+ throw new AMQException(AMQConstant.getConstant(getErrorCode(e)), e.getMessage(), e);
+ }
}
}
@@ -3047,21 +3085,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
*
* @return boolean true if failover has occured.
*/
- public boolean hasFailedOver()
+ public boolean hasFailedOverDirty()
{
return _failedOverDirty;
}
- /**
- * Check to see if any message have been sent in this transaction and have not been commited.
- *
- * @return boolean true if a message has been sent but not commited
- */
- public boolean isDirty()
- {
- return _dirty;
- }
-
public void setTicket(int ticket)
{
_ticket = ticket;
@@ -3325,7 +3353,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
rejectMessage(message, true);
}
- else if (isInRecovery())
+ else if (_usingDispatcherForCleanup)
{
_unacknowledgedMessageTags.add(deliveryTag);
}
@@ -3486,4 +3514,32 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
return DECLARE_EXCHANGES;
}
+
+ JMSException toJMSException(String message, TransportException e)
+ {
+ int code = getErrorCode(e);
+ JMSException jmse = new JMSException(message, Integer.toString(code));
+ jmse.setLinkedException(e);
+ jmse.initCause(e);
+ return jmse;
+ }
+
+ private int getErrorCode(TransportException e)
+ {
+ int code = AMQConstant.INTERNAL_ERROR.getCode();
+ if (e instanceof SessionException)
+ {
+ SessionException se = (SessionException) e;
+ if(se.getException() != null && se.getException().getErrorCode() != null)
+ {
+ code = se.getException().getErrorCode().getValue();
+ }
+ }
+ return code;
+ }
+
+ private boolean isBrowseOnlyDestination(Destination destination)
+ {
+ return ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly());
+ }
}