summaryrefslogtreecommitdiff
path: root/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
diff options
context:
space:
mode:
authorAndrew Donald Kennedy <grkvlt@apache.org>2010-10-13 15:05:29 +0000
committerAndrew Donald Kennedy <grkvlt@apache.org>2010-10-13 15:05:29 +0000
commit812f6e52d172f8857cf2b2f7944f27adc71ab29b (patch)
treedf1960a28ceea3055c3582e36fcb2f3de9134e09 /qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
parentdb5fe426e71c551b0776f8a9f03a6d1562cbe21c (diff)
downloadqpid-python-812f6e52d172f8857cf2b2f7944f27adc71ab29b.tar.gz
Network layer changes, and then some. Actual commit history preserved elsewhere...
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/grkvlt-network-20101013@1022127 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java107
1 files changed, 50 insertions, 57 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
index 4e6d2130ae..301d6b0fad 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
@@ -24,55 +24,64 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
+import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.qpid.ssl.SSLContextFactory;
import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.transport.network.NetworkTransport;
-import org.apache.qpid.transport.util.Logger;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.OutgoingNetworkTransport;
+import org.apache.qpid.transport.network.Transport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public class IoNetworkTransport implements NetworkTransport, IoContext
+public class IoNetworkTransport implements OutgoingNetworkTransport
{
- static
- {
- org.apache.mina.common.ByteBuffer.setAllocator
- (new org.apache.mina.common.SimpleByteBufferAllocator());
- org.apache.mina.common.ByteBuffer.setUseDirectBuffers
- (Boolean.getBoolean("amqj.enableDirectBuffers"));
- }
-
- private static final Logger log = Logger.get(IoNetworkTransport.class);
+ private static final Logger _log = LoggerFactory.getLogger(IoNetworkTransport.class);
+
+ private static final int DEFAULT_BUFFER_SIZE = 32 * 1024;
- private Socket socket;
- private Sender<ByteBuffer> sender;
- private IoReceiver receiver;
- private long timeout = 60000;
- private ConnectionSettings settings;
+ public static final List<String> SUPPORTED = Arrays.asList(Transport.TCP);
+
+ private Socket _socket;
+ private IoNetworkConnection _connection;
+ private long _timeout = 60000;
- @Override
- public void init(ConnectionSettings settings)
+ public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate, SSLContextFactory sslfactory)
{
+ if (!settings.getProtocol().equalsIgnoreCase(Transport.TCP))
+ {
+ throw new TransportException("Invalid protocol: " + settings.getProtocol());
+ }
+
+ boolean noDelay = Boolean.getBoolean("amqj.tcpNoDelay");
+ boolean keepAlive = Boolean.getBoolean("amqj.keepAlive");
+ Integer sendBufferSize = Integer.getInteger("amqj.sendBufferSize", DEFAULT_BUFFER_SIZE);
+ Integer receiveBufferSize = Integer.getInteger("amqj.receiveBufferSize", DEFAULT_BUFFER_SIZE);
+
try
{
- this.settings = settings;
- InetAddress address = InetAddress.getByName(settings.getHost());
- socket = new Socket();
- socket.setReuseAddress(true);
- socket.setTcpNoDelay(settings.isTcpNodelay());
+ _socket = new Socket();
- log.debug("default-SO_RCVBUF : %s", socket.getReceiveBufferSize());
- log.debug("default-SO_SNDBUF : %s", socket.getSendBufferSize());
+ _log.debug("default-SO_RCVBUF : %s", _socket.getReceiveBufferSize());
+ _log.debug("default-SO_SNDBUF : %s", _socket.getSendBufferSize());
- socket.setSendBufferSize(settings.getWriteBufferSize());
- socket.setReceiveBufferSize(settings.getReadBufferSize());
+ _socket.setTcpNoDelay(noDelay);
+ _socket.setKeepAlive(keepAlive);
+ _socket.setSendBufferSize(sendBufferSize);
+ _socket.setReceiveBufferSize(receiveBufferSize);
+ _socket.setReuseAddress(true);
- log.debug("new-SO_RCVBUF : %s", socket.getReceiveBufferSize());
- log.debug("new-SO_SNDBUF : %s", socket.getSendBufferSize());
+ _log.debug("new-SO_RCVBUF : %s", _socket.getReceiveBufferSize());
+ _log.debug("new-SO_SNDBUF : %s", _socket.getSendBufferSize());
- socket.connect(new InetSocketAddress(address, settings.getPort()));
+ InetAddress address = InetAddress.getByName(settings.getHost());
+ _socket.connect(new InetSocketAddress(address, settings.getPort()));
}
catch (SocketException e)
{
@@ -82,39 +91,23 @@ public class IoNetworkTransport implements NetworkTransport, IoContext
{
throw new TransportException("Error connecting to broker", e);
}
- }
-
- @Override
- public void receiver(Receiver<ByteBuffer> delegate)
- {
- receiver = new IoReceiver(this, delegate,
- 2*settings.getReadBufferSize() , timeout);
- }
-
- @Override
- public Sender<ByteBuffer> sender()
- {
- return new IoSender(this, 2*settings.getWriteBufferSize(), timeout);
- }
-
- @Override
- public void close()
- {
+ _connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, _timeout);
+
+ return _connection;
}
- public Sender<ByteBuffer> getSender()
+ public void close()
{
- return sender;
+ _connection.close();
}
- public IoReceiver getReceiver()
+ public SocketAddress getAddress()
{
- return receiver;
+ return _socket.getLocalSocketAddress();
}
- public Socket getSocket()
- {
- return socket;
+ public boolean isCompatible(String protocol) {
+ return SUPPORTED.contains(protocol);
}
}