summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--compiler/prelude/primops.txt.pp9
-rw-r--r--includes/rts/Constants.h25
-rw-r--r--includes/stg/MiscClosures.h3
-rw-r--r--rts/HeapStackCheck.cmm31
-rw-r--r--rts/Linker.c2
-rw-r--r--rts/PrimOps.cmm79
-rw-r--r--rts/RaiseAsync.c4
-rw-r--r--rts/RaiseAsync.h1
-rw-r--r--rts/RetainerProfile.c1
-rw-r--r--rts/Schedule.c2
-rw-r--r--rts/Threads.c4
-rw-r--r--rts/Trace.c1
-rw-r--r--rts/sm/Compact.c1
-rw-r--r--rts/sm/Sanity.c1
-rw-r--r--rts/sm/Scav.c1
15 files changed, 147 insertions, 18 deletions
diff --git a/compiler/prelude/primops.txt.pp b/compiler/prelude/primops.txt.pp
index 7203c11389..739092d5f5 100644
--- a/compiler/prelude/primops.txt.pp
+++ b/compiler/prelude/primops.txt.pp
@@ -1717,6 +1717,15 @@ primop TryPutMVarOp "tryPutMVar#" GenPrimOp
out_of_line = True
has_side_effects = True
+primop AtomicReadMVarOp "atomicReadMVar#" GenPrimOp
+ MVar# s a -> State# s -> (# State# s, a #)
+ {If {\tt MVar\#} is empty, block until it becomes full.
+ Then read its contents without modifying the MVar, without possibility
+ of intervention from other threads.}
+ with
+ out_of_line = True
+ has_side_effects = True
+
primop SameMVarOp "sameMVar#" GenPrimOp
MVar# s a -> MVar# s a -> Bool
diff --git a/includes/rts/Constants.h b/includes/rts/Constants.h
index 5ff4d4e51e..4739e3ad1a 100644
--- a/includes/rts/Constants.h
+++ b/includes/rts/Constants.h
@@ -202,31 +202,32 @@
*/
#define NotBlocked 0
#define BlockedOnMVar 1
-#define BlockedOnBlackHole 2
-#define BlockedOnRead 3
-#define BlockedOnWrite 4
-#define BlockedOnDelay 5
-#define BlockedOnSTM 6
+#define BlockedOnMVarRead 2
+#define BlockedOnBlackHole 3
+#define BlockedOnRead 4
+#define BlockedOnWrite 5
+#define BlockedOnDelay 6
+#define BlockedOnSTM 7
/* Win32 only: */
-#define BlockedOnDoProc 7
+#define BlockedOnDoProc 8
/* Only relevant for PAR: */
/* blocked on a remote closure represented by a Global Address: */
-#define BlockedOnGA 8
+#define BlockedOnGA 9
/* same as above but without sending a Fetch message */
-#define BlockedOnGA_NoSend 9
+#define BlockedOnGA_NoSend 10
/* Only relevant for THREADED_RTS: */
-#define BlockedOnCCall 10
-#define BlockedOnCCall_Interruptible 11
+#define BlockedOnCCall 11
+#define BlockedOnCCall_Interruptible 12
/* same as above but permit killing the worker thread */
/* Involved in a message sent to tso->msg_cap */
-#define BlockedOnMsgThrowTo 12
+#define BlockedOnMsgThrowTo 13
/* The thread is not on any run queues, but can be woken up
by tryWakeupThread() */
-#define ThreadMigrating 13
+#define ThreadMigrating 14
/*
* These constants are returned to the scheduler by a thread that has
diff --git a/includes/stg/MiscClosures.h b/includes/stg/MiscClosures.h
index 8717687f3e..88cee597b5 100644
--- a/includes/stg/MiscClosures.h
+++ b/includes/stg/MiscClosures.h
@@ -293,7 +293,9 @@ RTS_FUN_DECL(stg_block_noregs);
RTS_FUN_DECL(stg_block_blackhole);
RTS_FUN_DECL(stg_block_blackhole_finally);
RTS_FUN_DECL(stg_block_takemvar);
+RTS_FUN_DECL(stg_block_atomicreadmvar);
RTS_RET(stg_block_takemvar);
+RTS_RET(stg_block_atomicreadmvar);
RTS_FUN_DECL(stg_block_putmvar);
RTS_RET(stg_block_putmvar);
#ifdef mingw32_HOST_OS
@@ -376,6 +378,7 @@ RTS_FUN_DECL(stg_isEmptyMVarzh);
RTS_FUN_DECL(stg_newMVarzh);
RTS_FUN_DECL(stg_takeMVarzh);
RTS_FUN_DECL(stg_putMVarzh);
+RTS_FUN_DECL(stg_atomicReadMVarzh);
RTS_FUN_DECL(stg_tryTakeMVarzh);
RTS_FUN_DECL(stg_tryPutMVarzh);
diff --git a/rts/HeapStackCheck.cmm b/rts/HeapStackCheck.cmm
index fbceb7691a..20cd9dffb9 100644
--- a/rts/HeapStackCheck.cmm
+++ b/rts/HeapStackCheck.cmm
@@ -487,11 +487,11 @@ stg_block_noregs
/* -----------------------------------------------------------------------------
* takeMVar/putMVar-specific blocks
*
- * Stack layout for a thread blocked in takeMVar:
+ * Stack layout for a thread blocked in takeMVar/atomicReadMVar:
*
* ret. addr
* ptr to MVar (R1)
- * stg_block_takemvar_info
+ * stg_block_takemvar_info (or stg_block_readmvar_info)
*
* Stack layout for a thread blocked in putMVar:
*
@@ -531,6 +531,33 @@ stg_block_takemvar /* mvar passed in R1 */
BLOCK_BUT_FIRST(stg_block_takemvar_finally);
}
+INFO_TABLE_RET ( stg_block_atomicreadmvar, RET_SMALL, W_ info_ptr, P_ mvar )
+ return ()
+{
+ jump stg_atomicReadMVarzh(mvar);
+}
+
+// code fragment executed just before we return to the scheduler
+stg_block_atomicreadmvar_finally
+{
+ W_ r1, r3;
+ r1 = R1;
+ r3 = R3;
+ unlockClosure(R3, stg_MVAR_DIRTY_info);
+ R1 = r1;
+ R3 = r3;
+ jump StgReturn [R1];
+}
+
+stg_block_atomicreadmvar /* mvar passed in R1 */
+{
+ Sp_adj(-2);
+ Sp(1) = R1;
+ Sp(0) = stg_block_atomicreadmvar_info;
+ R3 = R1; // mvar communicated to stg_block_atomicreadmvar_finally in R3
+ BLOCK_BUT_FIRST(stg_block_atomicreadmvar_finally);
+}
+
INFO_TABLE_RET( stg_block_putmvar, RET_SMALL, W_ info_ptr,
P_ mvar, P_ val )
return ()
diff --git a/rts/Linker.c b/rts/Linker.c
index 43edde23f8..9129b46be6 100644
--- a/rts/Linker.c
+++ b/rts/Linker.c
@@ -1058,6 +1058,7 @@ typedef struct _RtsSymbolVal {
SymI_HasProto(stg_yield_to_interpreter) \
SymI_HasProto(stg_block_noregs) \
SymI_HasProto(stg_block_takemvar) \
+ SymI_HasProto(stg_block_atomicreadmvar) \
SymI_HasProto(stg_block_putmvar) \
MAIN_CAP_SYM \
SymI_HasProto(MallocFailHook) \
@@ -1314,6 +1315,7 @@ typedef struct _RtsSymbolVal {
SymI_HasProto(stg_bh_upd_frame_info) \
SymI_HasProto(suspendThread) \
SymI_HasProto(stg_takeMVarzh) \
+ SymI_HasProto(stg_atomicReadMVarzh) \
SymI_HasProto(stg_threadStatuszh) \
SymI_HasProto(stg_tryPutMVarzh) \
SymI_HasProto(stg_tryTakeMVarzh) \
diff --git a/rts/PrimOps.cmm b/rts/PrimOps.cmm
index a227e776a7..63babd04f2 100644
--- a/rts/PrimOps.cmm
+++ b/rts/PrimOps.cmm
@@ -1433,7 +1433,7 @@ loop:
goto loop;
}
- // There are takeMVar(s) waiting: wake up the first one
+ // There are atomicReadMVar/takeMVar(s) waiting: wake up the first one
tso = StgMVarTSOQueue_tso(q);
StgMVar_head(mvar) = StgMVarTSOQueue_link(q);
@@ -1441,8 +1441,11 @@ loop:
StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
}
- ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16);
ASSERT(StgTSO_block_info(tso) == mvar);
+ // save why_blocked here, because waking up the thread destroys
+ // this information
+ W_ why_blocked;
+ why_blocked = TO_W_(StgTSO_why_blocked(tso));
// actually perform the takeMVar
W_ stack;
@@ -1458,6 +1461,15 @@ loop:
ccall tryWakeupThread(MyCapability() "ptr", tso);
+ // If it was an atomicReadMVar, then we can still do work,
+ // so loop back. (XXX: This could take a while)
+ if (why_blocked == BlockedOnMVarRead) {
+ q = StgMVarTSOQueue_link(q);
+ goto loop;
+ }
+
+ ASSERT(why_blocked == BlockedOnMVar);
+
unlockClosure(mvar, info);
return ();
}
@@ -1512,8 +1524,11 @@ loop:
StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
}
- ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16);
ASSERT(StgTSO_block_info(tso) == mvar);
+ // save why_blocked here, because waking up the thread destroys
+ // this information
+ W_ why_blocked;
+ why_blocked = TO_W_(StgTSO_why_blocked(tso));
// actually perform the takeMVar
W_ stack;
@@ -1529,10 +1544,68 @@ loop:
ccall tryWakeupThread(MyCapability() "ptr", tso);
+ // If it was an atomicReadMVar, then we can still do work,
+ // so loop back. (XXX: This could take a while)
+ if (why_blocked == BlockedOnMVarRead) {
+ q = StgMVarTSOQueue_link(q);
+ goto loop;
+ }
+
+ ASSERT(why_blocked == BlockedOnMVar);
+
unlockClosure(mvar, info);
return (1);
}
+stg_atomicReadMVarzh ( P_ mvar, /* :: MVar a */ )
+{
+ W_ val, info, tso, q;
+
+#if defined(THREADED_RTS)
+ ("ptr" info) = ccall lockClosure(mvar "ptr");
+#else
+ info = GET_INFO(mvar);
+#endif
+
+ if (info == stg_MVAR_CLEAN_info) {
+ ccall dirty_MVAR(BaseReg "ptr", mvar "ptr");
+ }
+
+ /* If the MVar is empty, put ourselves on the blocked readers
+ * list and wait until we're woken up.
+ */
+ if (StgMVar_value(mvar) == stg_END_TSO_QUEUE_closure) {
+
+ ALLOC_PRIM_WITH_CUSTOM_FAILURE
+ (SIZEOF_StgMVarTSOQueue,
+ unlockClosure(mvar, stg_MVAR_DIRTY_info);
+ GC_PRIM_P(stg_atomicReadMVarzh, mvar));
+
+ q = Hp - SIZEOF_StgMVarTSOQueue + WDS(1);
+
+ // readMVars are pushed to the front of the queue, so
+ // they get handled immediately
+ SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM);
+ StgMVarTSOQueue_link(q) = StgMVar_head(mvar);
+ StgMVarTSOQueue_tso(q) = CurrentTSO;
+
+ StgTSO__link(CurrentTSO) = q;
+ StgTSO_block_info(CurrentTSO) = mvar;
+ StgTSO_why_blocked(CurrentTSO) = BlockedOnMVarRead::I16;
+ StgMVar_head(mvar) = q;
+
+ if (StgMVar_tail(mvar) == stg_END_TSO_QUEUE_closure) {
+ StgMVar_tail(mvar) = q;
+ }
+
+ jump stg_block_atomicreadmvar(mvar);
+ }
+
+ val = StgMVar_value(mvar);
+
+ unlockClosure(mvar, stg_MVAR_DIRTY_info);
+ return (val);
+}
/* -----------------------------------------------------------------------------
Stable pointer primitives
diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c
index 11f518a87d..edc4a91193 100644
--- a/rts/RaiseAsync.c
+++ b/rts/RaiseAsync.c
@@ -294,6 +294,7 @@ check_target:
}
case BlockedOnMVar:
+ case BlockedOnMVarRead:
{
/*
To establish ownership of this TSO, we need to acquire a
@@ -318,7 +319,7 @@ check_target:
// we have the MVar, let's check whether the thread
// is still blocked on the same MVar.
- if (target->why_blocked != BlockedOnMVar
+ if ((target->why_blocked != BlockedOnMVar && target->why_blocked != BlockedOnMVarRead)
|| (StgMVar *)target->block_info.closure != mvar) {
unlockClosure((StgClosure *)mvar, info);
goto retry;
@@ -637,6 +638,7 @@ removeFromQueues(Capability *cap, StgTSO *tso)
goto done;
case BlockedOnMVar:
+ case BlockedOnMVarRead:
removeFromMVarBlockedQueue(tso);
goto done;
diff --git a/rts/RaiseAsync.h b/rts/RaiseAsync.h
index 336ab30e33..d804f6bc64 100644
--- a/rts/RaiseAsync.h
+++ b/rts/RaiseAsync.h
@@ -49,6 +49,7 @@ interruptible(StgTSO *t)
{
switch (t->why_blocked) {
case BlockedOnMVar:
+ case BlockedOnMVarRead:
case BlockedOnMsgThrowTo:
case BlockedOnRead:
case BlockedOnWrite:
diff --git a/rts/RetainerProfile.c b/rts/RetainerProfile.c
index 77dc77c65a..dc21149d98 100644
--- a/rts/RetainerProfile.c
+++ b/rts/RetainerProfile.c
@@ -1672,6 +1672,7 @@ inner_loop:
retainClosure(tso->bq, c, c_child_r);
retainClosure(tso->trec, c, c_child_r);
if ( tso->why_blocked == BlockedOnMVar
+ || tso->why_blocked == BlockedOnMVarRead
|| tso->why_blocked == BlockedOnBlackHole
|| tso->why_blocked == BlockedOnMsgThrowTo
) {
diff --git a/rts/Schedule.c b/rts/Schedule.c
index 88bfd8c4d3..408146f195 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -947,6 +947,7 @@ scheduleDetectDeadlock (Capability **pcap, Task *task)
case BlockedOnBlackHole:
case BlockedOnMsgThrowTo:
case BlockedOnMVar:
+ case BlockedOnMVarRead:
throwToSingleThreaded(cap, task->incall->tso,
(StgClosure *)nonTermination_closure);
return;
@@ -2843,6 +2844,7 @@ resurrectThreads (StgTSO *threads)
switch (tso->why_blocked) {
case BlockedOnMVar:
+ case BlockedOnMVarRead:
/* Called by GC - sched_mutex lock is currently held. */
throwToSingleThreaded(cap, tso,
(StgClosure *)blockedIndefinitelyOnMVar_closure);
diff --git a/rts/Threads.c b/rts/Threads.c
index 4c990f118c..f2b800512e 100644
--- a/rts/Threads.c
+++ b/rts/Threads.c
@@ -255,6 +255,7 @@ tryWakeupThread (Capability *cap, StgTSO *tso)
switch (tso->why_blocked)
{
case BlockedOnMVar:
+ case BlockedOnMVarRead:
{
if (tso->_link == END_TSO_QUEUE) {
tso->block_info.closure = (StgClosure*)END_TSO_QUEUE;
@@ -734,6 +735,9 @@ printThreadBlockage(StgTSO *tso)
case BlockedOnMVar:
debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
break;
+ case BlockedOnMVarRead:
+ debugBelch("is blocked on atomic MVar read @ %p", tso->block_info.closure);
+ break;
case BlockedOnBlackHole:
debugBelch("is blocked on a black hole %p",
((StgBlockingQueue*)tso->block_info.bh->bh));
diff --git a/rts/Trace.c b/rts/Trace.c
index 78dfead450..21901891cb 100644
--- a/rts/Trace.c
+++ b/rts/Trace.c
@@ -179,6 +179,7 @@ static char *thread_stop_reasons[] = {
[ThreadFinished] = "finished",
[THREAD_SUSPENDED_FOREIGN_CALL] = "suspended while making a foreign call",
[6 + BlockedOnMVar] = "blocked on an MVar",
+ [6 + BlockedOnMVarRead] = "blocked on an atomic MVar read",
[6 + BlockedOnBlackHole] = "blocked on a black hole",
[6 + BlockedOnRead] = "blocked on a read operation",
[6 + BlockedOnWrite] = "blocked on a write operation",
diff --git a/rts/sm/Compact.c b/rts/sm/Compact.c
index 9c98dc9895..247f1a01c6 100644
--- a/rts/sm/Compact.c
+++ b/rts/sm/Compact.c
@@ -442,6 +442,7 @@ thread_TSO (StgTSO *tso)
thread_(&tso->global_link);
if ( tso->why_blocked == BlockedOnMVar
+ || tso->why_blocked == BlockedOnMVarRead
|| tso->why_blocked == BlockedOnBlackHole
|| tso->why_blocked == BlockedOnMsgThrowTo
|| tso->why_blocked == NotBlocked
diff --git a/rts/sm/Sanity.c b/rts/sm/Sanity.c
index f0e1659e12..9b579abbbc 100644
--- a/rts/sm/Sanity.c
+++ b/rts/sm/Sanity.c
@@ -519,6 +519,7 @@ checkTSO(StgTSO *tso)
info == &stg_WHITEHOLE_info); // happens due to STM doing lockTSO()
if ( tso->why_blocked == BlockedOnMVar
+ || tso->why_blocked == BlockedOnMVarRead
|| tso->why_blocked == BlockedOnBlackHole
|| tso->why_blocked == BlockedOnMsgThrowTo
|| tso->why_blocked == NotBlocked
diff --git a/rts/sm/Scav.c b/rts/sm/Scav.c
index 6137f6d862..e0cc688b95 100644
--- a/rts/sm/Scav.c
+++ b/rts/sm/Scav.c
@@ -71,6 +71,7 @@ scavengeTSO (StgTSO *tso)
evacuate((StgClosure **)&tso->_link);
if ( tso->why_blocked == BlockedOnMVar
+ || tso->why_blocked == BlockedOnMVarRead
|| tso->why_blocked == BlockedOnBlackHole
|| tso->why_blocked == BlockedOnMsgThrowTo
|| tso->why_blocked == NotBlocked