summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java48
1 files changed, 33 insertions, 15 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
index 6c12821c74..a4f8bb0166 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
@@ -20,6 +20,12 @@
*/
package org.apache.qpid.client.transport;
+import java.io.IOException;
+import java.net.Socket;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoServiceConfig;
@@ -30,16 +36,12 @@ import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.thread.QpidThreadExecutor;
+import org.apache.qpid.transport.network.mina.MINANetworkDriver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.net.Socket;
-
/**
* The TransportConnection is a helper class responsible for connecting to an AMQ server. It sets up the underlying
* connector, which currently always uses TCP/IP sockets. It creates the "protocol handler" which deals with MINA
@@ -61,7 +63,7 @@ public class TransportConnection
private static Logger _logger = LoggerFactory.getLogger(TransportConnection.class);
- private static final String DEFAULT_QPID_SERVER = "org.apache.qpid.server.protocol.AMQPFastProtocolHandler";
+ private static final String DEFAULT_QPID_SERVER = "org.apache.qpid.server.protocol.AMQProtocolEngineFactory";
private static Map<String, Socket> _openSocketRegister = new ConcurrentHashMap<String, Socket>();
@@ -75,7 +77,7 @@ public class TransportConnection
return _openSocketRegister.remove(socketID);
}
- public static synchronized ITransportConnection getInstance(BrokerDetails details) throws AMQTransportConnectionException
+ public static synchronized ITransportConnection getInstance(final BrokerDetails details) throws AMQTransportConnectionException
{
int transport = getTransport(details.getTransport());
@@ -91,7 +93,22 @@ public class TransportConnection
{
public IoConnector newSocketConnector()
{
- return new ExistingSocketConnector();
+ ExistingSocketConnector connector = new ExistingSocketConnector(1,new QpidThreadExecutor());
+
+ Socket socket = TransportConnection.removeOpenSocket(details.getHost());
+
+ if (socket != null)
+ {
+ _logger.info("Using existing Socket:" + socket);
+
+ ((ExistingSocketConnector) connector).setOpenSocket(socket);
+ }
+ else
+ {
+ throw new IllegalArgumentException("Active Socket must be provided for broker " +
+ "with 'socket://<SocketID>' transport:" + details);
+ }
+ return connector;
}
});
case TCP:
@@ -106,12 +123,12 @@ public class TransportConnection
_logger.warn("Using Qpid MultiThreaded NIO - " + (System.getProperties().containsKey("qpidnio")
? "Qpid NIO is new default"
: "Sysproperty 'qpidnio' is set"));
- result = new MultiThreadSocketConnector();
+ result = new MultiThreadSocketConnector(1, new QpidThreadExecutor());
}
else
{
_logger.info("Using Mina NIO");
- result = new SocketConnector(); // non-blocking connector
+ result = new SocketConnector(1, new QpidThreadExecutor()); // non-blocking connector
}
// Don't have the connector's worker thread wait around for other connections (we only use
// one SocketConnector per connection at the moment anyway). This allows short-running
@@ -189,8 +206,6 @@ public class TransportConnection
_acceptor = new VmPipeAcceptor();
IoServiceConfig config = _acceptor.getDefaultConfig();
-
- config.setThreadModel(ReadWriteThreadModel.getInstance());
}
synchronized (_inVmPipeAddress)
{
@@ -275,7 +290,10 @@ public class TransportConnection
{
Class[] cnstr = {Integer.class};
Object[] params = {port};
- provider = (IoHandlerAdapter) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params);
+
+ provider = new MINANetworkDriver();
+ ProtocolEngineFactory engineFactory = (ProtocolEngineFactory) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params);
+ ((MINANetworkDriver) provider).setProtocolEngineFactory(engineFactory, true);
// Give the broker a second to create
_logger.info("Created VMBroker Instance:" + port);
}