diff options
Diffstat (limited to 'qpid/dotnet/Qpid.Common/Collections/LinkedBlockingQueue.cs')
-rw-r--r-- | qpid/dotnet/Qpid.Common/Collections/LinkedBlockingQueue.cs | 384 |
1 files changed, 384 insertions, 0 deletions
diff --git a/qpid/dotnet/Qpid.Common/Collections/LinkedBlockingQueue.cs b/qpid/dotnet/Qpid.Common/Collections/LinkedBlockingQueue.cs new file mode 100644 index 0000000000..be92576951 --- /dev/null +++ b/qpid/dotnet/Qpid.Common/Collections/LinkedBlockingQueue.cs @@ -0,0 +1,384 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Threading; + +namespace Apache.Qpid.Collections +{ + public class LinkedBlockingQueue : BlockingQueue + { + + /* + * A variant of the "two lock queue" algorithm. The putLock gates + * entry to put (and offer), and has an associated condition for + * waiting puts. Similarly for the takeLock. The "count" field + * that they both rely on is maintained as an atomic to avoid + * needing to get both locks in most cases. Also, to minimize need + * for puts to get takeLock and vice-versa, cascading notifies are + * used. When a put notices that it has enabled at least one take, + * it signals taker. That taker in turn signals others if more + * items have been entered since the signal. And symmetrically for + * takes signalling puts. Operations such as remove(Object) and + * iterators acquire both locks. + */ + + /** + * Linked list node class + */ + internal class Node + { + /** The item, volatile to ensure barrier separating write and read */ + internal volatile Object item; + internal Node next; + internal Node(Object x) { item = x; } + } + + /** The capacity bound, or Integer.MAX_VALUE if none */ + private readonly int capacity; + + /** Current number of elements */ + private volatile int count = 0; + + /** Head of linked list */ + private Node head; + + /** Tail of linked list */ + private Node last; + + /** Lock held by take, poll, etc */ + private readonly object takeLock = new Object(); //new SerializableLock(); + + /** Lock held by put, offer, etc */ + private readonly object putLock = new Object();//new SerializableLock(); + + /** + * Signals a waiting take. Called only from put/offer (which do not + * otherwise ordinarily lock takeLock.) + */ + private void SignalNotEmpty() + { + lock (takeLock) + { + Monitor.Pulse(takeLock); + } + } + + /** + * Signals a waiting put. Called only from take/poll. + */ + private void SignalNotFull() + { + lock (putLock) + { + Monitor.Pulse(putLock); + } + } + + /** + * Creates a node and links it at end of queue. + * @param x the item + */ + private void Insert(Object x) + { + last = last.next = new Node(x); + } + + /** + * Removes a node from head of queue, + * @return the node + */ + private Object Extract() + { + Node first = head.next; + head = first; + Object x = first.item; + first.item = null; + return x; + } + + + /** + * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of + * {@link Integer#MAX_VALUE}. + */ + public LinkedBlockingQueue() : this(Int32.MaxValue) + { + } + + /** + * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity. + * + * @param capacity the capacity of this queue + * @throws IllegalArgumentException if <tt>capacity</tt> is not greater + * than zero + */ + public LinkedBlockingQueue(int capacity) + { + if (capacity <= 0) throw new ArgumentException("Capacity must be positive, was passed " + capacity); + this.capacity = capacity; + last = head = new Node(null); + } + + // this doc comment is overridden to remove the reference to collections + // greater in size than Integer.MAX_VALUE + /** + * Returns the number of elements in this queue. + * + * @return the number of elements in this queue + */ + public int Size + { + get + { + return count; + } + } + + // this doc comment is a modified copy of the inherited doc comment, + // without the reference to unlimited queues. + /** + * Returns the number of additional elements that this queue can ideally + * (in the absence of memory or resource constraints) accept without + * blocking. This is always equal to the initial capacity of this queue + * less the current <tt>size</tt> of this queue. + * + * <p>Note that you <em>cannot</em> always tell if an attempt to insert + * an element will succeed by inspecting <tt>remainingCapacity</tt> + * because it may be the case that another thread is about to + * insert or remove an element. + */ + public override int RemainingCapacity + { + get + { + return capacity - count; + } + } + + /** + * Inserts the specified element at the tail of this queue, waiting if + * necessary for space to become available. + * + * @throws InterruptedException {@inheritDoc} + * @throws NullPointerException {@inheritDoc} + */ + public override void EnqueueBlocking(Object e) + { + if (e == null) throw new ArgumentNullException("Object must not be null"); + // Note: convention in all put/take/etc is to preset + // local var holding count negative to indicate failure unless set. + int c = -1; + lock (putLock) + { + /* + * Note that count is used in wait guard even though it is + * not protected by lock. This works because count can + * only decrease at this point (all other puts are shut + * out by lock), and we (or some other waiting put) are + * signalled if it ever changes from + * capacity. Similarly for all other uses of count in + * other wait guards. + */ + while (count == capacity) + { + Monitor.Wait(putLock); + } + + Insert(e); + lock(this) + { + c = count++; + } + if (c + 1 < capacity) + { + Monitor.Pulse(putLock); + } + } + + if (c == 0) + { + SignalNotEmpty(); + } + } + + /** + * Inserts the specified element at the tail of this queue if it is + * possible to do so immediately without exceeding the queue's capacity, + * returning <tt>true</tt> upon success and <tt>false</tt> if this queue + * is full. + * When using a capacity-restricted queue, this method is generally + * preferable to method {@link BlockingQueue#add add}, which can fail to + * insert an element only by throwing an exception. + * + * @throws NullPointerException if the specified element is null + */ + public override bool EnqueueNoThrow(Object e) + { + if (e == null) throw new ArgumentNullException("e must not be null"); + if (count == capacity) + { + return false; + } + int c = -1; + lock (putLock) + { + if (count < capacity) + { + Insert(e); + lock (this) + { + c = count++; + } + if (c + 1 < capacity) + { + Monitor.Pulse(putLock); + } + } + } + if (c == 0) + { + SignalNotEmpty(); + } + return c >= 0; + } + + /** + * Retrieves and removes the head of this queue, waiting if necessary + * until an element becomes available. + * + * @return the head of this queue + * @throws InterruptedException if interrupted while waiting + */ + public override Object DequeueBlocking() + { + Object x; + int c = -1; + lock (takeLock) + { + + while (count == 0) + { + Monitor.Wait(takeLock); + } + + + x = Extract(); + lock (this) { c = count--; } + if (c > 1) + { + Monitor.Pulse(takeLock); + } + } + if (c == capacity) + { + SignalNotFull(); + } + return x; + } + + public Object Poll() + { + if (count == 0) + { + return null; + } + Object x = null; + int c = -1; + lock (takeLock) + { + if (count > 0) + { + x = Extract(); + lock (this) { c = count--; } + if (c > 1) + { + Monitor.Pulse(takeLock); + } + } + } + if (c == capacity) + { + SignalNotFull(); + } + return x; + } + + + public override Object Peek() + { + if (count == 0) + { + return null; + } + lock (takeLock) + { + Node first = head.next; + if (first == null) + { + return null; + } + else + { + return first.item; + } + } + } + + public override String ToString() + { + lock (putLock) + { + lock (takeLock) + { + return base.ToString(); + } + } + } + + /** + * Atomically removes all of the elements from this queue. + * The queue will be empty after this call returns. + */ + public override void Clear() + { + lock (putLock) + { + lock (takeLock) + { + head.next = null; + last = head; + int c; + lock (this) + { + c = count; + count = 0; + } + if (c == capacity) + { + Monitor.PulseAll(putLock); + } + } + } + } + } +} + + |