summaryrefslogtreecommitdiff
path: root/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueueBase.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueueBase.java')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueueBase.java834
1 files changed, 0 insertions, 834 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueueBase.java b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueueBase.java
deleted file mode 100644
index 4564b1d686..0000000000
--- a/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueueBase.java
+++ /dev/null
@@ -1,834 +0,0 @@
-package org.apache.qpid.util.concurrent;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import java.util.*;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Synchronous/Asynchronous puts. Asynchronous is easiest, just wait till can write to queue and deposit data.
- * Synchronous is harder. Deposit data, but then must wait until deposited element/elements are taken before being
- * allowed to unblock and continue. Consumer needs some options here too. Can just get the data from the buffer and
- * allow any producers unblocked as a result to continue, or can get data but continue blocking while the data is
- * processed before sending a message to do the unblocking. Synch/Asynch mode to be controlled by a switch.
- * Unblocking/not unblocking during consumer processing to be controlled by the consumers calls.
- *
- * <p/>Implementing sub-classes only need to supply an implementation of a queue to produce a valid concrete
- * implementation of this. This queue is only accessed through the methods {@link #insert}, {@link #extract},
- * {@link #getBufferCapacity()}, {@link #peekAtBufferHead()}. An implementation can override these methods to implement
- * the buffer other than by a queue, for example, by using an array.
- *
- * <p/>Normal queue methods to work asynchronously.
- * <p/>Put, take and drain methods from the BlockingQueue interface work synchronously but unblock producers immediately
- * when their data is taken.
- * <p/>The additional put, take and drain methods from the BatchSynchQueue interface work synchronously and provide the
- * option to keep producers blocked until the consumer decides to release them.
- *
- * <p/>Removed take method that keeps producers blocked as it is pointless. Essentially it reduces this class to
- * synchronous processing of individual data items, which negates the point of the hand-off design. The efficiency
- * gain of the hand off design comes in being able to batch consume requests, ammortizing latency (such as caused by io)
- * accross many producers. The only advantage of the single blocking take method is that it did take advantage of the
- * queue ordering, which ma be usefull, for example to apply a priority ordering amongst producers. This is also an
- * advantage over the java.util.concurrent.SynchronousQueue which doesn't have a backing queue which can be used to
- * apply orderings. If a single item take is really needed can just use the drainTo method with a maximum of one item.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * </table>
- */
-public abstract class BatchSynchQueueBase<E> extends AbstractQueue<E> implements BatchSynchQueue<E>
-{
- /** Used for logging. */
- private static final Logger log = LoggerFactory.getLogger(BatchSynchQueueBase.class);
-
- /** Holds a reference to the queue implementation that holds the buffer. */
- Queue<SynchRecordImpl<E>> buffer;
-
- /** Holds the number of items in the queue */
- private int count;
-
- /** Main lock guarding all access */
- private ReentrantLock lock;
-
- /** Condition for waiting takes */
- private Condition notEmpty;
-
- /** Condition for waiting puts */
- private Condition notFull;
-
- /**
- * Creates a batch synch queue without fair thread scheduling.
- */
- public BatchSynchQueueBase()
- {
- this(false);
- }
-
- /**
- * Ensures that the underlying buffer implementation is created.
- *
- * @param fair <tt>true</tt> if fairness is to be applied to threads waiting to access the buffer.
- */
- public BatchSynchQueueBase(boolean fair)
- {
- buffer = this.createQueue();
-
- // Create the buffer lock with the fairness flag set accordingly.
- lock = new ReentrantLock(fair);
-
- // Create the non-empty and non-full condition monitors on the buffer lock.
- notEmpty = lock.newCondition();
- notFull = lock.newCondition();
- }
-
- /**
- * Returns an iterator over the elements contained in this collection.
- *
- * @return An iterator over the elements contained in this collection.
- */
- public Iterator<E> iterator()
- {
- throw new RuntimeException("Not implemented.");
- }
-
- /**
- * Returns the number of elements in this collection. If the collection contains more than
- * <tt>Integer.MAX_VALUE</tt> elements, returns <tt>Integer.MAX_VALUE</tt>.
- *
- * @return The number of elements in this collection.
- */
- public int size()
- {
- final ReentrantLock lock = this.lock;
- lock.lock();
-
- try
- {
- return count;
- }
- finally
- {
- lock.unlock();
- }
- }
-
- /**
- * Inserts the specified element into this queue, if possible. When using queues that may impose insertion
- * restrictions (for example capacity bounds), method <tt>offer</tt> is generally preferable to method
- * {@link java.util.Collection#add}, which can fail to insert an element only by throwing an exception.
- *
- * @param e The element to insert.
- *
- * @return <tt>true</tt> if it was possible to add the element to this queue, else <tt>false</tt>
- */
- public boolean offer(E e)
- {
- if (e == null)
- {
- throw new NullPointerException();
- }
-
- final ReentrantLock lock = this.lock;
- lock.lock();
-
- try
- {
- return insert(e, false);
- }
- finally
- {
- lock.unlock();
- }
- }
-
- /**
- * Inserts the specified element into this queue, waiting if necessary up to the specified wait time for space to
- * become available.
- *
- * @param e The element to add.
- * @param timeout How long to wait before giving up, in units of <tt>unit</tt>
- * @param unit A <tt>TimeUnit</tt> determining how to interpret the <tt>timeout</tt> parameter.
- *
- * @return <tt>true</tt> if successful, or <tt>false</tt> if the specified waiting time elapses before space is
- * available.
- *
- * @throws InterruptedException If interrupted while waiting.
- * @throws NullPointerException If the specified element is <tt>null</tt>.
- */
- public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException
- {
- if (e == null)
- {
- throw new NullPointerException();
- }
-
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
-
- long nanos = unit.toNanos(timeout);
-
- try
- {
- do
- {
- if (insert(e, false))
- {
- return true;
- }
-
- try
- {
- nanos = notFull.awaitNanos(nanos);
- }
- catch (InterruptedException ie)
- {
- notFull.signal(); // propagate to non-interrupted thread
- throw ie;
- }
- }
- while (nanos > 0);
-
- return false;
- }
- finally
- {
- lock.unlock();
- }
- }
-
- /**
- * Retrieves and removes the head of this queue, or <tt>null</tt> if this queue is empty.
- *
- * @return The head of this queue, or <tt>null</tt> if this queue is empty.
- */
- public E poll()
- {
- final ReentrantLock lock = this.lock;
-
- lock.lock();
- try
- {
- if (count == 0)
- {
- return null;
- }
-
- E x = extract(true, true).getElement();
-
- return x;
- }
- finally
- {
- lock.unlock();
- }
- }
-
- /**
- * Retrieves and removes the head of this queue, waiting if necessary up to the specified wait time if no elements
- * are present on this queue.
- *
- * @param timeout How long to wait before giving up, in units of <tt>unit</tt>.
- * @param unit A <tt>TimeUnit</tt> determining how to interpret the <tt>timeout</tt> parameter.
- *
- * @return The head of this queue, or <tt>null</tt> if the specified waiting time elapses before an element is present.
- *
- * @throws InterruptedException If interrupted while waiting.
- */
- public E poll(long timeout, TimeUnit unit) throws InterruptedException
- {
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
- try
- {
- long nanos = unit.toNanos(timeout);
-
- do
- {
- if (count != 0)
- {
- E x = extract(true, true).getElement();
-
- return x;
- }
-
- try
- {
- nanos = notEmpty.awaitNanos(nanos);
- }
- catch (InterruptedException ie)
- {
- notEmpty.signal(); // propagate to non-interrupted thread
- throw ie;
- }
- }
- while (nanos > 0);
-
- return null;
- }
- finally
- {
- lock.unlock();
- }
- }
-
- /**
- * Retrieves, but does not remove, the head of this queue, returning <tt>null</tt> if this queue is empty.
- *
- * @return The head of this queue, or <tt>null</tt> if this queue is empty.
- */
- public E peek()
- {
- final ReentrantLock lock = this.lock;
- lock.lock();
-
- try
- {
- return peekAtBufferHead();
- }
- finally
- {
- lock.unlock();
- }
- }
-
- /**
- * Returns the number of elements that this queue can ideally (in the absence of memory or resource constraints)
- * accept without blocking, or <tt>Integer.MAX_VALUE</tt> if there is no intrinsic limit.
- *
- * <p>Note that you <em>cannot</em> always tell if an attempt to <tt>add</tt> an element will succeed by
- * inspecting <tt>remainingCapacity</tt> because it may be the case that another thread is about to <tt>put</tt>
- * or <tt>take</tt> an element.
- *
- * @return The remaining capacity.
- */
- public int remainingCapacity()
- {
- final ReentrantLock lock = this.lock;
- lock.lock();
-
- try
- {
- return getBufferCapacity() - count;
- }
- finally
- {
- lock.unlock();
- }
- }
-
- /**
- * Adds the specified element to this queue, waiting if necessary for space to become available.
- *
- * <p/>This method delegated to {@link #tryPut} which can raise {@link SynchException}s. If any are raised
- * this method silently ignores them. Use the {@link #tryPut} method directly if you want to catch these
- * exceptions.
- *
- * @param e The element to add.
- *
- * @throws InterruptedException If interrupted while waiting.
- */
- public void put(E e) throws InterruptedException
- {
- try
- {
- tryPut(e);
- }
- catch (SynchException ex)
- {
- // This exception is deliberately ignored. See the method comment for information about this.
- }
- }
-
- /**
- * Tries a synchronous put into the queue. If a consumer encounters an exception condition whilst processing the
- * data that is put, then this is returned to the caller wrapped inside a {@link SynchException}.
- *
- * @param e The data element to put into the queue. Cannot be null.
- *
- * @throws InterruptedException If the thread is interrupted whilst waiting to write to the queue or whilst waiting
- * on its entry in the queue being consumed.
- * @throws SynchException If a consumer encounters an error whilst processing the data element.
- */
- public void tryPut(E e) throws InterruptedException, SynchException
- {
- if (e == null)
- {
- throw new NullPointerException();
- }
-
- // final Queue<E> items = this.buffer;
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
-
- try
- {
- while (count == getBufferCapacity())
- {
- // Release the lock and wait until the queue is not full.
- notFull.await();
- }
- }
- catch (InterruptedException ie)
- {
- notFull.signal(); // propagate to non-interrupted thread
- throw ie;
- }
-
- // There is room in the queue so insert must succeed. Insert into the queu, release the lock and block
- // the producer until its data is taken.
- insert(e, true);
- }
-
- /**
- * Retrieves and removes the head of this queue, waiting if no elements are present on this queue.
- * Any producer that has its data element taken by this call will be immediately unblocked. To keep the
- * producer blocked whilst taking just a single item, use the
- * {@link #drainTo(java.util.Collection<org.apache.qpid.util.concurrent.SynchRecord<E>>, int, boolean)}
- * method. There is no take method to do that because there is not usually any advantage in a synchronous hand
- * off design that consumes data one item at a time. It is normal to consume data in chunks to ammortize consumption
- * latencies accross many producers where possible.
- *
- * @return The head of this queue.
- *
- * @throws InterruptedException if interrupted while waiting.
- */
- public E take() throws InterruptedException
- {
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
-
- try
- {
- try
- {
- while (count == 0)
- {
- // Release the lock and wait until the queue becomes non-empty.
- notEmpty.await();
- }
- }
- catch (InterruptedException ie)
- {
- notEmpty.signal(); // propagate to non-interrupted thread
- throw ie;
- }
-
- // There is data in the queue so extraction must succeed. Notify any waiting threads that the queue is
- // not full, and unblock the producer that owns the data item that is taken.
- E x = extract(true, true).getElement();
-
- return x;
- }
- finally
- {
- lock.unlock();
- }
- }
-
- /**
- * Removes all available elements from this queue and adds them into the given collection. This operation may be
- * more efficient than repeatedly polling this queue. A failure encountered while attempting to <tt>add</tt> elements
- * to collection <tt>c</tt> may result in elements being in neither, either or both collections when the associated
- * exception is thrown. Attempts to drain a queue to itself result in <tt>IllegalArgumentException</tt>. Further,
- * the behavior of this operation is undefined if the specified collection is modified while the operation is in
- * progress.
- *
- * @param objects The collection to transfer elements into.
- *
- * @return The number of elements transferred.
- *
- * @throws NullPointerException If objects is null.
- * @throws IllegalArgumentException If objects is this queue.
- */
- public int drainTo(Collection<? super E> objects)
- {
- return drainTo(objects, -1);
- }
-
- /**
- * Removes at most the given number of available elements from this queue and adds them into the given collection.
- * A failure encountered while attempting to <tt>add</tt> elements to collection <tt>c</tt> may result in elements
- * being in neither, either or both collections when the associated exception is thrown. Attempts to drain a queue
- * to itself result in <tt>IllegalArgumentException</tt>. Further, the behavior of this operation is undefined if
- * the specified collection is modified while the operation is in progress.
- *
- * @param objects The collection to transfer elements into.
- * @param maxElements The maximum number of elements to transfer. If this is -1 then that is interpreted as meaning
- * all elements.
- *
- * @return The number of elements transferred.
- *
- * @throws NullPointerException If c is null.
- * @throws IllegalArgumentException If c is this queue.
- */
- public int drainTo(Collection<? super E> objects, int maxElements)
- {
- if (objects == null)
- {
- throw new NullPointerException();
- }
-
- if (objects == this)
- {
- throw new IllegalArgumentException();
- }
-
- // final Queue<E> items = this.buffer;
- final ReentrantLock lock = this.lock;
- lock.lock();
-
- try
- {
- int n = 0;
-
- for (int max = ((maxElements >= count) || (maxElements < 0)) ? count : maxElements; n < max; n++)
- {
- // Take items from the queue, do unblock the producers, but don't send not full signals yet.
- objects.add(extract(true, false).getElement());
- }
-
- if (n > 0)
- {
- // count -= n;
- notFull.signalAll();
- }
-
- return n;
- }
- finally
- {
- lock.unlock();
- }
- }
-
- /**
- * Takes all available data items from the queue or blocks until some become available. The returned items
- * are wrapped in a {@link SynchRecord} which provides an interface to requeue them or send errors to their
- * producers, where the producers are still blocked.
- *
- * @param c The collection to drain the data items into.
- * @param unblock If set to <tt>true</tt> the producers for the taken items will be immediately unblocked.
- *
- * @return A count of the number of elements that were drained from the queue.
- */
- public SynchRef drainTo(Collection<SynchRecord<E>> c, boolean unblock)
- {
- return drainTo(c, -1, unblock);
- }
-
- /**
- * Takes up to maxElements available data items from the queue or blocks until some become available. The returned
- * items are wrapped in a {@link SynchRecord} which provides an interface to requeue them or send errors to their
- * producers, where the producers are still blocked.
- *
- * @param coll The collection to drain the data items into.
- * @param maxElements The maximum number of elements to drain.
- * @param unblock If set to <tt>true</tt> the producers for the taken items will be immediately unblocked.
- *
- * @return A count of the number of elements that were drained from the queue.
- */
- public SynchRef drainTo(Collection<SynchRecord<E>> coll, int maxElements, boolean unblock)
- {
- if (coll == null)
- {
- throw new NullPointerException();
- }
-
- // final Queue<E> items = this.buffer;
- final ReentrantLock lock = this.lock;
- lock.lock();
-
- try
- {
- int n = 0;
-
- for (int max = ((maxElements >= count) || (maxElements < 0)) ? count : maxElements; n < max; n++)
- {
- // Extract the next record from the queue, don't signall the not full condition yet and release
- // producers depending on whether the caller wants to or not.
- coll.add(extract(false, unblock));
- }
-
- if (n > 0)
- {
- // count -= n;
- notFull.signalAll();
- }
-
- return new SynchRefImpl(n, coll);
- }
- finally
- {
- lock.unlock();
- }
- }
-
- /**
- * This abstract method should be overriden to return an empty queue. Different implementations of producer
- * consumer buffers can control the order in which data is accessed using different queue implementations.
- * This method allows the type of queue to be abstracted out of this class and to be supplied by concrete
- * implementations.
- *
- * @return An empty queue.
- */
- protected abstract <T> Queue<T> createQueue();
-
- /**
- * Insert element into the queue, then possibly signal that the queue is not empty and block the producer
- * on the element until permission to procede is given.
- *
- * <p/>If the producer is to be blocked then the lock must be released first, otherwise no other process
- * will be able to get access to the queue. Hence, unlock and block are always set together.
- *
- * <p/>Call only when holding the global lock.
- *
- * @param unlockAndBlock <tt>true</tt>If the global queue lock should be released and the producer should be blocked.
- *
- * @return <tt>true</tt> if the operation succeeded, <tt>false</tt> otherwise. If the result is <tt>true</tt> this
- * method may not return straight away, but only after the producer is unblocked by having its data
- * consumed if the unlockAndBlock flag is set. In the false case the method will return straight away, no
- * matter what value the unlockAndBlock flag has, leaving the global lock on.
- */
- protected boolean insert(E x, boolean unlockAndBlock)
- {
- // Create a new record for the data item.
- SynchRecordImpl<E> record = new SynchRecordImpl<E>(x);
-
- boolean result = buffer.offer(record);
-
- if (result)
- {
- count++;
-
- // Tell any waiting consumers that the queue is not empty.
- notEmpty.signal();
-
- if (unlockAndBlock)
- {
- // Allow other threads to read/write the queue.
- lock.unlock();
-
- // Wait until a consumer takes this data item.
- record.waitForConsumer();
- }
-
- return true;
- }
- else
- {
- return false;
- }
- }
-
- /**
- * Extract element at current take position, advance, and signal.
- *
- * <p/>Call only when holding lock.
- */
- protected SynchRecordImpl<E> extract(boolean unblock, boolean signal)
- {
- SynchRecordImpl<E> result = buffer.remove();
- count--;
-
- if (signal)
- {
- notFull.signal();
- }
-
- if (unblock)
- {
- result.releaseImmediately();
- }
-
- return result;
- }
-
- /**
- * Get the capacity of the buffer. If the buffer has no maximum capacity then Integer.MAX_VALUE is returned.
- *
- * <p/>Call only when holding lock.
- *
- * @return The maximum capacity of the buffer.
- */
- protected int getBufferCapacity()
- {
- if (buffer instanceof Capacity)
- {
- return ((Capacity) buffer).getCapacity();
- }
- else
- {
- return Integer.MAX_VALUE;
- }
- }
-
- /**
- * Return the head element from the buffer.
- *
- * <p/>Call only when holding lock.
- *
- * @return The head element from the buffer.
- */
- protected E peekAtBufferHead()
- {
- return buffer.peek().getElement();
- }
-
- public class SynchRefImpl implements SynchRef
- {
- /** Holds the number of synch records associated with this reference. */
- int numRecords;
-
- /** Holds a reference to the collection of synch records managed by this. */
- Collection<SynchRecord<E>> records;
-
- public SynchRefImpl(int n, Collection<SynchRecord<E>> records)
- {
- this.numRecords = n;
- this.records = records;
- }
-
- public int getNumRecords()
- {
- return numRecords;
- }
-
- /**
- * Any producers that have had their data elements taken from the queue but have not been unblocked are unblocked
- * when this method is called. The exception to this is producers that have had their data put back onto the queue
- * by a consumer. Producers that have had exceptions for their data items registered by consumers will be unblocked
- * but will not return from their put call normally, but with an exception instead.
- */
- public void unblockProducers()
- {
- log.debug("public void unblockProducers(): called");
-
- if (records != null)
- {
- for (SynchRecord<E> record : records)
- {
- // This call takes account of items that have already been released, are to be requeued or are in
- // error.
- record.releaseImmediately();
- }
- }
-
- records = null;
- }
- }
-
- /**
- * A SynchRecordImpl is used by a {@link BatchSynchQueue} to pair together a producer with its data. This allows
- * the producer of data to be identified so that it can be unblocked when its data is consumed or sent errors when
- * its data cannot be consumed.
- */
- public class SynchRecordImpl<E> implements SynchRecord<E>
- {
- /** A boolean latch that determines when the producer for this data item will be allowed to continue. */
- BooleanLatch latch = new BooleanLatch();
-
- /** The data element associated with this item. */
- E element;
-
- /**
- * Create a new synch record.
- *
- * @param e The data element that the record encapsulates.
- */
- public SynchRecordImpl(E e)
- {
- // Keep the data element.
- element = e;
- }
-
- /**
- * Waits until the producer is given permission to proceded by a consumer.
- */
- public void waitForConsumer()
- {
- latch.await();
- }
-
- /**
- * Gets the data element contained by this record.
- *
- * @return The data element contained by this record.
- */
- public E getElement()
- {
- return element;
- }
-
- /**
- * Immediately releases the producer of this data record. Consumers can bring the synchronization time of
- * producers to a minimum by using this method to release them at the earliest possible moment when batch
- * consuming records from sychronized producers.
- */
- public void releaseImmediately()
- {
- // Check that the record has not already been released, is in error or is to be requeued.
- latch.signal();
-
- // Propagate errors to the producer.
-
- // Requeue items to be requeued.
- }
-
- /**
- * Tells the synch queue to put this element back onto the queue instead of releasing its producer.
- * The element is not requeued immediately but upon calling the {@link SynchRef#unblockProducers()} method or
- * the {@link #releaseImmediately()} method.
- *
- * <p/>This method will raise a runtime exception {@link AlreadyUnblockedException} if the producer for this
- * element has already been unblocked.
- */
- public void reQueue()
- {
- throw new RuntimeException("Not implemented.");
- }
-
- /**
- * Tells the synch queue to raise an exception with this elements producer. The exception is not raised
- * immediately but upon calling the {@link SynchRef#unblockProducers()} method or the
- * {@link #releaseImmediately()} method. The exception will be wrapped in a {@link SynchException} before it is
- * raised on the producer.
- *
- * <p/>This method is unusual in that it accepts an exception as an argument. This is non-standard but is used
- * because the exception is to be passed onto a different thread.
- *
- * <p/>This method will raise a runtime exception {@link AlreadyUnblockedException} if the producer for this
- * element has already been unblocked.
- *
- * @param e The exception to raise on the producer.
- */
- public void inError(Exception e)
- {
- throw new RuntimeException("Not implemented.");
- }
- }
-}