summaryrefslogtreecommitdiff
path: root/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
blob: 15d1c20ff1ace160aa233b709955aa2527a8b8d0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.pool;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.mina.common.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A Job is a continuation that batches together other continuations, specifically {@link Event}s, into one continuation.
 * The {@link Event}s themselves provide methods to process themselves, so processing a job simply consists of sequentially
 * processing all of its aggregated events.
 *
 * The constructor accepts a maximum number of events for the job, and only runs up to that maximum number when
 * processing the job, but the add method does not enforce this maximum. In other words, not all the enqueued events
 * may be processed in each run of the job, several runs may be required to clear the queue.
 *
 * <p/><table id="crc"><caption>CRC Card</caption>
 * <tr><th> Responsibilities <th> Collaborations
 * <tr><td> Aggregate many continuations together into a single continuation.
 * <tr><td> Sequentially process aggregated continuations. <td> {@link Event}
 * <tr><td> Provide running and completion status of the aggregate continuation.
 * <tr><td> Execute a terminal continuation upon job completion. <td> {@link JobCompletionHandler}
 * </table>
 *
 * @todo Could make Job implement Runnable, FutureTask, or a custom Continuation interface, to clarify its status as a
 *       continuation. Job is a continuation that aggregates other continuations and as such is a useful, re-usable
 *       piece of code. There may be other places than the mina filter chain where continuation batching is used within
 *       qpid, so abstracting this out could provide a useful building block. This also opens the way to different
 *       kinds of job with a common interface, e.g. parallel or sequential jobs etc.
 *
 * @todo For better re-usability could make the completion handler optional. Only run it when one is set.
 */
public class Job implements ReadWriteRunnable
{
    /**
     * The maximum number of events to process per run of the job. More events than this may be queued in the job.
     * A non-positive value places no limit on a single run.
     */
    private final int _maxEvents;

    /** Holds the queue of events that make up the job. */
    private final java.util.Queue<Event> _eventQueue = new ConcurrentLinkedQueue<Event>();

    /** Holds a status flag, that indicates when the job is actively running. Initially <tt>false</tt>. */
    private final AtomicBoolean _active = new AtomicBoolean();

    /** Holds the completion continuation, called at the end of every run of the job. */
    private final JobCompletionHandler _completionHandler;

    /** The value reported by {@link #isRead()}; fixed at construction time. */
    private final boolean _readJob;

    private static final Logger _logger = LoggerFactory.getLogger(Job.class);

    /**
     * Creates a new job that aggregates many continuations together.
     *
     * @param session           The Mina session. Not used by this class; the parameter is retained so that existing
     *                          callers continue to compile.
     * @param completionHandler The per job run, terminal continuation.
     * @param maxEvents         The maximum number of aggregated continuations to process per run of the job.
     * @param readJob           The value to be reported by {@link #isRead()}.
     */
    Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents, final boolean readJob)
    {
        this(completionHandler, maxEvents, readJob);
    }

    /**
     * Creates a new job that aggregates many continuations together.
     *
     * @param completionHandler The per job run, terminal continuation.
     * @param maxEvents         The maximum number of aggregated continuations to process per run of the job.
     * @param readJob           The value to be reported by {@link #isRead()}.
     */
    public Job(JobCompletionHandler completionHandler, int maxEvents, boolean readJob)
    {
        _completionHandler = completionHandler;
        _maxEvents = maxEvents;
        _readJob = readJob;
    }

    /**
     * Enqueues a continuation for sequential processing by this job. The per-run maximum is not enforced here, so
     * any number of events may be queued.
     *
     * @param evt The continuation to enqueue.
     */
    public void add(Event evt)
    {
        _eventQueue.add(evt);
    }

    /**
     * Sequentially processes, up to the maximum number per run, the continuations enqueued in this job.
     *
     * @return <tt>true</tt> if the queue was found empty before the per-run limit was reached, <tt>false</tt> if the
     *         limit was reached first (in which case events may or may not remain queued).
     */
    boolean processAll()
    {
        // A non-positive limit never stopped the old count-down loop before the queue drained, so it is kept as
        // "no limit" here rather than being turned into "process nothing".
        final boolean unbounded = _maxEvents <= 0;

        // Count upwards so that exactly _maxEvents events can be processed. The previous "--i != 0" count-down
        // stopped one event short, and processed nothing at all when the limit was 1.
        int processed = 0;
        while (unbounded || processed < _maxEvents)
        {
            Event e = _eventQueue.poll();
            if (e == null)
            {
                // Queue drained within the limit.
                return true;
            }

            e.process();
            processed++;
        }

        // Limit reached; the caller is told the job did not complete, even if the queue happens to be empty now.
        return false;
    }

    /**
     * Tests if there are no more enqueued continuations to process.
     *
     * @return <tt>true</tt> if there are no enqueued continuations in this job, <tt>false</tt> otherwise.
     */
    public boolean isComplete()
    {
        return _eventQueue.peek() == null;
    }

    /**
     * Marks this job as active if it is inactive. This method is thread safe.
     *
     * @return <tt>true</tt> if this job was inactive and has now been marked as active, <tt>false</tt> otherwise.
     */
    public boolean activate()
    {
        return _active.compareAndSet(false, true);
    }

    /**
     * Marks this job as inactive. This method is thread safe.
     */
    public void deactivate()
    {
        _active.set(false);
    }

    /**
     * Processes a batch of aggregated continuations and then calls the terminal continuation. If the queue was
     * drained, the job is marked inactive and {@link JobCompletionHandler#completed(Job)} is called; otherwise the job
     * is left active and {@link JobCompletionHandler#notCompleted(Job)} is called.
     */
    public void run()
    {
        // NOTE(review): if an event's process() throws, neither handler method is called and the job stays marked
        // active - confirm whether callers rely on that.
        if (processAll())
        {
            // The active flag is cleared before the handler is told, so the handler sees an inactive job.
            deactivate();
            _completionHandler.completed(this);
        }
        else
        {
            _completionHandler.notCompleted(this);
        }
    }

    /**
     * Reports the read flag that this job was constructed with.
     *
     * @return The <tt>readJob</tt> value passed to the constructor.
     */
    public boolean isRead()
    {
        return _readJob;
    }

    /**
     * Another interface for a continuation.
     *
     * @todo Get rid of this interface as there are other interfaces that could be used instead, such as FutureTask,
     *       Runnable or a custom Continuation interface.
     */
    interface JobCompletionHandler
    {
        /**
         * Called at the end of a run in which the job found its queue empty, after the job has been deactivated.
         *
         * @param job The job that has just run.
         */
        void completed(Job job);

        /**
         * Called at the end of a run that stopped because the per-run event limit was reached. The job is still
         * marked as active when this is called.
         *
         * @param job The job that has just run.
         */
        void notCompleted(final Job job);
    }

    /**
     * Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running.
     *
     * @param pool  The executor to run the job on. If <tt>null</tt> the event is only enqueued and the job is
     *              neither activated nor scheduled.
     * @param job   The job.
     * @param event The event to hand off asynchronously.
     */
    public static void fireAsynchEvent(ExecutorService pool, Job job, Event event)
    {
        // The event is always queued first, so a job that is already running can pick it up.
        job.add(event);

        if (pool == null)
        {
            return;
        }

        // Only the caller that flips the job from inactive to active submits it.
        if (!job.activate())
        {
            return;
        }

        // rather than perform additional checks on pool to check that it hasn't shutdown.
        // catch the RejectedExecutionException that will result from executing on a shutdown pool
        try
        {
            pool.execute(job);
        }
        catch (RejectedExecutionException e)
        {
            // The job is left marked as active here, so later calls will not attempt to resubmit it.
            _logger.warn("Thread pool shutdown while tasks still outstanding");
        }
    }

}