diff options
author | Keith Wall <kwall@apache.org> | 2012-02-07 11:30:30 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2012-02-07 11:30:30 +0000 |
commit | 2e6596c2167d0888c7bea8f320981589e2423cc2 (patch) | |
tree | 745f9dd6b928af5343a7d00a3003ead007f71441 | |
parent | fe05ba45954eb5f83b13039370afd7530cdef572 (diff) | |
download | qpid-python-2e6596c2167d0888c7bea8f320981589e2423cc2.tar.gz |
NO-JIRA: Move members back to top of class where they belong
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1241428 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 361 |
1 files changed, 180 insertions, 181 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index eef5bc0b57..3037c7f420 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -97,6 +97,186 @@ import java.util.concurrent.locks.ReentrantLock; */ public abstract class AMQSession<C extends BasicMessageConsumer, P extends BasicMessageProducer> extends Closeable implements Session, QueueSession, TopicSession { + /** Used for debugging. */ + private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class); + + /** System property to enable strict AMQP compliance. */ + public static final String STRICT_AMQP = "STRICT_AMQP"; + + /** Strict AMQP default setting. */ + public static final String STRICT_AMQP_DEFAULT = "false"; + + /** System property to enable failure if strict AMQP compliance is violated. */ + public static final String STRICT_AMQP_FATAL = "STRICT_AMQP_FATAL"; + + /** Strickt AMQP failure default. */ + public static final String STRICT_AMQP_FATAL_DEFAULT = "true"; + + /** System property to enable immediate message prefetching. */ + public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH"; + + /** Immediate message prefetch default. */ + public static final String IMMEDIATE_PREFETCH_DEFAULT = "false"; + + public static final long DEFAULT_FLOW_CONTROL_WAIT_FAILURE = 120000L; + + /** + * The default value for immediate flag used by producers created by this session is false. That is, a consumer does + * not need to be attached to a queue. + */ + private final boolean _defaultImmediateValue = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false")); + + /** + * The default value for mandatory flag used by producers created by this session is true. That is, server will not + * silently drop messages where no queue is connected to the exchange for the message. + */ + private final boolean _defaultMandatoryValue = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true")); + + /** + * The period to wait while flow controlled before sending a log message confirming that the session is still + * waiting on flow control being revoked + */ + private final long _flowControlWaitPeriod = Long.getLong("qpid.flow_control_wait_notify_period",5000L); + + /** + * The period to wait while flow controlled before declaring a failure + */ + private final long _flowControlWaitFailure = Long.getLong("qpid.flow_control_wait_failure", + DEFAULT_FLOW_CONTROL_WAIT_FAILURE); + + private final boolean _delareQueues = + Boolean.parseBoolean(System.getProperty("qpid.declare_queues", "true")); + + private final boolean _declareExchanges = + Boolean.parseBoolean(System.getProperty("qpid.declare_exchanges", "true")); + + private final boolean _useAMQPEncodedMapMessage; + + /** + * Flag indicating to start dispatcher as a daemon thread + */ + protected final boolean DEAMON_DISPATCHER_THREAD = Boolean.getBoolean(ClientProperties.DAEMON_DISPATCHER); + + /** The connection to which this session belongs. */ + private AMQConnection _connection; + + /** Used to indicate whether or not this is a transactional session. */ + private final boolean _transacted; + + /** Holds the sessions acknowledgement mode. */ + private final int _acknowledgeMode; + + /** Holds this session unique identifier, used to distinguish it from other sessions. */ + private int _channelId; + + private int _ticket; + + /** Holds the high mark for prefetched message, at which the session is suspended. */ + private int _prefetchHighMark; + + /** Holds the low mark for prefetched messages, below which the session is resumed. */ + private int _prefetchLowMark; + + /** Holds the message listener, if any, which is attached to this session. */ + private MessageListener _messageListener = null; + + /** Used to indicate that this session has been started at least once. */ + private AtomicBoolean _startedAtLeastOnce = new AtomicBoolean(false); + + private final ConcurrentHashMap<String, TopicSubscriberAdaptor<C>> _subscriptions = + new ConcurrentHashMap<String, TopicSubscriberAdaptor<C>>(); + + private final ConcurrentHashMap<C, String> _reverseSubscriptionMap = new ConcurrentHashMap<C, String>(); + + private final Lock _subscriberDetails = new ReentrantLock(true); + private final Lock _subscriberAccess = new ReentrantLock(true); + + private final FlowControllingBlockingQueue _queue; + + private final AtomicLong _highestDeliveryTag = new AtomicLong(-1); + private final AtomicLong _rollbackMark = new AtomicLong(-1); + + private ConcurrentLinkedQueue<Long> _prefetchedMessageTags = new ConcurrentLinkedQueue<Long>(); + + private ConcurrentLinkedQueue<Long> _unacknowledgedMessageTags = new ConcurrentLinkedQueue<Long>(); + + private ConcurrentLinkedQueue<Long> _deliveredMessageTags = new ConcurrentLinkedQueue<Long>(); + + private Dispatcher _dispatcher; + + private Thread _dispatcherThread; + + private MessageFactoryRegistry _messageFactoryRegistry; + + /** Holds all of the producers created by this session, keyed by their unique identifiers. */ + private Map<Long, MessageProducer> _producers = new ConcurrentHashMap<Long, MessageProducer>(); + + /** + * Used as a source of unique identifiers so that the consumers can be tagged to match them to BasicConsume + * methods. + */ + private int _nextTag = 1; + + private final IdToConsumerMap<C> _consumers = new IdToConsumerMap<C>(); + + /** + * Contains a list of consumers which have been removed but which might still have + * messages to acknowledge, eg in client ack or transacted modes + */ + private CopyOnWriteArrayList<C> _removedConsumers = new CopyOnWriteArrayList<C>(); + + /** Provides a count of consumers on destinations, in order to be able to know if a destination has consumers. */ + private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount = + new ConcurrentHashMap<Destination, AtomicInteger>(); + + /** + * Used as a source of unique identifiers for producers within the session. + * + * <p/> Access to this id does not require to be synchronized since according to the JMS specification only one + * thread of control is allowed to create producers for any given session instance. + */ + private long _nextProducerId; + + /** + * Set when recover is called. This is to handle the case where recover() is called by application code during + * onMessage() processing to ensure that an auto ack is not sent. + */ + private volatile boolean _sessionInRecovery; + + private volatile boolean _usingDispatcherForCleanup; + + /** Used to indicates that the connection to which this session belongs, has been stopped. */ + private boolean _connectionStopped; + + /** Used to indicate that this session has a message listener attached to it. */ + private boolean _hasMessageListeners; + + /** Used to indicate that this session has been suspended. */ + private boolean _suspended; + + /** + * Used to protect the suspension of this session, so that critical code can be executed during suspension, + * without the session being resumed by other threads. + */ + private final Object _suspensionLock = new Object(); + + private final AtomicBoolean _firstDispatcher = new AtomicBoolean(true); + + private final boolean _immediatePrefetch; + + private final boolean _strictAMQP; + + private final boolean _strictAMQPFATAL; + private final Object _messageDeliveryLock = new Object(); + + /** Session state : used to detect if commit is a) required b) allowed , i.e. does the tx span failover. */ + private boolean _dirty; + /** Has failover occured on this session with outstanding actions to commit? */ + private boolean _failedOverDirty; + + /** Flow control */ + private FlowControlIndicator _flowControl = new FlowControlIndicator(); + /** * Used to reference durable subscribers so that requests for unsubscribe can be handled correctly. Note this only * keeps a record of subscriptions which have been created in the current instance. It does not remember @@ -334,182 +514,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } } - /** Used for debugging. */ - private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class); - - /** - * The default value for immediate flag used by producers created by this session is false. That is, a consumer does - * not need to be attached to a queue. - */ - private final boolean _defaultImmediateValue = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false")); - - /** - * The default value for mandatory flag used by producers created by this session is true. That is, server will not - * silently drop messages where no queue is connected to the exchange for the message. - */ - private final boolean _defaultMandatoryValue = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true")); - - /** - * The period to wait while flow controlled before sending a log message confirming that the session is still - * waiting on flow control being revoked - */ - private final long _flowControlWaitPeriod = Long.getLong("qpid.flow_control_wait_notify_period",5000L); - - /** - * The period to wait while flow controlled before declaring a failure - */ - public static final long DEFAULT_FLOW_CONTROL_WAIT_FAILURE = 120000L; - private final long _flowControlWaitFailure = Long.getLong("qpid.flow_control_wait_failure", - DEFAULT_FLOW_CONTROL_WAIT_FAILURE); - - private final boolean _delareQueues = - Boolean.parseBoolean(System.getProperty("qpid.declare_queues", "true")); - - private final boolean _declareExchanges = - Boolean.parseBoolean(System.getProperty("qpid.declare_exchanges", "true")); - - private final boolean _useAMQPEncodedMapMessage; - - /** System property to enable strict AMQP compliance. */ - public static final String STRICT_AMQP = "STRICT_AMQP"; - - /** Strict AMQP default setting. */ - public static final String STRICT_AMQP_DEFAULT = "false"; - - /** System property to enable failure if strict AMQP compliance is violated. */ - public static final String STRICT_AMQP_FATAL = "STRICT_AMQP_FATAL"; - - /** Strickt AMQP failure default. */ - public static final String STRICT_AMQP_FATAL_DEFAULT = "true"; - - /** System property to enable immediate message prefetching. */ - public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH"; - - /** Immediate message prefetch default. */ - public static final String IMMEDIATE_PREFETCH_DEFAULT = "false"; - - /** - * Flag indicating to start dispatcher as a daemon thread - */ - protected final boolean DEAMON_DISPATCHER_THREAD = Boolean.getBoolean(ClientProperties.DAEMON_DISPATCHER); - - /** The connection to which this session belongs. */ - private AMQConnection _connection; - - /** Used to indicate whether or not this is a transactional session. */ - private final boolean _transacted; - - /** Holds the sessions acknowledgement mode. */ - private final int _acknowledgeMode; - - /** Holds this session unique identifier, used to distinguish it from other sessions. */ - private int _channelId; - - private int _ticket; - - /** Holds the high mark for prefetched message, at which the session is suspended. */ - private int _prefetchHighMark; - - /** Holds the low mark for prefetched messages, below which the session is resumed. */ - private int _prefetchLowMark; - - /** Holds the message listener, if any, which is attached to this session. */ - private MessageListener _messageListener = null; - - /** Used to indicate that this session has been started at least once. */ - private AtomicBoolean _startedAtLeastOnce = new AtomicBoolean(false); - - private final ConcurrentHashMap<String, TopicSubscriberAdaptor<C>> _subscriptions = - new ConcurrentHashMap<String, TopicSubscriberAdaptor<C>>(); - - private final ConcurrentHashMap<C, String> _reverseSubscriptionMap = new ConcurrentHashMap<C, String>(); - - private final Lock _subscriberDetails = new ReentrantLock(true); - private final Lock _subscriberAccess = new ReentrantLock(true); - - private final FlowControllingBlockingQueue _queue; - - private final AtomicLong _highestDeliveryTag = new AtomicLong(-1); - private final AtomicLong _rollbackMark = new AtomicLong(-1); - - private ConcurrentLinkedQueue<Long> _prefetchedMessageTags = new ConcurrentLinkedQueue<Long>(); - - private ConcurrentLinkedQueue<Long> _unacknowledgedMessageTags = new ConcurrentLinkedQueue<Long>(); - - private ConcurrentLinkedQueue<Long> _deliveredMessageTags = new ConcurrentLinkedQueue<Long>(); - - private Dispatcher _dispatcher; - - private Thread _dispatcherThread; - - private MessageFactoryRegistry _messageFactoryRegistry; - - /** Holds all of the producers created by this session, keyed by their unique identifiers. */ - private Map<Long, MessageProducer> _producers = new ConcurrentHashMap<Long, MessageProducer>(); - - /** - * Used as a source of unique identifiers so that the consumers can be tagged to match them to BasicConsume - * methods. - */ - private int _nextTag = 1; - - private final IdToConsumerMap<C> _consumers = new IdToConsumerMap<C>(); - - /** - * Contains a list of consumers which have been removed but which might still have - * messages to acknowledge, eg in client ack or transacted modes - */ - private CopyOnWriteArrayList<C> _removedConsumers = new CopyOnWriteArrayList<C>(); - - /** Provides a count of consumers on destinations, in order to be able to know if a destination has consumers. */ - private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount = - new ConcurrentHashMap<Destination, AtomicInteger>(); - - /** - * Used as a source of unique identifiers for producers within the session. - * - * <p/> Access to this id does not require to be synchronized since according to the JMS specification only one - * thread of control is allowed to create producers for any given session instance. - */ - private long _nextProducerId; - - /** - * Set when recover is called. This is to handle the case where recover() is called by application code during - * onMessage() processing to ensure that an auto ack is not sent. - */ - private volatile boolean _sessionInRecovery; - - private volatile boolean _usingDispatcherForCleanup; - - /** Used to indicates that the connection to which this session belongs, has been stopped. */ - private boolean _connectionStopped; - - /** Used to indicate that this session has a message listener attached to it. */ - private boolean _hasMessageListeners; - - /** Used to indicate that this session has been suspended. */ - private boolean _suspended; - - /** - * Used to protect the suspension of this session, so that critical code can be executed during suspension, - * without the session being resumed by other threads. - */ - private final Object _suspensionLock = new Object(); - - private final AtomicBoolean _firstDispatcher = new AtomicBoolean(true); - - private final boolean _immediatePrefetch; - - private final boolean _strictAMQP; - - private final boolean _strictAMQPFATAL; - private final Object _messageDeliveryLock = new Object(); - - /** Session state : used to detect if commit is a) required b) allowed , i.e. does the tx span failover. */ - private boolean _dirty; - /** Has failover occured on this session with outstanding actions to commit? */ - private boolean _failedOverDirty; - private static final class FlowControlIndicator { private volatile boolean _flowControl = true; @@ -526,9 +530,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } } - /** Flow control */ - private FlowControlIndicator _flowControl = new FlowControlIndicator(); - /** * Creates a new session on a connection. * @@ -3392,8 +3393,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic _dispatcherLogger.info(_dispatcherThread.getName() + " started"); } - UnprocessedMessage message; - // Allow disptacher to start stopped synchronized (_lock) { |