summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java102
1 files changed, 77 insertions, 25 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index b7253e6e9c..af57fd98fc 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -20,11 +20,8 @@
*/
package org.apache.qpid.client.protocol;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import javax.jms.JMSException;
-import javax.security.sasl.SaslClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
@@ -48,8 +45,11 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.TransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import javax.security.sasl.SaslClient;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
/**
* Wrapper for protocol session that provides type-safe access to session attributes. <p/> The underlying protocol
@@ -73,16 +73,11 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
protected static final String SASL_CLIENT = "SASLClient";
- /**
- * The handler from which this session was created and which is used to handle protocol events. We send failover
- * events to the handler.
- */
- protected final AMQProtocolHandler _protocolHandler;
+ private final AMQProtocolHandler _protocolHandler;
- /** Maps from the channel id to the AMQSession that it represents. */
- protected ConcurrentMap<Integer, AMQSession> _channelId2SessionMap = new ConcurrentHashMap<Integer, AMQSession>();
+ private ConcurrentMap<Integer, AMQSession> _channelId2SessionMap = new ConcurrentHashMap<Integer, AMQSession>();
- protected ConcurrentMap _closingChannels = new ConcurrentHashMap();
+ private ConcurrentMap _closingChannels = new ConcurrentHashMap();
/**
* Maps from a channel id to an unprocessed message. This is used to tie together the JmsDeliverBody (which arrives
@@ -91,20 +86,17 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
private final ConcurrentMap<Integer, UnprocessedMessage> _channelId2UnprocessedMsgMap = new ConcurrentHashMap<Integer, UnprocessedMessage>();
private final UnprocessedMessage[] _channelId2UnprocessedMsgArray = new UnprocessedMessage[16];
- /** Counter to ensure unique queue names */
- protected int _queueId = 1;
- protected final Object _queueIdLock = new Object();
+ private int _queueId = 1;
+ private final Object _queueIdLock = new Object();
private ProtocolVersion _protocolVersion;
-// private VersionSpecificRegistry _registry =
-// MainRegistry.getVersionSpecificRegistry(ProtocolVersion.getLatestSupportedVersion());
private MethodRegistry _methodRegistry =
MethodRegistry.getMethodRegistry(ProtocolVersion.getLatestSupportedVersion());
private MethodDispatcher _methodDispatcher;
- protected final AMQConnection _connection;
+ private final AMQConnection _connection;
private ConnectionTuneParameters _connectionTuneParameters;
@@ -116,7 +108,10 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
{
_protocolHandler = protocolHandler;
_protocolVersion = connection.getProtocolVersion();
- _logger.info("Using ProtocolVersion for Session:" + _protocolVersion);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Using ProtocolVersion for Session:" + _protocolVersion);
+ }
_methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
this);
_connection = connection;
@@ -223,7 +218,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
}
msg.setContentHeader(contentHeader);
- if (contentHeader.bodySize == 0)
+ if (contentHeader.getBodySize() == 0)
{
deliverMessageToAMQSession(channelId, msg);
}
@@ -310,7 +305,10 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
*/
public void closeSession(AMQSession session)
{
- _logger.debug("closeSession called on protocol session for session " + session.getChannelId());
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("closeSession called on protocol session for session " + session.getChannelId());
+ }
final int channelId = session.getChannelId();
if (channelId <= 0)
{
@@ -401,7 +399,10 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
public void setProtocolVersion(final ProtocolVersion pv)
{
- _logger.info("Setting ProtocolVersion to :" + pv);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Setting ProtocolVersion to :" + pv);
+ }
_protocolVersion = pv;
_methodRegistry = MethodRegistry.getMethodRegistry(pv);
_methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(pv, this);
@@ -470,4 +471,55 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
{
return "AMQProtocolSession[" + _connection + ']';
}
+
+ /**
+ * The handler from which this session was created and which is used to handle protocol events. We send failover
+ * events to the handler.
+ */
+ protected AMQProtocolHandler getProtocolHandler()
+ {
+ return _protocolHandler;
+ }
+
+ /** Maps from the channel id to the AMQSession that it represents. */
+ protected ConcurrentMap<Integer, AMQSession> getChannelId2SessionMap()
+ {
+ return _channelId2SessionMap;
+ }
+
+ protected void setChannelId2SessionMap(ConcurrentMap<Integer, AMQSession> channelId2SessionMap)
+ {
+ _channelId2SessionMap = channelId2SessionMap;
+ }
+
+ protected ConcurrentMap getClosingChannels()
+ {
+ return _closingChannels;
+ }
+
+ protected void setClosingChannels(ConcurrentMap closingChannels)
+ {
+ _closingChannels = closingChannels;
+ }
+
+ /** Counter to ensure unique queue names */
+ protected int getQueueId()
+ {
+ return _queueId;
+ }
+
+ protected void setQueueId(int queueId)
+ {
+ _queueId = queueId;
+ }
+
+ protected Object getQueueIdLock()
+ {
+ return _queueIdLock;
+ }
+
+ protected AMQConnection getConnection()
+ {
+ return _connection;
+ }
}