diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java | 170 |
1 files changed, 13 insertions, 157 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index b483406949..af0d8a3a1d 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -26,15 +26,12 @@ import java.net.ConnectException; import java.net.UnknownHostException; import java.nio.channels.UnresolvedAddressException; import java.util.ArrayList; -import java.util.Collection; import java.util.Iterator; -import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import javax.jms.ConnectionConsumer; import javax.jms.ConnectionMetaData; @@ -84,153 +81,6 @@ import org.slf4j.LoggerFactory; public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable { - public static final class ChannelToSessionMap - { - private final AMQSession[] _fastAccessSessions = new AMQSession[16]; - private final LinkedHashMap<Integer, AMQSession> _slowAccessSessions = new LinkedHashMap<Integer, AMQSession>(); - private int _size = 0; - private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0; - private AtomicInteger _idFactory = new AtomicInteger(0); - private int _maxChannelID; - private boolean _cycledIds; - - public AMQSession get(int channelId) - { - if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) - { - return _fastAccessSessions[channelId]; - } - else - { - return _slowAccessSessions.get(channelId); - } - } - - public AMQSession put(int channelId, AMQSession session) - { - AMQSession oldVal; - if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) - { - oldVal = _fastAccessSessions[channelId]; - _fastAccessSessions[channelId] = session; - } - else - { - oldVal = _slowAccessSessions.put(channelId, session); - } - if ((oldVal != null) && (session == null)) - { - _size--; - } - else if ((oldVal == null) && (session != null)) - { - _size++; - } - - return session; - - } - - public AMQSession remove(int channelId) - { - AMQSession session; - if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) - { - session = _fastAccessSessions[channelId]; - _fastAccessSessions[channelId] = null; - } - else - { - session = _slowAccessSessions.remove(channelId); - } - - if (session != null) - { - _size--; - } - return session; - - } - - public Collection<AMQSession> values() - { - ArrayList<AMQSession> values = new ArrayList<AMQSession>(size()); - - for (int i = 0; i < 16; i++) - { - if (_fastAccessSessions[i] != null) - { - values.add(_fastAccessSessions[i]); - } - } - values.addAll(_slowAccessSessions.values()); - - return values; - } - - public int size() - { - return _size; - } - - public void clear() - { - _size = 0; - _slowAccessSessions.clear(); - for (int i = 0; i < 16; i++) - { - _fastAccessSessions[i] = null; - } - } - - /* - * Synchronized on whole method so that we don't need to consider the - * increment-then-reset path in too much detail - */ - public synchronized int getNextChannelId() - { - int id = 0; - if (!_cycledIds) - { - id = _idFactory.incrementAndGet(); - if (id == _maxChannelID) - { - _cycledIds = true; - _idFactory.set(0); // Go back to the start - } - } - else - { - boolean done = false; - while (!done) - { - // Needs to work second time through - id = _idFactory.incrementAndGet(); - if (id > _maxChannelID) - { - _idFactory.set(0); - id = _idFactory.incrementAndGet(); - } - if ((id & FAST_CHANNEL_ACCESS_MASK) == 0) - { - done = (_fastAccessSessions[id] == null); - } - else - { - done = (!_slowAccessSessions.keySet().contains(id)); - } - } - } - - return id; - } - - public void setMaxChannelID(int maxChannelID) - { - _maxChannelID = maxChannelID; - } - } - private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class); @@ -244,9 +94,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect /** * A channel is roughly analogous to a session. The server can negotiate the maximum number of channels per session - * and we must prevent the client from opening too many. Zero means unlimited. + * and we must prevent the client from opening too many. */ - protected long _maximumChannelCount; + private long _maximumChannelCount; /** The maximum size of frame supported by the server */ private long _maximumFrameSize; @@ -489,7 +339,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { _delegate = new AMQConnectionDelegate_0_10(this); } - _sessions.setMaxChannelID(_delegate.getMaxChannelID()); if (_logger.isInfoEnabled()) { @@ -570,8 +419,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } - _logger.info("Connected with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion()); - if (_logger.isDebugEnabled()) { _logger.debug("Are we connected:" + _connected); @@ -579,6 +426,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect if (!_connected) { + if (_logger.isDebugEnabled()) + { + _logger.debug("Last attempted ProtocolHandler Version:"+_protocolHandler.getProtocolVersion()); + } + String message = null; if (connectionException != null) @@ -620,6 +472,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect throw new AMQConnectionFailureException(message, connectionException); } + _logger.info("Connected with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion()); + + _sessions.setMaxChannelID(_delegate.getMaxChannelID()); + _sessions.setMinChannelID(_delegate.getMinChannelID()); + _connectionMetaData = new QpidConnectionMetaData(this); } @@ -647,7 +504,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect Class partypes[] = new Class[1]; partypes[0] = AMQConnection.class; _delegate = (AMQConnectionDelegate) c.getConstructor(partypes).newInstance(this); - _sessions.setMaxChannelID(_delegate.getMaxChannelID()); //Update our session to use this new protocol version _protocolHandler.getProtocolSession().setProtocolVersion(_delegate.getProtocolVersion()); @@ -898,7 +754,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public boolean channelLimitReached() { - return (_maximumChannelCount != 0) && (_sessions.size() == _maximumChannelCount); + return _sessions.size() >= _maximumChannelCount; } public String getClientID() throws JMSException |