summaryrefslogtreecommitdiff
path: root/rts
diff options
context:
space:
mode:
authorBen Gamari <ben@smart-cactus.org>2023-03-24 20:41:58 -0400
committerBen Gamari <ben@smart-cactus.org>2023-04-24 06:03:49 -0400
commit3c1e4a696def01b1a42d673228b7dd7be5f6cd7b (patch)
treea9c7a094704d77a61ed34442b0308b0948084b45 /rts
parent5946bdf1054038b676e6214cd8be0c20ec4d4fbc (diff)
downloadhaskell-3c1e4a696def01b1a42d673228b7dd7be5f6cd7b.tar.gz
rts: Fix synchronization on thread blocking state
Diffstat (limited to 'rts')
-rw-r--r--rts/Exception.cmm2
-rw-r--r--rts/PrimOps.cmm50
-rw-r--r--rts/RaiseAsync.c4
-rw-r--r--rts/STM.c2
-rw-r--r--rts/Schedule.c3
-rw-r--r--rts/StgMiscClosures.cmm2
-rw-r--r--rts/Threads.c11
-rw-r--r--rts/TraverseHeap.c12
-rw-r--r--rts/include/rts/storage/TSO.h2
-rw-r--r--rts/include/stg/SMP.h13
-rw-r--r--rts/posix/Select.c4
-rw-r--r--rts/sm/Compact.c15
-rw-r--r--rts/sm/NonMovingMark.c15
-rw-r--r--rts/sm/Scav.c19
-rw-r--r--rts/win32/AsyncMIO.c2
15 files changed, 89 insertions, 67 deletions
diff --git a/rts/Exception.cmm b/rts/Exception.cmm
index ff577aace6..96b52e3c3f 100644
--- a/rts/Exception.cmm
+++ b/rts/Exception.cmm
@@ -351,9 +351,9 @@ stg_killThreadzh (P_ target, P_ exception)
if (msg == NULL) {
return ();
} else {
- StgTSO_why_blocked(CurrentTSO) = BlockedOnMsgThrowTo;
updateRemembSetPushPtr(StgTSO_block_info(CurrentTSO));
StgTSO_block_info(CurrentTSO) = msg;
+ %release StgTSO_why_blocked(CurrentTSO) = BlockedOnMsgThrowTo;
// we must block, and unlock the message before returning
jump stg_block_throwto (target, exception);
}
diff --git a/rts/PrimOps.cmm b/rts/PrimOps.cmm
index 51bb8981f7..d0aa154a12 100644
--- a/rts/PrimOps.cmm
+++ b/rts/PrimOps.cmm
@@ -1743,7 +1743,7 @@ stg_takeMVarzh ( P_ mvar /* :: MVar a */ )
}
StgTSO__link(CurrentTSO) = q;
StgTSO_block_info(CurrentTSO) = mvar;
- StgTSO_why_blocked(CurrentTSO) = BlockedOnMVar::I16;
+ %release StgTSO_why_blocked(CurrentTSO) = BlockedOnMVar::I16;
StgMVar_tail(mvar) = q;
jump stg_block_takemvar(mvar);
@@ -1910,7 +1910,7 @@ stg_putMVarzh ( P_ mvar, /* :: MVar a */
}
StgTSO__link(CurrentTSO) = q;
StgTSO_block_info(CurrentTSO) = mvar;
- StgTSO_why_blocked(CurrentTSO) = BlockedOnMVar::I16;
+ %release StgTSO_why_blocked(CurrentTSO) = BlockedOnMVar::I16;
StgMVar_tail(mvar) = q;
jump stg_block_putmvar(mvar,val);
@@ -2047,11 +2047,11 @@ loop:
}
}
- ASSERT(StgTSO_block_info(tso) == mvar);
// save why_blocked here, because waking up the thread destroys
// this information
W_ why_blocked;
- why_blocked = TO_W_(StgTSO_why_blocked(tso));
+ why_blocked = TO_W_(StgTSO_why_blocked(tso)); // TODO: Missing barrier
+ ASSERT(StgTSO_block_info(tso) == mvar);
// actually perform the takeMVar
W_ stack;
@@ -2112,7 +2112,7 @@ stg_readMVarzh ( P_ mvar, /* :: MVar a */ )
StgTSO__link(CurrentTSO) = q;
StgTSO_block_info(CurrentTSO) = mvar;
- StgTSO_why_blocked(CurrentTSO) = BlockedOnMVarRead::I16;
+ %release StgTSO_why_blocked(CurrentTSO) = BlockedOnMVarRead::I16;
StgMVar_head(mvar) = q;
if (StgMVar_tail(mvar) == stg_END_TSO_QUEUE_closure) {
@@ -2240,17 +2240,16 @@ stg_readIOPortzh ( P_ ioport /* :: IOPort a */ )
StgMVarTSOQueue_tso(q) = CurrentTSO;
SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM);
- // See Note [Heap memory barriers]
- RELEASE_FENCE;
StgMVar_head(ioport) = q;
StgTSO__link(CurrentTSO) = q;
StgTSO_block_info(CurrentTSO) = ioport;
- StgTSO_why_blocked(CurrentTSO) = BlockedOnMVar::I16;
+
+ // See Note [Heap memory barriers]
+ %release StgTSO_why_blocked(CurrentTSO) = BlockedOnMVar::I16;
//Unlocks the closure as well
jump stg_block_readmvar(ioport);
-
}
//This way we can check if there has been a read already.
@@ -2328,11 +2327,11 @@ loop:
// next element in the waiting list here, as there can only ever
// be one thread blocked on a port.
- ASSERT(StgTSO_block_info(tso) == ioport);
// save why_blocked here, because waking up the thread destroys
// this information
W_ why_blocked;
- why_blocked = TO_W_(StgTSO_why_blocked(tso));
+ why_blocked = TO_W_(StgTSO_why_blocked(tso)); // TODO: Missing acquire
+ ASSERT(StgTSO_block_info(tso) == ioport);
// actually perform the takeMVar
W_ stack;
@@ -2574,8 +2573,8 @@ stg_waitReadzh ( W_ fd )
#else
ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
- StgTSO_why_blocked(CurrentTSO) = BlockedOnRead::I16;
StgTSO_block_info(CurrentTSO) = fd;
+ %release StgTSO_why_blocked(CurrentTSO) = BlockedOnRead::I16;
// No locking - we're not going to use this interface in the
// threaded RTS anyway.
ccall appendToIOBlockedQueue(MyCapability() "ptr", CurrentTSO "ptr");
@@ -2590,8 +2589,8 @@ stg_waitWritezh ( W_ fd )
#else
ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
- StgTSO_why_blocked(CurrentTSO) = BlockedOnWrite::I16;
StgTSO_block_info(CurrentTSO) = fd;
+ %release StgTSO_why_blocked(CurrentTSO) = BlockedOnWrite::I16;
// No locking - we're not going to use this interface in the
// threaded RTS anyway.
ccall appendToIOBlockedQueue(MyCapability() "ptr", CurrentTSO "ptr");
@@ -2613,7 +2612,6 @@ stg_delayzh ( W_ us_delay )
#else
ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
- StgTSO_why_blocked(CurrentTSO) = BlockedOnDelay::I16;
#if defined(mingw32_HOST_OS)
@@ -2630,12 +2628,13 @@ stg_delayzh ( W_ us_delay )
* simplifies matters, so change the status to OnDoProc and put the
* delayed thread on the blocked_queue.
*/
- StgTSO_why_blocked(CurrentTSO) = BlockedOnDoProc::I16;
+ %release StgTSO_why_blocked(CurrentTSO) = BlockedOnDoProc::I16;
ccall appendToIOBlockedQueue(MyCapability() "ptr", CurrentTSO "ptr");
jump stg_block_async_void();
#else
+ %relaxed StgTSO_why_blocked(CurrentTSO) = BlockedOnDelay::I16;
(target) = ccall getDelayTarget(us_delay);
StgTSO_block_info(CurrentTSO) = target;
@@ -2657,9 +2656,6 @@ stg_asyncReadzh ( W_ fd, W_ is_sock, W_ len, W_ buf )
ccall barf("asyncRead# on threaded RTS") never returns;
#else
- ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
- StgTSO_why_blocked(CurrentTSO) = BlockedOnRead::I16;
-
/* could probably allocate this on the heap instead */
("ptr" ares) = ccall stgMallocBytes(SIZEOF_StgAsyncIOResult,
"stg_asyncReadzh");
@@ -2668,6 +2664,10 @@ stg_asyncReadzh ( W_ fd, W_ is_sock, W_ len, W_ buf )
StgAsyncIOResult_len(ares) = 0;
StgAsyncIOResult_errCode(ares) = 0;
StgTSO_block_info(CurrentTSO) = ares;
+
+ ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
+ %release StgTSO_why_blocked(CurrentTSO) = BlockedOnRead::I16;
+
ccall appendToIOBlockedQueue(MyCapability() "ptr", CurrentTSO "ptr");
jump stg_block_async();
#endif
@@ -2682,9 +2682,6 @@ stg_asyncWritezh ( W_ fd, W_ is_sock, W_ len, W_ buf )
ccall barf("asyncWrite# on threaded RTS") never returns;
#else
- ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
- StgTSO_why_blocked(CurrentTSO) = BlockedOnWrite::I16;
-
("ptr" ares) = ccall stgMallocBytes(SIZEOF_StgAsyncIOResult,
"stg_asyncWritezh");
(reqID) = ccall addIORequest(fd, 1/*TRUE*/,is_sock,len,buf "ptr");
@@ -2693,6 +2690,10 @@ stg_asyncWritezh ( W_ fd, W_ is_sock, W_ len, W_ buf )
StgAsyncIOResult_len(ares) = 0;
StgAsyncIOResult_errCode(ares) = 0;
StgTSO_block_info(CurrentTSO) = ares;
+
+ ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
+ %release StgTSO_why_blocked(CurrentTSO) = BlockedOnWrite::I16;
+
ccall appendToIOBlockedQueue(MyCapability() "ptr", CurrentTSO "ptr");
jump stg_block_async();
#endif
@@ -2707,9 +2708,6 @@ stg_asyncDoProczh ( W_ proc, W_ param )
ccall barf("asyncDoProc# on threaded RTS") never returns;
#else
- ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
- StgTSO_why_blocked(CurrentTSO) = BlockedOnDoProc::I16;
-
/* could probably allocate this on the heap instead */
("ptr" ares) = ccall stgMallocBytes(SIZEOF_StgAsyncIOResult,
"stg_asyncDoProczh");
@@ -2718,6 +2716,10 @@ stg_asyncDoProczh ( W_ proc, W_ param )
StgAsyncIOResult_len(ares) = 0;
StgAsyncIOResult_errCode(ares) = 0;
StgTSO_block_info(CurrentTSO) = ares;
+
+ ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
+ %release StgTSO_why_blocked(CurrentTSO) = BlockedOnDoProc::I16;
+
ccall appendToIOBlockedQueue(MyCapability() "ptr", CurrentTSO "ptr");
jump stg_block_async();
#endif
diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c
index f727383082..0209e334f2 100644
--- a/rts/RaiseAsync.c
+++ b/rts/RaiseAsync.c
@@ -266,7 +266,7 @@ check_target:
return THROWTO_BLOCKED;
}
- status = target->why_blocked;
+ status = ACQUIRE_LOAD(&target->why_blocked);
switch (status) {
case NotBlocked:
@@ -728,7 +728,7 @@ removeFromQueues(Capability *cap, StgTSO *tso)
}
done:
- tso->why_blocked = NotBlocked;
+ RELAXED_STORE(&tso->why_blocked, NotBlocked);
appendToRunQueue(cap, tso);
}
diff --git a/rts/STM.c b/rts/STM.c
index 105e099203..8050913033 100644
--- a/rts/STM.c
+++ b/rts/STM.c
@@ -340,8 +340,8 @@ static StgBool cond_lock_tvar(Capability *cap,
static void park_tso(StgTSO *tso) {
ASSERT(tso -> why_blocked == NotBlocked);
- tso -> why_blocked = BlockedOnSTM;
tso -> block_info.closure = (StgClosure *) END_TSO_QUEUE;
+ RELEASE_STORE(&tso -> why_blocked, BlockedOnSTM);
TRACE("park_tso on tso=%p", tso);
}
diff --git a/rts/Schedule.c b/rts/Schedule.c
index 66043c26ab..d8c71203e8 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -512,7 +512,8 @@ run_thread:
#endif
if (ret == ThreadBlocked) {
- if (t->why_blocked == BlockedOnBlackHole) {
+ uint16_t why_blocked = ACQUIRE_LOAD(&t->why_blocked);
+ if (why_blocked == BlockedOnBlackHole) {
StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
traceEventStopThread(cap, t, t->why_blocked + 6,
owner != NULL ? owner->id : 0);
diff --git a/rts/StgMiscClosures.cmm b/rts/StgMiscClosures.cmm
index 222a12e9c6..efe71bb20b 100644
--- a/rts/StgMiscClosures.cmm
+++ b/rts/StgMiscClosures.cmm
@@ -605,8 +605,8 @@ retry:
if (r == 0) {
goto retry;
} else {
- StgTSO_why_blocked(CurrentTSO) = BlockedOnBlackHole::I16;
StgTSO_block_info(CurrentTSO) = msg;
+ %release StgTSO_why_blocked(CurrentTSO) = BlockedOnBlackHole::I16;
jump stg_block_blackhole(node);
}
}
diff --git a/rts/Threads.c b/rts/Threads.c
index b36b4ddb81..29a9525574 100644
--- a/rts/Threads.c
+++ b/rts/Threads.c
@@ -94,8 +94,8 @@ createThread(Capability *cap, W_ size)
// Always start with the compiled code evaluator
tso->what_next = ThreadRunGHC;
- tso->why_blocked = NotBlocked;
tso->block_info.closure = (StgClosure *)END_TSO_QUEUE;
+ tso->why_blocked = NotBlocked;
tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
tso->bq = (StgBlockingQueue *)END_TSO_QUEUE;
tso->flags = 0;
@@ -285,7 +285,7 @@ tryWakeupThread (Capability *cap, StgTSO *tso)
}
#endif
- switch (tso->why_blocked)
+ switch (ACQUIRE_LOAD(&tso->why_blocked))
{
case BlockedOnMVar:
case BlockedOnMVarRead:
@@ -825,10 +825,11 @@ loop:
}
}
- ASSERT(tso->block_info.closure == (StgClosure*)mvar);
// save why_blocked here, because waking up the thread destroys
// this information
- StgWord why_blocked = RELAXED_LOAD(&tso->why_blocked);
+ StgWord why_blocked = ACQUIRE_LOAD(&tso->why_blocked);
+ ASSERT(why_blocked == BlockedOnMVarRead);
+ ASSERT(tso->block_info.closure == (StgClosure*)mvar);
// actually perform the takeMVar
StgStack* stack = tso->stackobj;
@@ -902,7 +903,7 @@ StgMutArrPtrs *listThreads(Capability *cap)
void
printThreadBlockage(StgTSO *tso)
{
- switch (tso->why_blocked) {
+ switch (ACQUIRE_LOAD(&tso->why_blocked)) {
#if defined(mingw32_HOST_OS)
case BlockedOnDoProc:
debugBelch("is blocked on proc (request: %u)", tso->block_info.async_result->reqID);
diff --git a/rts/TraverseHeap.c b/rts/TraverseHeap.c
index 027f99ebe0..5224e66aa9 100644
--- a/rts/TraverseHeap.c
+++ b/rts/TraverseHeap.c
@@ -1239,12 +1239,14 @@ inner_loop:
traversePushClosure(ts, (StgClosure *) tso->blocked_exceptions, c, sep, child_data);
traversePushClosure(ts, (StgClosure *) tso->bq, c, sep, child_data);
traversePushClosure(ts, (StgClosure *) tso->trec, c, sep, child_data);
- if ( tso->why_blocked == BlockedOnMVar
- || tso->why_blocked == BlockedOnMVarRead
- || tso->why_blocked == BlockedOnBlackHole
- || tso->why_blocked == BlockedOnMsgThrowTo
- ) {
+ switch (ACQUIRE_LOAD(&tso->why_blocked)) {
+ case BlockedOnMVar:
+ case BlockedOnMVarRead:
+ case BlockedOnBlackHole:
+ case BlockedOnMsgThrowTo:
traversePushClosure(ts, tso->block_info.closure, c, sep, child_data);
+ break;
+ default:
}
goto loop;
}
diff --git a/rts/include/rts/storage/TSO.h b/rts/include/rts/storage/TSO.h
index 4ca19853d7..7c171fbdbd 100644
--- a/rts/include/rts/storage/TSO.h
+++ b/rts/include/rts/storage/TSO.h
@@ -128,7 +128,7 @@ typedef struct StgTSO_ {
StgWord16 what_next; // Values defined in Constants.h
StgWord16 why_blocked; // Values defined in Constants.h
StgWord32 flags; // Values defined in Constants.h
- StgTSOBlockInfo block_info;
+ StgTSOBlockInfo block_info; // Barrier provided by why_blocked
StgThreadID id;
StgWord32 saved_errno;
StgWord32 dirty; /* non-zero => dirty */
diff --git a/rts/include/stg/SMP.h b/rts/include/stg/SMP.h
index 6cdbb15d14..c8facda14a 100644
--- a/rts/include/stg/SMP.h
+++ b/rts/include/stg/SMP.h
@@ -177,6 +177,7 @@ EXTERN_INLINE void load_load_barrier(void);
* - StgMutArrPtrs: payload
* - StgSmallMutArrPtrs: payload
* - StgThunk although this is a somewhat special case; see below
+ * - StgTSO: block_info
*
* Writing to a mutable pointer field must be done via a release-store.
* Reading from such a field is done via an acquire-load.
@@ -211,8 +212,8 @@ EXTERN_INLINE void load_load_barrier(void);
* particular, they have the unique property of being updatable, transforming
* from a thunk to an indirection. This transformation requires its own
* synchronization protocol. In particular, we must ensure that a reader
- * examining a thunk being updated can see the indirectee. Consequently, a
- * thunk update (see rts/Updates.h) does the following:
+ * examining a thunk being updated by another core can see the indirectee.
+ * Consequently, a thunk update (see rts/Updates.h) does the following:
*
* 1. Use a relaxed-store to place the new indirectee into the thunk's
* indirectee field
@@ -268,6 +269,14 @@ EXTERN_INLINE void load_load_barrier(void);
* the capability-local mut_list. Consequently this does not require any memory
* barrier.
*
+ * Barriers in thread blocking
+ * ---------------------------
+ * When a thread blocks (e.g. on an MVar) it will typically allocate a heap object
+ * to record its blocked-ness (e.g. a StgMVarTSOQueue), expose this via
+ * StgTSO.block_info, and update StgTSO.why_blocked to record the reason for
+ * its blocking. The visibility of the block_info is guaranteed by the ordering
+ * of the why_blocked update.
+ *
* Barriers in thread migration
* ----------------------------
* When a thread is migrated from one capability to another we must take care
diff --git a/rts/posix/Select.c b/rts/posix/Select.c
index 89a46fd763..848739a904 100644
--- a/rts/posix/Select.c
+++ b/rts/posix/Select.c
@@ -105,7 +105,7 @@ static bool wakeUpSleepingThreads (Capability *cap, LowResTime now)
break;
}
iomgr->sleeping_queue = tso->_link;
- tso->why_blocked = NotBlocked;
+ RELAXED_STORE(&tso->why_blocked, NotBlocked);
tso->_link = END_TSO_QUEUE;
IF_DEBUG(scheduler, debugBelch("Waking up sleeping thread %"
FMT_StgThreadID "\n", tso->id));
@@ -268,7 +268,7 @@ awaitEvent(Capability *cap, bool wait)
* So the (int) cast should be removed across the code base once
* GHC requires a version of FreeBSD that has that change in it.
*/
- switch (tso->why_blocked) {
+ switch (ACQUIRE_LOAD(&tso->why_blocked)) {
case BlockedOnRead:
{
int fd = tso->block_info.fd;
diff --git a/rts/sm/Compact.c b/rts/sm/Compact.c
index f6e65ecc9a..c1b445ac88 100644
--- a/rts/sm/Compact.c
+++ b/rts/sm/Compact.c
@@ -463,13 +463,16 @@ thread_TSO (StgTSO *tso)
thread_(&tso->_link);
thread_(&tso->global_link);
- if ( tso->why_blocked == BlockedOnMVar
- || tso->why_blocked == BlockedOnMVarRead
- || tso->why_blocked == BlockedOnBlackHole
- || tso->why_blocked == BlockedOnMsgThrowTo
- || tso->why_blocked == NotBlocked
- ) {
+ switch (ACQUIRE_LOAD(&tso->why_blocked)) {
+ case BlockedOnMVar:
+ case BlockedOnMVarRead:
+ case BlockedOnBlackHole:
+ case BlockedOnMsgThrowTo:
+ case NotBlocked:
thread_(&tso->block_info.closure);
+ break;
+ default:
+ break;
}
thread_(&tso->blocked_exceptions);
thread_(&tso->bq);
diff --git a/rts/sm/NonMovingMark.c b/rts/sm/NonMovingMark.c
index b0931f5767..60ada48193 100644
--- a/rts/sm/NonMovingMark.c
+++ b/rts/sm/NonMovingMark.c
@@ -1051,13 +1051,16 @@ trace_tso (MarkQueue *queue, StgTSO *tso)
if (tso->label != NULL) {
markQueuePushClosure_(queue, (StgClosure *) tso->label);
}
- if ( tso->why_blocked == BlockedOnMVar
- || tso->why_blocked == BlockedOnMVarRead
- || tso->why_blocked == BlockedOnBlackHole
- || tso->why_blocked == BlockedOnMsgThrowTo
- || tso->why_blocked == NotBlocked
- ) {
+ switch (ACQUIRE_LOAD(&tso->why_blocked)) {
+ case BlockedOnMVar:
+ case BlockedOnMVarRead:
+ case BlockedOnBlackHole:
+ case BlockedOnMsgThrowTo:
+ case NotBlocked:
markQueuePushClosure_(queue, tso->block_info.closure);
+ break;
+ default:
+ break;
}
}
diff --git a/rts/sm/Scav.c b/rts/sm/Scav.c
index 8debec6a66..a09130a091 100644
--- a/rts/sm/Scav.c
+++ b/rts/sm/Scav.c
@@ -137,23 +137,24 @@ scavengeTSO (StgTSO *tso)
evacuate((StgClosure **)&tso->label);
}
- if ( tso->why_blocked == BlockedOnMVar
- || tso->why_blocked == BlockedOnMVarRead
- || tso->why_blocked == BlockedOnBlackHole
- || tso->why_blocked == BlockedOnMsgThrowTo
- || tso->why_blocked == NotBlocked
- ) {
+ switch (ACQUIRE_LOAD(&tso->why_blocked)) {
+ case BlockedOnMVar:
+ case BlockedOnMVarRead:
+ case BlockedOnBlackHole:
+ case BlockedOnMsgThrowTo:
+ case NotBlocked:
evacuate(&tso->block_info.closure);
- }
+ break;
+ default:
#if defined(THREADED_RTS)
// in the THREADED_RTS, block_info.closure must always point to a
// valid closure, because we assume this in throwTo(). In the
// non-threaded RTS it might be a FD (for
// BlockedOnRead/BlockedOnWrite) or a time value (BlockedOnDelay)
- else {
tso->block_info.closure = (StgClosure *)END_TSO_QUEUE;
- }
#endif
+ break;
+ }
tso->dirty = gct->failed_to_evac;
diff --git a/rts/win32/AsyncMIO.c b/rts/win32/AsyncMIO.c
index 00d1638d63..3539c36304 100644
--- a/rts/win32/AsyncMIO.c
+++ b/rts/win32/AsyncMIO.c
@@ -294,7 +294,7 @@ start:
for(tso = iomgr->blocked_queue_hd; tso != END_TSO_QUEUE;
tso = tso->_link) {
- switch(tso->why_blocked) {
+ switch(ACQUIRE_LOAD(&tso->why_blocked)) {
case BlockedOnRead:
case BlockedOnWrite:
case BlockedOnDoProc: