diff options
Diffstat (limited to 'qpid/java/common/src/main/java/org')
5 files changed, 70 insertions, 819 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java deleted file mode 100644 index 49bce9f2f9..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java +++ /dev/null @@ -1,167 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.pool; - -import org.apache.mina.common.IoFilter; -import org.apache.mina.common.IoSession; - -/** - * An Event is a continuation, which is used to break a Mina filter chain and save the current point in the chain - * for later processing. It is an abstract class, with different implementations for continuations of different kinds - * of Mina events. - * - * <p/>These continuations are typically batched by {@link Job} for processing by a worker thread pool. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Process a continuation in the context of a Mina session. - * </table> - * - * @todo Pull up _nextFilter and getNextFilter into Event, as all events use it. Inner classes need to be non-static - * to use instance variables in the parent. Consequently they need to be non-inner to be instantiable outside of - * the context of the outer Event class. The inner class construction used here is preventing common code re-use - * (though not by a huge amount), but makes for an inelegent way of handling inheritance and doesn't seem like - * a justifiable use of inner classes. Move the inner classes out into their own files. - * - * @todo Could make Event implement Runnable, FutureTask, or a custom Continuation interface, to clarify its status as - * a continuation. Job is also a continuation, as is the job completion handler. Or, as Event is totally abstract, - * it is really an interface, so could just drop it and use the continuation interface instead. - */ -public class Event -{ - private Runnable _runner; - - public Event() - { - - } - - /** - * Creates a continuation. - */ - public Event(Runnable runner) - { - _runner = runner; - } - - /** - * Processes the continuation - */ - public void process() - { - _runner.run(); - } - - /** - * A continuation ({@link Event}) that takes a Mina messageReceived event, and passes it to a NextFilter. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Pass a Mina messageReceived event to a NextFilter. <td> {@link IoFilter.NextFilter}, {@link IoSession} - * </table> - */ - public static final class MinaReceivedEvent extends Event - { - private final Object _data; - private final IoFilter.NextFilter _nextFilter; - private final IoSession _session; - - public MinaReceivedEvent(final IoFilter.NextFilter nextFilter, final Object data, final IoSession session) - { - _nextFilter = nextFilter; - _data = data; - _session = session; - } - - public void process() - { - _nextFilter.messageReceived(_session, _data); - } - - public IoFilter.NextFilter getNextFilter() - { - return _nextFilter; - } - } - - /** - * A continuation ({@link Event}) that takes a Mina filterWrite event, and passes it to a NextFilter. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Pass a Mina filterWrite event to a NextFilter. - * <td> {@link IoFilter.NextFilter}, {@link IoFilter.WriteRequest}, {@link IoSession} - * </table> - */ - public static final class MinaWriteEvent extends Event - { - private final IoFilter.WriteRequest _data; - private final IoFilter.NextFilter _nextFilter; - private IoSession _session; - - public MinaWriteEvent(final IoFilter.NextFilter nextFilter, final IoFilter.WriteRequest data, final IoSession session) - { - _nextFilter = nextFilter; - _data = data; - _session = session; - } - - public void process() - { - _nextFilter.filterWrite(_session, _data); - } - - public IoFilter.NextFilter getNextFilter() - { - return _nextFilter; - } - } - - /** - * A continuation ({@link Event}) that takes a Mina sessionClosed event, and passes it to a NextFilter. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Pass a Mina sessionClosed event to a NextFilter. <td> {@link IoFilter.NextFilter}, {@link IoSession} - * </table> - */ - public static final class CloseEvent extends Event - { - private final IoFilter.NextFilter _nextFilter; - private final IoSession _session; - - public CloseEvent(final IoFilter.NextFilter nextFilter, final IoSession session) - { - _nextFilter = nextFilter; - _session = session; - } - - public void process() - { - _nextFilter.sessionClosed(_session); - } - - public IoFilter.NextFilter getNextFilter() - { - return _nextFilter; - } - } -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java index 15d1c20ff1..82b600de88 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java @@ -25,7 +25,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.mina.common.IoSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,40 +55,28 @@ import org.slf4j.LoggerFactory; */ public class Job implements ReadWriteRunnable { + + /** Defines the maximum number of events that will be batched into a single job. */ + public static final int MAX_JOB_EVENTS = Integer.getInteger("amqj.server.read_write_pool.max_events", 10); + /** The maximum number of events to process per run of the job. More events than this may be queued in the job. */ private final int _maxEvents; /** Holds the queue of events that make up the job. */ - private final java.util.Queue<Event> _eventQueue = new ConcurrentLinkedQueue<Event>(); + private final java.util.Queue<Runnable> _eventQueue = new ConcurrentLinkedQueue<Runnable>(); /** Holds a status flag, that indicates when the job is actively running. */ private final AtomicBoolean _active = new AtomicBoolean(); - /** Holds the completion continuation, called upon completion of a run of the job. */ - private final JobCompletionHandler _completionHandler; - private final boolean _readJob; + private ReferenceCountingExecutorService _poolReference; + private final static Logger _logger = LoggerFactory.getLogger(Job.class); - /** - * Creates a new job that aggregates many continuations together. - * - * @param session The Mina session. - * @param completionHandler The per job run, terminal continuation. - * @param maxEvents The maximum number of aggregated continuations to process per run of the job. - * @param readJob - */ - Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents, final boolean readJob) - { - _completionHandler = completionHandler; - _maxEvents = maxEvents; - _readJob = readJob; - } - - public Job(JobCompletionHandler completionHandler, int maxEvents, boolean readJob) + public Job(ReferenceCountingExecutorService poolReference, int maxEvents, boolean readJob) { - _completionHandler = completionHandler; + _poolReference = poolReference; _maxEvents = maxEvents; _readJob = readJob; } @@ -99,7 +86,7 @@ public class Job implements ReadWriteRunnable * * @param evt The continuation to enqueue. */ - public void add(Event evt) + public void add(Runnable evt) { _eventQueue.add(evt); } @@ -113,14 +100,14 @@ public class Job implements ReadWriteRunnable int i = _maxEvents; while( --i != 0 ) { - Event e = _eventQueue.poll(); + Runnable e = _eventQueue.poll(); if (e == null) { return true; } else { - e.process(); + e.run(); } } return false; @@ -162,11 +149,11 @@ public class Job implements ReadWriteRunnable if(processAll()) { deactivate(); - _completionHandler.completed(this); + completed(); } else { - _completionHandler.notCompleted(this); + notCompleted(); } } @@ -174,19 +161,6 @@ public class Job implements ReadWriteRunnable { return _readJob; } - - /** - * Another interface for a continuation. - * - * @todo Get rid of this interface as there are other interfaces that could be used instead, such as FutureTask, - * Runnable or a custom Continuation interface. - */ - static interface JobCompletionHandler - { - public void completed(Job job); - - public void notCompleted(final Job job); - } /** * Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running. @@ -194,7 +168,7 @@ public class Job implements ReadWriteRunnable * @param job The job. * @param event The event to hand off asynchronously. */ - public static void fireAsynchEvent(ExecutorService pool, Job job, Event event) + public static void fireAsynchEvent(ExecutorService pool, Job job, Runnable event) { job.add(event); @@ -221,4 +195,59 @@ public class Job implements ReadWriteRunnable } + /** + * Implements a terminal continuation for the {@link Job} for this filter. Whenever the Job completes its processing + * of a batch of events this is called. This method simply re-activates the job, if it has more events to process. + * + * @param session The Mina session to work in. + * @param job The job that completed. + */ + public void completed() + { + if (!isComplete()) + { + final ExecutorService pool = _poolReference.getPool(); + + if(pool == null) + { + return; + } + + + // ritchiem : 2006-12-13 Do we need to perform the additional checks here? + // Can the pool be shutdown at this point? + if (activate()) + { + try + { + pool.execute(this); + } + catch(RejectedExecutionException e) + { + _logger.warn("Thread pool shutdown while tasks still outstanding"); + } + + } + } + } + + public void notCompleted() + { + final ExecutorService pool = _poolReference.getPool(); + + if(pool == null) + { + return; + } + + try + { + pool.execute(this); + } + catch(RejectedExecutionException e) + { + _logger.warn("Thread pool shutdown while tasks still outstanding"); + } + } + } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java deleted file mode 100644 index 4e02ac3a55..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java +++ /dev/null @@ -1,487 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.pool; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.RejectedExecutionException; - -import org.apache.mina.common.IdleStatus; -import org.apache.mina.common.IoFilterAdapter; -import org.apache.mina.common.IoSession; -import org.apache.qpid.pool.Event.CloseEvent; -import org.apache.qpid.pool.Event.MinaReceivedEvent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * PoolingFilter, is a no-op pass through filter that hands all events down the Mina filter chain by default. As it - * adds no behaviour by default to the filter chain, it is abstract. - * - * <p/>PoolingFilter provides a capability, available to sub-classes, to handle events in the chain asynchronously, by - * adding them to a job. If a job is not active, adding an event to it activates it. If it is active, the event is - * added to the job, which will run to completion and eventually process the event. The queue on the job itself acts as - * a buffer between stages of the pipeline. - * - * <p/>There are two convenience methods, {@link #createAynschReadPoolingFilter} and - * {@link #createAynschWritePoolingFilter}, for obtaining pooling filters that handle 'messageReceived' and - * 'filterWrite' events, making it possible to process these event streams seperately. - * - * <p/>Pooling filters have a name, in order to distinguish different filter types. They set up a {@link Job} on the - * Mina session they are working with, and store it in the session against their identifying name. This allows different - * filters with different names to be set up on the same filter chain, on the same Mina session, that batch their - * workloads in different jobs. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Implement default, pass through filter. - * <tr><td> Create pooling filters and a specific thread pool. <td> {@link ReferenceCountingExecutorService} - * <tr><td> Provide the ability to batch Mina events for asynchronous processing. <td> {@link Job}, {@link Event} - * <tr><td> Provide a terminal continuation to keep jobs running till empty. - * <td> {@link Job}, {@link Job.JobCompletionHandler} - * </table> - * - * @todo The static helper methods are pointless. Could just call new. - */ -public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionHandler -{ - /** Used for debugging purposes. */ - private static final Logger _logger = LoggerFactory.getLogger(PoolingFilter.class); - - /** Holds the managed reference to obtain the executor for the batched jobs. */ - private final ReferenceCountingExecutorService _poolReference; - - /** Used to hold a name for identifying differeny pooling filter types. */ - private final String _name; - - /** Defines the maximum number of events that will be batched into a single job. */ - public static final int MAX_JOB_EVENTS = Integer.getInteger("amqj.server.read_write_pool.max_events", 10); - - private final int _maxEvents; - - private final boolean _readFilter; - - /** - * Creates a named pooling filter, on the specified shared thread pool. - * - * @param refCountingPool The thread pool reference. - * @param name The identifying name of the filter type. - */ - public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name, int maxEvents, boolean readFilter) - { - _poolReference = refCountingPool; - _name = name; - _maxEvents = maxEvents; - _readFilter = readFilter; - } - - /** - * Helper method to get an instance of a pooling filter that handles read events asynchronously. - * - * @param refCountingPool A managed reference to the thread pool. - * @param name The filter types identifying name. - * - * @return A pooling filter for asynchronous read events. - */ - public static PoolingFilter createAynschReadPoolingFilter(ReferenceCountingExecutorService refCountingPool, String name) - { - return new AsynchReadPoolingFilter(refCountingPool, name); - } - - /** - * Helper method to get an instance of a pooling filter that handles write events asynchronously. - * - * @param refCountingPool A managed reference to the thread pool. - * @param name The filter types identifying name. - * - * @return A pooling filter for asynchronous write events. - */ - public static PoolingFilter createAynschWritePoolingFilter(ReferenceCountingExecutorService refCountingPool, String name) - { - return new AsynchWritePoolingFilter(refCountingPool, name); - } - - /** - * Called by Mina to initialize this filter. Takes a reference to the thread pool. - */ - public void init() - { - _logger.debug("Init called on PoolingFilter " + toString()); - - // Called when the filter is initialised in the chain. If the reference count is - // zero this acquire will initialise the pool. - _poolReference.acquireExecutorService(); - } - - /** - * Called by Mina to clean up this filter. Releases the reference to the thread pool. - */ - public void destroy() - { - _logger.debug("Destroy called on PoolingFilter " + toString()); - } - - /** - * Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running. - * - * @param job The job. - * @param event The event to hand off asynchronously. - */ - void fireAsynchEvent(Job job, Event event) - { - - job.add(event); - - final ExecutorService pool = _poolReference.getPool(); - - if(pool == null) - { - return; - } - - // rather than perform additional checks on pool to check that it hasn't shutdown. - // catch the RejectedExecutionException that will result from executing on a shutdown pool - if (job.activate()) - { - try - { - pool.execute(job); - } - catch(RejectedExecutionException e) - { - _logger.warn("Thread pool shutdown while tasks still outstanding"); - } - } - - } - - /** - * Creates a Job on the Mina session, identified by this filters name, in which this filter places asynchronously - * handled events. - * - * @param session The Mina session. - */ - public void createNewJobForSession(IoSession session) - { - Job job = new Job(session, this, MAX_JOB_EVENTS,_readFilter); - session.setAttribute(_name, job); - } - - /** - * Retrieves this filters Job, by this filters name, from the Mina session. - * - * @param session The Mina session. - * - * @return The Job for this filter to place asynchronous events into. - */ - public Job getJobForSession(IoSession session) - { - return (Job) session.getAttribute(_name); - } - - /** - * Implements a terminal continuation for the {@link Job} for this filter. Whenever the Job completes its processing - * of a batch of events this is called. This method simply re-activates the job, if it has more events to process. - * - * @param session The Mina session to work in. - * @param job The job that completed. - */ - public void completed(Job job) - { - - - if (!job.isComplete()) - { - final ExecutorService pool = _poolReference.getPool(); - - if(pool == null) - { - return; - } - - - // ritchiem : 2006-12-13 Do we need to perform the additional checks here? - // Can the pool be shutdown at this point? - if (job.activate()) - { - try - { - pool.execute(job); - } - catch(RejectedExecutionException e) - { - _logger.warn("Thread pool shutdown while tasks still outstanding"); - } - - } - } - } - - public void notCompleted(Job job) - { - final ExecutorService pool = _poolReference.getPool(); - - if(pool == null) - { - return; - } - - try - { - pool.execute(job); - } - catch(RejectedExecutionException e) - { - _logger.warn("Thread pool shutdown while tasks still outstanding"); - } - - } - - - - /** - * No-op pass through filter to the next filter in the chain. - * - * @param nextFilter The next filter in the chain. - * @param session The Mina session. - * - * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow - * overriding sub-classes the ability to. - */ - public void sessionOpened(final NextFilter nextFilter, final IoSession session) throws Exception - { - nextFilter.sessionOpened(session); - } - - /** - * No-op pass through filter to the next filter in the chain. - * - * @param nextFilter The next filter in the chain. - * @param session The Mina session. - * - * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow - * overriding sub-classes the ability to. - */ - public void sessionClosed(final NextFilter nextFilter, final IoSession session) throws Exception - { - nextFilter.sessionClosed(session); - } - - /** - * No-op pass through filter to the next filter in the chain. - * - * @param nextFilter The next filter in the chain. - * @param session The Mina session. - * @param status The session idle status. - * - * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow - * overriding sub-classes the ability to. - */ - public void sessionIdle(final NextFilter nextFilter, final IoSession session, final IdleStatus status) throws Exception - { - nextFilter.sessionIdle(session, status); - } - - /** - * No-op pass through filter to the next filter in the chain. - * - * @param nextFilter The next filter in the chain. - * @param session The Mina session. - * @param cause The underlying exception. - * - * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow - * overriding sub-classes the ability to. - */ - public void exceptionCaught(final NextFilter nextFilter, final IoSession session, final Throwable cause) throws Exception - { - nextFilter.exceptionCaught(session, cause); - } - - /** - * No-op pass through filter to the next filter in the chain. - * - * @param nextFilter The next filter in the chain. - * @param session The Mina session. - * @param message The message received. - * - * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow - * overriding sub-classes the ability to. - */ - public void messageReceived(final NextFilter nextFilter, final IoSession session, final Object message) throws Exception - { - nextFilter.messageReceived(session, message); - } - - /** - * No-op pass through filter to the next filter in the chain. - * - * @param nextFilter The next filter in the chain. - * @param session The Mina session. - * @param message The message sent. - * - * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow - * overriding sub-classes the ability to. - */ - public void messageSent(final NextFilter nextFilter, final IoSession session, final Object message) throws Exception - { - nextFilter.messageSent(session, message); - } - - /** - * No-op pass through filter to the next filter in the chain. - * - * @param nextFilter The next filter in the chain. - * @param session The Mina session. - * @param writeRequest The write request event. - * - * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow - * overriding sub-classes the ability to. - */ - public void filterWrite(final NextFilter nextFilter, final IoSession session, final WriteRequest writeRequest) - throws Exception - { - nextFilter.filterWrite(session, writeRequest); - } - - /** - * No-op pass through filter to the next filter in the chain. - * - * @param nextFilter The next filter in the chain. - * @param session The Mina session. - * - * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow - * overriding sub-classes the ability to. - */ - public void filterClose(NextFilter nextFilter, IoSession session) throws Exception - { - nextFilter.filterClose(session); - } - - /** - * No-op pass through filter to the next filter in the chain. - * - * @param nextFilter The next filter in the chain. - * @param session The Mina session. - * - * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow - * overriding sub-classes the ability to. - */ - public void sessionCreated(NextFilter nextFilter, IoSession session) throws Exception - { - nextFilter.sessionCreated(session); - } - - /** - * Prints the filter types identifying name to a string, mainly for debugging purposes. - * - * @return The filter types identifying name. - */ - public String toString() - { - return _name; - } - - /** - * AsynchReadPoolingFilter is a pooling filter that handles 'messageReceived' and 'sessionClosed' events - * asynchronously. - */ - public static class AsynchReadPoolingFilter extends PoolingFilter - { - /** - * Creates a pooling filter that handles read events asynchronously. - * - * @param refCountingPool A managed reference to the thread pool. - * @param name The filter types identifying name. - */ - public AsynchReadPoolingFilter(ReferenceCountingExecutorService refCountingPool, String name) - { - super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_read_events", MAX_JOB_EVENTS),true); - } - - /** - * Hands off this event for asynchronous execution. - * - * @param nextFilter The next filter in the chain. - * @param session The Mina session. - * @param message The message received. - */ - public void messageReceived(NextFilter nextFilter, final IoSession session, Object message) - { - Job job = getJobForSession(session); - fireAsynchEvent(job, new MinaReceivedEvent(nextFilter, message, session)); - } - - /** - * Hands off this event for asynchronous execution. - * - * @param nextFilter The next filter in the chain. - * @param session The Mina session. - */ - public void sessionClosed(final NextFilter nextFilter, final IoSession session) - { - Job job = getJobForSession(session); - fireAsynchEvent(job, new CloseEvent(nextFilter, session)); - } - } - - /** - * AsynchWritePoolingFilter is a pooling filter that handles 'filterWrite' and 'sessionClosed' events - * asynchronously. - */ - public static class AsynchWritePoolingFilter extends PoolingFilter - { - /** - * Creates a pooling filter that handles write events asynchronously. - * - * @param refCountingPool A managed reference to the thread pool. - * @param name The filter types identifying name. - */ - public AsynchWritePoolingFilter(ReferenceCountingExecutorService refCountingPool, String name) - { - super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_write_events", MAX_JOB_EVENTS),false); - } - - /** - * Hands off this event for asynchronous execution. - * - * @param nextFilter The next filter in the chain. - * @param session The Mina session. - * @param writeRequest The write request event. - */ - public void filterWrite(final NextFilter nextFilter, final IoSession session, final WriteRequest writeRequest) - { - Job job = getJobForSession(session); - fireAsynchEvent(job, new Event.MinaWriteEvent(nextFilter, writeRequest, session)); - } - - /** - * Hands off this event for asynchronous execution. - * - * @param nextFilter The next filter in the chain. - * @param session The Mina session. - */ - public void sessionClosed(final NextFilter nextFilter, final IoSession session) - { - Job job = getJobForSession(session); - fireAsynchEvent(job, new CloseEvent(nextFilter, session)); - } - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java deleted file mode 100644 index 8cea70e597..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.pool; - -import org.apache.mina.common.IoFilterChain; -import org.apache.mina.common.ThreadModel; -import org.apache.mina.filter.ReferenceCountingIoFilter; - -/** - * ReadWriteThreadModel is a Mina i/o filter chain factory, which creates a filter chain with seperate filters to - * handle read and write events. The seperate filters are {@link PoolingFilter}s, which have thread pools to handle - * these events. The effect of this is that reading and writing may happen concurrently. - * - * <p/>Socket i/o will only happen with concurrent reads and writes if Mina has seperate selector threads for each. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Create a filter chain with seperate read and write thread pools for read/write Mina events. - * <td> {@link PoolingFilter} - * </table> - */ -public class ReadWriteThreadModel implements ThreadModel -{ - /** Holds the singleton instance of this factory. */ - private static final ReadWriteThreadModel _instance = new ReadWriteThreadModel(); - - /** Holds the thread pooling filter for reads. */ - private final PoolingFilter _asynchronousReadFilter; - - /** Holds the thread pooloing filter for writes. */ - private final PoolingFilter _asynchronousWriteFilter; - - /** - * Creates a new factory for concurrent i/o, thread pooling filter chain construction. This is private, so that - * only a singleton instance of the factory is ever created. - */ - private ReadWriteThreadModel() - { - final ReferenceCountingExecutorService executor = ReferenceCountingExecutorService.getInstance(); - _asynchronousReadFilter = PoolingFilter.createAynschReadPoolingFilter(executor, "AsynchronousReadFilter"); - _asynchronousWriteFilter = PoolingFilter.createAynschWritePoolingFilter(executor, "AsynchronousWriteFilter"); - } - - /** - * Gets the singleton instance of this filter chain factory. - * - * @return The singleton instance of this filter chain factory. - */ - public static ReadWriteThreadModel getInstance() - { - return _instance; - } - - /** - * Gets the read filter. - * - * @return The read filter. - */ - public PoolingFilter getAsynchronousReadFilter() - { - return _asynchronousReadFilter; - } - - /** - * Gets the write filter. - * - * @return The write filter. - */ - public PoolingFilter getAsynchronousWriteFilter() - { - return _asynchronousWriteFilter; - } - - /** - * Adds the concurrent read and write filters to a filter chain. - * - * @param chain The Mina filter chain to add to. - */ - public void buildFilterChain(IoFilterChain chain) - { - chain.addFirst("AsynchronousReadFilter", new ReferenceCountingIoFilter(_asynchronousReadFilter)); - chain.addLast("AsynchronousWriteFilter", new ReferenceCountingIoFilter(_asynchronousWriteFilter)); - } -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java index 7cc5f8e442..38ea9307b7 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java @@ -28,8 +28,6 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; -import javax.net.ssl.SSLEngine; - import org.apache.mina.common.ConnectFuture; import org.apache.mina.common.IdleStatus; import org.apache.mina.common.IoAcceptor; @@ -50,7 +48,6 @@ import org.apache.mina.transport.socket.nio.SocketConnectorConfig; import org.apache.mina.transport.socket.nio.SocketSessionConfig; import org.apache.mina.util.NewThreadExecutor; import org.apache.mina.util.SessionUtil; -import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.protocol.ProtocolEngineFactory; import org.apache.qpid.ssl.SSLContextFactory; @@ -58,7 +55,6 @@ import org.apache.qpid.thread.QpidThreadExecutor; import org.apache.qpid.transport.NetworkDriver; import org.apache.qpid.transport.NetworkDriverConfiguration; import org.apache.qpid.transport.OpenException; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -148,14 +144,6 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver sc.setTcpNoDelay(config.getTcpNoDelay()); } - // if we do not use the executor pool threading model we get the default - // leader follower - // implementation provided by MINA - if (_executorPool) - { - sconfig.setThreadModel(ReadWriteThreadModel.getInstance()); - } - if (sslFactory != null) { _sslFactory = sslFactory; @@ -227,14 +215,6 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver } SocketConnectorConfig cfg = (SocketConnectorConfig) _socketConnector.getDefaultConfig(); - - // if we do not use our own thread model we get the MINA default which is to use - // its own leader-follower model - boolean readWriteThreading = Boolean.getBoolean("amqj.shared_read_write_pool"); - if (readWriteThreading) - { - cfg.setThreadModel(ReadWriteThreadModel.getInstance()); - } SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig(); scfg.setTcpNoDelay((config != null) ? config.getTcpNoDelay() : true); @@ -258,8 +238,6 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver throw new OpenException("Could not open connection", _lastException); } _ioSession = future.getSession(); - ReadWriteThreadModel.getInstance().getAsynchronousReadFilter().createNewJobForSession(_ioSession); - ReadWriteThreadModel.getInstance().getAsynchronousWriteFilter().createNewJobForSession(_ioSession); _ioSession.setAttachment(engine); engine.setNetworkDriver(this); _protocolEngine = engine; |