diff options
Diffstat (limited to 'qpid/dotnet/Qpid.Common/Collections')
5 files changed, 1294 insertions, 0 deletions
diff --git a/qpid/dotnet/Qpid.Common/Collections/BlockingQueue.cs b/qpid/dotnet/Qpid.Common/Collections/BlockingQueue.cs new file mode 100644 index 0000000000..dcfacf8474 --- /dev/null +++ b/qpid/dotnet/Qpid.Common/Collections/BlockingQueue.cs @@ -0,0 +1,95 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; + +namespace Apache.Qpid.Collections +{ + public abstract class BlockingQueue : Queue + { + /** + * Inserts the specified element into this queue if it is possible to do + * so immediately without violating capacity restrictions, returning + * <tt>true</tt> upon success and <tt>false</tt> if no space is currently + * available. When using a capacity-restricted queue, this method is + * generally preferable to {@link #add}, which can fail to insert an + * element only by throwing an exception. + * + * @param e the element to add + * @return <tt>true</tt> if the element was added to this queue, else + * <tt>false</tt> + * @throws ClassCastException if the class of the specified element + * prevents it from being added to this queue + * @throws NullPointerException if the specified element is null + * @throws IllegalArgumentException if some property of the specified + * element prevents it from being added to this queue + */ + public abstract bool EnqueueNoThrow(Object e); + + /** + * Inserts the specified element into this queue, waiting if necessary + * for space to become available. + * + * @param e the element to add + * @throws InterruptedException if interrupted while waiting + * @throws ClassCastException if the class of the specified element + * prevents it from being added to this queue + * @throws NullPointerException if the specified element is null + * @throws IllegalArgumentException if some property of the specified + * element prevents it from being added to this queue + */ + public abstract void EnqueueBlocking(object e); + + /** + * Retrieves and removes the head of this queue, waiting up to the + * specified wait time if necessary for an element to become available. + * + * @param timeout how long to wait before giving up, in units of + * <tt>unit</tt> + * @param unit a <tt>TimeUnit</tt> determining how to interpret the + * <tt>timeout</tt> parameter + * @return the head of this queue, or <tt>null</tt> if the + * specified waiting time elapses before an element is available + * @throws InterruptedException if interrupted while waiting + */ + public abstract object DequeueBlocking(); + + /** + * Returns the number of additional elements that this queue can ideally + * (in the absence of memory or resource constraints) accept without + * blocking, or <tt>Integer.MAX_VALUE</tt> if there is no intrinsic + * limit. + * + * <p>Note that you <em>cannot</em> always tell if an attempt to insert + * an element will succeed by inspecting <tt>remainingCapacity</tt> + * because it may be the case that another thread is about to + * insert or remove an element. + * + * @return the remaining capacity + */ + public abstract int RemainingCapacity + { + get; + } + } +} + + diff --git a/qpid/dotnet/Qpid.Common/Collections/ConsumerProducerQueue.cs b/qpid/dotnet/Qpid.Common/Collections/ConsumerProducerQueue.cs new file mode 100644 index 0000000000..131f316da6 --- /dev/null +++ b/qpid/dotnet/Qpid.Common/Collections/ConsumerProducerQueue.cs @@ -0,0 +1,113 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using System.Threading; + + +namespace Apache.Qpid.Collections +{ + /// <summary> + /// Simple FIFO queue to support multi-threaded consumer + /// and producers. It supports timeouts in dequeue operations. + /// </summary> + public sealed class ConsumerProducerQueue + { + private Queue _queue = new Queue(); + private WaitSemaphore _semaphore = new WaitSemaphore(); + + /// <summary> + /// Put an item into the tail of the queue + /// </summary> + /// <param name="item"></param> + public void Enqueue(object item) + { + lock ( _queue.SyncRoot ) + { + _queue.Enqueue(item); + _semaphore.Increment(); + } + } + + /// <summary> + /// Wait indefinitely for an item to be available + /// on the queue. + /// </summary> + /// <returns>The object at the head of the queue</returns> + public object Dequeue() + { + return Dequeue(Timeout.Infinite); + } + + /// <summary> + /// Wait up to the number of milliseconds specified + /// for an item to be available on the queue + /// </summary> + /// <param name="timeout">Number of milliseconds to wait</param> + /// <returns>The object at the head of the queue, or null + /// if the timeout expires</returns> + public object Dequeue(long timeout) + { + if ( _semaphore.Decrement(timeout) ) + { + lock ( _queue.SyncRoot ) + { + return _queue.Dequeue(); + } + } + return null; + } + + #region Simple Semaphore + // + // Simple Semaphore + // + + class WaitSemaphore + { + private int _count; + private AutoResetEvent _event = new AutoResetEvent(false); + + public void Increment() + { + Interlocked.Increment(ref _count); + _event.Set(); + } + + public bool Decrement(long timeout) + { + if ( timeout > int.MaxValue ) + throw new ArgumentOutOfRangeException("timeout", timeout, "Must be <= Int32.MaxValue"); + + int millis = (int) (timeout & 0x7FFFFFFF); + if ( Interlocked.Decrement(ref _count) > 0 ) + { + // there are messages in queue, so no need to wait + return true; + } else + { + return _event.WaitOne(millis, false); + } + } + } + #endregion // Simple Semaphore + } +} diff --git a/qpid/dotnet/Qpid.Common/Collections/LinkedBlockingQueue.cs b/qpid/dotnet/Qpid.Common/Collections/LinkedBlockingQueue.cs new file mode 100644 index 0000000000..be92576951 --- /dev/null +++ b/qpid/dotnet/Qpid.Common/Collections/LinkedBlockingQueue.cs @@ -0,0 +1,384 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Threading; + +namespace Apache.Qpid.Collections +{ + public class LinkedBlockingQueue : BlockingQueue + { + + /* + * A variant of the "two lock queue" algorithm. The putLock gates + * entry to put (and offer), and has an associated condition for + * waiting puts. Similarly for the takeLock. The "count" field + * that they both rely on is maintained as an atomic to avoid + * needing to get both locks in most cases. Also, to minimize need + * for puts to get takeLock and vice-versa, cascading notifies are + * used. When a put notices that it has enabled at least one take, + * it signals taker. That taker in turn signals others if more + * items have been entered since the signal. And symmetrically for + * takes signalling puts. Operations such as remove(Object) and + * iterators acquire both locks. + */ + + /** + * Linked list node class + */ + internal class Node + { + /** The item, volatile to ensure barrier separating write and read */ + internal volatile Object item; + internal Node next; + internal Node(Object x) { item = x; } + } + + /** The capacity bound, or Integer.MAX_VALUE if none */ + private readonly int capacity; + + /** Current number of elements */ + private volatile int count = 0; + + /** Head of linked list */ + private Node head; + + /** Tail of linked list */ + private Node last; + + /** Lock held by take, poll, etc */ + private readonly object takeLock = new Object(); //new SerializableLock(); + + /** Lock held by put, offer, etc */ + private readonly object putLock = new Object();//new SerializableLock(); + + /** + * Signals a waiting take. Called only from put/offer (which do not + * otherwise ordinarily lock takeLock.) + */ + private void SignalNotEmpty() + { + lock (takeLock) + { + Monitor.Pulse(takeLock); + } + } + + /** + * Signals a waiting put. Called only from take/poll. + */ + private void SignalNotFull() + { + lock (putLock) + { + Monitor.Pulse(putLock); + } + } + + /** + * Creates a node and links it at end of queue. + * @param x the item + */ + private void Insert(Object x) + { + last = last.next = new Node(x); + } + + /** + * Removes a node from head of queue, + * @return the node + */ + private Object Extract() + { + Node first = head.next; + head = first; + Object x = first.item; + first.item = null; + return x; + } + + + /** + * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of + * {@link Integer#MAX_VALUE}. + */ + public LinkedBlockingQueue() : this(Int32.MaxValue) + { + } + + /** + * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity. + * + * @param capacity the capacity of this queue + * @throws IllegalArgumentException if <tt>capacity</tt> is not greater + * than zero + */ + public LinkedBlockingQueue(int capacity) + { + if (capacity <= 0) throw new ArgumentException("Capacity must be positive, was passed " + capacity); + this.capacity = capacity; + last = head = new Node(null); + } + + // this doc comment is overridden to remove the reference to collections + // greater in size than Integer.MAX_VALUE + /** + * Returns the number of elements in this queue. + * + * @return the number of elements in this queue + */ + public int Size + { + get + { + return count; + } + } + + // this doc comment is a modified copy of the inherited doc comment, + // without the reference to unlimited queues. + /** + * Returns the number of additional elements that this queue can ideally + * (in the absence of memory or resource constraints) accept without + * blocking. This is always equal to the initial capacity of this queue + * less the current <tt>size</tt> of this queue. + * + * <p>Note that you <em>cannot</em> always tell if an attempt to insert + * an element will succeed by inspecting <tt>remainingCapacity</tt> + * because it may be the case that another thread is about to + * insert or remove an element. + */ + public override int RemainingCapacity + { + get + { + return capacity - count; + } + } + + /** + * Inserts the specified element at the tail of this queue, waiting if + * necessary for space to become available. + * + * @throws InterruptedException {@inheritDoc} + * @throws NullPointerException {@inheritDoc} + */ + public override void EnqueueBlocking(Object e) + { + if (e == null) throw new ArgumentNullException("Object must not be null"); + // Note: convention in all put/take/etc is to preset + // local var holding count negative to indicate failure unless set. + int c = -1; + lock (putLock) + { + /* + * Note that count is used in wait guard even though it is + * not protected by lock. This works because count can + * only decrease at this point (all other puts are shut + * out by lock), and we (or some other waiting put) are + * signalled if it ever changes from + * capacity. Similarly for all other uses of count in + * other wait guards. + */ + while (count == capacity) + { + Monitor.Wait(putLock); + } + + Insert(e); + lock(this) + { + c = count++; + } + if (c + 1 < capacity) + { + Monitor.Pulse(putLock); + } + } + + if (c == 0) + { + SignalNotEmpty(); + } + } + + /** + * Inserts the specified element at the tail of this queue if it is + * possible to do so immediately without exceeding the queue's capacity, + * returning <tt>true</tt> upon success and <tt>false</tt> if this queue + * is full. + * When using a capacity-restricted queue, this method is generally + * preferable to method {@link BlockingQueue#add add}, which can fail to + * insert an element only by throwing an exception. + * + * @throws NullPointerException if the specified element is null + */ + public override bool EnqueueNoThrow(Object e) + { + if (e == null) throw new ArgumentNullException("e must not be null"); + if (count == capacity) + { + return false; + } + int c = -1; + lock (putLock) + { + if (count < capacity) + { + Insert(e); + lock (this) + { + c = count++; + } + if (c + 1 < capacity) + { + Monitor.Pulse(putLock); + } + } + } + if (c == 0) + { + SignalNotEmpty(); + } + return c >= 0; + } + + /** + * Retrieves and removes the head of this queue, waiting if necessary + * until an element becomes available. + * + * @return the head of this queue + * @throws InterruptedException if interrupted while waiting + */ + public override Object DequeueBlocking() + { + Object x; + int c = -1; + lock (takeLock) + { + + while (count == 0) + { + Monitor.Wait(takeLock); + } + + + x = Extract(); + lock (this) { c = count--; } + if (c > 1) + { + Monitor.Pulse(takeLock); + } + } + if (c == capacity) + { + SignalNotFull(); + } + return x; + } + + public Object Poll() + { + if (count == 0) + { + return null; + } + Object x = null; + int c = -1; + lock (takeLock) + { + if (count > 0) + { + x = Extract(); + lock (this) { c = count--; } + if (c > 1) + { + Monitor.Pulse(takeLock); + } + } + } + if (c == capacity) + { + SignalNotFull(); + } + return x; + } + + + public override Object Peek() + { + if (count == 0) + { + return null; + } + lock (takeLock) + { + Node first = head.next; + if (first == null) + { + return null; + } + else + { + return first.item; + } + } + } + + public override String ToString() + { + lock (putLock) + { + lock (takeLock) + { + return base.ToString(); + } + } + } + + /** + * Atomically removes all of the elements from this queue. + * The queue will be empty after this call returns. + */ + public override void Clear() + { + lock (putLock) + { + lock (takeLock) + { + head.next = null; + last = head; + int c; + lock (this) + { + c = count; + count = 0; + } + if (c == capacity) + { + Monitor.PulseAll(putLock); + } + } + } + } + } +} + + diff --git a/qpid/dotnet/Qpid.Common/Collections/LinkedHashtable.cs b/qpid/dotnet/Qpid.Common/Collections/LinkedHashtable.cs new file mode 100644 index 0000000000..10ab5c674d --- /dev/null +++ b/qpid/dotnet/Qpid.Common/Collections/LinkedHashtable.cs @@ -0,0 +1,327 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; + +namespace Apache.Qpid.Collections +{ + public class LinkedHashtable : IDictionary + { + /// <summary> + /// Maps from key to LinkedDictionaryEntry + /// </summary> + private Hashtable _indexedValues = new Hashtable(); + + private LinkedDictionaryEntry _head; + + private LinkedDictionaryEntry _tail; + + private class LinkedDictionaryEntry + { + public LinkedDictionaryEntry _previous; + public LinkedDictionaryEntry _next; + internal DictionaryEntry _entry; + + public LinkedDictionaryEntry(object key, object value) + { + _entry = new DictionaryEntry(key, value); + } + } + + public object this[object key] + { + get + { + LinkedDictionaryEntry entry = (LinkedDictionaryEntry)_indexedValues[key]; + if (entry == null) + { + return null; // key not found + } + else + { + return entry._entry.Value; + } + } + + set + { + LinkedDictionaryEntry entry = (LinkedDictionaryEntry)_indexedValues[key]; + if (entry == null) + { + Add(key, value); + } + else + { + entry._entry.Value = value; + } + } + } + + /// <summary> + /// Collect keys in linked order. + /// </summary> + public ICollection Keys + { + get + { + IList result = new ArrayList(); + foreach (DictionaryEntry entry in this) + { + result.Add(entry.Key); + } + return result; + } + } + + /// <summary> + /// Collect values in linked order. + /// </summary> + public ICollection Values + { + get + { + IList result = new ArrayList(); + foreach (DictionaryEntry entry in this) + { + result.Add(entry.Value); + } + return result; + } + } + + public bool IsReadOnly + { + get { return _indexedValues.IsReadOnly; } + } + + public bool IsFixedSize + { + get { return _indexedValues.IsFixedSize; } + } + + public bool Contains(object key) + { + return _indexedValues.Contains(key); + } + + public void Add(object key, object value) + { + if (key == null) throw new ArgumentNullException("key"); + + if (Contains(key)) + { + throw new ArgumentException("LinkedHashtable already contains key. key=" + key); + } + + LinkedDictionaryEntry de = new LinkedDictionaryEntry(key, value); + if (_head == null) + { + _head = de; + _tail = de; + } + else + { + _tail._next = de; + de._previous = _tail; + _tail = de; + } + _indexedValues[key] = de; + } + + public void Clear() + { + _indexedValues.Clear(); + } + + IDictionaryEnumerator IDictionary.GetEnumerator() + { + return new LHTEnumerator(this); + } + + public void Remove(object key) + { + if (key == null) throw new ArgumentNullException("key"); + + LinkedDictionaryEntry de = (LinkedDictionaryEntry)_indexedValues[key]; + if (de == null) return; // key not found. + LinkedDictionaryEntry prev = de._previous; + if (prev == null) + { + _head = de._next; + } + else + { + prev._next = de._next; + } + + LinkedDictionaryEntry next = de._next; + if (next == null) + { + _tail = de; + } + else + { + next._previous = de._previous; + } + } + + private LinkedDictionaryEntry Head + { + get + { + return _head; + } + } + +// private LinkedDictionaryEntry Tail +// { +// get +// { +// return _tail; +// } +// } + + private class LHTEnumerator : IDictionaryEnumerator + { + private LinkedHashtable _container; + + private LinkedDictionaryEntry _current; + + /// <summary> + /// Set once we have navigated off the end of the collection + /// </summary> + private bool _needsReset = false; + + public LHTEnumerator(LinkedHashtable container) + { + _container = container; + } + + public object Current + { + get + { + if (_current == null) + { + throw new Exception("Iterator before first element"); + } + else + { + return _current._entry; + } + } + } + + public object Key + { + get { return _current._entry.Key; } + } + + public object Value + { + get { return _current._entry.Value; } + } + + public DictionaryEntry Entry + { + get + { + return _current._entry; + } + } + + public bool MoveNext() + { + if (_needsReset) + { + return false; + } + else if (_current == null) + { + _current = _container.Head; + } + else + { + _current = _current._next; + } + _needsReset = (_current == null); + return !_needsReset; + } + + public void Reset() + { + _current = null; + _needsReset = false; + } + } + + public void MoveToHead(object key) + { + LinkedDictionaryEntry de = (LinkedDictionaryEntry)_indexedValues[key]; + if (de == null) + { + throw new ArgumentException("Key " + key + " not found"); + } + // if the head is the element then there is nothing to do + if (_head == de) + { + return; + } + de._previous._next = de._next; + if (de._next != null) + { + de._next._previous = de._previous; + } + else + { + _tail = de._previous; + } + de._next = _head; + _head = de; + de._previous = null; + } + + public void CopyTo(Array array, int index) + { + _indexedValues.CopyTo(array, index); + } + + public int Count + { + get { return _indexedValues.Count; } + } + + public object SyncRoot + { + get { return _indexedValues.SyncRoot; } + } + + public bool IsSynchronized + { + get { return _indexedValues.IsSynchronized; } + } + + public IEnumerator GetEnumerator() + { + return new LHTEnumerator(this); + } + } +} diff --git a/qpid/dotnet/Qpid.Common/Collections/SynchronousQueue.cs b/qpid/dotnet/Qpid.Common/Collections/SynchronousQueue.cs new file mode 100644 index 0000000000..3c12df6067 --- /dev/null +++ b/qpid/dotnet/Qpid.Common/Collections/SynchronousQueue.cs @@ -0,0 +1,375 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Threading; + +namespace Apache.Qpid.Collections +{ + public class SynchronousQueue : BlockingQueue + { + /// <summary> + /// Lock protecting both wait queues + /// </summary> +// private readonly object _qlock = new object(); + + /// <summary> + /// Queue holding waiting puts + /// </summary> +// private readonly WaitQueue _waitingProducers; + + /// <summary> + /// Queue holding waiting takes + /// </summary> +// private readonly WaitQueue _waitingConsumers; + + /** + * Queue to hold waiting puts/takes; specialized to Fifo/Lifo below. + * These queues have all transient fields, but are serializable + * in order to recover fairness settings when deserialized. + */ + internal abstract class WaitQueue + { + /** Creates, adds, and returns node for x. */ + internal abstract Node Enq(Object x); + /** Removes and returns node, or null if empty. */ + internal abstract Node Deq(); + /** Removes a cancelled node to avoid garbage retention. */ + internal abstract void Unlink(Node node); + /** Returns true if a cancelled node might be on queue. */ + internal abstract bool ShouldUnlink(Node node); + } + + /** + * FIFO queue to hold waiting puts/takes. + */ + sealed class FifoWaitQueue : WaitQueue + { + private Node head; + private Node last; + + internal override Node Enq(Object x) + { + Node p = new Node(x); + if (last == null) + { + last = head = p; + } + else + { + last = last.next = p; + } + return p; + } + + internal override Node Deq() + { + Node p = head; + if (p != null) + { + if ((head = p.next) == null) + { + last = null; + } + p.next = null; + } + return p; + } + + internal override bool ShouldUnlink(Node node) + { + return (node == last || node.next != null); + } + + internal override void Unlink(Node node) + { + Node p = head; + Node trail = null; + while (p != null) + { + if (p == node) + { + Node next = p.next; + if (trail == null) + { + head = next; + } + else + { + trail.next = next; + } + if (last == node) + { + last = trail; + } + break; + } + trail = p; + p = p.next; + } + } + } + + /** + * LIFO queue to hold waiting puts/takes. + */ + sealed class LifoWaitQueue : WaitQueue + { + private Node head; + + internal override Node Enq(Object x) + { + return head = new Node(x, head); + } + + internal override Node Deq() + { + Node p = head; + if (p != null) + { + head = p.next; + p.next = null; + } + return p; + } + + internal override bool ShouldUnlink(Node node) + { + // Return false if already dequeued or is bottom node (in which + // case we might retain at most one garbage node) + return (node == head || node.next != null); + } + + internal override void Unlink(Node node) + { + Node p = head; + Node trail = null; + while (p != null) + { + if (p == node) + { + Node next = p.next; + if (trail == null) + head = next; + else + trail.next = next; + break; + } + trail = p; + p = p.next; + } + } + } + + /** + * Nodes each maintain an item and handle waits and signals for + * getting and setting it. The class extends + * AbstractQueuedSynchronizer to manage blocking, using AQS state + * 0 for waiting, 1 for ack, -1 for cancelled. + */ + sealed internal class Node + { + + /** Synchronization state value representing that node acked */ + private const int ACK = 1; + /** Synchronization state value representing that node cancelled */ + private const int CANCEL = -1; + + internal int state = 0; + + /** The item being transferred */ + internal Object item; + /** Next node in wait queue */ + internal Node next; + + /** Creates a node with initial item */ + internal Node(Object x) + { + item = x; + } + + /** Creates a node with initial item and next */ + internal Node(Object x, Node n) + { + item = x; + next = n; + } + + /** + * Takes item and nulls out field (for sake of GC) + * + * PRE: lock owned + */ + private Object Extract() + { + Object x = item; + item = null; + return x; + } + + /** + * Tries to cancel on interrupt; if so rethrowing, + * else setting interrupt state + * + * PRE: lock owned + */ + /*private void checkCancellationOnInterrupt(InterruptedException ie) + throws InterruptedException + { + if (state == 0) { + state = CANCEL; + notify(); + throw ie; + } + Thread.currentThread().interrupt(); + }*/ + + /** + * Fills in the slot created by the consumer and signal consumer to + * continue. + */ + internal bool SetItem(Object x) + { + lock (this) + { + if (state != 0) return false; + item = x; + state = ACK; + Monitor.Pulse(this); + return true; + } + } + + /** + * Removes item from slot created by producer and signal producer + * to continue. + */ + internal Object GetItem() + { + if (state != 0) return null; + state = ACK; + Monitor.Pulse(this); + return Extract(); + } + + /** + * Waits for a consumer to take item placed by producer. + */ + internal void WaitForTake() //throws InterruptedException { + { + while (state == 0) + { + Monitor.Wait(this); + } + } + + /** + * Waits for a producer to put item placed by consumer. + */ + internal object WaitForPut() + { + lock (this) + { + while (state == 0) Monitor.Wait(this); + } + return Extract(); + } + + private bool Attempt(long nanos) + { + if (state != 0) return true; + if (nanos <= 0) { + state = CANCEL; + Monitor.Pulse(this); + return false; + } + + while (true) + { + Monitor.Wait(nanos); + //TimeUnit.NANOSECONDS.timedWait(this, nanos); + if (state != 0) + { + return true; + } + //nanos = deadline - Utils.nanoTime(); + //if (nanos <= 0) + else + { + state = CANCEL; + Monitor.Pulse(this); + return false; + } + } + } + + /** + * Waits for a consumer to take item placed by producer or time out. + */ + internal bool WaitForTake(long nanos) + { + return Attempt(nanos); + } + + /** + * Waits for a producer to put item placed by consumer, or time out. + */ + internal object WaitForPut(long nanos) + { + if (!Attempt(nanos)) + { + return null; + } + else + { + return Extract(); + } + } + } + + public SynchronousQueue(bool strict) + { + // TODO !!!! + } + + public override bool EnqueueNoThrow(object e) + { + throw new NotImplementedException(); + } + + public override void EnqueueBlocking(object e) + { + throw new NotImplementedException(); + } + + public override object DequeueBlocking() + { + throw new NotImplementedException(); + } + + public override int RemainingCapacity + { + get + { + throw new NotImplementedException(); + } + } + } +} |