summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2009-09-17 15:19:54 +0000
committerAidan Skinner <aidan@apache.org>2009-09-17 15:19:54 +0000
commit31bbc100ac6b3a31eb25d29f407d60ff23334d1f (patch)
treef21589bdba604c46c555d48fe7862defcf6e781f
parent93fa7d17feecb3d27cead67e11b250af1fcc595e (diff)
downloadqpid-python-31bbc100ac6b3a31eb25d29f407d60ff23334d1f.tar.gz
QPID-2024 QPID-2105: Remove now unnecessary classes like Event, PoolingFilter,
ReadWriteThreadModel. Move the couple of necessary methods to Job. Fix imports. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-network-refactor@816232 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java20
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java53
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java23
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java14
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java6
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java167
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java111
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java487
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java102
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java22
-rw-r--r--qpid/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java111
11 files changed, 113 insertions, 1003 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index b776c6ae82..3bcd102858 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -62,10 +62,7 @@ import org.apache.qpid.framing.MethodDispatcher;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.ProtocolInitiation;
import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.pool.Event;
import org.apache.qpid.pool.Job;
-import org.apache.qpid.pool.PoolingFilter;
-import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -172,14 +169,13 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
_networkDriver = driver;
_codecFactory = new AMQCodecFactory(true, this);
-
- ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance();
- _readJob = new Job(threadModel.getAsynchronousReadFilter(), PoolingFilter.MAX_JOB_EVENTS, true);
- _writeJob = new Job(threadModel.getAsynchronousWriteFilter(), PoolingFilter.MAX_JOB_EVENTS, false);
+ _poolReference.acquireExecutorService();
+ _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true);
+ _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false);
_actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger());
_actor.message(ConnectionMessages.CON_1001(null, null, false, false));
- _poolReference.acquireExecutorService();
+
}
private AMQProtocolSessionMBean createMBean() throws AMQException
@@ -212,7 +208,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
try
{
final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
- Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Event(new Runnable()
+ Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable()
{
@Override
public void run()
@@ -232,7 +228,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
}
}
}
- }));
+ });
}
catch (Exception e)
{
@@ -459,14 +455,14 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
final ByteBuffer buf = frame.toNioByteBuffer();
_lastIoTime = System.currentTimeMillis();
_writtenBytes += buf.remaining();
- Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Event(new Runnable()
+ Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Runnable()
{
@Override
public void run()
{
_networkDriver.send(buf);
}
- }));
+ });
}
public AMQShortString getContextKey()
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 99366101d1..be75fc150f 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -20,20 +20,22 @@
*/
package org.apache.qpid.client.protocol;
-import org.apache.mina.filter.ReadThrottleFilterBuilder;
-import org.apache.mina.filter.SSLFilter;
-import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+
import org.apache.mina.filter.codec.ProtocolCodecException;
-import org.apache.mina.filter.codec.ProtocolCodecFilter;
-import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.qpid.AMQConnectionClosedException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQTimeoutException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.SSLConfiguration;
-import org.apache.qpid.client.configuration.ClientProperties;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverHandler;
import org.apache.qpid.client.failover.FailoverState;
@@ -42,32 +44,29 @@ import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateWaiter;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ConnectionCloseOkBody;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.pool.Event;
import org.apache.qpid.pool.Job;
-import org.apache.qpid.pool.PoolingFilter;
-import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.ssl.SSLContextFactory;
import org.apache.qpid.transport.NetworkDriver;
import org.apache.qpid.transport.network.io.IoTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.CountDownLatch;
-
/**
* AMQProtocolHandler is the client side protocol handler for AMQP, it handles all protocol events received from the
* network by MINA. The primary purpose of AMQProtocolHandler is to translate the generic event model of MINA into the
@@ -107,9 +106,6 @@ import java.util.concurrent.CountDownLatch;
*
* <p/><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Create the filter chain to filter this handlers events.
- * <td> {@link ProtocolCodecFilter}, {@link SSLContextFactory}, {@link SSLFilter}, {@link ReadWriteThreadModel}.
- *
* <tr><td> Maintain fail-over state.
* <tr><td>
* </table>
@@ -191,9 +187,8 @@ public class AMQProtocolHandler implements ProtocolEngine
_protocolSession = new AMQProtocolSession(this, _connection);
_stateManager = new AMQStateManager(_protocolSession);
_codecFactory = new AMQCodecFactory(false, _protocolSession);
- ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance();
- _readJob = new Job(threadModel.getAsynchronousReadFilter(), PoolingFilter.MAX_JOB_EVENTS, true);
- _writeJob = new Job(threadModel.getAsynchronousWriteFilter(), PoolingFilter.MAX_JOB_EVENTS, false);
+ _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true);
+ _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false);
_poolReference.acquireExecutorService();
_failoverHandler = new FailoverHandler(this);
}
@@ -436,7 +431,7 @@ public class AMQProtocolHandler implements ProtocolEngine
_readBytes += msg.remaining();
final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
- Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Event(new Runnable()
+ Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable()
{
@Override
public void run()
@@ -495,7 +490,7 @@ public class AMQProtocolHandler implements ProtocolEngine
}
}
}
- }));
+ });
}
catch (Exception e)
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
index 77c9c40e82..1ac8f62e32 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
@@ -20,37 +20,20 @@
*/
package org.apache.qpid.client.transport;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.SimpleByteBufferAllocator;
-import org.apache.mina.filter.SSLFilter;
-import org.apache.mina.transport.socket.nio.ExistingSocketConnector;
-import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
-import org.apache.mina.transport.socket.nio.SocketSessionConfig;
-
import org.apache.qpid.client.SSLConfiguration;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.ssl.SSLContextFactory;
import org.apache.qpid.transport.network.mina.MINANetworkDriver;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import sun.net.InetAddressCachePolicy;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.security.GeneralSecurityException;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import javax.net.ssl.SSLEngine;
-
public class SocketTransportConnection implements ITransportConnection
{
private static final Logger _logger = LoggerFactory.getLogger(SocketTransportConnection.class);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
index 45194750dc..a4f8bb0166 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
@@ -20,6 +20,12 @@
*/
package org.apache.qpid.client.transport;
+import java.io.IOException;
+import java.net.Socket;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoServiceConfig;
@@ -30,20 +36,12 @@ import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.pool.ReadWriteThreadModel;
-import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.protocol.ProtocolEngineFactory;
import org.apache.qpid.thread.QpidThreadExecutor;
import org.apache.qpid.transport.network.mina.MINANetworkDriver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.net.Socket;
-
/**
* The TransportConnection is a helper class responsible for connecting to an AMQ server. It sets up the underlying
* connector, which currently always uses TCP/IP sockets. It creates the "protocol handler" which deals with MINA
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
index 3de6f9b9ea..504d475740 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
@@ -20,20 +20,18 @@
*/
package org.apache.qpid.client.transport;
+import java.io.IOException;
+
import org.apache.mina.common.ConnectFuture;
-import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.transport.vmpipe.QpidVmPipeConnector;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.mina.transport.vmpipe.VmPipeConnector;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.transport.network.mina.MINANetworkDriver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-
public class VmPipeTransportConnection implements ITransportConnection
{
private static final Logger _logger = LoggerFactory.getLogger(VmPipeTransportConnection.class);
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java
deleted file mode 100644
index 49bce9f2f9..0000000000
--- a/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.pool;
-
-import org.apache.mina.common.IoFilter;
-import org.apache.mina.common.IoSession;
-
-/**
- * An Event is a continuation, which is used to break a Mina filter chain and save the current point in the chain
- * for later processing. It is an abstract class, with different implementations for continuations of different kinds
- * of Mina events.
- *
- * <p/>These continuations are typically batched by {@link Job} for processing by a worker thread pool.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Process a continuation in the context of a Mina session.
- * </table>
- *
- * @todo Pull up _nextFilter and getNextFilter into Event, as all events use it. Inner classes need to be non-static
- * to use instance variables in the parent. Consequently they need to be non-inner to be instantiable outside of
- * the context of the outer Event class. The inner class construction used here is preventing common code re-use
- * (though not by a huge amount), but makes for an inelegent way of handling inheritance and doesn't seem like
- * a justifiable use of inner classes. Move the inner classes out into their own files.
- *
- * @todo Could make Event implement Runnable, FutureTask, or a custom Continuation interface, to clarify its status as
- * a continuation. Job is also a continuation, as is the job completion handler. Or, as Event is totally abstract,
- * it is really an interface, so could just drop it and use the continuation interface instead.
- */
-public class Event
-{
- private Runnable _runner;
-
- public Event()
- {
-
- }
-
- /**
- * Creates a continuation.
- */
- public Event(Runnable runner)
- {
- _runner = runner;
- }
-
- /**
- * Processes the continuation
- */
- public void process()
- {
- _runner.run();
- }
-
- /**
- * A continuation ({@link Event}) that takes a Mina messageReceived event, and passes it to a NextFilter.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Pass a Mina messageReceived event to a NextFilter. <td> {@link IoFilter.NextFilter}, {@link IoSession}
- * </table>
- */
- public static final class MinaReceivedEvent extends Event
- {
- private final Object _data;
- private final IoFilter.NextFilter _nextFilter;
- private final IoSession _session;
-
- public MinaReceivedEvent(final IoFilter.NextFilter nextFilter, final Object data, final IoSession session)
- {
- _nextFilter = nextFilter;
- _data = data;
- _session = session;
- }
-
- public void process()
- {
- _nextFilter.messageReceived(_session, _data);
- }
-
- public IoFilter.NextFilter getNextFilter()
- {
- return _nextFilter;
- }
- }
-
- /**
- * A continuation ({@link Event}) that takes a Mina filterWrite event, and passes it to a NextFilter.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Pass a Mina filterWrite event to a NextFilter.
- * <td> {@link IoFilter.NextFilter}, {@link IoFilter.WriteRequest}, {@link IoSession}
- * </table>
- */
- public static final class MinaWriteEvent extends Event
- {
- private final IoFilter.WriteRequest _data;
- private final IoFilter.NextFilter _nextFilter;
- private IoSession _session;
-
- public MinaWriteEvent(final IoFilter.NextFilter nextFilter, final IoFilter.WriteRequest data, final IoSession session)
- {
- _nextFilter = nextFilter;
- _data = data;
- _session = session;
- }
-
- public void process()
- {
- _nextFilter.filterWrite(_session, _data);
- }
-
- public IoFilter.NextFilter getNextFilter()
- {
- return _nextFilter;
- }
- }
-
- /**
- * A continuation ({@link Event}) that takes a Mina sessionClosed event, and passes it to a NextFilter.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Pass a Mina sessionClosed event to a NextFilter. <td> {@link IoFilter.NextFilter}, {@link IoSession}
- * </table>
- */
- public static final class CloseEvent extends Event
- {
- private final IoFilter.NextFilter _nextFilter;
- private final IoSession _session;
-
- public CloseEvent(final IoFilter.NextFilter nextFilter, final IoSession session)
- {
- _nextFilter = nextFilter;
- _session = session;
- }
-
- public void process()
- {
- _nextFilter.sessionClosed(_session);
- }
-
- public IoFilter.NextFilter getNextFilter()
- {
- return _nextFilter;
- }
- }
-}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
index 15d1c20ff1..82b600de88 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
@@ -25,7 +25,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.mina.common.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,40 +55,28 @@ import org.slf4j.LoggerFactory;
*/
public class Job implements ReadWriteRunnable
{
+
+ /** Defines the maximum number of events that will be batched into a single job. */
+ public static final int MAX_JOB_EVENTS = Integer.getInteger("amqj.server.read_write_pool.max_events", 10);
+
/** The maximum number of events to process per run of the job. More events than this may be queued in the job. */
private final int _maxEvents;
/** Holds the queue of events that make up the job. */
- private final java.util.Queue<Event> _eventQueue = new ConcurrentLinkedQueue<Event>();
+ private final java.util.Queue<Runnable> _eventQueue = new ConcurrentLinkedQueue<Runnable>();
/** Holds a status flag, that indicates when the job is actively running. */
private final AtomicBoolean _active = new AtomicBoolean();
- /** Holds the completion continuation, called upon completion of a run of the job. */
- private final JobCompletionHandler _completionHandler;
-
private final boolean _readJob;
+ private ReferenceCountingExecutorService _poolReference;
+
private final static Logger _logger = LoggerFactory.getLogger(Job.class);
- /**
- * Creates a new job that aggregates many continuations together.
- *
- * @param session The Mina session.
- * @param completionHandler The per job run, terminal continuation.
- * @param maxEvents The maximum number of aggregated continuations to process per run of the job.
- * @param readJob
- */
- Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents, final boolean readJob)
- {
- _completionHandler = completionHandler;
- _maxEvents = maxEvents;
- _readJob = readJob;
- }
-
- public Job(JobCompletionHandler completionHandler, int maxEvents, boolean readJob)
+ public Job(ReferenceCountingExecutorService poolReference, int maxEvents, boolean readJob)
{
- _completionHandler = completionHandler;
+ _poolReference = poolReference;
_maxEvents = maxEvents;
_readJob = readJob;
}
@@ -99,7 +86,7 @@ public class Job implements ReadWriteRunnable
*
* @param evt The continuation to enqueue.
*/
- public void add(Event evt)
+ public void add(Runnable evt)
{
_eventQueue.add(evt);
}
@@ -113,14 +100,14 @@ public class Job implements ReadWriteRunnable
int i = _maxEvents;
while( --i != 0 )
{
- Event e = _eventQueue.poll();
+ Runnable e = _eventQueue.poll();
if (e == null)
{
return true;
}
else
{
- e.process();
+ e.run();
}
}
return false;
@@ -162,11 +149,11 @@ public class Job implements ReadWriteRunnable
if(processAll())
{
deactivate();
- _completionHandler.completed(this);
+ completed();
}
else
{
- _completionHandler.notCompleted(this);
+ notCompleted();
}
}
@@ -174,19 +161,6 @@ public class Job implements ReadWriteRunnable
{
return _readJob;
}
-
- /**
- * Another interface for a continuation.
- *
- * @todo Get rid of this interface as there are other interfaces that could be used instead, such as FutureTask,
- * Runnable or a custom Continuation interface.
- */
- static interface JobCompletionHandler
- {
- public void completed(Job job);
-
- public void notCompleted(final Job job);
- }
/**
* Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running.
@@ -194,7 +168,7 @@ public class Job implements ReadWriteRunnable
* @param job The job.
* @param event The event to hand off asynchronously.
*/
- public static void fireAsynchEvent(ExecutorService pool, Job job, Event event)
+ public static void fireAsynchEvent(ExecutorService pool, Job job, Runnable event)
{
job.add(event);
@@ -221,4 +195,59 @@ public class Job implements ReadWriteRunnable
}
+ /**
+ * Implements a terminal continuation for the {@link Job} for this filter. Whenever the Job completes its processing
+ * of a batch of events this is called. This method simply re-activates the job, if it has more events to process.
+ *
+ * @param session The Mina session to work in.
+ * @param job The job that completed.
+ */
+ public void completed()
+ {
+ if (!isComplete())
+ {
+ final ExecutorService pool = _poolReference.getPool();
+
+ if(pool == null)
+ {
+ return;
+ }
+
+
+ // ritchiem : 2006-12-13 Do we need to perform the additional checks here?
+ // Can the pool be shutdown at this point?
+ if (activate())
+ {
+ try
+ {
+ pool.execute(this);
+ }
+ catch(RejectedExecutionException e)
+ {
+ _logger.warn("Thread pool shutdown while tasks still outstanding");
+ }
+
+ }
+ }
+ }
+
+ public void notCompleted()
+ {
+ final ExecutorService pool = _poolReference.getPool();
+
+ if(pool == null)
+ {
+ return;
+ }
+
+ try
+ {
+ pool.execute(this);
+ }
+ catch(RejectedExecutionException e)
+ {
+ _logger.warn("Thread pool shutdown while tasks still outstanding");
+ }
+ }
+
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
deleted file mode 100644
index 4e02ac3a55..0000000000
--- a/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
+++ /dev/null
@@ -1,487 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.pool;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.RejectedExecutionException;
-
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoFilterAdapter;
-import org.apache.mina.common.IoSession;
-import org.apache.qpid.pool.Event.CloseEvent;
-import org.apache.qpid.pool.Event.MinaReceivedEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * PoolingFilter, is a no-op pass through filter that hands all events down the Mina filter chain by default. As it
- * adds no behaviour by default to the filter chain, it is abstract.
- *
- * <p/>PoolingFilter provides a capability, available to sub-classes, to handle events in the chain asynchronously, by
- * adding them to a job. If a job is not active, adding an event to it activates it. If it is active, the event is
- * added to the job, which will run to completion and eventually process the event. The queue on the job itself acts as
- * a buffer between stages of the pipeline.
- *
- * <p/>There are two convenience methods, {@link #createAynschReadPoolingFilter} and
- * {@link #createAynschWritePoolingFilter}, for obtaining pooling filters that handle 'messageReceived' and
- * 'filterWrite' events, making it possible to process these event streams seperately.
- *
- * <p/>Pooling filters have a name, in order to distinguish different filter types. They set up a {@link Job} on the
- * Mina session they are working with, and store it in the session against their identifying name. This allows different
- * filters with different names to be set up on the same filter chain, on the same Mina session, that batch their
- * workloads in different jobs.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Implement default, pass through filter.
- * <tr><td> Create pooling filters and a specific thread pool. <td> {@link ReferenceCountingExecutorService}
- * <tr><td> Provide the ability to batch Mina events for asynchronous processing. <td> {@link Job}, {@link Event}
- * <tr><td> Provide a terminal continuation to keep jobs running till empty.
- * <td> {@link Job}, {@link Job.JobCompletionHandler}
- * </table>
- *
- * @todo The static helper methods are pointless. Could just call new.
- */
-public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionHandler
-{
- /** Used for debugging purposes. */
- private static final Logger _logger = LoggerFactory.getLogger(PoolingFilter.class);
-
- /** Holds the managed reference to obtain the executor for the batched jobs. */
- private final ReferenceCountingExecutorService _poolReference;
-
- /** Used to hold a name for identifying differeny pooling filter types. */
- private final String _name;
-
- /** Defines the maximum number of events that will be batched into a single job. */
- public static final int MAX_JOB_EVENTS = Integer.getInteger("amqj.server.read_write_pool.max_events", 10);
-
- private final int _maxEvents;
-
- private final boolean _readFilter;
-
- /**
- * Creates a named pooling filter, on the specified shared thread pool.
- *
- * @param refCountingPool The thread pool reference.
- * @param name The identifying name of the filter type.
- */
- public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name, int maxEvents, boolean readFilter)
- {
- _poolReference = refCountingPool;
- _name = name;
- _maxEvents = maxEvents;
- _readFilter = readFilter;
- }
-
- /**
- * Helper method to get an instance of a pooling filter that handles read events asynchronously.
- *
- * @param refCountingPool A managed reference to the thread pool.
- * @param name The filter types identifying name.
- *
- * @return A pooling filter for asynchronous read events.
- */
- public static PoolingFilter createAynschReadPoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
- {
- return new AsynchReadPoolingFilter(refCountingPool, name);
- }
-
- /**
- * Helper method to get an instance of a pooling filter that handles write events asynchronously.
- *
- * @param refCountingPool A managed reference to the thread pool.
- * @param name The filter types identifying name.
- *
- * @return A pooling filter for asynchronous write events.
- */
- public static PoolingFilter createAynschWritePoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
- {
- return new AsynchWritePoolingFilter(refCountingPool, name);
- }
-
- /**
- * Called by Mina to initialize this filter. Takes a reference to the thread pool.
- */
- public void init()
- {
- _logger.debug("Init called on PoolingFilter " + toString());
-
- // Called when the filter is initialised in the chain. If the reference count is
- // zero this acquire will initialise the pool.
- _poolReference.acquireExecutorService();
- }
-
- /**
- * Called by Mina to clean up this filter. Releases the reference to the thread pool.
- */
- public void destroy()
- {
- _logger.debug("Destroy called on PoolingFilter " + toString());
- }
-
- /**
- * Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running.
- *
- * @param job The job.
- * @param event The event to hand off asynchronously.
- */
- void fireAsynchEvent(Job job, Event event)
- {
-
- job.add(event);
-
- final ExecutorService pool = _poolReference.getPool();
-
- if(pool == null)
- {
- return;
- }
-
- // rather than perform additional checks on pool to check that it hasn't shutdown.
- // catch the RejectedExecutionException that will result from executing on a shutdown pool
- if (job.activate())
- {
- try
- {
- pool.execute(job);
- }
- catch(RejectedExecutionException e)
- {
- _logger.warn("Thread pool shutdown while tasks still outstanding");
- }
- }
-
- }
-
- /**
- * Creates a Job on the Mina session, identified by this filters name, in which this filter places asynchronously
- * handled events.
- *
- * @param session The Mina session.
- */
- public void createNewJobForSession(IoSession session)
- {
- Job job = new Job(session, this, MAX_JOB_EVENTS,_readFilter);
- session.setAttribute(_name, job);
- }
-
- /**
- * Retrieves this filters Job, by this filters name, from the Mina session.
- *
- * @param session The Mina session.
- *
- * @return The Job for this filter to place asynchronous events into.
- */
- public Job getJobForSession(IoSession session)
- {
- return (Job) session.getAttribute(_name);
- }
-
- /**
- * Implements a terminal continuation for the {@link Job} for this filter. Whenever the Job completes its processing
- * of a batch of events this is called. This method simply re-activates the job, if it has more events to process.
- *
- * @param session The Mina session to work in.
- * @param job The job that completed.
- */
- public void completed(Job job)
- {
-
-
- if (!job.isComplete())
- {
- final ExecutorService pool = _poolReference.getPool();
-
- if(pool == null)
- {
- return;
- }
-
-
- // ritchiem : 2006-12-13 Do we need to perform the additional checks here?
- // Can the pool be shutdown at this point?
- if (job.activate())
- {
- try
- {
- pool.execute(job);
- }
- catch(RejectedExecutionException e)
- {
- _logger.warn("Thread pool shutdown while tasks still outstanding");
- }
-
- }
- }
- }
-
- public void notCompleted(Job job)
- {
- final ExecutorService pool = _poolReference.getPool();
-
- if(pool == null)
- {
- return;
- }
-
- try
- {
- pool.execute(job);
- }
- catch(RejectedExecutionException e)
- {
- _logger.warn("Thread pool shutdown while tasks still outstanding");
- }
-
- }
-
-
-
- /**
- * No-op pass through filter to the next filter in the chain.
- *
- * @param nextFilter The next filter in the chain.
- * @param session The Mina session.
- *
- * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow
- * overriding sub-classes the ability to.
- */
- public void sessionOpened(final NextFilter nextFilter, final IoSession session) throws Exception
- {
- nextFilter.sessionOpened(session);
- }
-
- /**
- * No-op pass through filter to the next filter in the chain.
- *
- * @param nextFilter The next filter in the chain.
- * @param session The Mina session.
- *
- * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow
- * overriding sub-classes the ability to.
- */
- public void sessionClosed(final NextFilter nextFilter, final IoSession session) throws Exception
- {
- nextFilter.sessionClosed(session);
- }
-
- /**
- * No-op pass through filter to the next filter in the chain.
- *
- * @param nextFilter The next filter in the chain.
- * @param session The Mina session.
- * @param status The session idle status.
- *
- * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow
- * overriding sub-classes the ability to.
- */
- public void sessionIdle(final NextFilter nextFilter, final IoSession session, final IdleStatus status) throws Exception
- {
- nextFilter.sessionIdle(session, status);
- }
-
- /**
- * No-op pass through filter to the next filter in the chain.
- *
- * @param nextFilter The next filter in the chain.
- * @param session The Mina session.
- * @param cause The underlying exception.
- *
- * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow
- * overriding sub-classes the ability to.
- */
- public void exceptionCaught(final NextFilter nextFilter, final IoSession session, final Throwable cause) throws Exception
- {
- nextFilter.exceptionCaught(session, cause);
- }
-
- /**
- * No-op pass through filter to the next filter in the chain.
- *
- * @param nextFilter The next filter in the chain.
- * @param session The Mina session.
- * @param message The message received.
- *
- * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow
- * overriding sub-classes the ability to.
- */
- public void messageReceived(final NextFilter nextFilter, final IoSession session, final Object message) throws Exception
- {
- nextFilter.messageReceived(session, message);
- }
-
- /**
- * No-op pass through filter to the next filter in the chain.
- *
- * @param nextFilter The next filter in the chain.
- * @param session The Mina session.
- * @param message The message sent.
- *
- * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow
- * overriding sub-classes the ability to.
- */
- public void messageSent(final NextFilter nextFilter, final IoSession session, final Object message) throws Exception
- {
- nextFilter.messageSent(session, message);
- }
-
- /**
- * No-op pass through filter to the next filter in the chain.
- *
- * @param nextFilter The next filter in the chain.
- * @param session The Mina session.
- * @param writeRequest The write request event.
- *
- * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow
- * overriding sub-classes the ability to.
- */
- public void filterWrite(final NextFilter nextFilter, final IoSession session, final WriteRequest writeRequest)
- throws Exception
- {
- nextFilter.filterWrite(session, writeRequest);
- }
-
- /**
- * No-op pass through filter to the next filter in the chain.
- *
- * @param nextFilter The next filter in the chain.
- * @param session The Mina session.
- *
- * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow
- * overriding sub-classes the ability to.
- */
- public void filterClose(NextFilter nextFilter, IoSession session) throws Exception
- {
- nextFilter.filterClose(session);
- }
-
- /**
- * No-op pass through filter to the next filter in the chain.
- *
- * @param nextFilter The next filter in the chain.
- * @param session The Mina session.
- *
- * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow
- * overriding sub-classes the ability to.
- */
- public void sessionCreated(NextFilter nextFilter, IoSession session) throws Exception
- {
- nextFilter.sessionCreated(session);
- }
-
- /**
- * Prints the filter types identifying name to a string, mainly for debugging purposes.
- *
- * @return The filter types identifying name.
- */
- public String toString()
- {
- return _name;
- }
-
- /**
- * AsynchReadPoolingFilter is a pooling filter that handles 'messageReceived' and 'sessionClosed' events
- * asynchronously.
- */
- public static class AsynchReadPoolingFilter extends PoolingFilter
- {
- /**
- * Creates a pooling filter that handles read events asynchronously.
- *
- * @param refCountingPool A managed reference to the thread pool.
- * @param name The filter types identifying name.
- */
- public AsynchReadPoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
- {
- super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_read_events", MAX_JOB_EVENTS),true);
- }
-
- /**
- * Hands off this event for asynchronous execution.
- *
- * @param nextFilter The next filter in the chain.
- * @param session The Mina session.
- * @param message The message received.
- */
- public void messageReceived(NextFilter nextFilter, final IoSession session, Object message)
- {
- Job job = getJobForSession(session);
- fireAsynchEvent(job, new MinaReceivedEvent(nextFilter, message, session));
- }
-
- /**
- * Hands off this event for asynchronous execution.
- *
- * @param nextFilter The next filter in the chain.
- * @param session The Mina session.
- */
- public void sessionClosed(final NextFilter nextFilter, final IoSession session)
- {
- Job job = getJobForSession(session);
- fireAsynchEvent(job, new CloseEvent(nextFilter, session));
- }
- }
-
- /**
- * AsynchWritePoolingFilter is a pooling filter that handles 'filterWrite' and 'sessionClosed' events
- * asynchronously.
- */
- public static class AsynchWritePoolingFilter extends PoolingFilter
- {
- /**
- * Creates a pooling filter that handles write events asynchronously.
- *
- * @param refCountingPool A managed reference to the thread pool.
- * @param name The filter types identifying name.
- */
- public AsynchWritePoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
- {
- super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_write_events", MAX_JOB_EVENTS),false);
- }
-
- /**
- * Hands off this event for asynchronous execution.
- *
- * @param nextFilter The next filter in the chain.
- * @param session The Mina session.
- * @param writeRequest The write request event.
- */
- public void filterWrite(final NextFilter nextFilter, final IoSession session, final WriteRequest writeRequest)
- {
- Job job = getJobForSession(session);
- fireAsynchEvent(job, new Event.MinaWriteEvent(nextFilter, writeRequest, session));
- }
-
- /**
- * Hands off this event for asynchronous execution.
- *
- * @param nextFilter The next filter in the chain.
- * @param session The Mina session.
- */
- public void sessionClosed(final NextFilter nextFilter, final IoSession session)
- {
- Job job = getJobForSession(session);
- fireAsynchEvent(job, new CloseEvent(nextFilter, session));
- }
- }
-
-}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java
deleted file mode 100644
index 8cea70e597..0000000000
--- a/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.pool;
-
-import org.apache.mina.common.IoFilterChain;
-import org.apache.mina.common.ThreadModel;
-import org.apache.mina.filter.ReferenceCountingIoFilter;
-
-/**
- * ReadWriteThreadModel is a Mina i/o filter chain factory, which creates a filter chain with seperate filters to
- * handle read and write events. The seperate filters are {@link PoolingFilter}s, which have thread pools to handle
- * these events. The effect of this is that reading and writing may happen concurrently.
- *
- * <p/>Socket i/o will only happen with concurrent reads and writes if Mina has seperate selector threads for each.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Create a filter chain with seperate read and write thread pools for read/write Mina events.
- * <td> {@link PoolingFilter}
- * </table>
- */
-public class ReadWriteThreadModel implements ThreadModel
-{
- /** Holds the singleton instance of this factory. */
- private static final ReadWriteThreadModel _instance = new ReadWriteThreadModel();
-
- /** Holds the thread pooling filter for reads. */
- private final PoolingFilter _asynchronousReadFilter;
-
- /** Holds the thread pooloing filter for writes. */
- private final PoolingFilter _asynchronousWriteFilter;
-
- /**
- * Creates a new factory for concurrent i/o, thread pooling filter chain construction. This is private, so that
- * only a singleton instance of the factory is ever created.
- */
- private ReadWriteThreadModel()
- {
- final ReferenceCountingExecutorService executor = ReferenceCountingExecutorService.getInstance();
- _asynchronousReadFilter = PoolingFilter.createAynschReadPoolingFilter(executor, "AsynchronousReadFilter");
- _asynchronousWriteFilter = PoolingFilter.createAynschWritePoolingFilter(executor, "AsynchronousWriteFilter");
- }
-
- /**
- * Gets the singleton instance of this filter chain factory.
- *
- * @return The singleton instance of this filter chain factory.
- */
- public static ReadWriteThreadModel getInstance()
- {
- return _instance;
- }
-
- /**
- * Gets the read filter.
- *
- * @return The read filter.
- */
- public PoolingFilter getAsynchronousReadFilter()
- {
- return _asynchronousReadFilter;
- }
-
- /**
- * Gets the write filter.
- *
- * @return The write filter.
- */
- public PoolingFilter getAsynchronousWriteFilter()
- {
- return _asynchronousWriteFilter;
- }
-
- /**
- * Adds the concurrent read and write filters to a filter chain.
- *
- * @param chain The Mina filter chain to add to.
- */
- public void buildFilterChain(IoFilterChain chain)
- {
- chain.addFirst("AsynchronousReadFilter", new ReferenceCountingIoFilter(_asynchronousReadFilter));
- chain.addLast("AsynchronousWriteFilter", new ReferenceCountingIoFilter(_asynchronousWriteFilter));
- }
-}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
index 7cc5f8e442..38ea9307b7 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
@@ -28,8 +28,6 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
-import javax.net.ssl.SSLEngine;
-
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoAcceptor;
@@ -50,7 +48,6 @@ import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
import org.apache.mina.util.NewThreadExecutor;
import org.apache.mina.util.SessionUtil;
-import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.protocol.ProtocolEngineFactory;
import org.apache.qpid.ssl.SSLContextFactory;
@@ -58,7 +55,6 @@ import org.apache.qpid.thread.QpidThreadExecutor;
import org.apache.qpid.transport.NetworkDriver;
import org.apache.qpid.transport.NetworkDriverConfiguration;
import org.apache.qpid.transport.OpenException;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -148,14 +144,6 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
sc.setTcpNoDelay(config.getTcpNoDelay());
}
- // if we do not use the executor pool threading model we get the default
- // leader follower
- // implementation provided by MINA
- if (_executorPool)
- {
- sconfig.setThreadModel(ReadWriteThreadModel.getInstance());
- }
-
if (sslFactory != null)
{
_sslFactory = sslFactory;
@@ -227,14 +215,6 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
}
SocketConnectorConfig cfg = (SocketConnectorConfig) _socketConnector.getDefaultConfig();
-
- // if we do not use our own thread model we get the MINA default which is to use
- // its own leader-follower model
- boolean readWriteThreading = Boolean.getBoolean("amqj.shared_read_write_pool");
- if (readWriteThreading)
- {
- cfg.setThreadModel(ReadWriteThreadModel.getInstance());
- }
SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();
scfg.setTcpNoDelay((config != null) ? config.getTcpNoDelay() : true);
@@ -258,8 +238,6 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
throw new OpenException("Could not open connection", _lastException);
}
_ioSession = future.getSession();
- ReadWriteThreadModel.getInstance().getAsynchronousReadFilter().createNewJobForSession(_ioSession);
- ReadWriteThreadModel.getInstance().getAsynchronousWriteFilter().createNewJobForSession(_ioSession);
_ioSession.setAttachment(engine);
engine.setNetworkDriver(this);
_protocolEngine = engine;
diff --git a/qpid/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java b/qpid/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java
deleted file mode 100644
index 6383d52298..0000000000
--- a/qpid/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- *
- */
-package org.apache.qpid.pool;
-
-import junit.framework.TestCase;
-import junit.framework.Assert;
-import org.apache.qpid.session.TestSession;
-import org.apache.mina.common.IoFilter;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.IdleStatus;
-
-import java.util.concurrent.RejectedExecutionException;
-
-public class PoolingFilterTest extends TestCase
-{
- private PoolingFilter _pool;
- ReferenceCountingExecutorService _executorService;
-
- public void setUp()
- {
-
- //Create Pool
- _executorService = ReferenceCountingExecutorService.getInstance();
- _executorService.acquireExecutorService();
- _pool = PoolingFilter.createAynschWritePoolingFilter(_executorService,
- "AsynchronousWriteFilter");
-
- }
-
- public void testRejectedExecution() throws Exception
- {
-
- TestSession testSession = new TestSession();
- _pool.createNewJobForSession(testSession);
- _pool.filterWrite(new NoOpFilter(), testSession, new IoFilter.WriteRequest("Message"));
-
- //Shutdown the pool
- _executorService.getPool().shutdownNow();
-
- try
- {
-
- testSession = new TestSession();
- _pool.createNewJobForSession(testSession);
- //prior to fix for QPID-172 this would throw RejectedExecutionException
- _pool.filterWrite(null, testSession, null);
- }
- catch (RejectedExecutionException rje)
- {
- Assert.fail("RejectedExecutionException should not occur after pool has shutdown:" + rje);
- }
- }
-
- private static class NoOpFilter implements IoFilter.NextFilter
- {
-
- public void sessionOpened(IoSession session)
- {
- }
-
- public void sessionClosed(IoSession session)
- {
- }
-
- public void sessionIdle(IoSession session, IdleStatus status)
- {
- }
-
- public void exceptionCaught(IoSession session, Throwable cause)
- {
- }
-
- public void messageReceived(IoSession session, Object message)
- {
- }
-
- public void messageSent(IoSession session, Object message)
- {
- }
-
- public void filterWrite(IoSession session, IoFilter.WriteRequest writeRequest)
- {
- }
-
- public void filterClose(IoSession session)
- {
- }
-
- public void sessionCreated(IoSession session)
- {
- }
- }
-}