summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java796
1 files changed, 405 insertions, 391 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 48c4e3e3e6..efc5982dac 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -20,50 +20,8 @@
*/
package org.apache.qpid.client;
-import java.io.Serializable;
-import java.net.URISyntaxException;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import javax.jms.BytesMessage;
-import javax.jms.Destination;
-import javax.jms.IllegalStateException;
-import javax.jms.InvalidDestinationException;
-import javax.jms.InvalidSelectorException;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.Queue;
-import javax.jms.QueueBrowser;
-import javax.jms.QueueReceiver;
-import javax.jms.QueueSender;
-import javax.jms.QueueSession;
-import javax.jms.StreamMessage;
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.jms.TopicPublisher;
-import javax.jms.TopicSession;
-import javax.jms.TopicSubscriber;
-import javax.jms.TransactionRolledBackException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQChannelClosedException;
import org.apache.qpid.AMQDisconnectedException;
@@ -89,7 +47,7 @@ import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.filter.MessageFilter;
+import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.MethodRegistry;
@@ -98,8 +56,27 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.SessionException;
import org.apache.qpid.transport.TransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+import javax.jms.IllegalStateException;
+import java.io.Serializable;
+import java.net.URISyntaxException;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
/**
* <p/><table id="crc"><caption>CRC Card</caption>
@@ -119,150 +96,65 @@ import org.slf4j.LoggerFactory;
*/
public abstract class AMQSession<C extends BasicMessageConsumer, P extends BasicMessageProducer> extends Closeable implements Session, QueueSession, TopicSession
{
- public static final class IdToConsumerMap<C extends BasicMessageConsumer>
- {
- private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16];
- private final ConcurrentHashMap<Integer, C> _slowAccessConsumers = new ConcurrentHashMap<Integer, C>();
-
- public C get(int id)
- {
- if ((id & 0xFFFFFFF0) == 0)
- {
- return (C) _fastAccessConsumers[id];
- }
- else
- {
- return _slowAccessConsumers.get(id);
- }
- }
-
- public C put(int id, C consumer)
- {
- C oldVal;
- if ((id & 0xFFFFFFF0) == 0)
- {
- oldVal = (C) _fastAccessConsumers[id];
- _fastAccessConsumers[id] = consumer;
- }
- else
- {
- oldVal = _slowAccessConsumers.put(id, consumer);
- }
-
- return oldVal;
-
- }
-
- public C remove(int id)
- {
- C consumer;
- if ((id & 0xFFFFFFF0) == 0)
- {
- consumer = (C) _fastAccessConsumers[id];
- _fastAccessConsumers[id] = null;
- }
- else
- {
- consumer = _slowAccessConsumers.remove(id);
- }
-
- return consumer;
-
- }
+ /** Used for debugging. */
+ private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
- public Collection<C> values()
- {
- ArrayList<C> values = new ArrayList<C>();
+ /** System property to enable strict AMQP compliance. */
+ public static final String STRICT_AMQP = "STRICT_AMQP";
- for (int i = 0; i < 16; i++)
- {
- if (_fastAccessConsumers[i] != null)
- {
- values.add((C) _fastAccessConsumers[i]);
- }
- }
- values.addAll(_slowAccessConsumers.values());
+ /** Strict AMQP default setting. */
+ public static final String STRICT_AMQP_DEFAULT = "false";
- return values;
- }
+ /** System property to enable failure if strict AMQP compliance is violated. */
+ public static final String STRICT_AMQP_FATAL = "STRICT_AMQP_FATAL";
- public void clear()
- {
- _slowAccessConsumers.clear();
- for (int i = 0; i < 16; i++)
- {
- _fastAccessConsumers[i] = null;
- }
- }
- }
+ /** Strickt AMQP failure default. */
+ public static final String STRICT_AMQP_FATAL_DEFAULT = "true";
- final AMQSession<C, P> _thisSession = this;
-
- /** Used for debugging. */
- private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
+ /** System property to enable immediate message prefetching. */
+ public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH";
- /**
- * The default value for immediate flag used by producers created by this session is false. That is, a consumer does
- * not need to be attached to a queue.
- */
- protected final boolean DEFAULT_IMMEDIATE = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false"));
+ /** Immediate message prefetch default. */
+ public static final String IMMEDIATE_PREFETCH_DEFAULT = "false";
- /**
- * The default value for mandatory flag used by producers created by this session is true. That is, server will not
- * silently drop messages where no queue is connected to the exchange for the message.
- */
- protected final boolean DEFAULT_MANDATORY = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true"));
+ public static final long DEFAULT_FLOW_CONTROL_WAIT_FAILURE = 120000L;
/**
* The period to wait while flow controlled before sending a log message confirming that the session is still
* waiting on flow control being revoked
*/
- protected final long FLOW_CONTROL_WAIT_PERIOD = Long.getLong("qpid.flow_control_wait_notify_period",5000L);
+ private final long _flowControlWaitPeriod = Long.getLong("qpid.flow_control_wait_notify_period",5000L);
/**
* The period to wait while flow controlled before declaring a failure
*/
- public static final long DEFAULT_FLOW_CONTROL_WAIT_FAILURE = 120000L;
- protected final long FLOW_CONTROL_WAIT_FAILURE = Long.getLong("qpid.flow_control_wait_failure",
+ private final long _flowControlWaitFailure = Long.getLong("qpid.flow_control_wait_failure",
DEFAULT_FLOW_CONTROL_WAIT_FAILURE);
- protected final boolean DECLARE_QUEUES =
+ private final boolean _delareQueues =
Boolean.parseBoolean(System.getProperty("qpid.declare_queues", "true"));
- protected final boolean DECLARE_EXCHANGES =
+ private final boolean _declareExchanges =
Boolean.parseBoolean(System.getProperty("qpid.declare_exchanges", "true"));
-
- protected final boolean USE_AMQP_ENCODED_MAP_MESSAGE;
-
- /** System property to enable strict AMQP compliance. */
- public static final String STRICT_AMQP = "STRICT_AMQP";
-
- /** Strict AMQP default setting. */
- public static final String STRICT_AMQP_DEFAULT = "false";
- /** System property to enable failure if strict AMQP compliance is violated. */
- public static final String STRICT_AMQP_FATAL = "STRICT_AMQP_FATAL";
-
- /** Strickt AMQP failure default. */
- public static final String STRICT_AMQP_FATAL_DEFAULT = "true";
-
- /** System property to enable immediate message prefetching. */
- public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH";
+ private final boolean _useAMQPEncodedMapMessage;
- /** Immediate message prefetch default. */
- public static final String IMMEDIATE_PREFETCH_DEFAULT = "false";
+ /**
+ * Flag indicating to start dispatcher as a daemon thread
+ */
+ protected final boolean DEAMON_DISPATCHER_THREAD = Boolean.getBoolean(ClientProperties.DAEMON_DISPATCHER);
/** The connection to which this session belongs. */
- protected AMQConnection _connection;
+ private AMQConnection _connection;
/** Used to indicate whether or not this is a transactional session. */
- protected final boolean _transacted;
+ private final boolean _transacted;
/** Holds the sessions acknowledgement mode. */
- protected final int _acknowledgeMode;
+ private final int _acknowledgeMode;
/** Holds this session unique identifier, used to distinguish it from other sessions. */
- protected int _channelId;
+ private int _channelId;
private int _ticket;
@@ -278,55 +170,30 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
/** Used to indicate that this session has been started at least once. */
private AtomicBoolean _startedAtLeastOnce = new AtomicBoolean(false);
- /**
- * Used to reference durable subscribers so that requests for unsubscribe can be handled correctly. Note this only
- * keeps a record of subscriptions which have been created in the current instance. It does not remember
- * subscriptions between executions of the client.
- */
- protected final ConcurrentHashMap<String, TopicSubscriberAdaptor<C>> _subscriptions =
+ private final ConcurrentHashMap<String, TopicSubscriberAdaptor<C>> _subscriptions =
new ConcurrentHashMap<String, TopicSubscriberAdaptor<C>>();
- /**
- * Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked
- * up in the {@link #_subscriptions} map.
- */
- protected final ConcurrentHashMap<C, String> _reverseSubscriptionMap = new ConcurrentHashMap<C, String>();
+ private final ConcurrentHashMap<C, String> _reverseSubscriptionMap = new ConcurrentHashMap<C, String>();
- /**
- * Locks to keep access to subscriber details atomic.
- * <p>
- * Added for QPID2418
- */
- protected final Lock _subscriberDetails = new ReentrantLock(true);
- protected final Lock _subscriberAccess = new ReentrantLock(true);
+ private final Lock _subscriberDetails = new ReentrantLock(true);
+ private final Lock _subscriberAccess = new ReentrantLock(true);
- /**
- * Used to hold incoming messages.
- *
- * @todo Weaken the type once {@link FlowControllingBlockingQueue} implements Queue.
- */
- protected final FlowControllingBlockingQueue _queue;
+ private final FlowControllingBlockingQueue _queue;
- /** Holds the highest received delivery tag. */
- protected final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
+ private final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
private final AtomicLong _rollbackMark = new AtomicLong(-1);
- /** Pre-fetched message tags */
- protected ConcurrentLinkedQueue<Long> _prefetchedMessageTags = new ConcurrentLinkedQueue<Long>();
+ private ConcurrentLinkedQueue<Long> _prefetchedMessageTags = new ConcurrentLinkedQueue<Long>();
- /** All the not yet acknowledged message tags */
- protected ConcurrentLinkedQueue<Long> _unacknowledgedMessageTags = new ConcurrentLinkedQueue<Long>();
+ private ConcurrentLinkedQueue<Long> _unacknowledgedMessageTags = new ConcurrentLinkedQueue<Long>();
- /** All the delivered message tags */
- protected ConcurrentLinkedQueue<Long> _deliveredMessageTags = new ConcurrentLinkedQueue<Long>();
+ private ConcurrentLinkedQueue<Long> _deliveredMessageTags = new ConcurrentLinkedQueue<Long>();
- /** Holds the dispatcher thread for this session. */
- protected Dispatcher _dispatcher;
+ private volatile Dispatcher _dispatcher;
- protected Thread _dispatcherThread;
+ private volatile Thread _dispatcherThread;
- /** Holds the message factory factory for this session. */
- protected MessageFactoryRegistry _messageFactoryRegistry;
+ private MessageFactoryRegistry _messageFactoryRegistry;
/** Holds all of the producers created by this session, keyed by their unique identifiers. */
private Map<Long, MessageProducer> _producers = new ConcurrentHashMap<Long, MessageProducer>();
@@ -337,11 +204,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
*/
private int _nextTag = 1;
- /**
- * Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right
- * consumer.
- */
- protected final IdToConsumerMap<C> _consumers = new IdToConsumerMap<C>();
+ private final IdToConsumerMap<C> _consumers = new IdToConsumerMap<C>();
/**
* Contains a list of consumers which have been removed but which might still have
@@ -367,10 +230,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
*/
private volatile boolean _sessionInRecovery;
- /**
- * Set when the dispatcher should direct incoming messages straight into the UnackedMessage list instead of
- * to the syncRecieveQueue or MessageListener. Used during cleanup, e.g. in Session.recover().
- */
private volatile boolean _usingDispatcherForCleanup;
/** Used to indicates that the connection to which this session belongs, has been stopped. */
@@ -388,28 +247,163 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
*/
private final Object _suspensionLock = new Object();
- /**
- * Used to ensure that only the first call to start the dispatcher can unsuspend the channel.
- *
- * @todo This is accessed only within a synchronized method, so does not need to be atomic.
- */
- protected final AtomicBoolean _firstDispatcher = new AtomicBoolean(true);
+ private final AtomicBoolean _firstDispatcher = new AtomicBoolean(true);
- /** Used to indicate that the session should start pre-fetching messages as soon as it is started. */
- protected final boolean _immediatePrefetch;
+ private final boolean _immediatePrefetch;
- /** Indicates that warnings should be generated on violations of the strict AMQP. */
- protected final boolean _strictAMQP;
+ private final boolean _strictAMQP;
- /** Indicates that runtime exceptions should be generated on vilations of the strict AMQP. */
- protected final boolean _strictAMQPFATAL;
+ private final boolean _strictAMQPFATAL;
private final Object _messageDeliveryLock = new Object();
/** Session state : used to detect if commit is a) required b) allowed , i.e. does the tx span failover. */
private boolean _dirty;
/** Has failover occured on this session with outstanding actions to commit? */
private boolean _failedOverDirty;
-
+
+ /** Flow control */
+ private FlowControlIndicator _flowControl = new FlowControlIndicator();
+
+
+
+ /** Holds the highest received delivery tag. */
+ protected AtomicLong getHighestDeliveryTag()
+ {
+ return _highestDeliveryTag;
+ }
+
+ /** Pre-fetched message tags */
+ protected ConcurrentLinkedQueue<Long> getPrefetchedMessageTags()
+ {
+ return _prefetchedMessageTags;
+ }
+
+ /** All the not yet acknowledged message tags */
+ protected ConcurrentLinkedQueue<Long> getUnacknowledgedMessageTags()
+ {
+ return _unacknowledgedMessageTags;
+ }
+
+ /** All the delivered message tags */
+ protected ConcurrentLinkedQueue<Long> getDeliveredMessageTags()
+ {
+ return _deliveredMessageTags;
+ }
+
+ /** Holds the dispatcher thread for this session. */
+ protected Dispatcher getDispatcher()
+ {
+ return _dispatcher;
+ }
+
+ protected Thread getDispatcherThread()
+ {
+ return _dispatcherThread;
+ }
+
+ /** Holds the message factory factory for this session. */
+ protected MessageFactoryRegistry getMessageFactoryRegistry()
+ {
+ return _messageFactoryRegistry;
+ }
+
+ /**
+ * Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right
+ * consumer.
+ */
+ protected IdToConsumerMap<C> getConsumers()
+ {
+ return _consumers;
+ }
+
+ protected void setUsingDispatcherForCleanup(boolean usingDispatcherForCleanup)
+ {
+ _usingDispatcherForCleanup = usingDispatcherForCleanup;
+ }
+
+ /** Used to indicate that the session should start pre-fetching messages as soon as it is started. */
+ protected boolean isImmediatePrefetch()
+ {
+ return _immediatePrefetch;
+ }
+
+ public static final class IdToConsumerMap<C extends BasicMessageConsumer>
+ {
+ private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16];
+ private final ConcurrentHashMap<Integer, C> _slowAccessConsumers = new ConcurrentHashMap<Integer, C>();
+
+ public C get(int id)
+ {
+ if ((id & 0xFFFFFFF0) == 0)
+ {
+ return (C) _fastAccessConsumers[id];
+ }
+ else
+ {
+ return _slowAccessConsumers.get(id);
+ }
+ }
+
+ public C put(int id, C consumer)
+ {
+ C oldVal;
+ if ((id & 0xFFFFFFF0) == 0)
+ {
+ oldVal = (C) _fastAccessConsumers[id];
+ _fastAccessConsumers[id] = consumer;
+ }
+ else
+ {
+ oldVal = _slowAccessConsumers.put(id, consumer);
+ }
+
+ return oldVal;
+
+ }
+
+ public C remove(int id)
+ {
+ C consumer;
+ if ((id & 0xFFFFFFF0) == 0)
+ {
+ consumer = (C) _fastAccessConsumers[id];
+ _fastAccessConsumers[id] = null;
+ }
+ else
+ {
+ consumer = _slowAccessConsumers.remove(id);
+ }
+
+ return consumer;
+
+ }
+
+ public Collection<C> values()
+ {
+ ArrayList<C> values = new ArrayList<C>();
+
+ for (int i = 0; i < 16; i++)
+ {
+ if (_fastAccessConsumers[i] != null)
+ {
+ values.add((C) _fastAccessConsumers[i]);
+ }
+ }
+ values.addAll(_slowAccessConsumers.values());
+
+ return values;
+ }
+
+ public void clear()
+ {
+ _slowAccessConsumers.clear();
+ for (int i = 0; i < 16; i++)
+ {
+ _fastAccessConsumers[i] = null;
+ }
+ }
+ }
+
private static final class FlowControlIndicator
{
private volatile boolean _flowControl = true;
@@ -426,9 +420,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
}
- /** Flow control */
- private FlowControlIndicator _flowControl = new FlowControlIndicator();
-
/**
* Creates a new session on a connection.
*
@@ -443,7 +434,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
protected AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
{
- USE_AMQP_ENCODED_MAP_MESSAGE = con == null ? true : !con.isUseLegacyMapMessageFormat();
+ _useAMQPEncodedMapMessage = con == null ? true : !con.isUseLegacyMapMessageFormat();
_strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP, STRICT_AMQP_DEFAULT));
_strictAMQPFATAL =
Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT));
@@ -479,7 +470,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
// If the session has been closed don't waste time creating a thread to do
// flow control
- if (!(_thisSession.isClosed() || _thisSession.isClosing()))
+ if (!(AMQSession.this.isClosed() || AMQSession.this.isClosing()))
{
// Only execute change if previous state
// was False
@@ -507,7 +498,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
// If the session has been closed don't waste time creating a thread to do
// flow control
- if (!(_thisSession.isClosed() || _thisSession.isClosing()))
+ if (!(AMQSession.this.isClosed() || AMQSession.this.isClosing()))
{
// Only execute change if previous state
// was true
@@ -539,9 +530,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
// Add creation logging to tie in with the existing close logging
- if (_logger.isInfoEnabled())
+ if (_logger.isDebugEnabled())
{
- _logger.info("Created session:" + this);
+ _logger.debug("Created session:" + this);
}
}
@@ -730,17 +721,15 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
private void close(long timeout, boolean sendClose) throws JMSException
{
- if (_logger.isInfoEnabled())
+ if (_logger.isDebugEnabled())
{
- // StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
- _logger.info("Closing session: " + this); // + ":"
- // Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
+ _logger.debug("Closing session: " + this);
}
// Ensure we only try and close an open session.
- if (!_closed.getAndSet(true))
+ if (!setClosed())
{
- _closing.set(true);
+ setClosing(true);
synchronized (getFailoverMutex())
{
// We must close down all producers and consumers in an orderly fashion. This is the only method
@@ -808,7 +797,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
if (e instanceof AMQDisconnectedException)
{
- if (_dispatcher != null)
+ if (_dispatcherThread != null)
{
// Failover failed and ain't coming back. Knife the dispatcher.
_dispatcherThread.interrupt();
@@ -817,9 +806,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
//if we don't have an exception then we can perform closing operations
- _closing.set(e == null);
+ setClosing(e == null);
- if (!_closed.getAndSet(true))
+ if (!setClosed())
{
synchronized (_messageDeliveryLock)
{
@@ -903,11 +892,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
// Flush any pending messages for this consumerTag
if (_dispatcher != null)
{
- _logger.info("Dispatcher is not null");
+ _logger.debug("Dispatcher is not null");
}
else
{
- _logger.info("Dispatcher is null so created stopped dispatcher");
+ _logger.debug("Dispatcher is null so created stopped dispatcher");
startDispatcherIfNecessary(true);
}
@@ -918,18 +907,16 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
// Just close the consumer
// fixme the CancelOK is being processed before the arriving messages..
// The dispatcher is still to process them so the server sent in order but the client
- // has yet to receive before the close comes in.
-
- // consumer.markClosed();
+ // has yet to receive before the close comes in
if (consumer.isAutoClose())
{
// There is a small window where the message is between the two queues in the dispatcher.
if (consumer.isClosed())
{
- if (_logger.isInfoEnabled())
+ if (_logger.isDebugEnabled())
{
- _logger.info("Closing consumer:" + consumer.debugIdentity());
+ _logger.debug("Closing consumer:" + consumer.debugIdentity());
}
deregisterConsumer(consumer);
@@ -953,6 +940,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
return createBrowser(queue, null);
}
+ /**
+ * Create a queue browser if the destination is a valid queue.
+ */
public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException
{
if (isStrictAMQP())
@@ -963,7 +953,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
checkNotClosed();
checkValidQueue(queue);
- return new AMQQueueBrowser(this, (AMQQueue) queue, messageSelector);
+ return new AMQQueueBrowser(this, queue, messageSelector);
}
protected MessageConsumer createBrowserConsumer(Destination destination, String messageSelector, boolean noLocal)
@@ -1043,7 +1033,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
try
{
- handleAddressBasedDestination(dest,false,true);
+ handleAddressBasedDestination(dest,false,noLocal,true);
if (dest.getAddressType() != AMQDestination.TOPIC_TYPE)
{
throw new JMSException("Durable subscribers can only be created for Topics");
@@ -1099,6 +1089,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
// possible to determine when querying the broker whether there are no arguments or just a non-matching selector
// argument, as specifying null for the arguments when querying means they should not be checked at all
args.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), messageSelector == null ? "" : messageSelector);
+ if(noLocal)
+ {
+ args.put(AMQPFilterTypes.NO_LOCAL.getValue().toString(), true);
+ }
// if the queue is bound to the exchange but NOT for this topic and selector, then the JMS spec
// says we must trash the subscription.
@@ -1159,7 +1153,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public MapMessage createMapMessage() throws JMSException
{
checkNotClosed();
- if (USE_AMQP_ENCODED_MAP_MESSAGE)
+ if (_useAMQPEncodedMapMessage)
{
AMQPEncodedMapMessage msg = new AMQPEncodedMapMessage(getMessageDelegateFactory());
msg.setAMQSession(this);
@@ -1196,12 +1190,12 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public P createProducer(Destination destination) throws JMSException
{
- return createProducerImpl(destination, DEFAULT_MANDATORY, DEFAULT_IMMEDIATE);
+ return createProducerImpl(destination, null, null);
}
public P createProducer(Destination destination, boolean immediate) throws JMSException
{
- return createProducerImpl(destination, DEFAULT_MANDATORY, immediate);
+ return createProducerImpl(destination, null, immediate);
}
public P createProducer(Destination destination, boolean mandatory, boolean immediate)
@@ -1600,7 +1594,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public MessageListener getMessageListener() throws JMSException
{
- // checkNotClosed();
return _messageListener;
}
@@ -1648,6 +1641,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
return (counter != null) && (counter.get() != 0);
}
+ /** Indicates that warnings should be generated on violations of the strict AMQP. */
public boolean isStrictAMQP()
{
return _strictAMQP;
@@ -1690,7 +1684,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
AMQProtocolHandler protocolHandler = getProtocolHandler();
declareExchange(amqd, protocolHandler, false);
- AMQShortString queueName = declareQueue(amqd, protocolHandler, false);
+ AMQShortString queueName = declareQueue(amqd, false);
bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName(), amqd);
}
@@ -1886,31 +1880,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public void setMessageListener(MessageListener listener) throws JMSException
{
- // checkNotClosed();
- //
- // if (_dispatcher != null && !_dispatcher.connectionStopped())
- // {
- // throw new javax.njms.IllegalStateException("Attempt to set listener while session is started.");
- // }
- //
- // // We are stopped
- // for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
- // {
- // BasicMessageConsumer consumer = i.next();
- //
- // if (consumer.isReceiving())
- // {
- // throw new javax.njms.IllegalStateException("Another thread is already receiving synchronously.");
- // }
- // }
- //
- // _messageListener = listener;
- //
- // for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
- // {
- // i.next().setMessageListener(_messageListener);
- // }
-
}
/**
@@ -2184,7 +2153,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
*/
void markClosed()
{
- _closed.set(true);
+ setClosed();
_connection.deregisterSession(_channelId);
markClosedProducersAndConsumers();
@@ -2199,7 +2168,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
if (Thread.currentThread() == _dispatcherThread)
{
- while (!_closed.get() && !_queue.isEmpty())
+ while (!super.isClosed() && !_queue.isEmpty())
{
Dispatchable disp;
try
@@ -2247,6 +2216,58 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
}
+ void drainDispatchQueue()
+ {
+ if (Thread.currentThread() == _dispatcherThread)
+ {
+ while (!super.isClosed() && !_queue.isEmpty())
+ {
+ Dispatchable disp;
+ try
+ {
+ disp = (Dispatchable) _queue.take();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ // Check just in case _queue becomes empty, it shouldn't but
+ // better than an NPE.
+ if (disp == null)
+ {
+ _logger.debug("_queue became empty during sync.");
+ break;
+ }
+
+ disp.dispatch(AMQSession.this);
+ }
+ }
+ else
+ {
+ startDispatcherIfNecessary(false);
+
+ final CountDownLatch signal = new CountDownLatch(1);
+
+ _queue.add(new Dispatchable()
+ {
+ public void dispatch(AMQSession ssn)
+ {
+ signal.countDown();
+ }
+ });
+
+ try
+ {
+ signal.await();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
/**
* Resubscribes all producers and consumers. This is called when performing failover.
*
@@ -2289,7 +2310,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
*/
void start() throws AMQException
{
- // Check if the session has perviously been started and suspended, in which case it must be unsuspended.
+ // Check if the session has previously been started and suspended, in which case it must be unsuspended.
if (_startedAtLeastOnce.getAndSet(true))
{
suspendChannel(false);
@@ -2323,7 +2344,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
catch (AMQException e)
{
- _logger.info("Unsuspending channel threw an exception:" + e);
+ _logger.info("Unsuspending channel threw an exception:", e);
}
}
}
@@ -2346,12 +2367,12 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
throw new Error("Error creating Dispatcher thread",e);
}
_dispatcherThread.setName("Dispatcher-Channel-" + _channelId);
- _dispatcherThread.setDaemon(true);
+ _dispatcherThread.setDaemon(DEAMON_DISPATCHER_THREAD);
_dispatcher.setConnectionStopped(initiallyStopped);
_dispatcherThread.start();
- if (_dispatcherLogger.isInfoEnabled())
+ if (_dispatcherLogger.isDebugEnabled())
{
- _dispatcherLogger.info(_dispatcherThread.getName() + " created");
+ _dispatcherLogger.debug(_dispatcherThread.getName() + " created");
}
}
else
@@ -2371,32 +2392,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
}
- /*
- * Binds the named queue, with the specified routing key, to the named exchange.
- *
- * <p/>Note that this operation automatically retries in the event of fail-over.
- *
- * @param queueName The name of the queue to bind.
- * @param routingKey The routing key to bind the queue with.
- * @param arguments Additional arguments.
- * @param exchangeName The exchange to bind the queue on.
- *
- * @throws AMQException If the queue cannot be bound for any reason.
- */
- /*private void bindQueue(AMQDestination amqd, AMQShortString queueName, AMQProtocolHandler protocolHandler, FieldTable ft)
- throws AMQException, FailoverException
- {
- AMQFrame queueBind =
- QueueBindBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), ft, // arguments
- amqd.getExchangeName(), // exchange
- false, // nowait
- queueName, // queue
- amqd.getRoutingKey(), // routingKey
- getTicket()); // ticket
-
- protocolHandler.syncWrite(queueBind, QueueBindOkBody.class);
- }*/
-
private void checkNotTransacted() throws JMSException
{
if (getTransacted())
@@ -2580,7 +2575,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
* @param queueName
*/
private void consumeFromQueue(C consumer, AMQShortString queueName,
- AMQProtocolHandler protocolHandler, boolean nowait, MessageFilter messageSelector) throws AMQException, FailoverException
+ AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException, FailoverException
{
int tagId = _nextTag++;
@@ -2597,7 +2592,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
try
{
- sendConsume(consumer, queueName, protocolHandler, nowait, messageSelector, tagId);
+ sendConsume(consumer, queueName, protocolHandler, nowait, tagId);
}
catch (AMQException e)
{
@@ -2608,9 +2603,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
public abstract void sendConsume(C consumer, AMQShortString queueName,
- AMQProtocolHandler protocolHandler, boolean nowait, MessageFilter messageSelector, int tag) throws AMQException, FailoverException;
+ AMQProtocolHandler protocolHandler, boolean nowait, int tag) throws AMQException, FailoverException;
- private P createProducerImpl(final Destination destination, final boolean mandatory, final boolean immediate)
+ private P createProducerImpl(final Destination destination, final Boolean mandatory, final Boolean immediate)
throws JMSException
{
return new FailoverRetrySupport<P, JMSException>(
@@ -2639,8 +2634,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}, _connection).execute();
}
- public abstract P createMessageProducer(final Destination destination, final boolean mandatory,
- final boolean immediate, final long producerId) throws JMSException;
+ public abstract P createMessageProducer(final Destination destination, final Boolean mandatory,
+ final Boolean immediate, final long producerId) throws JMSException;
private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException
{
@@ -2661,18 +2656,38 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public long getQueueDepth(final AMQDestination amqd)
throws AMQException
{
- return new FailoverNoopSupport<Long, AMQException>(
- new FailoverProtectedOperation<Long, AMQException>()
- {
- public Long execute() throws AMQException, FailoverException
- {
- return requestQueueDepth(amqd);
- }
- }, _connection).execute();
+ return getQueueDepth(amqd, false);
+ }
+ /**
+ * Returns the number of messages currently queued by the given
+ * destination. Syncs session before receiving the queue depth if sync is
+ * set to true.
+ *
+ * @param amqd AMQ destination to get the depth value
+ * @param sync flag to sync session before receiving the queue depth
+ * @return queue depth
+ * @throws AMQException
+ */
+ public long getQueueDepth(final AMQDestination amqd, final boolean sync) throws AMQException
+ {
+ return new FailoverNoopSupport<Long, AMQException>(new FailoverProtectedOperation<Long, AMQException>()
+ {
+ public Long execute() throws AMQException, FailoverException
+ {
+ try
+ {
+ return requestQueueDepth(amqd, sync);
+ }
+ catch (TransportException e)
+ {
+ throw new AMQException(AMQConstant.getConstant(getErrorCode(e)), e.getMessage(), e);
+ }
+ }
+ }, _connection).execute();
}
- protected abstract Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException;
+ protected abstract Long requestQueueDepth(AMQDestination amqd, boolean sync) throws AMQException, FailoverException;
/**
* Declares the named exchange and type of exchange.
@@ -2703,6 +2718,12 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public abstract void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler,
final boolean nowait) throws AMQException, FailoverException;
+
+ void declareQueuePassive(AMQDestination queue) throws AMQException
+ {
+ declareQueue(queue,false,false,true);
+ }
+
/**
* Declares a queue for a JMS destination.
*
@@ -2712,27 +2733,35 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
*
* <p/>Note that this operation automatically retries in the event of fail-over.
*
- * @param amqd The destination to declare as a queue.
- * @param protocolHandler The protocol handler to communicate through.
*
+ * @param amqd The destination to declare as a queue.
* @return The name of the decalred queue. This is useful where the broker is generating a queue name on behalf of
* the client.
*
+ *
+ *
* @throws AMQException If the queue cannot be declared for any reason.
* @todo Verify the destiation is valid or throw an exception.
* @todo Be aware of possible changes to parameter order as versions change.
*/
- protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
+ protected AMQShortString declareQueue(final AMQDestination amqd,
final boolean noLocal) throws AMQException
{
- return declareQueue(amqd, protocolHandler, noLocal, false);
+ return declareQueue(amqd, noLocal, false);
}
- protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
+ protected AMQShortString declareQueue(final AMQDestination amqd,
final boolean noLocal, final boolean nowait)
+ throws AMQException
+ {
+ return declareQueue(amqd, noLocal, nowait, false);
+ }
+
+ protected AMQShortString declareQueue(final AMQDestination amqd,
+ final boolean noLocal, final boolean nowait, final boolean passive)
throws AMQException
{
- /*return new FailoverRetrySupport<AMQShortString, AMQException>(*/
+ final AMQProtocolHandler protocolHandler = getProtocolHandler();
return new FailoverNoopSupport<AMQShortString, AMQException>(
new FailoverProtectedOperation<AMQShortString, AMQException>()
{
@@ -2744,7 +2773,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
amqd.setQueueName(protocolHandler.generateQueueName());
}
- sendQueueDeclare(amqd, protocolHandler, nowait);
+ sendQueueDeclare(amqd, protocolHandler, nowait, passive);
return amqd.getAMQQueueName();
}
@@ -2752,7 +2781,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
- final boolean nowait) throws AMQException, FailoverException;
+ final boolean nowait, boolean passive) throws AMQException, FailoverException;
/**
* Undeclares the specified queue.
@@ -2882,18 +2911,18 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
if (amqd.getDestSyntax() == DestSyntax.ADDR)
{
- handleAddressBasedDestination(amqd,true,nowait);
+ handleAddressBasedDestination(amqd,true,consumer.isNoLocal(),nowait);
}
else
{
- if (DECLARE_EXCHANGES)
+ if (_declareExchanges)
{
declareExchange(amqd, protocolHandler, nowait);
}
- if (DECLARE_QUEUES || amqd.isNameRequired())
+ if (_delareQueues || amqd.isNameRequired())
{
- declareQueue(amqd, protocolHandler, consumer.isNoLocal(), nowait);
+ declareQueue(amqd, consumer.isNoLocal(), nowait);
}
bindQueue(amqd.getAMQQueueName(), amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd, nowait);
}
@@ -2916,24 +2945,24 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
try
{
suspendChannel(true);
- _logger.info(
+ _logger.debug(
"Prefetching delayed existing messages will not flow until requested via receive*() or setML().");
}
catch (AMQException e)
{
- _logger.info("Suspending channel threw an exception:" + e);
+ _logger.info("Suspending channel threw an exception:", e);
}
}
}
}
else
{
- _logger.info("Immediately prefetching existing messages to new consumer.");
+ _logger.debug("Immediately prefetching existing messages to new consumer.");
}
try
{
- consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelectorFilter());
+ consumeFromQueue(consumer, queueName, protocolHandler, nowait);
}
catch (FailoverException e)
{
@@ -2943,6 +2972,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public abstract void handleAddressBasedDestination(AMQDestination dest,
boolean isConsumer,
+ boolean noLocal,
boolean noWait) throws AMQException;
private void registerProducer(long producerId, MessageProducer producer)
@@ -2959,18 +2989,18 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
private void rejectMessagesForConsumerTag(int consumerTag, boolean requeue, boolean rejectAllConsumers)
{
Iterator messages = _queue.iterator();
- if (_logger.isInfoEnabled())
+ if (_logger.isDebugEnabled())
{
- _logger.info("Rejecting messages from _queue for Consumer tag(" + consumerTag + ") (PDispatchQ) requeue:"
+ _logger.debug("Rejecting messages from _queue for Consumer tag(" + consumerTag + ") (PDispatchQ) requeue:"
+ requeue);
if (messages.hasNext())
{
- _logger.info("Checking all messages in _queue for Consumer tag(" + consumerTag + ")");
+ _logger.debug("Checking all messages in _queue for Consumer tag(" + consumerTag + ")");
}
else
{
- _logger.info("No messages in _queue to reject");
+ _logger.debug("No messages in _queue to reject");
}
}
while (messages.hasNext())
@@ -3013,7 +3043,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
private void resubscribeProducers() throws AMQException
{
ArrayList producers = new ArrayList(_producers.values());
- _logger.info(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size())); // FIXME: removeKey
+ _logger.debug(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size())); // FIXME: removeKey
for (Iterator it = producers.iterator(); it.hasNext();)
{
P producer = (P) it.next();
@@ -3103,7 +3133,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public void setFlowControl(final boolean active)
{
_flowControl.setFlowControl(active);
- _logger.warn("Broker enforced flow control " + (active ? "no longer in effect" : "has been enforced"));
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Broker enforced flow control " + (active ? "no longer in effect" : "has been enforced"));
+ }
}
public void checkFlowControl() throws InterruptedException, JMSException
@@ -3112,17 +3145,20 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
synchronized (_flowControl)
{
while (!_flowControl.getFlowControl() &&
- (expiryTime == 0L ? (expiryTime = System.currentTimeMillis() + FLOW_CONTROL_WAIT_FAILURE)
+ (expiryTime == 0L ? (expiryTime = System.currentTimeMillis() + _flowControlWaitFailure)
: expiryTime) >= System.currentTimeMillis() )
{
- _flowControl.wait(FLOW_CONTROL_WAIT_PERIOD);
- _logger.warn("Message send delayed by " + (System.currentTimeMillis() + FLOW_CONTROL_WAIT_FAILURE - expiryTime)/1000 + "s due to broker enforced flow control");
+ _flowControl.wait(_flowControlWaitPeriod);
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Message send delayed by " + (System.currentTimeMillis() + _flowControlWaitFailure - expiryTime)/1000 + "s due to broker enforced flow control");
+ }
}
if(!_flowControl.getFlowControl())
{
_logger.error("Message send failed due to timeout waiting on broker enforced flow control");
- throw new JMSException("Unable to send message for " + FLOW_CONTROL_WAIT_FAILURE/1000 + " seconds due to broker enforced flow control");
+ throw new JMSException("Unable to send message for " + _flowControlWaitFailure /1000 + " seconds due to broker enforced flow control");
}
}
@@ -3154,7 +3190,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
private final AtomicBoolean _closed = new AtomicBoolean(false);
private final Object _lock = new Object();
- private String dispatcherID = "" + System.identityHashCode(this);
+ private final String dispatcherID = "" + System.identityHashCode(this);
public Dispatcher()
{
@@ -3169,6 +3205,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
+ private AtomicBoolean getClosed()
+ {
+ return _closed;
+ }
+
public void rejectPending(C consumer)
{
synchronized (_lock)
@@ -3220,7 +3261,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
else
{
// should perhaps clear the _SQ here.
- // consumer._synchronousQueue.clear();
consumer.clearReceiveQueue();
}
@@ -3266,13 +3306,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public void run()
{
- if (_dispatcherLogger.isInfoEnabled())
+ if (_dispatcherLogger.isDebugEnabled())
{
- _dispatcherLogger.info(_dispatcherThread.getName() + " started");
+ _dispatcherLogger.debug(_dispatcherThread.getName() + " started");
}
- UnprocessedMessage message;
-
// Allow disptacher to start stopped
synchronized (_lock)
{
@@ -3284,7 +3322,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
catch (InterruptedException e)
{
- // ignore
+ Thread.currentThread().interrupt();
}
}
}
@@ -3299,12 +3337,12 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
catch (InterruptedException e)
{
- // ignore
+ // ignored as run will exit immediately
}
- if (_dispatcherLogger.isInfoEnabled())
+ if (_dispatcherLogger.isDebugEnabled())
{
- _dispatcherLogger.info(_dispatcherThread.getName() + " thread terminating for channel " + _channelId + ":" + _thisSession);
+ _dispatcherLogger.debug(_dispatcherThread.getName() + " thread terminating for channel " + _channelId + ":" + AMQSession.this);
}
}
@@ -3350,7 +3388,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
catch (InterruptedException e)
{
- // pass
+ Thread.currentThread().interrupt();
}
if (!(message instanceof CloseConsumerMessage)
@@ -3425,7 +3463,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
if (_logger.isDebugEnabled())
{
_logger.debug("Rejecting message with delivery tag " + message.getDeliveryTag()
- + " for closing consumer " + String.valueOf(consumer == null? null: consumer._consumerTag));
+ + " for closing consumer " + String.valueOf(consumer == null? null: consumer.getConsumerTag()));
}
rejectMessage(message, true);
}
@@ -3443,30 +3481,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public abstract AMQMessageDelegateFactory getMessageDelegateFactory();
- /*public void requestAccess(AMQShortString realm, boolean exclusive, boolean passive, boolean active, boolean write,
- boolean read) throws AMQException
- {
- getProtocolHandler().writeCommandFrameAndWaitForReply(AccessRequestBody.createAMQFrame(getChannelId(),
- getProtocolMajorVersion(), getProtocolMinorVersion(), active, exclusive, passive, read, realm, write),
- new BlockingMethodFrameListener(_channelId)
- {
-
- public boolean processMethod(int channelId, AMQMethodBody frame) // throws AMQException
- {
- if (frame instanceof AccessRequestOkBody)
- {
- setTicket(((AccessRequestOkBody) frame).getTicket());
-
- return true;
- }
- else
- {
- return false;
- }
- }
- });
- }*/
-
private class SuspenderRunner implements Runnable
{
private AtomicBoolean _suspend;
@@ -3484,7 +3498,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
// If the session has closed by the time we get here
// then we should not attempt to write to the sesion/channel.
- if (!(_thisSession.isClosed() || _thisSession.isClosing()))
+ if (!(AMQSession.this.isClosed() || AMQSession.this.isClosing()))
{
suspendChannel(_suspend.get());
}
@@ -3492,11 +3506,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
catch (AMQException e)
{
- _logger.warn("Unable to " + (_suspend.get() ? "suspend" : "unsuspend") + " session " + _thisSession + " due to: " + e);
+ _logger.warn("Unable to " + (_suspend.get() ? "suspend" : "unsuspend") + " session " + AMQSession.this + " due to: ", e);
if (_logger.isDebugEnabled())
{
_logger.debug("Is the _queue empty?" + _queue.isEmpty());
- _logger.debug("Is the dispatcher closed?" + (_dispatcher == null ? "it's Null" : _dispatcher._closed));
+ _logger.debug("Is the dispatcher closed?" + (_dispatcher == null ? "it's Null" : _dispatcher.getClosed()));
}
}
}
@@ -3510,7 +3524,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
@Override
public boolean isClosed()
{
- return _closed.get() || _connection.isClosed();
+ return super.isClosed() || _connection.isClosed();
}
/**
@@ -3522,12 +3536,12 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
@Override
public boolean isClosing()
{
- return _closing.get()|| _connection.isClosing();
+ return super.isClosing() || _connection.isClosing();
}
public boolean isDeclareExchanges()
{
- return DECLARE_EXCHANGES;
+ return _declareExchanges;
}
JMSException toJMSException(String message, TransportException e)