summaryrefslogtreecommitdiff
path: root/qpid/dotnet/Qpid.Common/Collections
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/dotnet/Qpid.Common/Collections')
-rw-r--r--qpid/dotnet/Qpid.Common/Collections/BlockingQueue.cs95
-rw-r--r--qpid/dotnet/Qpid.Common/Collections/ConsumerProducerQueue.cs113
-rw-r--r--qpid/dotnet/Qpid.Common/Collections/LinkedBlockingQueue.cs384
-rw-r--r--qpid/dotnet/Qpid.Common/Collections/LinkedHashtable.cs327
-rw-r--r--qpid/dotnet/Qpid.Common/Collections/SynchronousQueue.cs375
5 files changed, 1294 insertions, 0 deletions
diff --git a/qpid/dotnet/Qpid.Common/Collections/BlockingQueue.cs b/qpid/dotnet/Qpid.Common/Collections/BlockingQueue.cs
new file mode 100644
index 0000000000..dcfacf8474
--- /dev/null
+++ b/qpid/dotnet/Qpid.Common/Collections/BlockingQueue.cs
@@ -0,0 +1,95 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+
+namespace Apache.Qpid.Collections
+{
+ public abstract class BlockingQueue : Queue
+ {
+ /**
+ * Inserts the specified element into this queue if it is possible to do
+ * so immediately without violating capacity restrictions, returning
+ * <tt>true</tt> upon success and <tt>false</tt> if no space is currently
+ * available. When using a capacity-restricted queue, this method is
+ * generally preferable to {@link #add}, which can fail to insert an
+ * element only by throwing an exception.
+ *
+ * @param e the element to add
+ * @return <tt>true</tt> if the element was added to this queue, else
+ * <tt>false</tt>
+ * @throws ClassCastException if the class of the specified element
+ * prevents it from being added to this queue
+ * @throws NullPointerException if the specified element is null
+ * @throws IllegalArgumentException if some property of the specified
+ * element prevents it from being added to this queue
+ */
+ public abstract bool EnqueueNoThrow(Object e);
+
+ /**
+ * Inserts the specified element into this queue, waiting if necessary
+ * for space to become available.
+ *
+ * @param e the element to add
+ * @throws InterruptedException if interrupted while waiting
+ * @throws ClassCastException if the class of the specified element
+ * prevents it from being added to this queue
+ * @throws NullPointerException if the specified element is null
+ * @throws IllegalArgumentException if some property of the specified
+ * element prevents it from being added to this queue
+ */
+ public abstract void EnqueueBlocking(object e);
+
+ /**
+ * Retrieves and removes the head of this queue, waiting up to the
+ * specified wait time if necessary for an element to become available.
+ *
+ * @param timeout how long to wait before giving up, in units of
+ * <tt>unit</tt>
+ * @param unit a <tt>TimeUnit</tt> determining how to interpret the
+ * <tt>timeout</tt> parameter
+ * @return the head of this queue, or <tt>null</tt> if the
+ * specified waiting time elapses before an element is available
+ * @throws InterruptedException if interrupted while waiting
+ */
+ public abstract object DequeueBlocking();
+
+ /**
+ * Returns the number of additional elements that this queue can ideally
+ * (in the absence of memory or resource constraints) accept without
+ * blocking, or <tt>Integer.MAX_VALUE</tt> if there is no intrinsic
+ * limit.
+ *
+ * <p>Note that you <em>cannot</em> always tell if an attempt to insert
+ * an element will succeed by inspecting <tt>remainingCapacity</tt>
+ * because it may be the case that another thread is about to
+ * insert or remove an element.
+ *
+ * @return the remaining capacity
+ */
+ public abstract int RemainingCapacity
+ {
+ get;
+ }
+ }
+}
+
+
diff --git a/qpid/dotnet/Qpid.Common/Collections/ConsumerProducerQueue.cs b/qpid/dotnet/Qpid.Common/Collections/ConsumerProducerQueue.cs
new file mode 100644
index 0000000000..131f316da6
--- /dev/null
+++ b/qpid/dotnet/Qpid.Common/Collections/ConsumerProducerQueue.cs
@@ -0,0 +1,113 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using System.Threading;
+
+
+namespace Apache.Qpid.Collections
+{
+ /// <summary>
+ /// Simple FIFO queue to support multi-threaded consumer
+ /// and producers. It supports timeouts in dequeue operations.
+ /// </summary>
+ public sealed class ConsumerProducerQueue
+ {
+ private Queue _queue = new Queue();
+ private WaitSemaphore _semaphore = new WaitSemaphore();
+
+ /// <summary>
+ /// Put an item into the tail of the queue
+ /// </summary>
+ /// <param name="item"></param>
+ public void Enqueue(object item)
+ {
+ lock ( _queue.SyncRoot )
+ {
+ _queue.Enqueue(item);
+ _semaphore.Increment();
+ }
+ }
+
+ /// <summary>
+ /// Wait indefinitely for an item to be available
+ /// on the queue.
+ /// </summary>
+ /// <returns>The object at the head of the queue</returns>
+ public object Dequeue()
+ {
+ return Dequeue(Timeout.Infinite);
+ }
+
+ /// <summary>
+ /// Wait up to the number of milliseconds specified
+ /// for an item to be available on the queue
+ /// </summary>
+ /// <param name="timeout">Number of milliseconds to wait</param>
+ /// <returns>The object at the head of the queue, or null
+ /// if the timeout expires</returns>
+ public object Dequeue(long timeout)
+ {
+ if ( _semaphore.Decrement(timeout) )
+ {
+ lock ( _queue.SyncRoot )
+ {
+ return _queue.Dequeue();
+ }
+ }
+ return null;
+ }
+
+ #region Simple Semaphore
+ //
+ // Simple Semaphore
+ //
+
+ class WaitSemaphore
+ {
+ private int _count;
+ private AutoResetEvent _event = new AutoResetEvent(false);
+
+ public void Increment()
+ {
+ Interlocked.Increment(ref _count);
+ _event.Set();
+ }
+
+ public bool Decrement(long timeout)
+ {
+ if ( timeout > int.MaxValue )
+ throw new ArgumentOutOfRangeException("timeout", timeout, "Must be <= Int32.MaxValue");
+
+ int millis = (int) (timeout & 0x7FFFFFFF);
+ if ( Interlocked.Decrement(ref _count) > 0 )
+ {
+ // there are messages in queue, so no need to wait
+ return true;
+ } else
+ {
+ return _event.WaitOne(millis, false);
+ }
+ }
+ }
+ #endregion // Simple Semaphore
+ }
+}
diff --git a/qpid/dotnet/Qpid.Common/Collections/LinkedBlockingQueue.cs b/qpid/dotnet/Qpid.Common/Collections/LinkedBlockingQueue.cs
new file mode 100644
index 0000000000..be92576951
--- /dev/null
+++ b/qpid/dotnet/Qpid.Common/Collections/LinkedBlockingQueue.cs
@@ -0,0 +1,384 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Threading;
+
+namespace Apache.Qpid.Collections
+{
+ public class LinkedBlockingQueue : BlockingQueue
+ {
+
+ /*
+ * A variant of the "two lock queue" algorithm. The putLock gates
+ * entry to put (and offer), and has an associated condition for
+ * waiting puts. Similarly for the takeLock. The "count" field
+ * that they both rely on is maintained as an atomic to avoid
+ * needing to get both locks in most cases. Also, to minimize need
+ * for puts to get takeLock and vice-versa, cascading notifies are
+ * used. When a put notices that it has enabled at least one take,
+ * it signals taker. That taker in turn signals others if more
+ * items have been entered since the signal. And symmetrically for
+ * takes signalling puts. Operations such as remove(Object) and
+ * iterators acquire both locks.
+ */
+
+ /**
+ * Linked list node class
+ */
+ internal class Node
+ {
+ /** The item, volatile to ensure barrier separating write and read */
+ internal volatile Object item;
+ internal Node next;
+ internal Node(Object x) { item = x; }
+ }
+
+ /** The capacity bound, or Integer.MAX_VALUE if none */
+ private readonly int capacity;
+
+ /** Current number of elements */
+ private volatile int count = 0;
+
+ /** Head of linked list */
+ private Node head;
+
+ /** Tail of linked list */
+ private Node last;
+
+ /** Lock held by take, poll, etc */
+ private readonly object takeLock = new Object(); //new SerializableLock();
+
+ /** Lock held by put, offer, etc */
+ private readonly object putLock = new Object();//new SerializableLock();
+
+ /**
+ * Signals a waiting take. Called only from put/offer (which do not
+ * otherwise ordinarily lock takeLock.)
+ */
+ private void SignalNotEmpty()
+ {
+ lock (takeLock)
+ {
+ Monitor.Pulse(takeLock);
+ }
+ }
+
+ /**
+ * Signals a waiting put. Called only from take/poll.
+ */
+ private void SignalNotFull()
+ {
+ lock (putLock)
+ {
+ Monitor.Pulse(putLock);
+ }
+ }
+
+ /**
+ * Creates a node and links it at end of queue.
+ * @param x the item
+ */
+ private void Insert(Object x)
+ {
+ last = last.next = new Node(x);
+ }
+
+ /**
+ * Removes a node from head of queue,
+ * @return the node
+ */
+ private Object Extract()
+ {
+ Node first = head.next;
+ head = first;
+ Object x = first.item;
+ first.item = null;
+ return x;
+ }
+
+
+ /**
+ * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
+ * {@link Integer#MAX_VALUE}.
+ */
+ public LinkedBlockingQueue() : this(Int32.MaxValue)
+ {
+ }
+
+ /**
+ * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity.
+ *
+ * @param capacity the capacity of this queue
+ * @throws IllegalArgumentException if <tt>capacity</tt> is not greater
+ * than zero
+ */
+ public LinkedBlockingQueue(int capacity)
+ {
+ if (capacity <= 0) throw new ArgumentException("Capacity must be positive, was passed " + capacity);
+ this.capacity = capacity;
+ last = head = new Node(null);
+ }
+
+ // this doc comment is overridden to remove the reference to collections
+ // greater in size than Integer.MAX_VALUE
+ /**
+ * Returns the number of elements in this queue.
+ *
+ * @return the number of elements in this queue
+ */
+ public int Size
+ {
+ get
+ {
+ return count;
+ }
+ }
+
+ // this doc comment is a modified copy of the inherited doc comment,
+ // without the reference to unlimited queues.
+ /**
+ * Returns the number of additional elements that this queue can ideally
+ * (in the absence of memory or resource constraints) accept without
+ * blocking. This is always equal to the initial capacity of this queue
+ * less the current <tt>size</tt> of this queue.
+ *
+ * <p>Note that you <em>cannot</em> always tell if an attempt to insert
+ * an element will succeed by inspecting <tt>remainingCapacity</tt>
+ * because it may be the case that another thread is about to
+ * insert or remove an element.
+ */
+ public override int RemainingCapacity
+ {
+ get
+ {
+ return capacity - count;
+ }
+ }
+
+ /**
+ * Inserts the specified element at the tail of this queue, waiting if
+ * necessary for space to become available.
+ *
+ * @throws InterruptedException {@inheritDoc}
+ * @throws NullPointerException {@inheritDoc}
+ */
+ public override void EnqueueBlocking(Object e)
+ {
+ if (e == null) throw new ArgumentNullException("Object must not be null");
+ // Note: convention in all put/take/etc is to preset
+ // local var holding count negative to indicate failure unless set.
+ int c = -1;
+ lock (putLock)
+ {
+ /*
+ * Note that count is used in wait guard even though it is
+ * not protected by lock. This works because count can
+ * only decrease at this point (all other puts are shut
+ * out by lock), and we (or some other waiting put) are
+ * signalled if it ever changes from
+ * capacity. Similarly for all other uses of count in
+ * other wait guards.
+ */
+ while (count == capacity)
+ {
+ Monitor.Wait(putLock);
+ }
+
+ Insert(e);
+ lock(this)
+ {
+ c = count++;
+ }
+ if (c + 1 < capacity)
+ {
+ Monitor.Pulse(putLock);
+ }
+ }
+
+ if (c == 0)
+ {
+ SignalNotEmpty();
+ }
+ }
+
+ /**
+ * Inserts the specified element at the tail of this queue if it is
+ * possible to do so immediately without exceeding the queue's capacity,
+ * returning <tt>true</tt> upon success and <tt>false</tt> if this queue
+ * is full.
+ * When using a capacity-restricted queue, this method is generally
+ * preferable to method {@link BlockingQueue#add add}, which can fail to
+ * insert an element only by throwing an exception.
+ *
+ * @throws NullPointerException if the specified element is null
+ */
+ public override bool EnqueueNoThrow(Object e)
+ {
+ if (e == null) throw new ArgumentNullException("e must not be null");
+ if (count == capacity)
+ {
+ return false;
+ }
+ int c = -1;
+ lock (putLock)
+ {
+ if (count < capacity)
+ {
+ Insert(e);
+ lock (this)
+ {
+ c = count++;
+ }
+ if (c + 1 < capacity)
+ {
+ Monitor.Pulse(putLock);
+ }
+ }
+ }
+ if (c == 0)
+ {
+ SignalNotEmpty();
+ }
+ return c >= 0;
+ }
+
+ /**
+ * Retrieves and removes the head of this queue, waiting if necessary
+ * until an element becomes available.
+ *
+ * @return the head of this queue
+ * @throws InterruptedException if interrupted while waiting
+ */
+ public override Object DequeueBlocking()
+ {
+ Object x;
+ int c = -1;
+ lock (takeLock)
+ {
+
+ while (count == 0)
+ {
+ Monitor.Wait(takeLock);
+ }
+
+
+ x = Extract();
+ lock (this) { c = count--; }
+ if (c > 1)
+ {
+ Monitor.Pulse(takeLock);
+ }
+ }
+ if (c == capacity)
+ {
+ SignalNotFull();
+ }
+ return x;
+ }
+
+ public Object Poll()
+ {
+ if (count == 0)
+ {
+ return null;
+ }
+ Object x = null;
+ int c = -1;
+ lock (takeLock)
+ {
+ if (count > 0)
+ {
+ x = Extract();
+ lock (this) { c = count--; }
+ if (c > 1)
+ {
+ Monitor.Pulse(takeLock);
+ }
+ }
+ }
+ if (c == capacity)
+ {
+ SignalNotFull();
+ }
+ return x;
+ }
+
+
+ public override Object Peek()
+ {
+ if (count == 0)
+ {
+ return null;
+ }
+ lock (takeLock)
+ {
+ Node first = head.next;
+ if (first == null)
+ {
+ return null;
+ }
+ else
+ {
+ return first.item;
+ }
+ }
+ }
+
+ public override String ToString()
+ {
+ lock (putLock)
+ {
+ lock (takeLock)
+ {
+ return base.ToString();
+ }
+ }
+ }
+
+ /**
+ * Atomically removes all of the elements from this queue.
+ * The queue will be empty after this call returns.
+ */
+ public override void Clear()
+ {
+ lock (putLock)
+ {
+ lock (takeLock)
+ {
+ head.next = null;
+ last = head;
+ int c;
+ lock (this)
+ {
+ c = count;
+ count = 0;
+ }
+ if (c == capacity)
+ {
+ Monitor.PulseAll(putLock);
+ }
+ }
+ }
+ }
+ }
+}
+
+
diff --git a/qpid/dotnet/Qpid.Common/Collections/LinkedHashtable.cs b/qpid/dotnet/Qpid.Common/Collections/LinkedHashtable.cs
new file mode 100644
index 0000000000..10ab5c674d
--- /dev/null
+++ b/qpid/dotnet/Qpid.Common/Collections/LinkedHashtable.cs
@@ -0,0 +1,327 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+
+namespace Apache.Qpid.Collections
+{
+ public class LinkedHashtable : IDictionary
+ {
+ /// <summary>
+ /// Maps from key to LinkedDictionaryEntry
+ /// </summary>
+ private Hashtable _indexedValues = new Hashtable();
+
+ private LinkedDictionaryEntry _head;
+
+ private LinkedDictionaryEntry _tail;
+
+ private class LinkedDictionaryEntry
+ {
+ public LinkedDictionaryEntry _previous;
+ public LinkedDictionaryEntry _next;
+ internal DictionaryEntry _entry;
+
+ public LinkedDictionaryEntry(object key, object value)
+ {
+ _entry = new DictionaryEntry(key, value);
+ }
+ }
+
+ public object this[object key]
+ {
+ get
+ {
+ LinkedDictionaryEntry entry = (LinkedDictionaryEntry)_indexedValues[key];
+ if (entry == null)
+ {
+ return null; // key not found
+ }
+ else
+ {
+ return entry._entry.Value;
+ }
+ }
+
+ set
+ {
+ LinkedDictionaryEntry entry = (LinkedDictionaryEntry)_indexedValues[key];
+ if (entry == null)
+ {
+ Add(key, value);
+ }
+ else
+ {
+ entry._entry.Value = value;
+ }
+ }
+ }
+
+ /// <summary>
+ /// Collect keys in linked order.
+ /// </summary>
+ public ICollection Keys
+ {
+ get
+ {
+ IList result = new ArrayList();
+ foreach (DictionaryEntry entry in this)
+ {
+ result.Add(entry.Key);
+ }
+ return result;
+ }
+ }
+
+ /// <summary>
+ /// Collect values in linked order.
+ /// </summary>
+ public ICollection Values
+ {
+ get
+ {
+ IList result = new ArrayList();
+ foreach (DictionaryEntry entry in this)
+ {
+ result.Add(entry.Value);
+ }
+ return result;
+ }
+ }
+
+ public bool IsReadOnly
+ {
+ get { return _indexedValues.IsReadOnly; }
+ }
+
+ public bool IsFixedSize
+ {
+ get { return _indexedValues.IsFixedSize; }
+ }
+
+ public bool Contains(object key)
+ {
+ return _indexedValues.Contains(key);
+ }
+
+ public void Add(object key, object value)
+ {
+ if (key == null) throw new ArgumentNullException("key");
+
+ if (Contains(key))
+ {
+ throw new ArgumentException("LinkedHashtable already contains key. key=" + key);
+ }
+
+ LinkedDictionaryEntry de = new LinkedDictionaryEntry(key, value);
+ if (_head == null)
+ {
+ _head = de;
+ _tail = de;
+ }
+ else
+ {
+ _tail._next = de;
+ de._previous = _tail;
+ _tail = de;
+ }
+ _indexedValues[key] = de;
+ }
+
+ public void Clear()
+ {
+ _indexedValues.Clear();
+ }
+
+ IDictionaryEnumerator IDictionary.GetEnumerator()
+ {
+ return new LHTEnumerator(this);
+ }
+
+ public void Remove(object key)
+ {
+ if (key == null) throw new ArgumentNullException("key");
+
+ LinkedDictionaryEntry de = (LinkedDictionaryEntry)_indexedValues[key];
+ if (de == null) return; // key not found.
+ LinkedDictionaryEntry prev = de._previous;
+ if (prev == null)
+ {
+ _head = de._next;
+ }
+ else
+ {
+ prev._next = de._next;
+ }
+
+ LinkedDictionaryEntry next = de._next;
+ if (next == null)
+ {
+ _tail = de;
+ }
+ else
+ {
+ next._previous = de._previous;
+ }
+ }
+
+ private LinkedDictionaryEntry Head
+ {
+ get
+ {
+ return _head;
+ }
+ }
+
+// private LinkedDictionaryEntry Tail
+// {
+// get
+// {
+// return _tail;
+// }
+// }
+
+ private class LHTEnumerator : IDictionaryEnumerator
+ {
+ private LinkedHashtable _container;
+
+ private LinkedDictionaryEntry _current;
+
+ /// <summary>
+ /// Set once we have navigated off the end of the collection
+ /// </summary>
+ private bool _needsReset = false;
+
+ public LHTEnumerator(LinkedHashtable container)
+ {
+ _container = container;
+ }
+
+ public object Current
+ {
+ get
+ {
+ if (_current == null)
+ {
+ throw new Exception("Iterator before first element");
+ }
+ else
+ {
+ return _current._entry;
+ }
+ }
+ }
+
+ public object Key
+ {
+ get { return _current._entry.Key; }
+ }
+
+ public object Value
+ {
+ get { return _current._entry.Value; }
+ }
+
+ public DictionaryEntry Entry
+ {
+ get
+ {
+ return _current._entry;
+ }
+ }
+
+ public bool MoveNext()
+ {
+ if (_needsReset)
+ {
+ return false;
+ }
+ else if (_current == null)
+ {
+ _current = _container.Head;
+ }
+ else
+ {
+ _current = _current._next;
+ }
+ _needsReset = (_current == null);
+ return !_needsReset;
+ }
+
+ public void Reset()
+ {
+ _current = null;
+ _needsReset = false;
+ }
+ }
+
+ public void MoveToHead(object key)
+ {
+ LinkedDictionaryEntry de = (LinkedDictionaryEntry)_indexedValues[key];
+ if (de == null)
+ {
+ throw new ArgumentException("Key " + key + " not found");
+ }
+ // if the head is the element then there is nothing to do
+ if (_head == de)
+ {
+ return;
+ }
+ de._previous._next = de._next;
+ if (de._next != null)
+ {
+ de._next._previous = de._previous;
+ }
+ else
+ {
+ _tail = de._previous;
+ }
+ de._next = _head;
+ _head = de;
+ de._previous = null;
+ }
+
+ public void CopyTo(Array array, int index)
+ {
+ _indexedValues.CopyTo(array, index);
+ }
+
+ public int Count
+ {
+ get { return _indexedValues.Count; }
+ }
+
+ public object SyncRoot
+ {
+ get { return _indexedValues.SyncRoot; }
+ }
+
+ public bool IsSynchronized
+ {
+ get { return _indexedValues.IsSynchronized; }
+ }
+
+ public IEnumerator GetEnumerator()
+ {
+ return new LHTEnumerator(this);
+ }
+ }
+}
diff --git a/qpid/dotnet/Qpid.Common/Collections/SynchronousQueue.cs b/qpid/dotnet/Qpid.Common/Collections/SynchronousQueue.cs
new file mode 100644
index 0000000000..3c12df6067
--- /dev/null
+++ b/qpid/dotnet/Qpid.Common/Collections/SynchronousQueue.cs
@@ -0,0 +1,375 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Threading;
+
+namespace Apache.Qpid.Collections
+{
+ public class SynchronousQueue : BlockingQueue
+ {
+ /// <summary>
+ /// Lock protecting both wait queues
+ /// </summary>
+// private readonly object _qlock = new object();
+
+ /// <summary>
+ /// Queue holding waiting puts
+ /// </summary>
+// private readonly WaitQueue _waitingProducers;
+
+ /// <summary>
+ /// Queue holding waiting takes
+ /// </summary>
+// private readonly WaitQueue _waitingConsumers;
+
+ /**
+ * Queue to hold waiting puts/takes; specialized to Fifo/Lifo below.
+ * These queues have all transient fields, but are serializable
+ * in order to recover fairness settings when deserialized.
+ */
+ internal abstract class WaitQueue
+ {
+ /** Creates, adds, and returns node for x. */
+ internal abstract Node Enq(Object x);
+ /** Removes and returns node, or null if empty. */
+ internal abstract Node Deq();
+ /** Removes a cancelled node to avoid garbage retention. */
+ internal abstract void Unlink(Node node);
+ /** Returns true if a cancelled node might be on queue. */
+ internal abstract bool ShouldUnlink(Node node);
+ }
+
+ /**
+ * FIFO queue to hold waiting puts/takes.
+ */
+ sealed class FifoWaitQueue : WaitQueue
+ {
+ private Node head;
+ private Node last;
+
+ internal override Node Enq(Object x)
+ {
+ Node p = new Node(x);
+ if (last == null)
+ {
+ last = head = p;
+ }
+ else
+ {
+ last = last.next = p;
+ }
+ return p;
+ }
+
+ internal override Node Deq()
+ {
+ Node p = head;
+ if (p != null)
+ {
+ if ((head = p.next) == null)
+ {
+ last = null;
+ }
+ p.next = null;
+ }
+ return p;
+ }
+
+ internal override bool ShouldUnlink(Node node)
+ {
+ return (node == last || node.next != null);
+ }
+
+ internal override void Unlink(Node node)
+ {
+ Node p = head;
+ Node trail = null;
+ while (p != null)
+ {
+ if (p == node)
+ {
+ Node next = p.next;
+ if (trail == null)
+ {
+ head = next;
+ }
+ else
+ {
+ trail.next = next;
+ }
+ if (last == node)
+ {
+ last = trail;
+ }
+ break;
+ }
+ trail = p;
+ p = p.next;
+ }
+ }
+ }
+
+ /**
+ * LIFO queue to hold waiting puts/takes.
+ */
+ sealed class LifoWaitQueue : WaitQueue
+ {
+ private Node head;
+
+ internal override Node Enq(Object x)
+ {
+ return head = new Node(x, head);
+ }
+
+ internal override Node Deq()
+ {
+ Node p = head;
+ if (p != null)
+ {
+ head = p.next;
+ p.next = null;
+ }
+ return p;
+ }
+
+ internal override bool ShouldUnlink(Node node)
+ {
+ // Return false if already dequeued or is bottom node (in which
+ // case we might retain at most one garbage node)
+ return (node == head || node.next != null);
+ }
+
+ internal override void Unlink(Node node)
+ {
+ Node p = head;
+ Node trail = null;
+ while (p != null)
+ {
+ if (p == node)
+ {
+ Node next = p.next;
+ if (trail == null)
+ head = next;
+ else
+ trail.next = next;
+ break;
+ }
+ trail = p;
+ p = p.next;
+ }
+ }
+ }
+
+ /**
+ * Nodes each maintain an item and handle waits and signals for
+ * getting and setting it. The class extends
+ * AbstractQueuedSynchronizer to manage blocking, using AQS state
+ * 0 for waiting, 1 for ack, -1 for cancelled.
+ */
+ sealed internal class Node
+ {
+
+ /** Synchronization state value representing that node acked */
+ private const int ACK = 1;
+ /** Synchronization state value representing that node cancelled */
+ private const int CANCEL = -1;
+
+ internal int state = 0;
+
+ /** The item being transferred */
+ internal Object item;
+ /** Next node in wait queue */
+ internal Node next;
+
+ /** Creates a node with initial item */
+ internal Node(Object x)
+ {
+ item = x;
+ }
+
+ /** Creates a node with initial item and next */
+ internal Node(Object x, Node n)
+ {
+ item = x;
+ next = n;
+ }
+
+ /**
+ * Takes item and nulls out field (for sake of GC)
+ *
+ * PRE: lock owned
+ */
+ private Object Extract()
+ {
+ Object x = item;
+ item = null;
+ return x;
+ }
+
+ /**
+ * Tries to cancel on interrupt; if so rethrowing,
+ * else setting interrupt state
+ *
+ * PRE: lock owned
+ */
+ /*private void checkCancellationOnInterrupt(InterruptedException ie)
+ throws InterruptedException
+ {
+ if (state == 0) {
+ state = CANCEL;
+ notify();
+ throw ie;
+ }
+ Thread.currentThread().interrupt();
+ }*/
+
+ /**
+ * Fills in the slot created by the consumer and signal consumer to
+ * continue.
+ */
+ internal bool SetItem(Object x)
+ {
+ lock (this)
+ {
+ if (state != 0) return false;
+ item = x;
+ state = ACK;
+ Monitor.Pulse(this);
+ return true;
+ }
+ }
+
+ /**
+ * Removes item from slot created by producer and signal producer
+ * to continue.
+ */
+ internal Object GetItem()
+ {
+ if (state != 0) return null;
+ state = ACK;
+ Monitor.Pulse(this);
+ return Extract();
+ }
+
+ /**
+ * Waits for a consumer to take item placed by producer.
+ */
+ internal void WaitForTake() //throws InterruptedException {
+ {
+ while (state == 0)
+ {
+ Monitor.Wait(this);
+ }
+ }
+
+ /**
+ * Waits for a producer to put item placed by consumer.
+ */
+ internal object WaitForPut()
+ {
+ lock (this)
+ {
+ while (state == 0) Monitor.Wait(this);
+ }
+ return Extract();
+ }
+
+ private bool Attempt(long nanos)
+ {
+ if (state != 0) return true;
+ if (nanos <= 0) {
+ state = CANCEL;
+ Monitor.Pulse(this);
+ return false;
+ }
+
+ while (true)
+ {
+ Monitor.Wait(nanos);
+ //TimeUnit.NANOSECONDS.timedWait(this, nanos);
+ if (state != 0)
+ {
+ return true;
+ }
+ //nanos = deadline - Utils.nanoTime();
+ //if (nanos <= 0)
+ else
+ {
+ state = CANCEL;
+ Monitor.Pulse(this);
+ return false;
+ }
+ }
+ }
+
+ /**
+ * Waits for a consumer to take item placed by producer or time out.
+ */
+ internal bool WaitForTake(long nanos)
+ {
+ return Attempt(nanos);
+ }
+
+ /**
+ * Waits for a producer to put item placed by consumer, or time out.
+ */
+ internal object WaitForPut(long nanos)
+ {
+ if (!Attempt(nanos))
+ {
+ return null;
+ }
+ else
+ {
+ return Extract();
+ }
+ }
+ }
+
+ public SynchronousQueue(bool strict)
+ {
+ // TODO !!!!
+ }
+
+ public override bool EnqueueNoThrow(object e)
+ {
+ throw new NotImplementedException();
+ }
+
+ public override void EnqueueBlocking(object e)
+ {
+ throw new NotImplementedException();
+ }
+
+ public override object DequeueBlocking()
+ {
+ throw new NotImplementedException();
+ }
+
+ public override int RemainingCapacity
+ {
+ get
+ {
+ throw new NotImplementedException();
+ }
+ }
+ }
+}