diff options
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java')
-rw-r--r-- | qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java | 253 |
1 files changed, 253 insertions, 0 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java new file mode 100644 index 0000000000..82b600de88 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java @@ -0,0 +1,253 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.pool; + +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A Job is a continuation that batches together other continuations, specifically {@link Event}s, into one continuation. + * The {@link Event}s themselves provide methods to process themselves, so processing a job simply consists of sequentially + * processing all of its aggregated events. + * + * The constructor accepts a maximum number of events for the job, and only runs up to that maximum number when + * processing the job, but the add method does not enforce this maximum. In other words, not all the enqueued events + * may be processed in each run of the job, several runs may be required to clear the queue. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Aggregate many coninuations together into a single continuation. + * <tr><td> Sequentially process aggregated continuations. <td> {@link Event} + * <tr><td> Provide running and completion status of the aggregate continuation. + * <tr><td> Execute a terminal continuation upon job completion. <td> {@link JobCompletionHandler} + * </table> + * + * @todo Could make Job implement Runnable, FutureTask, or a custom Continuation interface, to clarify its status as a + * continuation. Job is a continuation that aggregates other continuations and as such is a usefull re-usable + * piece of code. There may be other palces than the mina filter chain where continuation batching is used within + * qpid, so abstracting this out could provide a usefull building block. This also opens the way to different + * kinds of job with a common interface, e.g. parallel or sequential jobs etc. + * + * @todo For better re-usability could make the completion handler optional. Only run it when one is set. + */ +public class Job implements ReadWriteRunnable +{ + + /** Defines the maximum number of events that will be batched into a single job. */ + public static final int MAX_JOB_EVENTS = Integer.getInteger("amqj.server.read_write_pool.max_events", 10); + + /** The maximum number of events to process per run of the job. More events than this may be queued in the job. */ + private final int _maxEvents; + + /** Holds the queue of events that make up the job. */ + private final java.util.Queue<Runnable> _eventQueue = new ConcurrentLinkedQueue<Runnable>(); + + /** Holds a status flag, that indicates when the job is actively running. */ + private final AtomicBoolean _active = new AtomicBoolean(); + + private final boolean _readJob; + + private ReferenceCountingExecutorService _poolReference; + + private final static Logger _logger = LoggerFactory.getLogger(Job.class); + + public Job(ReferenceCountingExecutorService poolReference, int maxEvents, boolean readJob) + { + _poolReference = poolReference; + _maxEvents = maxEvents; + _readJob = readJob; + } + + /** + * Enqueus a continuation for sequential processing by this job. + * + * @param evt The continuation to enqueue. + */ + public void add(Runnable evt) + { + _eventQueue.add(evt); + } + + /** + * Sequentially processes, up to the maximum number per job, the aggregated continuations in enqueued in this job. + */ + boolean processAll() + { + // limit the number of events processed in one run + int i = _maxEvents; + while( --i != 0 ) + { + Runnable e = _eventQueue.poll(); + if (e == null) + { + return true; + } + else + { + e.run(); + } + } + return false; + } + + /** + * Tests if there are no more enqueued continuations to process. + * + * @return <tt>true</tt> if there are no enqueued continuations in this job, <tt>false</tt> otherwise. + */ + public boolean isComplete() + { + return _eventQueue.peek() == null; + } + + /** + * Marks this job as active if it is inactive. This method is thread safe. + * + * @return <tt>true</tt> if this job was inactive and has now been marked as active, <tt>false</tt> otherwise. + */ + public boolean activate() + { + return _active.compareAndSet(false, true); + } + + /** + * Marks this job as inactive. This method is thread safe. + */ + public void deactivate() + { + _active.set(false); + } + + /** + * Processes a batch of aggregated continuations, marks this job as inactive and call the terminal continuation. + */ + public void run() + { + if(processAll()) + { + deactivate(); + completed(); + } + else + { + notCompleted(); + } + } + + public boolean isRead() + { + return _readJob; + } + + /** + * Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running. + * + * @param job The job. + * @param event The event to hand off asynchronously. + */ + public static void fireAsynchEvent(ExecutorService pool, Job job, Runnable event) + { + + job.add(event); + + + if(pool == null) + { + return; + } + + // rather than perform additional checks on pool to check that it hasn't shutdown. + // catch the RejectedExecutionException that will result from executing on a shutdown pool + if (job.activate()) + { + try + { + pool.execute(job); + } + catch(RejectedExecutionException e) + { + _logger.warn("Thread pool shutdown while tasks still outstanding"); + } + } + + } + + /** + * Implements a terminal continuation for the {@link Job} for this filter. Whenever the Job completes its processing + * of a batch of events this is called. This method simply re-activates the job, if it has more events to process. + * + * @param session The Mina session to work in. + * @param job The job that completed. + */ + public void completed() + { + if (!isComplete()) + { + final ExecutorService pool = _poolReference.getPool(); + + if(pool == null) + { + return; + } + + + // ritchiem : 2006-12-13 Do we need to perform the additional checks here? + // Can the pool be shutdown at this point? + if (activate()) + { + try + { + pool.execute(this); + } + catch(RejectedExecutionException e) + { + _logger.warn("Thread pool shutdown while tasks still outstanding"); + } + + } + } + } + + public void notCompleted() + { + final ExecutorService pool = _poolReference.getPool(); + + if(pool == null) + { + return; + } + + try + { + pool.execute(this); + } + catch(RejectedExecutionException e) + { + _logger.warn("Thread pool shutdown while tasks still outstanding"); + } + } + +} |