summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java170
1 files changed, 13 insertions, 157 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index b483406949..af0d8a3a1d 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -26,15 +26,12 @@ import java.net.ConnectException;
import java.net.UnknownHostException;
import java.nio.channels.UnresolvedAddressException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Iterator;
-import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionMetaData;
@@ -84,153 +81,6 @@ import org.slf4j.LoggerFactory;
public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
{
- public static final class ChannelToSessionMap
- {
- private final AMQSession[] _fastAccessSessions = new AMQSession[16];
- private final LinkedHashMap<Integer, AMQSession> _slowAccessSessions = new LinkedHashMap<Integer, AMQSession>();
- private int _size = 0;
- private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
- private AtomicInteger _idFactory = new AtomicInteger(0);
- private int _maxChannelID;
- private boolean _cycledIds;
-
- public AMQSession get(int channelId)
- {
- if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
- {
- return _fastAccessSessions[channelId];
- }
- else
- {
- return _slowAccessSessions.get(channelId);
- }
- }
-
- public AMQSession put(int channelId, AMQSession session)
- {
- AMQSession oldVal;
- if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
- {
- oldVal = _fastAccessSessions[channelId];
- _fastAccessSessions[channelId] = session;
- }
- else
- {
- oldVal = _slowAccessSessions.put(channelId, session);
- }
- if ((oldVal != null) && (session == null))
- {
- _size--;
- }
- else if ((oldVal == null) && (session != null))
- {
- _size++;
- }
-
- return session;
-
- }
-
- public AMQSession remove(int channelId)
- {
- AMQSession session;
- if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
- {
- session = _fastAccessSessions[channelId];
- _fastAccessSessions[channelId] = null;
- }
- else
- {
- session = _slowAccessSessions.remove(channelId);
- }
-
- if (session != null)
- {
- _size--;
- }
- return session;
-
- }
-
- public Collection<AMQSession> values()
- {
- ArrayList<AMQSession> values = new ArrayList<AMQSession>(size());
-
- for (int i = 0; i < 16; i++)
- {
- if (_fastAccessSessions[i] != null)
- {
- values.add(_fastAccessSessions[i]);
- }
- }
- values.addAll(_slowAccessSessions.values());
-
- return values;
- }
-
- public int size()
- {
- return _size;
- }
-
- public void clear()
- {
- _size = 0;
- _slowAccessSessions.clear();
- for (int i = 0; i < 16; i++)
- {
- _fastAccessSessions[i] = null;
- }
- }
-
- /*
- * Synchronized on whole method so that we don't need to consider the
- * increment-then-reset path in too much detail
- */
- public synchronized int getNextChannelId()
- {
- int id = 0;
- if (!_cycledIds)
- {
- id = _idFactory.incrementAndGet();
- if (id == _maxChannelID)
- {
- _cycledIds = true;
- _idFactory.set(0); // Go back to the start
- }
- }
- else
- {
- boolean done = false;
- while (!done)
- {
- // Needs to work second time through
- id = _idFactory.incrementAndGet();
- if (id > _maxChannelID)
- {
- _idFactory.set(0);
- id = _idFactory.incrementAndGet();
- }
- if ((id & FAST_CHANNEL_ACCESS_MASK) == 0)
- {
- done = (_fastAccessSessions[id] == null);
- }
- else
- {
- done = (!_slowAccessSessions.keySet().contains(id));
- }
- }
- }
-
- return id;
- }
-
- public void setMaxChannelID(int maxChannelID)
- {
- _maxChannelID = maxChannelID;
- }
- }
-
private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class);
@@ -244,9 +94,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
/**
* A channel is roughly analogous to a session. The server can negotiate the maximum number of channels per session
- * and we must prevent the client from opening too many. Zero means unlimited.
+ * and we must prevent the client from opening too many.
*/
- protected long _maximumChannelCount;
+ private long _maximumChannelCount;
/** The maximum size of frame supported by the server */
private long _maximumFrameSize;
@@ -489,7 +339,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
_delegate = new AMQConnectionDelegate_0_10(this);
}
- _sessions.setMaxChannelID(_delegate.getMaxChannelID());
if (_logger.isInfoEnabled())
{
@@ -570,8 +419,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
}
- _logger.info("Connected with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion());
-
if (_logger.isDebugEnabled())
{
_logger.debug("Are we connected:" + _connected);
@@ -579,6 +426,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
if (!_connected)
{
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Last attempted ProtocolHandler Version:"+_protocolHandler.getProtocolVersion());
+ }
+
String message = null;
if (connectionException != null)
@@ -620,6 +472,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
throw new AMQConnectionFailureException(message, connectionException);
}
+ _logger.info("Connected with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion());
+
+ _sessions.setMaxChannelID(_delegate.getMaxChannelID());
+ _sessions.setMinChannelID(_delegate.getMinChannelID());
+
_connectionMetaData = new QpidConnectionMetaData(this);
}
@@ -647,7 +504,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
Class partypes[] = new Class[1];
partypes[0] = AMQConnection.class;
_delegate = (AMQConnectionDelegate) c.getConstructor(partypes).newInstance(this);
- _sessions.setMaxChannelID(_delegate.getMaxChannelID());
//Update our session to use this new protocol version
_protocolHandler.getProtocolSession().setProtocolVersion(_delegate.getProtocolVersion());
@@ -898,7 +754,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public boolean channelLimitReached()
{
- return (_maximumChannelCount != 0) && (_sessions.size() == _maximumChannelCount);
+ return _sessions.size() >= _maximumChannelCount;
}
public String getClientID() throws JMSException