summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java74
1 files changed, 62 insertions, 12 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index b48adbdb08..6a5cc62bfc 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -74,8 +74,6 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
*/
protected final AMQProtocolHandler _protocolHandler;
- /** Maps from the channel id to the AMQSession that it represents. */
- protected ConcurrentMap<Integer, AMQSession> _channelId2SessionMap = new ConcurrentHashMap<Integer, AMQSession>();
protected ConcurrentMap _closingChannels = new ConcurrentHashMap();
@@ -83,7 +81,8 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
* Maps from a channel id to an unprocessed message. This is used to tie together the JmsDeliverBody (which arrives
* first) with the subsequent content header and content bodies.
*/
- protected ConcurrentMap _channelId2UnprocessedMsgMap = new ConcurrentHashMap();
+ private final ConcurrentMap<Integer,UnprocessedMessage> _channelId2UnprocessedMsgMap = new ConcurrentHashMap<Integer,UnprocessedMessage>();
+ private final UnprocessedMessage[] _channelId2UnprocessedMsgArray = new UnprocessedMessage[16];
/** Counter to ensure unique queue names */
protected int _queueId = 1;
@@ -101,7 +100,8 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
private MethodDispatcher _methodDispatcher;
- private final AMQConnection _connection;
+ private final AMQConnection _connection;
+ private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
{
@@ -230,14 +230,24 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
*
* @throws AMQException if this was not expected
*/
- public void unprocessedMessageReceived(UnprocessedMessage message) throws AMQException
+ public void unprocessedMessageReceived(final int channelId, UnprocessedMessage message) throws AMQException
{
- _channelId2UnprocessedMsgMap.put(message.getChannelId(), message);
+ if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+ {
+ _channelId2UnprocessedMsgArray[channelId] = message;
+ }
+ else
+ {
+ _channelId2UnprocessedMsgMap.put(channelId, message);
+ }
}
- public void messageContentHeaderReceived(int channelId, ContentHeaderBody contentHeader) throws AMQException
+ public void contentHeaderReceived(int channelId, ContentHeaderBody contentHeader) throws AMQException
{
- UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap.get(channelId);
+ final UnprocessedMessage msg = (channelId & FAST_CHANNEL_ACCESS_MASK) == 0 ? _channelId2UnprocessedMsgArray[channelId]
+ : _channelId2UnprocessedMsgMap.get(channelId);
+
+
if (msg == null)
{
throw new AMQException("Error: received content header without having received a BasicDeliver frame first");
@@ -256,9 +266,19 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
}
}
- public void messageContentBodyReceived(int channelId, ContentBody contentBody) throws AMQException
+ public void contentBodyReceived(final int channelId, ContentBody contentBody) throws AMQException
{
- UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap.get(channelId);
+ UnprocessedMessage msg;
+ final boolean fastAccess = (channelId & FAST_CHANNEL_ACCESS_MASK) == 0;
+ if(fastAccess)
+ {
+ msg = _channelId2UnprocessedMsgArray[channelId];
+ }
+ else
+ {
+ msg = _channelId2UnprocessedMsgMap.get(channelId);
+ }
+
if (msg == null)
{
throw new AMQException("Error: received content body without having received a JMSDeliver frame first");
@@ -266,7 +286,14 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
if (msg.getContentHeader() == null)
{
- _channelId2UnprocessedMsgMap.remove(channelId);
+ if(fastAccess)
+ {
+ _channelId2UnprocessedMsgArray[channelId] = null;
+ }
+ else
+ {
+ _channelId2UnprocessedMsgMap.remove(channelId);
+ }
throw new AMQException("Error: received content body without having received a ContentHeader frame first");
}
@@ -286,6 +313,11 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
}
}
+ public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException
+ {
+
+ }
+
/**
* Deliver a message to the appropriate session, removing the unprocessed message from our map
*
@@ -296,7 +328,14 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
{
AMQSession session = getSession(channelId);
session.messageReceived(msg);
- _channelId2UnprocessedMsgMap.remove(channelId);
+ if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+ {
+ _channelId2UnprocessedMsgArray[channelId] = null;
+ }
+ else
+ {
+ _channelId2UnprocessedMsgMap.remove(channelId);
+ }
}
protected AMQSession getSession(int channelId)
@@ -486,4 +525,15 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
{
_methodDispatcher = methodDispatcher;
}
+
+ public void setFlowControl(final int channelId, final boolean active)
+ {
+ final AMQSession session = getSession(channelId);
+ session.setFlowControl(active);
+ }
+
+ public void methodFrameReceived(final int channel, final AMQMethodBody amqMethodBody) throws AMQException
+ {
+ _protocolHandler.methodBodyReceived(channel, amqMethodBody, _minaProtocolSession);
+ }
}