diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 796 |
1 files changed, 405 insertions, 391 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 48c4e3e3e6..efc5982dac 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -20,50 +20,8 @@ */ package org.apache.qpid.client; -import java.io.Serializable; -import java.net.URISyntaxException; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import javax.jms.BytesMessage; -import javax.jms.Destination; -import javax.jms.IllegalStateException; -import javax.jms.InvalidDestinationException; -import javax.jms.InvalidSelectorException; -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.ObjectMessage; -import javax.jms.Queue; -import javax.jms.QueueBrowser; -import javax.jms.QueueReceiver; -import javax.jms.QueueSender; -import javax.jms.QueueSession; -import javax.jms.StreamMessage; -import javax.jms.TemporaryQueue; -import javax.jms.TemporaryTopic; -import javax.jms.TextMessage; -import javax.jms.Topic; -import javax.jms.TopicPublisher; -import javax.jms.TopicSession; -import javax.jms.TopicSubscriber; -import javax.jms.TransactionRolledBackException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.qpid.AMQChannelClosedException; import org.apache.qpid.AMQDisconnectedException; @@ -89,7 +47,7 @@ import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.util.FlowControllingBlockingQueue; import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.filter.MessageFilter; +import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.MethodRegistry; @@ -98,8 +56,27 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.SessionException; import org.apache.qpid.transport.TransportException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import javax.jms.*; +import javax.jms.IllegalStateException; +import java.io.Serializable; +import java.net.URISyntaxException; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; /** * <p/><table id="crc"><caption>CRC Card</caption> @@ -119,150 +96,65 @@ import org.slf4j.LoggerFactory; */ public abstract class AMQSession<C extends BasicMessageConsumer, P extends BasicMessageProducer> extends Closeable implements Session, QueueSession, TopicSession { - public static final class IdToConsumerMap<C extends BasicMessageConsumer> - { - private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16]; - private final ConcurrentHashMap<Integer, C> _slowAccessConsumers = new ConcurrentHashMap<Integer, C>(); - - public C get(int id) - { - if ((id & 0xFFFFFFF0) == 0) - { - return (C) _fastAccessConsumers[id]; - } - else - { - return _slowAccessConsumers.get(id); - } - } - - public C put(int id, C consumer) - { - C oldVal; - if ((id & 0xFFFFFFF0) == 0) - { - oldVal = (C) _fastAccessConsumers[id]; - _fastAccessConsumers[id] = consumer; - } - else - { - oldVal = _slowAccessConsumers.put(id, consumer); - } - - return oldVal; - - } - - public C remove(int id) - { - C consumer; - if ((id & 0xFFFFFFF0) == 0) - { - consumer = (C) _fastAccessConsumers[id]; - _fastAccessConsumers[id] = null; - } - else - { - consumer = _slowAccessConsumers.remove(id); - } - - return consumer; - - } + /** Used for debugging. */ + private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class); - public Collection<C> values() - { - ArrayList<C> values = new ArrayList<C>(); + /** System property to enable strict AMQP compliance. */ + public static final String STRICT_AMQP = "STRICT_AMQP"; - for (int i = 0; i < 16; i++) - { - if (_fastAccessConsumers[i] != null) - { - values.add((C) _fastAccessConsumers[i]); - } - } - values.addAll(_slowAccessConsumers.values()); + /** Strict AMQP default setting. */ + public static final String STRICT_AMQP_DEFAULT = "false"; - return values; - } + /** System property to enable failure if strict AMQP compliance is violated. */ + public static final String STRICT_AMQP_FATAL = "STRICT_AMQP_FATAL"; - public void clear() - { - _slowAccessConsumers.clear(); - for (int i = 0; i < 16; i++) - { - _fastAccessConsumers[i] = null; - } - } - } + /** Strickt AMQP failure default. */ + public static final String STRICT_AMQP_FATAL_DEFAULT = "true"; - final AMQSession<C, P> _thisSession = this; - - /** Used for debugging. */ - private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class); + /** System property to enable immediate message prefetching. */ + public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH"; - /** - * The default value for immediate flag used by producers created by this session is false. That is, a consumer does - * not need to be attached to a queue. - */ - protected final boolean DEFAULT_IMMEDIATE = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false")); + /** Immediate message prefetch default. */ + public static final String IMMEDIATE_PREFETCH_DEFAULT = "false"; - /** - * The default value for mandatory flag used by producers created by this session is true. That is, server will not - * silently drop messages where no queue is connected to the exchange for the message. - */ - protected final boolean DEFAULT_MANDATORY = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true")); + public static final long DEFAULT_FLOW_CONTROL_WAIT_FAILURE = 120000L; /** * The period to wait while flow controlled before sending a log message confirming that the session is still * waiting on flow control being revoked */ - protected final long FLOW_CONTROL_WAIT_PERIOD = Long.getLong("qpid.flow_control_wait_notify_period",5000L); + private final long _flowControlWaitPeriod = Long.getLong("qpid.flow_control_wait_notify_period",5000L); /** * The period to wait while flow controlled before declaring a failure */ - public static final long DEFAULT_FLOW_CONTROL_WAIT_FAILURE = 120000L; - protected final long FLOW_CONTROL_WAIT_FAILURE = Long.getLong("qpid.flow_control_wait_failure", + private final long _flowControlWaitFailure = Long.getLong("qpid.flow_control_wait_failure", DEFAULT_FLOW_CONTROL_WAIT_FAILURE); - protected final boolean DECLARE_QUEUES = + private final boolean _delareQueues = Boolean.parseBoolean(System.getProperty("qpid.declare_queues", "true")); - protected final boolean DECLARE_EXCHANGES = + private final boolean _declareExchanges = Boolean.parseBoolean(System.getProperty("qpid.declare_exchanges", "true")); - - protected final boolean USE_AMQP_ENCODED_MAP_MESSAGE; - - /** System property to enable strict AMQP compliance. */ - public static final String STRICT_AMQP = "STRICT_AMQP"; - - /** Strict AMQP default setting. */ - public static final String STRICT_AMQP_DEFAULT = "false"; - /** System property to enable failure if strict AMQP compliance is violated. */ - public static final String STRICT_AMQP_FATAL = "STRICT_AMQP_FATAL"; - - /** Strickt AMQP failure default. */ - public static final String STRICT_AMQP_FATAL_DEFAULT = "true"; - - /** System property to enable immediate message prefetching. */ - public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH"; + private final boolean _useAMQPEncodedMapMessage; - /** Immediate message prefetch default. */ - public static final String IMMEDIATE_PREFETCH_DEFAULT = "false"; + /** + * Flag indicating to start dispatcher as a daemon thread + */ + protected final boolean DEAMON_DISPATCHER_THREAD = Boolean.getBoolean(ClientProperties.DAEMON_DISPATCHER); /** The connection to which this session belongs. */ - protected AMQConnection _connection; + private AMQConnection _connection; /** Used to indicate whether or not this is a transactional session. */ - protected final boolean _transacted; + private final boolean _transacted; /** Holds the sessions acknowledgement mode. */ - protected final int _acknowledgeMode; + private final int _acknowledgeMode; /** Holds this session unique identifier, used to distinguish it from other sessions. */ - protected int _channelId; + private int _channelId; private int _ticket; @@ -278,55 +170,30 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic /** Used to indicate that this session has been started at least once. */ private AtomicBoolean _startedAtLeastOnce = new AtomicBoolean(false); - /** - * Used to reference durable subscribers so that requests for unsubscribe can be handled correctly. Note this only - * keeps a record of subscriptions which have been created in the current instance. It does not remember - * subscriptions between executions of the client. - */ - protected final ConcurrentHashMap<String, TopicSubscriberAdaptor<C>> _subscriptions = + private final ConcurrentHashMap<String, TopicSubscriberAdaptor<C>> _subscriptions = new ConcurrentHashMap<String, TopicSubscriberAdaptor<C>>(); - /** - * Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked - * up in the {@link #_subscriptions} map. - */ - protected final ConcurrentHashMap<C, String> _reverseSubscriptionMap = new ConcurrentHashMap<C, String>(); + private final ConcurrentHashMap<C, String> _reverseSubscriptionMap = new ConcurrentHashMap<C, String>(); - /** - * Locks to keep access to subscriber details atomic. - * <p> - * Added for QPID2418 - */ - protected final Lock _subscriberDetails = new ReentrantLock(true); - protected final Lock _subscriberAccess = new ReentrantLock(true); + private final Lock _subscriberDetails = new ReentrantLock(true); + private final Lock _subscriberAccess = new ReentrantLock(true); - /** - * Used to hold incoming messages. - * - * @todo Weaken the type once {@link FlowControllingBlockingQueue} implements Queue. - */ - protected final FlowControllingBlockingQueue _queue; + private final FlowControllingBlockingQueue _queue; - /** Holds the highest received delivery tag. */ - protected final AtomicLong _highestDeliveryTag = new AtomicLong(-1); + private final AtomicLong _highestDeliveryTag = new AtomicLong(-1); private final AtomicLong _rollbackMark = new AtomicLong(-1); - /** Pre-fetched message tags */ - protected ConcurrentLinkedQueue<Long> _prefetchedMessageTags = new ConcurrentLinkedQueue<Long>(); + private ConcurrentLinkedQueue<Long> _prefetchedMessageTags = new ConcurrentLinkedQueue<Long>(); - /** All the not yet acknowledged message tags */ - protected ConcurrentLinkedQueue<Long> _unacknowledgedMessageTags = new ConcurrentLinkedQueue<Long>(); + private ConcurrentLinkedQueue<Long> _unacknowledgedMessageTags = new ConcurrentLinkedQueue<Long>(); - /** All the delivered message tags */ - protected ConcurrentLinkedQueue<Long> _deliveredMessageTags = new ConcurrentLinkedQueue<Long>(); + private ConcurrentLinkedQueue<Long> _deliveredMessageTags = new ConcurrentLinkedQueue<Long>(); - /** Holds the dispatcher thread for this session. */ - protected Dispatcher _dispatcher; + private volatile Dispatcher _dispatcher; - protected Thread _dispatcherThread; + private volatile Thread _dispatcherThread; - /** Holds the message factory factory for this session. */ - protected MessageFactoryRegistry _messageFactoryRegistry; + private MessageFactoryRegistry _messageFactoryRegistry; /** Holds all of the producers created by this session, keyed by their unique identifiers. */ private Map<Long, MessageProducer> _producers = new ConcurrentHashMap<Long, MessageProducer>(); @@ -337,11 +204,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic */ private int _nextTag = 1; - /** - * Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right - * consumer. - */ - protected final IdToConsumerMap<C> _consumers = new IdToConsumerMap<C>(); + private final IdToConsumerMap<C> _consumers = new IdToConsumerMap<C>(); /** * Contains a list of consumers which have been removed but which might still have @@ -367,10 +230,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic */ private volatile boolean _sessionInRecovery; - /** - * Set when the dispatcher should direct incoming messages straight into the UnackedMessage list instead of - * to the syncRecieveQueue or MessageListener. Used during cleanup, e.g. in Session.recover(). - */ private volatile boolean _usingDispatcherForCleanup; /** Used to indicates that the connection to which this session belongs, has been stopped. */ @@ -388,28 +247,163 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic */ private final Object _suspensionLock = new Object(); - /** - * Used to ensure that only the first call to start the dispatcher can unsuspend the channel. - * - * @todo This is accessed only within a synchronized method, so does not need to be atomic. - */ - protected final AtomicBoolean _firstDispatcher = new AtomicBoolean(true); + private final AtomicBoolean _firstDispatcher = new AtomicBoolean(true); - /** Used to indicate that the session should start pre-fetching messages as soon as it is started. */ - protected final boolean _immediatePrefetch; + private final boolean _immediatePrefetch; - /** Indicates that warnings should be generated on violations of the strict AMQP. */ - protected final boolean _strictAMQP; + private final boolean _strictAMQP; - /** Indicates that runtime exceptions should be generated on vilations of the strict AMQP. */ - protected final boolean _strictAMQPFATAL; + private final boolean _strictAMQPFATAL; private final Object _messageDeliveryLock = new Object(); /** Session state : used to detect if commit is a) required b) allowed , i.e. does the tx span failover. */ private boolean _dirty; /** Has failover occured on this session with outstanding actions to commit? */ private boolean _failedOverDirty; - + + /** Flow control */ + private FlowControlIndicator _flowControl = new FlowControlIndicator(); + + + + /** Holds the highest received delivery tag. */ + protected AtomicLong getHighestDeliveryTag() + { + return _highestDeliveryTag; + } + + /** Pre-fetched message tags */ + protected ConcurrentLinkedQueue<Long> getPrefetchedMessageTags() + { + return _prefetchedMessageTags; + } + + /** All the not yet acknowledged message tags */ + protected ConcurrentLinkedQueue<Long> getUnacknowledgedMessageTags() + { + return _unacknowledgedMessageTags; + } + + /** All the delivered message tags */ + protected ConcurrentLinkedQueue<Long> getDeliveredMessageTags() + { + return _deliveredMessageTags; + } + + /** Holds the dispatcher thread for this session. */ + protected Dispatcher getDispatcher() + { + return _dispatcher; + } + + protected Thread getDispatcherThread() + { + return _dispatcherThread; + } + + /** Holds the message factory factory for this session. */ + protected MessageFactoryRegistry getMessageFactoryRegistry() + { + return _messageFactoryRegistry; + } + + /** + * Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right + * consumer. + */ + protected IdToConsumerMap<C> getConsumers() + { + return _consumers; + } + + protected void setUsingDispatcherForCleanup(boolean usingDispatcherForCleanup) + { + _usingDispatcherForCleanup = usingDispatcherForCleanup; + } + + /** Used to indicate that the session should start pre-fetching messages as soon as it is started. */ + protected boolean isImmediatePrefetch() + { + return _immediatePrefetch; + } + + public static final class IdToConsumerMap<C extends BasicMessageConsumer> + { + private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16]; + private final ConcurrentHashMap<Integer, C> _slowAccessConsumers = new ConcurrentHashMap<Integer, C>(); + + public C get(int id) + { + if ((id & 0xFFFFFFF0) == 0) + { + return (C) _fastAccessConsumers[id]; + } + else + { + return _slowAccessConsumers.get(id); + } + } + + public C put(int id, C consumer) + { + C oldVal; + if ((id & 0xFFFFFFF0) == 0) + { + oldVal = (C) _fastAccessConsumers[id]; + _fastAccessConsumers[id] = consumer; + } + else + { + oldVal = _slowAccessConsumers.put(id, consumer); + } + + return oldVal; + + } + + public C remove(int id) + { + C consumer; + if ((id & 0xFFFFFFF0) == 0) + { + consumer = (C) _fastAccessConsumers[id]; + _fastAccessConsumers[id] = null; + } + else + { + consumer = _slowAccessConsumers.remove(id); + } + + return consumer; + + } + + public Collection<C> values() + { + ArrayList<C> values = new ArrayList<C>(); + + for (int i = 0; i < 16; i++) + { + if (_fastAccessConsumers[i] != null) + { + values.add((C) _fastAccessConsumers[i]); + } + } + values.addAll(_slowAccessConsumers.values()); + + return values; + } + + public void clear() + { + _slowAccessConsumers.clear(); + for (int i = 0; i < 16; i++) + { + _fastAccessConsumers[i] = null; + } + } + } + private static final class FlowControlIndicator { private volatile boolean _flowControl = true; @@ -426,9 +420,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } } - /** Flow control */ - private FlowControlIndicator _flowControl = new FlowControlIndicator(); - /** * Creates a new session on a connection. * @@ -443,7 +434,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic protected AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark) { - USE_AMQP_ENCODED_MAP_MESSAGE = con == null ? true : !con.isUseLegacyMapMessageFormat(); + _useAMQPEncodedMapMessage = con == null ? true : !con.isUseLegacyMapMessageFormat(); _strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP, STRICT_AMQP_DEFAULT)); _strictAMQPFATAL = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT)); @@ -479,7 +470,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { // If the session has been closed don't waste time creating a thread to do // flow control - if (!(_thisSession.isClosed() || _thisSession.isClosing())) + if (!(AMQSession.this.isClosed() || AMQSession.this.isClosing())) { // Only execute change if previous state // was False @@ -507,7 +498,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { // If the session has been closed don't waste time creating a thread to do // flow control - if (!(_thisSession.isClosed() || _thisSession.isClosing())) + if (!(AMQSession.this.isClosed() || AMQSession.this.isClosing())) { // Only execute change if previous state // was true @@ -539,9 +530,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } // Add creation logging to tie in with the existing close logging - if (_logger.isInfoEnabled()) + if (_logger.isDebugEnabled()) { - _logger.info("Created session:" + this); + _logger.debug("Created session:" + this); } } @@ -730,17 +721,15 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic private void close(long timeout, boolean sendClose) throws JMSException { - if (_logger.isInfoEnabled()) + if (_logger.isDebugEnabled()) { - // StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); - _logger.info("Closing session: " + this); // + ":" - // Arrays.asList(stackTrace).subList(3, stackTrace.length - 1)); + _logger.debug("Closing session: " + this); } // Ensure we only try and close an open session. - if (!_closed.getAndSet(true)) + if (!setClosed()) { - _closing.set(true); + setClosing(true); synchronized (getFailoverMutex()) { // We must close down all producers and consumers in an orderly fashion. This is the only method @@ -808,7 +797,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic if (e instanceof AMQDisconnectedException) { - if (_dispatcher != null) + if (_dispatcherThread != null) { // Failover failed and ain't coming back. Knife the dispatcher. _dispatcherThread.interrupt(); @@ -817,9 +806,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } //if we don't have an exception then we can perform closing operations - _closing.set(e == null); + setClosing(e == null); - if (!_closed.getAndSet(true)) + if (!setClosed()) { synchronized (_messageDeliveryLock) { @@ -903,11 +892,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // Flush any pending messages for this consumerTag if (_dispatcher != null) { - _logger.info("Dispatcher is not null"); + _logger.debug("Dispatcher is not null"); } else { - _logger.info("Dispatcher is null so created stopped dispatcher"); + _logger.debug("Dispatcher is null so created stopped dispatcher"); startDispatcherIfNecessary(true); } @@ -918,18 +907,16 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // Just close the consumer // fixme the CancelOK is being processed before the arriving messages.. // The dispatcher is still to process them so the server sent in order but the client - // has yet to receive before the close comes in. - - // consumer.markClosed(); + // has yet to receive before the close comes in if (consumer.isAutoClose()) { // There is a small window where the message is between the two queues in the dispatcher. if (consumer.isClosed()) { - if (_logger.isInfoEnabled()) + if (_logger.isDebugEnabled()) { - _logger.info("Closing consumer:" + consumer.debugIdentity()); + _logger.debug("Closing consumer:" + consumer.debugIdentity()); } deregisterConsumer(consumer); @@ -953,6 +940,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic return createBrowser(queue, null); } + /** + * Create a queue browser if the destination is a valid queue. + */ public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException { if (isStrictAMQP()) @@ -963,7 +953,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic checkNotClosed(); checkValidQueue(queue); - return new AMQQueueBrowser(this, (AMQQueue) queue, messageSelector); + return new AMQQueueBrowser(this, queue, messageSelector); } protected MessageConsumer createBrowserConsumer(Destination destination, String messageSelector, boolean noLocal) @@ -1043,7 +1033,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { try { - handleAddressBasedDestination(dest,false,true); + handleAddressBasedDestination(dest,false,noLocal,true); if (dest.getAddressType() != AMQDestination.TOPIC_TYPE) { throw new JMSException("Durable subscribers can only be created for Topics"); @@ -1099,6 +1089,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // possible to determine when querying the broker whether there are no arguments or just a non-matching selector // argument, as specifying null for the arguments when querying means they should not be checked at all args.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), messageSelector == null ? "" : messageSelector); + if(noLocal) + { + args.put(AMQPFilterTypes.NO_LOCAL.getValue().toString(), true); + } // if the queue is bound to the exchange but NOT for this topic and selector, then the JMS spec // says we must trash the subscription. @@ -1159,7 +1153,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public MapMessage createMapMessage() throws JMSException { checkNotClosed(); - if (USE_AMQP_ENCODED_MAP_MESSAGE) + if (_useAMQPEncodedMapMessage) { AMQPEncodedMapMessage msg = new AMQPEncodedMapMessage(getMessageDelegateFactory()); msg.setAMQSession(this); @@ -1196,12 +1190,12 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public P createProducer(Destination destination) throws JMSException { - return createProducerImpl(destination, DEFAULT_MANDATORY, DEFAULT_IMMEDIATE); + return createProducerImpl(destination, null, null); } public P createProducer(Destination destination, boolean immediate) throws JMSException { - return createProducerImpl(destination, DEFAULT_MANDATORY, immediate); + return createProducerImpl(destination, null, immediate); } public P createProducer(Destination destination, boolean mandatory, boolean immediate) @@ -1600,7 +1594,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public MessageListener getMessageListener() throws JMSException { - // checkNotClosed(); return _messageListener; } @@ -1648,6 +1641,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic return (counter != null) && (counter.get() != 0); } + /** Indicates that warnings should be generated on violations of the strict AMQP. */ public boolean isStrictAMQP() { return _strictAMQP; @@ -1690,7 +1684,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { AMQProtocolHandler protocolHandler = getProtocolHandler(); declareExchange(amqd, protocolHandler, false); - AMQShortString queueName = declareQueue(amqd, protocolHandler, false); + AMQShortString queueName = declareQueue(amqd, false); bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName(), amqd); } @@ -1886,31 +1880,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public void setMessageListener(MessageListener listener) throws JMSException { - // checkNotClosed(); - // - // if (_dispatcher != null && !_dispatcher.connectionStopped()) - // { - // throw new javax.njms.IllegalStateException("Attempt to set listener while session is started."); - // } - // - // // We are stopped - // for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) - // { - // BasicMessageConsumer consumer = i.next(); - // - // if (consumer.isReceiving()) - // { - // throw new javax.njms.IllegalStateException("Another thread is already receiving synchronously."); - // } - // } - // - // _messageListener = listener; - // - // for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) - // { - // i.next().setMessageListener(_messageListener); - // } - } /** @@ -2184,7 +2153,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic */ void markClosed() { - _closed.set(true); + setClosed(); _connection.deregisterSession(_channelId); markClosedProducersAndConsumers(); @@ -2199,7 +2168,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { if (Thread.currentThread() == _dispatcherThread) { - while (!_closed.get() && !_queue.isEmpty()) + while (!super.isClosed() && !_queue.isEmpty()) { Dispatchable disp; try @@ -2247,6 +2216,58 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } } + void drainDispatchQueue() + { + if (Thread.currentThread() == _dispatcherThread) + { + while (!super.isClosed() && !_queue.isEmpty()) + { + Dispatchable disp; + try + { + disp = (Dispatchable) _queue.take(); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + + // Check just in case _queue becomes empty, it shouldn't but + // better than an NPE. + if (disp == null) + { + _logger.debug("_queue became empty during sync."); + break; + } + + disp.dispatch(AMQSession.this); + } + } + else + { + startDispatcherIfNecessary(false); + + final CountDownLatch signal = new CountDownLatch(1); + + _queue.add(new Dispatchable() + { + public void dispatch(AMQSession ssn) + { + signal.countDown(); + } + }); + + try + { + signal.await(); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } + } + /** * Resubscribes all producers and consumers. This is called when performing failover. * @@ -2289,7 +2310,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic */ void start() throws AMQException { - // Check if the session has perviously been started and suspended, in which case it must be unsuspended. + // Check if the session has previously been started and suspended, in which case it must be unsuspended. if (_startedAtLeastOnce.getAndSet(true)) { suspendChannel(false); @@ -2323,7 +2344,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } catch (AMQException e) { - _logger.info("Unsuspending channel threw an exception:" + e); + _logger.info("Unsuspending channel threw an exception:", e); } } } @@ -2346,12 +2367,12 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic throw new Error("Error creating Dispatcher thread",e); } _dispatcherThread.setName("Dispatcher-Channel-" + _channelId); - _dispatcherThread.setDaemon(true); + _dispatcherThread.setDaemon(DEAMON_DISPATCHER_THREAD); _dispatcher.setConnectionStopped(initiallyStopped); _dispatcherThread.start(); - if (_dispatcherLogger.isInfoEnabled()) + if (_dispatcherLogger.isDebugEnabled()) { - _dispatcherLogger.info(_dispatcherThread.getName() + " created"); + _dispatcherLogger.debug(_dispatcherThread.getName() + " created"); } } else @@ -2371,32 +2392,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } } - /* - * Binds the named queue, with the specified routing key, to the named exchange. - * - * <p/>Note that this operation automatically retries in the event of fail-over. - * - * @param queueName The name of the queue to bind. - * @param routingKey The routing key to bind the queue with. - * @param arguments Additional arguments. - * @param exchangeName The exchange to bind the queue on. - * - * @throws AMQException If the queue cannot be bound for any reason. - */ - /*private void bindQueue(AMQDestination amqd, AMQShortString queueName, AMQProtocolHandler protocolHandler, FieldTable ft) - throws AMQException, FailoverException - { - AMQFrame queueBind = - QueueBindBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), ft, // arguments - amqd.getExchangeName(), // exchange - false, // nowait - queueName, // queue - amqd.getRoutingKey(), // routingKey - getTicket()); // ticket - - protocolHandler.syncWrite(queueBind, QueueBindOkBody.class); - }*/ - private void checkNotTransacted() throws JMSException { if (getTransacted()) @@ -2580,7 +2575,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic * @param queueName */ private void consumeFromQueue(C consumer, AMQShortString queueName, - AMQProtocolHandler protocolHandler, boolean nowait, MessageFilter messageSelector) throws AMQException, FailoverException + AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException, FailoverException { int tagId = _nextTag++; @@ -2597,7 +2592,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic try { - sendConsume(consumer, queueName, protocolHandler, nowait, messageSelector, tagId); + sendConsume(consumer, queueName, protocolHandler, nowait, tagId); } catch (AMQException e) { @@ -2608,9 +2603,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } public abstract void sendConsume(C consumer, AMQShortString queueName, - AMQProtocolHandler protocolHandler, boolean nowait, MessageFilter messageSelector, int tag) throws AMQException, FailoverException; + AMQProtocolHandler protocolHandler, boolean nowait, int tag) throws AMQException, FailoverException; - private P createProducerImpl(final Destination destination, final boolean mandatory, final boolean immediate) + private P createProducerImpl(final Destination destination, final Boolean mandatory, final Boolean immediate) throws JMSException { return new FailoverRetrySupport<P, JMSException>( @@ -2639,8 +2634,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic }, _connection).execute(); } - public abstract P createMessageProducer(final Destination destination, final boolean mandatory, - final boolean immediate, final long producerId) throws JMSException; + public abstract P createMessageProducer(final Destination destination, final Boolean mandatory, + final Boolean immediate, final long producerId) throws JMSException; private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException { @@ -2661,18 +2656,38 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public long getQueueDepth(final AMQDestination amqd) throws AMQException { - return new FailoverNoopSupport<Long, AMQException>( - new FailoverProtectedOperation<Long, AMQException>() - { - public Long execute() throws AMQException, FailoverException - { - return requestQueueDepth(amqd); - } - }, _connection).execute(); + return getQueueDepth(amqd, false); + } + /** + * Returns the number of messages currently queued by the given + * destination. Syncs session before receiving the queue depth if sync is + * set to true. + * + * @param amqd AMQ destination to get the depth value + * @param sync flag to sync session before receiving the queue depth + * @return queue depth + * @throws AMQException + */ + public long getQueueDepth(final AMQDestination amqd, final boolean sync) throws AMQException + { + return new FailoverNoopSupport<Long, AMQException>(new FailoverProtectedOperation<Long, AMQException>() + { + public Long execute() throws AMQException, FailoverException + { + try + { + return requestQueueDepth(amqd, sync); + } + catch (TransportException e) + { + throw new AMQException(AMQConstant.getConstant(getErrorCode(e)), e.getMessage(), e); + } + } + }, _connection).execute(); } - protected abstract Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException; + protected abstract Long requestQueueDepth(AMQDestination amqd, boolean sync) throws AMQException, FailoverException; /** * Declares the named exchange and type of exchange. @@ -2703,6 +2718,12 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public abstract void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler, final boolean nowait) throws AMQException, FailoverException; + + void declareQueuePassive(AMQDestination queue) throws AMQException + { + declareQueue(queue,false,false,true); + } + /** * Declares a queue for a JMS destination. * @@ -2712,27 +2733,35 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic * * <p/>Note that this operation automatically retries in the event of fail-over. * - * @param amqd The destination to declare as a queue. - * @param protocolHandler The protocol handler to communicate through. * + * @param amqd The destination to declare as a queue. * @return The name of the decalred queue. This is useful where the broker is generating a queue name on behalf of * the client. * + * + * * @throws AMQException If the queue cannot be declared for any reason. * @todo Verify the destiation is valid or throw an exception. * @todo Be aware of possible changes to parameter order as versions change. */ - protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, + protected AMQShortString declareQueue(final AMQDestination amqd, final boolean noLocal) throws AMQException { - return declareQueue(amqd, protocolHandler, noLocal, false); + return declareQueue(amqd, noLocal, false); } - protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, + protected AMQShortString declareQueue(final AMQDestination amqd, final boolean noLocal, final boolean nowait) + throws AMQException + { + return declareQueue(amqd, noLocal, nowait, false); + } + + protected AMQShortString declareQueue(final AMQDestination amqd, + final boolean noLocal, final boolean nowait, final boolean passive) throws AMQException { - /*return new FailoverRetrySupport<AMQShortString, AMQException>(*/ + final AMQProtocolHandler protocolHandler = getProtocolHandler(); return new FailoverNoopSupport<AMQShortString, AMQException>( new FailoverProtectedOperation<AMQShortString, AMQException>() { @@ -2744,7 +2773,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic amqd.setQueueName(protocolHandler.generateQueueName()); } - sendQueueDeclare(amqd, protocolHandler, nowait); + sendQueueDeclare(amqd, protocolHandler, nowait, passive); return amqd.getAMQQueueName(); } @@ -2752,7 +2781,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, - final boolean nowait) throws AMQException, FailoverException; + final boolean nowait, boolean passive) throws AMQException, FailoverException; /** * Undeclares the specified queue. @@ -2882,18 +2911,18 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic if (amqd.getDestSyntax() == DestSyntax.ADDR) { - handleAddressBasedDestination(amqd,true,nowait); + handleAddressBasedDestination(amqd,true,consumer.isNoLocal(),nowait); } else { - if (DECLARE_EXCHANGES) + if (_declareExchanges) { declareExchange(amqd, protocolHandler, nowait); } - if (DECLARE_QUEUES || amqd.isNameRequired()) + if (_delareQueues || amqd.isNameRequired()) { - declareQueue(amqd, protocolHandler, consumer.isNoLocal(), nowait); + declareQueue(amqd, consumer.isNoLocal(), nowait); } bindQueue(amqd.getAMQQueueName(), amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd, nowait); } @@ -2916,24 +2945,24 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic try { suspendChannel(true); - _logger.info( + _logger.debug( "Prefetching delayed existing messages will not flow until requested via receive*() or setML()."); } catch (AMQException e) { - _logger.info("Suspending channel threw an exception:" + e); + _logger.info("Suspending channel threw an exception:", e); } } } } else { - _logger.info("Immediately prefetching existing messages to new consumer."); + _logger.debug("Immediately prefetching existing messages to new consumer."); } try { - consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelectorFilter()); + consumeFromQueue(consumer, queueName, protocolHandler, nowait); } catch (FailoverException e) { @@ -2943,6 +2972,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public abstract void handleAddressBasedDestination(AMQDestination dest, boolean isConsumer, + boolean noLocal, boolean noWait) throws AMQException; private void registerProducer(long producerId, MessageProducer producer) @@ -2959,18 +2989,18 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic private void rejectMessagesForConsumerTag(int consumerTag, boolean requeue, boolean rejectAllConsumers) { Iterator messages = _queue.iterator(); - if (_logger.isInfoEnabled()) + if (_logger.isDebugEnabled()) { - _logger.info("Rejecting messages from _queue for Consumer tag(" + consumerTag + ") (PDispatchQ) requeue:" + _logger.debug("Rejecting messages from _queue for Consumer tag(" + consumerTag + ") (PDispatchQ) requeue:" + requeue); if (messages.hasNext()) { - _logger.info("Checking all messages in _queue for Consumer tag(" + consumerTag + ")"); + _logger.debug("Checking all messages in _queue for Consumer tag(" + consumerTag + ")"); } else { - _logger.info("No messages in _queue to reject"); + _logger.debug("No messages in _queue to reject"); } } while (messages.hasNext()) @@ -3013,7 +3043,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic private void resubscribeProducers() throws AMQException { ArrayList producers = new ArrayList(_producers.values()); - _logger.info(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size())); // FIXME: removeKey + _logger.debug(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size())); // FIXME: removeKey for (Iterator it = producers.iterator(); it.hasNext();) { P producer = (P) it.next(); @@ -3103,7 +3133,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public void setFlowControl(final boolean active) { _flowControl.setFlowControl(active); - _logger.warn("Broker enforced flow control " + (active ? "no longer in effect" : "has been enforced")); + if (_logger.isInfoEnabled()) + { + _logger.info("Broker enforced flow control " + (active ? "no longer in effect" : "has been enforced")); + } } public void checkFlowControl() throws InterruptedException, JMSException @@ -3112,17 +3145,20 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic synchronized (_flowControl) { while (!_flowControl.getFlowControl() && - (expiryTime == 0L ? (expiryTime = System.currentTimeMillis() + FLOW_CONTROL_WAIT_FAILURE) + (expiryTime == 0L ? (expiryTime = System.currentTimeMillis() + _flowControlWaitFailure) : expiryTime) >= System.currentTimeMillis() ) { - _flowControl.wait(FLOW_CONTROL_WAIT_PERIOD); - _logger.warn("Message send delayed by " + (System.currentTimeMillis() + FLOW_CONTROL_WAIT_FAILURE - expiryTime)/1000 + "s due to broker enforced flow control"); + _flowControl.wait(_flowControlWaitPeriod); + if (_logger.isInfoEnabled()) + { + _logger.info("Message send delayed by " + (System.currentTimeMillis() + _flowControlWaitFailure - expiryTime)/1000 + "s due to broker enforced flow control"); + } } if(!_flowControl.getFlowControl()) { _logger.error("Message send failed due to timeout waiting on broker enforced flow control"); - throw new JMSException("Unable to send message for " + FLOW_CONTROL_WAIT_FAILURE/1000 + " seconds due to broker enforced flow control"); + throw new JMSException("Unable to send message for " + _flowControlWaitFailure /1000 + " seconds due to broker enforced flow control"); } } @@ -3154,7 +3190,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic private final AtomicBoolean _closed = new AtomicBoolean(false); private final Object _lock = new Object(); - private String dispatcherID = "" + System.identityHashCode(this); + private final String dispatcherID = "" + System.identityHashCode(this); public Dispatcher() { @@ -3169,6 +3205,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } + private AtomicBoolean getClosed() + { + return _closed; + } + public void rejectPending(C consumer) { synchronized (_lock) @@ -3220,7 +3261,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic else { // should perhaps clear the _SQ here. - // consumer._synchronousQueue.clear(); consumer.clearReceiveQueue(); } @@ -3266,13 +3306,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public void run() { - if (_dispatcherLogger.isInfoEnabled()) + if (_dispatcherLogger.isDebugEnabled()) { - _dispatcherLogger.info(_dispatcherThread.getName() + " started"); + _dispatcherLogger.debug(_dispatcherThread.getName() + " started"); } - UnprocessedMessage message; - // Allow disptacher to start stopped synchronized (_lock) { @@ -3284,7 +3322,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } catch (InterruptedException e) { - // ignore + Thread.currentThread().interrupt(); } } } @@ -3299,12 +3337,12 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } catch (InterruptedException e) { - // ignore + // ignored as run will exit immediately } - if (_dispatcherLogger.isInfoEnabled()) + if (_dispatcherLogger.isDebugEnabled()) { - _dispatcherLogger.info(_dispatcherThread.getName() + " thread terminating for channel " + _channelId + ":" + _thisSession); + _dispatcherLogger.debug(_dispatcherThread.getName() + " thread terminating for channel " + _channelId + ":" + AMQSession.this); } } @@ -3350,7 +3388,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } catch (InterruptedException e) { - // pass + Thread.currentThread().interrupt(); } if (!(message instanceof CloseConsumerMessage) @@ -3425,7 +3463,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic if (_logger.isDebugEnabled()) { _logger.debug("Rejecting message with delivery tag " + message.getDeliveryTag() - + " for closing consumer " + String.valueOf(consumer == null? null: consumer._consumerTag)); + + " for closing consumer " + String.valueOf(consumer == null? null: consumer.getConsumerTag())); } rejectMessage(message, true); } @@ -3443,30 +3481,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public abstract AMQMessageDelegateFactory getMessageDelegateFactory(); - /*public void requestAccess(AMQShortString realm, boolean exclusive, boolean passive, boolean active, boolean write, - boolean read) throws AMQException - { - getProtocolHandler().writeCommandFrameAndWaitForReply(AccessRequestBody.createAMQFrame(getChannelId(), - getProtocolMajorVersion(), getProtocolMinorVersion(), active, exclusive, passive, read, realm, write), - new BlockingMethodFrameListener(_channelId) - { - - public boolean processMethod(int channelId, AMQMethodBody frame) // throws AMQException - { - if (frame instanceof AccessRequestOkBody) - { - setTicket(((AccessRequestOkBody) frame).getTicket()); - - return true; - } - else - { - return false; - } - } - }); - }*/ - private class SuspenderRunner implements Runnable { private AtomicBoolean _suspend; @@ -3484,7 +3498,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { // If the session has closed by the time we get here // then we should not attempt to write to the sesion/channel. - if (!(_thisSession.isClosed() || _thisSession.isClosing())) + if (!(AMQSession.this.isClosed() || AMQSession.this.isClosing())) { suspendChannel(_suspend.get()); } @@ -3492,11 +3506,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } catch (AMQException e) { - _logger.warn("Unable to " + (_suspend.get() ? "suspend" : "unsuspend") + " session " + _thisSession + " due to: " + e); + _logger.warn("Unable to " + (_suspend.get() ? "suspend" : "unsuspend") + " session " + AMQSession.this + " due to: ", e); if (_logger.isDebugEnabled()) { _logger.debug("Is the _queue empty?" + _queue.isEmpty()); - _logger.debug("Is the dispatcher closed?" + (_dispatcher == null ? "it's Null" : _dispatcher._closed)); + _logger.debug("Is the dispatcher closed?" + (_dispatcher == null ? "it's Null" : _dispatcher.getClosed())); } } } @@ -3510,7 +3524,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic @Override public boolean isClosed() { - return _closed.get() || _connection.isClosed(); + return super.isClosed() || _connection.isClosed(); } /** @@ -3522,12 +3536,12 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic @Override public boolean isClosing() { - return _closing.get()|| _connection.isClosing(); + return super.isClosing() || _connection.isClosing(); } public boolean isDeclareExchanges() { - return DECLARE_EXCHANGES; + return _declareExchanges; } JMSException toJMSException(String message, TransportException e) |