summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java125
1 files changed, 125 insertions, 0 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java
new file mode 100644
index 0000000000..1de0e7bdfc
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java
@@ -0,0 +1,125 @@
+package org.apache.qpid.client.protocol;
+
+import java.util.UUID;
+
+import javax.security.sasl.SaslClient;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.mina.common.IdleStatus;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.ConnectionTuneParameters;
+import org.apache.qpid.client.handler.ClientMethodDispatcherImpl;
+import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.transport.Sender;
+
+public class AMQIoTransportProtocolSession extends AMQProtocolSession
+{
+
+ protected Sender<java.nio.ByteBuffer> _ioSender;
+ private SaslClient _saslClient;
+ private ConnectionTuneParameters _connectionTuneParameters;
+
+ public AMQIoTransportProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection)
+ {
+ super(protocolHandler, connection);
+ }
+
+ @Override
+ public void closeProtocolSession(boolean waitLast) throws AMQException
+ {
+ _ioSender.close();
+ _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_CLOSED);
+ }
+
+ @Override
+ public void init()
+ {
+ _ioSender.send(new ProtocolInitiation(_connection.getProtocolVersion()).toNioByteBuffer());
+ _ioSender.flush();
+ }
+
+ @Override
+ protected AMQShortString generateQueueName()
+ {
+ int id;
+ synchronized (_queueIdLock)
+ {
+ id = _queueId++;
+ }
+ return new AMQShortString("tmp_" + UUID.randomUUID() + "_" + id);
+ }
+
+ @Override
+ public AMQConnection getAMQConnection()
+ {
+ return _connection;
+ }
+
+ @Override
+ public SaslClient getSaslClient()
+ {
+ return _saslClient;
+ }
+
+ @Override
+ public void setSaslClient(SaslClient client)
+ {
+ _saslClient = client;
+ }
+
+ /** @param delay delay in seconds (not ms) */
+ @Override
+ void initHeartbeats(int delay)
+ {
+ if (delay > 0)
+ {
+ // FIXME: actually do something here
+ HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
+ }
+ }
+
+ @Override
+ public void methodFrameReceived(final int channel, final AMQMethodBody amqMethodBody) throws AMQException
+ {
+ // FIXME?
+ _protocolHandler.methodBodyReceived(channel, amqMethodBody, null);
+ }
+
+ @Override
+ public void writeFrame(AMQDataBlock frame, boolean wait)
+ {
+ _ioSender.send(frame.toNioByteBuffer());
+ if (wait)
+ {
+ _ioSender.flush();
+ }
+ }
+
+ @Override
+ public void setSender(Sender<java.nio.ByteBuffer> sender)
+ {
+ _ioSender = sender;
+ }
+
+ @Override
+ public ConnectionTuneParameters getConnectionTuneParameters()
+ {
+ return _connectionTuneParameters;
+ }
+
+ @Override
+ public void setConnectionTuneParameters(ConnectionTuneParameters params)
+ {
+ _connectionTuneParameters = params;
+ AMQConnection con = getAMQConnection();
+ con.setMaximumChannelCount(params.getChannelMax());
+ con.setMaximumFrameSize(params.getFrameMax());
+ initHeartbeats((int) params.getHeartbeat());
+ }
+}