diff options
author | Edward Z. Yang <ezyang@mit.edu> | 2013-07-08 11:03:35 -0700 |
---|---|---|
committer | Edward Z. Yang <ezyang@mit.edu> | 2013-07-09 11:29:11 -0700 |
commit | 70e20631742e516c6a11c3c112fbd5b4a08c15ac (patch) | |
tree | d0097f8b1c8e5c0a67b26bb950c036ea7684c65d /rts | |
parent | ca9a431401755f119d97dec59a1fc963a8e9f681 (diff) | |
download | haskell-70e20631742e516c6a11c3c112fbd5b4a08c15ac.tar.gz |
Implement atomicReadMVar, fixing #4001.
We add the invariant to the MVar blocked threads queue that
threads blocked on an atomic read are always at the front of
the queue. This invariant is easy to maintain, since takers
are only ever added to the end of the queue.
Signed-off-by: Edward Z. Yang <ezyang@mit.edu>
Diffstat (limited to 'rts')
-rw-r--r-- | rts/HeapStackCheck.cmm | 31 | ||||
-rw-r--r-- | rts/Linker.c | 2 | ||||
-rw-r--r-- | rts/PrimOps.cmm | 79 | ||||
-rw-r--r-- | rts/RaiseAsync.c | 4 | ||||
-rw-r--r-- | rts/RaiseAsync.h | 1 | ||||
-rw-r--r-- | rts/RetainerProfile.c | 1 | ||||
-rw-r--r-- | rts/Schedule.c | 2 | ||||
-rw-r--r-- | rts/Threads.c | 4 | ||||
-rw-r--r-- | rts/Trace.c | 1 | ||||
-rw-r--r-- | rts/sm/Compact.c | 1 | ||||
-rw-r--r-- | rts/sm/Sanity.c | 1 | ||||
-rw-r--r-- | rts/sm/Scav.c | 1 |
12 files changed, 122 insertions, 6 deletions
diff --git a/rts/HeapStackCheck.cmm b/rts/HeapStackCheck.cmm index fbceb7691a..20cd9dffb9 100644 --- a/rts/HeapStackCheck.cmm +++ b/rts/HeapStackCheck.cmm @@ -487,11 +487,11 @@ stg_block_noregs /* ----------------------------------------------------------------------------- * takeMVar/putMVar-specific blocks * - * Stack layout for a thread blocked in takeMVar: + * Stack layout for a thread blocked in takeMVar/atomicReadMVar: * * ret. addr * ptr to MVar (R1) - * stg_block_takemvar_info + * stg_block_takemvar_info (or stg_block_readmvar_info) * * Stack layout for a thread blocked in putMVar: * @@ -531,6 +531,33 @@ stg_block_takemvar /* mvar passed in R1 */ BLOCK_BUT_FIRST(stg_block_takemvar_finally); } +INFO_TABLE_RET ( stg_block_atomicreadmvar, RET_SMALL, W_ info_ptr, P_ mvar ) + return () +{ + jump stg_atomicReadMVarzh(mvar); +} + +// code fragment executed just before we return to the scheduler +stg_block_atomicreadmvar_finally +{ + W_ r1, r3; + r1 = R1; + r3 = R3; + unlockClosure(R3, stg_MVAR_DIRTY_info); + R1 = r1; + R3 = r3; + jump StgReturn [R1]; +} + +stg_block_atomicreadmvar /* mvar passed in R1 */ +{ + Sp_adj(-2); + Sp(1) = R1; + Sp(0) = stg_block_atomicreadmvar_info; + R3 = R1; // mvar communicated to stg_block_atomicreadmvar_finally in R3 + BLOCK_BUT_FIRST(stg_block_atomicreadmvar_finally); +} + INFO_TABLE_RET( stg_block_putmvar, RET_SMALL, W_ info_ptr, P_ mvar, P_ val ) return () diff --git a/rts/Linker.c b/rts/Linker.c index 43edde23f8..9129b46be6 100644 --- a/rts/Linker.c +++ b/rts/Linker.c @@ -1058,6 +1058,7 @@ typedef struct _RtsSymbolVal { SymI_HasProto(stg_yield_to_interpreter) \ SymI_HasProto(stg_block_noregs) \ SymI_HasProto(stg_block_takemvar) \ + SymI_HasProto(stg_block_atomicreadmvar) \ SymI_HasProto(stg_block_putmvar) \ MAIN_CAP_SYM \ SymI_HasProto(MallocFailHook) \ @@ -1314,6 +1315,7 @@ typedef struct _RtsSymbolVal { SymI_HasProto(stg_bh_upd_frame_info) \ SymI_HasProto(suspendThread) \ SymI_HasProto(stg_takeMVarzh) \ + SymI_HasProto(stg_atomicReadMVarzh) \ SymI_HasProto(stg_threadStatuszh) \ SymI_HasProto(stg_tryPutMVarzh) \ SymI_HasProto(stg_tryTakeMVarzh) \ diff --git a/rts/PrimOps.cmm b/rts/PrimOps.cmm index a227e776a7..63babd04f2 100644 --- a/rts/PrimOps.cmm +++ b/rts/PrimOps.cmm @@ -1433,7 +1433,7 @@ loop: goto loop; } - // There are takeMVar(s) waiting: wake up the first one + // There are atomicReadMVar/takeMVar(s) waiting: wake up the first one tso = StgMVarTSOQueue_tso(q); StgMVar_head(mvar) = StgMVarTSOQueue_link(q); @@ -1441,8 +1441,11 @@ loop: StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure; } - ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16); ASSERT(StgTSO_block_info(tso) == mvar); + // save why_blocked here, because waking up the thread destroys + // this information + W_ why_blocked; + why_blocked = TO_W_(StgTSO_why_blocked(tso)); // actually perform the takeMVar W_ stack; @@ -1458,6 +1461,15 @@ loop: ccall tryWakeupThread(MyCapability() "ptr", tso); + // If it was an atomicReadMVar, then we can still do work, + // so loop back. (XXX: This could take a while) + if (why_blocked == BlockedOnMVarRead) { + q = StgMVarTSOQueue_link(q); + goto loop; + } + + ASSERT(why_blocked == BlockedOnMVar); + unlockClosure(mvar, info); return (); } @@ -1512,8 +1524,11 @@ loop: StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure; } - ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16); ASSERT(StgTSO_block_info(tso) == mvar); + // save why_blocked here, because waking up the thread destroys + // this information + W_ why_blocked; + why_blocked = TO_W_(StgTSO_why_blocked(tso)); // actually perform the takeMVar W_ stack; @@ -1529,10 +1544,68 @@ loop: ccall tryWakeupThread(MyCapability() "ptr", tso); + // If it was an atomicReadMVar, then we can still do work, + // so loop back. (XXX: This could take a while) + if (why_blocked == BlockedOnMVarRead) { + q = StgMVarTSOQueue_link(q); + goto loop; + } + + ASSERT(why_blocked == BlockedOnMVar); + unlockClosure(mvar, info); return (1); } +stg_atomicReadMVarzh ( P_ mvar, /* :: MVar a */ ) +{ + W_ val, info, tso, q; + +#if defined(THREADED_RTS) + ("ptr" info) = ccall lockClosure(mvar "ptr"); +#else + info = GET_INFO(mvar); +#endif + + if (info == stg_MVAR_CLEAN_info) { + ccall dirty_MVAR(BaseReg "ptr", mvar "ptr"); + } + + /* If the MVar is empty, put ourselves on the blocked readers + * list and wait until we're woken up. + */ + if (StgMVar_value(mvar) == stg_END_TSO_QUEUE_closure) { + + ALLOC_PRIM_WITH_CUSTOM_FAILURE + (SIZEOF_StgMVarTSOQueue, + unlockClosure(mvar, stg_MVAR_DIRTY_info); + GC_PRIM_P(stg_atomicReadMVarzh, mvar)); + + q = Hp - SIZEOF_StgMVarTSOQueue + WDS(1); + + // readMVars are pushed to the front of the queue, so + // they get handled immediately + SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM); + StgMVarTSOQueue_link(q) = StgMVar_head(mvar); + StgMVarTSOQueue_tso(q) = CurrentTSO; + + StgTSO__link(CurrentTSO) = q; + StgTSO_block_info(CurrentTSO) = mvar; + StgTSO_why_blocked(CurrentTSO) = BlockedOnMVarRead::I16; + StgMVar_head(mvar) = q; + + if (StgMVar_tail(mvar) == stg_END_TSO_QUEUE_closure) { + StgMVar_tail(mvar) = q; + } + + jump stg_block_atomicreadmvar(mvar); + } + + val = StgMVar_value(mvar); + + unlockClosure(mvar, stg_MVAR_DIRTY_info); + return (val); +} /* ----------------------------------------------------------------------------- Stable pointer primitives diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c index 11f518a87d..edc4a91193 100644 --- a/rts/RaiseAsync.c +++ b/rts/RaiseAsync.c @@ -294,6 +294,7 @@ check_target: } case BlockedOnMVar: + case BlockedOnMVarRead: { /* To establish ownership of this TSO, we need to acquire a @@ -318,7 +319,7 @@ check_target: // we have the MVar, let's check whether the thread // is still blocked on the same MVar. - if (target->why_blocked != BlockedOnMVar + if ((target->why_blocked != BlockedOnMVar && target->why_blocked != BlockedOnMVarRead) || (StgMVar *)target->block_info.closure != mvar) { unlockClosure((StgClosure *)mvar, info); goto retry; @@ -637,6 +638,7 @@ removeFromQueues(Capability *cap, StgTSO *tso) goto done; case BlockedOnMVar: + case BlockedOnMVarRead: removeFromMVarBlockedQueue(tso); goto done; diff --git a/rts/RaiseAsync.h b/rts/RaiseAsync.h index 336ab30e33..d804f6bc64 100644 --- a/rts/RaiseAsync.h +++ b/rts/RaiseAsync.h @@ -49,6 +49,7 @@ interruptible(StgTSO *t) { switch (t->why_blocked) { case BlockedOnMVar: + case BlockedOnMVarRead: case BlockedOnMsgThrowTo: case BlockedOnRead: case BlockedOnWrite: diff --git a/rts/RetainerProfile.c b/rts/RetainerProfile.c index 77dc77c65a..dc21149d98 100644 --- a/rts/RetainerProfile.c +++ b/rts/RetainerProfile.c @@ -1672,6 +1672,7 @@ inner_loop: retainClosure(tso->bq, c, c_child_r); retainClosure(tso->trec, c, c_child_r); if ( tso->why_blocked == BlockedOnMVar + || tso->why_blocked == BlockedOnMVarRead || tso->why_blocked == BlockedOnBlackHole || tso->why_blocked == BlockedOnMsgThrowTo ) { diff --git a/rts/Schedule.c b/rts/Schedule.c index 88bfd8c4d3..408146f195 100644 --- a/rts/Schedule.c +++ b/rts/Schedule.c @@ -947,6 +947,7 @@ scheduleDetectDeadlock (Capability **pcap, Task *task) case BlockedOnBlackHole: case BlockedOnMsgThrowTo: case BlockedOnMVar: + case BlockedOnMVarRead: throwToSingleThreaded(cap, task->incall->tso, (StgClosure *)nonTermination_closure); return; @@ -2843,6 +2844,7 @@ resurrectThreads (StgTSO *threads) switch (tso->why_blocked) { case BlockedOnMVar: + case BlockedOnMVarRead: /* Called by GC - sched_mutex lock is currently held. */ throwToSingleThreaded(cap, tso, (StgClosure *)blockedIndefinitelyOnMVar_closure); diff --git a/rts/Threads.c b/rts/Threads.c index 4c990f118c..f2b800512e 100644 --- a/rts/Threads.c +++ b/rts/Threads.c @@ -255,6 +255,7 @@ tryWakeupThread (Capability *cap, StgTSO *tso) switch (tso->why_blocked) { case BlockedOnMVar: + case BlockedOnMVarRead: { if (tso->_link == END_TSO_QUEUE) { tso->block_info.closure = (StgClosure*)END_TSO_QUEUE; @@ -734,6 +735,9 @@ printThreadBlockage(StgTSO *tso) case BlockedOnMVar: debugBelch("is blocked on an MVar @ %p", tso->block_info.closure); break; + case BlockedOnMVarRead: + debugBelch("is blocked on atomic MVar read @ %p", tso->block_info.closure); + break; case BlockedOnBlackHole: debugBelch("is blocked on a black hole %p", ((StgBlockingQueue*)tso->block_info.bh->bh)); diff --git a/rts/Trace.c b/rts/Trace.c index 78dfead450..21901891cb 100644 --- a/rts/Trace.c +++ b/rts/Trace.c @@ -179,6 +179,7 @@ static char *thread_stop_reasons[] = { [ThreadFinished] = "finished", [THREAD_SUSPENDED_FOREIGN_CALL] = "suspended while making a foreign call", [6 + BlockedOnMVar] = "blocked on an MVar", + [6 + BlockedOnMVarRead] = "blocked on an atomic MVar read", [6 + BlockedOnBlackHole] = "blocked on a black hole", [6 + BlockedOnRead] = "blocked on a read operation", [6 + BlockedOnWrite] = "blocked on a write operation", diff --git a/rts/sm/Compact.c b/rts/sm/Compact.c index 9c98dc9895..247f1a01c6 100644 --- a/rts/sm/Compact.c +++ b/rts/sm/Compact.c @@ -442,6 +442,7 @@ thread_TSO (StgTSO *tso) thread_(&tso->global_link); if ( tso->why_blocked == BlockedOnMVar + || tso->why_blocked == BlockedOnMVarRead || tso->why_blocked == BlockedOnBlackHole || tso->why_blocked == BlockedOnMsgThrowTo || tso->why_blocked == NotBlocked diff --git a/rts/sm/Sanity.c b/rts/sm/Sanity.c index f0e1659e12..9b579abbbc 100644 --- a/rts/sm/Sanity.c +++ b/rts/sm/Sanity.c @@ -519,6 +519,7 @@ checkTSO(StgTSO *tso) info == &stg_WHITEHOLE_info); // happens due to STM doing lockTSO() if ( tso->why_blocked == BlockedOnMVar + || tso->why_blocked == BlockedOnMVarRead || tso->why_blocked == BlockedOnBlackHole || tso->why_blocked == BlockedOnMsgThrowTo || tso->why_blocked == NotBlocked diff --git a/rts/sm/Scav.c b/rts/sm/Scav.c index 6137f6d862..e0cc688b95 100644 --- a/rts/sm/Scav.c +++ b/rts/sm/Scav.c @@ -71,6 +71,7 @@ scavengeTSO (StgTSO *tso) evacuate((StgClosure **)&tso->_link); if ( tso->why_blocked == BlockedOnMVar + || tso->why_blocked == BlockedOnMVarRead || tso->why_blocked == BlockedOnBlackHole || tso->why_blocked == BlockedOnMsgThrowTo || tso->why_blocked == NotBlocked |