summaryrefslogtreecommitdiff
path: root/rts/parallel/WSDeque.c
blob: 6dfabd927a470df08a6043eefceffb3f55fcad99 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
/* -----------------------------------------------------------------------------
 *
 * (c) The GHC Team, 2009
 *
 * Work-stealing Deque data structure
 * 
 * The implementation uses Double-Ended Queues with lock-free access
 * (thereby often called "deque") as described in
 *
 * D.Chase and Y.Lev, Dynamic Circular Work-Stealing Deque.
 * SPAA'05, July 2005, Las Vegas, USA.
 * ACM 1-58113-986-1/05/0007
 *
 * Author: Jost Berthold MSRC 07-09/2008
 *
 * The DeQue is held as a circular array with known length. Positions
 * of top (read-end) and bottom (write-end) always increase, and the
 * array is accessed with indices modulo array-size. While this bears
 * the risk of overflow, we assume that (with 64 bit indices), a
 * program must run very long to reach that point.
 * 
 * The write end of the queue (position bottom) can only be used with
 * mutual exclusion, i.e. by exactly one caller at a time.  At this
 * end, new items can be enqueued using pushBottom()/newSpark(), and
 * removed using popBottom()/reclaimSpark() (the latter implying a cas
 * synchronisation with potential concurrent readers for the case of
 * just one element).
 * 
 * Multiple readers can steal from the read end (position top), and
 * are synchronised without a lock, based on a cas of the top
 * position. One reader wins, the others return NULL for a failure.
 * 
 * Both popWSDeque and stealWSDeque also return NULL when the queue is empty.
 *
 * Testing: see testsuite/tests/ghc-regress/rts/testwsdeque.c.  If
 * there's anything wrong with the deque implementation, this test
 * will probably catch it.
 * 
 * ---------------------------------------------------------------------------*/

#include "Rts.h"
#include "RtsUtils.h"
#include "WSDeque.h"
#include "SMP.h" // for cas

#if defined(THREADED_RTS)

#define CASTOP(addr,old,new) ((old) == cas(((StgPtr)addr),(old),(new)))

/* -----------------------------------------------------------------------------
 * newWSDeque
 * -------------------------------------------------------------------------- */

/* internal helpers ... */

static StgWord
roundUp2(StgWord val)
{
    StgWord rounded = 1;
    
    /* StgWord is unsigned anyway, only catch 0 */
    if (val == 0) {
        barf("DeQue,roundUp2: invalid size 0 requested");
    }
    /* at least 1 bit set, shift up to its place */
    do {
        rounded = rounded << 1;
    } while (0 != (val = val>>1));
    return rounded;
}

WSDeque *
newWSDeque (nat size)
{
    StgWord realsize; 
    WSDeque *q;
    
    realsize = roundUp2(size); /* to compute modulo as a bitwise & */
    
    q = (WSDeque*) stgMallocBytes(sizeof(WSDeque),   /* admin fields */
                                  "newWSDeque");
    q->elements = stgMallocBytes(realsize * sizeof(StgClosurePtr), /* dataspace */
                                 "newWSDeque:data space");
    q->top=0;
    q->bottom=0;
    q->topBound=0; /* read by writer, updated each time top is read */
    
    q->size = realsize;  /* power of 2 */
    q->moduloSize = realsize - 1; /* n % size == n & moduloSize  */
    
    ASSERT_WSDEQUE_INVARIANTS(q); 
    return q;
}

/* -----------------------------------------------------------------------------
 * freeWSDeque
 * -------------------------------------------------------------------------- */

void
freeWSDeque (WSDeque *q)
{
    stgFree(q->elements);
    stgFree(q);
}

/* -----------------------------------------------------------------------------
 * 
 * popWSDeque: remove an element from the write end of the queue.
 * Returns the removed spark, and NULL if a race is lost or the pool
 * empty.
 *
 * If only one spark is left in the pool, we synchronise with
 * concurrently stealing threads by using cas to modify the top field.
 * This routine should NEVER be called by a task which does not own
 * this deque.
 *
 * -------------------------------------------------------------------------- */

void *
popWSDeque (WSDeque *q)
{
    /* also a bit tricky, has to avoid concurrent steal() calls by
       accessing top with cas, when there is only one element left */
    StgWord t, b;
    void ** pos;
    long  currSize;
    void * removed;
    
    ASSERT_WSDEQUE_INVARIANTS(q); 
    
    b = q->bottom;
    /* "decrement b as a test, see what happens" */
    q->bottom = --b; 
    pos = (q->elements) + (b & (q->moduloSize));
    t = q->top; /* using topBound would give an *upper* bound, we
                   need a lower bound. We use the real top here, but
                   can update the topBound value */
    q->topBound = t;
    currSize = b - t;
    if (currSize < 0) { /* was empty before decrementing b, set b
                           consistently and abort */
        q->bottom = t;
        return NULL;
    }
    removed = *pos;
    if (currSize > 0) { /* no danger, still elements in buffer after b-- */
        // debugBelch("popWSDeque: t=%ld b=%ld = %ld\n", t, b, removed);
        return removed;
    } 
    /* otherwise, has someone meanwhile stolen the same (last) element?
       Check and increment top value to know  */
    if ( !(CASTOP(&(q->top),t,t+1)) ) {
        removed = NULL; /* no success, but continue adjusting bottom */
    }
    q->bottom = t+1; /* anyway, empty now. Adjust bottom consistently. */
    q->topBound = t+1; /* ...and cached top value as well */
    
    ASSERT_WSDEQUE_INVARIANTS(q); 
    ASSERT(q->bottom >= q->top);
    
    // debugBelch("popWSDeque: t=%ld b=%ld = %ld\n", t, b, removed);

    return removed;
}

/* -----------------------------------------------------------------------------
 * stealWSDeque
 * -------------------------------------------------------------------------- */

void *
stealWSDeque_ (WSDeque *q)
{
    void ** pos;
    void ** arraybase;
    StgWord sz;
    void * stolen;
    StgWord b,t; 
    
// Can't do this on someone else's spark pool:
// ASSERT_WSDEQUE_INVARIANTS(q); 
    
    b = q->bottom;
    t = q->top;
    
    // NB. b and t are unsigned; we need a signed value for the test
    // below, because it is possible that t > b during a
    // concurrent popWSQueue() operation.
    if ((long)b - (long)t <= 0 ) { 
        return NULL; /* already looks empty, abort */
  }
    
    /* now access array, see pushBottom() */
    arraybase = q->elements;
    sz = q->moduloSize;
    pos = arraybase + (t & sz);  
    stolen = *pos;
    
    /* now decide whether we have won */
    if ( !(CASTOP(&(q->top),t,t+1)) ) {
        /* lost the race, someon else has changed top in the meantime */
        return NULL;
    }  /* else: OK, top has been incremented by the cas call */

    // debugBelch("stealWSDeque_: t=%d b=%d\n", t, b);

// Can't do this on someone else's spark pool:
// ASSERT_WSDEQUE_INVARIANTS(q); 
    
    return stolen;
}

void *
stealWSDeque (WSDeque *q)
{
    void *stolen;
    
    do { 
        stolen = stealWSDeque_(q);
    } while (stolen == NULL && !looksEmptyWSDeque(q));
    
    return stolen;
}

/* -----------------------------------------------------------------------------
 * pushWSQueue
 * -------------------------------------------------------------------------- */

#define DISCARD_NEW

/* enqueue an element. Should always succeed by resizing the array
   (not implemented yet, silently fails in that case). */
rtsBool
pushWSDeque (WSDeque* q, void * elem)
{
    StgWord t;
    void ** pos;
    StgWord sz = q->moduloSize; 
    StgWord b = q->bottom;
    
    ASSERT_WSDEQUE_INVARIANTS(q); 
    
    /* we try to avoid reading q->top (accessed by all) and use
       q->topBound (accessed only by writer) instead. 
       This is why we do not just call empty(q) here.
    */
    t = q->topBound;
    if ( (StgInt)b - (StgInt)t >= (StgInt)sz ) { 
        /* NB. 1. sz == q->size - 1, thus ">="
           2. signed comparison, it is possible that t > b
        */
        /* could be full, check the real top value in this case */
        t = q->top;
        q->topBound = t;
        if (b - t >= sz) { /* really no space left :-( */
            /* reallocate the array, copying the values. Concurrent steal()s
               will in the meantime use the old one and modify only top.
               This means: we cannot safely free the old space! Can keep it
               on a free list internally here...
               
               Potential bug in combination with steal(): if array is
               replaced, it is unclear which one concurrent steal operations
               use. Must read the array base address in advance in steal().
            */
#if defined(DISCARD_NEW)
            ASSERT_WSDEQUE_INVARIANTS(q); 
            return rtsFalse; // we didn't push anything
#else
            /* could make room by incrementing the top position here.  In
             * this case, should use CASTOP. If this fails, someone else has
             * removed something, and new room will be available.
             */
            ASSERT_WSDEQUE_INVARIANTS(q); 
#endif
        }
    }
    pos = (q->elements) + (b & sz);
    *pos = elem;
    q->bottom = b + 1;
    
    ASSERT_WSDEQUE_INVARIANTS(q); 
    return rtsTrue;
}

#endif