diff options
Diffstat (limited to 'java/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java')
-rw-r--r-- | java/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java | 409 |
1 files changed, 0 insertions, 409 deletions
diff --git a/java/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java deleted file mode 100644 index a4ed89719b..0000000000 --- a/java/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ /dev/null @@ -1,409 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.protocol; - -import org.apache.log4j.Logger; -import org.apache.mina.common.CloseFuture; -import org.apache.mina.common.IdleStatus; -import org.apache.mina.common.IoSession; -import org.apache.mina.common.WriteFuture; -import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.ConnectionTuneParameters; -import org.apache.qpid.client.message.UnexpectedBodyReceivedException; -import org.apache.qpid.client.message.UnprocessedMessage; -import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.ProtocolInitiation; -import org.apache.qpid.framing.ProtocolVersionList; -import org.apache.commons.lang.StringUtils; - -import javax.jms.JMSException; -import javax.security.sasl.SaslClient; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -/** - * Wrapper for protocol session that provides type-safe access to session attributes. - * - * The underlying protocol session is still available but clients should not - * use it to obtain session attributes. - */ -public class AMQProtocolSession implements ProtocolVersionList -{ - - protected static final int LAST_WRITE_FUTURE_JOIN_TIMEOUT = 1000 * 60 * 2; - - protected static final Logger _logger = Logger.getLogger(AMQProtocolSession.class); - - public static final String PROTOCOL_INITIATION_RECEIVED = "ProtocolInitiatiionReceived"; - - protected static final String CONNECTION_TUNE_PARAMETERS = "ConnectionTuneParameters"; - - protected static final String AMQ_CONNECTION = "AMQConnection"; - - protected static final String SASL_CLIENT = "SASLClient"; - - protected final IoSession _minaProtocolSession; - - protected WriteFuture _lastWriteFuture; - - /** - * The handler from which this session was created and which is used to handle protocol events. - * We send failover events to the handler. - */ - protected final AMQProtocolHandler _protocolHandler; - - /** - * Maps from the channel id to the AMQSession that it represents. - */ - protected ConcurrentMap _channelId2SessionMap = new ConcurrentHashMap(); - - protected ConcurrentMap _closingChannels = new ConcurrentHashMap(); - - /** - * Maps from a channel id to an unprocessed message. This is used to tie together the - * JmsDeliverBody (which arrives first) with the subsequent content header and content bodies. - */ - protected ConcurrentMap _channelId2UnprocessedMsgMap = new ConcurrentHashMap(); - - /** - * Counter to ensure unique queue names - */ - protected int _queueId = 1; - protected final Object _queueIdLock = new Object(); - - /** - * No-arg constructor for use by test subclass - has to initialise final vars - * NOT intended for use other then for test - */ - public AMQProtocolSession() - { - _protocolHandler = null; - _minaProtocolSession = null; - } - - public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection) - { - _protocolHandler = protocolHandler; - _minaProtocolSession = protocolSession; - // properties of the connection are made available to the event handlers - _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection); - } - - public void init() - { - // start the process of setting up the connection. This is the first place that - // data is written to the server. - /* Find last protocol version in protocol version list. Make sure last protocol version - listed in the build file (build-module.xml) is the latest version which will be used - here. */ - int i = pv.length - 1; - _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR])); - } - - public String getClientID() - { - try - { - return getAMQConnection().getClientID(); - } - catch (JMSException e) - { - // we never throw a JMSException here - return null; - } - } - - public void setClientID(String clientID) throws JMSException - { - getAMQConnection().setClientID(clientID); - } - - public String getVirtualHost() - { - return getAMQConnection().getVirtualHost(); - } - - public String getUsername() - { - return getAMQConnection().getUsername(); - } - - public String getPassword() - { - return getAMQConnection().getPassword(); - } - - public IoSession getIoSession() - { - return _minaProtocolSession; - } - - public SaslClient getSaslClient() - { - return (SaslClient) _minaProtocolSession.getAttribute(SASL_CLIENT); - } - - /** - * Store the SASL client currently being used for the authentication handshake - * @param client if non-null, stores this in the session. if null clears any existing client - * being stored - */ - public void setSaslClient(SaslClient client) - { - if (client == null) - { - _minaProtocolSession.removeAttribute(SASL_CLIENT); - } - else - { - _minaProtocolSession.setAttribute(SASL_CLIENT, client); - } - } - - public ConnectionTuneParameters getConnectionTuneParameters() - { - return (ConnectionTuneParameters) _minaProtocolSession.getAttribute(CONNECTION_TUNE_PARAMETERS); - } - - public void setConnectionTuneParameters(ConnectionTuneParameters params) - { - _minaProtocolSession.setAttribute(CONNECTION_TUNE_PARAMETERS, params); - AMQConnection con = getAMQConnection(); - con.setMaximumChannelCount(params.getChannelMax()); - con.setMaximumFrameSize(params.getFrameMax()); - initHeartbeats((int) params.getHeartbeat()); - } - - /** - * Callback invoked from the BasicDeliverMethodHandler when a message has been received. - * This is invoked on the MINA dispatcher thread. - * @param message - * @throws AMQException if this was not expected - */ - public void unprocessedMessageReceived(UnprocessedMessage message) throws AMQException - { - _channelId2UnprocessedMsgMap.put(message.channelId, message); - } - - public void messageContentHeaderReceived(int channelId, ContentHeaderBody contentHeader) - throws AMQException - { - UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap.get(channelId); - if (msg == null) - { - throw new AMQException("Error: received content header without having received a BasicDeliver frame first"); - } - if (msg.contentHeader != null) - { - throw new AMQException("Error: received duplicate content header or did not receive correct number of content body frames"); - } - msg.contentHeader = contentHeader; - if (contentHeader.bodySize == 0) - { - deliverMessageToAMQSession(channelId, msg); - } - } - - public void messageContentBodyReceived(int channelId, ContentBody contentBody) throws AMQException - { - UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap.get(channelId); - if (msg == null) - { - throw new AMQException("Error: received content body without having received a JMSDeliver frame first"); - } - if (msg.contentHeader == null) - { - _channelId2UnprocessedMsgMap.remove(channelId); - throw new AMQException("Error: received content body without having received a ContentHeader frame first"); - } - try - { - msg.receiveBody(contentBody); - } - catch (UnexpectedBodyReceivedException e) - { - _channelId2UnprocessedMsgMap.remove(channelId); - throw e; - } - if (msg.isAllBodyDataReceived()) - { - deliverMessageToAMQSession(channelId, msg); - } - } - - /** - * Deliver a message to the appropriate session, removing the unprocessed message - * from our map - * @param channelId the channel id the message should be delivered to - * @param msg the message - */ - private void deliverMessageToAMQSession(int channelId, UnprocessedMessage msg) - { - AMQSession session = (AMQSession) _channelId2SessionMap.get(channelId); - session.messageReceived(msg); - _channelId2UnprocessedMsgMap.remove(channelId); - } - - /** - * Convenience method that writes a frame to the protocol session. Equivalent - * to calling getProtocolSession().write(). - * - * @param frame the frame to write - */ - public void writeFrame(AMQDataBlock frame) - { - writeFrame(frame, false); - } - - public void writeFrame(AMQDataBlock frame, boolean wait) - { - WriteFuture f = _minaProtocolSession.write(frame); - if (wait) - { - f.join(); - } - else - { - _lastWriteFuture = f; - } - } - - public void addSessionByChannel(int channelId, AMQSession session) - { - if (channelId <= 0) - { - throw new IllegalArgumentException("Attempt to register a session with a channel id <= zero"); - } - if (session == null) - { - throw new IllegalArgumentException("Attempt to register a null session"); - } - _logger.debug("Add session with channel id " + channelId); - _channelId2SessionMap.put(channelId, session); - } - - public void removeSessionByChannel(int channelId) - { - if (channelId <= 0) - { - throw new IllegalArgumentException("Attempt to deregister a session with a channel id <= zero"); - } - _logger.debug("Removing session with channelId " + channelId); - _channelId2SessionMap.remove(channelId); - } - - /** - * Starts the process of closing a session - * @param session the AMQSession being closed - */ - public void closeSession(AMQSession session) - { - _logger.debug("closeSession called on protocol session for session " + session.getChannelId()); - final int channelId = session.getChannelId(); - if (channelId <= 0) - { - throw new IllegalArgumentException("Attempt to close a channel with id < 0"); - } - // we need to know when a channel is closing so that we can respond - // with a channel.close frame when we receive any other type of frame - // on that channel - _closingChannels.putIfAbsent(channelId, session); - } - - /** - * Called from the ChannelClose handler when a channel close frame is received. - * This method decides whether this is a response or an initiation. The latter - * case causes the AMQSession to be closed and an exception to be thrown if - * appropriate. - * @param channelId the id of the channel (session) - * @return true if the client must respond to the server, i.e. if the server - * initiated the channel close, false if the channel close is just the server - * responding to the client's earlier request to close the channel. - */ - public boolean channelClosed(int channelId, int code, String text) - { - final Integer chId = channelId; - // if this is not a response to an earlier request to close the channel - if (_closingChannels.remove(chId) == null) - { - final AMQSession session = (AMQSession) _channelId2SessionMap.get(chId); - session.closed(new AMQException(_logger, code, text)); - return true; - } - else - { - return false; - } - } - - public AMQConnection getAMQConnection() - { - return (AMQConnection) _minaProtocolSession.getAttribute(AMQ_CONNECTION); - } - - public void closeProtocolSession() - { - _logger.debug("Waiting for last write to join."); - if (_lastWriteFuture != null) - { - _lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT); - } - - _logger.debug("Closing protocol session"); - final CloseFuture future = _minaProtocolSession.close(); - future.join(); - } - - public void failover(String host, int port) - { - _protocolHandler.failover(host, port); - } - - protected String generateQueueName() - { - int id; - synchronized(_queueIdLock) - { - id = _queueId++; - } - //get rid of / and : and ; from address for spec conformance - String localAddress = StringUtils.replaceChars(_minaProtocolSession.getLocalAddress().toString(),"/;:",""); - return "tmp_" + localAddress + "_" + id; - } - - /** - * - * @param delay delay in seconds (not ms) - */ - void initHeartbeats(int delay) - { - if (delay > 0) - { - _minaProtocolSession.setIdleTime(IdleStatus.WRITER_IDLE, delay); - _minaProtocolSession.setIdleTime(IdleStatus.READER_IDLE, HeartbeatConfig.CONFIG.getTimeout(delay)); - HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay)); - } - } -} |