summaryrefslogtreecommitdiff
path: root/java/common
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2011-07-07 15:10:30 +0000
committerRobert Gemmell <robbie@apache.org>2011-07-07 15:10:30 +0000
commit245f2793e0a4efd4876ad72b2cf32edc93750d84 (patch)
treeb5fd72fdea830222b314029b13062cbd690e8d2e /java/common
parentb4f9004439f56f492931f4b35f7fa0ae58f3ff85 (diff)
downloadqpid-python-245f2793e0a4efd4876ad72b2cf32edc93750d84.tar.gz
QPID-3342: transition TCP based Mina transport for 0-8/0-9/0-9-1 protocols over to new IO interface model
Applied patch by Keith Wall and myself git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1143867 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common')
-rw-r--r--java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java5
-rw-r--r--java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java63
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java (renamed from java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java)20
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/SocketConnectorFactory.java8
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java30
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/Transport.java27
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/VMBrokerMap.java51
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java13
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java368
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkConnection.java81
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java149
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java250
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java79
-rw-r--r--java/common/src/test/java/org/apache/qpid/transport/MockSender.java47
-rw-r--r--java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java (renamed from java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java)19
-rw-r--r--java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java (renamed from java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java)267
19 files changed, 919 insertions, 568 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java b/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
index 31953ea6ab..48a3df734a 100644
--- a/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
+++ b/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
@@ -22,8 +22,6 @@ package org.apache.qpid.protocol;
import java.net.SocketAddress;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.transport.NetworkDriver;
import org.apache.qpid.transport.Receiver;
/**
@@ -32,9 +30,6 @@ import org.apache.qpid.transport.Receiver;
*/
public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>
{
- // Sets the network driver providing data for this ProtocolEngine
- void setNetworkDriver (NetworkDriver driver);
-
// Returns the remote address of the NetworkDriver
SocketAddress getRemoteAddress();
diff --git a/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java b/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java
index 9df84eef90..4e40b78440 100644
--- a/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java
+++ b/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java
@@ -20,12 +20,12 @@
*/
package org.apache.qpid.protocol;
-import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.transport.network.NetworkConnection;
public interface ProtocolEngineFactory
{
// Returns a new instance of a ProtocolEngine
- ProtocolEngine newProtocolEngine(NetworkDriver networkDriver);
+ ProtocolEngine newProtocolEngine(NetworkConnection network);
} \ No newline at end of file
diff --git a/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java b/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java
index 08678b213b..2074c77a5b 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java
@@ -30,6 +30,8 @@ import java.util.Map;
*/
public class ConnectionSettings
{
+ public static final String WILDCARD_ADDRESS = "*";
+
String protocol = "tcp";
String host = "localhost";
String vhost;
diff --git a/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java b/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java
deleted file mode 100644
index 86af97bf7e..0000000000
--- a/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.transport;
-
-import java.net.BindException;
-import java.net.InetAddress;
-import java.net.SocketAddress;
-
-import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.ssl.SSLContextFactory;
-
-public interface NetworkDriver extends Sender<java.nio.ByteBuffer>
-{
- // Creates a NetworkDriver which attempts to connect to destination on port and attaches the ProtocolEngine to
- // it using the SSLContextFactory if provided
- void open(int port, InetAddress destination, ProtocolEngine engine,
- NetworkDriverConfiguration config, SSLContextFactory sslFactory)
- throws OpenException;
-
- // listens for incoming connections on the specified ports and address and creates a new NetworkDriver which
- // processes incoming connections with ProtocolEngines and SSLEngines created from the factories
- // (in the case of an SSLContextFactory, if provided)
- void bind (int port, InetAddress[] addresses, ProtocolEngineFactory protocolFactory,
- NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException;
-
- // Returns the remote address of the underlying socket
- SocketAddress getRemoteAddress();
-
- // Returns the local address of the underlying socket
- SocketAddress getLocalAddress();
-
- /**
- * The length of time after which the ProtocolEngines readIdle() method should be called if no data has been
- * read in seconds
- */
- void setMaxReadIdle(int idleTime);
-
- /**
- * The length of time after which the ProtocolEngines writeIdle() method should be called if no data has been
- * written in seconds
- */
- void setMaxWriteIdle(int idleTime);
-
-} \ No newline at end of file
diff --git a/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java b/java/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java
index c38afe5dd5..8d3f7a779a 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java
@@ -25,20 +25,22 @@ package org.apache.qpid.transport;
* buffer sizes and set particular options on the socket. NetworkDrivers should honour the values returned
* from here if the underlying implementation supports them.
*/
-public interface NetworkDriverConfiguration
+public interface NetworkTransportConfiguration
{
// Taken from Socket
- Boolean getKeepAlive();
- Boolean getOOBInline();
- Boolean getReuseAddress();
- Integer getSoLinger(); // null means off
- Integer getSoTimeout();
Boolean getTcpNoDelay();
- Integer getTrafficClass();
// The amount of memory in bytes to allocate to the incoming buffer
Integer getReceiveBufferSize();
// The amount of memory in bytes to allocate to the outgoing buffer
- Integer getSendBufferSize();
-}
+ Integer getSendBufferSize();
+
+ Integer getPort();
+
+ String getHost();
+
+ String getTransport();
+
+ Integer getConnectorProcessors();
+}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/SocketConnectorFactory.java b/java/common/src/main/java/org/apache/qpid/transport/SocketConnectorFactory.java
new file mode 100644
index 0000000000..2c7652abeb
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/transport/SocketConnectorFactory.java
@@ -0,0 +1,8 @@
+package org.apache.qpid.transport;
+
+import org.apache.mina.common.IoConnector;
+
+public interface SocketConnectorFactory
+{
+ IoConnector newConnector();
+} \ No newline at end of file
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java
new file mode 100644
index 0000000000..7099916c33
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network;
+
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.NetworkTransportConfiguration;
+
+public interface IncomingNetworkTransport extends NetworkTransport
+{
+ public void accept(NetworkTransportConfiguration config, ProtocolEngineFactory factory, SSLContextFactory sslFactory);
+} \ No newline at end of file
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java b/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
index 80b32ea909..1f69973b96 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
@@ -40,4 +40,8 @@ public interface NetworkConnection
* Returns the local address of the underlying socket.
*/
SocketAddress getLocalAddress();
+
+ void setMaxWriteIdle(int sec);
+
+ void setMaxReadIdle(int sec);
} \ No newline at end of file
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java b/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java
new file mode 100644
index 0000000000..4b8a0baf75
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java
@@ -0,0 +1,27 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network;
+
+public class Transport
+{
+ public static final String TCP = "tcp";
+ public static final String VM = "vm";
+}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/VMBrokerMap.java b/java/common/src/main/java/org/apache/qpid/transport/network/VMBrokerMap.java
new file mode 100644
index 0000000000..acc55c2e2d
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/VMBrokerMap.java
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.mina.transport.vmpipe.VmPipeAddress;
+
+public class VMBrokerMap
+{
+ private static final Map<Integer, VmPipeAddress> _map = new HashMap<Integer, VmPipeAddress>();
+
+ public static void add(int port, VmPipeAddress pipe)
+ {
+ _map.put(port, pipe);
+ }
+
+ public static VmPipeAddress remove(int port)
+ {
+ return _map.remove(port);
+ }
+
+ public static void clear()
+ {
+ _map.clear();
+ }
+
+ public static boolean contains(int port)
+ {
+ return _map.containsKey(port);
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
index 3252544fee..cca1fc46c9 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
@@ -78,5 +78,16 @@ public class IoNetworkConnection implements NetworkConnection
{
return _socket.getLocalSocketAddress();
}
-
+
+ public void setMaxWriteIdle(int sec)
+ {
+ // TODO implement support for setting heartbeating config in this way
+ // Currently a socket timeout is used in IoSender
+ }
+
+ public void setMaxReadIdle(int sec)
+ {
+ // TODO implement support for setting heartbeating config in this way
+ // Currently a socket timeout is used in IoSender
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
deleted file mode 100644
index 2206e0999e..0000000000
--- a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
+++ /dev/null
@@ -1,368 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.transport.network.mina;
-
-import org.apache.mina.common.ConnectFuture;
-import org.apache.mina.common.ExecutorThreadModel;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoAcceptor;
-import org.apache.mina.common.IoConnector;
-import org.apache.mina.common.IoHandlerAdapter;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.SimpleByteBufferAllocator;
-import org.apache.mina.common.WriteFuture;
-import org.apache.mina.filter.SSLFilter;
-import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
-import org.apache.mina.transport.socket.nio.SocketConnector;
-import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
-import org.apache.mina.transport.socket.nio.SocketSessionConfig;
-import org.apache.mina.util.NewThreadExecutor;
-import org.apache.mina.util.SessionUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.ssl.SSLContextFactory;
-import org.apache.qpid.thread.QpidThreadExecutor;
-import org.apache.qpid.transport.NetworkDriver;
-import org.apache.qpid.transport.NetworkDriverConfiguration;
-import org.apache.qpid.transport.OpenException;
-
-import java.io.IOException;
-import java.net.BindException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-
-public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
-{
-
- private static final int DEFAULT_BUFFER_SIZE = 32 * 1024;
-
- ProtocolEngine _protocolEngine;
- private int _processors = 4;
- private SSLContextFactory _sslFactory = null;
- private IoConnector _socketConnector;
- private IoAcceptor _acceptor;
- private IoSession _ioSession;
- private ProtocolEngineFactory _factory;
- private Throwable _lastException;
- private boolean _acceptingConnections = false;
-
- private WriteFuture _lastWriteFuture;
-
- private static final Logger _logger = LoggerFactory.getLogger(MINANetworkDriver.class);
-
- static
- {
- org.apache.mina.common.ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers"));
-
- //override the MINA defaults to prevent use of the PooledByteBufferAllocator
- org.apache.mina.common.ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
- }
-
- public MINANetworkDriver(int processors, ProtocolEngine protocolEngine, IoSession session)
- {
- _processors = processors;
- _protocolEngine = protocolEngine;
- _ioSession = session;
- _ioSession.setAttachment(_protocolEngine);
- }
-
- public MINANetworkDriver()
- {
-
- }
-
- public MINANetworkDriver(IoConnector ioConnector)
- {
- _socketConnector = ioConnector;
- }
-
- public MINANetworkDriver(IoConnector ioConnector, ProtocolEngine engine)
- {
- _socketConnector = ioConnector;
- _protocolEngine = engine;
- }
-
- public void bind(int port, InetAddress[] addresses, ProtocolEngineFactory factory,
- NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException
- {
-
- _factory = factory;
-
- _acceptor = new org.apache.mina.transport.socket.nio.SocketAcceptor(_processors, new NewThreadExecutor());
-
- SocketAcceptorConfig sconfig = (SocketAcceptorConfig) _acceptor.getDefaultConfig();
- sconfig.setThreadModel(ExecutorThreadModel.getInstance("MINANetworkDriver(Acceptor)"));
- SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig();
-
- if (config != null)
- {
- sc.setReceiveBufferSize(config.getReceiveBufferSize());
- sc.setSendBufferSize(config.getSendBufferSize());
- sc.setTcpNoDelay(config.getTcpNoDelay());
- }
-
- if (sslFactory != null)
- {
- _sslFactory = sslFactory;
- }
-
- if (addresses != null && addresses.length > 0)
- {
- for (InetAddress addr : addresses)
- {
- try
- {
- _acceptor.bind(new InetSocketAddress(addr, port), this, sconfig);
- }
- catch (IOException e)
- {
- throw new BindException(String.format("Could not bind to %1s:%2s", addr, port));
- }
- }
- }
- else
- {
- try
- {
- _acceptor.bind(new InetSocketAddress(port), this, sconfig);
- }
- catch (IOException e)
- {
- throw new BindException(String.format("Could not bind to *:%1s", port));
- }
- }
- _acceptingConnections = true;
- }
-
- public SocketAddress getRemoteAddress()
- {
- return _ioSession.getRemoteAddress();
- }
-
- public SocketAddress getLocalAddress()
- {
- return _ioSession.getLocalAddress();
- }
-
-
- public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config,
- SSLContextFactory sslFactory) throws OpenException
- {
- if (sslFactory != null)
- {
- _sslFactory = sslFactory;
- }
-
- _socketConnector = new SocketConnector(1, new QpidThreadExecutor()); // non-blocking connector
-
- SocketConnectorConfig cfg = (SocketConnectorConfig) _socketConnector.getDefaultConfig();
- String s = "";
- StackTraceElement[] trace = Thread.currentThread().getStackTrace();
- for(StackTraceElement elt : trace)
- {
- if(elt.getClassName().contains("Test"))
- {
- s = elt.getClassName();
- break;
- }
- }
- cfg.setThreadModel(ExecutorThreadModel.getInstance("MINANetworkDriver(Client)-"+s));
-
- SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();
- scfg.setTcpNoDelay((config != null) ? config.getTcpNoDelay() : true);
- scfg.setSendBufferSize((config != null) ? config.getSendBufferSize() : DEFAULT_BUFFER_SIZE);
- scfg.setReceiveBufferSize((config != null) ? config.getReceiveBufferSize() : DEFAULT_BUFFER_SIZE);
-
- // Don't have the connector's worker thread wait around for other
- // connections (we only use
- // one SocketConnector per connection at the moment anyway). This allows
- // short-running
- // clients (like unit tests) to complete quickly.
- if (_socketConnector instanceof SocketConnector)
- {
- ((SocketConnector) _socketConnector).setWorkerTimeout(0);
- }
-
- ConnectFuture future = _socketConnector.connect(new InetSocketAddress(destination, port), this, cfg);
- future.join();
- if (!future.isConnected())
- {
- throw new OpenException("Could not open connection", _lastException);
- }
- _ioSession = future.getSession();
- _ioSession.setAttachment(engine);
- engine.setNetworkDriver(this);
- _protocolEngine = engine;
- }
-
- public void setMaxReadIdle(int idleTime)
- {
- _ioSession.setIdleTime(IdleStatus.READER_IDLE, idleTime);
- }
-
- public void setMaxWriteIdle(int idleTime)
- {
- _ioSession.setIdleTime(IdleStatus.WRITER_IDLE, idleTime);
- }
-
- public void close()
- {
- if (_lastWriteFuture != null)
- {
- _lastWriteFuture.join();
- }
- if (_acceptor != null)
- {
- _acceptor.unbindAll();
- }
- if (_ioSession != null)
- {
- _ioSession.close();
- }
- }
-
- public void flush()
- {
- if (_lastWriteFuture != null)
- {
- _lastWriteFuture.join();
- }
- }
-
- public void send(ByteBuffer msg)
- {
- org.apache.mina.common.ByteBuffer minaBuf = org.apache.mina.common.ByteBuffer.allocate(msg.capacity());
- minaBuf.put(msg);
- minaBuf.flip();
- _lastWriteFuture = _ioSession.write(minaBuf);
- }
-
- public void setIdleTimeout(int i)
- {
- // MINA doesn't support setting SO_TIMEOUT
- }
-
- public void exceptionCaught(IoSession protocolSession, Throwable throwable) throws Exception
- {
- if (_protocolEngine != null)
- {
- _protocolEngine.exception(throwable);
- }
- else
- {
- _logger.error("Exception thrown and no ProtocolEngine to handle it", throwable);
- }
- _lastException = throwable;
- }
-
- /**
- * Invoked when a message is received on a particular protocol session. Note
- * that a protocol session is directly tied to a particular physical
- * connection.
- *
- * @param protocolSession
- * the protocol session that received the message
- * @param message
- * the message itself (i.e. a decoded frame)
- *
- * @throws Exception
- * if the message cannot be processed
- */
- public void messageReceived(IoSession protocolSession, Object message) throws Exception
- {
- if (message instanceof org.apache.mina.common.ByteBuffer)
- {
- ((ProtocolEngine) protocolSession.getAttachment()).received(((org.apache.mina.common.ByteBuffer) message).buf());
- }
- else
- {
- throw new IllegalStateException("Handed unhandled message. message.class = " + message.getClass() + " message = " + message);
- }
- }
-
- public void sessionClosed(IoSession protocolSession) throws Exception
- {
- ((ProtocolEngine) protocolSession.getAttachment()).closed();
- }
-
- public void sessionCreated(IoSession protocolSession) throws Exception
- {
- // Configure the session with SSL if necessary
- SessionUtil.initialize(protocolSession);
- if (_sslFactory != null)
- {
- protocolSession.getFilterChain().addBefore("protocolFilter", "sslFilter",
- new SSLFilter(_sslFactory.buildServerContext()));
- }
-
- if (_ioSession == null)
- {
- _ioSession = protocolSession;
- }
-
- if (_acceptingConnections)
- {
- // Set up the protocol engine
- ProtocolEngine protocolEngine = _factory.newProtocolEngine(this);
- MINANetworkDriver newDriver = new MINANetworkDriver(_processors, protocolEngine, protocolSession);
- protocolEngine.setNetworkDriver(newDriver);
- }
- }
-
- public void sessionIdle(IoSession session, IdleStatus status) throws Exception
- {
- if (IdleStatus.WRITER_IDLE.equals(status))
- {
- ((ProtocolEngine) session.getAttachment()).writerIdle();
- }
- else if (IdleStatus.READER_IDLE.equals(status))
- {
- ((ProtocolEngine) session.getAttachment()).readerIdle();
- }
- }
-
- private ProtocolEngine getProtocolEngine()
- {
- return _protocolEngine;
- }
-
- public void setProtocolEngineFactory(ProtocolEngineFactory engineFactory, boolean acceptingConnections)
- {
- _factory = engineFactory;
- _acceptingConnections = acceptingConnections;
- }
-
- public void setProtocolEngine(ProtocolEngine protocolEngine)
- {
- _protocolEngine = protocolEngine;
- if (_ioSession != null)
- {
- _ioSession.setAttachment(protocolEngine);
- }
- }
-
-}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkConnection.java b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkConnection.java
new file mode 100644
index 0000000000..0f433f6eeb
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkConnection.java
@@ -0,0 +1,81 @@
+/*
+*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network.mina;
+
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoSession;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.network.NetworkConnection;
+
+public class MinaNetworkConnection implements NetworkConnection
+{
+ private IoSession _session;
+ private Sender<ByteBuffer> _sender;
+
+ public MinaNetworkConnection(IoSession session)
+ {
+ _session = session;
+ _sender = new MinaSender(_session);
+ }
+
+ public Sender<ByteBuffer> getSender()
+ {
+ return _sender;
+ }
+
+ public void close()
+ {
+ _session.close();
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ return _session.getRemoteAddress();
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return _session.getLocalAddress();
+ }
+
+ public long getReadBytes()
+ {
+ return _session.getReadBytes();
+ }
+
+ public long getWrittenBytes()
+ {
+ return _session.getWrittenBytes();
+ }
+
+ public void setMaxWriteIdle(int sec)
+ {
+ _session.setIdleTime(IdleStatus.WRITER_IDLE, sec);
+ }
+
+ public void setMaxReadIdle(int sec)
+ {
+ _session.setIdleTime(IdleStatus.READER_IDLE, sec);
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java
new file mode 100644
index 0000000000..c00187480c
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java
@@ -0,0 +1,149 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.transport.network.mina;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.SimpleByteBufferAllocator;
+import org.apache.mina.filter.SSLFilter;
+import org.apache.mina.util.SessionUtil;
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MinaNetworkHandler extends IoHandlerAdapter
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(MinaNetworkHandler.class);
+
+ private ProtocolEngineFactory _factory;
+ private SSLContextFactory _sslFactory = null;
+
+ static
+ {
+ boolean directBuffers = Boolean.getBoolean("amqj.enableDirectBuffers");
+ LOGGER.debug("Using " + (directBuffers ? "direct" : "heap") + " buffers");
+ ByteBuffer.setUseDirectBuffers(directBuffers);
+
+ //override the MINA defaults to prevent use of the PooledByteBufferAllocator
+ ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
+ }
+
+ public MinaNetworkHandler(SSLContextFactory sslFactory, ProtocolEngineFactory factory)
+ {
+ _sslFactory = sslFactory;
+ _factory = factory;
+ }
+
+ public MinaNetworkHandler(SSLContextFactory sslFactory)
+ {
+ this(sslFactory, null);
+ }
+
+ public void messageReceived(IoSession session, Object message)
+ {
+ ProtocolEngine engine = (ProtocolEngine) session.getAttachment();
+ ByteBuffer buf = (ByteBuffer) message;
+ try
+ {
+ engine.received(buf.buf());
+ }
+ catch (RuntimeException re)
+ {
+ engine.exception(re);
+ }
+ }
+
+ public void exceptionCaught(IoSession ioSession, Throwable throwable) throws Exception
+ {
+ ProtocolEngine engine = (ProtocolEngine) ioSession.getAttachment();
+ if(engine != null)
+ {
+ LOGGER.error("Exception caught by Mina", throwable);
+ engine.exception(throwable);
+ }
+ else
+ {
+ LOGGER.error("Exception caught by Mina but without protocol engine to handle it", throwable);
+ }
+ }
+
+ public void sessionCreated(IoSession ioSession) throws Exception
+ {
+ if(LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Created session: " + ioSession.getRemoteAddress());
+ }
+
+ SessionUtil.initialize(ioSession);
+
+ if (_sslFactory != null)
+ {
+ ioSession.getFilterChain().addBefore("protocolFilter", "sslFilter",
+ new SSLFilter(_sslFactory.buildServerContext()));
+ }
+
+ if (_factory != null)
+ {
+ NetworkConnection netConn = new MinaNetworkConnection(ioSession);
+
+ ProtocolEngine engine = _factory.newProtocolEngine(netConn);
+ ioSession.setAttachment(engine);
+ }
+ }
+
+ public void sessionClosed(IoSession ioSession) throws Exception
+ {
+ if(LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("closed: " + ioSession.getRemoteAddress());
+ }
+
+ ProtocolEngine engine = (ProtocolEngine) ioSession.getAttachment();
+ if(engine != null)
+ {
+ engine.closed();
+ }
+ else
+ {
+ LOGGER.error("Unable to close ProtocolEngine as none was present");
+ }
+ }
+
+
+ public void sessionIdle(IoSession session, IdleStatus status) throws Exception
+ {
+ if (IdleStatus.WRITER_IDLE.equals(status))
+ {
+ ((ProtocolEngine) session.getAttachment()).writerIdle();
+ }
+ else if (IdleStatus.READER_IDLE.equals(status))
+ {
+ ((ProtocolEngine) session.getAttachment()).readerIdle();
+ }
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java
new file mode 100644
index 0000000000..62f9429f30
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java
@@ -0,0 +1,250 @@
+/*
+*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network.mina;
+
+import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.ExecutorThreadModel;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.transport.socket.nio.SocketAcceptor;
+import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
+import org.apache.mina.transport.socket.nio.SocketConnector;
+import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
+import org.apache.mina.transport.socket.nio.SocketSessionConfig;
+import org.apache.mina.util.NewThreadExecutor;
+import org.apache.mina.transport.vmpipe.QpidVmPipeConnector;
+import org.apache.mina.transport.vmpipe.VmPipeAddress;
+
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.thread.QpidThreadExecutor;
+import org.apache.qpid.transport.ConnectionSettings;
+import org.apache.qpid.transport.NetworkTransportConfiguration;
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.SocketConnectorFactory;
+import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.IncomingNetworkTransport;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.OutgoingNetworkTransport;
+import org.apache.qpid.transport.network.Transport;
+import org.apache.qpid.transport.network.VMBrokerMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MinaNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport
+{
+ private static final int UNKNOWN = -1;
+ private static final int TCP = 0;
+ private static final int VM = 1;
+
+ public NetworkConnection _connection;
+ private SocketAcceptor _acceptor;
+ private InetSocketAddress _address;
+
+ public NetworkConnection connect(ConnectionSettings settings,
+ Receiver<java.nio.ByteBuffer> delegate, SSLContextFactory sslFactory)
+ {
+ int transport = getTransport(settings.getProtocol());
+
+ IoConnectorCreator stc;
+ switch(transport)
+ {
+ case TCP:
+ stc = new IoConnectorCreator(new SocketConnectorFactory()
+ {
+ public IoConnector newConnector()
+ {
+ return new SocketConnector(1, new QpidThreadExecutor()); // non-blocking connector
+ }
+ });
+ _connection = stc.connect(delegate, settings, sslFactory);
+ break;
+ case VM:
+ stc = new IoConnectorCreator(new SocketConnectorFactory()
+ {
+ public IoConnector newConnector()
+ {
+ return new QpidVmPipeConnector();
+ }
+ });
+ _connection = stc.connect(delegate, settings, sslFactory);
+ break;
+ case UNKNOWN:
+ default:
+ throw new TransportException("Unknown protocol: " + settings.getProtocol());
+ }
+
+ return _connection;
+ }
+
+ private static int getTransport(String transport)
+ {
+ if (transport.equals(Transport.TCP))
+ {
+ return TCP;
+ }
+
+ if (transport.equals(Transport.VM))
+ {
+ return VM;
+ }
+
+ return -1;
+ }
+
+ public void close()
+ {
+ if(_connection != null)
+ {
+ _connection.close();
+ }
+ if (_acceptor != null)
+ {
+ _acceptor.unbindAll();
+ }
+ }
+
+ public NetworkConnection getConnection()
+ {
+ return _connection;
+ }
+
+ public void accept(final NetworkTransportConfiguration config, final ProtocolEngineFactory factory,
+ final SSLContextFactory sslFactory)
+ {
+ int processors = config.getConnectorProcessors();
+
+ if (Transport.TCP.equalsIgnoreCase(config.getTransport()))
+ {
+ _acceptor = new SocketAcceptor(processors, new NewThreadExecutor());
+
+ SocketAcceptorConfig sconfig = (SocketAcceptorConfig) _acceptor.getDefaultConfig();
+ sconfig.setThreadModel(ExecutorThreadModel.getInstance("MinaNetworkTransport(Acceptor)"));
+ SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig();
+ sc.setTcpNoDelay(config.getTcpNoDelay());
+ sc.setSendBufferSize(config.getSendBufferSize());
+ sc.setReceiveBufferSize(config.getReceiveBufferSize());
+
+ if (config.getHost().equals(WILDCARD_ADDRESS))
+ {
+ _address = new InetSocketAddress(config.getPort());
+ }
+ else
+ {
+ _address = new InetSocketAddress(config.getHost(), config.getPort());
+ }
+ }
+ else
+ {
+ throw new TransportException("Unknown transport: " + config.getTransport());
+ }
+
+ try
+ {
+ _acceptor.bind(_address, new MinaNetworkHandler(sslFactory, factory));
+ }
+ catch (IOException e)
+ {
+ throw new TransportException("Could not bind to " + _address, e);
+ }
+ }
+
+
+ private static class IoConnectorCreator
+ {
+ private static final Logger LOGGER = LoggerFactory.getLogger(IoConnectorCreator.class);
+
+ private static final int CLIENT_DEFAULT_BUFFER_SIZE = 32 * 1024;
+
+ private SocketConnectorFactory _ioConnectorFactory;
+
+ public IoConnectorCreator(SocketConnectorFactory socketConnectorFactory)
+ {
+ _ioConnectorFactory = socketConnectorFactory;
+ }
+
+ public NetworkConnection connect(Receiver<java.nio.ByteBuffer> receiver, ConnectionSettings settings, SSLContextFactory sslFactory)
+ {
+ final IoConnector ioConnector = _ioConnectorFactory.newConnector();
+ final SocketAddress address;
+ final String protocol = settings.getProtocol();
+ final int port = settings.getPort();
+
+ if (Transport.TCP.equalsIgnoreCase(protocol))
+ {
+ address = new InetSocketAddress(settings.getHost(), port);
+ }
+ else if(Transport.VM.equalsIgnoreCase(protocol))
+ {
+ synchronized (VMBrokerMap.class)
+ {
+ if(!VMBrokerMap.contains(port))
+ {
+ throw new TransportException("VM broker on port " + port + " does not exist.");
+ }
+ }
+
+ address = new VmPipeAddress(port);
+ }
+ else
+ {
+ throw new TransportException("Unknown transport: " + protocol);
+ }
+
+ LOGGER.info("Attempting connection to " + address);
+
+ if (ioConnector instanceof SocketConnector)
+ {
+ SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig();
+ cfg.setThreadModel(ExecutorThreadModel.getInstance("MinaNetworkTransport(Client)"));
+
+ SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();
+ scfg.setTcpNoDelay(true);
+ scfg.setSendBufferSize(CLIENT_DEFAULT_BUFFER_SIZE);
+ scfg.setReceiveBufferSize(CLIENT_DEFAULT_BUFFER_SIZE);
+
+ // Don't have the connector's worker thread wait around for other
+ // connections (we only use one SocketConnector per connection
+ // at the moment anyway). This allows short-running
+ // clients (like unit tests) to complete quickly.
+ ((SocketConnector) ioConnector).setWorkerTimeout(0);
+ }
+
+ ConnectFuture future = ioConnector.connect(address, new MinaNetworkHandler(sslFactory), ioConnector.getDefaultConfig());
+ future.join();
+ if (!future.isConnected())
+ {
+ throw new TransportException("Could not open connection");
+ }
+
+ IoSession session = future.getSession();
+ session.setAttachment(receiver);
+
+ return new MinaNetworkConnection(session);
+ }
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java
new file mode 100644
index 0000000000..be114e2fa1
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java
@@ -0,0 +1,79 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network.mina;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.CloseFuture;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.WriteFuture;
+import org.apache.qpid.transport.Sender;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * MinaSender
+ */
+public class MinaSender implements Sender<java.nio.ByteBuffer>
+{
+ private static final Logger _log = LoggerFactory.getLogger(MinaSender.class);
+
+ private final IoSession _session;
+ private WriteFuture _lastWrite;
+
+ public MinaSender(IoSession session)
+ {
+ _session = session;
+ }
+
+ public synchronized void send(java.nio.ByteBuffer msg)
+ {
+ _log.debug("sending data:");
+ ByteBuffer mina = ByteBuffer.allocate(msg.limit());
+ mina.put(msg);
+ mina.flip();
+ _lastWrite = _session.write(mina);
+ _log.debug("sent data:");
+ }
+
+ public synchronized void flush()
+ {
+ if (_lastWrite != null)
+ {
+ _lastWrite.join();
+ }
+ }
+
+ public void close()
+ {
+ // MINA will sometimes throw away in-progress writes when you ask it to close
+ flush();
+ CloseFuture closed = _session.close();
+ closed.join();
+ }
+
+ public void setIdleTimeout(int i)
+ {
+ //TODO:
+ //We are instead using the setMax[Read|Write]IdleTime methods in
+ //MinaNetworkConnection for this. Should remove this method from
+ //sender interface, but currently being used by IoSender for 0-10.
+ }
+}
diff --git a/java/common/src/test/java/org/apache/qpid/transport/MockSender.java b/java/common/src/test/java/org/apache/qpid/transport/MockSender.java
new file mode 100644
index 0000000000..4b38b7318a
--- /dev/null
+++ b/java/common/src/test/java/org/apache/qpid/transport/MockSender.java
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+import java.nio.ByteBuffer;
+
+public class MockSender implements Sender<ByteBuffer>
+{
+
+ public void setIdleTimeout(int i)
+ {
+
+ }
+
+ public void send(ByteBuffer msg)
+ {
+
+ }
+
+ public void flush()
+ {
+
+ }
+
+ public void close()
+ {
+
+ }
+}
diff --git a/java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java b/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java
index 957a7190ee..8686c17414 100644
--- a/java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java
+++ b/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java
@@ -25,32 +25,32 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.protocol.ProtocolEngineFactory;
import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.network.NetworkConnection;
/**
* Test implementation of IoSession, which is required for some tests. Methods not being used are not implemented,
* so if this class is being used and some methods are to be used, then please update those.
*/
-public class TestNetworkDriver implements NetworkDriver
+public class TestNetworkConnection implements NetworkConnection
{
- private final ConcurrentMap attributes = new ConcurrentHashMap();
private String _remoteHost = "127.0.0.1";
private String _localHost = "127.0.0.1";
private int _port = 1;
private SocketAddress _localAddress = null;
private SocketAddress _remoteAddress = null;
+ private final MockSender _sender;
- public TestNetworkDriver()
+ public TestNetworkConnection()
{
+ _sender = new MockSender();
}
public void bind(int port, InetAddress[] addresses, ProtocolEngineFactory protocolFactory,
- NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException
+ NetworkTransportConfiguration config, SSLContextFactory sslFactory) throws BindException
{
}
@@ -65,7 +65,7 @@ public class TestNetworkDriver implements NetworkDriver
return (_remoteAddress != null) ? _remoteAddress : new InetSocketAddress(_remoteHost, _port);
}
- public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config,
+ public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkTransportConfiguration config,
SSLContextFactory sslFactory) throws OpenException
{
@@ -130,4 +130,9 @@ public class TestNetworkDriver implements NetworkDriver
{
_remoteAddress = address;
}
+
+ public Sender<ByteBuffer> getSender()
+ {
+ return _sender;
+ }
}
diff --git a/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java b/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java
index fc8e689ca4..a4292d9009 100644
--- a/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java
+++ b/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java
@@ -21,44 +21,58 @@
package org.apache.qpid.transport.network.mina;
-import java.net.BindException;
+import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
+
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import junit.framework.TestCase;
-
+import org.apache.mina.util.AvailablePortFinder;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.transport.NetworkDriver;
-import org.apache.qpid.transport.OpenException;
-
-public class MINANetworkDriverTest extends TestCase
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.transport.ConnectionSettings;
+import org.apache.qpid.transport.NetworkTransportConfiguration;
+import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.IncomingNetworkTransport;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.OutgoingNetworkTransport;
+import org.apache.qpid.transport.network.Transport;
+
+public class MinaNetworkHandlerTest extends QpidTestCase
{
private static final String TEST_DATA = "YHALOTHAR";
- private static int TEST_PORT = 2323;
- private NetworkDriver _server;
- private NetworkDriver _client;
+ private int _testPort;
+ private IncomingNetworkTransport _server;
+ private OutgoingNetworkTransport _client;
private CountingProtocolEngine _countingEngine; // Keeps a count of how many bytes it's read
private Exception _thrownEx;
+ private ConnectionSettings _clientSettings;
+ private NetworkConnection _network;
+ private TestNetworkTransportConfiguration _brokerSettings;
@Override
- public void setUp()
+ public void setUp() throws Exception
{
- _server = new MINANetworkDriver();
- _client = new MINANetworkDriver();
+ String host = InetAddress.getLocalHost().getHostName();
+ _testPort = AvailablePortFinder.getNextAvailable(10000);
+
+ _clientSettings = new ConnectionSettings();
+ _clientSettings.setHost(host);
+ _clientSettings.setPort(_testPort);
+
+ _brokerSettings = new TestNetworkTransportConfiguration(_testPort, host);
+
+ _server = new MinaNetworkTransport();
+ _client = new MinaNetworkTransport();
_thrownEx = null;
_countingEngine = new CountingProtocolEngine();
- // increment the port to prevent tests clashing with each other when
- // the port is in TIMED_WAIT state.
- TEST_PORT++;
}
@Override
@@ -78,46 +92,40 @@ public class MINANetworkDriverTest extends TestCase
/**
* Tests that a socket can't be opened if a driver hasn't been bound
* to the port and can be opened if a driver has been bound.
- * @throws BindException
- * @throws UnknownHostException
- * @throws OpenException
*/
- public void testBindOpen() throws BindException, UnknownHostException, OpenException
+ public void testBindOpen() throws Exception
{
try
{
- _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+ _client.connect(_clientSettings, _countingEngine, null);
}
- catch (OpenException e)
+ catch (TransportException e)
{
_thrownEx = e;
}
assertNotNull("Open should have failed since no engine bound", _thrownEx);
- _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null);
+ _server.accept(_brokerSettings, null, null);
- _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+ _client.connect(_clientSettings, _countingEngine, null);
}
/**
* Tests that a socket can't be opened after a bound NetworkDriver has been closed
- * @throws BindException
- * @throws UnknownHostException
- * @throws OpenException
*/
- public void testBindOpenCloseOpen() throws BindException, UnknownHostException, OpenException
+ public void testBindOpenCloseOpen() throws Exception
{
- _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null);
- _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+ _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null);
+ _client.connect(_clientSettings, _countingEngine, null);
_client.close();
_server.close();
try
{
- _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+ _client.connect(_clientSettings, _countingEngine, null);
}
- catch (OpenException e)
+ catch (TransportException e)
{
_thrownEx = e;
}
@@ -132,43 +140,60 @@ public class MINANetworkDriverTest extends TestCase
{
try
{
- _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null);
+ _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null);
}
- catch (BindException e)
+ catch (TransportException e)
{
fail("First bind should not fail");
}
try
{
- _client.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null);
+ IncomingNetworkTransport second = new MinaNetworkTransport();
+ second.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null);
}
- catch (BindException e)
+ catch (TransportException e)
{
_thrownEx = e;
}
assertNotNull("Second bind should throw BindException", _thrownEx);
- }
-
+ }
+
+ /**
+ * Tests that binding to the wildcard address succeeds and a client can
+ * connect via localhost.
+ */
+ public void testWildcardBind() throws Exception
+ {
+ TestNetworkTransportConfiguration serverSettings =
+ new TestNetworkTransportConfiguration(_testPort, WILDCARD_ADDRESS);
+
+ _server.accept(serverSettings, null, null);
+
+ try
+ {
+ _client.connect(_clientSettings, _countingEngine, null);
+ }
+ catch (TransportException e)
+ {
+ fail("Open should have succeeded since we used a wildcard bind");
+ }
+ }
+
/**
* tests that bytes sent on a network driver are received at the other end
- *
- * @throws UnknownHostException
- * @throws OpenException
- * @throws InterruptedException
- * @throws BindException
*/
- public void testSend() throws UnknownHostException, OpenException, InterruptedException, BindException
+ public void testSend() throws Exception
{
// Open a connection from a counting engine to an echo engine
- _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null);
- _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+ _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null);
+ _network = _client.connect(_clientSettings, _countingEngine, null);
// Tell the counting engine how much data we're sending
_countingEngine.setNewLatch(TEST_DATA.getBytes().length);
// Send the data and wait for up to 2 seconds to get it back
- _client.send(ByteBuffer.wrap(TEST_DATA.getBytes()));
+ _network.getSender().send(ByteBuffer.wrap(TEST_DATA.getBytes()));
_countingEngine.getLatch().await(2, TimeUnit.SECONDS);
// Check what we got
@@ -177,36 +202,30 @@ public class MINANetworkDriverTest extends TestCase
/**
* Opens a connection with a low read idle and check that it gets triggered
- * @throws BindException
- * @throws OpenException
- * @throws UnknownHostException
*
*/
- public void testSetReadIdle() throws BindException, UnknownHostException, OpenException
+ public void testSetReadIdle() throws Exception
{
// Open a connection from a counting engine to an echo engine
- _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null);
- _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+ _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null);
+ _network = _client.connect(_clientSettings, _countingEngine, null);
assertFalse("Reader should not have been idle", _countingEngine.getReaderHasBeenIdle());
- _client.setMaxReadIdle(1);
+ _network.setMaxReadIdle(1);
sleepForAtLeast(1500);
assertTrue("Reader should have been idle", _countingEngine.getReaderHasBeenIdle());
}
/**
* Opens a connection with a low write idle and check that it gets triggered
- * @throws BindException
- * @throws OpenException
- * @throws UnknownHostException
*
*/
- public void testSetWriteIdle() throws BindException, UnknownHostException, OpenException
+ public void testSetWriteIdle() throws Exception
{
// Open a connection from a counting engine to an echo engine
- _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null);
- _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+ _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null);
+ _network = _client.connect(_clientSettings, _countingEngine, null);
assertFalse("Reader should not have been idle", _countingEngine.getWriterHasBeenIdle());
- _client.setMaxWriteIdle(1);
+ _network.setMaxWriteIdle(1);
sleepForAtLeast(1500);
assertTrue("Reader should have been idle", _countingEngine.getWriterHasBeenIdle());
}
@@ -216,16 +235,13 @@ public class MINANetworkDriverTest extends TestCase
* Creates and then closes a connection from client to server and checks that the server
* has its closed() method called. Then creates a new client and closes the server to check
* that the client has its closed() method called.
- * @throws BindException
- * @throws UnknownHostException
- * @throws OpenException
*/
- public void testClosed() throws BindException, UnknownHostException, OpenException
+ public void testClosed() throws Exception
{
// Open a connection from a counting engine to an echo engine
EchoProtocolEngineSingletonFactory factory = new EchoProtocolEngineSingletonFactory();
- _server.bind(TEST_PORT, null, factory, null, null);
- _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+ _server.accept(_brokerSettings, factory, null);
+ _network = _client.connect(_clientSettings, _countingEngine, null);
EchoProtocolEngine serverEngine = null;
while (serverEngine == null)
{
@@ -253,7 +269,7 @@ public class MINANetworkDriverTest extends TestCase
}
assertTrue("Server should have been closed", serverEngine.getClosed());
- _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+ _client.connect(_clientSettings, _countingEngine, null);
_countingEngine.setClosed(false);
assertFalse("Client should not have been closed", _countingEngine.getClosed());
_countingEngine.setNewLatch(1);
@@ -271,22 +287,18 @@ public class MINANetworkDriverTest extends TestCase
/**
* Create a connection and instruct the client to throw an exception when it gets some data
* and that the latch gets counted down.
- * @throws BindException
- * @throws UnknownHostException
- * @throws OpenException
- * @throws InterruptedException
*/
- public void testExceptionCaught() throws BindException, UnknownHostException, OpenException, InterruptedException
+ public void testExceptionCaught() throws Exception
{
- _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null);
- _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+ _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null);
+ _network = _client.connect(_clientSettings, _countingEngine, null);
assertEquals("Exception should not have been thrown", 1,
_countingEngine.getExceptionLatch().getCount());
_countingEngine.setErrorOnNextRead(true);
_countingEngine.setNewLatch(TEST_DATA.getBytes().length);
- _client.send(ByteBuffer.wrap(TEST_DATA.getBytes()));
+ _network.getSender().send(ByteBuffer.wrap(TEST_DATA.getBytes()));
_countingEngine.getExceptionLatch().await(2, TimeUnit.SECONDS);
assertEquals("Exception should have been thrown", 0,
_countingEngine.getExceptionLatch().getCount());
@@ -294,28 +306,24 @@ public class MINANetworkDriverTest extends TestCase
/**
* Opens a connection and checks that the remote address is the one that was asked for
- * @throws BindException
- * @throws UnknownHostException
- * @throws OpenException
*/
- public void testGetRemoteAddress() throws BindException, UnknownHostException, OpenException
+ public void testGetRemoteAddress() throws Exception
{
- _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null);
- _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
- assertEquals(new InetSocketAddress(InetAddress.getLocalHost(), TEST_PORT),
- _client.getRemoteAddress());
+ _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null);
+ _network = _client.connect(_clientSettings, _countingEngine, null);
+ assertEquals(new InetSocketAddress(InetAddress.getLocalHost(), _testPort),
+ _network.getRemoteAddress());
}
private class EchoProtocolEngineSingletonFactory implements ProtocolEngineFactory
{
- EchoProtocolEngine _engine = null;
+ private EchoProtocolEngine _engine = null;
- public ProtocolEngine newProtocolEngine(NetworkDriver driver)
+ public ProtocolEngine newProtocolEngine(NetworkConnection network)
{
if (_engine == null)
{
- _engine = new EchoProtocolEngine();
- _engine.setNetworkDriver(driver);
+ _engine = new EchoProtocolEngine(network);
}
return getEngine();
}
@@ -328,8 +336,6 @@ public class MINANetworkDriverTest extends TestCase
public class CountingProtocolEngine implements ProtocolEngine
{
-
- protected NetworkDriver _driver;
public ArrayList<ByteBuffer> _receivedBytes = new ArrayList<ByteBuffer>();
private int _readBytes;
private CountDownLatch _latch = new CountDownLatch(0);
@@ -362,26 +368,12 @@ public class MINANetworkDriverTest extends TestCase
public SocketAddress getRemoteAddress()
{
- if (_driver != null)
- {
- return _driver.getRemoteAddress();
- }
- else
- {
- return null;
- }
+ return _network.getRemoteAddress();
}
public SocketAddress getLocalAddress()
{
- if (_driver != null)
- {
- return _driver.getLocalAddress();
- }
- else
- {
- return null;
- }
+ return _network.getLocalAddress();
}
public long getWrittenBytes()
@@ -394,11 +386,6 @@ public class MINANetworkDriverTest extends TestCase
_readerHasBeenIdle = true;
}
- public void setNetworkDriver(NetworkDriver driver)
- {
- _driver = driver;
- }
-
public void writeFrame(AMQDataBlock frame)
{
@@ -465,12 +452,18 @@ public class MINANetworkDriverTest extends TestCase
private class EchoProtocolEngine extends CountingProtocolEngine
{
+ private NetworkConnection _echoNetwork;
+
+ public EchoProtocolEngine(NetworkConnection network)
+ {
+ _echoNetwork = network;
+ }
public void received(ByteBuffer msg)
{
super.received(msg);
msg.rewind();
- _driver.send(msg);
+ _echoNetwork.getSender().send(msg);
}
}
@@ -491,4 +484,52 @@ public class MINANetworkDriverTest extends TestCase
timeLeft = period - (System.currentTimeMillis() - start);
}
}
+
+ private static class TestNetworkTransportConfiguration implements NetworkTransportConfiguration
+ {
+ private int _port;
+ private String _host;
+
+ public TestNetworkTransportConfiguration(final int port, final String host)
+ {
+ _port = port;
+ _host = host;
+ }
+
+ public Boolean getTcpNoDelay()
+ {
+ return true;
+ }
+
+ public Integer getReceiveBufferSize()
+ {
+ return 32768;
+ }
+
+ public Integer getSendBufferSize()
+ {
+ return 32768;
+ }
+
+ public Integer getPort()
+ {
+ return _port;
+ }
+
+ public String getHost()
+ {
+ return _host;
+ }
+
+ public String getTransport()
+ {
+ return Transport.TCP;
+ }
+
+ public Integer getConnectorProcessors()
+ {
+ return 4;
+ }
+
+ }
} \ No newline at end of file