diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java | 34 |
1 files changed, 32 insertions, 2 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java index 5482e48699..b2f7ae8395 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java @@ -24,6 +24,7 @@ import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.ConnectFuture; import org.apache.mina.common.IoConnector; import org.apache.mina.common.SimpleByteBufferAllocator; +import org.apache.mina.transport.socket.nio.ExistingSocketConnector; import org.apache.mina.transport.socket.nio.SocketConnectorConfig; import org.apache.mina.transport.socket.nio.SocketSessionConfig; @@ -36,6 +37,9 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.InetSocketAddress; +import java.net.Socket; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; public class SocketTransportConnection implements ITransportConnection { @@ -83,8 +87,34 @@ public class SocketTransportConnection implements ITransportConnection _logger.info("send-buffer-size = " + scfg.getSendBufferSize()); scfg.setReceiveBufferSize(Integer.getInteger("amqj.receiveBufferSize", DEFAULT_BUFFER_SIZE)); _logger.info("recv-buffer-size = " + scfg.getReceiveBufferSize()); - final InetSocketAddress address = new InetSocketAddress(brokerDetail.getHost(), brokerDetail.getPort()); - _logger.info("Attempting connection to " + address); + + final InetSocketAddress address; + + if (brokerDetail.getTransport().equals(BrokerDetails.SOCKET)) + { + address = null; + + Socket socket = TransportConnection.removeOpenSocket(brokerDetail.getHost()); + + if (socket != null) + { + _logger.info("Using existing Socket:" + socket); + + ((ExistingSocketConnector) ioConnector).setOpenSocket(socket); + } + else + { + throw new IllegalArgumentException("Active Socket must be provided for broker " + + "with 'socket://<SocketID>' transport:" + brokerDetail); + } + } + else + { + address = new InetSocketAddress(brokerDetail.getHost(), brokerDetail.getPort()); + _logger.info("Attempting connection to " + address); + } + + ConnectFuture future = ioConnector.connect(address, protocolHandler); // wait for connection to complete |