summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2012-02-07 11:30:30 +0000
committerKeith Wall <kwall@apache.org>2012-02-07 11:30:30 +0000
commit2e6596c2167d0888c7bea8f320981589e2423cc2 (patch)
tree745f9dd6b928af5343a7d00a3003ead007f71441
parentfe05ba45954eb5f83b13039370afd7530cdef572 (diff)
downloadqpid-python-2e6596c2167d0888c7bea8f320981589e2423cc2.tar.gz
NO-JIRA: Move members back to top of class where they belong
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1241428 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java361
1 files changed, 180 insertions, 181 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index eef5bc0b57..3037c7f420 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -97,6 +97,186 @@ import java.util.concurrent.locks.ReentrantLock;
*/
public abstract class AMQSession<C extends BasicMessageConsumer, P extends BasicMessageProducer> extends Closeable implements Session, QueueSession, TopicSession
{
+ /** Used for debugging. */
+ private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
+
+ /** System property to enable strict AMQP compliance. */
+ public static final String STRICT_AMQP = "STRICT_AMQP";
+
+ /** Strict AMQP default setting. */
+ public static final String STRICT_AMQP_DEFAULT = "false";
+
+ /** System property to enable failure if strict AMQP compliance is violated. */
+ public static final String STRICT_AMQP_FATAL = "STRICT_AMQP_FATAL";
+
+ /** Strickt AMQP failure default. */
+ public static final String STRICT_AMQP_FATAL_DEFAULT = "true";
+
+ /** System property to enable immediate message prefetching. */
+ public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH";
+
+ /** Immediate message prefetch default. */
+ public static final String IMMEDIATE_PREFETCH_DEFAULT = "false";
+
+ public static final long DEFAULT_FLOW_CONTROL_WAIT_FAILURE = 120000L;
+
+ /**
+ * The default value for immediate flag used by producers created by this session is false. That is, a consumer does
+ * not need to be attached to a queue.
+ */
+ private final boolean _defaultImmediateValue = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false"));
+
+ /**
+ * The default value for mandatory flag used by producers created by this session is true. That is, server will not
+ * silently drop messages where no queue is connected to the exchange for the message.
+ */
+ private final boolean _defaultMandatoryValue = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true"));
+
+ /**
+ * The period to wait while flow controlled before sending a log message confirming that the session is still
+ * waiting on flow control being revoked
+ */
+ private final long _flowControlWaitPeriod = Long.getLong("qpid.flow_control_wait_notify_period",5000L);
+
+ /**
+ * The period to wait while flow controlled before declaring a failure
+ */
+ private final long _flowControlWaitFailure = Long.getLong("qpid.flow_control_wait_failure",
+ DEFAULT_FLOW_CONTROL_WAIT_FAILURE);
+
+ private final boolean _delareQueues =
+ Boolean.parseBoolean(System.getProperty("qpid.declare_queues", "true"));
+
+ private final boolean _declareExchanges =
+ Boolean.parseBoolean(System.getProperty("qpid.declare_exchanges", "true"));
+
+ private final boolean _useAMQPEncodedMapMessage;
+
+ /**
+ * Flag indicating to start dispatcher as a daemon thread
+ */
+ protected final boolean DEAMON_DISPATCHER_THREAD = Boolean.getBoolean(ClientProperties.DAEMON_DISPATCHER);
+
+ /** The connection to which this session belongs. */
+ private AMQConnection _connection;
+
+ /** Used to indicate whether or not this is a transactional session. */
+ private final boolean _transacted;
+
+ /** Holds the sessions acknowledgement mode. */
+ private final int _acknowledgeMode;
+
+ /** Holds this session unique identifier, used to distinguish it from other sessions. */
+ private int _channelId;
+
+ private int _ticket;
+
+ /** Holds the high mark for prefetched message, at which the session is suspended. */
+ private int _prefetchHighMark;
+
+ /** Holds the low mark for prefetched messages, below which the session is resumed. */
+ private int _prefetchLowMark;
+
+ /** Holds the message listener, if any, which is attached to this session. */
+ private MessageListener _messageListener = null;
+
+ /** Used to indicate that this session has been started at least once. */
+ private AtomicBoolean _startedAtLeastOnce = new AtomicBoolean(false);
+
+ private final ConcurrentHashMap<String, TopicSubscriberAdaptor<C>> _subscriptions =
+ new ConcurrentHashMap<String, TopicSubscriberAdaptor<C>>();
+
+ private final ConcurrentHashMap<C, String> _reverseSubscriptionMap = new ConcurrentHashMap<C, String>();
+
+ private final Lock _subscriberDetails = new ReentrantLock(true);
+ private final Lock _subscriberAccess = new ReentrantLock(true);
+
+ private final FlowControllingBlockingQueue _queue;
+
+ private final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
+ private final AtomicLong _rollbackMark = new AtomicLong(-1);
+
+ private ConcurrentLinkedQueue<Long> _prefetchedMessageTags = new ConcurrentLinkedQueue<Long>();
+
+ private ConcurrentLinkedQueue<Long> _unacknowledgedMessageTags = new ConcurrentLinkedQueue<Long>();
+
+ private ConcurrentLinkedQueue<Long> _deliveredMessageTags = new ConcurrentLinkedQueue<Long>();
+
+ private Dispatcher _dispatcher;
+
+ private Thread _dispatcherThread;
+
+ private MessageFactoryRegistry _messageFactoryRegistry;
+
+ /** Holds all of the producers created by this session, keyed by their unique identifiers. */
+ private Map<Long, MessageProducer> _producers = new ConcurrentHashMap<Long, MessageProducer>();
+
+ /**
+ * Used as a source of unique identifiers so that the consumers can be tagged to match them to BasicConsume
+ * methods.
+ */
+ private int _nextTag = 1;
+
+ private final IdToConsumerMap<C> _consumers = new IdToConsumerMap<C>();
+
+ /**
+ * Contains a list of consumers which have been removed but which might still have
+ * messages to acknowledge, eg in client ack or transacted modes
+ */
+ private CopyOnWriteArrayList<C> _removedConsumers = new CopyOnWriteArrayList<C>();
+
+ /** Provides a count of consumers on destinations, in order to be able to know if a destination has consumers. */
+ private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount =
+ new ConcurrentHashMap<Destination, AtomicInteger>();
+
+ /**
+ * Used as a source of unique identifiers for producers within the session.
+ *
+ * <p/> Access to this id does not require to be synchronized since according to the JMS specification only one
+ * thread of control is allowed to create producers for any given session instance.
+ */
+ private long _nextProducerId;
+
+ /**
+ * Set when recover is called. This is to handle the case where recover() is called by application code during
+ * onMessage() processing to ensure that an auto ack is not sent.
+ */
+ private volatile boolean _sessionInRecovery;
+
+ private volatile boolean _usingDispatcherForCleanup;
+
+ /** Used to indicates that the connection to which this session belongs, has been stopped. */
+ private boolean _connectionStopped;
+
+ /** Used to indicate that this session has a message listener attached to it. */
+ private boolean _hasMessageListeners;
+
+ /** Used to indicate that this session has been suspended. */
+ private boolean _suspended;
+
+ /**
+ * Used to protect the suspension of this session, so that critical code can be executed during suspension,
+ * without the session being resumed by other threads.
+ */
+ private final Object _suspensionLock = new Object();
+
+ private final AtomicBoolean _firstDispatcher = new AtomicBoolean(true);
+
+ private final boolean _immediatePrefetch;
+
+ private final boolean _strictAMQP;
+
+ private final boolean _strictAMQPFATAL;
+ private final Object _messageDeliveryLock = new Object();
+
+ /** Session state : used to detect if commit is a) required b) allowed , i.e. does the tx span failover. */
+ private boolean _dirty;
+ /** Has failover occured on this session with outstanding actions to commit? */
+ private boolean _failedOverDirty;
+
+ /** Flow control */
+ private FlowControlIndicator _flowControl = new FlowControlIndicator();
+
/**
* Used to reference durable subscribers so that requests for unsubscribe can be handled correctly. Note this only
* keeps a record of subscriptions which have been created in the current instance. It does not remember
@@ -334,182 +514,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
}
- /** Used for debugging. */
- private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
-
- /**
- * The default value for immediate flag used by producers created by this session is false. That is, a consumer does
- * not need to be attached to a queue.
- */
- private final boolean _defaultImmediateValue = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false"));
-
- /**
- * The default value for mandatory flag used by producers created by this session is true. That is, server will not
- * silently drop messages where no queue is connected to the exchange for the message.
- */
- private final boolean _defaultMandatoryValue = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true"));
-
- /**
- * The period to wait while flow controlled before sending a log message confirming that the session is still
- * waiting on flow control being revoked
- */
- private final long _flowControlWaitPeriod = Long.getLong("qpid.flow_control_wait_notify_period",5000L);
-
- /**
- * The period to wait while flow controlled before declaring a failure
- */
- public static final long DEFAULT_FLOW_CONTROL_WAIT_FAILURE = 120000L;
- private final long _flowControlWaitFailure = Long.getLong("qpid.flow_control_wait_failure",
- DEFAULT_FLOW_CONTROL_WAIT_FAILURE);
-
- private final boolean _delareQueues =
- Boolean.parseBoolean(System.getProperty("qpid.declare_queues", "true"));
-
- private final boolean _declareExchanges =
- Boolean.parseBoolean(System.getProperty("qpid.declare_exchanges", "true"));
-
- private final boolean _useAMQPEncodedMapMessage;
-
- /** System property to enable strict AMQP compliance. */
- public static final String STRICT_AMQP = "STRICT_AMQP";
-
- /** Strict AMQP default setting. */
- public static final String STRICT_AMQP_DEFAULT = "false";
-
- /** System property to enable failure if strict AMQP compliance is violated. */
- public static final String STRICT_AMQP_FATAL = "STRICT_AMQP_FATAL";
-
- /** Strickt AMQP failure default. */
- public static final String STRICT_AMQP_FATAL_DEFAULT = "true";
-
- /** System property to enable immediate message prefetching. */
- public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH";
-
- /** Immediate message prefetch default. */
- public static final String IMMEDIATE_PREFETCH_DEFAULT = "false";
-
- /**
- * Flag indicating to start dispatcher as a daemon thread
- */
- protected final boolean DEAMON_DISPATCHER_THREAD = Boolean.getBoolean(ClientProperties.DAEMON_DISPATCHER);
-
- /** The connection to which this session belongs. */
- private AMQConnection _connection;
-
- /** Used to indicate whether or not this is a transactional session. */
- private final boolean _transacted;
-
- /** Holds the sessions acknowledgement mode. */
- private final int _acknowledgeMode;
-
- /** Holds this session unique identifier, used to distinguish it from other sessions. */
- private int _channelId;
-
- private int _ticket;
-
- /** Holds the high mark for prefetched message, at which the session is suspended. */
- private int _prefetchHighMark;
-
- /** Holds the low mark for prefetched messages, below which the session is resumed. */
- private int _prefetchLowMark;
-
- /** Holds the message listener, if any, which is attached to this session. */
- private MessageListener _messageListener = null;
-
- /** Used to indicate that this session has been started at least once. */
- private AtomicBoolean _startedAtLeastOnce = new AtomicBoolean(false);
-
- private final ConcurrentHashMap<String, TopicSubscriberAdaptor<C>> _subscriptions =
- new ConcurrentHashMap<String, TopicSubscriberAdaptor<C>>();
-
- private final ConcurrentHashMap<C, String> _reverseSubscriptionMap = new ConcurrentHashMap<C, String>();
-
- private final Lock _subscriberDetails = new ReentrantLock(true);
- private final Lock _subscriberAccess = new ReentrantLock(true);
-
- private final FlowControllingBlockingQueue _queue;
-
- private final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
- private final AtomicLong _rollbackMark = new AtomicLong(-1);
-
- private ConcurrentLinkedQueue<Long> _prefetchedMessageTags = new ConcurrentLinkedQueue<Long>();
-
- private ConcurrentLinkedQueue<Long> _unacknowledgedMessageTags = new ConcurrentLinkedQueue<Long>();
-
- private ConcurrentLinkedQueue<Long> _deliveredMessageTags = new ConcurrentLinkedQueue<Long>();
-
- private Dispatcher _dispatcher;
-
- private Thread _dispatcherThread;
-
- private MessageFactoryRegistry _messageFactoryRegistry;
-
- /** Holds all of the producers created by this session, keyed by their unique identifiers. */
- private Map<Long, MessageProducer> _producers = new ConcurrentHashMap<Long, MessageProducer>();
-
- /**
- * Used as a source of unique identifiers so that the consumers can be tagged to match them to BasicConsume
- * methods.
- */
- private int _nextTag = 1;
-
- private final IdToConsumerMap<C> _consumers = new IdToConsumerMap<C>();
-
- /**
- * Contains a list of consumers which have been removed but which might still have
- * messages to acknowledge, eg in client ack or transacted modes
- */
- private CopyOnWriteArrayList<C> _removedConsumers = new CopyOnWriteArrayList<C>();
-
- /** Provides a count of consumers on destinations, in order to be able to know if a destination has consumers. */
- private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount =
- new ConcurrentHashMap<Destination, AtomicInteger>();
-
- /**
- * Used as a source of unique identifiers for producers within the session.
- *
- * <p/> Access to this id does not require to be synchronized since according to the JMS specification only one
- * thread of control is allowed to create producers for any given session instance.
- */
- private long _nextProducerId;
-
- /**
- * Set when recover is called. This is to handle the case where recover() is called by application code during
- * onMessage() processing to ensure that an auto ack is not sent.
- */
- private volatile boolean _sessionInRecovery;
-
- private volatile boolean _usingDispatcherForCleanup;
-
- /** Used to indicates that the connection to which this session belongs, has been stopped. */
- private boolean _connectionStopped;
-
- /** Used to indicate that this session has a message listener attached to it. */
- private boolean _hasMessageListeners;
-
- /** Used to indicate that this session has been suspended. */
- private boolean _suspended;
-
- /**
- * Used to protect the suspension of this session, so that critical code can be executed during suspension,
- * without the session being resumed by other threads.
- */
- private final Object _suspensionLock = new Object();
-
- private final AtomicBoolean _firstDispatcher = new AtomicBoolean(true);
-
- private final boolean _immediatePrefetch;
-
- private final boolean _strictAMQP;
-
- private final boolean _strictAMQPFATAL;
- private final Object _messageDeliveryLock = new Object();
-
- /** Session state : used to detect if commit is a) required b) allowed , i.e. does the tx span failover. */
- private boolean _dirty;
- /** Has failover occured on this session with outstanding actions to commit? */
- private boolean _failedOverDirty;
-
private static final class FlowControlIndicator
{
private volatile boolean _flowControl = true;
@@ -526,9 +530,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
}
- /** Flow control */
- private FlowControlIndicator _flowControl = new FlowControlIndicator();
-
/**
* Creates a new session on a connection.
*
@@ -3392,8 +3393,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
_dispatcherLogger.info(_dispatcherThread.getName() + " started");
}
- UnprocessedMessage message;
-
// Allow disptacher to start stopped
synchronized (_lock)
{