summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2009-08-18 16:16:10 +0000
committerAidan Skinner <aidan@apache.org>2009-08-18 16:16:10 +0000
commit0314cbe225dce796e09ae9abbd450323808fe493 (patch)
tree2108ca3a099e8272dfa4ad2aab39b8e3f078e88c
parent31567e20755c42126c1479352139ca434079dc19 (diff)
downloadqpid-python-0314cbe225dce796e09ae9abbd450323808fe493.tar.gz
QPID-2024: Add ProtocolEngine and NetworkDriver interfaces and a NetworkDriver implementation that uses MINA.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-network-refactor@805477 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/transport/QpidThreadExecutor.java22
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java1
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java62
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java29
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java43
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java61
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java44
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java32
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java379
-rw-r--r--qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java473
10 files changed, 1124 insertions, 22 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/QpidThreadExecutor.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/QpidThreadExecutor.java
deleted file mode 100644
index 3de410fb69..0000000000
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/QpidThreadExecutor.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package org.apache.qpid.client.transport;
-
-import org.apache.qpid.thread.Threading;
-
-import edu.emory.mathcs.backport.java.util.concurrent.Executor;
-
-public class QpidThreadExecutor implements Executor
-{
- @Override
- public void execute(Runnable command)
- {
- try
- {
- Threading.getThreadFactory().createThread(command).start();
- }
- catch(Exception e)
- {
- throw new RuntimeException("Error creating a thread using Qpid thread factory",e);
- }
- }
-
-}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
index 0bacda04ff..32cc8c4cb5 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
@@ -31,6 +31,7 @@ import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.thread.QpidThreadExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
new file mode 100644
index 0000000000..8ab845454a
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.protocol;
+
+import java.net.SocketAddress;
+
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.transport.Receiver;
+
+/**
+ * A ProtocolEngine is a Receiver for java.nio.ByteBuffers. It takes the data passed to it in the received
+ * decodes it and then process the result.
+ */
+public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>
+{
+ // Sets the network driver providing data for this ProtocolEngine
+ void setNetworkDriver (NetworkDriver driver);
+
+ // Returns the remote address of the NetworkDriver
+ SocketAddress getRemoteAddress();
+
+ // Returns number of bytes written
+ long getWrittenBytes();
+
+ // Returns number of bytes read
+ long getReadBytes();
+
+ // Called by the NetworkDriver when the socket has been closed for reading
+ void closed();
+
+ // Called when the NetworkEngine has not written data for the specified period of time (will trigger a
+ // heartbeat)
+ void writerIdle();
+
+ // Called when the NetworkEngine has not read data for the specified period of time (will close the connection)
+ void readerIdle();
+
+ /**
+ * Accepts an AMQFrame for writing to the network. The ProtocolEngine encodes the frame into bytes and
+ * passes the data onto the NetworkDriver for sending
+ */
+ void writeFrame(AMQDataBlock frame);
+} \ No newline at end of file
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java
new file mode 100644
index 0000000000..d8c0f2c916
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.protocol;
+
+public interface ProtocolEngineFactory
+{
+
+ // Returns a new instance of a ProtocolEngine
+ ProtocolEngine newProtocolEngine();
+
+} \ No newline at end of file
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java b/qpid/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java
new file mode 100644
index 0000000000..376658bb99
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.thread;
+
+import org.apache.qpid.thread.Threading;
+
+import edu.emory.mathcs.backport.java.util.concurrent.Executor;
+
+public class QpidThreadExecutor implements Executor
+{
+ @Override
+ public void execute(Runnable command)
+ {
+ try
+ {
+ Threading.getThreadFactory().createThread(command).start();
+ }
+ catch(Exception e)
+ {
+ throw new RuntimeException("Error creating a thread using Qpid thread factory",e);
+ }
+ }
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java
new file mode 100644
index 0000000000..d45cee8004
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+import java.net.BindException;
+import java.net.InetAddress;
+import java.net.SocketAddress;
+
+import javax.net.ssl.SSLEngine;
+
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.ssl.SSLContextFactory;
+
+public interface NetworkDriver extends Sender<java.nio.ByteBuffer>
+{
+ // Creates a NetworkDriver which attempts to connect to destination on port and attaches the ProtocolEngine to
+ // it using the SSLEngine if provided
+ void open(int port, InetAddress destination, ProtocolEngine engine,
+ NetworkDriverConfiguration config, SSLEngine sslEngine)
+ throws OpenException;
+
+ // listens for incoming connections on the specified ports and address and creates a new NetworkDriver which
+ // processes incoming connections with ProtocolEngines created from factory using the SSLEngine if provided
+ void bind (int port, InetAddress[] addresses, ProtocolEngineFactory protocolFactory,
+ NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException;
+
+ // Returns the remote address of underlying socket
+ SocketAddress getRemoteAddress();
+
+ /**
+ * The length of time after which the ProtocolEngines readIdle() method should be called if no data has been
+ * read in seconds
+ */
+ void setMaxReadIdle(int idleTime);
+
+ /**
+ * The length of time after which the ProtocolEngines writeIdle() method should be called if no data has been
+ * written in seconds
+ */
+ void setMaxWriteIdle(int idleTime);
+
+} \ No newline at end of file
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java
new file mode 100644
index 0000000000..18cae6bf85
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+/**
+ * This interface provides a means for NetworkDrivers to configure TCP options such as incoming and outgoing
+ * buffer sizes and set particular options on the socket. NetworkDrivers should honour the values returned
+ * from here if the underlying implementation supports them.
+ */
+public interface NetworkDriverConfiguration
+{
+ // Taken from Socket
+ boolean getKeepAlive();
+ boolean getOOBInline();
+ boolean getReuseAddress();
+ Integer getSoLinger(); // null means off
+ int getSoTimeout();
+ boolean getTcpNoDelay();
+ int getTrafficClass();
+
+ // The amount of memory in bytes to allocate to the incoming buffer
+ int getReceiveBufferSize();
+
+ // The amount of memory in bytes to allocate to the outgoing buffer
+ int getSendBufferSize();
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java
new file mode 100644
index 0000000000..8628b8c7aa
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.transport;
+
+public class OpenException extends Exception
+{
+
+ public OpenException(String string, Throwable lastException)
+ {
+ super(string, lastException);
+ }
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
new file mode 100644
index 0000000000..96fb7b1ef8
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
@@ -0,0 +1,379 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.transport.network.mina;
+
+import java.io.IOException;
+import java.net.BindException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+
+import javax.net.ssl.SSLEngine;
+
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoSessionConfig;
+import org.apache.mina.common.SimpleByteBufferAllocator;
+import org.apache.mina.common.WriteFuture;
+import org.apache.mina.filter.ReadThrottleFilterBuilder;
+import org.apache.mina.filter.SSLFilter;
+import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
+import org.apache.mina.filter.executor.ExecutorFilter;
+import org.apache.mina.transport.socket.nio.MultiThreadSocketConnector;
+import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
+import org.apache.mina.transport.socket.nio.SocketConnector;
+import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
+import org.apache.mina.transport.socket.nio.SocketSessionConfig;
+import org.apache.mina.util.NewThreadExecutor;
+import org.apache.mina.util.SessionUtil;
+import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.thread.QpidThreadExecutor;
+import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.transport.NetworkDriverConfiguration;
+import org.apache.qpid.transport.OpenException;
+
+public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
+{
+
+ private static final int DEFAULT_BUFFER_SIZE = 32 * 1024;
+
+ ProtocolEngine _protocolEngine;
+ private boolean _useNIO = false;
+ private int _processors = 4;
+ private boolean _executorPool = false;
+ private SSLContextFactory _sslFactory = null;
+ private SocketConnector _socketConnector;
+ private IoAcceptor _acceptor;
+ private IoSession _ioSession;
+ private ProtocolEngineFactory _factory;
+ private boolean _protectIO;
+ private NetworkDriverConfiguration _config;
+ private Throwable _lastException;
+ private boolean _acceptingConnections = false;
+
+ public MINANetworkDriver(boolean useNIO, int processors, boolean executorPool, boolean protectIO)
+ {
+ _useNIO = useNIO;
+ _processors = processors;
+ _executorPool = executorPool;
+ _protectIO = protectIO;
+ }
+
+ public MINANetworkDriver(boolean useNIO, int processors, boolean executorPool, boolean protectIO,
+ ProtocolEngine protocolEngine, IoSession session)
+ {
+ _useNIO = useNIO;
+ _processors = processors;
+ _executorPool = executorPool;
+ _protectIO = protectIO;
+ _protocolEngine = protocolEngine;
+ _ioSession = session;
+ }
+
+ public MINANetworkDriver()
+ {
+
+ }
+
+ public void bind(int port, InetAddress[] addresses, ProtocolEngineFactory factory,
+ NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException
+ {
+
+ _factory = factory;
+ _config = config;
+
+ if (_useNIO)
+ {
+ _acceptor = new org.apache.mina.transport.socket.nio.MultiThreadSocketAcceptor(_processors,
+ new NewThreadExecutor());
+ }
+ else
+ {
+ _acceptor = new org.apache.mina.transport.socket.nio.SocketAcceptor(_processors, new NewThreadExecutor());
+ }
+
+ SocketAcceptorConfig sconfig = (SocketAcceptorConfig) _acceptor.getDefaultConfig();
+ SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig();
+
+ if (config != null)
+ {
+ sc.setReceiveBufferSize(config.getReceiveBufferSize());
+ sc.setSendBufferSize(config.getSendBufferSize());
+ sc.setTcpNoDelay(config.getTcpNoDelay());
+ }
+
+ // if we do not use the executor pool threading model we get the default
+ // leader follower
+ // implementation provided by MINA
+ if (_executorPool)
+ {
+ sconfig.setThreadModel(ReadWriteThreadModel.getInstance());
+ }
+
+ if (sslFactory != null)
+ {
+ _sslFactory = sslFactory;
+ }
+
+ if (addresses != null && addresses.length > 0)
+ {
+ for (InetAddress addr : addresses)
+ {
+ try
+ {
+ _acceptor.bind(new InetSocketAddress(addr, port), this, sconfig);
+ }
+ catch (IOException e)
+ {
+ throw new BindException(String.format("Could not bind to {0}:{2}", addr, port));
+ }
+ }
+ }
+ else
+ {
+ try
+ {
+ _acceptor.bind(new InetSocketAddress(port), this, sconfig);
+ }
+ catch (IOException e)
+ {
+ throw new BindException(String.format("Could not bind to *:{1}", port));
+ }
+ }
+ _acceptingConnections = true;
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ return _ioSession.getRemoteAddress();
+ }
+
+ public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config,
+ SSLEngine sslEngine) throws OpenException
+ {
+ if (_useNIO)
+ {
+ _socketConnector = new MultiThreadSocketConnector(1, new QpidThreadExecutor());
+ }
+ else
+ {
+ _socketConnector = new SocketConnector(1, new QpidThreadExecutor()); // non-blocking
+ // connector
+ }
+
+ org.apache.mina.common.ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers"));
+ // the MINA default is currently to use the pooled allocator although this may change in future
+ // once more testing of the performance of the simple allocator has been done
+ if (!Boolean.getBoolean("amqj.enablePooledAllocator"))
+ {
+ org.apache.mina.common.ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
+ }
+
+
+ SocketConnectorConfig cfg = (SocketConnectorConfig) _socketConnector.getDefaultConfig();
+
+ // if we do not use our own thread model we get the MINA default which is to use
+ // its own leader-follower model
+ boolean readWriteThreading = Boolean.getBoolean("amqj.shared_read_write_pool");
+ if (readWriteThreading)
+ {
+ cfg.setThreadModel(ReadWriteThreadModel.getInstance());
+ }
+
+ SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();
+ scfg.setTcpNoDelay((config != null) ? config.getTcpNoDelay() : true);
+ scfg.setSendBufferSize((config != null) ? config.getSendBufferSize() : DEFAULT_BUFFER_SIZE);
+ scfg.setReceiveBufferSize((config != null) ? config.getReceiveBufferSize() : DEFAULT_BUFFER_SIZE);
+
+ // Don't have the connector's worker thread wait around for other
+ // connections (we only use
+ // one SocketConnector per connection at the moment anyway). This allows
+ // short-running
+ // clients (like unit tests) to complete quickly.
+ _socketConnector.setWorkerTimeout(0);
+ ConnectFuture future = _socketConnector.connect(new InetSocketAddress(destination, port), this, cfg);
+ future.join();
+ if (!future.isConnected())
+ {
+ throw new OpenException("Could not open connection", _lastException);
+ }
+ _ioSession = future.getSession();
+ ReadWriteThreadModel.getInstance().getAsynchronousReadFilter().createNewJobForSession(_ioSession);
+ ReadWriteThreadModel.getInstance().getAsynchronousWriteFilter().createNewJobForSession(_ioSession);
+ _ioSession.setAttachment(engine);
+ engine.setNetworkDriver(this);
+ _protocolEngine = engine;
+ }
+
+ public void setMaxReadIdle(int idleTime)
+ {
+ _ioSession.setIdleTime(IdleStatus.READER_IDLE, idleTime);
+ }
+
+ public void setMaxWriteIdle(int idleTime)
+ {
+ _ioSession.setIdleTime(IdleStatus.WRITER_IDLE, idleTime);
+ }
+
+ public void close()
+ {
+ if (_acceptor != null)
+ {
+ _acceptor.unbindAll();
+ }
+ if (_ioSession != null)
+ {
+ _ioSession.close();
+ }
+ }
+
+ public void flush()
+ {
+ // MINA doesn't support flush
+ }
+
+ public void send(ByteBuffer msg)
+ {
+ WriteFuture future = _ioSession.write(org.apache.mina.common.ByteBuffer.wrap(msg));
+ future.join();
+ }
+
+ public void setIdleTimeout(long l)
+ {
+ // MINA doesn't support setting SO_TIMEOUT
+ }
+
+ public void exceptionCaught(IoSession protocolSession, Throwable throwable) throws Exception
+ {
+ if (_protocolEngine != null)
+ {
+ _protocolEngine.exception(throwable);
+ }
+ _lastException = throwable;
+ }
+
+ /**
+ * Invoked when a message is received on a particular protocol session. Note
+ * that a protocol session is directly tied to a particular physical
+ * connection.
+ *
+ * @param protocolSession
+ * the protocol session that received the message
+ * @param message
+ * the message itself (i.e. a decoded frame)
+ *
+ * @throws Exception
+ * if the message cannot be processed
+ */
+ public void messageReceived(IoSession protocolSession, Object message) throws Exception
+ {
+ if (message instanceof org.apache.mina.common.ByteBuffer)
+ {
+ ((ProtocolEngine) protocolSession.getAttachment()).received(((org.apache.mina.common.ByteBuffer) message).buf());
+ }
+ else
+ {
+ throw new IllegalStateException("Handed unhandled message. message.class = " + message.getClass() + " message = " + message);
+ }
+ }
+
+ public void sessionClosed(IoSession protocolSession) throws Exception
+ {
+ ((ProtocolEngine) protocolSession.getAttachment()).closed();
+ }
+
+ public void sessionCreated(IoSession protocolSession) throws Exception
+ {
+ if (_acceptingConnections)
+ {
+ // Configure the session with SSL if necessary
+ SessionUtil.initialize(protocolSession);
+ if (_executorPool)
+ {
+ if (_sslFactory != null)
+ {
+ protocolSession.getFilterChain().addAfter("AsynchronousReadFilter", "sslFilter",
+ new SSLFilter(_sslFactory.buildServerContext()));
+ }
+ }
+ else
+ {
+ if (_sslFactory != null)
+ {
+ protocolSession.getFilterChain().addBefore("protocolFilter", "sslFilter",
+ new SSLFilter(_sslFactory.buildServerContext()));
+ }
+ }
+
+ // Do we want to have read/write buffer limits?
+ if (_protectIO)
+ {
+ //Add IO Protection Filters
+ IoFilterChain chain = protocolSession.getFilterChain();
+
+ protocolSession.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
+
+ ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
+ readfilter.setMaximumConnectionBufferSize(_config.getReceiveBufferSize());
+ readfilter.attach(chain);
+
+ WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
+ writefilter.setMaximumConnectionBufferSize(_config.getSendBufferSize());
+ writefilter.attach(chain);
+
+ protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
+ }
+
+ // Set up the protocol engine
+ ProtocolEngine protocolEngine = _factory.newProtocolEngine();
+ MINANetworkDriver newDriver = new MINANetworkDriver(_useNIO, _processors, _executorPool, _protectIO, protocolEngine, protocolSession);
+ protocolEngine.setNetworkDriver(newDriver);
+ protocolSession.setAttachment(protocolEngine);
+ }
+ }
+
+ public void sessionIdle(IoSession session, IdleStatus status) throws Exception
+ {
+ if (IdleStatus.WRITER_IDLE.equals(status))
+ {
+ ((ProtocolEngine) session.getAttachment()).writerIdle();
+ }
+ else if (IdleStatus.READER_IDLE.equals(status))
+ {
+ ((ProtocolEngine) session.getAttachment()).readerIdle();
+ }
+ }
+
+ private ProtocolEngine getProtocolEngine()
+ {
+ return _protocolEngine;
+ }
+
+}
diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java
new file mode 100644
index 0000000000..7901f6a99d
--- /dev/null
+++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java
@@ -0,0 +1,473 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.transport.network.mina;
+
+import java.net.BindException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.transport.OpenException;
+
+public class MINANetworkDriverTest extends TestCase
+{
+
+ private static final String TEST_DATA = "YHALOTHAR";
+ private static final int TEST_PORT = 2323;
+ private NetworkDriver _server;
+ private NetworkDriver _client;
+ private CountingProtocolEngine _countingEngine; // Keeps a count of how many bytes it's read
+ private Exception _thrownEx;
+
+ @Override
+ public void setUp()
+ {
+ _server = new MINANetworkDriver();
+ _client = new MINANetworkDriver();
+ _thrownEx = null;
+ _countingEngine = new CountingProtocolEngine();
+ }
+
+ @Override
+ public void tearDown()
+ {
+ if (_server != null)
+ {
+ _server.close();
+ }
+
+ if (_client != null)
+ {
+ _client.close();
+ }
+ }
+
+ /**
+ * Tests that a socket can't be opened if a driver hasn't been bound
+ * to the port and can be opened if a driver has been bound.
+ * @throws BindException
+ * @throws UnknownHostException
+ * @throws OpenException
+ */
+ public void testBindOpen() throws BindException, UnknownHostException, OpenException
+ {
+ try
+ {
+ _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+ }
+ catch (OpenException e)
+ {
+ _thrownEx = e;
+ }
+
+ assertNotNull("Open should have failed since no engine bound", _thrownEx);
+
+ _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null);
+
+ _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+ }
+
+ /**
+ * Tests that a socket can't be opened after a bound NetworkDriver has been closed
+ * @throws BindException
+ * @throws UnknownHostException
+ * @throws OpenException
+ */
+ public void testBindOpenCloseOpen() throws BindException, UnknownHostException, OpenException
+ {
+ _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null);
+ _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+ _client.close();
+ _server.close();
+
+ try
+ {
+ _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+ }
+ catch (OpenException e)
+ {
+ _thrownEx = e;
+ }
+ assertNotNull("Open should have failed", _thrownEx);
+ }
+
+ /**
+ * Checks that the right exception is thrown when binding a NetworkDriver to an already
+ * existing socket.
+ */
+ public void testBindPortInUse()
+ {
+ try
+ {
+ _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null);
+ }
+ catch (BindException e)
+ {
+ fail("First bind should not fail");
+ }
+
+ try
+ {
+ _client.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null);
+ }
+ catch (BindException e)
+ {
+ _thrownEx = e;
+ }
+ assertNotNull("Second bind should throw BindException", _thrownEx);
+ }
+
+ /**
+ * tests that bytes sent on a network driver are received at the other end
+ *
+ * @throws UnknownHostException
+ * @throws OpenException
+ * @throws InterruptedException
+ * @throws BindException
+ */
+ public void testSend() throws UnknownHostException, OpenException, InterruptedException, BindException
+ {
+ // Open a connection from a counting engine to an echo engine
+ _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null);
+ _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+
+ // Tell the counting engine how much data we're sending
+ _countingEngine.setNewLatch(TEST_DATA.getBytes().length);
+
+ // Send the data and wait for up to 2 seconds to get it back
+ _client.send(ByteBuffer.wrap(TEST_DATA.getBytes()));
+ _countingEngine.getLatch().await(2, TimeUnit.SECONDS);
+
+ // Check what we got
+ assertEquals("Wrong amount of data recieved", TEST_DATA.getBytes().length, _countingEngine.getReadBytes());
+ }
+
+ /**
+ * Opens a connection with a low read idle and check that it gets triggered
+ * @throws BindException
+ * @throws OpenException
+ * @throws UnknownHostException
+ *
+ */
+ public void testSetReadIdle() throws BindException, UnknownHostException, OpenException
+ {
+ // Open a connection from a counting engine to an echo engine
+ _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null);
+ _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+ assertFalse("Reader should not have been idle", _countingEngine.getReaderHasBeenIdle());
+ _client.setMaxReadIdle(1);
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e)
+ {
+ // Eat it
+ }
+ assertTrue("Reader should have been idle", _countingEngine.getReaderHasBeenIdle());
+ }
+
+ /**
+ * Opens a connection with a low write idle and check that it gets triggered
+ * @throws BindException
+ * @throws OpenException
+ * @throws UnknownHostException
+ *
+ */
+ public void testSetWriteIdle() throws BindException, UnknownHostException, OpenException
+ {
+ // Open a connection from a counting engine to an echo engine
+ _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null);
+ _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+ assertFalse("Reader should not have been idle", _countingEngine.getWriterHasBeenIdle());
+ _client.setMaxWriteIdle(1);
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e)
+ {
+ // Eat it
+ }
+ assertTrue("Reader should have been idle", _countingEngine.getWriterHasBeenIdle());
+ }
+
+
+ /**
+ * Creates and then closes a connection from client to server and checks that the server
+ * has its closed() method called. Then creates a new client and closes the server to check
+ * that the client has its closed() method called.
+ * @throws BindException
+ * @throws UnknownHostException
+ * @throws OpenException
+ */
+ public void testClosed() throws BindException, UnknownHostException, OpenException
+ {
+ // Open a connection from a counting engine to an echo engine
+ EchoProtocolEngineSingletonFactory factory = new EchoProtocolEngineSingletonFactory();
+ _server.bind(TEST_PORT, null, factory, null, null);
+ _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+ EchoProtocolEngine serverEngine = null;
+ while (serverEngine == null)
+ {
+ serverEngine = factory.getEngine();
+ if (serverEngine == null)
+ {
+ try
+ {
+ Thread.sleep(10);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+ }
+ assertFalse("Server should not have been closed", serverEngine.getClosed());
+ serverEngine.setNewLatch(1);
+ _client.close();
+ try
+ {
+ serverEngine.getLatch().await(2, TimeUnit.SECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ assertTrue("Server should have been closed", serverEngine.getClosed());
+
+ _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+ _countingEngine.setClosed(false);
+ assertFalse("Client should not have been closed", _countingEngine.getClosed());
+ _countingEngine.setNewLatch(1);
+ _server.close();
+ try
+ {
+ _countingEngine.getLatch().await(2, TimeUnit.SECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ assertTrue("Client should have been closed", _countingEngine.getClosed());
+ }
+
+ /**
+ * Create a connection and instruct the client to throw an exception when it gets some data
+ * and that the latch gets counted down.
+ * @throws BindException
+ * @throws UnknownHostException
+ * @throws OpenException
+ * @throws InterruptedException
+ */
+ public void testExceptionCaught() throws BindException, UnknownHostException, OpenException, InterruptedException
+ {
+ _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null);
+ _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+
+
+ assertEquals("Exception should not have been thrown", 1,
+ _countingEngine.getExceptionLatch().getCount());
+ _countingEngine.setErrorOnNextRead(true);
+ _countingEngine.setNewLatch(TEST_DATA.getBytes().length);
+ _client.send(ByteBuffer.wrap(TEST_DATA.getBytes()));
+ _countingEngine.getExceptionLatch().await(2, TimeUnit.SECONDS);
+ assertEquals("Exception should not been thrown", 0,
+ _countingEngine.getExceptionLatch().getCount());
+ }
+
+ /**
+ * Opens a connection and checks that the remote address is the one that was asked for
+ * @throws BindException
+ * @throws UnknownHostException
+ * @throws OpenException
+ */
+ public void testGetRemoteAddress() throws BindException, UnknownHostException, OpenException
+ {
+ _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null);
+ _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+ assertEquals(new InetSocketAddress(InetAddress.getLocalHost(), TEST_PORT),
+ _client.getRemoteAddress());
+ }
+
+ private class EchoProtocolEngineSingletonFactory implements ProtocolEngineFactory
+ {
+ EchoProtocolEngine _engine = null;
+
+ public ProtocolEngine newProtocolEngine()
+ {
+ if (_engine == null)
+ {
+ _engine = new EchoProtocolEngine();
+ }
+ return getEngine();
+ }
+
+ public EchoProtocolEngine getEngine()
+ {
+ return _engine;
+ }
+ }
+
+ public class CountingProtocolEngine implements ProtocolEngine
+ {
+
+ protected NetworkDriver _driver;
+ public ArrayList<ByteBuffer> _receivedBytes = new ArrayList<ByteBuffer>();
+ private int _readBytes;
+ private CountDownLatch _latch = new CountDownLatch(0);
+ private boolean _readerHasBeenIdle;
+ private boolean _writerHasBeenIdle;
+ private boolean _closed = false;
+ private boolean _nextReadErrors = false;
+ private CountDownLatch _exceptionLatch = new CountDownLatch(1);
+
+ public void closed()
+ {
+ setClosed(true);
+ _latch.countDown();
+ }
+
+ public void setErrorOnNextRead(boolean b)
+ {
+ _nextReadErrors = b;
+ }
+
+ public void setNewLatch(int length)
+ {
+ _latch = new CountDownLatch(length);
+ }
+
+ public long getReadBytes()
+ {
+ return _readBytes;
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ if (_driver != null)
+ {
+ return _driver.getRemoteAddress();
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public long getWrittenBytes()
+ {
+ return 0;
+ }
+
+ public void readerIdle()
+ {
+ _readerHasBeenIdle = true;
+ }
+
+ public void setNetworkDriver(NetworkDriver driver)
+ {
+ _driver = driver;
+ }
+
+ public void writeFrame(AMQDataBlock frame)
+ {
+
+ }
+
+ public void writerIdle()
+ {
+ _writerHasBeenIdle = true;
+ }
+
+ public void exception(Throwable t)
+ {
+ _exceptionLatch.countDown();
+ }
+
+ public CountDownLatch getExceptionLatch()
+ {
+ return _exceptionLatch;
+ }
+
+ public void received(ByteBuffer msg)
+ {
+ // increment read bytes and count down the latch for that many
+ int bytes = msg.remaining();
+ _readBytes += bytes;
+ for (int i = 0; i < bytes; i++)
+ {
+ _latch.countDown();
+ }
+
+ // Throw an error if we've been asked too, but we can still count
+ if (_nextReadErrors)
+ {
+ throw new RuntimeException("Was asked to error");
+ }
+ }
+
+ public CountDownLatch getLatch()
+ {
+ return _latch;
+ }
+
+ public boolean getWriterHasBeenIdle()
+ {
+ return _writerHasBeenIdle;
+ }
+
+ public boolean getReaderHasBeenIdle()
+ {
+ return _readerHasBeenIdle;
+ }
+
+ public void setClosed(boolean _closed)
+ {
+ this._closed = _closed;
+ }
+
+ public boolean getClosed()
+ {
+ return _closed;
+ }
+ }
+
+ private class EchoProtocolEngine extends CountingProtocolEngine
+ {
+
+ public void received(ByteBuffer msg)
+ {
+ super.received(msg);
+ msg.rewind();
+ _driver.send(msg);
+ }
+ }
+} \ No newline at end of file