summaryrefslogtreecommitdiff
path: root/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueueBase.java
blob: 4564b1d6863e5553afe1c3229f4ac7a620054ab0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
package org.apache.qpid.util.concurrent;
/*
 * 
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 * 
 */


import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * Synchronous/Asynchronous puts. Asynchronous is easiest, just wait till can write to queue and deposit data.
 * Synchronous is harder. Deposit data, but then must wait until deposited element/elements are taken before being
 * allowed to unblock and continue. Consumer needs some options here too. Can just get the data from the buffer and
 * allow any producers unblocked as a result to continue, or can get data but continue blocking while the data is
 * processed before sending a message to do the unblocking. Synch/Asynch mode to be controlled by a switch.
 * Unblocking/not unblocking during consumer processing to be controlled by the consumers calls.
 *
 * <p/>Implementing sub-classes only need to supply an implementation of a queue to produce a valid concrete
 * implementation of this. This queue is only accessed through the methods {@link #insert}, {@link #extract},
 * {@link #getBufferCapacity()}, {@link #peekAtBufferHead()}. An implementation can override these methods to implement
 * the buffer other than by a queue, for example, by using an array.
 *
 * <p/>Normal queue methods to work asynchronously.
 * <p/>Put, take and drain methods from the BlockingQueue interface work synchronously but unblock producers immediately
 * when their data is taken.
 * <p/>The additional put, take and drain methods from the BatchSynchQueue interface work synchronously and provide the
 * option to keep producers blocked until the consumer decides to release them.
 *
 * <p/>Removed take method that keeps producers blocked as it is pointless. Essentially it reduces this class to
 * synchronous processing of individual data items, which negates the point of the hand-off design. The efficiency
 * gain of the hand-off design comes from being able to batch consume requests, amortizing latency (such as that caused
 * by I/O) across many producers. The only advantage of the single blocking take method is that it did take advantage
 * of the queue ordering, which may be useful, for example to apply a priority ordering amongst producers. This is also
 * an advantage over the java.util.concurrent.SynchronousQueue, which doesn't have a backing queue which can be used to
 * apply orderings. If a single item take is really needed, just use the drainTo method with a maximum of one item.
 *
 * <p/><table id="crc"><caption>CRC Card</caption>
 * <tr><th> Responsibilities <th> Collaborations
 * </table>
 */
public abstract class BatchSynchQueueBase<E> extends AbstractQueue<E> implements BatchSynchQueue<E>
{
    /** Used for logging. */
    private static final Logger log = LoggerFactory.getLogger(BatchSynchQueueBase.class);

    /**
     * Holds a reference to the queue implementation that holds the buffer. Each data element is wrapped in a
     * {@link SynchRecordImpl} before being stored here.
     */
    Queue<SynchRecordImpl<E>> buffer;

    /** Holds the number of items in the queue. Read and written only while {@link #lock} is held. */
    private int count;

    /** Main lock guarding all access. Assigned exactly once, in the constructor. */
    private final ReentrantLock lock;

    /** Condition for waiting takes. Created from {@link #lock}, assigned exactly once, in the constructor. */
    private final Condition notEmpty;

    /** Condition for waiting puts. Created from {@link #lock}, assigned exactly once, in the constructor. */
    private final Condition notFull;

    /**
     * Creates a batch synch queue without fair thread scheduling.
     *
     * <p>Delegates to {@link #BatchSynchQueueBase(boolean)} with the fairness flag set to {@code false}.
     */
    public BatchSynchQueueBase()
    {
        this(false);
    }

    /**
     * Ensures that the underlying buffer implementation is created, then creates the lock and the two condition
     * monitors that guard it.
     *
     * <p>Note that {@link #createQueue()} is abstract and is invoked from this constructor, so the sub-class
     * implementation runs before the sub-class's own constructor body has executed. It also runs before the lock
     * and conditions of this class have been created.
     *
     * @param fair {@code true} if fairness is to be applied to threads waiting to access the buffer.
     */
    public BatchSynchQueueBase(boolean fair)
    {
        // Ask the concrete sub-class for the (empty) queue that will back this buffer.
        buffer = this.createQueue();

        // Create the buffer lock with the fairness flag set accordingly.
        lock = new ReentrantLock(fair);

        // Create the non-empty and non-full condition monitors on the buffer lock.
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
    }

    /**
     * Iteration over the queue contents is not supported by this implementation.
     *
     * <p>Inherited collection operations that are implemented in terms of the iterator will therefore fail with
     * the same exception.
     *
     * @return Never returns normally.
     *
     * @throws UnsupportedOperationException Always, because iteration is not implemented.
     */
    public Iterator<E> iterator()
    {
        // UnsupportedOperationException is the standard collections signal for an unimplemented optional
        // operation. It is a RuntimeException, so callers catching the previous exception type still work.
        throw new UnsupportedOperationException("Not implemented.");
    }

    /**
     * Reports how many elements are currently held in the buffer.
     *
     * <p>The element count is read while holding the queue lock.
     *
     * @return The number of elements in this queue.
     */
    public int size()
    {
        lock.lock();

        try
        {
            final int current = count;

            return current;
        }
        finally
        {
            lock.unlock();
        }
    }

    /**
     * Attempts to add the given element to this queue without waiting for space and without waiting for a consumer
     * to take it.
     *
     * <p>The element is passed to {@link #insert} with the blocking flag switched off, while holding the queue lock.
     *
     * @param e The element to insert. Must not be {@code null}.
     *
     * @return {@code true} if the element was accepted by the buffer, {@code false} otherwise.
     *
     * @throws NullPointerException If the element is {@code null}.
     */
    public boolean offer(E e)
    {
        if (e == null)
        {
            throw new NullPointerException();
        }

        final ReentrantLock queueLock = this.lock;
        queueLock.lock();

        try
        {
            // Asynchronous hand-off: the producer is not parked waiting for its element to be consumed.
            final boolean accepted = insert(e, false);

            return accepted;
        }
        finally
        {
            queueLock.unlock();
        }
    }

    /**
     * Inserts the specified element into this queue, waiting if necessary up to the specified wait time for space to
     * become available. The producer is not blocked waiting for a consumer to take the element.
     *
     * <p>At least one insertion attempt is always made, even when the timeout is zero or negative.
     *
     * @param e       The element to add. Must not be {@code null}.
     * @param timeout How long to wait before giving up, in units of {@code unit}.
     * @param unit    A {@code TimeUnit} determining how to interpret the {@code timeout} parameter.
     *
     * @return {@code true} if successful, or {@code false} if the specified waiting time elapses before space is
     *         available.
     *
     * @throws InterruptedException If interrupted while waiting.
     * @throws NullPointerException If the specified element or the time unit is {@code null}.
     */
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException
    {
        if (e == null)
        {
            throw new NullPointerException();
        }

        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();

        try
        {
            // The conversion is done inside the try block so that a null unit cannot leave the lock held.
            long nanos = unit.toNanos(timeout);

            do
            {
                if (insert(e, false))
                {
                    return true;
                }

                try
                {
                    // Releases the lock while waiting and returns an estimate of the time still remaining.
                    nanos = notFull.awaitNanos(nanos);
                }
                catch (InterruptedException ie)
                {
                    notFull.signal(); // propagate to non-interrupted thread
                    throw ie;
                }
            }
            while (nanos > 0);

            return false;
        }
        finally
        {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the element at the head of this queue without waiting.
     *
     * <p>When an element is taken, {@link #extract} is asked both to release the producer of that element and to
     * signal the not-full condition.
     *
     * @return The head of this queue, or {@code null} if this queue is empty.
     */
    public E poll()
    {
        lock.lock();

        try
        {
            // Nothing queued: report that with null rather than waiting.
            return (count == 0) ? null : extract(true, true).getElement();
        }
        finally
        {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the element at the head of this queue, waiting up to the given time for one to arrive if
     * the queue is empty.
     *
     * <p>The queue is always checked at least once, even when the timeout is zero or negative. When an element is
     * taken, {@link #extract} is asked both to release its producer and to signal the not-full condition.
     *
     * @param timeout How long to wait before giving up, in units of {@code unit}.
     * @param unit    A {@code TimeUnit} determining how to interpret the {@code timeout} parameter.
     *
     * @return The head of this queue, or {@code null} if the waiting time elapses before an element is present.
     *
     * @throws InterruptedException If interrupted while waiting.
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException
    {
        final ReentrantLock queueLock = this.lock;
        queueLock.lockInterruptibly();

        try
        {
            long remaining = unit.toNanos(timeout);

            for (;;)
            {
                if (count != 0)
                {
                    return extract(true, true).getElement();
                }

                try
                {
                    // Gives up the lock while waiting; the result estimates the time still left.
                    remaining = notEmpty.awaitNanos(remaining);
                }
                catch (InterruptedException interrupted)
                {
                    notEmpty.signal(); // propagate to non-interrupted thread
                    throw interrupted;
                }

                if (remaining <= 0)
                {
                    // Timed out with nothing to take.
                    return null;
                }
            }
        }
        finally
        {
            queueLock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue, returning {@code null} if this queue is empty.
     *
     * <p>The head is read through {@link #peekAtBufferHead()} while holding the queue lock. No producer is
     * released and no condition is signalled.
     *
     * @return The head of this queue, or {@code null} if this queue is empty.
     */
    public E peek()
    {
        final ReentrantLock lock = this.lock;
        lock.lock();

        try
        {
            // Answer the empty case directly, as poll() does, rather than asking the buffer for a head record
            // that does not exist.
            return (count == 0) ? null : peekAtBufferHead();
        }
        finally
        {
            lock.unlock();
        }
    }

    /**
     * Computes how many more elements the buffer could accept, as the difference between the value reported by
     * {@link #getBufferCapacity()} and the current element count.
     *
     * <p>The result is only a snapshot taken under the queue lock: another thread may add or remove elements as
     * soon as this method returns, so it cannot be used to predict whether a later insertion will succeed.
     *
     * @return The remaining capacity.
     */
    public int remainingCapacity()
    {
        lock.lock();

        try
        {
            final int capacity = getBufferCapacity();

            return capacity - count;
        }
        finally
        {
            lock.unlock();
        }
    }

    /**
     * Adds the specified element to this queue by delegating to {@link #tryPut}.
     *
     * <p>{@link #tryPut} is declared to throw {@link SynchException}. This method catches and discards any such
     * exception, so callers that need to see it must call {@link #tryPut} directly.
     *
     * @param e The element to add.
     *
     * @throws InterruptedException If interrupted while waiting.
     */
    public void put(E e) throws InterruptedException
    {
        try
        {
            tryPut(e);
        }
        catch (SynchException ignored)
        {
            // Swallowed on purpose: this method's contract is to hide consumer-side errors from the producer.
        }
    }

    /**
     * Tries a synchronous put into the queue. The calling thread first waits for the element count to drop below
     * the buffer capacity, then hands the element to {@link #insert} with the blocking flag set.
     *
     * <p>On a successful insertion {@link #insert} releases the queue lock itself before it waits for a consumer.
     * On every other path (interruption while waiting for space, a failed insertion, or an exception raised before
     * the hand-off) this method releases the lock, so that it is never left held when the method exits.
     *
     * <p>If the buffer rejects the element this method returns without having queued it.
     *
     * @param e The data element to put into the queue. Cannot be null.
     *
     * @throws InterruptedException If the thread is interrupted whilst waiting to write to the queue.
     * @throws SynchException       Declared so that consumer errors can be reported to the producer.
     * @throws NullPointerException If the element is {@code null}.
     */
    public void tryPut(E e) throws InterruptedException, SynchException
    {
        if (e == null)
        {
            throw new NullPointerException();
        }

        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();

        // Becomes true once insert() has accepted the element. On that path insert() has already given up the lock.
        boolean handedOff = false;

        try
        {
            try
            {
                while (count == getBufferCapacity())
                {
                    // Release the lock and wait until the queue is not full.
                    notFull.await();
                }
            }
            catch (InterruptedException ie)
            {
                notFull.signal(); // propagate to non-interrupted thread
                throw ie;
            }

            // There is room in the queue. Insert into the queue, release the lock and block the producer until its
            // data is taken.
            handedOff = insert(e, true);
        }
        finally
        {
            // The ownership check avoids a second unlock if insert() threw after it had already released the lock.
            if (!handedOff && lock.isHeldByCurrentThread())
            {
                lock.unlock();
            }
        }
    }

    /**
     * Removes and returns the element at the head of this queue, waiting for as long as the queue is empty.
     *
     * <p>When the element is taken, {@link #extract} is asked both to release its producer and to signal the
     * not-full condition. To keep the producer blocked whilst taking just a single item, use the
     * {@link #drainTo(java.util.Collection, int, boolean)} method with a maximum of one element. There is no take
     * method to do that because there is not usually any advantage in a synchronous hand-off design that consumes
     * data one item at a time. It is normal to consume data in chunks to amortize consumption latencies across many
     * producers where possible.
     *
     * @return The head of this queue.
     *
     * @throws InterruptedException If interrupted while waiting.
     */
    public E take() throws InterruptedException
    {
        final ReentrantLock queueLock = this.lock;
        queueLock.lockInterruptibly();

        try
        {
            // Give up the lock and sleep for as long as there is nothing to take.
            while (count == 0)
            {
                notEmpty.await();
            }

            // Data is present: take it, announce that the queue is not full and release the owning producer.
            return extract(true, true).getElement();
        }
        catch (InterruptedException interrupted)
        {
            notEmpty.signal(); // propagate to non-interrupted thread
            throw interrupted;
        }
        finally
        {
            queueLock.unlock();
        }
    }

    /**
     * Removes all available elements from this queue and adds them into the given collection. This operation may be
     * more efficient than repeatedly polling this queue. A failure encountered while attempting to add elements
     * to the collection may result in elements being in neither, either or both collections when the associated
     * exception is thrown. Attempts to drain a queue to itself result in {@code IllegalArgumentException}. Further,
     * the behavior of this operation is undefined if the specified collection is modified while the operation is in
     * progress.
     *
     * <p>Delegates to {@link #drainTo(java.util.Collection, int)} with a maximum of -1, which that method interprets
     * as meaning all elements.
     *
     * @param objects The collection to transfer elements into.
     *
     * @return The number of elements transferred.
     *
     * @throws NullPointerException     If objects is null.
     * @throws IllegalArgumentException If objects is this queue.
     */
    public int drainTo(Collection<? super E> objects)
    {
        return drainTo(objects, -1);
    }

    /**
     * Moves up to a given number of the currently queued elements into the supplied collection, without waiting for
     * more to arrive.
     *
     * <p>The number to move is fixed before the first element is taken. The producer of each element is released
     * as the element is taken, and the not-full condition is signalled to all waiters once, after the transfer,
     * provided at least one element was moved. If adding to the collection fails part way through, elements may end
     * up in neither, either or both collections. The behavior is undefined if the collection is modified while the
     * operation is in progress.
     *
     * @param objects     The collection to transfer elements into.
     * @param maxElements The maximum number of elements to transfer. Any negative value, conventionally -1, is
     *                    interpreted as meaning all elements.
     *
     * @return The number of elements transferred.
     *
     * @throws NullPointerException     If objects is null.
     * @throws IllegalArgumentException If objects is this queue.
     */
    public int drainTo(Collection<? super E> objects, int maxElements)
    {
        if (objects == null)
        {
            throw new NullPointerException();
        }

        if (objects == this)
        {
            throw new IllegalArgumentException();
        }

        final ReentrantLock queueLock = this.lock;
        queueLock.lock();

        try
        {
            // Everything that is queued right now, unless the caller asked for fewer.
            int limit = count;

            if ((maxElements >= 0) && (maxElements < count))
            {
                limit = maxElements;
            }

            int transferred = 0;

            while (transferred < limit)
            {
                // Release each producer as its item is taken, but hold back the not-full signal until the end.
                objects.add(extract(true, false).getElement());
                transferred++;
            }

            if (transferred > 0)
            {
                notFull.signalAll();
            }

            return transferred;
        }
        finally
        {
            queueLock.unlock();
        }
    }

    /**
     * Takes all currently available data items from the queue. The returned items are wrapped in a
     * {@link SynchRecord} which provides an interface to requeue them or send errors to their producers, where the
     * producers are still blocked.
     *
     * <p>Delegates to {@link #drainTo(java.util.Collection, int, boolean)} with a maximum of -1, which that method
     * interprets as meaning all elements. That method does not wait for items to arrive.
     *
     * @param c       The collection to drain the data items into.
     * @param unblock Passed through unchanged to the three-argument overload, where it is documented as requesting
     *                that the producers for the taken items be immediately unblocked.
     *
     * @return The {@link SynchRef} produced by the three-argument overload.
     */
    public SynchRef drainTo(Collection<SynchRecord<E>> c, boolean unblock)
    {
        return drainTo(c, -1, unblock);
    }

    /**
     * Takes up to maxElements of the currently available data items from the queue, without waiting for more to
     * arrive. The returned items are wrapped in a {@link SynchRecord} which provides an interface to requeue them or
     * send errors to their producers, where the producers are still blocked.
     *
     * <p>The not-full condition is signalled to all waiters once, after the transfer, provided at least one item
     * was taken.
     *
     * @param coll        The collection to drain the data items into.
     * @param maxElements The maximum number of elements to drain. Any negative value, conventionally -1, is
     *                    interpreted as meaning all elements.
     * @param unblock     If set to {@code true} the producers for the taken items will be immediately unblocked.
     *                    Otherwise they stay blocked until released through the drained records or the returned
     *                    reference.
     *
     * @return A reference holding the number of records drained and the collection they were drained into, through
     *         which producers that are still blocked can be released.
     *
     * @throws NullPointerException If coll is null.
     */
    public SynchRef drainTo(Collection<SynchRecord<E>> coll, int maxElements, boolean unblock)
    {
        if (coll == null)
        {
            throw new NullPointerException();
        }

        final ReentrantLock lock = this.lock;
        lock.lock();

        try
        {
            int n = 0;

            // The limit is fixed up front; count itself shrinks as records are extracted.
            for (int max = ((maxElements >= count) || (maxElements < 0)) ? count : maxElements; n < max; n++)
            {
                // Extract the next record from the queue. Producers are released only if the caller asked for it,
                // and the not-full condition is not signalled per item. extract() takes (unblock, signal) in that
                // order.
                coll.add(extract(unblock, false));
            }

            if (n > 0)
            {
                notFull.signalAll();
            }

            return new SynchRefImpl(n, coll);
        }
        finally
        {
            lock.unlock();
        }
    }

    /**
     * This abstract method should be overridden to return an empty queue. Different implementations of producer
     * consumer buffers can control the order in which data is accessed using different queue implementations.
     * This method allows the type of queue to be abstracted out of this class and to be supplied by concrete
     * implementations.
     *
     * <p>This method is called from the constructor of this class, so implementations should not rely on state
     * that their own constructor has yet to set up.
     *
     * @param <T> The type of element held by the created queue.
     *
     * @return An empty queue.
     */
    protected abstract <T> Queue<T> createQueue();

    /**
     * Wraps the element in a new record and offers it to the buffer. If the buffer accepts it, the element count is
     * incremented and the not-empty condition is signalled. The producer is then optionally parked on the record
     * until a consumer lets it proceed.
     *
     * <p>A producer that is to be parked must give up the queue lock first, otherwise no other thread could reach the
     * queue to consume the element. This is why releasing the lock and blocking are controlled by one flag.
     *
     * <p>Call only when holding the global lock.
     *
     * @param x              The data element to insert.
     * @param unlockAndBlock {@code true} if the global queue lock should be released and the producer should be
     *                       blocked.
     *
     * @return {@code true} if the buffer accepted the element, {@code false} otherwise. In the {@code true} case
     *         with the flag set, the lock has been released and the method returns only once the wait on the record
     *         has ended. In the {@code false} case the method returns straight away with the global lock still
     *         held, whatever the value of the flag.
     */
    protected boolean insert(E x, boolean unlockAndBlock)
    {
        final SynchRecordImpl<E> entry = new SynchRecordImpl<E>(x);

        // A rejected offer leaves everything untouched, including the lock.
        if (!buffer.offer(entry))
        {
            return false;
        }

        count++;

        // Wake one waiting consumer now that there is something to take.
        notEmpty.signal();

        if (unlockAndBlock)
        {
            // Step aside so that other threads can read/write the queue ...
            lock.unlock();

            // ... and then park this producer until a consumer takes its data item.
            entry.waitForConsumer();
        }

        return true;
    }

    /**
     * Removes the record at the head of the buffer and decrements the element count, then optionally signals the
     * not-full condition and optionally releases the producer that owns the record.
     *
     * <p>Call only when holding lock.
     *
     * @param unblock {@code true} if the producer of the extracted record should be released straight away.
     * @param signal  {@code true} if the not-full condition should be signalled for this extraction.
     *
     * @return The record that was removed from the head of the buffer.
     */
    protected SynchRecordImpl<E> extract(boolean unblock, boolean signal)
    {
        final SynchRecordImpl<E> head = buffer.remove();
        count--;

        // Tell one waiting producer that there is room again, if asked to.
        if (signal)
        {
            notFull.signal();
        }

        // Let the producer of this record carry on, if asked to.
        if (unblock)
        {
            head.releaseImmediately();
        }

        return head;
    }

    /**
     * Reports the maximum number of elements the buffer can hold. When the buffer implements {@link Capacity} the
     * value it reports is used; any other buffer is treated as unbounded.
     *
     * <p>Call only when holding lock.
     *
     * @return The maximum capacity of the buffer, or {@code Integer.MAX_VALUE} if it declares none.
     */
    protected int getBufferCapacity()
    {
        return (buffer instanceof Capacity) ? ((Capacity) buffer).getCapacity() : Integer.MAX_VALUE;
    }

    /**
     * Return the head element from the buffer without removing it.
     *
     * <p>Call only when holding lock.
     *
     * @return The head element from the buffer, or {@code null} if the buffer has no head record.
     */
    protected E peekAtBufferHead()
    {
        // Queue.peek() answers null for an empty queue, so the record must be checked before it is unwrapped.
        final SynchRecordImpl<E> head = buffer.peek();

        return (head == null) ? null : head.getElement();
    }

    /**
     * A handle on a batch of records drained from the queue. It remembers how many records were drained and the
     * collection that holds them, and lets a consumer release all of their producers in one call.
     */
    public class SynchRefImpl implements SynchRef
    {
        /** Holds the number of synch records associated with this reference. */
        int numRecords;

        /** Holds a reference to the collection of synch records managed by this; null once they have been released. */
        Collection<SynchRecord<E>> records;

        /**
         * Creates a reference to a batch of drained records.
         *
         * @param n       The number of records in the batch.
         * @param records The collection holding the records.
         */
        public SynchRefImpl(int n, Collection<SynchRecord<E>> records)
        {
            numRecords = n;
            this.records = records;
        }

        /**
         * Reports the number of records this reference was created with.
         *
         * @return The number of records.
         */
        public int getNumRecords()
        {
            return numRecords;
        }

        /**
         * Calls {@link SynchRecord#releaseImmediately()} on every record in the managed collection and then drops
         * the collection, so that a second call does nothing beyond logging.
         *
         * <p>The intent is that producers whose data has been taken but which are still blocked are released here.
         * How records that were already released, marked for requeue or marked in error are treated is left to
         * {@link SynchRecord#releaseImmediately()}.
         */
        public void unblockProducers()
        {
            log.debug("public void unblockProducers(): called");

            // Already released on an earlier call, or never given any records.
            if (records == null)
            {
                return;
            }

            for (SynchRecord<E> pending : records)
            {
                pending.releaseImmediately();
            }

            records = null;
        }
    }

    /**
     * A SynchRecordImpl is used by a {@link BatchSynchQueue} to pair together a producer with its data. This allows
     * the producer of data to be identified so that it can be unblocked when its data is consumed.
     *
     * <p>Requeueing and error propagation are declared by the {@link SynchRecord} interface but are not implemented
     * here. {@link #reQueue()} and {@link #inError(Exception)} always throw
     * {@link UnsupportedOperationException}.
     */
    public class SynchRecordImpl<E> implements SynchRecord<E>
    {
        /** A boolean latch that determines when the producer for this data item will be allowed to continue. */
        BooleanLatch latch = new BooleanLatch();

        /** The data element associated with this item. */
        E element;

        /**
         * Create a new synch record.
         *
         * @param e The data element that the record encapsulates.
         */
        public SynchRecordImpl(E e)
        {
            // Keep the data element.
            element = e;
        }

        /**
         * Waits on this record's latch. The intent is that the producer stays here until a consumer gives it
         * permission to proceed by calling {@link #releaseImmediately()}.
         */
        public void waitForConsumer()
        {
            latch.await();
        }

        /**
         * Gets the data element contained by this record.
         *
         * @return The data element contained by this record.
         */
        public E getElement()
        {
            return element;
        }

        /**
         * Signals this record's latch in order to release the producer of this data record. Consumers can bring the
         * synchronization time of producers to a minimum by using this method to release them at the earliest
         * possible moment when batch consuming records from synchronized producers.
         *
         * <p>Only the latch is signalled. No check is made here for records that were already released, and neither
         * error propagation nor requeueing is performed.
         */
        public void releaseImmediately()
        {
            latch.signal();
        }

        /**
         * Intended to tell the synch queue to put this element back onto the queue instead of releasing its
         * producer. This operation is not implemented.
         *
         * @throws UnsupportedOperationException Always, because requeueing is not implemented.
         */
        public void reQueue()
        {
            // UnsupportedOperationException is a RuntimeException, so callers that caught the previous exception
            // type are unaffected.
            throw new UnsupportedOperationException("Not implemented.");
        }

        /**
         * Intended to tell the synch queue to raise an exception with this element's producer, wrapped in a
         * {@link SynchException}. This operation is not implemented.
         *
         * <p>This method is unusual in that it accepts an exception as an argument. This is non-standard but is used
         * because the exception is to be passed onto a different thread.
         *
         * @param e The exception to raise on the producer.
         *
         * @throws UnsupportedOperationException Always, because error propagation is not implemented.
         */
        public void inError(Exception e)
        {
            // UnsupportedOperationException is a RuntimeException, so callers that caught the previous exception
            // type are unaffected.
            throw new UnsupportedOperationException("Not implemented.");
        }
    }
}