diff options
Diffstat (limited to 'java/common/src/main/java/org/apache/qpid/pool/Job.java')
-rw-r--r-- | java/common/src/main/java/org/apache/qpid/pool/Job.java | 139 |
1 files changed, 100 insertions, 39 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/pool/Job.java b/java/common/src/main/java/org/apache/qpid/pool/Job.java index 00da005515..82b600de88 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/Job.java +++ b/java/common/src/main/java/org/apache/qpid/pool/Job.java @@ -21,9 +21,12 @@ package org.apache.qpid.pool; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.mina.common.IoSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A Job is a continuation that batches together other continuations, specifically {@link Event}s, into one continuation. @@ -52,35 +55,28 @@ import org.apache.mina.common.IoSession; */ public class Job implements ReadWriteRunnable { + + /** Defines the maximum number of events that will be batched into a single job. */ + public static final int MAX_JOB_EVENTS = Integer.getInteger("amqj.server.read_write_pool.max_events", 10); + /** The maximum number of events to process per run of the job. More events than this may be queued in the job. */ private final int _maxEvents; - /** The Mina session. */ - private final IoSession _session; - /** Holds the queue of events that make up the job. */ - private final java.util.Queue<Event> _eventQueue = new ConcurrentLinkedQueue<Event>(); + private final java.util.Queue<Runnable> _eventQueue = new ConcurrentLinkedQueue<Runnable>(); /** Holds a status flag, that indicates when the job is actively running. */ private final AtomicBoolean _active = new AtomicBoolean(); - /** Holds the completion continuation, called upon completion of a run of the job. */ - private final JobCompletionHandler _completionHandler; - private final boolean _readJob; - /** - * Creates a new job that aggregates many continuations together. - * - * @param session The Mina session. - * @param completionHandler The per job run, terminal continuation. - * @param maxEvents The maximum number of aggregated continuations to process per run of the job. - * @param readJob - */ - Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents, final boolean readJob) + private ReferenceCountingExecutorService _poolReference; + + private final static Logger _logger = LoggerFactory.getLogger(Job.class); + + public Job(ReferenceCountingExecutorService poolReference, int maxEvents, boolean readJob) { - _session = session; - _completionHandler = completionHandler; + _poolReference = poolReference; _maxEvents = maxEvents; _readJob = readJob; } @@ -90,7 +86,7 @@ public class Job implements ReadWriteRunnable * * @param evt The continuation to enqueue. */ - void add(Event evt) + public void add(Runnable evt) { _eventQueue.add(evt); } @@ -104,14 +100,14 @@ public class Job implements ReadWriteRunnable int i = _maxEvents; while( --i != 0 ) { - Event e = _eventQueue.poll(); + Runnable e = _eventQueue.poll(); if (e == null) { return true; } else { - e.process(_session); + e.run(); } } return false; @@ -153,40 +149,105 @@ public class Job implements ReadWriteRunnable if(processAll()) { deactivate(); - _completionHandler.completed(_session, this); + completed(); } else { - _completionHandler.notCompleted(_session, this); + notCompleted(); } } - public boolean isReadJob() - { - return _readJob; - } - public boolean isRead() { return _readJob; } - - public boolean isWrite() + + /** + * Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running. + * + * @param job The job. + * @param event The event to hand off asynchronously. + */ + public static void fireAsynchEvent(ExecutorService pool, Job job, Runnable event) { - return !_readJob; - } + job.add(event); + + + if(pool == null) + { + return; + } + + // rather than perform additional checks on pool to check that it hasn't shutdown. + // catch the RejectedExecutionException that will result from executing on a shutdown pool + if (job.activate()) + { + try + { + pool.execute(job); + } + catch(RejectedExecutionException e) + { + _logger.warn("Thread pool shutdown while tasks still outstanding"); + } + } + } + /** - * Another interface for a continuation. + * Implements a terminal continuation for the {@link Job} for this filter. Whenever the Job completes its processing + * of a batch of events this is called. This method simply re-activates the job, if it has more events to process. * - * @todo Get rid of this interface as there are other interfaces that could be used instead, such as FutureTask, - * Runnable or a custom Continuation interface. + * @param session The Mina session to work in. + * @param job The job that completed. */ - static interface JobCompletionHandler + public void completed() + { + if (!isComplete()) + { + final ExecutorService pool = _poolReference.getPool(); + + if(pool == null) + { + return; + } + + + // ritchiem : 2006-12-13 Do we need to perform the additional checks here? + // Can the pool be shutdown at this point? + if (activate()) + { + try + { + pool.execute(this); + } + catch(RejectedExecutionException e) + { + _logger.warn("Thread pool shutdown while tasks still outstanding"); + } + + } + } + } + + public void notCompleted() { - public void completed(IoSession session, Job job); + final ExecutorService pool = _poolReference.getPool(); + + if(pool == null) + { + return; + } - public void notCompleted(final IoSession session, final Job job); + try + { + pool.execute(this); + } + catch(RejectedExecutionException e) + { + _logger.warn("Thread pool shutdown while tasks still outstanding"); + } } + } |