summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2008-08-08 17:08:37 +0000
committerAidan Skinner <aidan@apache.org>2008-08-08 17:08:37 +0000
commit004392878a706bbdc119d99b9161cc8ca8c37484 (patch)
tree192b4c7f25c147f6e9a647c4f1079d1bc7567250
parentb09a610a2c26ca094be508cdf8daaf8fd8f03003 (diff)
downloadqpid-python-004392878a706bbdc119d99b9161cc8ca8c37484.tar.gz
QPID-1218 Optionally use IoTransport, it's hot, but doesn't pass all the tests yet.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@684016 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java11
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java12
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java125
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java15
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java24
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java10
-rw-r--r--java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java109
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java39
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java13
11 files changed, 350 insertions, 14 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index b4075b81ac..12231e4882 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -50,6 +50,7 @@ import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.transport.network.io.IoSender;
import javax.management.JMException;
import javax.security.sasl.SaslServer;
@@ -845,4 +846,14 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
{
return (_clientVersion == null) ? null : _clientVersion.toString();
}
+
+ public void setSender(IoSender sender)
+ {
+ // No-op, interface munging between this and AMQProtocolSession
+ }
+
+ public void init()
+ {
+ // No-op, interface munging between this and AMQProtocolSession
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index 1e65c50304..2ec8737d16 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -48,6 +48,7 @@ import org.apache.qpid.framing.TxSelectBody;
import org.apache.qpid.framing.TxSelectOkBody;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ChannelLimitReachedException;
+import org.apache.qpid.transport.network.io.IoTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -88,7 +89,16 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
StateWaiter waiter = _conn._protocolHandler.createWaiter(openOrClosedStates);
- TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail);
+ // TODO: use system property thingy for this
+ if (System.getProperty("UseTransportIo", "false").equals("false"))
+ {
+ TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail);
+ }
+ else
+ {
+ _conn.getProtocolHandler().createIoTransportSession(brokerDetail);
+ }
+
// this blocks until the connection has been set up or when an error
// has prevented the connection being set up
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java
new file mode 100644
index 0000000000..43ec7789c2
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java
@@ -0,0 +1,125 @@
+package org.apache.qpid.client.protocol;
+
+import java.util.UUID;
+
+import javax.security.sasl.SaslClient;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.mina.common.IdleStatus;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.ConnectionTuneParameters;
+import org.apache.qpid.client.handler.ClientMethodDispatcherImpl;
+import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.transport.network.io.IoSender;
+
+public class AMQIoTransportProtocolSession extends AMQProtocolSession
+{
+
+ protected IoSender _ioSender;
+ private SaslClient _saslClient;
+ private ConnectionTuneParameters _connectionTuneParameters;
+
+ public AMQIoTransportProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection)
+ {
+ super(protocolHandler, connection);
+ }
+
+ @Override
+ public void closeProtocolSession(boolean waitLast) throws AMQException
+ {
+ _ioSender.close();
+ _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_CLOSED);
+ }
+
+ @Override
+ public void init()
+ {
+ _ioSender.send(new ProtocolInitiation(_connection.getProtocolVersion()).toNioByteBuffer());
+ _ioSender.flush();
+ }
+
+ @Override
+ protected AMQShortString generateQueueName()
+ {
+ int id;
+ synchronized (_queueIdLock)
+ {
+ id = _queueId++;
+ }
+ return new AMQShortString("tmp_" + UUID.randomUUID() + "_" + id);
+ }
+
+ @Override
+ public AMQConnection getAMQConnection()
+ {
+ return _connection;
+ }
+
+ @Override
+ public SaslClient getSaslClient()
+ {
+ return _saslClient;
+ }
+
+ @Override
+ public void setSaslClient(SaslClient client)
+ {
+ _saslClient = client;
+ }
+
+ /** @param delay delay in seconds (not ms) */
+ @Override
+ void initHeartbeats(int delay)
+ {
+ if (delay > 0)
+ {
+ // FIXME: actually do something here
+ HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
+ }
+ }
+
+ @Override
+ public void methodFrameReceived(final int channel, final AMQMethodBody amqMethodBody) throws AMQException
+ {
+ // FIXME?
+ _protocolHandler.methodBodyReceived(channel, amqMethodBody, null);
+ }
+
+ @Override
+ public void writeFrame(AMQDataBlock frame, boolean wait)
+ {
+ _ioSender.send(frame.toNioByteBuffer());
+ if (wait)
+ {
+ _ioSender.flush();
+ }
+ }
+
+ @Override
+ public void setSender(IoSender sender)
+ {
+ _ioSender = sender;
+ }
+
+ @Override
+ public ConnectionTuneParameters getConnectionTuneParameters()
+ {
+ return _connectionTuneParameters;
+ }
+
+ @Override
+ public void setConnectionTuneParameters(ConnectionTuneParameters params)
+ {
+ _connectionTuneParameters = params;
+ AMQConnection con = getAMQConnection();
+ con.setMaximumChannelCount(params.getChannelMax());
+ con.setMaximumFrameSize(params.getFrameMax());
+ initHeartbeats((int) params.getHeartbeat());
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 8328d87b87..e92817f713 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -47,11 +47,13 @@ import org.apache.qpid.client.state.StateWaiter;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.framing.*;
+import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.network.io.IoTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -253,6 +255,19 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
/**
+ * Called when we want to create a new IoTransport session
+ * @param brokerDetail
+ */
+ public void createIoTransportSession(BrokerDetails brokerDetail)
+ {
+ _protocolSession = new AMQProtocolSession(this, _connection);
+ _stateManager.setProtocolSession(_protocolSession);
+ IoTransport.connect_0_9(getProtocolSession(),
+ brokerDetail.getHost(), brokerDetail.getPort());
+ _protocolSession.init();
+ }
+
+ /**
* Called when the network connection is closed. This can happen, either because the client explicitly requested
* that the connection be closed, in which case nothing is done, or because the connection died. In the case
* where the connection died, an attempt to failover automatically to a new connection may be started. The failover
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 6beec3c9ba..6c3ae06ce9 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -44,6 +44,7 @@ import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.network.io.IoSender;
import org.apache.qpid.client.handler.ClientMethodDispatcherImpl;
/**
@@ -99,7 +100,8 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
private MethodDispatcher _methodDispatcher;
- private final AMQConnection _connection;
+ protected final AMQConnection _connection;
+
private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
@@ -118,11 +120,20 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
}
+ public AMQProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection)
+ {
+ _protocolHandler = protocolHandler;
+ _minaProtocolSession = null;
+ _protocolVersion = connection.getProtocolVersion();
+ _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
+ this);
+ _connection = connection;
+ }
+
public void init()
{
// start the process of setting up the connection. This is the first place that
// data is written to the server.
-
_minaProtocolSession.write(new ProtocolInitiation(_connection.getProtocolVersion()));
}
@@ -171,7 +182,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
public SaslClient getSaslClient()
{
- return (SaslClient) _minaProtocolSession.getAttribute(SASL_CLIENT);
+ return (SaslClient) _minaProtocolSession.getAttribute(SASL_CLIENT);
}
/**
@@ -422,6 +433,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
}
_logger.debug("Closing protocol session");
+
final CloseFuture future = _minaProtocolSession.close();
// There is no recovery we can do if the join on the close failes so simply mark the connection CLOSED
@@ -430,7 +442,6 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
// error now shouldn't matter.
_protocolHandler.getStateManager().changeState(AMQState.CONNECTION_CLOSED);
-
future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
}
@@ -535,4 +546,9 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
{
_protocolHandler.propagateExceptionToAllWaiters(error);
}
+
+ public void setSender(IoSender sender)
+ {
+ // No-op, interface munging
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
index 903b5bfa7a..a2fc3a03ef 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
@@ -50,4 +50,14 @@ public abstract class AMQDataBlock implements EncodableAMQDataBlock
return buffer;
}
+ public java.nio.ByteBuffer toNioByteBuffer()
+ {
+ final java.nio.ByteBuffer buffer = java.nio.ByteBuffer.allocate((int) getSize());
+
+ ByteBuffer buf = ByteBuffer.wrap(buffer);
+ writePayload(buf);
+ buffer.flip();
+ return buffer;
+ }
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
index 59003225b7..44cc9586a9 100644
--- a/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
+++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
@@ -21,6 +21,7 @@
package org.apache.qpid.protocol;
import org.apache.qpid.framing.*;
+import org.apache.qpid.transport.network.io.IoSender;
import org.apache.qpid.AMQException;
/**
@@ -54,4 +55,7 @@ public interface AMQVersionAwareProtocolSession extends AMQProtocolWriter, Proto
public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException;
+ public void setSender(IoSender sender);
+ public void init();
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java
new file mode 100644
index 0000000000..b63020913b
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java
@@ -0,0 +1,109 @@
+package org.apache.qpid.transport.network.io;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQMethodBodyFactory;
+import org.apache.qpid.framing.BodyFactory;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentBodyFactory;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.ContentHeaderBodyFactory;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.HeartbeatBodyFactory;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.Receiver;
+
+public class InputHandler_0_9 implements Receiver<ByteBuffer>
+{
+
+ private AMQVersionAwareProtocolSession _session;
+ private MethodRegistry _registry;
+ private BodyFactory bodyFactory;
+ private static final BodyFactory[] _bodiesSupported = new BodyFactory[Byte.MAX_VALUE];
+
+ static
+ {
+ _bodiesSupported[ContentHeaderBody.TYPE] = ContentHeaderBodyFactory.getInstance();
+ _bodiesSupported[ContentBody.TYPE] = ContentBodyFactory.getInstance();
+ _bodiesSupported[HeartbeatBody.TYPE] = new HeartbeatBodyFactory();
+ }
+
+ public InputHandler_0_9(AMQVersionAwareProtocolSession session)
+ {
+ _session = session;
+ _registry = _session.getMethodRegistry();
+ }
+
+ public void closed()
+ {
+ // AS FIXME: implement
+ }
+
+ public void exception(Throwable t)
+ {
+ // TODO: propogate exception to things
+ t.printStackTrace();
+ }
+
+ public void received(ByteBuffer buf)
+ {
+ org.apache.mina.common.ByteBuffer in = org.apache.mina.common.ByteBuffer.wrap(buf);
+ try
+ {
+ final byte type = in.get();
+ if (type == AMQMethodBody.TYPE)
+ {
+ bodyFactory = new AMQMethodBodyFactory(_session);
+ }
+ else
+ {
+ bodyFactory = _bodiesSupported[type];
+ }
+
+ if (bodyFactory == null)
+ {
+ throw new AMQFrameDecodingException(null, "Unsupported frame type: " + type, null);
+ }
+
+ final int channel = in.getUnsignedShort();
+ final long bodySize = in.getUnsignedInt();
+
+ // bodySize can be zero
+ if ((channel < 0) || (bodySize < 0))
+ {
+ throw new AMQFrameDecodingException(null, "Undecodable frame: type = " + type + " channel = " + channel
+ + " bodySize = " + bodySize, null);
+ }
+
+ AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory);
+
+ byte marker = in.get();
+ if ((marker & 0xFF) != 0xCE)
+ {
+ throw new AMQFrameDecodingException(null, "End of frame marker not found. Read " + marker + " length=" + bodySize
+ + " type=" + type, null);
+ }
+
+ try
+ {
+ frame.getBodyFrame().handle(frame.getChannel(), _session);
+ }
+ catch (AMQException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ catch (AMQFrameDecodingException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
index 23f48a06de..ef892744ab 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
@@ -31,7 +31,7 @@ import org.apache.qpid.transport.util.Logger;
import static org.apache.qpid.transport.util.Functions.*;
-final class IoSender extends Thread implements Sender<ByteBuffer>
+public final class IoSender extends Thread implements Sender<ByteBuffer>
{
private static final Logger log = Logger.get(IoSender.class);
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
index 3b543b3e60..7a17ef6b73 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
@@ -26,6 +26,7 @@ import java.net.Socket;
import java.net.SocketException;
import java.nio.ByteBuffer;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.ConnectionDelegate;
import org.apache.qpid.transport.Receiver;
@@ -82,6 +83,19 @@ public final class IoTransport
private Connection connectInternal(String host, int port,
ConnectionDelegate delegate)
{
+ createSocket(host, port);
+
+ sender = new IoSender(this, 2*writeBufferSize, timeout);
+ Connection conn = new Connection
+ (new Disassembler(sender, 64*1024 - 1), delegate);
+ receiver = new IoReceiver(this, new InputHandler(new Assembler(conn)),
+ 2*readBufferSize, timeout);
+
+ return conn;
+ }
+
+ private void createSocket(String host, int port)
+ {
try
{
InetAddress address = InetAddress.getByName(host);
@@ -108,14 +122,6 @@ public final class IoTransport
{
throw new TransportException("Error connecting to broker", e);
}
-
- sender = new IoSender(this, 2*writeBufferSize, timeout);
- Connection conn = new Connection
- (new Disassembler(sender, 64*1024 - 1), delegate);
- receiver = new IoReceiver(this, new InputHandler(new Assembler(conn)),
- 2*readBufferSize, timeout);
-
- return conn;
}
IoSender getSender()
@@ -133,4 +139,21 @@ public final class IoTransport
return socket;
}
+ public static void connect_0_9 (AMQVersionAwareProtocolSession session, String host, int port)
+ {
+ IoTransport handler = new IoTransport();
+ handler.connectInternal_0_9(session, host, port);
+ }
+
+ public void connectInternal_0_9(AMQVersionAwareProtocolSession session, String host, int port)
+ {
+
+ createSocket(host, port);
+
+ sender = new IoSender(this, 2*writeBufferSize, timeout);
+ receiver = new IoReceiver(this, new InputHandler_0_9(session),
+ 2*readBufferSize, timeout);
+ session.setSender(sender);
+ }
+
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java b/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
index 14020299f6..ff10fb747a 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
@@ -29,6 +29,7 @@ import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.transport.network.io.IoSender;
import javax.security.sasl.SaslServer;
import java.util.HashMap;
@@ -246,4 +247,16 @@ public class MockProtocolSession implements AMQProtocolSession
{
return null; //To change body of implemented methods use File | Settings | File Templates.
}
+
+ public void setSender(IoSender sender)
+ {
+ // FIXME AS TODO
+
+ }
+
+ public void init()
+ {
+ // TODO Auto-generated method stub
+
+ }
}