diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java | 74 |
1 files changed, 62 insertions, 12 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index b48adbdb08..6a5cc62bfc 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -74,8 +74,6 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession */ protected final AMQProtocolHandler _protocolHandler; - /** Maps from the channel id to the AMQSession that it represents. */ - protected ConcurrentMap<Integer, AMQSession> _channelId2SessionMap = new ConcurrentHashMap<Integer, AMQSession>(); protected ConcurrentMap _closingChannels = new ConcurrentHashMap(); @@ -83,7 +81,8 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession * Maps from a channel id to an unprocessed message. This is used to tie together the JmsDeliverBody (which arrives * first) with the subsequent content header and content bodies. */ - protected ConcurrentMap _channelId2UnprocessedMsgMap = new ConcurrentHashMap(); + private final ConcurrentMap<Integer,UnprocessedMessage> _channelId2UnprocessedMsgMap = new ConcurrentHashMap<Integer,UnprocessedMessage>(); + private final UnprocessedMessage[] _channelId2UnprocessedMsgArray = new UnprocessedMessage[16]; /** Counter to ensure unique queue names */ protected int _queueId = 1; @@ -101,7 +100,8 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession private MethodDispatcher _methodDispatcher; - private final AMQConnection _connection; + private final AMQConnection _connection; + private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0; public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection) { @@ -230,14 +230,24 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession * * @throws AMQException if this was not expected */ - public void unprocessedMessageReceived(UnprocessedMessage message) throws AMQException + public void unprocessedMessageReceived(final int channelId, UnprocessedMessage message) throws AMQException { - _channelId2UnprocessedMsgMap.put(message.getChannelId(), message); + if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) + { + _channelId2UnprocessedMsgArray[channelId] = message; + } + else + { + _channelId2UnprocessedMsgMap.put(channelId, message); + } } - public void messageContentHeaderReceived(int channelId, ContentHeaderBody contentHeader) throws AMQException + public void contentHeaderReceived(int channelId, ContentHeaderBody contentHeader) throws AMQException { - UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap.get(channelId); + final UnprocessedMessage msg = (channelId & FAST_CHANNEL_ACCESS_MASK) == 0 ? _channelId2UnprocessedMsgArray[channelId] + : _channelId2UnprocessedMsgMap.get(channelId); + + if (msg == null) { throw new AMQException("Error: received content header without having received a BasicDeliver frame first"); @@ -256,9 +266,19 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession } } - public void messageContentBodyReceived(int channelId, ContentBody contentBody) throws AMQException + public void contentBodyReceived(final int channelId, ContentBody contentBody) throws AMQException { - UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap.get(channelId); + UnprocessedMessage msg; + final boolean fastAccess = (channelId & FAST_CHANNEL_ACCESS_MASK) == 0; + if(fastAccess) + { + msg = _channelId2UnprocessedMsgArray[channelId]; + } + else + { + msg = _channelId2UnprocessedMsgMap.get(channelId); + } + if (msg == null) { throw new AMQException("Error: received content body without having received a JMSDeliver frame first"); @@ -266,7 +286,14 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession if (msg.getContentHeader() == null) { - _channelId2UnprocessedMsgMap.remove(channelId); + if(fastAccess) + { + _channelId2UnprocessedMsgArray[channelId] = null; + } + else + { + _channelId2UnprocessedMsgMap.remove(channelId); + } throw new AMQException("Error: received content body without having received a ContentHeader frame first"); } @@ -286,6 +313,11 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession } } + public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException + { + + } + /** * Deliver a message to the appropriate session, removing the unprocessed message from our map * @@ -296,7 +328,14 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { AMQSession session = getSession(channelId); session.messageReceived(msg); - _channelId2UnprocessedMsgMap.remove(channelId); + if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) + { + _channelId2UnprocessedMsgArray[channelId] = null; + } + else + { + _channelId2UnprocessedMsgMap.remove(channelId); + } } protected AMQSession getSession(int channelId) @@ -486,4 +525,15 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { _methodDispatcher = methodDispatcher; } + + public void setFlowControl(final int channelId, final boolean active) + { + final AMQSession session = getSession(channelId); + session.setFlowControl(active); + } + + public void methodFrameReceived(final int channel, final AMQMethodBody amqMethodBody) throws AMQException + { + _protocolHandler.methodBodyReceived(channel, amqMethodBody, _minaProtocolSession); + } } |