diff options
Diffstat (limited to 'qpid/dotnet/Qpid.Common/Collections/SynchronousQueue.cs')
-rw-r--r-- | qpid/dotnet/Qpid.Common/Collections/SynchronousQueue.cs | 375 |
1 files changed, 375 insertions, 0 deletions
diff --git a/qpid/dotnet/Qpid.Common/Collections/SynchronousQueue.cs b/qpid/dotnet/Qpid.Common/Collections/SynchronousQueue.cs new file mode 100644 index 0000000000..3c12df6067 --- /dev/null +++ b/qpid/dotnet/Qpid.Common/Collections/SynchronousQueue.cs @@ -0,0 +1,375 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Threading; + +namespace Apache.Qpid.Collections +{ + public class SynchronousQueue : BlockingQueue + { + /// <summary> + /// Lock protecting both wait queues + /// </summary> +// private readonly object _qlock = new object(); + + /// <summary> + /// Queue holding waiting puts + /// </summary> +// private readonly WaitQueue _waitingProducers; + + /// <summary> + /// Queue holding waiting takes + /// </summary> +// private readonly WaitQueue _waitingConsumers; + + /** + * Queue to hold waiting puts/takes; specialized to Fifo/Lifo below. + * These queues have all transient fields, but are serializable + * in order to recover fairness settings when deserialized. + */ + internal abstract class WaitQueue + { + /** Creates, adds, and returns node for x. */ + internal abstract Node Enq(Object x); + /** Removes and returns node, or null if empty. */ + internal abstract Node Deq(); + /** Removes a cancelled node to avoid garbage retention. */ + internal abstract void Unlink(Node node); + /** Returns true if a cancelled node might be on queue. */ + internal abstract bool ShouldUnlink(Node node); + } + + /** + * FIFO queue to hold waiting puts/takes. + */ + sealed class FifoWaitQueue : WaitQueue + { + private Node head; + private Node last; + + internal override Node Enq(Object x) + { + Node p = new Node(x); + if (last == null) + { + last = head = p; + } + else + { + last = last.next = p; + } + return p; + } + + internal override Node Deq() + { + Node p = head; + if (p != null) + { + if ((head = p.next) == null) + { + last = null; + } + p.next = null; + } + return p; + } + + internal override bool ShouldUnlink(Node node) + { + return (node == last || node.next != null); + } + + internal override void Unlink(Node node) + { + Node p = head; + Node trail = null; + while (p != null) + { + if (p == node) + { + Node next = p.next; + if (trail == null) + { + head = next; + } + else + { + trail.next = next; + } + if (last == node) + { + last = trail; + } + break; + } + trail = p; + p = p.next; + } + } + } + + /** + * LIFO queue to hold waiting puts/takes. + */ + sealed class LifoWaitQueue : WaitQueue + { + private Node head; + + internal override Node Enq(Object x) + { + return head = new Node(x, head); + } + + internal override Node Deq() + { + Node p = head; + if (p != null) + { + head = p.next; + p.next = null; + } + return p; + } + + internal override bool ShouldUnlink(Node node) + { + // Return false if already dequeued or is bottom node (in which + // case we might retain at most one garbage node) + return (node == head || node.next != null); + } + + internal override void Unlink(Node node) + { + Node p = head; + Node trail = null; + while (p != null) + { + if (p == node) + { + Node next = p.next; + if (trail == null) + head = next; + else + trail.next = next; + break; + } + trail = p; + p = p.next; + } + } + } + + /** + * Nodes each maintain an item and handle waits and signals for + * getting and setting it. The class extends + * AbstractQueuedSynchronizer to manage blocking, using AQS state + * 0 for waiting, 1 for ack, -1 for cancelled. + */ + sealed internal class Node + { + + /** Synchronization state value representing that node acked */ + private const int ACK = 1; + /** Synchronization state value representing that node cancelled */ + private const int CANCEL = -1; + + internal int state = 0; + + /** The item being transferred */ + internal Object item; + /** Next node in wait queue */ + internal Node next; + + /** Creates a node with initial item */ + internal Node(Object x) + { + item = x; + } + + /** Creates a node with initial item and next */ + internal Node(Object x, Node n) + { + item = x; + next = n; + } + + /** + * Takes item and nulls out field (for sake of GC) + * + * PRE: lock owned + */ + private Object Extract() + { + Object x = item; + item = null; + return x; + } + + /** + * Tries to cancel on interrupt; if so rethrowing, + * else setting interrupt state + * + * PRE: lock owned + */ + /*private void checkCancellationOnInterrupt(InterruptedException ie) + throws InterruptedException + { + if (state == 0) { + state = CANCEL; + notify(); + throw ie; + } + Thread.currentThread().interrupt(); + }*/ + + /** + * Fills in the slot created by the consumer and signal consumer to + * continue. + */ + internal bool SetItem(Object x) + { + lock (this) + { + if (state != 0) return false; + item = x; + state = ACK; + Monitor.Pulse(this); + return true; + } + } + + /** + * Removes item from slot created by producer and signal producer + * to continue. + */ + internal Object GetItem() + { + if (state != 0) return null; + state = ACK; + Monitor.Pulse(this); + return Extract(); + } + + /** + * Waits for a consumer to take item placed by producer. + */ + internal void WaitForTake() //throws InterruptedException { + { + while (state == 0) + { + Monitor.Wait(this); + } + } + + /** + * Waits for a producer to put item placed by consumer. + */ + internal object WaitForPut() + { + lock (this) + { + while (state == 0) Monitor.Wait(this); + } + return Extract(); + } + + private bool Attempt(long nanos) + { + if (state != 0) return true; + if (nanos <= 0) { + state = CANCEL; + Monitor.Pulse(this); + return false; + } + + while (true) + { + Monitor.Wait(nanos); + //TimeUnit.NANOSECONDS.timedWait(this, nanos); + if (state != 0) + { + return true; + } + //nanos = deadline - Utils.nanoTime(); + //if (nanos <= 0) + else + { + state = CANCEL; + Monitor.Pulse(this); + return false; + } + } + } + + /** + * Waits for a consumer to take item placed by producer or time out. + */ + internal bool WaitForTake(long nanos) + { + return Attempt(nanos); + } + + /** + * Waits for a producer to put item placed by consumer, or time out. + */ + internal object WaitForPut(long nanos) + { + if (!Attempt(nanos)) + { + return null; + } + else + { + return Extract(); + } + } + } + + public SynchronousQueue(bool strict) + { + // TODO !!!! + } + + public override bool EnqueueNoThrow(object e) + { + throw new NotImplementedException(); + } + + public override void EnqueueBlocking(object e) + { + throw new NotImplementedException(); + } + + public override object DequeueBlocking() + { + throw new NotImplementedException(); + } + + public override int RemainingCapacity + { + get + { + throw new NotImplementedException(); + } + } + } +} |