summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java110
1 files changed, 18 insertions, 92 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 0e872170aa..cd049c24a1 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -20,11 +20,6 @@
*/
package org.apache.qpid.client.protocol;
-import org.apache.commons.lang.StringUtils;
-import org.apache.mina.common.CloseFuture;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.WriteFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,6 +28,7 @@ import javax.security.sasl.SaslClient;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import org.apache.commons.lang.StringUtils;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
@@ -65,10 +61,6 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
protected static final String SASL_CLIENT = "SASLClient";
- protected final IoSession _minaProtocolSession;
-
- protected WriteFuture _lastWriteFuture;
-
/**
* The handler from which this session was created and which is used to handle protocol events. We send failover
* events to the handler.
@@ -102,28 +94,15 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
protected final AMQConnection _connection;
- private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
+ private ConnectionTuneParameters _connectionTuneParameters;
- public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
- {
- _protocolHandler = protocolHandler;
- _minaProtocolSession = protocolSession;
- _minaProtocolSession.setAttachment(this);
- // properties of the connection are made available to the event handlers
- _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
- // fixme - real value needed
- _minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
- _protocolVersion = connection.getProtocolVersion();
- _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
- this);
- _connection = connection;
+ private SaslClient _saslClient;
- }
+ private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
public AMQProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection)
{
- _protocolHandler = protocolHandler;
- _minaProtocolSession = null;
+ _protocolHandler = protocolHandler;
_protocolVersion = connection.getProtocolVersion();
_methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
this);
@@ -134,7 +113,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
{
// start the process of setting up the connection. This is the first place that
// data is written to the server.
- _minaProtocolSession.write(new ProtocolInitiation(_connection.getProtocolVersion()));
+ _protocolHandler.writeFrame(new ProtocolInitiation(_connection.getProtocolVersion()));
}
public String getClientID()
@@ -175,14 +154,9 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
return getAMQConnection().getPassword();
}
- public IoSession getIoSession()
- {
- return _minaProtocolSession;
- }
-
public SaslClient getSaslClient()
{
- return (SaslClient) _minaProtocolSession.getAttribute(SASL_CLIENT);
+ return _saslClient;
}
/**
@@ -192,28 +166,21 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
*/
public void setSaslClient(SaslClient client)
{
- if (client == null)
- {
- _minaProtocolSession.removeAttribute(SASL_CLIENT);
- }
- else
- {
- _minaProtocolSession.setAttribute(SASL_CLIENT, client);
- }
+ _saslClient = client;
}
public ConnectionTuneParameters getConnectionTuneParameters()
{
- return (ConnectionTuneParameters) _minaProtocolSession.getAttribute(CONNECTION_TUNE_PARAMETERS);
+ return _connectionTuneParameters;
}
public void setConnectionTuneParameters(ConnectionTuneParameters params)
{
- _minaProtocolSession.setAttribute(CONNECTION_TUNE_PARAMETERS, params);
+ _connectionTuneParameters = params;
AMQConnection con = getAMQConnection();
con.setMaximumChannelCount(params.getChannelMax());
con.setMaximumFrameSize(params.getFrameMax());
- initHeartbeats((int) params.getHeartbeat());
+ _protocolHandler.initHeartbeats((int) params.getHeartbeat());
}
/**
@@ -335,21 +302,12 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
*/
public void writeFrame(AMQDataBlock frame)
{
- writeFrame(frame, false);
+ _protocolHandler.writeFrame(frame);
}
public void writeFrame(AMQDataBlock frame, boolean wait)
{
- WriteFuture f = _minaProtocolSession.write(frame);
- if (wait)
- {
- // fixme -- time out?
- f.join();
- }
- else
- {
- _lastWriteFuture = f;
- }
+ _protocolHandler.writeFrame(frame, wait);
}
/**
@@ -407,33 +365,12 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
public AMQConnection getAMQConnection()
{
- return (AMQConnection) _minaProtocolSession.getAttribute(AMQ_CONNECTION);
+ return _connection;
}
- public void closeProtocolSession()
+ public void closeProtocolSession() throws AMQException
{
- closeProtocolSession(true);
- }
-
- public void closeProtocolSession(boolean waitLast)
- {
- _logger.debug("Waiting for last write to join.");
- if (waitLast && (_lastWriteFuture != null))
- {
- _lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
- }
-
- _logger.debug("Closing protocol session");
-
- final CloseFuture future = _minaProtocolSession.close();
-
- // There is no recovery we can do if the join on the close failes so simply mark the connection CLOSED
- // then wait for the connection to close.
- // ritchiem: Could this release BlockingWaiters to early? The close has been done as much as possible so any
- // error now shouldn't matter.
-
- _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_CLOSED);
- future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
+ _protocolHandler.closeConnection(0);
}
public void failover(String host, int port)
@@ -449,22 +386,11 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
id = _queueId++;
}
// get rid of / and : and ; from address for spec conformance
- String localAddress = StringUtils.replaceChars(_minaProtocolSession.getLocalAddress().toString(), "/;:", "");
+ String localAddress = StringUtils.replaceChars(_protocolHandler.getLocalAddress().toString(), "/;:", "");
return new AMQShortString("tmp_" + localAddress + "_" + id);
}
- /** @param delay delay in seconds (not ms) */
- void initHeartbeats(int delay)
- {
- if (delay > 0)
- {
- _minaProtocolSession.setIdleTime(IdleStatus.WRITER_IDLE, delay);
- _minaProtocolSession.setIdleTime(IdleStatus.READER_IDLE, HeartbeatConfig.CONFIG.getTimeout(delay));
- HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
- }
- }
-
public void confirmConsumerCancelled(int channelId, AMQShortString consumerTag)
{
final AMQSession session = getSession(channelId);
@@ -530,7 +456,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
public void methodFrameReceived(final int channel, final AMQMethodBody amqMethodBody) throws AMQException
{
- _protocolHandler.methodBodyReceived(channel, amqMethodBody, _minaProtocolSession);
+ _protocolHandler.methodBodyReceived(channel, amqMethodBody);
}
public void notifyError(Exception error)