summaryrefslogtreecommitdiff
path: root/qpid/dotnet/Qpid.Common/Collections/LinkedBlockingQueue.cs
blob: be92576951bd112ab2bff96770825de806e77112 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
using System;
using System.Threading;

namespace Apache.Qpid.Collections
{
    public class LinkedBlockingQueue : BlockingQueue
    {             

        /*
         * A variant of the "two lock queue" algorithm.  The putLock gates
         * entry to put (and offer), and has an associated condition for
         * waiting puts.  Similarly for the takeLock.  The "count" field
         * that they both rely on is maintained as an atomic to avoid
         * needing to get both locks in most cases. Also, to minimize need
         * for puts to get takeLock and vice-versa, cascading notifies are
         * used. When a put notices that it has enabled at least one take,
         * it signals taker. That taker in turn signals others if more
         * items have been entered since the signal. And symmetrically for
         * takes signalling puts. Operations such as remove(Object) and
         * iterators acquire both locks.
         */

        /**
         * Linked list node class
         */
        internal class Node
        {
            /** The item, volatile to ensure barrier separating write and read */
            internal volatile Object item;
            internal Node next;
            internal Node(Object x) { item = x; }
        }

        /** The capacity bound, or Integer.MAX_VALUE if none */
        private readonly int capacity;

        /** Current number of elements */
        private volatile int count = 0;

        /** Head of linked list */
        private Node head;

        /** Tail of linked list */
        private Node last;

        /** Lock held by take, poll, etc */
        private readonly object takeLock = new Object(); //new SerializableLock();

        /** Lock held by put, offer, etc */
        private readonly object putLock = new Object();//new SerializableLock();

        /**
         * Signals a waiting take. Called only from put/offer (which do not
         * otherwise ordinarily lock takeLock.)
         */
        private void SignalNotEmpty()
        {
            lock (takeLock)
            {
                Monitor.Pulse(takeLock);
            }
        }

        /**
         * Signals a waiting put. Called only from take/poll.
         */
        private void SignalNotFull()
        {
            lock (putLock)
            {
                Monitor.Pulse(putLock);
            }
        }

        /**
         * Creates a node and links it at end of queue.
         * @param x the item
         */
        private void Insert(Object x)
        {
            last = last.next = new Node(x);
        }

        /**
         * Removes a node from head of queue,
         * @return the node
         */
        private Object Extract()
        {
            Node first = head.next;
            head = first;
            Object x = first.item;
            first.item = null;
            return x;
        }


        /**
         * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
         * {@link Integer#MAX_VALUE}.
         */
        public LinkedBlockingQueue() : this(Int32.MaxValue)
        {            
        }

        /**
         * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity.
         *
         * @param capacity the capacity of this queue
         * @throws IllegalArgumentException if <tt>capacity</tt> is not greater
         *         than zero
         */
        public LinkedBlockingQueue(int capacity)
        {
            if (capacity <= 0) throw new ArgumentException("Capacity must be positive, was passed " + capacity);
            this.capacity = capacity;
            last = head = new Node(null);
        }        

        // this doc comment is overridden to remove the reference to collections
        // greater in size than Integer.MAX_VALUE
        /**
         * Returns the number of elements in this queue.
         *
         * @return the number of elements in this queue
         */
        public int Size
        {
            get
            {
                return count;
            }            
        }

        // this doc comment is a modified copy of the inherited doc comment,
        // without the reference to unlimited queues.
        /**
         * Returns the number of additional elements that this queue can ideally
         * (in the absence of memory or resource constraints) accept without
         * blocking. This is always equal to the initial capacity of this queue
         * less the current <tt>size</tt> of this queue.
         *
         * <p>Note that you <em>cannot</em> always tell if an attempt to insert
         * an element will succeed by inspecting <tt>remainingCapacity</tt>
         * because it may be the case that another thread is about to
         * insert or remove an element.
         */
        public override int RemainingCapacity
        {
            get
            {
                return capacity - count;
            }            
        }

        /**
         * Inserts the specified element at the tail of this queue, waiting if
         * necessary for space to become available.
         *
         * @throws InterruptedException {@inheritDoc}
         * @throws NullPointerException {@inheritDoc}
         */
        public override void EnqueueBlocking(Object e)
        {
            if (e == null) throw new ArgumentNullException("Object must not be null");
            // Note: convention in all put/take/etc is to preset
            // local var holding count  negative to indicate failure unless set.
            int c = -1;
            lock (putLock) 
            {
                /*
                 * Note that count is used in wait guard even though it is
                 * not protected by lock. This works because count can
                 * only decrease at this point (all other puts are shut
                 * out by lock), and we (or some other waiting put) are
                 * signalled if it ever changes from
                 * capacity. Similarly for all other uses of count in
                 * other wait guards.
                 */                
                while (count == capacity)
                {
                    Monitor.Wait(putLock);
                }
                
                Insert(e);
                lock(this)
                {
                    c = count++;
                }
                if (c + 1 < capacity)
                {
                    Monitor.Pulse(putLock);
                }                    
            }

            if (c == 0)
            {
                SignalNotEmpty();
            }
        }
        
        /**
         * Inserts the specified element at the tail of this queue if it is
         * possible to do so immediately without exceeding the queue's capacity,
         * returning <tt>true</tt> upon success and <tt>false</tt> if this queue
         * is full.
         * When using a capacity-restricted queue, this method is generally
         * preferable to method {@link BlockingQueue#add add}, which can fail to
         * insert an element only by throwing an exception.
         *
         * @throws NullPointerException if the specified element is null
         */
        public override bool EnqueueNoThrow(Object e)
        {
            if (e == null) throw new ArgumentNullException("e must not be null");
            if (count == capacity)
            {
                return false;
            }
            int c = -1;
            lock (putLock) 
            {
                if (count < capacity) 
                {
                    Insert(e);
                    lock (this)
                    {
                        c = count++;
                    }
                    if (c + 1 < capacity)
                    {
                        Monitor.Pulse(putLock);
                    }
                }
            }
            if (c == 0)
            {
                SignalNotEmpty();
            }
            return c >= 0;
        }

        /**
         * Retrieves and removes the head of this queue, waiting if necessary
         * until an element becomes available.
         *
         * @return the head of this queue
         * @throws InterruptedException if interrupted while waiting
         */
        public override Object DequeueBlocking()
        {
            Object x;
            int c = -1;
            lock (takeLock) 
            {
                
                while (count == 0)
                {
                    Monitor.Wait(takeLock);
                }
                

                x = Extract();
                lock (this) { c = count--; }
                if (c > 1)
                {
                    Monitor.Pulse(takeLock);
                }
            }
            if (c == capacity)
            {
                SignalNotFull();
            }
            return x;
        }
        
        public Object Poll()
        {
            if (count == 0)
            {
                return null;
            }
            Object x = null;
            int c = -1;
            lock (takeLock) 
            {
                if (count > 0) 
                {
                    x = Extract();
                    lock (this) { c = count--; }
                    if (c > 1)
                    {
                        Monitor.Pulse(takeLock);
                    }
                }
            }
            if (c == capacity)
            {
                SignalNotFull();
            }
            return x;
        }


        public override Object Peek()
        {
            if (count == 0)
            {
                return null;
            }
            lock (takeLock) 
            {
                Node first = head.next;
                if (first == null)
                {
                    return null;
                }
                else
                {
                    return first.item;
                }
            }
        }
        
        public override String ToString()
        {
            lock (putLock) 
            {
                lock (takeLock) 
                {
                    return base.ToString();
                }
            }
        }

        /**
         * Atomically removes all of the elements from this queue.
         * The queue will be empty after this call returns.
         */
        public override void Clear()
        {
            lock (putLock) 
            {
                lock (takeLock) 
                {
                    head.next = null;                
                    last = head;
                    int c;
                    lock (this) 
                    {
                        c = count;
                        count = 0;
                    }
                    if (c == capacity)
                    {
                        Monitor.PulseAll(putLock);
                    }
                }
            }
        }                       
    }
}