summaryrefslogtreecommitdiff
path: root/qpid/dotnet/Qpid.Common/Collections/LinkedBlockingQueue.cs
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/dotnet/Qpid.Common/Collections/LinkedBlockingQueue.cs')
-rw-r--r--qpid/dotnet/Qpid.Common/Collections/LinkedBlockingQueue.cs384
1 files changed, 384 insertions, 0 deletions
diff --git a/qpid/dotnet/Qpid.Common/Collections/LinkedBlockingQueue.cs b/qpid/dotnet/Qpid.Common/Collections/LinkedBlockingQueue.cs
new file mode 100644
index 0000000000..be92576951
--- /dev/null
+++ b/qpid/dotnet/Qpid.Common/Collections/LinkedBlockingQueue.cs
@@ -0,0 +1,384 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Threading;
+
+namespace Apache.Qpid.Collections
+{
+ public class LinkedBlockingQueue : BlockingQueue
+ {
+
+ /*
+ * A variant of the "two lock queue" algorithm. The putLock gates
+ * entry to put (and offer), and has an associated condition for
+ * waiting puts. Similarly for the takeLock. The "count" field
+ * that they both rely on is maintained as an atomic to avoid
+ * needing to get both locks in most cases. Also, to minimize need
+ * for puts to get takeLock and vice-versa, cascading notifies are
+ * used. When a put notices that it has enabled at least one take,
+ * it signals taker. That taker in turn signals others if more
+ * items have been entered since the signal. And symmetrically for
+ * takes signalling puts. Operations such as remove(Object) and
+ * iterators acquire both locks.
+ */
+
+ /**
+ * Linked list node class
+ */
+ internal class Node
+ {
+ /** The item, volatile to ensure barrier separating write and read */
+ internal volatile Object item;
+ internal Node next;
+ internal Node(Object x) { item = x; }
+ }
+
+ /** The capacity bound, or Integer.MAX_VALUE if none */
+ private readonly int capacity;
+
+ /** Current number of elements */
+ private volatile int count = 0;
+
+ /** Head of linked list */
+ private Node head;
+
+ /** Tail of linked list */
+ private Node last;
+
+ /** Lock held by take, poll, etc */
+ private readonly object takeLock = new Object(); //new SerializableLock();
+
+ /** Lock held by put, offer, etc */
+ private readonly object putLock = new Object();//new SerializableLock();
+
+ /**
+ * Signals a waiting take. Called only from put/offer (which do not
+ * otherwise ordinarily lock takeLock.)
+ */
+ private void SignalNotEmpty()
+ {
+ lock (takeLock)
+ {
+ Monitor.Pulse(takeLock);
+ }
+ }
+
+ /**
+ * Signals a waiting put. Called only from take/poll.
+ */
+ private void SignalNotFull()
+ {
+ lock (putLock)
+ {
+ Monitor.Pulse(putLock);
+ }
+ }
+
+ /**
+ * Creates a node and links it at end of queue.
+ * @param x the item
+ */
+ private void Insert(Object x)
+ {
+ last = last.next = new Node(x);
+ }
+
+ /**
+ * Removes a node from head of queue,
+ * @return the node
+ */
+ private Object Extract()
+ {
+ Node first = head.next;
+ head = first;
+ Object x = first.item;
+ first.item = null;
+ return x;
+ }
+
+
+ /**
+ * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
+ * {@link Integer#MAX_VALUE}.
+ */
+ public LinkedBlockingQueue() : this(Int32.MaxValue)
+ {
+ }
+
+ /**
+ * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity.
+ *
+ * @param capacity the capacity of this queue
+ * @throws IllegalArgumentException if <tt>capacity</tt> is not greater
+ * than zero
+ */
+ public LinkedBlockingQueue(int capacity)
+ {
+ if (capacity <= 0) throw new ArgumentException("Capacity must be positive, was passed " + capacity);
+ this.capacity = capacity;
+ last = head = new Node(null);
+ }
+
+ // this doc comment is overridden to remove the reference to collections
+ // greater in size than Integer.MAX_VALUE
+ /**
+ * Returns the number of elements in this queue.
+ *
+ * @return the number of elements in this queue
+ */
+ public int Size
+ {
+ get
+ {
+ return count;
+ }
+ }
+
+ // this doc comment is a modified copy of the inherited doc comment,
+ // without the reference to unlimited queues.
+ /**
+ * Returns the number of additional elements that this queue can ideally
+ * (in the absence of memory or resource constraints) accept without
+ * blocking. This is always equal to the initial capacity of this queue
+ * less the current <tt>size</tt> of this queue.
+ *
+ * <p>Note that you <em>cannot</em> always tell if an attempt to insert
+ * an element will succeed by inspecting <tt>remainingCapacity</tt>
+ * because it may be the case that another thread is about to
+ * insert or remove an element.
+ */
+ public override int RemainingCapacity
+ {
+ get
+ {
+ return capacity - count;
+ }
+ }
+
+ /**
+ * Inserts the specified element at the tail of this queue, waiting if
+ * necessary for space to become available.
+ *
+ * @throws InterruptedException {@inheritDoc}
+ * @throws NullPointerException {@inheritDoc}
+ */
+ public override void EnqueueBlocking(Object e)
+ {
+ if (e == null) throw new ArgumentNullException("Object must not be null");
+ // Note: convention in all put/take/etc is to preset
+ // local var holding count negative to indicate failure unless set.
+ int c = -1;
+ lock (putLock)
+ {
+ /*
+ * Note that count is used in wait guard even though it is
+ * not protected by lock. This works because count can
+ * only decrease at this point (all other puts are shut
+ * out by lock), and we (or some other waiting put) are
+ * signalled if it ever changes from
+ * capacity. Similarly for all other uses of count in
+ * other wait guards.
+ */
+ while (count == capacity)
+ {
+ Monitor.Wait(putLock);
+ }
+
+ Insert(e);
+ lock(this)
+ {
+ c = count++;
+ }
+ if (c + 1 < capacity)
+ {
+ Monitor.Pulse(putLock);
+ }
+ }
+
+ if (c == 0)
+ {
+ SignalNotEmpty();
+ }
+ }
+
+ /**
+ * Inserts the specified element at the tail of this queue if it is
+ * possible to do so immediately without exceeding the queue's capacity,
+ * returning <tt>true</tt> upon success and <tt>false</tt> if this queue
+ * is full.
+ * When using a capacity-restricted queue, this method is generally
+ * preferable to method {@link BlockingQueue#add add}, which can fail to
+ * insert an element only by throwing an exception.
+ *
+ * @throws NullPointerException if the specified element is null
+ */
+ public override bool EnqueueNoThrow(Object e)
+ {
+ if (e == null) throw new ArgumentNullException("e must not be null");
+ if (count == capacity)
+ {
+ return false;
+ }
+ int c = -1;
+ lock (putLock)
+ {
+ if (count < capacity)
+ {
+ Insert(e);
+ lock (this)
+ {
+ c = count++;
+ }
+ if (c + 1 < capacity)
+ {
+ Monitor.Pulse(putLock);
+ }
+ }
+ }
+ if (c == 0)
+ {
+ SignalNotEmpty();
+ }
+ return c >= 0;
+ }
+
+ /**
+ * Retrieves and removes the head of this queue, waiting if necessary
+ * until an element becomes available.
+ *
+ * @return the head of this queue
+ * @throws InterruptedException if interrupted while waiting
+ */
+ public override Object DequeueBlocking()
+ {
+ Object x;
+ int c = -1;
+ lock (takeLock)
+ {
+
+ while (count == 0)
+ {
+ Monitor.Wait(takeLock);
+ }
+
+
+ x = Extract();
+ lock (this) { c = count--; }
+ if (c > 1)
+ {
+ Monitor.Pulse(takeLock);
+ }
+ }
+ if (c == capacity)
+ {
+ SignalNotFull();
+ }
+ return x;
+ }
+
+ public Object Poll()
+ {
+ if (count == 0)
+ {
+ return null;
+ }
+ Object x = null;
+ int c = -1;
+ lock (takeLock)
+ {
+ if (count > 0)
+ {
+ x = Extract();
+ lock (this) { c = count--; }
+ if (c > 1)
+ {
+ Monitor.Pulse(takeLock);
+ }
+ }
+ }
+ if (c == capacity)
+ {
+ SignalNotFull();
+ }
+ return x;
+ }
+
+
+ public override Object Peek()
+ {
+ if (count == 0)
+ {
+ return null;
+ }
+ lock (takeLock)
+ {
+ Node first = head.next;
+ if (first == null)
+ {
+ return null;
+ }
+ else
+ {
+ return first.item;
+ }
+ }
+ }
+
+ public override String ToString()
+ {
+ lock (putLock)
+ {
+ lock (takeLock)
+ {
+ return base.ToString();
+ }
+ }
+ }
+
+ /**
+ * Atomically removes all of the elements from this queue.
+ * The queue will be empty after this call returns.
+ */
+ public override void Clear()
+ {
+ lock (putLock)
+ {
+ lock (takeLock)
+ {
+ head.next = null;
+ last = head;
+ int c;
+ lock (this)
+ {
+ c = count;
+ count = 0;
+ }
+ if (c == capacity)
+ {
+ Monitor.PulseAll(putLock);
+ }
+ }
+ }
+ }
+ }
+}
+
+