path: root/rts/WSDeque.c
/* -----------------------------------------------------------------------------
 *
 * (c) The GHC Team, 2009
 *
 * Work-stealing Deque data structure
 *
 * The implementation uses a double-ended queue ("deque") with lock-free
 * access, as described in
 *
 * D. Chase and Y. Lev, Dynamic Circular Work-Stealing Deque.
 * SPAA'05, July 2005, Las Vegas, USA.
 * ACM 1-58113-986-1/05/0007
 *
 * This implementation closely follows the C11 implementation presented in
 *
 * N. M. Lê, A. Pop, A. Cohen, and F. Z. Nardelli. "Correct and Efficient
 * Work-Stealing for Weak Memory Models". PPoPP'13, February 2013,
 * ACM 978-1-4503-1922/13/02.
 *
 * Author: Jost Berthold MSRC 07-09/2008
 * Rewritten by: Ben Gamari (Well-Typed)
 *
 *
 * The deque is held as a circular array of known length. The positions
 * of top (the read end) and bottom (the write end) only ever increase,
 * and the array is accessed with indices taken modulo the array size.
 * While this bears the risk of overflow, we assume that (with 64-bit
 * indices) a program would have to run for a very long time to reach
 * that point.
 *
 * The write end of the queue (position bottom) may only be used with
 * mutual exclusion, i.e. by exactly one caller at a time. At this end,
 * new items can be enqueued using pushWSDeque()/newSpark(), and removed
 * using popWSDeque()/reclaimSpark() (the latter implying a cas
 * synchronisation with potential concurrent readers in the case of just
 * one remaining element).
 *
 * Multiple readers can steal from the read end (position top); they are
 * synchronised without a lock, by a cas on the top position. One reader
 * wins, the others return NULL to signal failure.
 *
 * Both popWSDeque and stealWSDeque also return NULL when the queue is empty.
 *
 * Testing: see testsuite/tests/rts/testwsdeque.c.  If
 * there's anything wrong with the deque implementation, this test
 * will probably catch it.
 *
 * ---------------------------------------------------------------------------*/
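
/* A minimal usage sketch (illustrative only; not part of the RTS build).
 * The functions used below are the ones defined in this file; the calling
 * code, the variable names and the size of 128 are made up for the example.
 *
 *     WSDeque *q = newWSDeque(128);
 *
 *     // owner thread only:
 *     bool ok = pushWSDeque(q, item);  // false when the deque is full
 *     void *mine = popWSDeque(q);      // NULL if empty or a race was lost
 *
 *     // any thread may steal from the read end:
 *     void *stolen = stealWSDeque(q);  // NULL if the deque looks empty
 *
 *     freeWSDeque(q);                  // only once no thread touches q
 */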

#include "rts/PosixSource.h"
#include "Rts.h"

#include "RtsUtils.h"
#include "WSDeque.h"

// Returns true on success.
static inline bool
cas_top(WSDeque *q, StgInt old, StgInt new)
{
    return (StgWord) old == SEQ_CST_RELAXED_CAS((StgPtr) &q->top,
                                                (StgWord) old, (StgWord) new);
}


/* -----------------------------------------------------------------------------
 * newWSDeque
 * -------------------------------------------------------------------------- */

/* internal helpers ... */

static StgWord
roundUp2(StgWord val)
{
    StgWord rounded = 1;

    /* StgWord is unsigned anyway, only catch 0 */
    if (val == 0) {
        barf("DeQue, roundUp2: invalid size 0 requested");
    }
    /* val has at least one bit set; shift rounded up past its highest set bit */
    do {
        rounded = rounded << 1;
    } while (0 != (val = val>>1));
    return rounded;
}
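
/* For illustration (derived from the loop above, not a documented API
 * contract): roundUp2(1) == 2, roundUp2(6) == 8 and roundUp2(8) == 16.
 * The result is always a power of two strictly greater than val, so
 * (i & (result - 1)) computes i modulo the result; newWSDeque relies on
 * this for its moduloSize mask. */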

WSDeque *
newWSDeque (uint32_t size)
{
    StgWord realsize;
    WSDeque *q;

    realsize = roundUp2(size); /* to compute modulo as a bitwise & */

    q = (WSDeque*) stgMallocBytes(sizeof(WSDeque),   /* admin fields */
                                  "newWSDeque");
    q->elements = stgMallocBytes(realsize * sizeof(StgClosurePtr), /* dataspace */
                                 "newWSDeque:data space");
    q->size = realsize;  /* power of 2 */
    q->moduloSize = realsize - 1; /* n % size == n & moduloSize  */

    q->top = 0;
    RELEASE_STORE(&q->bottom, 0); /* written only by the owner, read by stealing threads */

    ASSERT_WSDEQUE_INVARIANTS(q);
    return q;
}

/* -----------------------------------------------------------------------------
 * freeWSDeque
 * -------------------------------------------------------------------------- */

void
freeWSDeque (WSDeque *q)
{
    stgFree(q->elements);
    stgFree(q);
}

/* -----------------------------------------------------------------------------
 *
 * popWSDeque: remove an element from the write end of the queue.
 * Returns the removed element, or NULL if a race was lost or the pool
 * is empty.
 *
 * If only one element is left in the pool, we synchronise with
 * concurrently stealing threads by using cas to modify the top field.
 * This routine should NEVER be called by a task which does not own
 * this deque.
 *
 * -------------------------------------------------------------------------- */

void *
popWSDeque (WSDeque *q)
{
    StgInt b = RELAXED_LOAD(&q->bottom) - 1;
    RELAXED_STORE(&q->bottom, b);
    SEQ_CST_FENCE();
    StgInt t = RELAXED_LOAD(&q->top);

    void *result;
    if (t <= b) {
        /* Non-empty */
        result = RELAXED_LOAD(&q->elements[b & q->moduloSize]);
        if (t == b) {
            /* Single last element in queue */
            if (!cas_top(q, t, t+1)) {
                /* Failed race */
                result = NULL;
            }

            RELAXED_STORE(&q->bottom, b+1);
        }
    } else {
        /* Empty queue */
        result = NULL;
        RELAXED_STORE(&q->bottom, b+1);
    }

    return result;
}
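
/* A hedged sketch of the owner draining its own deque (runOne() is a
 * hypothetical callback, not an RTS function):
 *
 *     void *e;
 *     while ((e = popWSDeque(q)) != NULL) {
 *         runOne(e);
 *     }
 *
 * NULL can mean either that the deque was empty or that the race for the
 * very last element was lost to a thief; in both cases the deque is empty
 * from the owner's point of view afterwards. */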

/* -----------------------------------------------------------------------------
 * stealWSDeque
 * -------------------------------------------------------------------------- */

void *
stealWSDeque_ (WSDeque *q)
{
    StgInt t = ACQUIRE_LOAD(&q->top);
    SEQ_CST_FENCE();
    StgInt b = ACQUIRE_LOAD(&q->bottom);

    void *result = NULL;
    if (t < b) {
        /* Non-empty queue */
        result = RELAXED_LOAD(&q->elements[t % q->size]);
        if (!cas_top(q, t, t+1)) {
            return NULL;
        }
    }
    return result;
}

void *
stealWSDeque (WSDeque *q)
{
    void *stolen;

    do {
        stolen = stealWSDeque_(q);
    } while (stolen == NULL && !looksEmptyWSDeque(q));

    return stolen;
}
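
/* For illustration only: a thief looking for work might try a number of
 * deques in turn. The array and its length are hypothetical; only
 * stealWSDeque() comes from this file.
 *
 *     static void *stealFromAny(WSDeque **pools, uint32_t n)
 *     {
 *         for (uint32_t i = 0; i < n; i++) {
 *             void *e = stealWSDeque(pools[i]);
 *             if (e != NULL) return e;
 *         }
 *         return NULL;
 *     }
 */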

/* -----------------------------------------------------------------------------
 * pushWSDeque
 * -------------------------------------------------------------------------- */

/* Enqueue an element. Must only be called by the owner. Returns true if the
 * element was pushed, false if the queue is full.
 */
bool
pushWSDeque (WSDeque* q, void * elem)
{
    StgInt b = ACQUIRE_LOAD(&q->bottom);
    StgInt t = ACQUIRE_LOAD(&q->top);

    if ( b - t > q->size - 1 ) {
        /* Full queue */
        /* We don't implement resizing, just say we didn't push anything. */
        return false;
    }

    RELAXED_STORE(&q->elements[b & q->moduloSize], elem);
#if defined(TSAN_ENABLED)
    // ThreadSanitizer doesn't know about release fences, so we need to
    // strengthen this to a release store lest we get spurious data race
    // reports.
    RELEASE_STORE(&q->bottom, b+1);
#else
    RELEASE_FENCE();
    RELAXED_STORE(&q->bottom, b+1);
#endif
    return true;
}
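
/* A hedged usage sketch (the names item and runItemDirectly are made up):
 * since this implementation never resizes, the owner needs some fallback
 * when the deque is full, e.g. running the work in place or dropping it.
 *
 *     if (!pushWSDeque(q, item)) {
 *         runItemDirectly(item);   // hypothetical fallback on overflow
 *     }
 */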