summaryrefslogtreecommitdiff
path: root/qpid/java/common/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/common/src')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java40
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java3
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java9
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java4
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java108
-rw-r--r--qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java122
-rw-r--r--qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java13
7 files changed, 253 insertions, 46 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
index 4e4192dbe3..15d1c20ff1 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
@@ -21,9 +21,13 @@
package org.apache.qpid.pool;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.mina.common.IoSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A Job is a continuation that batches together other continuations, specifically {@link Event}s, into one continuation.
@@ -66,6 +70,8 @@ public class Job implements ReadWriteRunnable
private final boolean _readJob;
+ private final static Logger _logger = LoggerFactory.getLogger(Job.class);
+
/**
* Creates a new job that aggregates many continuations together.
*
@@ -181,4 +187,38 @@ public class Job implements ReadWriteRunnable
public void notCompleted(final Job job);
}
+
+ /**
+ * Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running.
+ *
+ * @param job The job.
+ * @param event The event to hand off asynchronously.
+ */
+ public static void fireAsynchEvent(ExecutorService pool, Job job, Event event)
+ {
+
+ job.add(event);
+
+
+ if(pool == null)
+ {
+ return;
+ }
+
+ // rather than perform additional checks on pool to check that it hasn't shutdown.
+ // catch the RejectedExecutionException that will result from executing on a shutdown pool
+ if (job.activate())
+ {
+ try
+ {
+ pool.execute(job);
+ }
+ catch(RejectedExecutionException e)
+ {
+ _logger.warn("Thread pool shutdown while tasks still outstanding");
+ }
+ }
+
+ }
+
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
index 8ab845454a..5bfc189b02 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
@@ -37,6 +37,9 @@ public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>
// Returns the remote address of the NetworkDriver
SocketAddress getRemoteAddress();
+
+ // Returns the local address of the NetworkDriver
+ SocketAddress getLocalAddress();
// Returns number of bytes written
long getWrittenBytes();
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java
index 34b0ef65be..86af97bf7e 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java
@@ -24,8 +24,6 @@ import java.net.BindException;
import java.net.InetAddress;
import java.net.SocketAddress;
-import javax.net.ssl.SSLEngine;
-
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.protocol.ProtocolEngineFactory;
import org.apache.qpid.ssl.SSLContextFactory;
@@ -33,13 +31,14 @@ import org.apache.qpid.ssl.SSLContextFactory;
public interface NetworkDriver extends Sender<java.nio.ByteBuffer>
{
// Creates a NetworkDriver which attempts to connect to destination on port and attaches the ProtocolEngine to
- // it using the SSLEngine if provided
+ // it using the SSLContextFactory if provided
void open(int port, InetAddress destination, ProtocolEngine engine,
- NetworkDriverConfiguration config, SSLEngine sslEngine)
+ NetworkDriverConfiguration config, SSLContextFactory sslFactory)
throws OpenException;
// listens for incoming connections on the specified ports and address and creates a new NetworkDriver which
- // processes incoming connections with ProtocolEngines created from factory using the SSLEngine if provided
+ // processes incoming connections with ProtocolEngines and SSLEngines created from the factories
+ // (in the case of an SSLContextFactory, if provided)
void bind (int port, InetAddress[] addresses, ProtocolEngineFactory protocolFactory,
NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java
index 8628b8c7aa..68fbb5e8ec 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java
@@ -21,7 +21,9 @@
package org.apache.qpid.transport;
-public class OpenException extends Exception
+import java.io.IOException;
+
+public class OpenException extends IOException
{
public OpenException(String string, Throwable lastException)
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
index e34103a944..7cc5f8e442 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
@@ -33,6 +33,7 @@ import javax.net.ssl.SSLEngine;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoConnector;
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
@@ -71,7 +72,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
private int _processors = 4;
private boolean _executorPool = false;
private SSLContextFactory _sslFactory = null;
- private SocketConnector _socketConnector;
+ private IoConnector _socketConnector;
private IoAcceptor _acceptor;
private IoSession _ioSession;
private ProtocolEngineFactory _factory;
@@ -101,6 +102,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
_protectIO = protectIO;
_protocolEngine = protocolEngine;
_ioSession = session;
+ _ioSession.setAttachment(_protocolEngine);
}
public MINANetworkDriver()
@@ -108,6 +110,17 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
}
+ public MINANetworkDriver(IoConnector ioConnector)
+ {
+ _socketConnector = ioConnector;
+ }
+
+ public MINANetworkDriver(IoConnector ioConnector, ProtocolEngine engine)
+ {
+ _socketConnector = ioConnector;
+ _protocolEngine = engine;
+ }
+
public void bind(int port, InetAddress[] addresses, ProtocolEngineFactory factory,
NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException
{
@@ -188,8 +201,13 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config,
- SSLEngine sslEngine) throws OpenException
+ SSLContextFactory sslFactory) throws OpenException
{
+ if (sslFactory != null)
+ {
+ _sslFactory = sslFactory;
+ }
+
if (_useNIO)
{
_socketConnector = new MultiThreadSocketConnector(1, new QpidThreadExecutor());
@@ -207,7 +225,6 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
{
org.apache.mina.common.ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
}
-
SocketConnectorConfig cfg = (SocketConnectorConfig) _socketConnector.getDefaultConfig();
@@ -229,7 +246,11 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
// one SocketConnector per connection at the moment anyway). This allows
// short-running
// clients (like unit tests) to complete quickly.
- _socketConnector.setWorkerTimeout(0);
+ if (_socketConnector instanceof SocketConnector)
+ {
+ ((SocketConnector) _socketConnector).setWorkerTimeout(0);
+ }
+
ConnectFuture future = _socketConnector.connect(new InetSocketAddress(destination, port), this, cfg);
future.join();
if (!future.isConnected())
@@ -333,56 +354,54 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
public void sessionCreated(IoSession protocolSession) throws Exception
{
- if (_acceptingConnections)
+ // Configure the session with SSL if necessary
+ SessionUtil.initialize(protocolSession);
+ if (_executorPool)
{
- // Configure the session with SSL if necessary
- SessionUtil.initialize(protocolSession);
- if (_executorPool)
+ if (_sslFactory != null)
{
- if (_sslFactory != null)
- {
- protocolSession.getFilterChain().addAfter("AsynchronousReadFilter", "sslFilter",
- new SSLFilter(_sslFactory.buildServerContext()));
- }
+ protocolSession.getFilterChain().addAfter("AsynchronousReadFilter", "sslFilter",
+ new SSLFilter(_sslFactory.buildServerContext()));
}
- else
+ }
+ else
+ {
+ if (_sslFactory != null)
{
- if (_sslFactory != null)
- {
- protocolSession.getFilterChain().addBefore("protocolFilter", "sslFilter",
- new SSLFilter(_sslFactory.buildServerContext()));
- }
+ protocolSession.getFilterChain().addBefore("protocolFilter", "sslFilter",
+ new SSLFilter(_sslFactory.buildServerContext()));
}
+ }
+ // Do we want to have read/write buffer limits?
+ if (_protectIO)
+ {
+ //Add IO Protection Filters
+ IoFilterChain chain = protocolSession.getFilterChain();
- // Do we want to have read/write buffer limits?
- if (_protectIO)
- {
- //Add IO Protection Filters
- IoFilterChain chain = protocolSession.getFilterChain();
+ protocolSession.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
- protocolSession.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
+ ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
+ readfilter.setMaximumConnectionBufferSize(_config.getReceiveBufferSize());
+ readfilter.attach(chain);
- ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
- readfilter.setMaximumConnectionBufferSize(_config.getReceiveBufferSize());
- readfilter.attach(chain);
+ WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
+ writefilter.setMaximumConnectionBufferSize(_config.getSendBufferSize());
+ writefilter.attach(chain);
- WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
- writefilter.setMaximumConnectionBufferSize(_config.getSendBufferSize());
- writefilter.attach(chain);
+ protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
+ }
- protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
- }
-
- if (_ioSession == null)
- {
- _ioSession = protocolSession;
- }
-
+ if (_ioSession == null)
+ {
+ _ioSession = protocolSession;
+ }
+
+ if (_acceptingConnections)
+ {
// Set up the protocol engine
ProtocolEngine protocolEngine = _factory.newProtocolEngine(this);
MINANetworkDriver newDriver = new MINANetworkDriver(_useNIO, _processors, _executorPool, _protectIO, protocolEngine, protocolSession);
protocolEngine.setNetworkDriver(newDriver);
- protocolSession.setAttachment(protocolEngine);
}
}
@@ -409,4 +428,13 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
_acceptingConnections = acceptingConnections;
}
+ public void setProtocolEngine(ProtocolEngine protocolEngine)
+ {
+ _protocolEngine = protocolEngine;
+ if (_ioSession != null)
+ {
+ _ioSession.setAttachment(protocolEngine);
+ }
+ }
+
}
diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java
new file mode 100644
index 0000000000..a4c4b59cdd
--- /dev/null
+++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java
@@ -0,0 +1,122 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+import java.net.BindException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.ssl.SSLContextFactory;
+
+/**
+ * Test implementation of IoSession, which is required for some tests. Methods not being used are not implemented,
+ * so if this class is being used and some methods are to be used, then please update those.
+ */
+public class TestNetworkDriver implements NetworkDriver
+{
+ private final ConcurrentMap attributes = new ConcurrentHashMap();
+ private String _remoteAddress = "127.0.0.1";
+ private String _localAddress = "127.0.0.1";
+ private int _port = 1;
+
+ public TestNetworkDriver()
+ {
+ }
+
+ public void setRemoteAddress(String string)
+ {
+ this._remoteAddress = string;
+ }
+
+ public void setPort(int _port)
+ {
+ this._port = _port;
+ }
+
+ public int getPort()
+ {
+ return _port;
+ }
+
+ public void bind(int port, InetAddress[] addresses, ProtocolEngineFactory protocolFactory,
+ NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException
+ {
+
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return new InetSocketAddress(_localAddress, _port);
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ return new InetSocketAddress(_remoteAddress, _port);
+ }
+
+ public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config,
+ SSLContextFactory sslFactory) throws OpenException
+ {
+
+ }
+
+ public void setMaxReadIdle(int idleTime)
+ {
+
+ }
+
+ public void setMaxWriteIdle(int idleTime)
+ {
+
+ }
+
+ public void close()
+ {
+
+ }
+
+ public void flush()
+ {
+
+ }
+
+ public void send(ByteBuffer msg)
+ {
+
+ }
+
+ public void setIdleTimeout(long l)
+ {
+
+ }
+
+ public void setLocalAddress(String localAddress)
+ {
+ _localAddress = localAddress;
+ }
+
+}
diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java
index 6024875cf5..5500ff9d4b 100644
--- a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java
+++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java
@@ -382,6 +382,18 @@ public class MINANetworkDriverTest extends TestCase
return null;
}
}
+
+ public SocketAddress getLocalAddress()
+ {
+ if (_driver != null)
+ {
+ return _driver.getLocalAddress();
+ }
+ else
+ {
+ return null;
+ }
+ }
public long getWrittenBytes()
{
@@ -459,6 +471,7 @@ public class MINANetworkDriverTest extends TestCase
{
return _closed;
}
+
}
private class EchoProtocolEngine extends CountingProtocolEngine