summaryrefslogtreecommitdiff
path: root/trunk/qpid/dotnet/Qpid.Common/Collections/LinkedBlockingQueue.cs
diff options
context:
space:
mode:
Diffstat (limited to 'trunk/qpid/dotnet/Qpid.Common/Collections/LinkedBlockingQueue.cs')
-rw-r--r--trunk/qpid/dotnet/Qpid.Common/Collections/LinkedBlockingQueue.cs384
1 files changed, 0 insertions, 384 deletions
diff --git a/trunk/qpid/dotnet/Qpid.Common/Collections/LinkedBlockingQueue.cs b/trunk/qpid/dotnet/Qpid.Common/Collections/LinkedBlockingQueue.cs
deleted file mode 100644
index be92576951..0000000000
--- a/trunk/qpid/dotnet/Qpid.Common/Collections/LinkedBlockingQueue.cs
+++ /dev/null
@@ -1,384 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Threading;
-
-namespace Apache.Qpid.Collections
-{
- public class LinkedBlockingQueue : BlockingQueue
- {
-
- /*
- * A variant of the "two lock queue" algorithm. The putLock gates
- * entry to put (and offer), and has an associated condition for
- * waiting puts. Similarly for the takeLock. The "count" field
- * that they both rely on is maintained as an atomic to avoid
- * needing to get both locks in most cases. Also, to minimize need
- * for puts to get takeLock and vice-versa, cascading notifies are
- * used. When a put notices that it has enabled at least one take,
- * it signals taker. That taker in turn signals others if more
- * items have been entered since the signal. And symmetrically for
- * takes signalling puts. Operations such as remove(Object) and
- * iterators acquire both locks.
- */
-
- /**
- * Linked list node class
- */
- internal class Node
- {
- /** The item, volatile to ensure barrier separating write and read */
- internal volatile Object item;
- internal Node next;
- internal Node(Object x) { item = x; }
- }
-
- /** The capacity bound, or Integer.MAX_VALUE if none */
- private readonly int capacity;
-
- /** Current number of elements */
- private volatile int count = 0;
-
- /** Head of linked list */
- private Node head;
-
- /** Tail of linked list */
- private Node last;
-
- /** Lock held by take, poll, etc */
- private readonly object takeLock = new Object(); //new SerializableLock();
-
- /** Lock held by put, offer, etc */
- private readonly object putLock = new Object();//new SerializableLock();
-
- /**
- * Signals a waiting take. Called only from put/offer (which do not
- * otherwise ordinarily lock takeLock.)
- */
- private void SignalNotEmpty()
- {
- lock (takeLock)
- {
- Monitor.Pulse(takeLock);
- }
- }
-
- /**
- * Signals a waiting put. Called only from take/poll.
- */
- private void SignalNotFull()
- {
- lock (putLock)
- {
- Monitor.Pulse(putLock);
- }
- }
-
- /**
- * Creates a node and links it at end of queue.
- * @param x the item
- */
- private void Insert(Object x)
- {
- last = last.next = new Node(x);
- }
-
- /**
- * Removes a node from head of queue,
- * @return the node
- */
- private Object Extract()
- {
- Node first = head.next;
- head = first;
- Object x = first.item;
- first.item = null;
- return x;
- }
-
-
- /**
- * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
- * {@link Integer#MAX_VALUE}.
- */
- public LinkedBlockingQueue() : this(Int32.MaxValue)
- {
- }
-
- /**
- * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity.
- *
- * @param capacity the capacity of this queue
- * @throws IllegalArgumentException if <tt>capacity</tt> is not greater
- * than zero
- */
- public LinkedBlockingQueue(int capacity)
- {
- if (capacity <= 0) throw new ArgumentException("Capacity must be positive, was passed " + capacity);
- this.capacity = capacity;
- last = head = new Node(null);
- }
-
- // this doc comment is overridden to remove the reference to collections
- // greater in size than Integer.MAX_VALUE
- /**
- * Returns the number of elements in this queue.
- *
- * @return the number of elements in this queue
- */
- public int Size
- {
- get
- {
- return count;
- }
- }
-
- // this doc comment is a modified copy of the inherited doc comment,
- // without the reference to unlimited queues.
- /**
- * Returns the number of additional elements that this queue can ideally
- * (in the absence of memory or resource constraints) accept without
- * blocking. This is always equal to the initial capacity of this queue
- * less the current <tt>size</tt> of this queue.
- *
- * <p>Note that you <em>cannot</em> always tell if an attempt to insert
- * an element will succeed by inspecting <tt>remainingCapacity</tt>
- * because it may be the case that another thread is about to
- * insert or remove an element.
- */
- public override int RemainingCapacity
- {
- get
- {
- return capacity - count;
- }
- }
-
- /**
- * Inserts the specified element at the tail of this queue, waiting if
- * necessary for space to become available.
- *
- * @throws InterruptedException {@inheritDoc}
- * @throws NullPointerException {@inheritDoc}
- */
- public override void EnqueueBlocking(Object e)
- {
- if (e == null) throw new ArgumentNullException("Object must not be null");
- // Note: convention in all put/take/etc is to preset
- // local var holding count negative to indicate failure unless set.
- int c = -1;
- lock (putLock)
- {
- /*
- * Note that count is used in wait guard even though it is
- * not protected by lock. This works because count can
- * only decrease at this point (all other puts are shut
- * out by lock), and we (or some other waiting put) are
- * signalled if it ever changes from
- * capacity. Similarly for all other uses of count in
- * other wait guards.
- */
- while (count == capacity)
- {
- Monitor.Wait(putLock);
- }
-
- Insert(e);
- lock(this)
- {
- c = count++;
- }
- if (c + 1 < capacity)
- {
- Monitor.Pulse(putLock);
- }
- }
-
- if (c == 0)
- {
- SignalNotEmpty();
- }
- }
-
- /**
- * Inserts the specified element at the tail of this queue if it is
- * possible to do so immediately without exceeding the queue's capacity,
- * returning <tt>true</tt> upon success and <tt>false</tt> if this queue
- * is full.
- * When using a capacity-restricted queue, this method is generally
- * preferable to method {@link BlockingQueue#add add}, which can fail to
- * insert an element only by throwing an exception.
- *
- * @throws NullPointerException if the specified element is null
- */
- public override bool EnqueueNoThrow(Object e)
- {
- if (e == null) throw new ArgumentNullException("e must not be null");
- if (count == capacity)
- {
- return false;
- }
- int c = -1;
- lock (putLock)
- {
- if (count < capacity)
- {
- Insert(e);
- lock (this)
- {
- c = count++;
- }
- if (c + 1 < capacity)
- {
- Monitor.Pulse(putLock);
- }
- }
- }
- if (c == 0)
- {
- SignalNotEmpty();
- }
- return c >= 0;
- }
-
- /**
- * Retrieves and removes the head of this queue, waiting if necessary
- * until an element becomes available.
- *
- * @return the head of this queue
- * @throws InterruptedException if interrupted while waiting
- */
- public override Object DequeueBlocking()
- {
- Object x;
- int c = -1;
- lock (takeLock)
- {
-
- while (count == 0)
- {
- Monitor.Wait(takeLock);
- }
-
-
- x = Extract();
- lock (this) { c = count--; }
- if (c > 1)
- {
- Monitor.Pulse(takeLock);
- }
- }
- if (c == capacity)
- {
- SignalNotFull();
- }
- return x;
- }
-
- public Object Poll()
- {
- if (count == 0)
- {
- return null;
- }
- Object x = null;
- int c = -1;
- lock (takeLock)
- {
- if (count > 0)
- {
- x = Extract();
- lock (this) { c = count--; }
- if (c > 1)
- {
- Monitor.Pulse(takeLock);
- }
- }
- }
- if (c == capacity)
- {
- SignalNotFull();
- }
- return x;
- }
-
-
- public override Object Peek()
- {
- if (count == 0)
- {
- return null;
- }
- lock (takeLock)
- {
- Node first = head.next;
- if (first == null)
- {
- return null;
- }
- else
- {
- return first.item;
- }
- }
- }
-
- public override String ToString()
- {
- lock (putLock)
- {
- lock (takeLock)
- {
- return base.ToString();
- }
- }
- }
-
- /**
- * Atomically removes all of the elements from this queue.
- * The queue will be empty after this call returns.
- */
- public override void Clear()
- {
- lock (putLock)
- {
- lock (takeLock)
- {
- head.next = null;
- last = head;
- int c;
- lock (this)
- {
- c = count;
- count = 0;
- }
- if (c == capacity)
- {
- Monitor.PulseAll(putLock);
- }
- }
- }
- }
- }
-}
-
-