summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2009-09-16 10:06:55 +0000
committerAidan Skinner <aidan@apache.org>2009-09-16 10:06:55 +0000
commit9c4ecc45da929750ff7f0e0a5d7ada4e674b9105 (patch)
tree3834f1b7f1fe3fbdd632a9c78f6295e54595abc5 /qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
parentc1ebe66bfab328c5192a35c21ea290b5c45f40f5 (diff)
downloadqpid-python-9c4ecc45da929750ff7f0e0a5d7ada4e674b9105.tar.gz
QPID-2105: Make NetworkDriver.open use a SSLContextFactory, not an SSLEngine.
Allow an existing SocketConnector to be passed into a MINANetworkDriver, for use with the ExistingSocket bit of TransportConnection. Move the ExistingSocket stuff to one place, use MINANetworkDriver in TransportConnection and make AMQProtocolHandler implement ProtocolEngine. Remove MINA specific gubbins from AMQProtocolHandler and AMQProtocolSession. Move fireAsynchEvent to Job Add getLocalAddress to AMQProtocolEngine Move TestNetworkDriver to common Use correct class for logger in AMQProtocolEngine Check the exception is thrown properly in SimpleACLTest, make it a little less prone to obscure race conditions. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-network-refactor@815704 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java110
1 files changed, 18 insertions, 92 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 0e872170aa..cd049c24a1 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -20,11 +20,6 @@
*/
package org.apache.qpid.client.protocol;
-import org.apache.commons.lang.StringUtils;
-import org.apache.mina.common.CloseFuture;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.WriteFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,6 +28,7 @@ import javax.security.sasl.SaslClient;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import org.apache.commons.lang.StringUtils;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
@@ -65,10 +61,6 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
protected static final String SASL_CLIENT = "SASLClient";
- protected final IoSession _minaProtocolSession;
-
- protected WriteFuture _lastWriteFuture;
-
/**
* The handler from which this session was created and which is used to handle protocol events. We send failover
* events to the handler.
@@ -102,28 +94,15 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
protected final AMQConnection _connection;
- private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
+ private ConnectionTuneParameters _connectionTuneParameters;
- public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
- {
- _protocolHandler = protocolHandler;
- _minaProtocolSession = protocolSession;
- _minaProtocolSession.setAttachment(this);
- // properties of the connection are made available to the event handlers
- _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
- // fixme - real value needed
- _minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
- _protocolVersion = connection.getProtocolVersion();
- _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
- this);
- _connection = connection;
+ private SaslClient _saslClient;
- }
+ private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
public AMQProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection)
{
- _protocolHandler = protocolHandler;
- _minaProtocolSession = null;
+ _protocolHandler = protocolHandler;
_protocolVersion = connection.getProtocolVersion();
_methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
this);
@@ -134,7 +113,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
{
// start the process of setting up the connection. This is the first place that
// data is written to the server.
- _minaProtocolSession.write(new ProtocolInitiation(_connection.getProtocolVersion()));
+ _protocolHandler.writeFrame(new ProtocolInitiation(_connection.getProtocolVersion()));
}
public String getClientID()
@@ -175,14 +154,9 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
return getAMQConnection().getPassword();
}
- public IoSession getIoSession()
- {
- return _minaProtocolSession;
- }
-
public SaslClient getSaslClient()
{
- return (SaslClient) _minaProtocolSession.getAttribute(SASL_CLIENT);
+ return _saslClient;
}
/**
@@ -192,28 +166,21 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
*/
public void setSaslClient(SaslClient client)
{
- if (client == null)
- {
- _minaProtocolSession.removeAttribute(SASL_CLIENT);
- }
- else
- {
- _minaProtocolSession.setAttribute(SASL_CLIENT, client);
- }
+ _saslClient = client;
}
public ConnectionTuneParameters getConnectionTuneParameters()
{
- return (ConnectionTuneParameters) _minaProtocolSession.getAttribute(CONNECTION_TUNE_PARAMETERS);
+ return _connectionTuneParameters;
}
public void setConnectionTuneParameters(ConnectionTuneParameters params)
{
- _minaProtocolSession.setAttribute(CONNECTION_TUNE_PARAMETERS, params);
+ _connectionTuneParameters = params;
AMQConnection con = getAMQConnection();
con.setMaximumChannelCount(params.getChannelMax());
con.setMaximumFrameSize(params.getFrameMax());
- initHeartbeats((int) params.getHeartbeat());
+ _protocolHandler.initHeartbeats((int) params.getHeartbeat());
}
/**
@@ -335,21 +302,12 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
*/
public void writeFrame(AMQDataBlock frame)
{
- writeFrame(frame, false);
+ _protocolHandler.writeFrame(frame);
}
public void writeFrame(AMQDataBlock frame, boolean wait)
{
- WriteFuture f = _minaProtocolSession.write(frame);
- if (wait)
- {
- // fixme -- time out?
- f.join();
- }
- else
- {
- _lastWriteFuture = f;
- }
+ _protocolHandler.writeFrame(frame, wait);
}
/**
@@ -407,33 +365,12 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
public AMQConnection getAMQConnection()
{
- return (AMQConnection) _minaProtocolSession.getAttribute(AMQ_CONNECTION);
+ return _connection;
}
- public void closeProtocolSession()
+ public void closeProtocolSession() throws AMQException
{
- closeProtocolSession(true);
- }
-
- public void closeProtocolSession(boolean waitLast)
- {
- _logger.debug("Waiting for last write to join.");
- if (waitLast && (_lastWriteFuture != null))
- {
- _lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
- }
-
- _logger.debug("Closing protocol session");
-
- final CloseFuture future = _minaProtocolSession.close();
-
- // There is no recovery we can do if the join on the close failes so simply mark the connection CLOSED
- // then wait for the connection to close.
- // ritchiem: Could this release BlockingWaiters to early? The close has been done as much as possible so any
- // error now shouldn't matter.
-
- _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_CLOSED);
- future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
+ _protocolHandler.closeConnection(0);
}
public void failover(String host, int port)
@@ -449,22 +386,11 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
id = _queueId++;
}
// get rid of / and : and ; from address for spec conformance
- String localAddress = StringUtils.replaceChars(_minaProtocolSession.getLocalAddress().toString(), "/;:", "");
+ String localAddress = StringUtils.replaceChars(_protocolHandler.getLocalAddress().toString(), "/;:", "");
return new AMQShortString("tmp_" + localAddress + "_" + id);
}
- /** @param delay delay in seconds (not ms) */
- void initHeartbeats(int delay)
- {
- if (delay > 0)
- {
- _minaProtocolSession.setIdleTime(IdleStatus.WRITER_IDLE, delay);
- _minaProtocolSession.setIdleTime(IdleStatus.READER_IDLE, HeartbeatConfig.CONFIG.getTimeout(delay));
- HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
- }
- }
-
public void confirmConsumerCancelled(int channelId, AMQShortString consumerTag)
{
final AMQSession session = getSession(channelId);
@@ -530,7 +456,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
public void methodFrameReceived(final int channel, final AMQMethodBody amqMethodBody) throws AMQException
{
- _protocolHandler.methodBodyReceived(channel, amqMethodBody, _minaProtocolSession);
+ _protocolHandler.methodBodyReceived(channel, amqMethodBody);
}
public void notifyError(Exception error)