diff options
author | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-10-13 15:05:29 +0000 |
---|---|---|
committer | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-10-13 15:05:29 +0000 |
commit | 812f6e52d172f8857cf2b2f7944f27adc71ab29b (patch) | |
tree | df1960a28ceea3055c3582e36fcb2f3de9134e09 /qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java | |
parent | db5fe426e71c551b0776f8a9f03a6d1562cbe21c (diff) | |
download | qpid-python-812f6e52d172f8857cf2b2f7944f27adc71ab29b.tar.gz |
Network layer changes, and then some. Actual commit history preserved elsewhere...
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/grkvlt-network-20101013@1022127 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java')
-rw-r--r-- | qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java | 107 |
1 files changed, 50 insertions, 57 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java index 4e6d2130ae..301d6b0fad 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java @@ -24,55 +24,64 @@ import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; +import java.net.SocketAddress; import java.net.SocketException; import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; +import org.apache.qpid.ssl.SSLContextFactory; import org.apache.qpid.transport.ConnectionSettings; import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.TransportException; -import org.apache.qpid.transport.network.NetworkTransport; -import org.apache.qpid.transport.util.Logger; +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.OutgoingNetworkTransport; +import org.apache.qpid.transport.network.Transport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public class IoNetworkTransport implements NetworkTransport, IoContext +public class IoNetworkTransport implements OutgoingNetworkTransport { - static - { - org.apache.mina.common.ByteBuffer.setAllocator - (new org.apache.mina.common.SimpleByteBufferAllocator()); - org.apache.mina.common.ByteBuffer.setUseDirectBuffers - (Boolean.getBoolean("amqj.enableDirectBuffers")); - } - - private static final Logger log = Logger.get(IoNetworkTransport.class); + private static final Logger _log = LoggerFactory.getLogger(IoNetworkTransport.class); + + private static final int DEFAULT_BUFFER_SIZE = 32 * 1024; - private Socket socket; - private Sender<ByteBuffer> sender; - private IoReceiver receiver; - private long timeout = 60000; - private ConnectionSettings settings; + public static final List<String> SUPPORTED = Arrays.asList(Transport.TCP); + + private Socket _socket; + private IoNetworkConnection _connection; + private long _timeout = 60000; - @Override - public void init(ConnectionSettings settings) + public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate, SSLContextFactory sslfactory) { + if (!settings.getProtocol().equalsIgnoreCase(Transport.TCP)) + { + throw new TransportException("Invalid protocol: " + settings.getProtocol()); + } + + boolean noDelay = Boolean.getBoolean("amqj.tcpNoDelay"); + boolean keepAlive = Boolean.getBoolean("amqj.keepAlive"); + Integer sendBufferSize = Integer.getInteger("amqj.sendBufferSize", DEFAULT_BUFFER_SIZE); + Integer receiveBufferSize = Integer.getInteger("amqj.receiveBufferSize", DEFAULT_BUFFER_SIZE); + try { - this.settings = settings; - InetAddress address = InetAddress.getByName(settings.getHost()); - socket = new Socket(); - socket.setReuseAddress(true); - socket.setTcpNoDelay(settings.isTcpNodelay()); + _socket = new Socket(); - log.debug("default-SO_RCVBUF : %s", socket.getReceiveBufferSize()); - log.debug("default-SO_SNDBUF : %s", socket.getSendBufferSize()); + _log.debug("default-SO_RCVBUF : %s", _socket.getReceiveBufferSize()); + _log.debug("default-SO_SNDBUF : %s", _socket.getSendBufferSize()); - socket.setSendBufferSize(settings.getWriteBufferSize()); - socket.setReceiveBufferSize(settings.getReadBufferSize()); + _socket.setTcpNoDelay(noDelay); + _socket.setKeepAlive(keepAlive); + _socket.setSendBufferSize(sendBufferSize); + _socket.setReceiveBufferSize(receiveBufferSize); + _socket.setReuseAddress(true); - log.debug("new-SO_RCVBUF : %s", socket.getReceiveBufferSize()); - log.debug("new-SO_SNDBUF : %s", socket.getSendBufferSize()); + _log.debug("new-SO_RCVBUF : %s", _socket.getReceiveBufferSize()); + _log.debug("new-SO_SNDBUF : %s", _socket.getSendBufferSize()); - socket.connect(new InetSocketAddress(address, settings.getPort())); + InetAddress address = InetAddress.getByName(settings.getHost()); + _socket.connect(new InetSocketAddress(address, settings.getPort())); } catch (SocketException e) { @@ -82,39 +91,23 @@ public class IoNetworkTransport implements NetworkTransport, IoContext { throw new TransportException("Error connecting to broker", e); } - } - - @Override - public void receiver(Receiver<ByteBuffer> delegate) - { - receiver = new IoReceiver(this, delegate, - 2*settings.getReadBufferSize() , timeout); - } - - @Override - public Sender<ByteBuffer> sender() - { - return new IoSender(this, 2*settings.getWriteBufferSize(), timeout); - } - - @Override - public void close() - { + _connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, _timeout); + + return _connection; } - public Sender<ByteBuffer> getSender() + public void close() { - return sender; + _connection.close(); } - public IoReceiver getReceiver() + public SocketAddress getAddress() { - return receiver; + return _socket.getLocalSocketAddress(); } - public Socket getSocket() - { - return socket; + public boolean isCompatible(String protocol) { + return SUPPORTED.contains(protocol); } } |