From edb4b92b218cee5b309866f3d236da30c5621567 Mon Sep 17 00:00:00 2001 From: Ben Gamari Date: Fri, 27 Sep 2019 18:49:06 +0000 Subject: rts/WSDeque: Rewrite with proper atomics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit After a few attempts at shoring up the previous implementation, I ended up turning to the literature and now use the proven implementation, > N.M. Lê, A. Pop, A.Cohen, and F.Z. Nardelli. "Correct and Efficient > Work-Stealing for Weak Memory Models". PPoPP'13, February 2013, > ACM 978-1-4503-1922/13/02. Not only is this approach formally proven correct under C11 semantics but it is also proved to be a bit faster in practice. --- includes/stg/SMP.h | 15 ++++ rts/Sparks.c | 4 +- rts/WSDeque.c | 215 ++++++++++++++++++----------------------------------- rts/WSDeque.h | 51 ++++++------- 4 files changed, 111 insertions(+), 174 deletions(-) diff --git a/includes/stg/SMP.h b/includes/stg/SMP.h index fa52a913c4..4679d7986e 100644 --- a/includes/stg/SMP.h +++ b/includes/stg/SMP.h @@ -453,6 +453,14 @@ load_load_barrier(void) { // Non-atomic addition for "approximate" counters that can be lossy #define NONATOMIC_ADD(ptr,val) RELAXED_STORE(ptr, RELAXED_LOAD(ptr) + val) +// Explicit fences +// +// These are typically necessary only in very specific cases (e.g. WSDeque) +// where the ordered operations aren't expressive enough to capture the desired +// ordering. 
+#define RELEASE_FENCE() __atomic_thread_fence(__ATOMIC_RELEASE) +#define SEQ_CST_FENCE() __atomic_thread_fence(__ATOMIC_SEQ_CST) + /* ---------------------------------------------------------------------- */ #else /* !THREADED_RTS */ @@ -479,6 +487,10 @@ EXTERN_INLINE void load_load_barrier () {} /* nothing */ // Non-atomic addition for "approximate" counters that can be lossy #define NONATOMIC_ADD(ptr,val) *ptr += val +// Fences +#define RELEASE_FENCE() +#define SEQ_CST_FENCE() + #if !IN_STG_CODE || IN_STGCRUN INLINE_HEADER StgWord xchg(StgPtr p, StgWord w) @@ -527,6 +539,9 @@ atomic_dec(StgVolatilePtr p) } #endif +/* An alias for the C11 declspec */ +#define ATOMIC + #define VOLATILE_LOAD(p) ((StgWord)*((StgWord*)(p))) #endif /* !THREADED_RTS */ diff --git a/rts/Sparks.c b/rts/Sparks.c index 2012b0682b..47cf310188 100644 --- a/rts/Sparks.c +++ b/rts/Sparks.c @@ -92,7 +92,7 @@ pruneSparkQueue (bool nonmovingMarkFinished, Capability *cap) SparkPool *pool; StgClosurePtr spark, tmp, *elements; uint32_t n, pruned_sparks; // stats only - StgWord botInd,oldBotInd,currInd; // indices in array (always < size) + StgInt botInd,oldBotInd,currInd; // indices in array (always < size) const StgInfoTable *info; n = 0; @@ -111,7 +111,6 @@ pruneSparkQueue (bool nonmovingMarkFinished, Capability *cap) // stealing is happening during GC. pool->bottom -= pool->top & ~pool->moduloSize; pool->top &= pool->moduloSize; - pool->topBound = pool->top; debugTrace(DEBUG_sparks, "markSparkQueue: current spark queue len=%ld; (hd=%ld; tl=%ld)", @@ -259,7 +258,6 @@ pruneSparkQueue (bool nonmovingMarkFinished, Capability *cap) ASSERT(currInd == oldBotInd); pool->top = oldBotInd; // where we started writing - pool->topBound = pool->top; pool->bottom = (oldBotInd <= botInd) ? 
botInd : (botInd + pool->size); // first free place we did not use (corrected by wraparound) diff --git a/rts/WSDeque.c b/rts/WSDeque.c index 60b8948149..d930d848a4 100644 --- a/rts/WSDeque.c +++ b/rts/WSDeque.c @@ -11,7 +11,15 @@ * SPAA'05, July 2005, Las Vegas, USA. * ACM 1-58113-986-1/05/0007 * + * This implementation closely follows the C11 implementation presented in + * + * N.M. Lê, A. Pop, A.Cohen, and F.Z. Nardelli. "Correct and Efficient + * Work-Stealing for Weak Memory Models". PPoPP'13, February 2013, + * ACM 978-1-4503-1922/13/02. + * * Author: Jost Berthold MSRC 07-09/2008 + * Rewritten by: Ben Gamari (Well-Typed) + * * * The DeQue is held as a circular array with known length. Positions * of top (read-end) and bottom (write-end) always increase, and the @@ -44,7 +52,13 @@ #include "RtsUtils.h" #include "WSDeque.h" -#define CASTOP(addr,old,new) ((old) == cas(((StgPtr)addr),(old),(new))) +// Returns true on success. +static inline bool +cas_top(WSDeque *q, StgInt old, StgInt new) +{ + return (StgWord) old == cas((StgPtr) &q->top, (StgWord) old, (StgWord) new); +} + /* ----------------------------------------------------------------------------- * newWSDeque @@ -80,13 +94,12 @@ newWSDeque (uint32_t size) "newWSDeque"); q->elements = stgMallocBytes(realsize * sizeof(StgClosurePtr), /* dataspace */ "newWSDeque:data space"); - q->top=0; - q->bottom=0; - q->topBound=0; /* read by writer, updated each time top is read */ - q->size = realsize; /* power of 2 */ q->moduloSize = realsize - 1; /* n % size == n & moduloSize */ + q->top=0; + RELEASE_STORE(&q->bottom, 0); /* read by writer, updated each time top is read */ + ASSERT_WSDEQUE_INVARIANTS(q); return q; } @@ -118,56 +131,31 @@ freeWSDeque (WSDeque *q) void * popWSDeque (WSDeque *q) { - /* also a bit tricky, has to avoid concurrent steal() calls by - accessing top with cas, when there is only one element left */ - StgWord t, b; - long currSize; - void * removed; - - ASSERT_WSDEQUE_INVARIANTS(q); - - b = 
q->bottom; - - // "decrement b as a test, see what happens" - b--; - q->bottom = b; - - // very important that the following read of q->top does not occur - // before the earlier write to q->bottom. - store_load_barrier(); - - t = q->top; /* using topBound would give an *upper* bound, we - need a lower bound. We use the real top here, but - can update the topBound value */ - q->topBound = t; - currSize = (long)b - (long)t; - if (currSize < 0) { /* was empty before decrementing b, set b - consistently and abort */ - q->bottom = t; - return NULL; - } - - // read the element at b - removed = q->elements[b & q->moduloSize]; - - if (currSize > 0) { /* no danger, still elements in buffer after b-- */ - // debugBelch("popWSDeque: t=%ld b=%ld = %ld\n", t, b, removed); - return removed; - } - /* otherwise, has someone meanwhile stolen the same (last) element? - Check and increment top value to know */ - if ( !(CASTOP(&(q->top),t,t+1)) ) { - removed = NULL; /* no success, but continue adjusting bottom */ + StgInt b = RELAXED_LOAD(&q->bottom) - 1; + RELAXED_STORE(&q->bottom, b); + SEQ_CST_FENCE(); + StgInt t = RELAXED_LOAD(&q->top); + + void *result; + if (t <= b) { + /* Non-empty */ + result = RELAXED_LOAD(&q->elements[b & q->moduloSize]); + if (t == b) { + /* Single last element in queue */ + if (!cas_top(q, t, t+1)) { + /* Failed race */ + result = NULL; + } + + RELAXED_STORE(&q->bottom, b+1); + } + } else { + /* Empty queue */ + result = NULL; + RELAXED_STORE(&q->bottom, b+1); } - q->bottom = t+1; /* anyway, empty now. Adjust bottom consistently. 
*/ - q->topBound = t+1; /* ...and cached top value as well */ - ASSERT_WSDEQUE_INVARIANTS(q); - ASSERT(q->bottom >= q->top); - - // debugBelch("popWSDeque: t=%ld b=%ld = %ld\n", t, b, removed); - - return removed; + return result; } /* ----------------------------------------------------------------------------- @@ -177,43 +165,19 @@ popWSDeque (WSDeque *q) void * stealWSDeque_ (WSDeque *q) { - void * stolen; - StgWord b,t; - -// Can't do this on someone else's spark pool: -// ASSERT_WSDEQUE_INVARIANTS(q); - - // NB. these loads must be ordered, otherwise there is a race - // between steal and pop. - t = q->top; - load_load_barrier(); - b = q->bottom; - - // NB. b and t are unsigned; we need a signed value for the test - // below, because it is possible that t > b during a - // concurrent popWSQueue() operation. - if ((long)b - (long)t <= 0 ) { - return NULL; /* already looks empty, abort */ + StgInt t = ACQUIRE_LOAD(&q->top); + SEQ_CST_FENCE(); + StgInt b = ACQUIRE_LOAD(&q->bottom); + + void *result = NULL; + if (t < b) { + /* Non-empty queue */ + result = RELAXED_LOAD(&q->elements[t % q->size]); + if (!cas_top(q, t, t+1)) { + return NULL; + } } - // NB. the load of q->bottom must be ordered before the load of - // q->elements[t & q-> moduloSize]. See comment "KG:..." below - // and Ticket #13633. 
- load_load_barrier(); - /* now access array, see pushBottom() */ - stolen = q->elements[t & q->moduloSize]; - - /* now decide whether we have won */ - if ( !(CASTOP(&(q->top),t,t+1)) ) { - /* lost the race, someone else has changed top in the meantime */ - return NULL; - } /* else: OK, top has been incremented by the cas call */ - - // debugBelch("stealWSDeque_: t=%d b=%d\n", t, b); - -// Can't do this on someone else's spark pool: -// ASSERT_WSDEQUE_INVARIANTS(q); - - return stolen; + return result; } void * @@ -232,67 +196,30 @@ stealWSDeque (WSDeque *q) * pushWSQueue * -------------------------------------------------------------------------- */ -#define DISCARD_NEW - -/* enqueue an element. Should always succeed by resizing the array - (not implemented yet, silently fails in that case). */ +/* Enqueue an element. Must only be called by owner. Returns true if element was + * pushed, false if queue is full + */ bool pushWSDeque (WSDeque* q, void * elem) { - StgWord t; - StgWord b; - StgWord sz = q->moduloSize; + StgInt b = ACQUIRE_LOAD(&q->bottom); + StgInt t = ACQUIRE_LOAD(&q->top); - ASSERT_WSDEQUE_INVARIANTS(q); - - /* we try to avoid reading q->top (accessed by all) and use - q->topBound (accessed only by writer) instead. - This is why we do not just call empty(q) here. - */ - b = q->bottom; - t = q->topBound; - if ( (StgInt)b - (StgInt)t >= (StgInt)sz ) { - /* NB. 1. sz == q->size - 1, thus ">=" - 2. signed comparison, it is possible that t > b - */ - /* could be full, check the real top value in this case */ - t = q->top; - q->topBound = t; - if (b - t >= sz) { /* really no space left :-( */ - /* reallocate the array, copying the values. Concurrent steal()s - will in the meantime use the old one and modify only top. - This means: we cannot safely free the old space! Can keep it - on a free list internally here... + if ( b - t > q->size - 1 ) { + /* Full queue */ + /* We don't implement resizing, just say we didn't push anything. 
*/ + return false; + } - Potential bug in combination with steal(): if array is - replaced, it is unclear which one concurrent steal operations - use. Must read the array base address in advance in steal(). - */ -#if defined(DISCARD_NEW) - ASSERT_WSDEQUE_INVARIANTS(q); - return false; // we didn't push anything + RELAXED_STORE(&q->elements[b & q->moduloSize], elem); +#if defined(TSAN_ENABLED) + // ThreadSanizer doesn't know about release fences, so we need to + // strengthen this to a release store lest we get spurious data race + // reports. + RELEASE_STORE(&q->bottom, b+1); #else - /* could make room by incrementing the top position here. In - * this case, should use CASTOP. If this fails, someone else has - * removed something, and new room will be available. - */ - ASSERT_WSDEQUE_INVARIANTS(q); + RELEASE_FENCE(); + RELAXED_STORE(&q->bottom, b+1); #endif - } - } - - q->elements[b & sz] = elem; - /* - KG: we need to put write barrier here since otherwise we might - end with elem not added to q->elements, but q->bottom already - modified (write reordering) and with stealWSDeque_ failing - later when invoked from another thread since it thinks elem is - there (in case there is just added element in the queue). This - issue concretely hit me on ARMv7 multi-core CPUs - */ - write_barrier(); - q->bottom = b + 1; - - ASSERT_WSDEQUE_INVARIANTS(q); return true; } diff --git a/rts/WSDeque.h b/rts/WSDeque.h index 2936c281fe..0104884bdb 100644 --- a/rts/WSDeque.h +++ b/rts/WSDeque.h @@ -11,24 +11,19 @@ typedef struct WSDeque_ { // Size of elements array. Used for modulo calculation: we round up // to powers of 2 and use the dyadic log (modulo == bitwise &) - StgWord size; + StgInt size; StgWord moduloSize; /* bitmask for modulo */ // top, index where multiple readers steal() (protected by a cas) - volatile StgWord top; + StgInt top; // bottom, index of next free place where one writer can push // elements. This happens unsynchronised. 
- volatile StgWord bottom; + StgInt bottom; // both top and bottom are continuously incremented, and used as // an index modulo the current array size. - // lower bound on the current top value. This is an internal - // optimisation to avoid unnecessarily accessing the top field - // inside pushBottom - volatile StgWord topBound; - // The elements array void ** elements; @@ -39,18 +34,17 @@ typedef struct WSDeque_ { } WSDeque; /* INVARIANTS, in this order: reasonable size, - topBound consistent, space pointer, space accessible to us. + space pointer, space accessible to us. NB. This is safe to use only (a) on a spark pool owned by the current thread, or (b) when there's only one thread running, or no stealing going on (e.g. during GC). */ -#define ASSERT_WSDEQUE_INVARIANTS(p) \ - ASSERT((p)->size > 0); \ - ASSERT((p)->topBound <= (p)->top); \ - ASSERT((p)->elements != NULL); \ - ASSERT(*((p)->elements) || 1); \ - ASSERT(*((p)->elements - 1 + ((p)->size)) || 1); +#define ASSERT_WSDEQUE_INVARIANTS(p) \ + ASSERT((p)->size > 0); \ + ASSERT(RELAXED_LOAD(&(p)->elements) != NULL); \ + ASSERT(RELAXED_LOAD(&(p)->elements[0]) || 1); \ + ASSERT(RELAXED_LOAD(&(p)->elements[(p)->size - 1]) || 1); // No: it is possible that top > bottom when using pop() // ASSERT((p)->bottom >= (p)->top); @@ -69,15 +63,15 @@ typedef struct WSDeque_ { WSDeque * newWSDeque (uint32_t size); void freeWSDeque (WSDeque *q); -// Take an element from the "write" end of the pool. Can be called +// (owner-only) Take an element from the "write" end of the pool. Can be called // by the pool owner only. void* popWSDeque (WSDeque *q); -// Push onto the "write" end of the pool. Return true if the push +// (owner-only) Push onto the "write" end of the pool. Return true if the push // succeeded, or false if the deque is full. bool pushWSDeque (WSDeque *q, void *elem); -// Removes all elements from the deque +// (owner-only) Removes all elements from the deque. 
EXTERN_INLINE void discardElements (WSDeque *q); // Removes an element of the deque from the "read" end, or returns @@ -90,23 +84,27 @@ void * stealWSDeque_ (WSDeque *q); void * stealWSDeque (WSDeque *q); // "guesses" whether a deque is empty. Can return false negatives in -// presence of concurrent steal() calls, and false positives in -// presence of a concurrent pushBottom(). +// presence of concurrent steal() calls, and false positives in +// presence of a concurrent pushBottom(). EXTERN_INLINE bool looksEmptyWSDeque (WSDeque* q); -EXTERN_INLINE long dequeElements (WSDeque *q); +// "guesses" how many elements are present on the deque. Like +// looksEmptyWSDeque, this may suggest that the deque is empty when it's not +// and vice-versa. +EXTERN_INLINE StgInt dequeElements (WSDeque *q); /* ----------------------------------------------------------------------------- * PRIVATE below here * -------------------------------------------------------------------------- */ -EXTERN_INLINE long +EXTERN_INLINE StgInt dequeElements (WSDeque *q) { - StgWord t = q->top; - StgWord b = q->bottom; + StgWord t = ACQUIRE_LOAD(&q->top); + StgWord b = ACQUIRE_LOAD(&q->bottom); // try to prefer false negatives by reading top first - return ((long)b - (long)t); + StgInt n = (StgInt)b - (StgInt)t; + return n > 0 ? n : 0; } EXTERN_INLINE bool @@ -118,6 +116,5 @@ looksEmptyWSDeque (WSDeque *q) EXTERN_INLINE void discardElements (WSDeque *q) { - q->top = q->bottom; -// pool->topBound = pool->top; + RELAXED_STORE(&q->top, RELAXED_LOAD(&q->bottom)); } -- cgit v1.2.1