diff options
author | Martin Ritchie <ritchiem@apache.org> | 2008-02-12 17:36:07 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2008-02-12 17:36:07 +0000 |
commit | 031b02ced99d04d103c02575913ea238e48221ac (patch) | |
tree | 2d9d8547c55ca06b45983a479026e0489860ee0e | |
parent | 1df04df23ddda53cda350ddaeec692cb2f7cbed8 (diff) | |
download | qpid-python-031b02ced99d04d103c02575913ea238e48221ac.tar.gz |
QPID-784 : Added ability to provide existing Socket to Qpid Client Libraries to use as for connection.
Modified based on review by Robert Godfrey due to Thread safety around SocketTransportConnection.java and TransportConnection.java. Now use a safe Map to store all registered sockets in the TransportConnection.java these are then removed as used or on request.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@620876 13f79535-47bb-0310-9956-ffa450edef68
5 files changed, 54 insertions, 63 deletions
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java b/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java index 0979c9c6b8..d7eb138523 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java @@ -23,6 +23,7 @@ package org.apache.qpid.example.transport; import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.jms.ConnectionListener; import org.apache.qpid.url.URLSyntaxException; @@ -36,6 +37,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.channels.SocketChannel; +import java.util.UUID; /** * This is a simple application that demonstrates how you can use the Qpid AMQP interfaces to use existing sockets as @@ -66,9 +68,14 @@ public class ExistingSocketConnectorDemo implements ConnectionListener MessageProducer _producer; Session _session; + String Socket1_ID = UUID.randomUUID().toString(); + String Socket2_ID = UUID.randomUUID().toString(); + + /** Here we can see the broker we are connecting to is set to be 'socket:///' signifying we will provide the socket. */ - public static final String CONNECTION = "amqp://guest:guest@id/test?brokerlist='socket:///'"; + public final String CONNECTION = "amqp://guest:guest@id/test?brokerlist='socket://" + Socket1_ID + ";socket://" + Socket2_ID + "'"; + public ExistingSocketConnectorDemo() throws IOException, URLSyntaxException, AMQException, JMSException { @@ -76,7 +83,10 @@ public class ExistingSocketConnectorDemo implements ConnectionListener Socket socket = SocketChannel.open().socket(); socket.connect(new InetSocketAddress("localhost", 5672)); - _connection = new AMQConnection(CONNECTION, socket); + TransportConnection.registerOpenSocket(Socket1_ID, socket); + + + _connection = new AMQConnection(CONNECTION); _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -130,7 +140,7 @@ public class ExistingSocketConnectorDemo implements ConnectionListener socket.connect(new InetSocketAddress("localhost", 5673)); // This is the new method to pass in an open socket for the connection to use. - ((AMQConnection) _connection).setOpenSocket(socket); + TransportConnection.registerOpenSocket(Socket2_ID, socket); } catch (IOException e) { diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java index 89ce4b2c72..572ea48f85 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java @@ -157,7 +157,10 @@ public class AMQBrokerDetails implements BrokerDetails } else { - setPort(port); + if (!_transport.equalsIgnoreCase(SOCKET)) + { + setPort(port); + } } String queryString = connection.getQuery(); @@ -264,13 +267,16 @@ public class AMQBrokerDetails implements BrokerDetails sb.append(_transport); sb.append("://"); - if (!(_transport.equalsIgnoreCase("vm"))) + if (!(_transport.equalsIgnoreCase(VM))) { sb.append(_host); } - sb.append(':'); - sb.append(_port); + if (!(_transport.equalsIgnoreCase(SOCKET))) + { + sb.append(':'); + sb.append(_port); + } sb.append(printOptionsURL()); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index acbe495550..c9928a084e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -30,8 +30,6 @@ import org.apache.qpid.client.failover.FailoverRetrySupport; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; -import org.apache.qpid.client.transport.ITransportConnection; -import org.apache.qpid.client.transport.SocketTransportConnection; import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.*; @@ -64,7 +62,6 @@ import javax.naming.Referenceable; import javax.naming.StringRefAddr; import java.io.IOException; import java.net.ConnectException; -import java.net.Socket; import java.nio.channels.UnresolvedAddressException; import java.text.MessageFormat; import java.util.*; @@ -160,8 +157,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private static final long DEFAULT_TIMEOUT = 1000 * 30; private ProtocolVersion _protocolVersion; - /** The active socket that is to be used as a value for connection */ - private Socket _openSocket; /** * @param broker brokerdetails @@ -179,7 +174,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect this(new AMQConnectionURL( ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + ((clientName == null) ? "" : clientName) + "/" + virtualHost + "?brokerlist='" - + AMQBrokerDetails.checkTransport(broker) + "'"), null, null); + + AMQBrokerDetails.checkTransport(broker) + "'"), null); } /** @@ -198,7 +193,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect this(new AMQConnectionURL( ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + ((clientName == null) ? "" : clientName) + "/" + virtualHost + "?brokerlist='" - + AMQBrokerDetails.checkTransport(broker) + "'"), sslConfig, null); + + AMQBrokerDetails.checkTransport(broker) + "'"), sslConfig); } public AMQConnection(String host, int port, String username, String password, String clientName, String virtualHost) @@ -223,38 +218,26 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect + "'" + "," + ConnectionURL.OPTIONS_SSL + "='true'") : (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + ((clientName == null) ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port - + "'" + "," + ConnectionURL.OPTIONS_SSL + "='false'")), sslConfig, null); - } - - public AMQConnection(String connection, Socket socket) throws AMQException, URLSyntaxException - { - this(new AMQConnectionURL(connection), null, socket); + + "'" + "," + ConnectionURL.OPTIONS_SSL + "='false'")), sslConfig); } public AMQConnection(String connection) throws AMQException, URLSyntaxException { - this(new AMQConnectionURL(connection), null, null); + this(new AMQConnectionURL(connection), null); } public AMQConnection(String connection, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException { - this(new AMQConnectionURL(connection), sslConfig, null); + this(new AMQConnectionURL(connection), sslConfig); } public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException { - this(connectionURL, sslConfig, null); - } - - public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig, Socket socket) throws AMQException - { if (_logger.isInfoEnabled()) { _logger.info("Connection:" + connectionURL); } - _openSocket = socket; - _sslConfiguration = sslConfig; if (connectionURL == null) { @@ -414,23 +397,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect try { - ITransportConnection connection = TransportConnection.getInstance(brokerDetail); - - if (brokerDetail.getTransport().equals(BrokerDetails.SOCKET)) - { - if (_openSocket != null) - { - ((SocketTransportConnection) connection).setOpenSocket(_openSocket); - } - else - { - throw new IllegalArgumentException("Active Socket must be provided for broker " + - "with 'socket' transport:" + brokerDetail); - } - - } - - connection.connect(_protocolHandler, brokerDetail); + TransportConnection.getInstance(brokerDetail).connect(_protocolHandler, brokerDetail); // this blocks until the connection has been set up or when an error // has prevented the connection being set up @@ -1327,11 +1294,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return _sessions.get(channelId); } - public void setOpenSocket(Socket socket) - { - _openSocket = socket; - } - public ProtocolVersion getProtocolVersion() { return _protocolVersion; diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java index e9d6242e77..b2f7ae8395 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java @@ -38,6 +38,8 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; public class SocketTransportConnection implements ITransportConnection { @@ -46,8 +48,6 @@ public class SocketTransportConnection implements ITransportConnection private SocketConnectorFactory _socketConnectorFactory; - private Socket _openSocket; - static interface SocketConnectorFactory { IoConnector newSocketConnector(); @@ -58,11 +58,6 @@ public class SocketTransportConnection implements ITransportConnection _socketConnectorFactory = socketConnectorFactory; } - public void setOpenSocket(Socket openSocket) - { - _openSocket = openSocket; - } - public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException { ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers")); @@ -99,15 +94,18 @@ public class SocketTransportConnection implements ITransportConnection { address = null; - if (_openSocket != null) + Socket socket = TransportConnection.removeOpenSocket(brokerDetail.getHost()); + + if (socket != null) { - _logger.info("Using existing Socket:" + _openSocket); - ((ExistingSocketConnector) ioConnector).setOpenSocket(_openSocket); + _logger.info("Using existing Socket:" + socket); + + ((ExistingSocketConnector) ioConnector).setOpenSocket(socket); } else { throw new IllegalArgumentException("Active Socket must be provided for broker " + - "with 'socket' transport:" + brokerDetail); + "with 'socket://<SocketID>' transport:" + brokerDetail); } } else diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index 94361fccfc..0ea04e5bc3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -37,6 +37,9 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.net.Socket; + /** * The TransportConnection is a helper class responsible for connecting to an AMQ server. It sets up the underlying @@ -61,6 +64,18 @@ public class TransportConnection private static final String DEFAULT_QPID_SERVER = "org.apache.qpid.server.protocol.AMQPFastProtocolHandler"; + private static Map<String, Socket> _openSocketRegister = new ConcurrentHashMap<String, Socket>(); + + public static void registerOpenSocket(String socketID, Socket openSocket) + { + _openSocketRegister.put(socketID, openSocket); + } + + public static Socket removeOpenSocket(String socketID) + { + return _openSocketRegister.remove(socketID); + } + public static ITransportConnection getInstance(BrokerDetails details) throws AMQTransportConnectionException { int transport = getTransport(details.getTransport()); @@ -305,7 +320,7 @@ public class TransportConnection synchronized (_inVmPipeAddress) { _inVmPipeAddress.clear(); - } + } _acceptor = null; _currentInstance = -1; _currentVMPort = -1; |