summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2011-07-07 15:10:30 +0000
committerRobert Gemmell <robbie@apache.org>2011-07-07 15:10:30 +0000
commit245f2793e0a4efd4876ad72b2cf32edc93750d84 (patch)
treeb5fd72fdea830222b314029b13062cbd690e8d2e /java/client/src
parentb4f9004439f56f492931f4b35f7fa0ae58f3ff85 (diff)
downloadqpid-python-245f2793e0a4efd4876ad72b2cf32edc93750d84.tar.gz
QPID-3342: transition TCP based Mina transport for 0-8/0-9/0-9-1 protocols over to new IO interface model
Applied patch by Keith Wall and myself git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1143867 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java478
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java17
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java20
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java37
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java90
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java156
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java63
-rw-r--r--java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java1
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java21
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java21
10 files changed, 71 insertions, 833 deletions
diff --git a/java/client/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java b/java/client/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java
deleted file mode 100644
index 98716c0c3c..0000000000
--- a/java/client/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java
+++ /dev/null
@@ -1,478 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.mina.transport.socket.nio;
-
-import edu.emory.mathcs.backport.java.util.concurrent.Executor;
-import org.apache.mina.common.ConnectFuture;
-import org.apache.mina.common.ExceptionMonitor;
-import org.apache.mina.common.IoConnector;
-import org.apache.mina.common.IoConnectorConfig;
-import org.apache.mina.common.IoHandler;
-import org.apache.mina.common.IoServiceConfig;
-import org.apache.mina.common.support.BaseIoConnector;
-import org.apache.mina.common.support.DefaultConnectFuture;
-import org.apache.mina.util.NamePreservingRunnable;
-import org.apache.mina.util.NewThreadExecutor;
-import org.apache.mina.util.Queue;
-
-import java.io.IOException;
-import java.net.ConnectException;
-import java.net.Socket;
-import java.net.SocketAddress;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
-import java.util.Iterator;
-import java.util.Set;
-
-/**
- * {@link IoConnector} for socket transport (TCP/IP).
- *
- * @author The Apache Directory Project (mina-dev@directory.apache.org)
- * @version $Rev: 627427 $, $Date: 2008-02-13 14:39:10 +0000 (Wed, 13 Feb 2008) $
- */
-public class ExistingSocketConnector extends BaseIoConnector
-{
- /** @noinspection StaticNonFinalField */
- private static volatile int nextId = 0;
-
- private final Object lock = new Object();
- private final int id = nextId++;
- private final String threadName = "SocketConnector-" + id;
- private SocketConnectorConfig defaultConfig = new SocketConnectorConfig();
- private final Queue connectQueue = new Queue();
- private final SocketIoProcessor[] ioProcessors;
- private final int processorCount;
- private final Executor executor;
-
- /** @noinspection FieldAccessedSynchronizedAndUnsynchronized */
- private Selector selector;
- private Worker worker;
- private int processorDistributor = 0;
- private int workerTimeout = 60; // 1 min.
- private Socket _openSocket = null;
-
- /** Create a connector with a single processing thread using a NewThreadExecutor */
- public ExistingSocketConnector()
- {
- this(1, new NewThreadExecutor());
- }
-
- /**
- * Create a connector with the desired number of processing threads
- *
- * @param processorCount Number of processing threads
- * @param executor Executor to use for launching threads
- */
- public ExistingSocketConnector(int processorCount, Executor executor)
- {
- if (processorCount < 1)
- {
- throw new IllegalArgumentException("Must have at least one processor");
- }
-
- this.executor = executor;
- this.processorCount = processorCount;
- ioProcessors = new SocketIoProcessor[processorCount];
-
- for (int i = 0; i < processorCount; i++)
- {
- ioProcessors[i] = new SocketIoProcessor("SocketConnectorIoProcessor-" + id + "." + i, executor);
- }
- }
-
- /**
- * How many seconds to keep the connection thread alive between connection requests
- *
- * @return Number of seconds to keep connection thread alive
- */
- public int getWorkerTimeout()
- {
- return workerTimeout;
- }
-
- /**
- * Set how many seconds the connection worker thread should remain alive once idle before terminating itself.
- *
- * @param workerTimeout Number of seconds to keep thread alive. Must be >=0
- */
- public void setWorkerTimeout(int workerTimeout)
- {
- if (workerTimeout < 0)
- {
- throw new IllegalArgumentException("Must be >= 0");
- }
- this.workerTimeout = workerTimeout;
- }
-
- public ConnectFuture connect(SocketAddress address, IoHandler handler, IoServiceConfig config)
- {
- return connect(address, null, handler, config);
- }
-
- public ConnectFuture connect(SocketAddress address, SocketAddress localAddress,
- IoHandler handler, IoServiceConfig config)
- {
- /** Changes here from the Mina OpenSocketConnector.
- * Ignoreing all address as they are not needed */
-
- if (handler == null)
- {
- throw new NullPointerException("handler");
- }
-
-
- if (config == null)
- {
- config = getDefaultConfig();
- }
-
- if (_openSocket == null)
- {
- throw new IllegalArgumentException("Specifed Socket not active");
- }
-
- boolean success = false;
-
- try
- {
- DefaultConnectFuture future = new DefaultConnectFuture();
- newSession(_openSocket, handler, config, future);
- success = true;
- return future;
- }
- catch (IOException e)
- {
- return DefaultConnectFuture.newFailedFuture(e);
- }
- finally
- {
- if (!success && _openSocket != null)
- {
- try
- {
- _openSocket.close();
- }
- catch (IOException e)
- {
- ExceptionMonitor.getInstance().exceptionCaught(e);
- }
- }
- }
- }
-
- public IoServiceConfig getDefaultConfig()
- {
- return defaultConfig;
- }
-
- /**
- * Sets the config this connector will use by default.
- *
- * @param defaultConfig the default config.
- *
- * @throws NullPointerException if the specified value is <code>null</code>.
- */
- public void setDefaultConfig(SocketConnectorConfig defaultConfig)
- {
- if (defaultConfig == null)
- {
- throw new NullPointerException("defaultConfig");
- }
- this.defaultConfig = defaultConfig;
- }
-
- private synchronized void startupWorker() throws IOException
- {
- if (worker == null)
- {
- selector = Selector.open();
- worker = new Worker();
- executor.execute(new NamePreservingRunnable(worker));
- }
- }
-
- private void registerNew()
- {
- if (connectQueue.isEmpty())
- {
- return;
- }
-
- for (; ;)
- {
- ConnectionRequest req;
- synchronized (connectQueue)
- {
- req = (ConnectionRequest) connectQueue.pop();
- }
-
- if (req == null)
- {
- break;
- }
-
- SocketChannel ch = req.channel;
- try
- {
- ch.register(selector, SelectionKey.OP_CONNECT, req);
- }
- catch (IOException e)
- {
- req.setException(e);
- }
- }
- }
-
- private void processSessions(Set keys)
- {
- Iterator it = keys.iterator();
-
- while (it.hasNext())
- {
- SelectionKey key = (SelectionKey) it.next();
-
- if (!key.isConnectable())
- {
- continue;
- }
-
- SocketChannel ch = (SocketChannel) key.channel();
- ConnectionRequest entry = (ConnectionRequest) key.attachment();
-
- boolean success = false;
- try
- {
- ch.finishConnect();
- newSession(ch, entry.handler, entry.config, entry);
- success = true;
- }
- catch (Throwable e)
- {
- entry.setException(e);
- }
- finally
- {
- key.cancel();
- if (!success)
- {
- try
- {
- ch.close();
- }
- catch (IOException e)
- {
- ExceptionMonitor.getInstance().exceptionCaught(e);
- }
- }
- }
- }
-
- keys.clear();
- }
-
- private void processTimedOutSessions(Set keys)
- {
- long currentTime = System.currentTimeMillis();
- Iterator it = keys.iterator();
-
- while (it.hasNext())
- {
- SelectionKey key = (SelectionKey) it.next();
-
- if (!key.isValid())
- {
- continue;
- }
-
- ConnectionRequest entry = (ConnectionRequest) key.attachment();
-
- if (currentTime >= entry.deadline)
- {
- entry.setException(new ConnectException());
- try
- {
- key.channel().close();
- }
- catch (IOException e)
- {
- ExceptionMonitor.getInstance().exceptionCaught(e);
- }
- finally
- {
- key.cancel();
- }
- }
- }
- }
-
- private void newSession(Socket socket, IoHandler handler, IoServiceConfig config, ConnectFuture connectFuture)
- throws IOException
- {
- SocketSessionImpl session = new SocketSessionImpl(this,
- nextProcessor(),
- getListeners(),
- config,
- socket.getChannel(),
- handler,
- socket.getRemoteSocketAddress());
-
- newSession(session, config, connectFuture);
- }
-
- private void newSession(SocketChannel ch, IoHandler handler, IoServiceConfig config, ConnectFuture connectFuture)
- throws IOException
-
- {
- SocketSessionImpl session = new SocketSessionImpl(this,
- nextProcessor(),
- getListeners(),
- config,
- ch,
- handler,
- ch.socket().getRemoteSocketAddress());
-
- newSession(session, config, connectFuture);
- }
-
- private void newSession(SocketSessionImpl session, IoServiceConfig config, ConnectFuture connectFuture)
- throws IOException
- {
- try
- {
- getFilterChainBuilder().buildFilterChain(session.getFilterChain());
- config.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
- config.getThreadModel().buildFilterChain(session.getFilterChain());
- }
- catch (Throwable e)
- {
- throw (IOException) new IOException("Failed to create a session.").initCause(e);
- }
- session.getIoProcessor().addNew(session);
- connectFuture.setSession(session);
- }
-
- private SocketIoProcessor nextProcessor()
- {
- return ioProcessors[processorDistributor++ % processorCount];
- }
-
- public void setOpenSocket(Socket openSocket)
- {
- _openSocket = openSocket;
- }
-
- private class Worker implements Runnable
- {
- private long lastActive = System.currentTimeMillis();
-
- public void run()
- {
- Thread.currentThread().setName(ExistingSocketConnector.this.threadName);
-
- for (; ;)
- {
- try
- {
- int nKeys = selector.select(1000);
-
- registerNew();
-
- if (nKeys > 0)
- {
- processSessions(selector.selectedKeys());
- }
-
- processTimedOutSessions(selector.keys());
-
- if (selector.keys().isEmpty())
- {
- if (System.currentTimeMillis() - lastActive > workerTimeout * 1000L)
- {
- synchronized (lock)
- {
- if (selector.keys().isEmpty() &&
- connectQueue.isEmpty())
- {
- worker = null;
- try
- {
- selector.close();
- }
- catch (IOException e)
- {
- ExceptionMonitor.getInstance().exceptionCaught(e);
- }
- finally
- {
- selector = null;
- }
- break;
- }
- }
- }
- }
- else
- {
- lastActive = System.currentTimeMillis();
- }
- }
- catch (IOException e)
- {
- ExceptionMonitor.getInstance().exceptionCaught(e);
-
- try
- {
- Thread.sleep(1000);
- }
- catch (InterruptedException e1)
- {
- ExceptionMonitor.getInstance().exceptionCaught(e1);
- }
- }
- }
- }
- }
-
- private class ConnectionRequest extends DefaultConnectFuture
- {
- private final SocketChannel channel;
- private final long deadline;
- private final IoHandler handler;
- private final IoServiceConfig config;
-
- private ConnectionRequest(SocketChannel channel, IoHandler handler, IoServiceConfig config)
- {
- this.channel = channel;
- long timeout;
- if (config instanceof IoConnectorConfig)
- {
- timeout = ((IoConnectorConfig) config).getConnectTimeoutMillis();
- }
- else
- {
- timeout = ((IoConnectorConfig) getDefaultConfig()).getConnectTimeoutMillis();
- }
- this.deadline = System.currentTimeMillis() + timeout;
- this.handler = handler;
- this.config = config;
- }
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
index b31dd2bc91..ed37a69b82 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
@@ -56,9 +56,8 @@ public class AMQBrokerDetails implements BrokerDetails
if (transport != null)
{
//todo this list of valid transports should be enumerated somewhere
- if ((!(transport.equalsIgnoreCase(BrokerDetails.VM) ||
- transport.equalsIgnoreCase(BrokerDetails.TCP) ||
- transport.equalsIgnoreCase(BrokerDetails.SOCKET))))
+ if (!(transport.equalsIgnoreCase(BrokerDetails.VM) ||
+ transport.equalsIgnoreCase(BrokerDetails.TCP)))
{
if (transport.equalsIgnoreCase("localhost"))
{
@@ -182,10 +181,7 @@ public class AMQBrokerDetails implements BrokerDetails
}
else
{
- if (!_transport.equalsIgnoreCase(SOCKET))
- {
- setPort(port);
- }
+ setPort(port);
}
String queryString = connection.getQuery();
@@ -307,11 +303,8 @@ public class AMQBrokerDetails implements BrokerDetails
sb.append(_host);
}
- if (!(_transport.equalsIgnoreCase(SOCKET)))
- {
- sb.append(':');
- sb.append(_port);
- }
+ sb.append(':');
+ sb.append(_port);
sb.append(printOptionsURL());
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index 98573c9cc3..b0242210d8 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -49,6 +49,11 @@ import org.apache.qpid.framing.TxSelectBody;
import org.apache.qpid.framing.TxSelectOkBody;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ChannelLimitReachedException;
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.ConnectionSettings;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.OutgoingNetworkTransport;
+import org.apache.qpid.transport.network.mina.MinaNetworkTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -89,8 +94,21 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
StateWaiter waiter = _conn._protocolHandler.createWaiter(openOrClosedStates);
- TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail);
+ ConnectionSettings settings = new ConnectionSettings();
+ settings.setHost(brokerDetail.getHost());
+ settings.setPort(brokerDetail.getPort());
+ settings.setProtocol(brokerDetail.getTransport());
+ SSLConfiguration sslConfig = _conn.getSSLConfiguration();
+ SSLContextFactory sslFactory = null;
+ if (sslConfig != null)
+ {
+ sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType());
+ }
+
+ OutgoingNetworkTransport transport = new MinaNetworkTransport();
+ NetworkConnection network = transport.connect(settings, _conn._protocolHandler, sslFactory);
+ _conn._protocolHandler.setNetworkConnection(network);
_conn._protocolHandler.getProtocolSession().init();
// this blocks until the connection has been set up or when an error
// has prevented the connection being set up
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 424e09693f..34c6468629 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -57,7 +57,6 @@ import org.apache.qpid.framing.HeartbeatBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.ProtocolInitiation;
import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.pool.Job;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.protocol.AMQConstant;
@@ -65,8 +64,9 @@ import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.thread.Threading;
-import org.apache.qpid.transport.NetworkDriver;
-import org.apache.qpid.transport.network.io.IoTransport;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.NetworkTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -172,11 +172,13 @@ public class AMQProtocolHandler implements ProtocolEngine
private Job _readJob;
private Job _writeJob;
private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance();
- private NetworkDriver _networkDriver;
private ProtocolVersion _suggestedProtocolVersion;
private long _writtenBytes;
private long _readBytes;
+ private NetworkTransport _transport;
+ private NetworkConnection _network;
+ private Sender<ByteBuffer> _sender;
/**
* Creates a new protocol handler, associated with the specified client connection instance.
@@ -300,7 +302,7 @@ public class AMQProtocolHandler implements ProtocolEngine
// failover:
HeartbeatDiagnostics.timeout();
_logger.warn("Timed out while waiting for heartbeat from peer.");
- _networkDriver.close();
+ _network.close();
}
public void writerIdle()
@@ -322,7 +324,7 @@ public class AMQProtocolHandler implements ProtocolEngine
{
_logger.info("Exception caught therefore going to attempt failover: " + cause, cause);
// this will attempt failover
- _networkDriver.close();
+ _network.close();
closed();
}
else
@@ -574,7 +576,7 @@ public class AMQProtocolHandler implements ProtocolEngine
{
public void run()
{
- _networkDriver.send(buf);
+ _sender.send(buf);
}
});
if (PROTOCOL_DEBUG)
@@ -595,7 +597,7 @@ public class AMQProtocolHandler implements ProtocolEngine
if (wait)
{
- _networkDriver.flush();
+ _sender.flush();
}
}
@@ -709,7 +711,7 @@ public class AMQProtocolHandler implements ProtocolEngine
try
{
syncWrite(frame, ConnectionCloseOkBody.class, timeout);
- _networkDriver.close();
+ _network.close();
closed();
}
catch (AMQTimeoutException e)
@@ -829,17 +831,18 @@ public class AMQProtocolHandler implements ProtocolEngine
public SocketAddress getRemoteAddress()
{
- return _networkDriver.getRemoteAddress();
+ return _network.getRemoteAddress();
}
public SocketAddress getLocalAddress()
{
- return _networkDriver.getLocalAddress();
+ return _network.getLocalAddress();
}
- public void setNetworkDriver(NetworkDriver driver)
+ public void setNetworkConnection(NetworkConnection network)
{
- _networkDriver = driver;
+ _network = network;
+ _sender = network.getSender();
}
/** @param delay delay in seconds (not ms) */
@@ -847,15 +850,15 @@ public class AMQProtocolHandler implements ProtocolEngine
{
if (delay > 0)
{
- getNetworkDriver().setMaxWriteIdle(delay);
- getNetworkDriver().setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay));
+ _network.setMaxWriteIdle(delay);
+ _network.setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay));
HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
}
}
- public NetworkDriver getNetworkDriver()
+ public NetworkConnection getNetworkConnection()
{
- return _networkDriver;
+ return _network;
}
public ProtocolVersion getSuggestedProtocolVersion()
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
deleted file mode 100644
index 1ac8f62e32..0000000000
--- a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.transport;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.IoConnector;
-import org.apache.mina.common.SimpleByteBufferAllocator;
-import org.apache.qpid.client.SSLConfiguration;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.ssl.SSLContextFactory;
-import org.apache.qpid.transport.network.mina.MINANetworkDriver;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SocketTransportConnection implements ITransportConnection
-{
- private static final Logger _logger = LoggerFactory.getLogger(SocketTransportConnection.class);
- private static final int DEFAULT_BUFFER_SIZE = 32 * 1024;
-
- private SocketConnectorFactory _socketConnectorFactory;
-
- static interface SocketConnectorFactory
- {
- IoConnector newSocketConnector();
- }
-
- public SocketTransportConnection(SocketConnectorFactory socketConnectorFactory)
- {
- _socketConnectorFactory = socketConnectorFactory;
- }
-
- public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException
- {
- ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers"));
-
- // the MINA default is currently to use the pooled allocator although this may change in future
- // once more testing of the performance of the simple allocator has been done
- if (!Boolean.getBoolean("amqj.enablePooledAllocator"))
- {
- _logger.info("Using SimpleByteBufferAllocator");
- ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
- }
-
- final IoConnector ioConnector = _socketConnectorFactory.newSocketConnector();
- final InetSocketAddress address;
-
- if (brokerDetail.getTransport().equals(BrokerDetails.SOCKET))
- {
- address = null;
- }
- else
- {
- address = new InetSocketAddress(brokerDetail.getHost(), brokerDetail.getPort());
- _logger.info("Attempting connection to " + address);
- }
-
- SSLConfiguration sslConfig = protocolHandler.getConnection().getSSLConfiguration();
- SSLContextFactory sslFactory = null;
- if (sslConfig != null)
- {
- sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType());
- }
-
- MINANetworkDriver driver = new MINANetworkDriver(ioConnector);
- driver.open(brokerDetail.getPort(), address.getAddress(), protocolHandler, null, sslFactory);
- protocolHandler.setNetworkDriver(driver);
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
index 97657a09f4..3e03f88341 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
@@ -22,22 +22,17 @@ package org.apache.qpid.client.transport;
import java.io.IOException;
import java.net.Socket;
-import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.mina.common.IoConnector;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoServiceConfig;
-import org.apache.mina.transport.socket.nio.ExistingSocketConnector;
-import org.apache.mina.transport.socket.nio.SocketConnector;
import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
-import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.thread.QpidThreadExecutor;
-import org.apache.qpid.transport.network.mina.MINANetworkDriver;
+import org.apache.qpid.transport.network.VMBrokerMap;
+import org.apache.qpid.transport.network.mina.MinaNetworkHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,132 +46,15 @@ public class TransportConnection
{
private static ITransportConnection _instance;
- private static final Map _inVmPipeAddress = new HashMap();
private static VmPipeAcceptor _acceptor;
private static int _currentInstance = -1;
private static int _currentVMPort = -1;
- private static final int TCP = 0;
- private static final int VM = 1;
- private static final int SOCKET = 2;
-
private static Logger _logger = LoggerFactory.getLogger(TransportConnection.class);
private static final String DEFAULT_QPID_SERVER = "org.apache.qpid.server.protocol.AMQProtocolEngineFactory";
- private static Map<String, Socket> _openSocketRegister = new ConcurrentHashMap<String, Socket>();
-
- public static void registerOpenSocket(String socketID, Socket openSocket)
- {
- _openSocketRegister.put(socketID, openSocket);
- }
-
- public static Socket removeOpenSocket(String socketID)
- {
- return _openSocketRegister.remove(socketID);
- }
-
- public static synchronized ITransportConnection getInstance(final BrokerDetails details) throws AMQTransportConnectionException
- {
- int transport = getTransport(details.getTransport());
-
- if (transport == -1)
- {
- throw new AMQNoTransportForProtocolException(details, null, null);
- }
-
- switch (transport)
- {
- case SOCKET:
- return new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
- {
- public IoConnector newSocketConnector()
- {
- ExistingSocketConnector connector = new ExistingSocketConnector(1,new QpidThreadExecutor());
-
- Socket socket = TransportConnection.removeOpenSocket(details.getHost());
-
- if (socket != null)
- {
- _logger.info("Using existing Socket:" + socket);
-
- ((ExistingSocketConnector) connector).setOpenSocket(socket);
- }
- else
- {
- throw new IllegalArgumentException("Active Socket must be provided for broker " +
- "with 'socket://<SocketID>' transport:" + details);
- }
- return connector;
- }
- });
- case TCP:
- return new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
- {
- public IoConnector newSocketConnector()
- {
- SocketConnector result = new SocketConnector(1, new QpidThreadExecutor()); // non-blocking connector
-
- // Don't have the connector's worker thread wait around for other connections (we only use
- // one SocketConnector per connection at the moment anyway). This allows short-running
- // clients (like unit tests) to complete quickly.
- result.setWorkerTimeout(0);
- return result;
- }
- });
- case VM:
- {
- return getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker"));
- }
- default:
- throw new AMQNoTransportForProtocolException(details, "Transport not recognised:" + transport, null);
- }
- }
-
- private static int getTransport(String transport)
- {
- if (transport.equals(BrokerDetails.SOCKET))
- {
- return SOCKET;
- }
-
- if (transport.equals(BrokerDetails.TCP))
- {
- return TCP;
- }
-
- if (transport.equals(BrokerDetails.VM))
- {
- return VM;
- }
-
- return -1;
- }
-
- private static ITransportConnection getVMTransport(BrokerDetails details, boolean AutoCreate)
- throws AMQVMBrokerCreationException
- {
- int port = details.getPort();
-
- synchronized (_inVmPipeAddress)
- {
- if (!_inVmPipeAddress.containsKey(port))
- {
- if (AutoCreate)
- {
- _logger.warn("Auto Creating InVM Broker on port:" + port);
- createVMBroker(port);
- }
- else
- {
- throw new AMQVMBrokerCreationException(null, port, "VM Broker on port " + port
- + " does not exist. Auto create disabled.", null);
- }
- }
- }
-
- return new VmPipeTransportConnection(port);
- }
+ public static final int DEFAULT_VM_PORT = 1;
public static void createVMBroker(int port) throws AMQVMBrokerCreationException
{
@@ -189,10 +67,10 @@ public class TransportConnection
IoServiceConfig config = _acceptor.getDefaultConfig();
}
}
- synchronized (_inVmPipeAddress)
+ synchronized (VMBrokerMap.class)
{
- if (!_inVmPipeAddress.containsKey(port))
+ if (!VMBrokerMap.contains(port))
{
_logger.info("Creating InVM Qpid.AMQP listening on port " + port);
IoHandlerAdapter provider = null;
@@ -204,7 +82,7 @@ public class TransportConnection
_acceptor.bind(pipe, provider);
- _inVmPipeAddress.put(port, pipe);
+ VMBrokerMap.add(port, pipe);
_logger.info("Created InVM Qpid.AMQP listening on port " + port);
}
catch (IOException e)
@@ -231,7 +109,7 @@ public class TransportConnection
}
_acceptor.bind(pipe, provider);
- _inVmPipeAddress.put(port, pipe);
+ VMBrokerMap.add(port, pipe);
_logger.info("Created InVM Qpid.AMQP listening on port " + port);
}
catch (IOException justUseFirstException)
@@ -272,11 +150,10 @@ public class TransportConnection
{
Class[] cnstr = {Integer.class};
Object[] params = {port};
-
- provider = new MINANetworkDriver();
+
ProtocolEngineFactory engineFactory = (ProtocolEngineFactory) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params);
- ((MINANetworkDriver) provider).setProtocolEngineFactory(engineFactory, true);
- // Give the broker a second to create
+ provider = new MinaNetworkHandler(null, engineFactory);
+
_logger.info("Created VMBroker Instance:" + port);
}
catch (Exception e)
@@ -309,9 +186,9 @@ public class TransportConnection
{
_acceptor.unbindAll();
}
- synchronized (_inVmPipeAddress)
+ synchronized (VMBrokerMap.class)
{
- _inVmPipeAddress.clear();
+ VMBrokerMap.clear();
}
_acceptor = null;
}
@@ -321,16 +198,15 @@ public class TransportConnection
public static void killVMBroker(int port)
{
- synchronized (_inVmPipeAddress)
+ synchronized (VMBrokerMap.class)
{
- VmPipeAddress pipe = (VmPipeAddress) _inVmPipeAddress.get(port);
- if (pipe != null)
+ if (VMBrokerMap.contains(port))
{
_logger.info("Killing VM Broker:" + port);
- _inVmPipeAddress.remove(port);
+ VmPipeAddress address = VMBrokerMap.remove(port);
// This does need to be sychronized as otherwise mina can hang
// if a new connection is made
- _acceptor.unbind(pipe);
+ _acceptor.unbind(address);
}
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
deleted file mode 100644
index 87cc2e7a5a..0000000000
--- a/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.transport;
-
-import java.io.IOException;
-
-import org.apache.mina.common.ConnectFuture;
-import org.apache.mina.transport.vmpipe.QpidVmPipeConnector;
-import org.apache.mina.transport.vmpipe.VmPipeAddress;
-import org.apache.mina.transport.vmpipe.VmPipeConnector;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.transport.network.mina.MINANetworkDriver;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class VmPipeTransportConnection implements ITransportConnection
-{
- private static final Logger _logger = LoggerFactory.getLogger(VmPipeTransportConnection.class);
-
- private int _port;
-
- private MINANetworkDriver _networkDriver;
-
- public VmPipeTransportConnection(int port)
- {
- _port = port;
- }
-
- public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException
- {
- final VmPipeConnector ioConnector = new QpidVmPipeConnector();
-
- final VmPipeAddress address = new VmPipeAddress(_port);
- _logger.info("Attempting connection to " + address);
- _networkDriver = new MINANetworkDriver(ioConnector, protocolHandler);
- protocolHandler.setNetworkDriver(_networkDriver);
- ConnectFuture future = ioConnector.connect(address, _networkDriver);
- // wait for connection to complete
- future.join();
- // we call getSession which throws an IOException if there has been an error connecting
- future.getSession();
- _networkDriver.setProtocolEngine(protocolHandler);
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java b/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
index 6d81f728c9..1aca28aa3a 100644
--- a/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
+++ b/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
@@ -52,7 +52,6 @@ public interface BrokerDetails
public static final int DEFAULT_PORT = 5672;
- public static final String SOCKET = "socket";
public static final String TCP = "tcp";
public static final String VM = "vm";
diff --git a/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java b/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
index f520a21ba0..e54b8ef369 100644
--- a/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
+++ b/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
@@ -20,23 +20,24 @@
*/
package org.apache.qpid.client.protocol;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
import junit.framework.TestCase;
-import org.apache.qpid.framing.AMQFrame;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQAuthenticationException;
+import org.apache.qpid.client.MockAMQConnection;
+import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.amqp_8_0.BasicRecoverOkBodyImpl;
-import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.transport.TestNetworkDriver;
-import org.apache.qpid.client.MockAMQConnection;
-import org.apache.qpid.client.AMQAuthenticationException;
-import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.transport.TestNetworkConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
/**
* This is a test address QPID-1431 where frame listeners would fail to be notified of an incomming exception.
*
@@ -73,7 +74,7 @@ public class AMQProtocolHandlerTest extends TestCase
{
//Create a new ProtocolHandler with a fake connection.
_handler = new AMQProtocolHandler(new MockAMQConnection("amqp://guest:guest@client/test?brokerlist='vm://:1'"));
- _handler.setNetworkDriver(new TestNetworkDriver());
+ _handler.setNetworkConnection(new TestNetworkConnection());
AMQBody body = BasicRecoverOkBodyImpl.getFactory().newInstance(null, 1);
_blockFrame = new AMQFrame(0, body);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
index 2c5fa0112e..a3dfff45f9 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
@@ -487,27 +487,6 @@ public class ConnectionURLTest extends TestCase
}
- public void testSocketProtocol() throws URLSyntaxException
- {
- String url = "amqp://guest:guest@id/test" + "?brokerlist='socket://VM-Unique-socketID'";
-
- try
- {
- AMQConnectionURL curl = new AMQConnectionURL(url);
- assertNotNull(curl);
- assertEquals(1, curl.getBrokerCount());
- assertNotNull(curl.getBrokerDetails(0));
- assertEquals(BrokerDetails.SOCKET, curl.getBrokerDetails(0).getTransport());
- assertEquals("VM-Unique-socketID", curl.getBrokerDetails(0).getHost());
- assertEquals("URL does not toString as expected",
- url.replace(":guest", ":********"), curl.toString());
- }
- catch (URLSyntaxException e)
- {
- fail(e.getMessage());
- }
- }
-
public void testSingleTransportMultiOptionOnBrokerURL() throws URLSyntaxException
{
String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672?foo='jim'&bar='bob'&fred='jimmy'',routingkey='jim',timeout='200',immediatedelivery='true'";