diff options
-rw-r--r-- | compiler/prelude/primops.txt.pp | 9 | ||||
-rw-r--r-- | includes/rts/Constants.h | 25 | ||||
-rw-r--r-- | includes/stg/MiscClosures.h | 3 | ||||
-rw-r--r-- | rts/HeapStackCheck.cmm | 31 | ||||
-rw-r--r-- | rts/Linker.c | 2 | ||||
-rw-r--r-- | rts/PrimOps.cmm | 79 | ||||
-rw-r--r-- | rts/RaiseAsync.c | 4 | ||||
-rw-r--r-- | rts/RaiseAsync.h | 1 | ||||
-rw-r--r-- | rts/RetainerProfile.c | 1 | ||||
-rw-r--r-- | rts/Schedule.c | 2 | ||||
-rw-r--r-- | rts/Threads.c | 4 | ||||
-rw-r--r-- | rts/Trace.c | 1 | ||||
-rw-r--r-- | rts/sm/Compact.c | 1 | ||||
-rw-r--r-- | rts/sm/Sanity.c | 1 | ||||
-rw-r--r-- | rts/sm/Scav.c | 1 |
15 files changed, 147 insertions, 18 deletions
diff --git a/compiler/prelude/primops.txt.pp b/compiler/prelude/primops.txt.pp index 7203c11389..739092d5f5 100644 --- a/compiler/prelude/primops.txt.pp +++ b/compiler/prelude/primops.txt.pp @@ -1717,6 +1717,15 @@ primop TryPutMVarOp "tryPutMVar#" GenPrimOp out_of_line = True has_side_effects = True +primop AtomicReadMVarOp "atomicReadMVar#" GenPrimOp + MVar# s a -> State# s -> (# State# s, a #) + {If {\tt MVar\#} is empty, block until it becomes full. + Then read its contents without modifying the MVar, without possibility + of intervention from other threads.} + with + out_of_line = True + has_side_effects = True + primop SameMVarOp "sameMVar#" GenPrimOp MVar# s a -> MVar# s a -> Bool diff --git a/includes/rts/Constants.h b/includes/rts/Constants.h index 5ff4d4e51e..4739e3ad1a 100644 --- a/includes/rts/Constants.h +++ b/includes/rts/Constants.h @@ -202,31 +202,32 @@ */ #define NotBlocked 0 #define BlockedOnMVar 1 -#define BlockedOnBlackHole 2 -#define BlockedOnRead 3 -#define BlockedOnWrite 4 -#define BlockedOnDelay 5 -#define BlockedOnSTM 6 +#define BlockedOnMVarRead 2 +#define BlockedOnBlackHole 3 +#define BlockedOnRead 4 +#define BlockedOnWrite 5 +#define BlockedOnDelay 6 +#define BlockedOnSTM 7 /* Win32 only: */ -#define BlockedOnDoProc 7 +#define BlockedOnDoProc 8 /* Only relevant for PAR: */ /* blocked on a remote closure represented by a Global Address: */ -#define BlockedOnGA 8 +#define BlockedOnGA 9 /* same as above but without sending a Fetch message */ -#define BlockedOnGA_NoSend 9 +#define BlockedOnGA_NoSend 10 /* Only relevant for THREADED_RTS: */ -#define BlockedOnCCall 10 -#define BlockedOnCCall_Interruptible 11 +#define BlockedOnCCall 11 +#define BlockedOnCCall_Interruptible 12 /* same as above but permit killing the worker thread */ /* Involved in a message sent to tso->msg_cap */ -#define BlockedOnMsgThrowTo 12 +#define BlockedOnMsgThrowTo 13 /* The thread is not on any run queues, but can be woken up by tryWakeupThread() */ -#define ThreadMigrating 13 +#define ThreadMigrating 14 /* * These constants are returned to the scheduler by a thread that has diff --git a/includes/stg/MiscClosures.h b/includes/stg/MiscClosures.h index 8717687f3e..88cee597b5 100644 --- a/includes/stg/MiscClosures.h +++ b/includes/stg/MiscClosures.h @@ -293,7 +293,9 @@ RTS_FUN_DECL(stg_block_noregs); RTS_FUN_DECL(stg_block_blackhole); RTS_FUN_DECL(stg_block_blackhole_finally); RTS_FUN_DECL(stg_block_takemvar); +RTS_FUN_DECL(stg_block_atomicreadmvar); RTS_RET(stg_block_takemvar); +RTS_RET(stg_block_atomicreadmvar); RTS_FUN_DECL(stg_block_putmvar); RTS_RET(stg_block_putmvar); #ifdef mingw32_HOST_OS @@ -376,6 +378,7 @@ RTS_FUN_DECL(stg_isEmptyMVarzh); RTS_FUN_DECL(stg_newMVarzh); RTS_FUN_DECL(stg_takeMVarzh); RTS_FUN_DECL(stg_putMVarzh); +RTS_FUN_DECL(stg_atomicReadMVarzh); RTS_FUN_DECL(stg_tryTakeMVarzh); RTS_FUN_DECL(stg_tryPutMVarzh); diff --git a/rts/HeapStackCheck.cmm b/rts/HeapStackCheck.cmm index fbceb7691a..20cd9dffb9 100644 --- a/rts/HeapStackCheck.cmm +++ b/rts/HeapStackCheck.cmm @@ -487,11 +487,11 @@ stg_block_noregs /* ----------------------------------------------------------------------------- * takeMVar/putMVar-specific blocks * - * Stack layout for a thread blocked in takeMVar: + * Stack layout for a thread blocked in takeMVar/atomicReadMVar: * * ret. addr * ptr to MVar (R1) - * stg_block_takemvar_info + * stg_block_takemvar_info (or stg_block_readmvar_info) * * Stack layout for a thread blocked in putMVar: * @@ -531,6 +531,33 @@ stg_block_takemvar /* mvar passed in R1 */ BLOCK_BUT_FIRST(stg_block_takemvar_finally); } +INFO_TABLE_RET ( stg_block_atomicreadmvar, RET_SMALL, W_ info_ptr, P_ mvar ) + return () +{ + jump stg_atomicReadMVarzh(mvar); +} + +// code fragment executed just before we return to the scheduler +stg_block_atomicreadmvar_finally +{ + W_ r1, r3; + r1 = R1; + r3 = R3; + unlockClosure(R3, stg_MVAR_DIRTY_info); + R1 = r1; + R3 = r3; + jump StgReturn [R1]; +} + +stg_block_atomicreadmvar /* mvar passed in R1 */ +{ + Sp_adj(-2); + Sp(1) = R1; + Sp(0) = stg_block_atomicreadmvar_info; + R3 = R1; // mvar communicated to stg_block_atomicreadmvar_finally in R3 + BLOCK_BUT_FIRST(stg_block_atomicreadmvar_finally); +} + INFO_TABLE_RET( stg_block_putmvar, RET_SMALL, W_ info_ptr, P_ mvar, P_ val ) return () diff --git a/rts/Linker.c b/rts/Linker.c index 43edde23f8..9129b46be6 100644 --- a/rts/Linker.c +++ b/rts/Linker.c @@ -1058,6 +1058,7 @@ typedef struct _RtsSymbolVal { SymI_HasProto(stg_yield_to_interpreter) \ SymI_HasProto(stg_block_noregs) \ SymI_HasProto(stg_block_takemvar) \ + SymI_HasProto(stg_block_atomicreadmvar) \ SymI_HasProto(stg_block_putmvar) \ MAIN_CAP_SYM \ SymI_HasProto(MallocFailHook) \ @@ -1314,6 +1315,7 @@ typedef struct _RtsSymbolVal { SymI_HasProto(stg_bh_upd_frame_info) \ SymI_HasProto(suspendThread) \ SymI_HasProto(stg_takeMVarzh) \ + SymI_HasProto(stg_atomicReadMVarzh) \ SymI_HasProto(stg_threadStatuszh) \ SymI_HasProto(stg_tryPutMVarzh) \ SymI_HasProto(stg_tryTakeMVarzh) \ diff --git a/rts/PrimOps.cmm b/rts/PrimOps.cmm index a227e776a7..63babd04f2 100644 --- a/rts/PrimOps.cmm +++ b/rts/PrimOps.cmm @@ -1433,7 +1433,7 @@ loop: goto loop; } - // There are takeMVar(s) waiting: wake up the first one + // There are atomicReadMVar/takeMVar(s) waiting: wake up the first one tso = StgMVarTSOQueue_tso(q); StgMVar_head(mvar) = StgMVarTSOQueue_link(q); @@ -1441,8 +1441,11 @@ loop: StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure; } - ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16); ASSERT(StgTSO_block_info(tso) == mvar); + // save why_blocked here, because waking up the thread destroys + // this information + W_ why_blocked; + why_blocked = TO_W_(StgTSO_why_blocked(tso)); // actually perform the takeMVar W_ stack; @@ -1458,6 +1461,15 @@ loop: ccall tryWakeupThread(MyCapability() "ptr", tso); + // If it was an atomicReadMVar, then we can still do work, + // so loop back. (XXX: This could take a while) + if (why_blocked == BlockedOnMVarRead) { + q = StgMVarTSOQueue_link(q); + goto loop; + } + + ASSERT(why_blocked == BlockedOnMVar); + unlockClosure(mvar, info); return (); } @@ -1512,8 +1524,11 @@ loop: StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure; } - ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16); ASSERT(StgTSO_block_info(tso) == mvar); + // save why_blocked here, because waking up the thread destroys + // this information + W_ why_blocked; + why_blocked = TO_W_(StgTSO_why_blocked(tso)); // actually perform the takeMVar W_ stack; @@ -1529,10 +1544,68 @@ loop: ccall tryWakeupThread(MyCapability() "ptr", tso); + // If it was an atomicReadMVar, then we can still do work, + // so loop back. (XXX: This could take a while) + if (why_blocked == BlockedOnMVarRead) { + q = StgMVarTSOQueue_link(q); + goto loop; + } + + ASSERT(why_blocked == BlockedOnMVar); + unlockClosure(mvar, info); return (1); } +stg_atomicReadMVarzh ( P_ mvar, /* :: MVar a */ ) +{ + W_ val, info, tso, q; + +#if defined(THREADED_RTS) + ("ptr" info) = ccall lockClosure(mvar "ptr"); +#else + info = GET_INFO(mvar); +#endif + + if (info == stg_MVAR_CLEAN_info) { + ccall dirty_MVAR(BaseReg "ptr", mvar "ptr"); + } + + /* If the MVar is empty, put ourselves on the blocked readers + * list and wait until we're woken up. + */ + if (StgMVar_value(mvar) == stg_END_TSO_QUEUE_closure) { + + ALLOC_PRIM_WITH_CUSTOM_FAILURE + (SIZEOF_StgMVarTSOQueue, + unlockClosure(mvar, stg_MVAR_DIRTY_info); + GC_PRIM_P(stg_atomicReadMVarzh, mvar)); + + q = Hp - SIZEOF_StgMVarTSOQueue + WDS(1); + + // readMVars are pushed to the front of the queue, so + // they get handled immediately + SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM); + StgMVarTSOQueue_link(q) = StgMVar_head(mvar); + StgMVarTSOQueue_tso(q) = CurrentTSO; + + StgTSO__link(CurrentTSO) = q; + StgTSO_block_info(CurrentTSO) = mvar; + StgTSO_why_blocked(CurrentTSO) = BlockedOnMVarRead::I16; + StgMVar_head(mvar) = q; + + if (StgMVar_tail(mvar) == stg_END_TSO_QUEUE_closure) { + StgMVar_tail(mvar) = q; + } + + jump stg_block_atomicreadmvar(mvar); + } + + val = StgMVar_value(mvar); + + unlockClosure(mvar, stg_MVAR_DIRTY_info); + return (val); +} /* ----------------------------------------------------------------------------- Stable pointer primitives diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c index 11f518a87d..edc4a91193 100644 --- a/rts/RaiseAsync.c +++ b/rts/RaiseAsync.c @@ -294,6 +294,7 @@ check_target: } case BlockedOnMVar: + case BlockedOnMVarRead: { /* To establish ownership of this TSO, we need to acquire a @@ -318,7 +319,7 @@ check_target: // we have the MVar, let's check whether the thread // is still blocked on the same MVar. - if (target->why_blocked != BlockedOnMVar + if ((target->why_blocked != BlockedOnMVar && target->why_blocked != BlockedOnMVarRead) || (StgMVar *)target->block_info.closure != mvar) { unlockClosure((StgClosure *)mvar, info); goto retry; @@ -637,6 +638,7 @@ removeFromQueues(Capability *cap, StgTSO *tso) goto done; case BlockedOnMVar: + case BlockedOnMVarRead: removeFromMVarBlockedQueue(tso); goto done; diff --git a/rts/RaiseAsync.h b/rts/RaiseAsync.h index 336ab30e33..d804f6bc64 100644 --- a/rts/RaiseAsync.h +++ b/rts/RaiseAsync.h @@ -49,6 +49,7 @@ interruptible(StgTSO *t) { switch (t->why_blocked) { case BlockedOnMVar: + case BlockedOnMVarRead: case BlockedOnMsgThrowTo: case BlockedOnRead: case BlockedOnWrite: diff --git a/rts/RetainerProfile.c b/rts/RetainerProfile.c index 77dc77c65a..dc21149d98 100644 --- a/rts/RetainerProfile.c +++ b/rts/RetainerProfile.c @@ -1672,6 +1672,7 @@ inner_loop: retainClosure(tso->bq, c, c_child_r); retainClosure(tso->trec, c, c_child_r); if ( tso->why_blocked == BlockedOnMVar + || tso->why_blocked == BlockedOnMVarRead || tso->why_blocked == BlockedOnBlackHole || tso->why_blocked == BlockedOnMsgThrowTo ) { diff --git a/rts/Schedule.c b/rts/Schedule.c index 88bfd8c4d3..408146f195 100644 --- a/rts/Schedule.c +++ b/rts/Schedule.c @@ -947,6 +947,7 @@ scheduleDetectDeadlock (Capability **pcap, Task *task) case BlockedOnBlackHole: case BlockedOnMsgThrowTo: case BlockedOnMVar: + case BlockedOnMVarRead: throwToSingleThreaded(cap, task->incall->tso, (StgClosure *)nonTermination_closure); return; @@ -2843,6 +2844,7 @@ resurrectThreads (StgTSO *threads) switch (tso->why_blocked) { case BlockedOnMVar: + case BlockedOnMVarRead: /* Called by GC - sched_mutex lock is currently held. */ throwToSingleThreaded(cap, tso, (StgClosure *)blockedIndefinitelyOnMVar_closure); diff --git a/rts/Threads.c b/rts/Threads.c index 4c990f118c..f2b800512e 100644 --- a/rts/Threads.c +++ b/rts/Threads.c @@ -255,6 +255,7 @@ tryWakeupThread (Capability *cap, StgTSO *tso) switch (tso->why_blocked) { case BlockedOnMVar: + case BlockedOnMVarRead: { if (tso->_link == END_TSO_QUEUE) { tso->block_info.closure = (StgClosure*)END_TSO_QUEUE; @@ -734,6 +735,9 @@ printThreadBlockage(StgTSO *tso) case BlockedOnMVar: debugBelch("is blocked on an MVar @ %p", tso->block_info.closure); break; + case BlockedOnMVarRead: + debugBelch("is blocked on atomic MVar read @ %p", tso->block_info.closure); + break; case BlockedOnBlackHole: debugBelch("is blocked on a black hole %p", ((StgBlockingQueue*)tso->block_info.bh->bh)); diff --git a/rts/Trace.c b/rts/Trace.c index 78dfead450..21901891cb 100644 --- a/rts/Trace.c +++ b/rts/Trace.c @@ -179,6 +179,7 @@ static char *thread_stop_reasons[] = { [ThreadFinished] = "finished", [THREAD_SUSPENDED_FOREIGN_CALL] = "suspended while making a foreign call", [6 + BlockedOnMVar] = "blocked on an MVar", + [6 + BlockedOnMVarRead] = "blocked on an atomic MVar read", [6 + BlockedOnBlackHole] = "blocked on a black hole", [6 + BlockedOnRead] = "blocked on a read operation", [6 + BlockedOnWrite] = "blocked on a write operation", diff --git a/rts/sm/Compact.c b/rts/sm/Compact.c index 9c98dc9895..247f1a01c6 100644 --- a/rts/sm/Compact.c +++ b/rts/sm/Compact.c @@ -442,6 +442,7 @@ thread_TSO (StgTSO *tso) thread_(&tso->global_link); if ( tso->why_blocked == BlockedOnMVar + || tso->why_blocked == BlockedOnMVarRead || tso->why_blocked == BlockedOnBlackHole || tso->why_blocked == BlockedOnMsgThrowTo || tso->why_blocked == NotBlocked diff --git a/rts/sm/Sanity.c b/rts/sm/Sanity.c index f0e1659e12..9b579abbbc 100644 --- a/rts/sm/Sanity.c +++ b/rts/sm/Sanity.c @@ -519,6 +519,7 @@ checkTSO(StgTSO *tso) info == &stg_WHITEHOLE_info); // happens due to STM doing lockTSO() if ( tso->why_blocked == BlockedOnMVar + || tso->why_blocked == BlockedOnMVarRead || tso->why_blocked == BlockedOnBlackHole || tso->why_blocked == BlockedOnMsgThrowTo || tso->why_blocked == NotBlocked diff --git a/rts/sm/Scav.c b/rts/sm/Scav.c index 6137f6d862..e0cc688b95 100644 --- a/rts/sm/Scav.c +++ b/rts/sm/Scav.c @@ -71,6 +71,7 @@ scavengeTSO (StgTSO *tso) evacuate((StgClosure **)&tso->_link); if ( tso->why_blocked == BlockedOnMVar + || tso->why_blocked == BlockedOnMVarRead || tso->why_blocked == BlockedOnBlackHole || tso->why_blocked == BlockedOnMsgThrowTo || tso->why_blocked == NotBlocked |