diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
commit | 66765100f4257159622cefe57bed50125a5ad017 (patch) | |
tree | a88ee23bb194eb91f0ebb2d9b23ff423e3ea8e37 /qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java | |
parent | 1aeaa7b16e5ce54f10c901d75c4d40f9f88b9db6 (diff) | |
parent | 88b98b2f4152ef59a671fad55a0d08338b6b78ca (diff) | |
download | qpid-python-66765100f4257159622cefe57bed50125a5ad017.tar.gz |
Creating a branch for experimenting with some ideas for JMS client.rajith_jms_client
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rajith_jms_client@1128369 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java | 467 |
1 files changed, 467 insertions, 0 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java new file mode 100644 index 0000000000..5b7d272506 --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -0,0 +1,467 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.protocol; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.JMSException; +import javax.security.sasl.SaslClient; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.ConnectionTuneParameters; +import org.apache.qpid.client.message.UnprocessedMessage; +import org.apache.qpid.client.message.UnprocessedMessage_0_8; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.AMQState; +import org.apache.qpid.framing.*; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.client.handler.ClientMethodDispatcherImpl; + +/** + * Wrapper for protocol session that provides type-safe access to session attributes. <p/> The underlying protocol + * session is still available but clients should not use it to obtain session attributes. + */ +public class AMQProtocolSession implements AMQVersionAwareProtocolSession +{ + protected static final int LAST_WRITE_FUTURE_JOIN_TIMEOUT = 1000 * 60 * 2; + + protected static final Logger _logger = LoggerFactory.getLogger(AMQProtocolSession.class); + + public static final String PROTOCOL_INITIATION_RECEIVED = "ProtocolInitiatiionReceived"; + + //Usable channels are numbered 1 to <ChannelMax> + public static final int MAX_CHANNEL_MAX = 0xFFFF; + public static final int MIN_USABLE_CHANNEL_NUM = 1; + + protected static final String CONNECTION_TUNE_PARAMETERS = "ConnectionTuneParameters"; + + protected static final String AMQ_CONNECTION = "AMQConnection"; + + protected static final String SASL_CLIENT = "SASLClient"; + + /** + * The handler from which this session was created and which is used to handle protocol events. We send failover + * events to the handler. + */ + protected final AMQProtocolHandler _protocolHandler; + + /** Maps from the channel id to the AMQSession that it represents. */ + protected ConcurrentMap<Integer, AMQSession> _channelId2SessionMap = new ConcurrentHashMap<Integer, AMQSession>(); + + protected ConcurrentMap _closingChannels = new ConcurrentHashMap(); + + /** + * Maps from a channel id to an unprocessed message. This is used to tie together the JmsDeliverBody (which arrives + * first) with the subsequent content header and content bodies. + */ + private final ConcurrentMap<Integer, UnprocessedMessage> _channelId2UnprocessedMsgMap = new ConcurrentHashMap<Integer, UnprocessedMessage>(); + private final UnprocessedMessage[] _channelId2UnprocessedMsgArray = new UnprocessedMessage[16]; + + /** Counter to ensure unique queue names */ + protected int _queueId = 1; + protected final Object _queueIdLock = new Object(); + + private ProtocolVersion _protocolVersion; +// private VersionSpecificRegistry _registry = +// MainRegistry.getVersionSpecificRegistry(ProtocolVersion.getLatestSupportedVersion()); + + private MethodRegistry _methodRegistry = + MethodRegistry.getMethodRegistry(ProtocolVersion.getLatestSupportedVersion()); + + private MethodDispatcher _methodDispatcher; + + protected final AMQConnection _connection; + + private ConnectionTuneParameters _connectionTuneParameters; + + private SaslClient _saslClient; + + private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0; + + public AMQProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection) + { + _protocolHandler = protocolHandler; + _protocolVersion = connection.getProtocolVersion(); + _logger.info("Using ProtocolVersion for Session:" + _protocolVersion); + _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(), + this); + _connection = connection; + } + + public void init() + { + // start the process of setting up the connection. This is the first place that + // data is written to the server. + _protocolHandler.writeFrame(new ProtocolInitiation(_connection.getProtocolVersion())); + } + + public String getClientID() + { + try + { + return getAMQConnection().getClientID(); + } + catch (JMSException e) + { + // we never throw a JMSException here + return null; + } + } + + public void setClientID(String clientID) throws JMSException + { + getAMQConnection().setClientID(clientID); + } + + public AMQStateManager getStateManager() + { + return _protocolHandler.getStateManager(); + } + + public String getVirtualHost() + { + return getAMQConnection().getVirtualHost(); + } + + public SaslClient getSaslClient() + { + return _saslClient; + } + + /** + * Store the SASL client currently being used for the authentication handshake + * + * @param client if non-null, stores this in the session. if null clears any existing client being stored + */ + public void setSaslClient(SaslClient client) + { + _saslClient = client; + } + + public ConnectionTuneParameters getConnectionTuneParameters() + { + return _connectionTuneParameters; + } + + public void setConnectionTuneParameters(ConnectionTuneParameters params) + { + _connectionTuneParameters = params; + AMQConnection con = getAMQConnection(); + + con.setMaximumChannelCount(params.getChannelMax()); + con.setMaximumFrameSize(params.getFrameMax()); + _protocolHandler.initHeartbeats((int) params.getHeartbeat()); + } + + /** + * Callback invoked from the BasicDeliverMethodHandler when a message has been received. This is invoked on the MINA + * dispatcher thread. + * + * @param message + * + * @throws AMQException if this was not expected + */ + public void unprocessedMessageReceived(final int channelId, UnprocessedMessage message) throws AMQException + { + if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) + { + _channelId2UnprocessedMsgArray[channelId] = message; + } + else + { + _channelId2UnprocessedMsgMap.put(channelId, message); + } + } + + public void contentHeaderReceived(int channelId, ContentHeaderBody contentHeader) throws AMQException + { + final UnprocessedMessage_0_8 msg = (UnprocessedMessage_0_8) ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0 ? _channelId2UnprocessedMsgArray[channelId] + : _channelId2UnprocessedMsgMap.get(channelId)); + + if (msg == null) + { + throw new AMQException(null, "Error: received content header without having received a BasicDeliver frame first on session:" + this, null); + } + + if (msg.getContentHeader() != null) + { + throw new AMQException(null, "Error: received duplicate content header or did not receive correct number of content body frames on session:" + this, null); + } + + msg.setContentHeader(contentHeader); + if (contentHeader.bodySize == 0) + { + deliverMessageToAMQSession(channelId, msg); + } + } + + public void contentBodyReceived(final int channelId, ContentBody contentBody) throws AMQException + { + UnprocessedMessage_0_8 msg; + final boolean fastAccess = (channelId & FAST_CHANNEL_ACCESS_MASK) == 0; + if (fastAccess) + { + msg = (UnprocessedMessage_0_8) _channelId2UnprocessedMsgArray[channelId]; + } + else + { + msg = (UnprocessedMessage_0_8) _channelId2UnprocessedMsgMap.get(channelId); + } + + if (msg == null) + { + throw new AMQException(null, "Error: received content body without having received a JMSDeliver frame first", null); + } + + if (msg.getContentHeader() == null) + { + if (fastAccess) + { + _channelId2UnprocessedMsgArray[channelId] = null; + } + else + { + _channelId2UnprocessedMsgMap.remove(channelId); + } + throw new AMQException(null, "Error: received content body without having received a ContentHeader frame first", null); + } + + msg.receiveBody(contentBody); + + if (msg.isAllBodyDataReceived()) + { + deliverMessageToAMQSession(channelId, msg); + } + } + + public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException + { + + } + + /** + * Deliver a message to the appropriate session, removing the unprocessed message from our map + * + * @param channelId the channel id the message should be delivered to + * @param msg the message + */ + private void deliverMessageToAMQSession(int channelId, UnprocessedMessage msg) + { + AMQSession session = getSession(channelId); + session.messageReceived(msg); + if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) + { + _channelId2UnprocessedMsgArray[channelId] = null; + } + else + { + _channelId2UnprocessedMsgMap.remove(channelId); + } + } + + protected AMQSession getSession(int channelId) + { + return _connection.getSession(channelId); + } + + /** + * Convenience method that writes a frame to the protocol session. Equivalent to calling + * getProtocolSession().write(). + * + * @param frame the frame to write + */ + public void writeFrame(AMQDataBlock frame) + { + _protocolHandler.writeFrame(frame); + } + + public void writeFrame(AMQDataBlock frame, boolean wait) + { + _protocolHandler.writeFrame(frame, wait); + } + + /** + * Starts the process of closing a session + * + * @param session the AMQSession being closed + */ + public void closeSession(AMQSession session) + { + _logger.debug("closeSession called on protocol session for session " + session.getChannelId()); + final int channelId = session.getChannelId(); + if (channelId <= 0) + { + throw new IllegalArgumentException("Attempt to close a channel with id < 0"); + } + // we need to know when a channel is closing so that we can respond + // with a channel.close frame when we receive any other type of frame + // on that channel + _closingChannels.putIfAbsent(channelId, session); + } + + /** + * Called from the ChannelClose handler when a channel close frame is received. This method decides whether this is + * a response or an initiation. The latter case causes the AMQSession to be closed and an exception to be thrown if + * appropriate. + * + * @param channelId the id of the channel (session) + * + * @return true if the client must respond to the server, i.e. if the server initiated the channel close, false if + * the channel close is just the server responding to the client's earlier request to close the channel. + */ + public boolean channelClosed(int channelId, AMQConstant code, String text) throws AMQException + { + + // if this is not a response to an earlier request to close the channel + if (_closingChannels.remove(channelId) == null) + { + final AMQSession session = getSession(channelId); + try + { + session.closed(new AMQException(code, text, null)); + } + catch (JMSException e) + { + throw new AMQException(null, "JMSException received while closing session", e); + } + + return true; + } + else + { + return false; + } + } + + public AMQConnection getAMQConnection() + { + return _connection; + } + + public void closeProtocolSession() throws AMQException + { + _protocolHandler.closeConnection(0); + } + + public void failover(String host, int port) + { + _protocolHandler.failover(host, port); + } + + protected AMQShortString generateQueueName() + { + int id; + synchronized (_queueIdLock) + { + id = _queueId++; + } + // convert '.', '/', ':' and ';' to single '_', for spec compliance and readability + String localAddress = _protocolHandler.getLocalAddress().toString().replaceAll("[./:;]", "_"); + String queueName = "tmp_" + localAddress + "_" + id; + return new AMQShortString(queueName.replaceAll("_+", "_")); + } + + public void confirmConsumerCancelled(int channelId, AMQShortString consumerTag) + { + final AMQSession session = getSession(channelId); + + session.confirmConsumerCancelled(consumerTag.toIntValue()); + } + + public void setProtocolVersion(final ProtocolVersion pv) + { + _logger.info("Setting ProtocolVersion to :" + pv); + _protocolVersion = pv; + _methodRegistry = MethodRegistry.getMethodRegistry(pv); + _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(pv, this); + } + + public byte getProtocolMinorVersion() + { + return _protocolVersion.getMinorVersion(); + } + + public byte getProtocolMajorVersion() + { + return _protocolVersion.getMajorVersion(); + } + + public ProtocolVersion getProtocolVersion() + { + return _protocolVersion; + } + + public MethodRegistry getMethodRegistry() + { + return _methodRegistry; + } + + public MethodDispatcher getMethodDispatcher() + { + return _methodDispatcher; + } + + public void setTicket(int ticket, int channelId) + { + final AMQSession session = getSession(channelId); + session.setTicket(ticket); + } + + public void setMethodDispatcher(MethodDispatcher methodDispatcher) + { + _methodDispatcher = methodDispatcher; + } + + public void setFlowControl(final int channelId, final boolean active) + { + final AMQSession session = getSession(channelId); + session.setFlowControl(active); + } + + public void methodFrameReceived(final int channel, final AMQMethodBody amqMethodBody) throws AMQException + { + _protocolHandler.methodBodyReceived(channel, amqMethodBody); + } + + public void notifyError(Exception error) + { + _protocolHandler.propagateExceptionToAllWaiters(error); + } + + public void setSender(Sender<java.nio.ByteBuffer> sender) + { + // No-op, interface munging + } + + + @Override + public String toString() + { + return "AMQProtocolSession[" + _connection + ']'; + } +} |