diff options
Diffstat (limited to 'qpid/java/common/src')
7 files changed, 253 insertions, 46 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java index 4e4192dbe3..15d1c20ff1 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java @@ -21,9 +21,13 @@ package org.apache.qpid.pool; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.mina.common.IoSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A Job is a continuation that batches together other continuations, specifically {@link Event}s, into one continuation. @@ -66,6 +70,8 @@ public class Job implements ReadWriteRunnable private final boolean _readJob; + private final static Logger _logger = LoggerFactory.getLogger(Job.class); + /** * Creates a new job that aggregates many continuations together. * @@ -181,4 +187,38 @@ public class Job implements ReadWriteRunnable public void notCompleted(final Job job); } + + /** + * Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running. + * + * @param job The job. + * @param event The event to hand off asynchronously. + */ + public static void fireAsynchEvent(ExecutorService pool, Job job, Event event) + { + + job.add(event); + + + if(pool == null) + { + return; + } + + // rather than perform additional checks on pool to check that it hasn't shutdown. + // catch the RejectedExecutionException that will result from executing on a shutdown pool + if (job.activate()) + { + try + { + pool.execute(job); + } + catch(RejectedExecutionException e) + { + _logger.warn("Thread pool shutdown while tasks still outstanding"); + } + } + + } + } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java index 8ab845454a..5bfc189b02 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java @@ -37,6 +37,9 @@ public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer> // Returns the remote address of the NetworkDriver SocketAddress getRemoteAddress(); + + // Returns the local address of the NetworkDriver + SocketAddress getLocalAddress(); // Returns number of bytes written long getWrittenBytes(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java index 34b0ef65be..86af97bf7e 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java @@ -24,8 +24,6 @@ import java.net.BindException; import java.net.InetAddress; import java.net.SocketAddress; -import javax.net.ssl.SSLEngine; - import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.protocol.ProtocolEngineFactory; import org.apache.qpid.ssl.SSLContextFactory; @@ -33,13 +31,14 @@ import org.apache.qpid.ssl.SSLContextFactory; public interface NetworkDriver extends Sender<java.nio.ByteBuffer> { // Creates a NetworkDriver which attempts to connect to destination on port and attaches the ProtocolEngine to - // it using the SSLEngine if provided + // it using the SSLContextFactory if provided void open(int port, InetAddress destination, ProtocolEngine engine, - NetworkDriverConfiguration config, SSLEngine sslEngine) + NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws OpenException; // listens for incoming connections on the specified ports and address and creates a new NetworkDriver which - // processes incoming connections with ProtocolEngines created from factory using the SSLEngine if provided + // processes incoming connections with ProtocolEngines and SSLEngines created from the factories + // (in the case of an SSLContextFactory, if provided) void bind (int port, InetAddress[] addresses, ProtocolEngineFactory protocolFactory, NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java index 8628b8c7aa..68fbb5e8ec 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java @@ -21,7 +21,9 @@ package org.apache.qpid.transport; -public class OpenException extends Exception +import java.io.IOException; + +public class OpenException extends IOException { public OpenException(String string, Throwable lastException) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java index e34103a944..7cc5f8e442 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java @@ -33,6 +33,7 @@ import javax.net.ssl.SSLEngine; import org.apache.mina.common.ConnectFuture; import org.apache.mina.common.IdleStatus; import org.apache.mina.common.IoAcceptor; +import org.apache.mina.common.IoConnector; import org.apache.mina.common.IoFilterChain; import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoSession; @@ -71,7 +72,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver private int _processors = 4; private boolean _executorPool = false; private SSLContextFactory _sslFactory = null; - private SocketConnector _socketConnector; + private IoConnector _socketConnector; private IoAcceptor _acceptor; private IoSession _ioSession; private ProtocolEngineFactory _factory; @@ -101,6 +102,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver _protectIO = protectIO; _protocolEngine = protocolEngine; _ioSession = session; + _ioSession.setAttachment(_protocolEngine); } public MINANetworkDriver() @@ -108,6 +110,17 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver } + public MINANetworkDriver(IoConnector ioConnector) + { + _socketConnector = ioConnector; + } + + public MINANetworkDriver(IoConnector ioConnector, ProtocolEngine engine) + { + _socketConnector = ioConnector; + _protocolEngine = engine; + } + public void bind(int port, InetAddress[] addresses, ProtocolEngineFactory factory, NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException { @@ -188,8 +201,13 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config, - SSLEngine sslEngine) throws OpenException + SSLContextFactory sslFactory) throws OpenException { + if (sslFactory != null) + { + _sslFactory = sslFactory; + } + if (_useNIO) { _socketConnector = new MultiThreadSocketConnector(1, new QpidThreadExecutor()); @@ -207,7 +225,6 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver { org.apache.mina.common.ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); } - SocketConnectorConfig cfg = (SocketConnectorConfig) _socketConnector.getDefaultConfig(); @@ -229,7 +246,11 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver // one SocketConnector per connection at the moment anyway). This allows // short-running // clients (like unit tests) to complete quickly. - _socketConnector.setWorkerTimeout(0); + if (_socketConnector instanceof SocketConnector) + { + ((SocketConnector) _socketConnector).setWorkerTimeout(0); + } + ConnectFuture future = _socketConnector.connect(new InetSocketAddress(destination, port), this, cfg); future.join(); if (!future.isConnected()) @@ -333,56 +354,54 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver public void sessionCreated(IoSession protocolSession) throws Exception { - if (_acceptingConnections) + // Configure the session with SSL if necessary + SessionUtil.initialize(protocolSession); + if (_executorPool) { - // Configure the session with SSL if necessary - SessionUtil.initialize(protocolSession); - if (_executorPool) + if (_sslFactory != null) { - if (_sslFactory != null) - { - protocolSession.getFilterChain().addAfter("AsynchronousReadFilter", "sslFilter", - new SSLFilter(_sslFactory.buildServerContext())); - } + protocolSession.getFilterChain().addAfter("AsynchronousReadFilter", "sslFilter", + new SSLFilter(_sslFactory.buildServerContext())); } - else + } + else + { + if (_sslFactory != null) { - if (_sslFactory != null) - { - protocolSession.getFilterChain().addBefore("protocolFilter", "sslFilter", - new SSLFilter(_sslFactory.buildServerContext())); - } + protocolSession.getFilterChain().addBefore("protocolFilter", "sslFilter", + new SSLFilter(_sslFactory.buildServerContext())); } + } + // Do we want to have read/write buffer limits? + if (_protectIO) + { + //Add IO Protection Filters + IoFilterChain chain = protocolSession.getFilterChain(); - // Do we want to have read/write buffer limits? - if (_protectIO) - { - //Add IO Protection Filters - IoFilterChain chain = protocolSession.getFilterChain(); + protocolSession.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter()); - protocolSession.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter()); + ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder(); + readfilter.setMaximumConnectionBufferSize(_config.getReceiveBufferSize()); + readfilter.attach(chain); - ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder(); - readfilter.setMaximumConnectionBufferSize(_config.getReceiveBufferSize()); - readfilter.attach(chain); + WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder(); + writefilter.setMaximumConnectionBufferSize(_config.getSendBufferSize()); + writefilter.attach(chain); - WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder(); - writefilter.setMaximumConnectionBufferSize(_config.getSendBufferSize()); - writefilter.attach(chain); + protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder"); + } - protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder"); - } - - if (_ioSession == null) - { - _ioSession = protocolSession; - } - + if (_ioSession == null) + { + _ioSession = protocolSession; + } + + if (_acceptingConnections) + { // Set up the protocol engine ProtocolEngine protocolEngine = _factory.newProtocolEngine(this); MINANetworkDriver newDriver = new MINANetworkDriver(_useNIO, _processors, _executorPool, _protectIO, protocolEngine, protocolSession); protocolEngine.setNetworkDriver(newDriver); - protocolSession.setAttachment(protocolEngine); } } @@ -409,4 +428,13 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver _acceptingConnections = acceptingConnections; } + public void setProtocolEngine(ProtocolEngine protocolEngine) + { + _protocolEngine = protocolEngine; + if (_ioSession != null) + { + _ioSession.setAttachment(protocolEngine); + } + } + } diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java new file mode 100644 index 0000000000..a4c4b59cdd --- /dev/null +++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java @@ -0,0 +1,122 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + +import java.net.BindException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.ssl.SSLContextFactory; + +/** + * Test implementation of IoSession, which is required for some tests. Methods not being used are not implemented, + * so if this class is being used and some methods are to be used, then please update those. + */ +public class TestNetworkDriver implements NetworkDriver +{ + private final ConcurrentMap attributes = new ConcurrentHashMap(); + private String _remoteAddress = "127.0.0.1"; + private String _localAddress = "127.0.0.1"; + private int _port = 1; + + public TestNetworkDriver() + { + } + + public void setRemoteAddress(String string) + { + this._remoteAddress = string; + } + + public void setPort(int _port) + { + this._port = _port; + } + + public int getPort() + { + return _port; + } + + public void bind(int port, InetAddress[] addresses, ProtocolEngineFactory protocolFactory, + NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException + { + + } + + public SocketAddress getLocalAddress() + { + return new InetSocketAddress(_localAddress, _port); + } + + public SocketAddress getRemoteAddress() + { + return new InetSocketAddress(_remoteAddress, _port); + } + + public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config, + SSLContextFactory sslFactory) throws OpenException + { + + } + + public void setMaxReadIdle(int idleTime) + { + + } + + public void setMaxWriteIdle(int idleTime) + { + + } + + public void close() + { + + } + + public void flush() + { + + } + + public void send(ByteBuffer msg) + { + + } + + public void setIdleTimeout(long l) + { + + } + + public void setLocalAddress(String localAddress) + { + _localAddress = localAddress; + } + +} diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java index 6024875cf5..5500ff9d4b 100644 --- a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java +++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java @@ -382,6 +382,18 @@ public class MINANetworkDriverTest extends TestCase return null; } } + + public SocketAddress getLocalAddress() + { + if (_driver != null) + { + return _driver.getLocalAddress(); + } + else + { + return null; + } + } public long getWrittenBytes() { @@ -459,6 +471,7 @@ public class MINANetworkDriverTest extends TestCase { return _closed; } + } private class EchoProtocolEngine extends CountingProtocolEngine |