diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java | 125 |
1 files changed, 125 insertions, 0 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java new file mode 100644 index 0000000000..1de0e7bdfc --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java @@ -0,0 +1,125 @@ +package org.apache.qpid.client.protocol; + +import java.util.UUID; + +import javax.security.sasl.SaslClient; + +import org.apache.commons.lang.StringUtils; +import org.apache.mina.common.IdleStatus; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.ConnectionTuneParameters; +import org.apache.qpid.client.handler.ClientMethodDispatcherImpl; +import org.apache.qpid.client.state.AMQState; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ProtocolInitiation; +import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.transport.Sender; + +public class AMQIoTransportProtocolSession extends AMQProtocolSession +{ + + protected Sender<java.nio.ByteBuffer> _ioSender; + private SaslClient _saslClient; + private ConnectionTuneParameters _connectionTuneParameters; + + public AMQIoTransportProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection) + { + super(protocolHandler, connection); + } + + @Override + public void closeProtocolSession(boolean waitLast) throws AMQException + { + _ioSender.close(); + _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_CLOSED); + } + + @Override + public void init() + { + _ioSender.send(new ProtocolInitiation(_connection.getProtocolVersion()).toNioByteBuffer()); + _ioSender.flush(); + } + + @Override + protected AMQShortString generateQueueName() + { + int id; + synchronized (_queueIdLock) + { + id = _queueId++; + } + return new AMQShortString("tmp_" + UUID.randomUUID() + "_" + id); + } + + @Override + public AMQConnection getAMQConnection() + { + return _connection; + } + + @Override + public SaslClient getSaslClient() + { + return _saslClient; + } + + @Override + public void setSaslClient(SaslClient client) + { + _saslClient = client; + } + + /** @param delay delay in seconds (not ms) */ + @Override + void initHeartbeats(int delay) + { + if (delay > 0) + { + // FIXME: actually do something here + HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay)); + } + } + + @Override + public void methodFrameReceived(final int channel, final AMQMethodBody amqMethodBody) throws AMQException + { + // FIXME? + _protocolHandler.methodBodyReceived(channel, amqMethodBody, null); + } + + @Override + public void writeFrame(AMQDataBlock frame, boolean wait) + { + _ioSender.send(frame.toNioByteBuffer()); + if (wait) + { + _ioSender.flush(); + } + } + + @Override + public void setSender(Sender<java.nio.ByteBuffer> sender) + { + _ioSender = sender; + } + + @Override + public ConnectionTuneParameters getConnectionTuneParameters() + { + return _connectionTuneParameters; + } + + @Override + public void setConnectionTuneParameters(ConnectionTuneParameters params) + { + _connectionTuneParameters = params; + AMQConnection con = getAMQConnection(); + con.setMaximumChannelCount(params.getChannelMax()); + con.setMaximumFrameSize(params.getFrameMax()); + initHeartbeats((int) params.getHeartbeat()); + } +} |