diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java | 71 |
1 files changed, 15 insertions, 56 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java index b2f7ae8395..1ac8f62e32 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java @@ -20,27 +20,20 @@ */ package org.apache.qpid.client.transport; +import java.io.IOException; +import java.net.InetSocketAddress; + import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.ConnectFuture; import org.apache.mina.common.IoConnector; import org.apache.mina.common.SimpleByteBufferAllocator; -import org.apache.mina.transport.socket.nio.ExistingSocketConnector; -import org.apache.mina.transport.socket.nio.SocketConnectorConfig; -import org.apache.mina.transport.socket.nio.SocketSessionConfig; - +import org.apache.qpid.client.SSLConfiguration; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.pool.ReadWriteThreadModel; - +import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.network.mina.MINANetworkDriver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - public class SocketTransportConnection implements ITransportConnection { private static final Logger _logger = LoggerFactory.getLogger(SocketTransportConnection.class); @@ -71,61 +64,27 @@ public class SocketTransportConnection implements ITransportConnection } final IoConnector ioConnector = _socketConnectorFactory.newSocketConnector(); - SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig(); - - // if we do not use our own thread model we get the MINA default which is to use - // its own leader-follower model - boolean readWriteThreading = Boolean.getBoolean("amqj.shared_read_write_pool"); - if (readWriteThreading) - { - cfg.setThreadModel(ReadWriteThreadModel.getInstance()); - } - - SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig(); - scfg.setTcpNoDelay("true".equalsIgnoreCase(System.getProperty("amqj.tcpNoDelay", "true"))); - scfg.setSendBufferSize(Integer.getInteger("amqj.sendBufferSize", DEFAULT_BUFFER_SIZE)); - _logger.info("send-buffer-size = " + scfg.getSendBufferSize()); - scfg.setReceiveBufferSize(Integer.getInteger("amqj.receiveBufferSize", DEFAULT_BUFFER_SIZE)); - _logger.info("recv-buffer-size = " + scfg.getReceiveBufferSize()); - final InetSocketAddress address; if (brokerDetail.getTransport().equals(BrokerDetails.SOCKET)) { address = null; - - Socket socket = TransportConnection.removeOpenSocket(brokerDetail.getHost()); - - if (socket != null) - { - _logger.info("Using existing Socket:" + socket); - - ((ExistingSocketConnector) ioConnector).setOpenSocket(socket); - } - else - { - throw new IllegalArgumentException("Active Socket must be provided for broker " + - "with 'socket://<SocketID>' transport:" + brokerDetail); - } } else { address = new InetSocketAddress(brokerDetail.getHost(), brokerDetail.getPort()); _logger.info("Attempting connection to " + address); } - - - ConnectFuture future = ioConnector.connect(address, protocolHandler); - - // wait for connection to complete - if (future.join(brokerDetail.getTimeout())) - { - // we call getSession which throws an IOException if there has been an error connecting - future.getSession(); - } - else + + SSLConfiguration sslConfig = protocolHandler.getConnection().getSSLConfiguration(); + SSLContextFactory sslFactory = null; + if (sslConfig != null) { - throw new IOException("Timeout waiting for connection."); + sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType()); } + + MINANetworkDriver driver = new MINANetworkDriver(ioConnector); + driver.open(brokerDetail.getPort(), address.getAddress(), protocolHandler, null, sslFactory); + protocolHandler.setNetworkDriver(driver); } } |