diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java | 110 |
1 files changed, 18 insertions, 92 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 0e872170aa..cd049c24a1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -20,11 +20,6 @@ */ package org.apache.qpid.client.protocol; -import org.apache.commons.lang.StringUtils; -import org.apache.mina.common.CloseFuture; -import org.apache.mina.common.IdleStatus; -import org.apache.mina.common.IoSession; -import org.apache.mina.common.WriteFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,6 +28,7 @@ import javax.security.sasl.SaslClient; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import org.apache.commons.lang.StringUtils; import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; @@ -65,10 +61,6 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession protected static final String SASL_CLIENT = "SASLClient"; - protected final IoSession _minaProtocolSession; - - protected WriteFuture _lastWriteFuture; - /** * The handler from which this session was created and which is used to handle protocol events. We send failover * events to the handler. @@ -102,28 +94,15 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession protected final AMQConnection _connection; - private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0; + private ConnectionTuneParameters _connectionTuneParameters; - public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection) - { - _protocolHandler = protocolHandler; - _minaProtocolSession = protocolSession; - _minaProtocolSession.setAttachment(this); - // properties of the connection are made available to the event handlers - _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection); - // fixme - real value needed - _minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT); - _protocolVersion = connection.getProtocolVersion(); - _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(), - this); - _connection = connection; + private SaslClient _saslClient; - } + private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0; public AMQProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection) { - _protocolHandler = protocolHandler; - _minaProtocolSession = null; + _protocolHandler = protocolHandler; _protocolVersion = connection.getProtocolVersion(); _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(), this); @@ -134,7 +113,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { // start the process of setting up the connection. This is the first place that // data is written to the server. - _minaProtocolSession.write(new ProtocolInitiation(_connection.getProtocolVersion())); + _protocolHandler.writeFrame(new ProtocolInitiation(_connection.getProtocolVersion())); } public String getClientID() @@ -175,14 +154,9 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession return getAMQConnection().getPassword(); } - public IoSession getIoSession() - { - return _minaProtocolSession; - } - public SaslClient getSaslClient() { - return (SaslClient) _minaProtocolSession.getAttribute(SASL_CLIENT); + return _saslClient; } /** @@ -192,28 +166,21 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession */ public void setSaslClient(SaslClient client) { - if (client == null) - { - _minaProtocolSession.removeAttribute(SASL_CLIENT); - } - else - { - _minaProtocolSession.setAttribute(SASL_CLIENT, client); - } + _saslClient = client; } public ConnectionTuneParameters getConnectionTuneParameters() { - return (ConnectionTuneParameters) _minaProtocolSession.getAttribute(CONNECTION_TUNE_PARAMETERS); + return _connectionTuneParameters; } public void setConnectionTuneParameters(ConnectionTuneParameters params) { - _minaProtocolSession.setAttribute(CONNECTION_TUNE_PARAMETERS, params); + _connectionTuneParameters = params; AMQConnection con = getAMQConnection(); con.setMaximumChannelCount(params.getChannelMax()); con.setMaximumFrameSize(params.getFrameMax()); - initHeartbeats((int) params.getHeartbeat()); + _protocolHandler.initHeartbeats((int) params.getHeartbeat()); } /** @@ -335,21 +302,12 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession */ public void writeFrame(AMQDataBlock frame) { - writeFrame(frame, false); + _protocolHandler.writeFrame(frame); } public void writeFrame(AMQDataBlock frame, boolean wait) { - WriteFuture f = _minaProtocolSession.write(frame); - if (wait) - { - // fixme -- time out? - f.join(); - } - else - { - _lastWriteFuture = f; - } + _protocolHandler.writeFrame(frame, wait); } /** @@ -407,33 +365,12 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession public AMQConnection getAMQConnection() { - return (AMQConnection) _minaProtocolSession.getAttribute(AMQ_CONNECTION); + return _connection; } - public void closeProtocolSession() + public void closeProtocolSession() throws AMQException { - closeProtocolSession(true); - } - - public void closeProtocolSession(boolean waitLast) - { - _logger.debug("Waiting for last write to join."); - if (waitLast && (_lastWriteFuture != null)) - { - _lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT); - } - - _logger.debug("Closing protocol session"); - - final CloseFuture future = _minaProtocolSession.close(); - - // There is no recovery we can do if the join on the close failes so simply mark the connection CLOSED - // then wait for the connection to close. - // ritchiem: Could this release BlockingWaiters to early? The close has been done as much as possible so any - // error now shouldn't matter. - - _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_CLOSED); - future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT); + _protocolHandler.closeConnection(0); } public void failover(String host, int port) @@ -449,22 +386,11 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession id = _queueId++; } // get rid of / and : and ; from address for spec conformance - String localAddress = StringUtils.replaceChars(_minaProtocolSession.getLocalAddress().toString(), "/;:", ""); + String localAddress = StringUtils.replaceChars(_protocolHandler.getLocalAddress().toString(), "/;:", ""); return new AMQShortString("tmp_" + localAddress + "_" + id); } - /** @param delay delay in seconds (not ms) */ - void initHeartbeats(int delay) - { - if (delay > 0) - { - _minaProtocolSession.setIdleTime(IdleStatus.WRITER_IDLE, delay); - _minaProtocolSession.setIdleTime(IdleStatus.READER_IDLE, HeartbeatConfig.CONFIG.getTimeout(delay)); - HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay)); - } - } - public void confirmConsumerCancelled(int channelId, AMQShortString consumerTag) { final AMQSession session = getSession(channelId); @@ -530,7 +456,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession public void methodFrameReceived(final int channel, final AMQMethodBody amqMethodBody) throws AMQException { - _protocolHandler.methodBodyReceived(channel, amqMethodBody, _minaProtocolSession); + _protocolHandler.methodBodyReceived(channel, amqMethodBody); } public void notifyError(Exception error) |