diff options
author | Robert Gemmell <robbie@apache.org> | 2011-07-07 15:10:30 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2011-07-07 15:10:30 +0000 |
commit | 245f2793e0a4efd4876ad72b2cf32edc93750d84 (patch) | |
tree | b5fd72fdea830222b314029b13062cbd690e8d2e /java/client/src | |
parent | b4f9004439f56f492931f4b35f7fa0ae58f3ff85 (diff) | |
download | qpid-python-245f2793e0a4efd4876ad72b2cf32edc93750d84.tar.gz |
QPID-3342: transition TCP based Mina transport for 0-8/0-9/0-9-1 protocols over to new IO interface model
Applied patch by Keith Wall and myself
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1143867 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
10 files changed, 71 insertions, 833 deletions
diff --git a/java/client/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java b/java/client/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java deleted file mode 100644 index 98716c0c3c..0000000000 --- a/java/client/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java +++ /dev/null @@ -1,478 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.mina.transport.socket.nio; - -import edu.emory.mathcs.backport.java.util.concurrent.Executor; -import org.apache.mina.common.ConnectFuture; -import org.apache.mina.common.ExceptionMonitor; -import org.apache.mina.common.IoConnector; -import org.apache.mina.common.IoConnectorConfig; -import org.apache.mina.common.IoHandler; -import org.apache.mina.common.IoServiceConfig; -import org.apache.mina.common.support.BaseIoConnector; -import org.apache.mina.common.support.DefaultConnectFuture; -import org.apache.mina.util.NamePreservingRunnable; -import org.apache.mina.util.NewThreadExecutor; -import org.apache.mina.util.Queue; - -import java.io.IOException; -import java.net.ConnectException; -import java.net.Socket; -import java.net.SocketAddress; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.SocketChannel; -import java.util.Iterator; -import java.util.Set; - -/** - * {@link IoConnector} for socket transport (TCP/IP). - * - * @author The Apache Directory Project (mina-dev@directory.apache.org) - * @version $Rev: 627427 $, $Date: 2008-02-13 14:39:10 +0000 (Wed, 13 Feb 2008) $ - */ -public class ExistingSocketConnector extends BaseIoConnector -{ - /** @noinspection StaticNonFinalField */ - private static volatile int nextId = 0; - - private final Object lock = new Object(); - private final int id = nextId++; - private final String threadName = "SocketConnector-" + id; - private SocketConnectorConfig defaultConfig = new SocketConnectorConfig(); - private final Queue connectQueue = new Queue(); - private final SocketIoProcessor[] ioProcessors; - private final int processorCount; - private final Executor executor; - - /** @noinspection FieldAccessedSynchronizedAndUnsynchronized */ - private Selector selector; - private Worker worker; - private int processorDistributor = 0; - private int workerTimeout = 60; // 1 min. - private Socket _openSocket = null; - - /** Create a connector with a single processing thread using a NewThreadExecutor */ - public ExistingSocketConnector() - { - this(1, new NewThreadExecutor()); - } - - /** - * Create a connector with the desired number of processing threads - * - * @param processorCount Number of processing threads - * @param executor Executor to use for launching threads - */ - public ExistingSocketConnector(int processorCount, Executor executor) - { - if (processorCount < 1) - { - throw new IllegalArgumentException("Must have at least one processor"); - } - - this.executor = executor; - this.processorCount = processorCount; - ioProcessors = new SocketIoProcessor[processorCount]; - - for (int i = 0; i < processorCount; i++) - { - ioProcessors[i] = new SocketIoProcessor("SocketConnectorIoProcessor-" + id + "." + i, executor); - } - } - - /** - * How many seconds to keep the connection thread alive between connection requests - * - * @return Number of seconds to keep connection thread alive - */ - public int getWorkerTimeout() - { - return workerTimeout; - } - - /** - * Set how many seconds the connection worker thread should remain alive once idle before terminating itself. - * - * @param workerTimeout Number of seconds to keep thread alive. Must be >=0 - */ - public void setWorkerTimeout(int workerTimeout) - { - if (workerTimeout < 0) - { - throw new IllegalArgumentException("Must be >= 0"); - } - this.workerTimeout = workerTimeout; - } - - public ConnectFuture connect(SocketAddress address, IoHandler handler, IoServiceConfig config) - { - return connect(address, null, handler, config); - } - - public ConnectFuture connect(SocketAddress address, SocketAddress localAddress, - IoHandler handler, IoServiceConfig config) - { - /** Changes here from the Mina OpenSocketConnector. - * Ignoreing all address as they are not needed */ - - if (handler == null) - { - throw new NullPointerException("handler"); - } - - - if (config == null) - { - config = getDefaultConfig(); - } - - if (_openSocket == null) - { - throw new IllegalArgumentException("Specifed Socket not active"); - } - - boolean success = false; - - try - { - DefaultConnectFuture future = new DefaultConnectFuture(); - newSession(_openSocket, handler, config, future); - success = true; - return future; - } - catch (IOException e) - { - return DefaultConnectFuture.newFailedFuture(e); - } - finally - { - if (!success && _openSocket != null) - { - try - { - _openSocket.close(); - } - catch (IOException e) - { - ExceptionMonitor.getInstance().exceptionCaught(e); - } - } - } - } - - public IoServiceConfig getDefaultConfig() - { - return defaultConfig; - } - - /** - * Sets the config this connector will use by default. - * - * @param defaultConfig the default config. - * - * @throws NullPointerException if the specified value is <code>null</code>. - */ - public void setDefaultConfig(SocketConnectorConfig defaultConfig) - { - if (defaultConfig == null) - { - throw new NullPointerException("defaultConfig"); - } - this.defaultConfig = defaultConfig; - } - - private synchronized void startupWorker() throws IOException - { - if (worker == null) - { - selector = Selector.open(); - worker = new Worker(); - executor.execute(new NamePreservingRunnable(worker)); - } - } - - private void registerNew() - { - if (connectQueue.isEmpty()) - { - return; - } - - for (; ;) - { - ConnectionRequest req; - synchronized (connectQueue) - { - req = (ConnectionRequest) connectQueue.pop(); - } - - if (req == null) - { - break; - } - - SocketChannel ch = req.channel; - try - { - ch.register(selector, SelectionKey.OP_CONNECT, req); - } - catch (IOException e) - { - req.setException(e); - } - } - } - - private void processSessions(Set keys) - { - Iterator it = keys.iterator(); - - while (it.hasNext()) - { - SelectionKey key = (SelectionKey) it.next(); - - if (!key.isConnectable()) - { - continue; - } - - SocketChannel ch = (SocketChannel) key.channel(); - ConnectionRequest entry = (ConnectionRequest) key.attachment(); - - boolean success = false; - try - { - ch.finishConnect(); - newSession(ch, entry.handler, entry.config, entry); - success = true; - } - catch (Throwable e) - { - entry.setException(e); - } - finally - { - key.cancel(); - if (!success) - { - try - { - ch.close(); - } - catch (IOException e) - { - ExceptionMonitor.getInstance().exceptionCaught(e); - } - } - } - } - - keys.clear(); - } - - private void processTimedOutSessions(Set keys) - { - long currentTime = System.currentTimeMillis(); - Iterator it = keys.iterator(); - - while (it.hasNext()) - { - SelectionKey key = (SelectionKey) it.next(); - - if (!key.isValid()) - { - continue; - } - - ConnectionRequest entry = (ConnectionRequest) key.attachment(); - - if (currentTime >= entry.deadline) - { - entry.setException(new ConnectException()); - try - { - key.channel().close(); - } - catch (IOException e) - { - ExceptionMonitor.getInstance().exceptionCaught(e); - } - finally - { - key.cancel(); - } - } - } - } - - private void newSession(Socket socket, IoHandler handler, IoServiceConfig config, ConnectFuture connectFuture) - throws IOException - { - SocketSessionImpl session = new SocketSessionImpl(this, - nextProcessor(), - getListeners(), - config, - socket.getChannel(), - handler, - socket.getRemoteSocketAddress()); - - newSession(session, config, connectFuture); - } - - private void newSession(SocketChannel ch, IoHandler handler, IoServiceConfig config, ConnectFuture connectFuture) - throws IOException - - { - SocketSessionImpl session = new SocketSessionImpl(this, - nextProcessor(), - getListeners(), - config, - ch, - handler, - ch.socket().getRemoteSocketAddress()); - - newSession(session, config, connectFuture); - } - - private void newSession(SocketSessionImpl session, IoServiceConfig config, ConnectFuture connectFuture) - throws IOException - { - try - { - getFilterChainBuilder().buildFilterChain(session.getFilterChain()); - config.getFilterChainBuilder().buildFilterChain(session.getFilterChain()); - config.getThreadModel().buildFilterChain(session.getFilterChain()); - } - catch (Throwable e) - { - throw (IOException) new IOException("Failed to create a session.").initCause(e); - } - session.getIoProcessor().addNew(session); - connectFuture.setSession(session); - } - - private SocketIoProcessor nextProcessor() - { - return ioProcessors[processorDistributor++ % processorCount]; - } - - public void setOpenSocket(Socket openSocket) - { - _openSocket = openSocket; - } - - private class Worker implements Runnable - { - private long lastActive = System.currentTimeMillis(); - - public void run() - { - Thread.currentThread().setName(ExistingSocketConnector.this.threadName); - - for (; ;) - { - try - { - int nKeys = selector.select(1000); - - registerNew(); - - if (nKeys > 0) - { - processSessions(selector.selectedKeys()); - } - - processTimedOutSessions(selector.keys()); - - if (selector.keys().isEmpty()) - { - if (System.currentTimeMillis() - lastActive > workerTimeout * 1000L) - { - synchronized (lock) - { - if (selector.keys().isEmpty() && - connectQueue.isEmpty()) - { - worker = null; - try - { - selector.close(); - } - catch (IOException e) - { - ExceptionMonitor.getInstance().exceptionCaught(e); - } - finally - { - selector = null; - } - break; - } - } - } - } - else - { - lastActive = System.currentTimeMillis(); - } - } - catch (IOException e) - { - ExceptionMonitor.getInstance().exceptionCaught(e); - - try - { - Thread.sleep(1000); - } - catch (InterruptedException e1) - { - ExceptionMonitor.getInstance().exceptionCaught(e1); - } - } - } - } - } - - private class ConnectionRequest extends DefaultConnectFuture - { - private final SocketChannel channel; - private final long deadline; - private final IoHandler handler; - private final IoServiceConfig config; - - private ConnectionRequest(SocketChannel channel, IoHandler handler, IoServiceConfig config) - { - this.channel = channel; - long timeout; - if (config instanceof IoConnectorConfig) - { - timeout = ((IoConnectorConfig) config).getConnectTimeoutMillis(); - } - else - { - timeout = ((IoConnectorConfig) getDefaultConfig()).getConnectTimeoutMillis(); - } - this.deadline = System.currentTimeMillis() + timeout; - this.handler = handler; - this.config = config; - } - } -} diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java index b31dd2bc91..ed37a69b82 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java @@ -56,9 +56,8 @@ public class AMQBrokerDetails implements BrokerDetails if (transport != null) { //todo this list of valid transports should be enumerated somewhere - if ((!(transport.equalsIgnoreCase(BrokerDetails.VM) || - transport.equalsIgnoreCase(BrokerDetails.TCP) || - transport.equalsIgnoreCase(BrokerDetails.SOCKET)))) + if (!(transport.equalsIgnoreCase(BrokerDetails.VM) || + transport.equalsIgnoreCase(BrokerDetails.TCP))) { if (transport.equalsIgnoreCase("localhost")) { @@ -182,10 +181,7 @@ public class AMQBrokerDetails implements BrokerDetails } else { - if (!_transport.equalsIgnoreCase(SOCKET)) - { - setPort(port); - } + setPort(port); } String queryString = connection.getQuery(); @@ -307,11 +303,8 @@ public class AMQBrokerDetails implements BrokerDetails sb.append(_host); } - if (!(_transport.equalsIgnoreCase(SOCKET))) - { - sb.append(':'); - sb.append(_port); - } + sb.append(':'); + sb.append(_port); sb.append(printOptionsURL()); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 98573c9cc3..b0242210d8 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -49,6 +49,11 @@ import org.apache.qpid.framing.TxSelectBody; import org.apache.qpid.framing.TxSelectOkBody; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.ChannelLimitReachedException; +import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.ConnectionSettings; +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.OutgoingNetworkTransport; +import org.apache.qpid.transport.network.mina.MinaNetworkTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -89,8 +94,21 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate StateWaiter waiter = _conn._protocolHandler.createWaiter(openOrClosedStates); - TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail); + ConnectionSettings settings = new ConnectionSettings(); + settings.setHost(brokerDetail.getHost()); + settings.setPort(brokerDetail.getPort()); + settings.setProtocol(brokerDetail.getTransport()); + SSLConfiguration sslConfig = _conn.getSSLConfiguration(); + SSLContextFactory sslFactory = null; + if (sslConfig != null) + { + sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType()); + } + + OutgoingNetworkTransport transport = new MinaNetworkTransport(); + NetworkConnection network = transport.connect(settings, _conn._protocolHandler, sslFactory); + _conn._protocolHandler.setNetworkConnection(network); _conn._protocolHandler.getProtocolSession().init(); // this blocks until the connection has been set up or when an error // has prevented the connection being set up diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 424e09693f..34c6468629 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -57,7 +57,6 @@ import org.apache.qpid.framing.HeartbeatBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.framing.ProtocolInitiation; import org.apache.qpid.framing.ProtocolVersion; -import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.pool.Job; import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.protocol.AMQConstant; @@ -65,8 +64,9 @@ import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.thread.Threading; -import org.apache.qpid.transport.NetworkDriver; -import org.apache.qpid.transport.network.io.IoTransport; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.NetworkTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -172,11 +172,13 @@ public class AMQProtocolHandler implements ProtocolEngine private Job _readJob; private Job _writeJob; private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance(); - private NetworkDriver _networkDriver; private ProtocolVersion _suggestedProtocolVersion; private long _writtenBytes; private long _readBytes; + private NetworkTransport _transport; + private NetworkConnection _network; + private Sender<ByteBuffer> _sender; /** * Creates a new protocol handler, associated with the specified client connection instance. @@ -300,7 +302,7 @@ public class AMQProtocolHandler implements ProtocolEngine // failover: HeartbeatDiagnostics.timeout(); _logger.warn("Timed out while waiting for heartbeat from peer."); - _networkDriver.close(); + _network.close(); } public void writerIdle() @@ -322,7 +324,7 @@ public class AMQProtocolHandler implements ProtocolEngine { _logger.info("Exception caught therefore going to attempt failover: " + cause, cause); // this will attempt failover - _networkDriver.close(); + _network.close(); closed(); } else @@ -574,7 +576,7 @@ public class AMQProtocolHandler implements ProtocolEngine { public void run() { - _networkDriver.send(buf); + _sender.send(buf); } }); if (PROTOCOL_DEBUG) @@ -595,7 +597,7 @@ public class AMQProtocolHandler implements ProtocolEngine if (wait) { - _networkDriver.flush(); + _sender.flush(); } } @@ -709,7 +711,7 @@ public class AMQProtocolHandler implements ProtocolEngine try { syncWrite(frame, ConnectionCloseOkBody.class, timeout); - _networkDriver.close(); + _network.close(); closed(); } catch (AMQTimeoutException e) @@ -829,17 +831,18 @@ public class AMQProtocolHandler implements ProtocolEngine public SocketAddress getRemoteAddress() { - return _networkDriver.getRemoteAddress(); + return _network.getRemoteAddress(); } public SocketAddress getLocalAddress() { - return _networkDriver.getLocalAddress(); + return _network.getLocalAddress(); } - public void setNetworkDriver(NetworkDriver driver) + public void setNetworkConnection(NetworkConnection network) { - _networkDriver = driver; + _network = network; + _sender = network.getSender(); } /** @param delay delay in seconds (not ms) */ @@ -847,15 +850,15 @@ public class AMQProtocolHandler implements ProtocolEngine { if (delay > 0) { - getNetworkDriver().setMaxWriteIdle(delay); - getNetworkDriver().setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay)); + _network.setMaxWriteIdle(delay); + _network.setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay)); HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay)); } } - public NetworkDriver getNetworkDriver() + public NetworkConnection getNetworkConnection() { - return _networkDriver; + return _network; } public ProtocolVersion getSuggestedProtocolVersion() diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java deleted file mode 100644 index 1ac8f62e32..0000000000 --- a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.transport; - -import java.io.IOException; -import java.net.InetSocketAddress; - -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.IoConnector; -import org.apache.mina.common.SimpleByteBufferAllocator; -import org.apache.qpid.client.SSLConfiguration; -import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.ssl.SSLContextFactory; -import org.apache.qpid.transport.network.mina.MINANetworkDriver; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class SocketTransportConnection implements ITransportConnection -{ - private static final Logger _logger = LoggerFactory.getLogger(SocketTransportConnection.class); - private static final int DEFAULT_BUFFER_SIZE = 32 * 1024; - - private SocketConnectorFactory _socketConnectorFactory; - - static interface SocketConnectorFactory - { - IoConnector newSocketConnector(); - } - - public SocketTransportConnection(SocketConnectorFactory socketConnectorFactory) - { - _socketConnectorFactory = socketConnectorFactory; - } - - public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException - { - ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers")); - - // the MINA default is currently to use the pooled allocator although this may change in future - // once more testing of the performance of the simple allocator has been done - if (!Boolean.getBoolean("amqj.enablePooledAllocator")) - { - _logger.info("Using SimpleByteBufferAllocator"); - ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); - } - - final IoConnector ioConnector = _socketConnectorFactory.newSocketConnector(); - final InetSocketAddress address; - - if (brokerDetail.getTransport().equals(BrokerDetails.SOCKET)) - { - address = null; - } - else - { - address = new InetSocketAddress(brokerDetail.getHost(), brokerDetail.getPort()); - _logger.info("Attempting connection to " + address); - } - - SSLConfiguration sslConfig = protocolHandler.getConnection().getSSLConfiguration(); - SSLContextFactory sslFactory = null; - if (sslConfig != null) - { - sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType()); - } - - MINANetworkDriver driver = new MINANetworkDriver(ioConnector); - driver.open(brokerDetail.getPort(), address.getAddress(), protocolHandler, null, sslFactory); - protocolHandler.setNetworkDriver(driver); - } -} diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index 97657a09f4..3e03f88341 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -22,22 +22,17 @@ package org.apache.qpid.client.transport; import java.io.IOException; import java.net.Socket; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import org.apache.mina.common.IoConnector; import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoServiceConfig; -import org.apache.mina.transport.socket.nio.ExistingSocketConnector; -import org.apache.mina.transport.socket.nio.SocketConnector; import org.apache.mina.transport.vmpipe.VmPipeAcceptor; import org.apache.mina.transport.vmpipe.VmPipeAddress; import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; -import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.protocol.ProtocolEngineFactory; -import org.apache.qpid.thread.QpidThreadExecutor; -import org.apache.qpid.transport.network.mina.MINANetworkDriver; +import org.apache.qpid.transport.network.VMBrokerMap; +import org.apache.qpid.transport.network.mina.MinaNetworkHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,132 +46,15 @@ public class TransportConnection { private static ITransportConnection _instance; - private static final Map _inVmPipeAddress = new HashMap(); private static VmPipeAcceptor _acceptor; private static int _currentInstance = -1; private static int _currentVMPort = -1; - private static final int TCP = 0; - private static final int VM = 1; - private static final int SOCKET = 2; - private static Logger _logger = LoggerFactory.getLogger(TransportConnection.class); private static final String DEFAULT_QPID_SERVER = "org.apache.qpid.server.protocol.AMQProtocolEngineFactory"; - private static Map<String, Socket> _openSocketRegister = new ConcurrentHashMap<String, Socket>(); - - public static void registerOpenSocket(String socketID, Socket openSocket) - { - _openSocketRegister.put(socketID, openSocket); - } - - public static Socket removeOpenSocket(String socketID) - { - return _openSocketRegister.remove(socketID); - } - - public static synchronized ITransportConnection getInstance(final BrokerDetails details) throws AMQTransportConnectionException - { - int transport = getTransport(details.getTransport()); - - if (transport == -1) - { - throw new AMQNoTransportForProtocolException(details, null, null); - } - - switch (transport) - { - case SOCKET: - return new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory() - { - public IoConnector newSocketConnector() - { - ExistingSocketConnector connector = new ExistingSocketConnector(1,new QpidThreadExecutor()); - - Socket socket = TransportConnection.removeOpenSocket(details.getHost()); - - if (socket != null) - { - _logger.info("Using existing Socket:" + socket); - - ((ExistingSocketConnector) connector).setOpenSocket(socket); - } - else - { - throw new IllegalArgumentException("Active Socket must be provided for broker " + - "with 'socket://<SocketID>' transport:" + details); - } - return connector; - } - }); - case TCP: - return new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory() - { - public IoConnector newSocketConnector() - { - SocketConnector result = new SocketConnector(1, new QpidThreadExecutor()); // non-blocking connector - - // Don't have the connector's worker thread wait around for other connections (we only use - // one SocketConnector per connection at the moment anyway). This allows short-running - // clients (like unit tests) to complete quickly. - result.setWorkerTimeout(0); - return result; - } - }); - case VM: - { - return getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker")); - } - default: - throw new AMQNoTransportForProtocolException(details, "Transport not recognised:" + transport, null); - } - } - - private static int getTransport(String transport) - { - if (transport.equals(BrokerDetails.SOCKET)) - { - return SOCKET; - } - - if (transport.equals(BrokerDetails.TCP)) - { - return TCP; - } - - if (transport.equals(BrokerDetails.VM)) - { - return VM; - } - - return -1; - } - - private static ITransportConnection getVMTransport(BrokerDetails details, boolean AutoCreate) - throws AMQVMBrokerCreationException - { - int port = details.getPort(); - - synchronized (_inVmPipeAddress) - { - if (!_inVmPipeAddress.containsKey(port)) - { - if (AutoCreate) - { - _logger.warn("Auto Creating InVM Broker on port:" + port); - createVMBroker(port); - } - else - { - throw new AMQVMBrokerCreationException(null, port, "VM Broker on port " + port - + " does not exist. Auto create disabled.", null); - } - } - } - - return new VmPipeTransportConnection(port); - } + public static final int DEFAULT_VM_PORT = 1; public static void createVMBroker(int port) throws AMQVMBrokerCreationException { @@ -189,10 +67,10 @@ public class TransportConnection IoServiceConfig config = _acceptor.getDefaultConfig(); } } - synchronized (_inVmPipeAddress) + synchronized (VMBrokerMap.class) { - if (!_inVmPipeAddress.containsKey(port)) + if (!VMBrokerMap.contains(port)) { _logger.info("Creating InVM Qpid.AMQP listening on port " + port); IoHandlerAdapter provider = null; @@ -204,7 +82,7 @@ public class TransportConnection _acceptor.bind(pipe, provider); - _inVmPipeAddress.put(port, pipe); + VMBrokerMap.add(port, pipe); _logger.info("Created InVM Qpid.AMQP listening on port " + port); } catch (IOException e) @@ -231,7 +109,7 @@ public class TransportConnection } _acceptor.bind(pipe, provider); - _inVmPipeAddress.put(port, pipe); + VMBrokerMap.add(port, pipe); _logger.info("Created InVM Qpid.AMQP listening on port " + port); } catch (IOException justUseFirstException) @@ -272,11 +150,10 @@ public class TransportConnection { Class[] cnstr = {Integer.class}; Object[] params = {port}; - - provider = new MINANetworkDriver(); + ProtocolEngineFactory engineFactory = (ProtocolEngineFactory) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params); - ((MINANetworkDriver) provider).setProtocolEngineFactory(engineFactory, true); - // Give the broker a second to create + provider = new MinaNetworkHandler(null, engineFactory); + _logger.info("Created VMBroker Instance:" + port); } catch (Exception e) @@ -309,9 +186,9 @@ public class TransportConnection { _acceptor.unbindAll(); } - synchronized (_inVmPipeAddress) + synchronized (VMBrokerMap.class) { - _inVmPipeAddress.clear(); + VMBrokerMap.clear(); } _acceptor = null; } @@ -321,16 +198,15 @@ public class TransportConnection public static void killVMBroker(int port) { - synchronized (_inVmPipeAddress) + synchronized (VMBrokerMap.class) { - VmPipeAddress pipe = (VmPipeAddress) _inVmPipeAddress.get(port); - if (pipe != null) + if (VMBrokerMap.contains(port)) { _logger.info("Killing VM Broker:" + port); - _inVmPipeAddress.remove(port); + VmPipeAddress address = VMBrokerMap.remove(port); // This does need to be sychronized as otherwise mina can hang // if a new connection is made - _acceptor.unbind(pipe); + _acceptor.unbind(address); } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java deleted file mode 100644 index 87cc2e7a5a..0000000000 --- a/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.transport; - -import java.io.IOException; - -import org.apache.mina.common.ConnectFuture; -import org.apache.mina.transport.vmpipe.QpidVmPipeConnector; -import org.apache.mina.transport.vmpipe.VmPipeAddress; -import org.apache.mina.transport.vmpipe.VmPipeConnector; -import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.transport.network.mina.MINANetworkDriver; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class VmPipeTransportConnection implements ITransportConnection -{ - private static final Logger _logger = LoggerFactory.getLogger(VmPipeTransportConnection.class); - - private int _port; - - private MINANetworkDriver _networkDriver; - - public VmPipeTransportConnection(int port) - { - _port = port; - } - - public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException - { - final VmPipeConnector ioConnector = new QpidVmPipeConnector(); - - final VmPipeAddress address = new VmPipeAddress(_port); - _logger.info("Attempting connection to " + address); - _networkDriver = new MINANetworkDriver(ioConnector, protocolHandler); - protocolHandler.setNetworkDriver(_networkDriver); - ConnectFuture future = ioConnector.connect(address, _networkDriver); - // wait for connection to complete - future.join(); - // we call getSession which throws an IOException if there has been an error connecting - future.getSession(); - _networkDriver.setProtocolEngine(protocolHandler); - } -} diff --git a/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java b/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java index 6d81f728c9..1aca28aa3a 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java +++ b/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java @@ -52,7 +52,6 @@ public interface BrokerDetails public static final int DEFAULT_PORT = 5672; - public static final String SOCKET = "socket"; public static final String TCP = "tcp"; public static final String VM = "vm"; diff --git a/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java b/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java index f520a21ba0..e54b8ef369 100644 --- a/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java +++ b/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java @@ -20,23 +20,24 @@ */ package org.apache.qpid.client.protocol; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + import junit.framework.TestCase; -import org.apache.qpid.framing.AMQFrame; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQAuthenticationException; +import org.apache.qpid.client.MockAMQConnection; +import org.apache.qpid.client.state.AMQState; import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.amqp_8_0.BasicRecoverOkBodyImpl; -import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.transport.TestNetworkDriver; -import org.apache.qpid.client.MockAMQConnection; -import org.apache.qpid.client.AMQAuthenticationException; -import org.apache.qpid.client.state.AMQState; +import org.apache.qpid.transport.TestNetworkConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - /** * This is a test address QPID-1431 where frame listeners would fail to be notified of an incomming exception. * @@ -73,7 +74,7 @@ public class AMQProtocolHandlerTest extends TestCase { //Create a new ProtocolHandler with a fake connection. _handler = new AMQProtocolHandler(new MockAMQConnection("amqp://guest:guest@client/test?brokerlist='vm://:1'")); - _handler.setNetworkDriver(new TestNetworkDriver()); + _handler.setNetworkConnection(new TestNetworkConnection()); AMQBody body = BasicRecoverOkBodyImpl.getFactory().newInstance(null, 1); _blockFrame = new AMQFrame(0, body); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java index 2c5fa0112e..a3dfff45f9 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java @@ -487,27 +487,6 @@ public class ConnectionURLTest extends TestCase } - public void testSocketProtocol() throws URLSyntaxException - { - String url = "amqp://guest:guest@id/test" + "?brokerlist='socket://VM-Unique-socketID'"; - - try - { - AMQConnectionURL curl = new AMQConnectionURL(url); - assertNotNull(curl); - assertEquals(1, curl.getBrokerCount()); - assertNotNull(curl.getBrokerDetails(0)); - assertEquals(BrokerDetails.SOCKET, curl.getBrokerDetails(0).getTransport()); - assertEquals("VM-Unique-socketID", curl.getBrokerDetails(0).getHost()); - assertEquals("URL does not toString as expected", - url.replace(":guest", ":********"), curl.toString()); - } - catch (URLSyntaxException e) - { - fail(e.getMessage()); - } - } - public void testSingleTransportMultiOptionOnBrokerURL() throws URLSyntaxException { String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672?foo='jim'&bar='bob'&fred='jimmy'',routingkey='jim',timeout='200',immediatedelivery='true'"; |