summaryrefslogtreecommitdiff
path: root/java/common/src/main/java/org/apache/qpid/pool/Job.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/common/src/main/java/org/apache/qpid/pool/Job.java')
-rw-r--r--java/common/src/main/java/org/apache/qpid/pool/Job.java139
1 files changed, 100 insertions, 39 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/pool/Job.java b/java/common/src/main/java/org/apache/qpid/pool/Job.java
index 00da005515..82b600de88 100644
--- a/java/common/src/main/java/org/apache/qpid/pool/Job.java
+++ b/java/common/src/main/java/org/apache/qpid/pool/Job.java
@@ -21,9 +21,12 @@
package org.apache.qpid.pool;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.mina.common.IoSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A Job is a continuation that batches together other continuations, specifically {@link Event}s, into one continuation.
@@ -52,35 +55,28 @@ import org.apache.mina.common.IoSession;
*/
public class Job implements ReadWriteRunnable
{
+
+ /** Defines the maximum number of events that will be batched into a single job. */
+ public static final int MAX_JOB_EVENTS = Integer.getInteger("amqj.server.read_write_pool.max_events", 10);
+
/** The maximum number of events to process per run of the job. More events than this may be queued in the job. */
private final int _maxEvents;
- /** The Mina session. */
- private final IoSession _session;
-
/** Holds the queue of events that make up the job. */
- private final java.util.Queue<Event> _eventQueue = new ConcurrentLinkedQueue<Event>();
+ private final java.util.Queue<Runnable> _eventQueue = new ConcurrentLinkedQueue<Runnable>();
/** Holds a status flag, that indicates when the job is actively running. */
private final AtomicBoolean _active = new AtomicBoolean();
- /** Holds the completion continuation, called upon completion of a run of the job. */
- private final JobCompletionHandler _completionHandler;
-
private final boolean _readJob;
- /**
- * Creates a new job that aggregates many continuations together.
- *
- * @param session The Mina session.
- * @param completionHandler The per job run, terminal continuation.
- * @param maxEvents The maximum number of aggregated continuations to process per run of the job.
- * @param readJob
- */
- Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents, final boolean readJob)
+ private ReferenceCountingExecutorService _poolReference;
+
+ private final static Logger _logger = LoggerFactory.getLogger(Job.class);
+
+ public Job(ReferenceCountingExecutorService poolReference, int maxEvents, boolean readJob)
{
- _session = session;
- _completionHandler = completionHandler;
+ _poolReference = poolReference;
_maxEvents = maxEvents;
_readJob = readJob;
}
@@ -90,7 +86,7 @@ public class Job implements ReadWriteRunnable
*
* @param evt The continuation to enqueue.
*/
- void add(Event evt)
+ public void add(Runnable evt)
{
_eventQueue.add(evt);
}
@@ -104,14 +100,14 @@ public class Job implements ReadWriteRunnable
int i = _maxEvents;
while( --i != 0 )
{
- Event e = _eventQueue.poll();
+ Runnable e = _eventQueue.poll();
if (e == null)
{
return true;
}
else
{
- e.process(_session);
+ e.run();
}
}
return false;
@@ -153,40 +149,105 @@ public class Job implements ReadWriteRunnable
if(processAll())
{
deactivate();
- _completionHandler.completed(_session, this);
+ completed();
}
else
{
- _completionHandler.notCompleted(_session, this);
+ notCompleted();
}
}
- public boolean isReadJob()
- {
- return _readJob;
- }
-
public boolean isRead()
{
return _readJob;
}
-
- public boolean isWrite()
+
+ /**
+ * Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running.
+ *
+ * @param job The job.
+ * @param event The event to hand off asynchronously.
+ */
+ public static void fireAsynchEvent(ExecutorService pool, Job job, Runnable event)
{
- return !_readJob;
- }
+ job.add(event);
+
+
+ if(pool == null)
+ {
+ return;
+ }
+
+ // rather than perform additional checks on pool to check that it hasn't shutdown.
+ // catch the RejectedExecutionException that will result from executing on a shutdown pool
+ if (job.activate())
+ {
+ try
+ {
+ pool.execute(job);
+ }
+ catch(RejectedExecutionException e)
+ {
+ _logger.warn("Thread pool shutdown while tasks still outstanding");
+ }
+ }
+ }
+
/**
- * Another interface for a continuation.
+ * Implements a terminal continuation for the {@link Job} for this filter. Whenever the Job completes its processing
+ * of a batch of events this is called. This method simply re-activates the job, if it has more events to process.
*
- * @todo Get rid of this interface as there are other interfaces that could be used instead, such as FutureTask,
- * Runnable or a custom Continuation interface.
+ * @param session The Mina session to work in.
+ * @param job The job that completed.
*/
- static interface JobCompletionHandler
+ public void completed()
+ {
+ if (!isComplete())
+ {
+ final ExecutorService pool = _poolReference.getPool();
+
+ if(pool == null)
+ {
+ return;
+ }
+
+
+ // ritchiem : 2006-12-13 Do we need to perform the additional checks here?
+ // Can the pool be shutdown at this point?
+ if (activate())
+ {
+ try
+ {
+ pool.execute(this);
+ }
+ catch(RejectedExecutionException e)
+ {
+ _logger.warn("Thread pool shutdown while tasks still outstanding");
+ }
+
+ }
+ }
+ }
+
+ public void notCompleted()
{
- public void completed(IoSession session, Job job);
+ final ExecutorService pool = _poolReference.getPool();
+
+ if(pool == null)
+ {
+ return;
+ }
- public void notCompleted(final IoSession session, final Job job);
+ try
+ {
+ pool.execute(this);
+ }
+ catch(RejectedExecutionException e)
+ {
+ _logger.warn("Thread pool shutdown while tasks still outstanding");
+ }
}
+
}