summaryrefslogtreecommitdiff
path: root/M4-RCs/qpid/dotnet/Qpid.Common/Collections/SynchronousQueue.cs
diff options
context:
space:
mode:
Diffstat (limited to 'M4-RCs/qpid/dotnet/Qpid.Common/Collections/SynchronousQueue.cs')
-rw-r--r--M4-RCs/qpid/dotnet/Qpid.Common/Collections/SynchronousQueue.cs375
1 files changed, 0 insertions, 375 deletions
diff --git a/M4-RCs/qpid/dotnet/Qpid.Common/Collections/SynchronousQueue.cs b/M4-RCs/qpid/dotnet/Qpid.Common/Collections/SynchronousQueue.cs
deleted file mode 100644
index 3c12df6067..0000000000
--- a/M4-RCs/qpid/dotnet/Qpid.Common/Collections/SynchronousQueue.cs
+++ /dev/null
@@ -1,375 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Threading;
-
-namespace Apache.Qpid.Collections
-{
- public class SynchronousQueue : BlockingQueue
- {
- /// <summary>
- /// Lock protecting both wait queues
- /// </summary>
-// private readonly object _qlock = new object();
-
- /// <summary>
- /// Queue holding waiting puts
- /// </summary>
-// private readonly WaitQueue _waitingProducers;
-
- /// <summary>
- /// Queue holding waiting takes
- /// </summary>
-// private readonly WaitQueue _waitingConsumers;
-
- /**
- * Queue to hold waiting puts/takes; specialized to Fifo/Lifo below.
- * These queues have all transient fields, but are serializable
- * in order to recover fairness settings when deserialized.
- */
- internal abstract class WaitQueue
- {
- /** Creates, adds, and returns node for x. */
- internal abstract Node Enq(Object x);
- /** Removes and returns node, or null if empty. */
- internal abstract Node Deq();
- /** Removes a cancelled node to avoid garbage retention. */
- internal abstract void Unlink(Node node);
- /** Returns true if a cancelled node might be on queue. */
- internal abstract bool ShouldUnlink(Node node);
- }
-
- /**
- * FIFO queue to hold waiting puts/takes.
- */
- sealed class FifoWaitQueue : WaitQueue
- {
- private Node head;
- private Node last;
-
- internal override Node Enq(Object x)
- {
- Node p = new Node(x);
- if (last == null)
- {
- last = head = p;
- }
- else
- {
- last = last.next = p;
- }
- return p;
- }
-
- internal override Node Deq()
- {
- Node p = head;
- if (p != null)
- {
- if ((head = p.next) == null)
- {
- last = null;
- }
- p.next = null;
- }
- return p;
- }
-
- internal override bool ShouldUnlink(Node node)
- {
- return (node == last || node.next != null);
- }
-
- internal override void Unlink(Node node)
- {
- Node p = head;
- Node trail = null;
- while (p != null)
- {
- if (p == node)
- {
- Node next = p.next;
- if (trail == null)
- {
- head = next;
- }
- else
- {
- trail.next = next;
- }
- if (last == node)
- {
- last = trail;
- }
- break;
- }
- trail = p;
- p = p.next;
- }
- }
- }
-
- /**
- * LIFO queue to hold waiting puts/takes.
- */
- sealed class LifoWaitQueue : WaitQueue
- {
- private Node head;
-
- internal override Node Enq(Object x)
- {
- return head = new Node(x, head);
- }
-
- internal override Node Deq()
- {
- Node p = head;
- if (p != null)
- {
- head = p.next;
- p.next = null;
- }
- return p;
- }
-
- internal override bool ShouldUnlink(Node node)
- {
- // Return false if already dequeued or is bottom node (in which
- // case we might retain at most one garbage node)
- return (node == head || node.next != null);
- }
-
- internal override void Unlink(Node node)
- {
- Node p = head;
- Node trail = null;
- while (p != null)
- {
- if (p == node)
- {
- Node next = p.next;
- if (trail == null)
- head = next;
- else
- trail.next = next;
- break;
- }
- trail = p;
- p = p.next;
- }
- }
- }
-
- /**
- * Nodes each maintain an item and handle waits and signals for
- * getting and setting it. The class extends
- * AbstractQueuedSynchronizer to manage blocking, using AQS state
- * 0 for waiting, 1 for ack, -1 for cancelled.
- */
- sealed internal class Node
- {
-
- /** Synchronization state value representing that node acked */
- private const int ACK = 1;
- /** Synchronization state value representing that node cancelled */
- private const int CANCEL = -1;
-
- internal int state = 0;
-
- /** The item being transferred */
- internal Object item;
- /** Next node in wait queue */
- internal Node next;
-
- /** Creates a node with initial item */
- internal Node(Object x)
- {
- item = x;
- }
-
- /** Creates a node with initial item and next */
- internal Node(Object x, Node n)
- {
- item = x;
- next = n;
- }
-
- /**
- * Takes item and nulls out field (for sake of GC)
- *
- * PRE: lock owned
- */
- private Object Extract()
- {
- Object x = item;
- item = null;
- return x;
- }
-
- /**
- * Tries to cancel on interrupt; if so rethrowing,
- * else setting interrupt state
- *
- * PRE: lock owned
- */
- /*private void checkCancellationOnInterrupt(InterruptedException ie)
- throws InterruptedException
- {
- if (state == 0) {
- state = CANCEL;
- notify();
- throw ie;
- }
- Thread.currentThread().interrupt();
- }*/
-
- /**
- * Fills in the slot created by the consumer and signal consumer to
- * continue.
- */
- internal bool SetItem(Object x)
- {
- lock (this)
- {
- if (state != 0) return false;
- item = x;
- state = ACK;
- Monitor.Pulse(this);
- return true;
- }
- }
-
- /**
- * Removes item from slot created by producer and signal producer
- * to continue.
- */
- internal Object GetItem()
- {
- if (state != 0) return null;
- state = ACK;
- Monitor.Pulse(this);
- return Extract();
- }
-
- /**
- * Waits for a consumer to take item placed by producer.
- */
- internal void WaitForTake() //throws InterruptedException {
- {
- while (state == 0)
- {
- Monitor.Wait(this);
- }
- }
-
- /**
- * Waits for a producer to put item placed by consumer.
- */
- internal object WaitForPut()
- {
- lock (this)
- {
- while (state == 0) Monitor.Wait(this);
- }
- return Extract();
- }
-
- private bool Attempt(long nanos)
- {
- if (state != 0) return true;
- if (nanos <= 0) {
- state = CANCEL;
- Monitor.Pulse(this);
- return false;
- }
-
- while (true)
- {
- Monitor.Wait(nanos);
- //TimeUnit.NANOSECONDS.timedWait(this, nanos);
- if (state != 0)
- {
- return true;
- }
- //nanos = deadline - Utils.nanoTime();
- //if (nanos <= 0)
- else
- {
- state = CANCEL;
- Monitor.Pulse(this);
- return false;
- }
- }
- }
-
- /**
- * Waits for a consumer to take item placed by producer or time out.
- */
- internal bool WaitForTake(long nanos)
- {
- return Attempt(nanos);
- }
-
- /**
- * Waits for a producer to put item placed by consumer, or time out.
- */
- internal object WaitForPut(long nanos)
- {
- if (!Attempt(nanos))
- {
- return null;
- }
- else
- {
- return Extract();
- }
- }
- }
-
- public SynchronousQueue(bool strict)
- {
- // TODO !!!!
- }
-
- public override bool EnqueueNoThrow(object e)
- {
- throw new NotImplementedException();
- }
-
- public override void EnqueueBlocking(object e)
- {
- throw new NotImplementedException();
- }
-
- public override object DequeueBlocking()
- {
- throw new NotImplementedException();
- }
-
- public override int RemainingCapacity
- {
- get
- {
- throw new NotImplementedException();
- }
- }
- }
-}