summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java71
1 files changed, 15 insertions, 56 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
index b2f7ae8395..1ac8f62e32 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
@@ -20,27 +20,20 @@
*/
package org.apache.qpid.client.transport;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.SimpleByteBufferAllocator;
-import org.apache.mina.transport.socket.nio.ExistingSocketConnector;
-import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
-import org.apache.mina.transport.socket.nio.SocketSessionConfig;
-
+import org.apache.qpid.client.SSLConfiguration;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.pool.ReadWriteThreadModel;
-
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.network.mina.MINANetworkDriver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
public class SocketTransportConnection implements ITransportConnection
{
private static final Logger _logger = LoggerFactory.getLogger(SocketTransportConnection.class);
@@ -71,61 +64,27 @@ public class SocketTransportConnection implements ITransportConnection
}
final IoConnector ioConnector = _socketConnectorFactory.newSocketConnector();
- SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig();
-
- // if we do not use our own thread model we get the MINA default which is to use
- // its own leader-follower model
- boolean readWriteThreading = Boolean.getBoolean("amqj.shared_read_write_pool");
- if (readWriteThreading)
- {
- cfg.setThreadModel(ReadWriteThreadModel.getInstance());
- }
-
- SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();
- scfg.setTcpNoDelay("true".equalsIgnoreCase(System.getProperty("amqj.tcpNoDelay", "true")));
- scfg.setSendBufferSize(Integer.getInteger("amqj.sendBufferSize", DEFAULT_BUFFER_SIZE));
- _logger.info("send-buffer-size = " + scfg.getSendBufferSize());
- scfg.setReceiveBufferSize(Integer.getInteger("amqj.receiveBufferSize", DEFAULT_BUFFER_SIZE));
- _logger.info("recv-buffer-size = " + scfg.getReceiveBufferSize());
-
final InetSocketAddress address;
if (brokerDetail.getTransport().equals(BrokerDetails.SOCKET))
{
address = null;
-
- Socket socket = TransportConnection.removeOpenSocket(brokerDetail.getHost());
-
- if (socket != null)
- {
- _logger.info("Using existing Socket:" + socket);
-
- ((ExistingSocketConnector) ioConnector).setOpenSocket(socket);
- }
- else
- {
- throw new IllegalArgumentException("Active Socket must be provided for broker " +
- "with 'socket://<SocketID>' transport:" + brokerDetail);
- }
}
else
{
address = new InetSocketAddress(brokerDetail.getHost(), brokerDetail.getPort());
_logger.info("Attempting connection to " + address);
}
-
-
- ConnectFuture future = ioConnector.connect(address, protocolHandler);
-
- // wait for connection to complete
- if (future.join(brokerDetail.getTimeout()))
- {
- // we call getSession which throws an IOException if there has been an error connecting
- future.getSession();
- }
- else
+
+ SSLConfiguration sslConfig = protocolHandler.getConnection().getSSLConfiguration();
+ SSLContextFactory sslFactory = null;
+ if (sslConfig != null)
{
- throw new IOException("Timeout waiting for connection.");
+ sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType());
}
+
+ MINANetworkDriver driver = new MINANetworkDriver(ioConnector);
+ driver.open(brokerDetail.getPort(), address.getAddress(), protocolHandler, null, sslFactory);
+ protocolHandler.setNetworkDriver(driver);
}
}