summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEdward Z. Yang <ezyang@mit.edu>2013-07-08 11:03:35 -0700
committerEdward Z. Yang <ezyang@mit.edu>2013-07-09 11:29:11 -0700
commit70e20631742e516c6a11c3c112fbd5b4a08c15ac (patch)
treed0097f8b1c8e5c0a67b26bb950c036ea7684c65d
parentca9a431401755f119d97dec59a1fc963a8e9f681 (diff)
downloadhaskell-70e20631742e516c6a11c3c112fbd5b4a08c15ac.tar.gz
Implement atomicReadMVar, fixing #4001.
We add the invariant to the MVar blocked threads queue that threads blocked on an atomic read are always at the front of the queue. This invariant is easy to maintain, since takers are only ever added to the end of the queue. Signed-off-by: Edward Z. Yang <ezyang@mit.edu>
-rw-r--r--compiler/prelude/primops.txt.pp9
-rw-r--r--includes/rts/Constants.h25
-rw-r--r--includes/stg/MiscClosures.h3
-rw-r--r--rts/HeapStackCheck.cmm31
-rw-r--r--rts/Linker.c2
-rw-r--r--rts/PrimOps.cmm79
-rw-r--r--rts/RaiseAsync.c4
-rw-r--r--rts/RaiseAsync.h1
-rw-r--r--rts/RetainerProfile.c1
-rw-r--r--rts/Schedule.c2
-rw-r--r--rts/Threads.c4
-rw-r--r--rts/Trace.c1
-rw-r--r--rts/sm/Compact.c1
-rw-r--r--rts/sm/Sanity.c1
-rw-r--r--rts/sm/Scav.c1
15 files changed, 147 insertions, 18 deletions
diff --git a/compiler/prelude/primops.txt.pp b/compiler/prelude/primops.txt.pp
index 7203c11389..739092d5f5 100644
--- a/compiler/prelude/primops.txt.pp
+++ b/compiler/prelude/primops.txt.pp
@@ -1717,6 +1717,15 @@ primop TryPutMVarOp "tryPutMVar#" GenPrimOp
out_of_line = True
has_side_effects = True
+primop AtomicReadMVarOp "atomicReadMVar#" GenPrimOp
+ MVar# s a -> State# s -> (# State# s, a #)
+ {If {\tt MVar\#} is empty, block until it becomes full.
+ Then read its contents without modifying the MVar, without possibility
+ of intervention from other threads.}
+ with
+ out_of_line = True
+ has_side_effects = True
+
primop SameMVarOp "sameMVar#" GenPrimOp
MVar# s a -> MVar# s a -> Bool
diff --git a/includes/rts/Constants.h b/includes/rts/Constants.h
index 5ff4d4e51e..4739e3ad1a 100644
--- a/includes/rts/Constants.h
+++ b/includes/rts/Constants.h
@@ -202,31 +202,32 @@
*/
#define NotBlocked 0
#define BlockedOnMVar 1
-#define BlockedOnBlackHole 2
-#define BlockedOnRead 3
-#define BlockedOnWrite 4
-#define BlockedOnDelay 5
-#define BlockedOnSTM 6
+#define BlockedOnMVarRead 2
+#define BlockedOnBlackHole 3
+#define BlockedOnRead 4
+#define BlockedOnWrite 5
+#define BlockedOnDelay 6
+#define BlockedOnSTM 7
/* Win32 only: */
-#define BlockedOnDoProc 7
+#define BlockedOnDoProc 8
/* Only relevant for PAR: */
/* blocked on a remote closure represented by a Global Address: */
-#define BlockedOnGA 8
+#define BlockedOnGA 9
/* same as above but without sending a Fetch message */
-#define BlockedOnGA_NoSend 9
+#define BlockedOnGA_NoSend 10
/* Only relevant for THREADED_RTS: */
-#define BlockedOnCCall 10
-#define BlockedOnCCall_Interruptible 11
+#define BlockedOnCCall 11
+#define BlockedOnCCall_Interruptible 12
/* same as above but permit killing the worker thread */
/* Involved in a message sent to tso->msg_cap */
-#define BlockedOnMsgThrowTo 12
+#define BlockedOnMsgThrowTo 13
/* The thread is not on any run queues, but can be woken up
by tryWakeupThread() */
-#define ThreadMigrating 13
+#define ThreadMigrating 14
/*
* These constants are returned to the scheduler by a thread that has
diff --git a/includes/stg/MiscClosures.h b/includes/stg/MiscClosures.h
index 8717687f3e..88cee597b5 100644
--- a/includes/stg/MiscClosures.h
+++ b/includes/stg/MiscClosures.h
@@ -293,7 +293,9 @@ RTS_FUN_DECL(stg_block_noregs);
RTS_FUN_DECL(stg_block_blackhole);
RTS_FUN_DECL(stg_block_blackhole_finally);
RTS_FUN_DECL(stg_block_takemvar);
+RTS_FUN_DECL(stg_block_atomicreadmvar);
RTS_RET(stg_block_takemvar);
+RTS_RET(stg_block_atomicreadmvar);
RTS_FUN_DECL(stg_block_putmvar);
RTS_RET(stg_block_putmvar);
#ifdef mingw32_HOST_OS
@@ -376,6 +378,7 @@ RTS_FUN_DECL(stg_isEmptyMVarzh);
RTS_FUN_DECL(stg_newMVarzh);
RTS_FUN_DECL(stg_takeMVarzh);
RTS_FUN_DECL(stg_putMVarzh);
+RTS_FUN_DECL(stg_atomicReadMVarzh);
RTS_FUN_DECL(stg_tryTakeMVarzh);
RTS_FUN_DECL(stg_tryPutMVarzh);
diff --git a/rts/HeapStackCheck.cmm b/rts/HeapStackCheck.cmm
index fbceb7691a..20cd9dffb9 100644
--- a/rts/HeapStackCheck.cmm
+++ b/rts/HeapStackCheck.cmm
@@ -487,11 +487,11 @@ stg_block_noregs
/* -----------------------------------------------------------------------------
* takeMVar/putMVar-specific blocks
*
- * Stack layout for a thread blocked in takeMVar:
+ * Stack layout for a thread blocked in takeMVar/atomicReadMVar:
*
* ret. addr
* ptr to MVar (R1)
- * stg_block_takemvar_info
+ * stg_block_takemvar_info (or stg_block_readmvar_info)
*
* Stack layout for a thread blocked in putMVar:
*
@@ -531,6 +531,33 @@ stg_block_takemvar /* mvar passed in R1 */
BLOCK_BUT_FIRST(stg_block_takemvar_finally);
}
+INFO_TABLE_RET ( stg_block_atomicreadmvar, RET_SMALL, W_ info_ptr, P_ mvar )
+ return ()
+{
+ jump stg_atomicReadMVarzh(mvar);
+}
+
+// code fragment executed just before we return to the scheduler
+stg_block_atomicreadmvar_finally
+{
+ W_ r1, r3;
+ r1 = R1;
+ r3 = R3;
+ unlockClosure(R3, stg_MVAR_DIRTY_info);
+ R1 = r1;
+ R3 = r3;
+ jump StgReturn [R1];
+}
+
+stg_block_atomicreadmvar /* mvar passed in R1 */
+{
+ Sp_adj(-2);
+ Sp(1) = R1;
+ Sp(0) = stg_block_atomicreadmvar_info;
+ R3 = R1; // mvar communicated to stg_block_atomicreadmvar_finally in R3
+ BLOCK_BUT_FIRST(stg_block_atomicreadmvar_finally);
+}
+
INFO_TABLE_RET( stg_block_putmvar, RET_SMALL, W_ info_ptr,
P_ mvar, P_ val )
return ()
diff --git a/rts/Linker.c b/rts/Linker.c
index 43edde23f8..9129b46be6 100644
--- a/rts/Linker.c
+++ b/rts/Linker.c
@@ -1058,6 +1058,7 @@ typedef struct _RtsSymbolVal {
SymI_HasProto(stg_yield_to_interpreter) \
SymI_HasProto(stg_block_noregs) \
SymI_HasProto(stg_block_takemvar) \
+ SymI_HasProto(stg_block_atomicreadmvar) \
SymI_HasProto(stg_block_putmvar) \
MAIN_CAP_SYM \
SymI_HasProto(MallocFailHook) \
@@ -1314,6 +1315,7 @@ typedef struct _RtsSymbolVal {
SymI_HasProto(stg_bh_upd_frame_info) \
SymI_HasProto(suspendThread) \
SymI_HasProto(stg_takeMVarzh) \
+ SymI_HasProto(stg_atomicReadMVarzh) \
SymI_HasProto(stg_threadStatuszh) \
SymI_HasProto(stg_tryPutMVarzh) \
SymI_HasProto(stg_tryTakeMVarzh) \
diff --git a/rts/PrimOps.cmm b/rts/PrimOps.cmm
index a227e776a7..63babd04f2 100644
--- a/rts/PrimOps.cmm
+++ b/rts/PrimOps.cmm
@@ -1433,7 +1433,7 @@ loop:
goto loop;
}
- // There are takeMVar(s) waiting: wake up the first one
+ // There are atomicReadMVar/takeMVar(s) waiting: wake up the first one
tso = StgMVarTSOQueue_tso(q);
StgMVar_head(mvar) = StgMVarTSOQueue_link(q);
@@ -1441,8 +1441,11 @@ loop:
StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
}
- ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16);
ASSERT(StgTSO_block_info(tso) == mvar);
+ // save why_blocked here, because waking up the thread destroys
+ // this information
+ W_ why_blocked;
+ why_blocked = TO_W_(StgTSO_why_blocked(tso));
// actually perform the takeMVar
W_ stack;
@@ -1458,6 +1461,15 @@ loop:
ccall tryWakeupThread(MyCapability() "ptr", tso);
+ // If it was an atomicReadMVar, then we can still do work,
+ // so loop back. (XXX: This could take a while)
+ if (why_blocked == BlockedOnMVarRead) {
+ q = StgMVarTSOQueue_link(q);
+ goto loop;
+ }
+
+ ASSERT(why_blocked == BlockedOnMVar);
+
unlockClosure(mvar, info);
return ();
}
@@ -1512,8 +1524,11 @@ loop:
StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
}
- ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16);
ASSERT(StgTSO_block_info(tso) == mvar);
+ // save why_blocked here, because waking up the thread destroys
+ // this information
+ W_ why_blocked;
+ why_blocked = TO_W_(StgTSO_why_blocked(tso));
// actually perform the takeMVar
W_ stack;
@@ -1529,10 +1544,68 @@ loop:
ccall tryWakeupThread(MyCapability() "ptr", tso);
+ // If it was an atomicReadMVar, then we can still do work,
+ // so loop back. (XXX: This could take a while)
+ if (why_blocked == BlockedOnMVarRead) {
+ q = StgMVarTSOQueue_link(q);
+ goto loop;
+ }
+
+ ASSERT(why_blocked == BlockedOnMVar);
+
unlockClosure(mvar, info);
return (1);
}
+stg_atomicReadMVarzh ( P_ mvar, /* :: MVar a */ )
+{
+ W_ val, info, tso, q;
+
+#if defined(THREADED_RTS)
+ ("ptr" info) = ccall lockClosure(mvar "ptr");
+#else
+ info = GET_INFO(mvar);
+#endif
+
+ if (info == stg_MVAR_CLEAN_info) {
+ ccall dirty_MVAR(BaseReg "ptr", mvar "ptr");
+ }
+
+ /* If the MVar is empty, put ourselves on the blocked readers
+ * list and wait until we're woken up.
+ */
+ if (StgMVar_value(mvar) == stg_END_TSO_QUEUE_closure) {
+
+ ALLOC_PRIM_WITH_CUSTOM_FAILURE
+ (SIZEOF_StgMVarTSOQueue,
+ unlockClosure(mvar, stg_MVAR_DIRTY_info);
+ GC_PRIM_P(stg_atomicReadMVarzh, mvar));
+
+ q = Hp - SIZEOF_StgMVarTSOQueue + WDS(1);
+
+ // readMVars are pushed to the front of the queue, so
+ // they get handled immediately
+ SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM);
+ StgMVarTSOQueue_link(q) = StgMVar_head(mvar);
+ StgMVarTSOQueue_tso(q) = CurrentTSO;
+
+ StgTSO__link(CurrentTSO) = q;
+ StgTSO_block_info(CurrentTSO) = mvar;
+ StgTSO_why_blocked(CurrentTSO) = BlockedOnMVarRead::I16;
+ StgMVar_head(mvar) = q;
+
+ if (StgMVar_tail(mvar) == stg_END_TSO_QUEUE_closure) {
+ StgMVar_tail(mvar) = q;
+ }
+
+ jump stg_block_atomicreadmvar(mvar);
+ }
+
+ val = StgMVar_value(mvar);
+
+ unlockClosure(mvar, stg_MVAR_DIRTY_info);
+ return (val);
+}
/* -----------------------------------------------------------------------------
Stable pointer primitives
diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c
index 11f518a87d..edc4a91193 100644
--- a/rts/RaiseAsync.c
+++ b/rts/RaiseAsync.c
@@ -294,6 +294,7 @@ check_target:
}
case BlockedOnMVar:
+ case BlockedOnMVarRead:
{
/*
To establish ownership of this TSO, we need to acquire a
@@ -318,7 +319,7 @@ check_target:
// we have the MVar, let's check whether the thread
// is still blocked on the same MVar.
- if (target->why_blocked != BlockedOnMVar
+ if ((target->why_blocked != BlockedOnMVar && target->why_blocked != BlockedOnMVarRead)
|| (StgMVar *)target->block_info.closure != mvar) {
unlockClosure((StgClosure *)mvar, info);
goto retry;
@@ -637,6 +638,7 @@ removeFromQueues(Capability *cap, StgTSO *tso)
goto done;
case BlockedOnMVar:
+ case BlockedOnMVarRead:
removeFromMVarBlockedQueue(tso);
goto done;
diff --git a/rts/RaiseAsync.h b/rts/RaiseAsync.h
index 336ab30e33..d804f6bc64 100644
--- a/rts/RaiseAsync.h
+++ b/rts/RaiseAsync.h
@@ -49,6 +49,7 @@ interruptible(StgTSO *t)
{
switch (t->why_blocked) {
case BlockedOnMVar:
+ case BlockedOnMVarRead:
case BlockedOnMsgThrowTo:
case BlockedOnRead:
case BlockedOnWrite:
diff --git a/rts/RetainerProfile.c b/rts/RetainerProfile.c
index 77dc77c65a..dc21149d98 100644
--- a/rts/RetainerProfile.c
+++ b/rts/RetainerProfile.c
@@ -1672,6 +1672,7 @@ inner_loop:
retainClosure(tso->bq, c, c_child_r);
retainClosure(tso->trec, c, c_child_r);
if ( tso->why_blocked == BlockedOnMVar
+ || tso->why_blocked == BlockedOnMVarRead
|| tso->why_blocked == BlockedOnBlackHole
|| tso->why_blocked == BlockedOnMsgThrowTo
) {
diff --git a/rts/Schedule.c b/rts/Schedule.c
index 88bfd8c4d3..408146f195 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -947,6 +947,7 @@ scheduleDetectDeadlock (Capability **pcap, Task *task)
case BlockedOnBlackHole:
case BlockedOnMsgThrowTo:
case BlockedOnMVar:
+ case BlockedOnMVarRead:
throwToSingleThreaded(cap, task->incall->tso,
(StgClosure *)nonTermination_closure);
return;
@@ -2843,6 +2844,7 @@ resurrectThreads (StgTSO *threads)
switch (tso->why_blocked) {
case BlockedOnMVar:
+ case BlockedOnMVarRead:
/* Called by GC - sched_mutex lock is currently held. */
throwToSingleThreaded(cap, tso,
(StgClosure *)blockedIndefinitelyOnMVar_closure);
diff --git a/rts/Threads.c b/rts/Threads.c
index 4c990f118c..f2b800512e 100644
--- a/rts/Threads.c
+++ b/rts/Threads.c
@@ -255,6 +255,7 @@ tryWakeupThread (Capability *cap, StgTSO *tso)
switch (tso->why_blocked)
{
case BlockedOnMVar:
+ case BlockedOnMVarRead:
{
if (tso->_link == END_TSO_QUEUE) {
tso->block_info.closure = (StgClosure*)END_TSO_QUEUE;
@@ -734,6 +735,9 @@ printThreadBlockage(StgTSO *tso)
case BlockedOnMVar:
debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
break;
+ case BlockedOnMVarRead:
+ debugBelch("is blocked on atomic MVar read @ %p", tso->block_info.closure);
+ break;
case BlockedOnBlackHole:
debugBelch("is blocked on a black hole %p",
((StgBlockingQueue*)tso->block_info.bh->bh));
diff --git a/rts/Trace.c b/rts/Trace.c
index 78dfead450..21901891cb 100644
--- a/rts/Trace.c
+++ b/rts/Trace.c
@@ -179,6 +179,7 @@ static char *thread_stop_reasons[] = {
[ThreadFinished] = "finished",
[THREAD_SUSPENDED_FOREIGN_CALL] = "suspended while making a foreign call",
[6 + BlockedOnMVar] = "blocked on an MVar",
+ [6 + BlockedOnMVarRead] = "blocked on an atomic MVar read",
[6 + BlockedOnBlackHole] = "blocked on a black hole",
[6 + BlockedOnRead] = "blocked on a read operation",
[6 + BlockedOnWrite] = "blocked on a write operation",
diff --git a/rts/sm/Compact.c b/rts/sm/Compact.c
index 9c98dc9895..247f1a01c6 100644
--- a/rts/sm/Compact.c
+++ b/rts/sm/Compact.c
@@ -442,6 +442,7 @@ thread_TSO (StgTSO *tso)
thread_(&tso->global_link);
if ( tso->why_blocked == BlockedOnMVar
+ || tso->why_blocked == BlockedOnMVarRead
|| tso->why_blocked == BlockedOnBlackHole
|| tso->why_blocked == BlockedOnMsgThrowTo
|| tso->why_blocked == NotBlocked
diff --git a/rts/sm/Sanity.c b/rts/sm/Sanity.c
index f0e1659e12..9b579abbbc 100644
--- a/rts/sm/Sanity.c
+++ b/rts/sm/Sanity.c
@@ -519,6 +519,7 @@ checkTSO(StgTSO *tso)
info == &stg_WHITEHOLE_info); // happens due to STM doing lockTSO()
if ( tso->why_blocked == BlockedOnMVar
+ || tso->why_blocked == BlockedOnMVarRead
|| tso->why_blocked == BlockedOnBlackHole
|| tso->why_blocked == BlockedOnMsgThrowTo
|| tso->why_blocked == NotBlocked
diff --git a/rts/sm/Scav.c b/rts/sm/Scav.c
index 6137f6d862..e0cc688b95 100644
--- a/rts/sm/Scav.c
+++ b/rts/sm/Scav.c
@@ -71,6 +71,7 @@ scavengeTSO (StgTSO *tso)
evacuate((StgClosure **)&tso->_link);
if ( tso->why_blocked == BlockedOnMVar
+ || tso->why_blocked == BlockedOnMVarRead
|| tso->why_blocked == BlockedOnBlackHole
|| tso->why_blocked == BlockedOnMsgThrowTo
|| tso->why_blocked == NotBlocked