diff options
author | Aidan Skinner <aidan@apache.org> | 2008-08-08 17:08:37 +0000 |
---|---|---|
committer | Aidan Skinner <aidan@apache.org> | 2008-08-08 17:08:37 +0000 |
commit | 004392878a706bbdc119d99b9161cc8ca8c37484 (patch) | |
tree | 192b4c7f25c147f6e9a647c4f1079d1bc7567250 | |
parent | b09a610a2c26ca094be508cdf8daaf8fd8f03003 (diff) | |
download | qpid-python-004392878a706bbdc119d99b9161cc8ca8c37484.tar.gz |
QPID-1218 Optionally use IoTransport, it's hot, but doesn't pass all the tests yet.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@684016 13f79535-47bb-0310-9956-ffa450edef68
11 files changed, 350 insertions, 14 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index b4075b81ac..12231e4882 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -50,6 +50,7 @@ import org.apache.qpid.server.state.AMQState; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.transport.network.io.IoSender; import javax.management.JMException; import javax.security.sasl.SaslServer; @@ -845,4 +846,14 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable { return (_clientVersion == null) ? null : _clientVersion.toString(); } + + public void setSender(IoSender sender) + { + // No-op, interface munging between this and AMQProtocolSession + } + + public void init() + { + // No-op, interface munging between this and AMQProtocolSession + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 1e65c50304..2ec8737d16 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -48,6 +48,7 @@ import org.apache.qpid.framing.TxSelectBody; import org.apache.qpid.framing.TxSelectOkBody; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.ChannelLimitReachedException; +import org.apache.qpid.transport.network.io.IoTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -88,7 +89,16 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate StateWaiter waiter = _conn._protocolHandler.createWaiter(openOrClosedStates); - TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail); + // TODO: use system property thingy for this + if (System.getProperty("UseTransportIo", "false").equals("false")) + { + TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail); + } + else + { + _conn.getProtocolHandler().createIoTransportSession(brokerDetail); + } + // this blocks until the connection has been set up or when an error // has prevented the connection being set up diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java new file mode 100644 index 0000000000..43ec7789c2 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java @@ -0,0 +1,125 @@ +package org.apache.qpid.client.protocol; + +import java.util.UUID; + +import javax.security.sasl.SaslClient; + +import org.apache.commons.lang.StringUtils; +import org.apache.mina.common.IdleStatus; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.ConnectionTuneParameters; +import org.apache.qpid.client.handler.ClientMethodDispatcherImpl; +import org.apache.qpid.client.state.AMQState; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ProtocolInitiation; +import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.transport.network.io.IoSender; + +public class AMQIoTransportProtocolSession extends AMQProtocolSession +{ + + protected IoSender _ioSender; + private SaslClient _saslClient; + private ConnectionTuneParameters _connectionTuneParameters; + + public AMQIoTransportProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection) + { + super(protocolHandler, connection); + } + + @Override + public void closeProtocolSession(boolean waitLast) throws AMQException + { + _ioSender.close(); + _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_CLOSED); + } + + @Override + public void init() + { + _ioSender.send(new ProtocolInitiation(_connection.getProtocolVersion()).toNioByteBuffer()); + _ioSender.flush(); + } + + @Override + protected AMQShortString generateQueueName() + { + int id; + synchronized (_queueIdLock) + { + id = _queueId++; + } + return new AMQShortString("tmp_" + UUID.randomUUID() + "_" + id); + } + + @Override + public AMQConnection getAMQConnection() + { + return _connection; + } + + @Override + public SaslClient getSaslClient() + { + return _saslClient; + } + + @Override + public void setSaslClient(SaslClient client) + { + _saslClient = client; + } + + /** @param delay delay in seconds (not ms) */ + @Override + void initHeartbeats(int delay) + { + if (delay > 0) + { + // FIXME: actually do something here + HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay)); + } + } + + @Override + public void methodFrameReceived(final int channel, final AMQMethodBody amqMethodBody) throws AMQException + { + // FIXME? + _protocolHandler.methodBodyReceived(channel, amqMethodBody, null); + } + + @Override + public void writeFrame(AMQDataBlock frame, boolean wait) + { + _ioSender.send(frame.toNioByteBuffer()); + if (wait) + { + _ioSender.flush(); + } + } + + @Override + public void setSender(IoSender sender) + { + _ioSender = sender; + } + + @Override + public ConnectionTuneParameters getConnectionTuneParameters() + { + return _connectionTuneParameters; + } + + @Override + public void setConnectionTuneParameters(ConnectionTuneParameters params) + { + _connectionTuneParameters = params; + AMQConnection con = getAMQConnection(); + con.setMaximumChannelCount(params.getChannelMax()); + con.setMaximumFrameSize(params.getFrameMax()); + initHeartbeats((int) params.getHeartbeat()); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 8328d87b87..e92817f713 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -47,11 +47,13 @@ import org.apache.qpid.client.state.StateWaiter; import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.framing.*; +import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.network.io.IoTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -253,6 +255,19 @@ public class AMQProtocolHandler extends IoHandlerAdapter } /** + * Called when we want to create a new IoTransport session + * @param brokerDetail + */ + public void createIoTransportSession(BrokerDetails brokerDetail) + { + _protocolSession = new AMQProtocolSession(this, _connection); + _stateManager.setProtocolSession(_protocolSession); + IoTransport.connect_0_9(getProtocolSession(), + brokerDetail.getHost(), brokerDetail.getPort()); + _protocolSession.init(); + } + + /** * Called when the network connection is closed. This can happen, either because the client explicitly requested * that the connection be closed, in which case nothing is done, or because the connection died. In the case * where the connection died, an attempt to failover automatically to a new connection may be started. The failover diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 6beec3c9ba..6c3ae06ce9 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -44,6 +44,7 @@ import org.apache.qpid.client.state.AMQState; import org.apache.qpid.framing.*; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.transport.network.io.IoSender; import org.apache.qpid.client.handler.ClientMethodDispatcherImpl; /** @@ -99,7 +100,8 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession private MethodDispatcher _methodDispatcher; - private final AMQConnection _connection; + protected final AMQConnection _connection; + private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0; public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection) @@ -118,11 +120,20 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession } + public AMQProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection) + { + _protocolHandler = protocolHandler; + _minaProtocolSession = null; + _protocolVersion = connection.getProtocolVersion(); + _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(), + this); + _connection = connection; + } + public void init() { // start the process of setting up the connection. This is the first place that // data is written to the server. - _minaProtocolSession.write(new ProtocolInitiation(_connection.getProtocolVersion())); } @@ -171,7 +182,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession public SaslClient getSaslClient() { - return (SaslClient) _minaProtocolSession.getAttribute(SASL_CLIENT); + return (SaslClient) _minaProtocolSession.getAttribute(SASL_CLIENT); } /** @@ -422,6 +433,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession } _logger.debug("Closing protocol session"); + final CloseFuture future = _minaProtocolSession.close(); // There is no recovery we can do if the join on the close failes so simply mark the connection CLOSED @@ -430,7 +442,6 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession // error now shouldn't matter. _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_CLOSED); - future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT); } @@ -535,4 +546,9 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { _protocolHandler.propagateExceptionToAllWaiters(error); } + + public void setSender(IoSender sender) + { + // No-op, interface munging + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java index 903b5bfa7a..a2fc3a03ef 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java @@ -50,4 +50,14 @@ public abstract class AMQDataBlock implements EncodableAMQDataBlock return buffer; } + public java.nio.ByteBuffer toNioByteBuffer() + { + final java.nio.ByteBuffer buffer = java.nio.ByteBuffer.allocate((int) getSize()); + + ByteBuffer buf = ByteBuffer.wrap(buffer); + writePayload(buf); + buffer.flip(); + return buffer; + } + } diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java index 59003225b7..44cc9586a9 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java @@ -21,6 +21,7 @@ package org.apache.qpid.protocol; import org.apache.qpid.framing.*; +import org.apache.qpid.transport.network.io.IoSender; import org.apache.qpid.AMQException; /** @@ -54,4 +55,7 @@ public interface AMQVersionAwareProtocolSession extends AMQProtocolWriter, Proto public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException; + public void setSender(IoSender sender); + public void init(); + } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java new file mode 100644 index 0000000000..b63020913b --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java @@ -0,0 +1,109 @@ +package org.apache.qpid.transport.network.io; + +import java.nio.ByteBuffer; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQFrameDecodingException; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQMethodBodyFactory; +import org.apache.qpid.framing.BodyFactory; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.ContentBodyFactory; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.ContentHeaderBodyFactory; +import org.apache.qpid.framing.HeartbeatBody; +import org.apache.qpid.framing.HeartbeatBodyFactory; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.transport.Receiver; + +public class InputHandler_0_9 implements Receiver<ByteBuffer> +{ + + private AMQVersionAwareProtocolSession _session; + private MethodRegistry _registry; + private BodyFactory bodyFactory; + private static final BodyFactory[] _bodiesSupported = new BodyFactory[Byte.MAX_VALUE]; + + static + { + _bodiesSupported[ContentHeaderBody.TYPE] = ContentHeaderBodyFactory.getInstance(); + _bodiesSupported[ContentBody.TYPE] = ContentBodyFactory.getInstance(); + _bodiesSupported[HeartbeatBody.TYPE] = new HeartbeatBodyFactory(); + } + + public InputHandler_0_9(AMQVersionAwareProtocolSession session) + { + _session = session; + _registry = _session.getMethodRegistry(); + } + + public void closed() + { + // AS FIXME: implement + } + + public void exception(Throwable t) + { + // TODO: propogate exception to things + t.printStackTrace(); + } + + public void received(ByteBuffer buf) + { + org.apache.mina.common.ByteBuffer in = org.apache.mina.common.ByteBuffer.wrap(buf); + try + { + final byte type = in.get(); + if (type == AMQMethodBody.TYPE) + { + bodyFactory = new AMQMethodBodyFactory(_session); + } + else + { + bodyFactory = _bodiesSupported[type]; + } + + if (bodyFactory == null) + { + throw new AMQFrameDecodingException(null, "Unsupported frame type: " + type, null); + } + + final int channel = in.getUnsignedShort(); + final long bodySize = in.getUnsignedInt(); + + // bodySize can be zero + if ((channel < 0) || (bodySize < 0)) + { + throw new AMQFrameDecodingException(null, "Undecodable frame: type = " + type + " channel = " + channel + + " bodySize = " + bodySize, null); + } + + AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory); + + byte marker = in.get(); + if ((marker & 0xFF) != 0xCE) + { + throw new AMQFrameDecodingException(null, "End of frame marker not found. Read " + marker + " length=" + bodySize + + " type=" + type, null); + } + + try + { + frame.getBodyFrame().handle(frame.getChannel(), _session); + } + catch (AMQException e) + { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + catch (AMQFrameDecodingException e) + { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java index 23f48a06de..ef892744ab 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java @@ -31,7 +31,7 @@ import org.apache.qpid.transport.util.Logger; import static org.apache.qpid.transport.util.Functions.*; -final class IoSender extends Thread implements Sender<ByteBuffer> +public final class IoSender extends Thread implements Sender<ByteBuffer> { private static final Logger log = Logger.get(IoSender.class); diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java index 3b543b3e60..7a17ef6b73 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java @@ -26,6 +26,7 @@ import java.net.Socket; import java.net.SocketException; import java.nio.ByteBuffer; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.ConnectionDelegate; import org.apache.qpid.transport.Receiver; @@ -82,6 +83,19 @@ public final class IoTransport private Connection connectInternal(String host, int port, ConnectionDelegate delegate) { + createSocket(host, port); + + sender = new IoSender(this, 2*writeBufferSize, timeout); + Connection conn = new Connection + (new Disassembler(sender, 64*1024 - 1), delegate); + receiver = new IoReceiver(this, new InputHandler(new Assembler(conn)), + 2*readBufferSize, timeout); + + return conn; + } + + private void createSocket(String host, int port) + { try { InetAddress address = InetAddress.getByName(host); @@ -108,14 +122,6 @@ public final class IoTransport { throw new TransportException("Error connecting to broker", e); } - - sender = new IoSender(this, 2*writeBufferSize, timeout); - Connection conn = new Connection - (new Disassembler(sender, 64*1024 - 1), delegate); - receiver = new IoReceiver(this, new InputHandler(new Assembler(conn)), - 2*readBufferSize, timeout); - - return conn; } IoSender getSender() @@ -133,4 +139,21 @@ public final class IoTransport return socket; } + public static void connect_0_9 (AMQVersionAwareProtocolSession session, String host, int port) + { + IoTransport handler = new IoTransport(); + handler.connectInternal_0_9(session, host, port); + } + + public void connectInternal_0_9(AMQVersionAwareProtocolSession session, String host, int port) + { + + createSocket(host, port); + + sender = new IoSender(this, 2*writeBufferSize, timeout); + receiver = new IoReceiver(this, new InputHandler_0_9(session), + 2*readBufferSize, timeout); + session.setSender(sender); + } + } diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java b/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java index 14020299f6..ff10fb747a 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java @@ -29,6 +29,7 @@ import org.apache.qpid.server.output.ProtocolOutputConverterRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.transport.network.io.IoSender; import javax.security.sasl.SaslServer; import java.util.HashMap; @@ -246,4 +247,16 @@ public class MockProtocolSession implements AMQProtocolSession { return null; //To change body of implemented methods use File | Settings | File Templates. } + + public void setSender(IoSender sender) + { + // FIXME AS TODO + + } + + public void init() + { + // TODO Auto-generated method stub + + } } |