-rw-r--r--  includes/mkDerivedConstants.c      4
-rw-r--r--  includes/rts/Constants.h           8
-rw-r--r--  includes/rts/storage/Block.h       3
-rw-r--r--  includes/rts/storage/Closures.h   14
-rw-r--r--  includes/rts/storage/TSO.h         1
-rw-r--r--  includes/stg/MiscClosures.h        9
-rw-r--r--  rts/Capability.h                   8
-rw-r--r--  rts/HeapStackCheck.cmm            10
-rw-r--r--  rts/Inlines.c                      1
-rw-r--r--  rts/Messages.c                    19
-rw-r--r--  rts/PrimOps.cmm                  350
-rw-r--r--  rts/RaiseAsync.c                 141
-rw-r--r--  rts/Schedule.c                    39
-rw-r--r--  rts/StgMiscClosures.cmm            9
-rw-r--r--  rts/Threads.c                    173
-rw-r--r--  rts/Threads.h                      3
-rw-r--r--  rts/sm/Compact.c                   4
-rw-r--r--  rts/sm/Scav.c                      3
18 files changed, 362 insertions, 437 deletions
diff --git a/includes/mkDerivedConstants.c b/includes/mkDerivedConstants.c
index 92685cae3a..7efcf47d48 100644
--- a/includes/mkDerivedConstants.c
+++ b/includes/mkDerivedConstants.c
@@ -372,6 +372,10 @@ main(int argc, char *argv[])
closure_field(StgMVar,tail);
closure_field(StgMVar,value);
+ closure_size(StgMVarTSOQueue);
+ closure_field(StgMVarTSOQueue, link);
+ closure_field(StgMVarTSOQueue, tso);
+
closure_size(StgBCO);
closure_field(StgBCO, instrs);
closure_field(StgBCO, literals);
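Note: the closure_size/closure_field entries above make the build emit size
and offset constants for the new StgMVarTSOQueue closure; the Cmm below
consumes them as SIZEOF_StgMVarTSOQueue, StgMVarTSOQueue_link(q), and so on.
A sketch of the sort of definitions that land in the derived-constants
header; the macro names follow the existing conventions, but the concrete
values here are hypothetical 64-bit ones:

    /* hypothetical derived-constants output for a 64-bit target */
    #define SIZEOF_StgMVarTSOQueue      (SIZEOF_StgHeader + 16)
    #define OFFSET_StgMVarTSOQueue_link 0   /* first payload word  */
    #define OFFSET_StgMVarTSOQueue_tso  8   /* second payload word */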
diff --git a/includes/rts/Constants.h b/includes/rts/Constants.h
index bfc77fa361..354abbbdd9 100644
--- a/includes/rts/Constants.h
+++ b/includes/rts/Constants.h
@@ -227,8 +227,12 @@
/* same as above but don't unblock async exceptions in resumeThread() */
/* Involved in a message sent to tso->msg_cap */
-#define BlockedOnMsgWakeup 12
-#define BlockedOnMsgThrowTo 13
+#define BlockedOnMsgThrowTo 12
+
+/* The thread is not on any run queues, but can be woken up
+ by tryWakeupThread() */
+#define ThreadMigrating 13
+
/*
* These constants are returned to the scheduler by a thread that has
* stopped for one reason or another. See typedef StgThreadReturnCode
diff --git a/includes/rts/storage/Block.h b/includes/rts/storage/Block.h
index 3114bea014..d6a4d4c487 100644
--- a/includes/rts/storage/Block.h
+++ b/includes/rts/storage/Block.h
@@ -109,7 +109,8 @@ typedef struct bdescr_ {
#else
-INLINE_HEADER bdescr *Bdescr(StgPtr p)
+EXTERN_INLINE bdescr *Bdescr(StgPtr p);
+EXTERN_INLINE bdescr *Bdescr(StgPtr p)
{
return (bdescr *)
((((W_)p & MBLOCK_MASK & ~BLOCK_MASK) >> (BLOCK_SHIFT-BDESCR_SHIFT))
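Note: Bdescr() becomes EXTERN_INLINE so that one out-of-line copy exists for
call sites where it is not inlined (see the rts/Inlines.c change below).
For intuition, a worked example of the address arithmetic, assuming the
traditional 4KB blocks inside 1MB megablocks (BLOCK_SHIFT = 12,
MBLOCK_SHIFT = 20) and 64-byte block descriptors (BDESCR_SHIFT = 6); the
real values come from this header and may differ:

    /* sketch: Bdescr() under the assumed shifts above */
    StgWord p    = 0x140731a8;
    StgWord base = p & ~MBLOCK_MASK;                  /* 0x14000000, megablock start */
    StgWord off  = (p & MBLOCK_MASK & ~BLOCK_MASK)
                     >> (BLOCK_SHIFT - BDESCR_SHIFT); /* 0x73000 >> 6 == 0x1cc0 */
    /* Bdescr(p) == base | off == 0x14001cc0: the descriptor of block 0x73,
       in the descriptor table at the start of the megablock. */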
diff --git a/includes/rts/storage/Closures.h b/includes/rts/storage/Closures.h
index 802746868c..a0ff738aa7 100644
--- a/includes/rts/storage/Closures.h
+++ b/includes/rts/storage/Closures.h
@@ -305,11 +305,17 @@ typedef struct {
/* Concurrent communication objects */
+typedef struct StgMVarTSOQueue_ {
+ StgHeader header;
+ struct StgMVarTSOQueue_ *link;
+ struct StgTSO_ *tso;
+} StgMVarTSOQueue;
+
typedef struct {
- StgHeader header;
- struct StgTSO_ *head;
- struct StgTSO_ *tail;
- StgClosure* value;
+ StgHeader header;
+ struct StgMVarTSOQueue_ *head;
+ struct StgMVarTSOQueue_ *tail;
+ StgClosure* value;
} StgMVar;
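Note: head and tail now link StgMVarTSOQueue nodes rather than threading the
blocked TSOs themselves through tso->_link. A C rendering (for exposition
only) of the enqueue step that the Cmm in rts/PrimOps.cmm below performs
when a thread blocks; it assumes the RTS-internal headers, with locking and
the heap check elided:

    /* sketch: block 'tso' on 'mvar', given a freshly allocated node 'q' */
    static void enqueueBlockedThread (Capability *cap, StgMVar *mvar,
                                      StgMVarTSOQueue *q, StgTSO *tso)
    {
        SET_HDR((StgClosure*)q, &stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM);
        q->link = (StgMVarTSOQueue*)END_TSO_QUEUE;
        q->tso  = tso;
        if (mvar->head == (StgMVarTSOQueue*)END_TSO_QUEUE) {
            mvar->head = q;
        } else {
            // write barrier: the old tail may live in an older generation
            mvar->tail->link = q;
            recordClosureMutated(cap, (StgClosure*)mvar->tail);
        }
        mvar->tail = q;
        tso->_link = (StgTSO*)q;  // reset to END_TSO_QUEUE when the op completes
        tso->block_info.closure = (StgClosure*)mvar;
        tso->why_blocked = BlockedOnMVar;
    }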
diff --git a/includes/rts/storage/TSO.h b/includes/rts/storage/TSO.h
index abe621564d..0e9883f1a6 100644
--- a/includes/rts/storage/TSO.h
+++ b/includes/rts/storage/TSO.h
@@ -82,7 +82,6 @@ typedef struct StgTSO_ {
/*
Currently used for linking TSOs on:
* cap->run_queue_{hd,tl}
- * MVAR queue
* (non-THREADED_RTS); the blocked_queue
* and pointing to the relocated version of a ThreadRelocated
diff --git a/includes/stg/MiscClosures.h b/includes/stg/MiscClosures.h
index 4abb67dc55..0aa6f99675 100644
--- a/includes/stg/MiscClosures.h
+++ b/includes/stg/MiscClosures.h
@@ -42,10 +42,10 @@
# define RTS_FUN(f) RTS_FUN_INFO(f##_info)
# define RTS_THUNK(f) RTS_THUNK_INFO(f##_info)
#else
-# define RTS_RET(f) RTS_INFO(f##_info) RTS_FUN_DECL(f##_ret)
-# define RTS_ENTRY(f) RTS_INFO(f##_info) RTS_FUN_DECL(f##_entry)
-# define RTS_FUN(f) RTS_FUN_INFO(f##_info) RTS_FUN_DECL(f##_entry)
-# define RTS_THUNK(f) RTS_THUNK_INFO(f##_info) RTS_FUN_DECL(f##_entry)
+# define RTS_RET(f) RTS_INFO(f##_info); RTS_FUN_DECL(f##_ret)
+# define RTS_ENTRY(f) RTS_INFO(f##_info); RTS_FUN_DECL(f##_entry)
+# define RTS_FUN(f) RTS_FUN_INFO(f##_info); RTS_FUN_DECL(f##_entry)
+# define RTS_THUNK(f) RTS_THUNK_INFO(f##_info); RTS_FUN_DECL(f##_entry)
#endif
/* Stack frames */
@@ -109,7 +109,6 @@ RTS_ENTRY(stg_MUT_ARR_PTRS_FROZEN0);
RTS_ENTRY(stg_MUT_VAR_CLEAN);
RTS_ENTRY(stg_MUT_VAR_DIRTY);
RTS_ENTRY(stg_END_TSO_QUEUE);
-RTS_ENTRY(stg_MSG_WAKEUP);
RTS_ENTRY(stg_MSG_TRY_WAKEUP);
RTS_ENTRY(stg_MSG_THROWTO);
RTS_ENTRY(stg_MSG_BLACKHOLE);
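Note: each of these macros expands to a pair of declarations, and use sites
such as RTS_ENTRY(stg_MSG_TRY_WAKEUP); supply only the trailing semicolon.
The added ';' separates the pair; a sketch with a hypothetical closure
stg_foo (presumably the declaration macros no longer embed their own
terminators):

    /* RTS_ENTRY(stg_foo);  now expands, roughly, to: */
    RTS_INFO(stg_foo_info); RTS_FUN_DECL(stg_foo_entry);
    /* where the old expansion juxtaposed the two declarations with no
       separator between them */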
diff --git a/rts/Capability.h b/rts/Capability.h
index e12b8ce6e9..d8eba0d733 100644
--- a/rts/Capability.h
+++ b/rts/Capability.h
@@ -199,9 +199,9 @@ extern volatile StgWord waiting_for_gc;
//
void waitForReturnCapability (Capability **cap/*in/out*/, Task *task);
-INLINE_HEADER void recordMutableCap (StgClosure *p, Capability *cap, nat gen);
+EXTERN_INLINE void recordMutableCap (StgClosure *p, Capability *cap, nat gen);
-INLINE_HEADER void recordClosureMutated (Capability *cap, StgClosure *p);
+EXTERN_INLINE void recordClosureMutated (Capability *cap, StgClosure *p);
#if defined(THREADED_RTS)
@@ -291,7 +291,7 @@ INLINE_HEADER rtsBool emptyInbox(Capability *cap);
* INLINE functions... private below here
* -------------------------------------------------------------------------- */
-INLINE_HEADER void
+EXTERN_INLINE void
recordMutableCap (StgClosure *p, Capability *cap, nat gen)
{
bdescr *bd;
@@ -310,7 +310,7 @@ recordMutableCap (StgClosure *p, Capability *cap, nat gen)
*bd->free++ = (StgWord)p;
}
-INLINE_HEADER void
+EXTERN_INLINE void
recordClosureMutated (Capability *cap, StgClosure *p)
{
bdescr *bd;
diff --git a/rts/HeapStackCheck.cmm b/rts/HeapStackCheck.cmm
index f8bccc091d..f8195768fb 100644
--- a/rts/HeapStackCheck.cmm
+++ b/rts/HeapStackCheck.cmm
@@ -481,9 +481,13 @@ INFO_TABLE_RET( stg_gc_gen, RET_DYN )
stg_gc_gen
{
+ // Hack; see Note [mvar-heap-check] in PrimOps.cmm
+ if (R10 == stg_putMVarzh || R10 == stg_takeMVarzh) {
+ unlockClosure(R1, stg_MVAR_DIRTY_info);
+ }
SAVE_EVERYTHING;
GC_GENERIC
-}
+}
// A heap check at an unboxed tuple return point. The return address
// is on the stack, and we can find it by using the offsets given
@@ -583,11 +587,7 @@ INFO_TABLE_RET( stg_block_takemvar, RET_SMALL, P_ unused )
// code fragment executed just before we return to the scheduler
stg_block_takemvar_finally
{
-#ifdef THREADED_RTS
unlockClosure(R3, stg_MVAR_DIRTY_info);
-#else
- SET_INFO(R3, stg_MVAR_DIRTY_info);
-#endif
jump StgReturn;
}
diff --git a/rts/Inlines.c b/rts/Inlines.c
index fa521e4217..ccb30bf76d 100644
--- a/rts/Inlines.c
+++ b/rts/Inlines.c
@@ -5,3 +5,4 @@
#include "PosixSource.h"
#include "Rts.h"
#include "Schedule.h"
+#include "Capability.h"
diff --git a/rts/Messages.c b/rts/Messages.c
index ae5d5d1abc..5a1e5bd3c4 100644
--- a/rts/Messages.c
+++ b/rts/Messages.c
@@ -28,8 +28,7 @@ void sendMessage(Capability *from_cap, Capability *to_cap, Message *msg)
#ifdef DEBUG
{
const StgInfoTable *i = msg->header.info;
- if (i != &stg_MSG_WAKEUP_info &&
- i != &stg_MSG_THROWTO_info &&
+ if (i != &stg_MSG_THROWTO_info &&
i != &stg_MSG_BLACKHOLE_info &&
i != &stg_MSG_TRY_WAKEUP_info &&
i != &stg_IND_info && // can happen if a MSG_BLACKHOLE is revoked
@@ -71,21 +70,7 @@ executeMessage (Capability *cap, Message *m)
loop:
write_barrier(); // allow m->header to be modified by another thread
i = m->header.info;
- if (i == &stg_MSG_WAKEUP_info)
- {
- // the plan is to eventually get rid of these and use
- // TRY_WAKEUP instead.
- MessageWakeup *w = (MessageWakeup *)m;
- StgTSO *tso = w->tso;
- debugTraceCap(DEBUG_sched, cap, "message: wakeup thread %ld",
- (lnat)tso->id);
- ASSERT(tso->cap == cap);
- ASSERT(tso->why_blocked == BlockedOnMsgWakeup);
- ASSERT(tso->block_info.closure == (StgClosure *)m);
- tso->why_blocked = NotBlocked;
- appendToRunQueue(cap, tso);
- }
- else if (i == &stg_MSG_TRY_WAKEUP_info)
+ if (i == &stg_MSG_TRY_WAKEUP_info)
{
StgTSO *tso = ((MessageWakeup *)m)->tso;
debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld",
diff --git a/rts/PrimOps.cmm b/rts/PrimOps.cmm
index 5c575f695b..03eb490b6b 100644
--- a/rts/PrimOps.cmm
+++ b/rts/PrimOps.cmm
@@ -1140,7 +1140,7 @@ stg_newMVarzh
stg_takeMVarzh
{
- W_ mvar, val, info, tso;
+ W_ mvar, val, info, tso, q;
/* args: R1 = MVar closure */
mvar = R1;
@@ -1159,72 +1159,85 @@ stg_takeMVarzh
* and wait until we're woken up.
*/
if (StgMVar_value(mvar) == stg_END_TSO_QUEUE_closure) {
+
+ // Note [mvar-heap-check] We want to do the heap check in the
+ // branch here, to avoid the conditional in the common case.
+ // However, we've already locked the MVar above, so we'd better
+ // be careful to unlock it again if the heap check fails.
+ // Unfortunately we don't have an easy way to inject any code
+ // into the heap check generated by the code generator, so we
+ // have to do it in stg_gc_gen (see HeapStackCheck.cmm).
+ HP_CHK_GEN_TICKY(SIZEOF_StgMVarTSOQueue, R1_PTR, stg_takeMVarzh);
+
+ q = Hp - SIZEOF_StgMVarTSOQueue + WDS(1);
+
+ StgHeader_info(q) = stg_MVAR_TSO_QUEUE_info;
+ StgMVarTSOQueue_link(q) = END_TSO_QUEUE;
+ StgMVarTSOQueue_tso(q) = CurrentTSO;
+
if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
- StgMVar_head(mvar) = CurrentTSO;
+ StgMVar_head(mvar) = q;
} else {
- foreign "C" setTSOLink(MyCapability() "ptr",
- StgMVar_tail(mvar) "ptr",
- CurrentTSO) [];
+ StgMVarTSOQueue_link(StgMVar_tail(mvar)) = q;
+ foreign "C" recordClosureMutated(MyCapability() "ptr",
+ StgMVar_tail(mvar)) [];
}
- StgTSO__link(CurrentTSO) = stg_END_TSO_QUEUE_closure;
+ StgTSO__link(CurrentTSO) = q;
StgTSO_block_info(CurrentTSO) = mvar;
- // write barrier for throwTo(), which looks at block_info
- // if why_blocked==BlockedOnMVar.
- prim %write_barrier() [];
StgTSO_why_blocked(CurrentTSO) = BlockedOnMVar::I16;
- StgMVar_tail(mvar) = CurrentTSO;
+ StgMVar_tail(mvar) = q;
R1 = mvar;
jump stg_block_takemvar;
- }
-
- /* we got the value... */
- val = StgMVar_value(mvar);
-
- if (StgMVar_head(mvar) != stg_END_TSO_QUEUE_closure)
- {
- /* There are putMVar(s) waiting...
- * wake up the first thread on the queue
- */
- ASSERT(StgTSO_why_blocked(StgMVar_head(mvar)) == BlockedOnMVar::I16);
-
- /* actually perform the putMVar for the thread that we just woke up */
- tso = StgMVar_head(mvar);
- PerformPut(tso,StgMVar_value(mvar));
-
- if (TO_W_(StgTSO_dirty(tso)) == 0) {
- foreign "C" dirty_TSO(MyCapability() "ptr", tso "ptr") [];
- }
-
- ("ptr" tso) = foreign "C" unblockOne_(MyCapability() "ptr",
- StgMVar_head(mvar) "ptr", 1) [];
- StgMVar_head(mvar) = tso;
-
- if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
- StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
- }
-
- unlockClosure(mvar, stg_MVAR_DIRTY_info);
- RET_P(val);
- }
- else
- {
- /* No further putMVars, MVar is now empty */
- StgMVar_value(mvar) = stg_END_TSO_QUEUE_closure;
-
-        unlockClosure(mvar, stg_MVAR_DIRTY_info);
-
-        RET_P(val);
-    }
+    }
+
+ /* we got the value... */
+ val = StgMVar_value(mvar);
+
+ q = StgMVar_head(mvar);
+loop:
+ if (q == stg_END_TSO_QUEUE_closure) {
+ /* No further putMVars, MVar is now empty */
+ StgMVar_value(mvar) = stg_END_TSO_QUEUE_closure;
+ unlockClosure(mvar, stg_MVAR_DIRTY_info);
+ RET_P(val);
+ }
+ if (StgHeader_info(q) == stg_IND_info ||
+ StgHeader_info(q) == stg_MSG_NULL_info) {
+ q = StgInd_indirectee(q);
+ goto loop;
+ }
+
+ // There are putMVar(s) waiting... wake up the first thread on the queue
+
+ tso = StgMVarTSOQueue_tso(q);
+ ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16);
+ ASSERT(StgTSO_block_info(tso) == mvar);
+ // actually perform the putMVar for the thread that we just woke up
+ PerformPut(tso,StgMVar_value(mvar));
+
+ StgMVar_head(mvar) = StgMVarTSOQueue_link(q);
+ if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
+ StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
+ }
+
+ // indicate that the putMVar has now completed:
+ StgTSO__link(tso) = stg_END_TSO_QUEUE_closure;
+
+ // no need to mark the TSO dirty, we have only written END_TSO_QUEUE.
+
+ foreign "C" tryWakeupThread(MyCapability() "ptr", tso) [];
+
+ unlockClosure(mvar, stg_MVAR_DIRTY_info);
+ RET_P(val);
}
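Note: for exposition, the wake-up path just added to stg_takeMVarzh,
rendered as C. A sketch only: the MVar lock is assumed held, and
PerformPut's stack surgery is reduced to a comment.

    static StgClosure *takeMVarWithValue (Capability *cap, StgMVar *mvar)
    {
        StgClosure *val = mvar->value;
        StgMVarTSOQueue *q = mvar->head;

        // skip nodes that a throwTo has revoked (IND or MSG_NULL)
        while ((StgClosure*)q != &stg_END_TSO_QUEUE_closure &&
               (q->header.info == &stg_IND_info ||
                q->header.info == &stg_MSG_NULL_info)) {
            q = (StgMVarTSOQueue*)((StgInd*)q)->indirectee;
        }
        if ((StgClosure*)q == &stg_END_TSO_QUEUE_closure) {
            mvar->value = &stg_END_TSO_QUEUE_closure;  // MVar is now empty
            return val;
        }
        StgTSO *tso = q->tso;  // the first blocked putMVar
        // ... PerformPut(tso, mvar->value): pop the waiter's frame and
        // store its value into the MVar ...
        mvar->head = q->link;
        if ((StgClosure*)mvar->head == &stg_END_TSO_QUEUE_closure) {
            mvar->tail = (StgMVarTSOQueue*)&stg_END_TSO_QUEUE_closure;
        }
        tso->_link = END_TSO_QUEUE;  // mark the blocked op as completed
        tryWakeupThread(cap, tso);
        return val;
    }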
stg_tryTakeMVarzh
{
- W_ mvar, val, info, tso;
+ W_ mvar, val, info, tso, q;
/* args: R1 = MVar closure */
-
mvar = R1;
#if defined(THREADED_RTS)
@@ -1232,7 +1245,10 @@ stg_tryTakeMVarzh
#else
info = GET_INFO(mvar);
#endif
-
+
+    /* If the MVar is empty, tryTakeMVar returns failure immediately;
+     * unlike takeMVar it never blocks. */
if (StgMVar_value(mvar) == stg_END_TSO_QUEUE_closure) {
#if defined(THREADED_RTS)
unlockClosure(mvar, info);
@@ -1242,51 +1258,56 @@ stg_tryTakeMVarzh
*/
RET_NP(0, stg_NO_FINALIZER_closure);
}
-
+
if (info == stg_MVAR_CLEAN_info) {
- foreign "C" dirty_MVAR(BaseReg "ptr", mvar "ptr");
+ foreign "C" dirty_MVAR(BaseReg "ptr", mvar "ptr") [];
}
/* we got the value... */
val = StgMVar_value(mvar);
-
- if (StgMVar_head(mvar) != stg_END_TSO_QUEUE_closure) {
-
- /* There are putMVar(s) waiting...
- * wake up the first thread on the queue
- */
- ASSERT(StgTSO_why_blocked(StgMVar_head(mvar)) == BlockedOnMVar::I16);
-
- /* actually perform the putMVar for the thread that we just woke up */
- tso = StgMVar_head(mvar);
- PerformPut(tso,StgMVar_value(mvar));
- if (TO_W_(StgTSO_dirty(tso)) == 0) {
- foreign "C" dirty_TSO(MyCapability() "ptr", tso "ptr") [];
- }
-
- ("ptr" tso) = foreign "C" unblockOne_(MyCapability() "ptr",
- StgMVar_head(mvar) "ptr", 1) [];
- StgMVar_head(mvar) = tso;
-
- if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
- StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
- }
+
+ q = StgMVar_head(mvar);
+loop:
+ if (q == stg_END_TSO_QUEUE_closure) {
+ /* No further putMVars, MVar is now empty */
+ StgMVar_value(mvar) = stg_END_TSO_QUEUE_closure;
unlockClosure(mvar, stg_MVAR_DIRTY_info);
+ RET_NP(1, val);
}
- else
- {
- /* No further putMVars, MVar is now empty */
- StgMVar_value(mvar) = stg_END_TSO_QUEUE_closure;
- unlockClosure(mvar, stg_MVAR_DIRTY_info);
+ if (StgHeader_info(q) == stg_IND_info ||
+ StgHeader_info(q) == stg_MSG_NULL_info) {
+ q = StgInd_indirectee(q);
+ goto loop;
}
- RET_NP(1, val);
+ // There are putMVar(s) waiting... wake up the first thread on the queue
+
+ tso = StgMVarTSOQueue_tso(q);
+ ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16);
+ ASSERT(StgTSO_block_info(tso) == mvar);
+ // actually perform the putMVar for the thread that we just woke up
+ PerformPut(tso,StgMVar_value(mvar));
+
+ StgMVar_head(mvar) = StgMVarTSOQueue_link(q);
+ if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
+ StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
+ }
+
+ // indicate that the putMVar has now completed:
+ StgTSO__link(tso) = stg_END_TSO_QUEUE_closure;
+
+ // no need to mark the TSO dirty, we have only written END_TSO_QUEUE.
+
+ foreign "C" tryWakeupThread(MyCapability() "ptr", tso) [];
+
+ unlockClosure(mvar, stg_MVAR_DIRTY_info);
+    RET_NP(1, val);
}
stg_putMVarzh
{
- W_ mvar, val, info, tso;
+ W_ mvar, val, info, tso, q;
/* args: R1 = MVar, R2 = value */
mvar = R1;
@@ -1303,76 +1324,92 @@ stg_putMVarzh
}
if (StgMVar_value(mvar) != stg_END_TSO_QUEUE_closure) {
+
+ // see Note [mvar-heap-check] above
+ HP_CHK_GEN_TICKY(SIZEOF_StgMVarTSOQueue, R1_PTR, stg_putMVarzh);
+
+ q = Hp - SIZEOF_StgMVarTSOQueue + WDS(1);
+
+ StgHeader_info(q) = stg_MVAR_TSO_QUEUE_info;
+ StgMVarTSOQueue_link(q) = END_TSO_QUEUE;
+ StgMVarTSOQueue_tso(q) = CurrentTSO;
+
if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
- StgMVar_head(mvar) = CurrentTSO;
+ StgMVar_head(mvar) = q;
} else {
- foreign "C" setTSOLink(MyCapability() "ptr",
- StgMVar_tail(mvar) "ptr",
- CurrentTSO) [];
+ StgMVarTSOQueue_link(StgMVar_tail(mvar)) = q;
+ foreign "C" recordClosureMutated(MyCapability() "ptr",
+ StgMVar_tail(mvar)) [];
}
- StgTSO__link(CurrentTSO) = stg_END_TSO_QUEUE_closure;
+ StgTSO__link(CurrentTSO) = q;
StgTSO_block_info(CurrentTSO) = mvar;
- // write barrier for throwTo(), which looks at block_info
- // if why_blocked==BlockedOnMVar.
- prim %write_barrier() [];
StgTSO_why_blocked(CurrentTSO) = BlockedOnMVar::I16;
- StgMVar_tail(mvar) = CurrentTSO;
-
+ StgMVar_tail(mvar) = q;
+
R1 = mvar;
R2 = val;
jump stg_block_putmvar;
}
- if (StgMVar_head(mvar) != stg_END_TSO_QUEUE_closure) {
-
- /* There are takeMVar(s) waiting: wake up the first one
- */
- ASSERT(StgTSO_why_blocked(StgMVar_head(mvar)) == BlockedOnMVar::I16);
-
- /* actually perform the takeMVar */
- tso = StgMVar_head(mvar);
- PerformTake(tso, val);
- if (TO_W_(StgTSO_dirty(tso)) == 0) {
- foreign "C" dirty_TSO(MyCapability() "ptr", tso "ptr") [];
- }
-
- ("ptr" tso) = foreign "C" unblockOne_(MyCapability() "ptr",
- StgMVar_head(mvar) "ptr", 1) [];
- StgMVar_head(mvar) = tso;
-
- if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
- StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
- }
-
- unlockClosure(mvar, stg_MVAR_DIRTY_info);
- jump %ENTRY_CODE(Sp(0));
- }
- else
- {
+ q = StgMVar_head(mvar);
+loop:
+ if (q == stg_END_TSO_QUEUE_closure) {
/* No further takes, the MVar is now full. */
StgMVar_value(mvar) = val;
-
unlockClosure(mvar, stg_MVAR_DIRTY_info);
jump %ENTRY_CODE(Sp(0));
}
+ if (StgHeader_info(q) == stg_IND_info ||
+ StgHeader_info(q) == stg_MSG_NULL_info) {
+ q = StgInd_indirectee(q);
+ goto loop;
+ }
+
+ // There are takeMVar(s) waiting: wake up the first one
+
+ tso = StgMVarTSOQueue_tso(q);
+ ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16);
+ ASSERT(StgTSO_block_info(tso) == mvar);
+ // actually perform the takeMVar
+ PerformTake(tso, val);
+
+ if (TO_W_(StgTSO_dirty(tso)) == 0) {
+ foreign "C" dirty_TSO(MyCapability() "ptr", tso "ptr") [];
+ }
+
+ StgMVar_head(mvar) = StgMVarTSOQueue_link(q);
+ if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
+ StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
+ }
+
-    /* ToDo: yield afterward for better communication performance? */
+    // indicate that the takeMVar has now completed:
+    StgTSO__link(tso) = stg_END_TSO_QUEUE_closure;
+
+ foreign "C" tryWakeupThread(MyCapability() "ptr", tso) [];
+
+ unlockClosure(mvar, stg_MVAR_DIRTY_info);
+ jump %ENTRY_CODE(Sp(0));
}
stg_tryPutMVarzh
{
- W_ mvar, info, tso;
+ W_ mvar, val, info, tso, q;
/* args: R1 = MVar, R2 = value */
mvar = R1;
+ val = R2;
#if defined(THREADED_RTS)
- ("ptr" info) = foreign "C" lockClosure(mvar "ptr") [R2];
+ ("ptr" info) = foreign "C" lockClosure(mvar "ptr") [];
#else
info = GET_INFO(mvar);
#endif
+ if (info == stg_MVAR_CLEAN_info) {
+ foreign "C" dirty_MVAR(BaseReg "ptr", mvar "ptr") [];
+ }
+
if (StgMVar_value(mvar) != stg_END_TSO_QUEUE_closure) {
#if defined(THREADED_RTS)
unlockClosure(mvar, info);
@@ -1380,43 +1417,46 @@ stg_tryPutMVarzh
RET_N(0);
}
- if (info == stg_MVAR_CLEAN_info) {
- foreign "C" dirty_MVAR(BaseReg "ptr", mvar "ptr");
- }
-
- if (StgMVar_head(mvar) != stg_END_TSO_QUEUE_closure) {
-
- /* There are takeMVar(s) waiting: wake up the first one
- */
- ASSERT(StgTSO_why_blocked(StgMVar_head(mvar)) == BlockedOnMVar::I16);
-
- /* actually perform the takeMVar */
- tso = StgMVar_head(mvar);
- PerformTake(tso, R2);
- if (TO_W_(StgTSO_dirty(tso)) == 0) {
- foreign "C" dirty_TSO(MyCapability() "ptr", tso "ptr") [];
- }
-
- ("ptr" tso) = foreign "C" unblockOne_(MyCapability() "ptr",
- StgMVar_head(mvar) "ptr", 1) [];
- StgMVar_head(mvar) = tso;
-
- if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
- StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
- }
-
+ q = StgMVar_head(mvar);
+loop:
+ if (q == stg_END_TSO_QUEUE_closure) {
+ /* No further takes, the MVar is now full. */
+ StgMVar_value(mvar) = val;
unlockClosure(mvar, stg_MVAR_DIRTY_info);
+    RET_N(1);
+ }
+ if (StgHeader_info(q) == stg_IND_info ||
+ StgHeader_info(q) == stg_MSG_NULL_info) {
+ q = StgInd_indirectee(q);
+ goto loop;
}
- else
- {
- /* No further takes, the MVar is now full. */
- StgMVar_value(mvar) = R2;
- unlockClosure(mvar, stg_MVAR_DIRTY_info);
+    // There are takeMVar(s) waiting: wake up the first one
+
+ tso = StgMVarTSOQueue_tso(q);
+ ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16);
+ ASSERT(StgTSO_block_info(tso) == mvar);
+ // actually perform the takeMVar
+ PerformTake(tso, val);
+
+ if (TO_W_(StgTSO_dirty(tso)) == 0) {
+ foreign "C" dirty_TSO(MyCapability() "ptr", tso "ptr") [];
}
- RET_N(1);
- /* ToDo: yield afterward for better communication performance? */
+ StgMVar_head(mvar) = StgMVarTSOQueue_link(q);
+ if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
+ StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
+ }
+
+ // indicate that the takeMVar has now completed:
+ StgTSO__link(tso) = stg_END_TSO_QUEUE_closure;
+
+ foreign "C" tryWakeupThread(MyCapability() "ptr", tso) [];
+
+ unlockClosure(mvar, stg_MVAR_DIRTY_info);
+    RET_N(1);
}
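Note: a detail the four primops above share: while a thread is blocked on an
MVar, tso->_link points at its queue node, and whoever completes the
operation on the thread's behalf resets _link to END_TSO_QUEUE before
sending the wake-up. Anyone holding the MVar lock can therefore test for
completion; a sketch of the predicate that the RaiseAsync.c and Threads.c
changes below rely on:

    /* sketch: has this BlockedOnMVar thread already been serviced? */
    static rtsBool mvarOpCompleted (StgTSO *tso)
    {
        ASSERT(tso->why_blocked == BlockedOnMVar);
        return tso->_link == END_TSO_QUEUE;
    }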
diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c
index f974f8c0d7..bebbcd4722 100644
--- a/rts/RaiseAsync.c
+++ b/rts/RaiseAsync.c
@@ -31,6 +31,8 @@ static void raiseAsync (Capability *cap,
static void removeFromQueues(Capability *cap, StgTSO *tso);
+static void removeFromMVarBlockedQueue (StgTSO *tso);
+
static void blockedThrowTo (Capability *cap,
StgTSO *target, MessageThrowTo *msg);
@@ -124,7 +126,7 @@ suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here)
Currently we send a message if the target belongs to another
Capability, and it is
- - NotBlocked, BlockedOnMsgWakeup, BlockedOnMsgThrowTo,
+ - NotBlocked, BlockedOnMsgThrowTo,
BlockedOnCCall
- or it is masking exceptions (TSO_BLOCKEX)
@@ -221,67 +223,7 @@ check_target:
switch (status) {
case NotBlocked:
- case BlockedOnMsgWakeup:
- /* if status==NotBlocked, and target->cap == cap, then
- we own this TSO and can raise the exception.
-
- How do we establish this condition? Very carefully.
-
- Let
- P = (status == NotBlocked)
- Q = (tso->cap == cap)
-
- Now, if P & Q are true, then the TSO is locked and owned by
- this capability. No other OS thread can steal it.
-
- If P==0 and Q==1: the TSO is blocked, but attached to this
- capabilty, and it can be stolen by another capability.
-
- If P==1 and Q==0: the TSO is runnable on another
- capability. At any time, the TSO may change from runnable
- to blocked and vice versa, while it remains owned by
- another capability.
-
- Suppose we test like this:
-
- p = P
- q = Q
- if (p && q) ...
-
- this is defeated by another capability stealing a blocked
- TSO from us to wake it up (Schedule.c:unblockOne()). The
- other thread is doing
-
- Q = 0
- P = 1
-
- assuming arbitrary reordering, we could see this
- interleaving:
-
- start: P==0 && Q==1
- P = 1
- p = P
- q = Q
- Q = 0
- if (p && q) ...
-
- so we need a memory barrier:
-
- p = P
- mb()
- q = Q
- if (p && q) ...
-
- this avoids the problematic case. There are other cases
- to consider, but this is the tricky one.
-
- Note that we must be sure that unblockOne() does the
- writes in the correct order: Q before P. The memory
- barrier ensures that if we have seen the write to P, we
- have also seen the write to Q.
- */
{
- write_barrier();
if ((target->flags & TSO_BLOCKEX) == 0) {
// It's on our run queue and not blocking exceptions
raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
@@ -389,18 +331,26 @@ check_target:
goto retry;
}
+ if (target->_link == END_TSO_QUEUE) {
+ // the MVar operation has already completed. There is a
+ // MSG_TRY_WAKEUP on the way, but we can just wake up the
+ // thread now anyway and ignore the message when it
+ // arrives.
+ unlockClosure((StgClosure *)mvar, info);
+ tryWakeupThread(cap, target);
+ goto retry;
+ }
+
if ((target->flags & TSO_BLOCKEX) &&
((target->flags & TSO_INTERRUPTIBLE) == 0)) {
blockedThrowTo(cap,target,msg);
unlockClosure((StgClosure *)mvar, info);
return THROWTO_BLOCKED;
} else {
- removeThreadFromMVarQueue(cap, mvar, target);
+ // revoke the MVar operation
+ removeFromMVarBlockedQueue(target);
raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
- if (info == &stg_MVAR_CLEAN_info) {
- dirty_MVAR(&cap->r,(StgClosure*)mvar);
- }
- unlockClosure((StgClosure *)mvar, &stg_MVAR_DIRTY_info);
+ unlockClosure((StgClosure *)mvar, info);
return THROWTO_SUCCESS;
}
}
@@ -588,11 +538,54 @@ awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso)
-------------------------------------------------------------------------- */
static void
+removeFromMVarBlockedQueue (StgTSO *tso)
+{
+ StgMVar *mvar = (StgMVar*)tso->block_info.closure;
+ StgMVarTSOQueue *q = (StgMVarTSOQueue*)tso->_link;
+
+ if (q == (StgMVarTSOQueue*)END_TSO_QUEUE) {
+ // already removed from this MVar
+ return;
+ }
+
+ // Assume the MVar is locked. (not assertable; sometimes it isn't
+ // actually WHITEHOLE'd).
+
+ // We want to remove the MVAR_TSO_QUEUE object from the queue. It
+ // isn't doubly-linked so we can't actually remove it; instead we
+ // just overwrite it with an IND if possible and let the GC short
+ // it out. However, we have to be careful to maintain the deque
+ // structure:
+
+ if (mvar->head == q) {
+ mvar->head = q->link;
+ q->header.info = &stg_IND_info;
+ if (mvar->tail == q) {
+ mvar->tail = (StgMVarTSOQueue*)END_TSO_QUEUE;
+ }
+ }
+ else if (mvar->tail == q) {
+ // we can't replace it with an IND in this case, because then
+ // we lose the tail pointer when the GC shorts out the IND.
+ // So we use MSG_NULL as a kind of non-dupable indirection;
+ // these are ignored by takeMVar/putMVar.
+ q->header.info = &stg_MSG_NULL_info;
+ }
+ else {
+ q->header.info = &stg_IND_info;
+ }
+
+ // revoke the MVar operation
+ tso->_link = END_TSO_QUEUE;
+}
+
+static void
removeFromQueues(Capability *cap, StgTSO *tso)
{
switch (tso->why_blocked) {
case NotBlocked:
+ case ThreadMigrating:
return;
case BlockedOnSTM:
@@ -605,22 +598,13 @@ removeFromQueues(Capability *cap, StgTSO *tso)
goto done;
case BlockedOnMVar:
- removeThreadFromMVarQueue(cap, (StgMVar *)tso->block_info.closure, tso);
- // we aren't doing a write barrier here: the MVar is supposed to
- // be already locked, so replacing the info pointer would unlock it.
+ removeFromMVarBlockedQueue(tso);
goto done;
case BlockedOnBlackHole:
// nothing to do
goto done;
- case BlockedOnMsgWakeup:
- {
- // kill the message, atomically:
- OVERWRITE_INFO(tso->block_info.wakeup, &stg_IND_info);
- break;
- }
-
case BlockedOnMsgThrowTo:
{
MessageThrowTo *m = tso->block_info.throwto;
@@ -659,7 +643,8 @@ removeFromQueues(Capability *cap, StgTSO *tso)
}
done:
- unblockOne(cap, tso);
+ tso->why_blocked = NotBlocked;
+ appendToRunQueue(cap, tso);
}
/* -----------------------------------------------------------------------------
@@ -733,7 +718,7 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
ASSERT(tso->cap == cap);
// wake it up
- if (tso->why_blocked != NotBlocked && tso->why_blocked != BlockedOnMsgWakeup) {
+ if (tso->why_blocked != NotBlocked) {
tso->why_blocked = NotBlocked;
appendToRunQueue(cap,tso);
}
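Note: the head/middle/tail analysis in removeFromMVarBlockedQueue has a
reader-side counterpart: takeMVar/putMVar simply skip any node whose info
pointer was overwritten. A sketch of that predicate, with the case table:

    /* what each queue position is overwritten with, and why:
     *   head   -> IND       head pointer is advanced; the GC shorts the IND
     *   middle -> IND       the predecessor's link resolves through the IND
     *   tail   -> MSG_NULL  an IND would be shorted out by the GC and the
     *                       tail pointer lost; MSG_NULL is never shorted
     */
    static rtsBool isRevokedQueueNode (StgMVarTSOQueue *q)
    {
        const StgInfoTable *i = q->header.info;
        return i == &stg_IND_info || i == &stg_MSG_NULL_info;
    }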
diff --git a/rts/Schedule.c b/rts/Schedule.c
index d7d57411b7..f7b26a4876 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -125,7 +125,7 @@ static Capability *schedule (Capability *initialCapability, Task *task);
static void schedulePreLoop (void);
static void scheduleFindWork (Capability *cap);
#if defined(THREADED_RTS)
-static void scheduleYield (Capability **pcap, Task *task, rtsBool);
+static void scheduleYield (Capability **pcap, Task *task);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
@@ -204,7 +204,6 @@ schedule (Capability *initialCapability, Task *task)
rtsBool ready_to_gc;
#if defined(THREADED_RTS)
rtsBool first = rtsTrue;
- rtsBool force_yield = rtsFalse;
#endif
cap = initialCapability;
@@ -328,9 +327,7 @@ schedule (Capability *initialCapability, Task *task)
// ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
}
- yield:
- scheduleYield(&cap,task,force_yield);
- force_yield = rtsFalse;
+ scheduleYield(&cap,task);
if (emptyRunQueue(cap)) continue; // look for work again
#endif
@@ -490,19 +487,6 @@ run_thread:
traceEventStopThread(cap, t, ret);
-#if defined(THREADED_RTS)
- // If ret is ThreadBlocked, and this Task is bound to the TSO that
- // blocked, we are in limbo - the TSO is now owned by whatever it
- // is blocked on, and may in fact already have been woken up,
- // perhaps even on a different Capability. It may be the case
- // that task->cap != cap. We better yield this Capability
- // immediately and return to normaility.
- if (ret == ThreadBlocked) {
- force_yield = rtsTrue;
- goto yield;
- }
-#endif
-
ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
ASSERT(t->cap == cap);
@@ -639,23 +623,13 @@ shouldYieldCapability (Capability *cap, Task *task)
// and also check the benchmarks in nofib/parallel for regressions.
static void
-scheduleYield (Capability **pcap, Task *task, rtsBool force_yield)
+scheduleYield (Capability **pcap, Task *task)
{
Capability *cap = *pcap;
// if we have work, and we don't need to give up the Capability, continue.
//
- // The force_yield flag is used when a bound thread blocks. This
- // is a particularly tricky situation: the current Task does not
- // own the TSO any more, since it is on some queue somewhere, and
- // might be woken up or manipulated by another thread at any time.
- // The TSO and Task might be migrated to another Capability.
- // Certain invariants might be in doubt, such as task->bound->cap
- // == cap. We have to yield the current Capability immediately,
- // no messing around.
- //
- if (!force_yield &&
- !shouldYieldCapability(cap,task) &&
+ if (!shouldYieldCapability(cap,task) &&
(!emptyRunQueue(cap) ||
!emptyInbox(cap) ||
sched_state >= SCHED_INTERRUPTING))
@@ -1891,8 +1865,7 @@ scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
if (cpu == cap->no) {
appendToRunQueue(cap,tso);
} else {
- traceEventMigrateThread (cap, tso, capabilities[cpu].no);
- wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
+ migrateThread(cap, tso, &capabilities[cpu]);
}
#else
appendToRunQueue(cap,tso);
@@ -2372,8 +2345,8 @@ deleteThread_(Capability *cap, StgTSO *tso)
if (tso->why_blocked == BlockedOnCCall ||
tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
- unblockOne(cap,tso);
tso->what_next = ThreadKilled;
+ appendToRunQueue(tso->cap, tso);
} else {
deleteThread(cap,tso);
}
diff --git a/rts/StgMiscClosures.cmm b/rts/StgMiscClosures.cmm
index 7e1e6a73d5..1dec6e68b7 100644
--- a/rts/StgMiscClosures.cmm
+++ b/rts/StgMiscClosures.cmm
@@ -491,8 +491,6 @@ CLOSURE(stg_NO_TREC_closure,stg_NO_TREC);
------------------------------------------------------------------------- */
// PRIM rather than CONSTR, because PRIM objects cannot be duplicated by the GC.
-INFO_TABLE_CONSTR(stg_MSG_WAKEUP,2,0,0,PRIM,"MSG_WAKEUP","MSG_WAKEUP")
-{ foreign "C" barf("MSG_WAKEUP object entered!") never returns; }
INFO_TABLE_CONSTR(stg_MSG_TRY_WAKEUP,2,0,0,PRIM,"MSG_TRY_WAKEUP","MSG_TRY_WAKEUP")
{ foreign "C" barf("MSG_TRY_WAKEUP object entered!") never returns; }
@@ -573,6 +571,13 @@ INFO_TABLE( stg_dummy_ret, 0, 0, CONSTR_NOCAF_STATIC, "DUMMY_RET", "DUMMY_RET")
CLOSURE(stg_dummy_ret_closure,stg_dummy_ret);
/* ----------------------------------------------------------------------------
+ MVAR_TSO_QUEUE
+ ------------------------------------------------------------------------- */
+
+INFO_TABLE_CONSTR(stg_MVAR_TSO_QUEUE,2,0,0,PRIM,"MVAR_TSO_QUEUE","MVAR_TSO_QUEUE")
+{ foreign "C" barf("MVAR_TSO_QUEUE object entered!") never returns; }
+
+/* ----------------------------------------------------------------------------
CHARLIKE and INTLIKE closures.
These are static representations of Chars and small Ints, so that
diff --git a/rts/Threads.c b/rts/Threads.c
index 05a13c7f3b..e8a835bdae 100644
--- a/rts/Threads.c
+++ b/rts/Threads.c
@@ -205,83 +205,21 @@ removeThreadFromDeQueue (Capability *cap,
barf("removeThreadFromMVarQueue: not found");
}
-void
-removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso)
-{
- // caller must do the write barrier, because replacing the info
- // pointer will unlock the MVar.
- removeThreadFromDeQueue (cap, &mvar->head, &mvar->tail, tso);
- tso->_link = END_TSO_QUEUE;
-}
-
/* ----------------------------------------------------------------------------
-   unblockOne()
-
-   unblock a single thread.
-   ------------------------------------------------------------------------- */
-
-StgTSO *
-unblockOne (Capability *cap, StgTSO *tso)
-{
-    return unblockOne_(cap,tso,rtsTrue); // allow migration
-}
-
-StgTSO *
-unblockOne_ (Capability *cap, StgTSO *tso,
-             rtsBool allow_migrate USED_IF_THREADS)
-{
-    StgTSO *next;
-
-    // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO);
-    ASSERT(tso->why_blocked != NotBlocked);
-    ASSERT(tso->why_blocked != BlockedOnMsgWakeup ||
-           tso->block_info.closure->header.info == &stg_IND_info);
-
-    next = tso->_link;
-    tso->_link = END_TSO_QUEUE;
-
-#if defined(THREADED_RTS)
-    if (tso->cap == cap || (!tsoLocked(tso) &&
-                            allow_migrate &&
-                            RtsFlags.ParFlags.wakeupMigrate)) {
-        // We are waking up this thread on the current Capability, which
-        // might involve migrating it from the Capability it was last on.
-        if (tso->bound) {
-            ASSERT(tso->bound->task->cap == tso->cap);
-            tso->bound->task->cap = cap;
-        }
-
-        tso->cap = cap;
-        write_barrier();
-        tso->why_blocked = NotBlocked;
-        appendToRunQueue(cap,tso);
-
-        // context-switch soonish so we can migrate the new thread if
-        // necessary.  NB. not contextSwitchCapability(cap), which would
-        // force a context switch immediately.
-        cap->context_switch = 1;
-    } else {
-        // we'll try to wake it up on the Capability it was last on.
-        wakeupThreadOnCapability(cap, tso->cap, tso);
-    }
-#else
-    tso->why_blocked = NotBlocked;
-    appendToRunQueue(cap,tso);
-
-    // context-switch soonish so we can migrate the new thread if
-    // necessary.  NB. not contextSwitchCapability(cap), which would
-    // force a context switch immediately.
-    cap->context_switch = 1;
-#endif
-
-    traceEventThreadWakeup (cap, tso, tso->cap->no);
-
-    return next;
-}
+   tryWakeupThread()
+
+   Attempt to wake up a thread.  tryWakeupThread is idempotent: it is
+   always safe to call it too many times, but it is not safe in
+   general to omit a call.
+   ------------------------------------------------------------------------- */
void
tryWakeupThread (Capability *cap, StgTSO *tso)
{
+    traceEventThreadWakeup (cap, tso, tso->cap->no);
+
#ifdef THREADED_RTS
if (tso->cap != cap)
{
@@ -298,6 +236,16 @@ tryWakeupThread (Capability *cap, StgTSO *tso)
switch (tso->why_blocked)
{
+ case BlockedOnMVar:
+ {
+ if (tso->_link == END_TSO_QUEUE) {
+ tso->block_info.closure = (StgClosure*)END_TSO_QUEUE;
+ goto unblock;
+ } else {
+ return;
+ }
+ }
+
case BlockedOnMsgThrowTo:
{
const StgInfoTable *i;
@@ -307,27 +255,45 @@ tryWakeupThread (Capability *cap, StgTSO *tso)
if (i != &stg_MSG_NULL_info) {
debugTraceCap(DEBUG_sched, cap, "thread %ld still blocked on throwto (%p)",
(lnat)tso->id, tso->block_info.throwto->header.info);
- break; // still blocked
+ return;
}
// remove the block frame from the stack
ASSERT(tso->sp[0] == (StgWord)&stg_block_throwto_info);
tso->sp += 3;
- // fall through...
+ goto unblock;
}
+
case BlockedOnBlackHole:
case BlockedOnSTM:
- {
- // just run the thread now, if the BH is not really available,
- // we'll block again.
- tso->why_blocked = NotBlocked;
- appendToRunQueue(cap,tso);
- break;
- }
+ case ThreadMigrating:
+ goto unblock;
+
default:
// otherwise, do nothing
- break;
+ return;
}
+
+unblock:
+ // just run the thread now, if the BH is not really available,
+ // we'll block again.
+ tso->why_blocked = NotBlocked;
+ appendToRunQueue(cap,tso);
+}
+
+/* ----------------------------------------------------------------------------
+ migrateThread
+ ------------------------------------------------------------------------- */
+
+void
+migrateThread (Capability *from, StgTSO *tso, Capability *to)
+{
+ traceEventMigrateThread (from, tso, to->no);
+ // ThreadMigrating tells the target cap that it needs to be added to
+ // the run queue when it receives the MSG_TRY_WAKEUP.
+    tso->why_blocked = ThreadMigrating;
+ tso->cap = to;
+ tryWakeupThread(from, tso);
}
/* ----------------------------------------------------------------------------
@@ -450,47 +416,6 @@ updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val)
}
}
-/* ----------------------------------------------------------------------------
- * Wake up a thread on a Capability.
- *
- * This is used when the current Task is running on a Capability and
- * wishes to wake up a thread on a different Capability.
- * ------------------------------------------------------------------------- */
-
-#ifdef THREADED_RTS
-
-void
-wakeupThreadOnCapability (Capability *cap,
- Capability *other_cap,
- StgTSO *tso)
-{
- MessageWakeup *msg;
-
- // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
- if (tso->bound) {
- ASSERT(tso->bound->task->cap == tso->cap);
- tso->bound->task->cap = other_cap;
- }
- tso->cap = other_cap;
-
- ASSERT(tso->why_blocked != BlockedOnMsgWakeup ||
- tso->block_info.closure->header.info == &stg_IND_info);
-
- ASSERT(tso->block_info.closure->header.info != &stg_MSG_WAKEUP_info);
-
- msg = (MessageWakeup*) allocate(cap, sizeofW(MessageWakeup));
- SET_HDR(msg, &stg_MSG_WAKEUP_info, CCS_SYSTEM);
- msg->tso = tso;
- tso->block_info.closure = (StgClosure *)msg;
- dirty_TSO(cap, tso);
- write_barrier();
- tso->why_blocked = BlockedOnMsgWakeup;
-
- sendMessage(cap, other_cap, (Message*)msg);
-}
-
-#endif /* THREADED_RTS */
-
/* ---------------------------------------------------------------------------
* rtsSupportsBoundThreads(): is the RTS built to support bound threads?
* used by Control.Concurrent for error checking.
@@ -549,15 +474,15 @@ printThreadBlockage(StgTSO *tso)
debugBelch("is blocked on a black hole %p",
((StgBlockingQueue*)tso->block_info.bh->bh));
break;
- case BlockedOnMsgWakeup:
- debugBelch("is blocked on a wakeup message");
- break;
case BlockedOnMsgThrowTo:
debugBelch("is blocked on a throwto message");
break;
case NotBlocked:
debugBelch("is not blocked");
break;
+ case ThreadMigrating:
+ debugBelch("is runnable, but not on the run queue");
+ break;
case BlockedOnCCall:
debugBelch("is blocked on an external call");
break;
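Note: together, tryWakeupThread and migrateThread replace the old
unblockOne/wakeupThreadOnCapability pair with an idempotent protocol. A
sketch of the two call patterns (hypothetical wrapper, for illustration
only):

    static void wakeupExamples (Capability *cap, StgTSO *tso)
    {
        // e.g. takeMVar completing a blocked putMVar:
        tryWakeupThread(cap, tso);  // requeues tso, or sends MSG_TRY_WAKEUP
        tryWakeupThread(cap, tso);  // benign: sees NotBlocked and returns

        // moving a thread to another capability: ThreadMigrating marks it
        // as off every run queue until its MSG_TRY_WAKEUP is processed
        migrateThread(cap, tso, &capabilities[1]);
    }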
diff --git a/rts/Threads.h b/rts/Threads.h
index 000cf1b8e2..e3680f2d50 100644
--- a/rts/Threads.h
+++ b/rts/Threads.h
@@ -19,6 +19,7 @@ StgTSO * unblockOne_ (Capability *cap, StgTSO *tso, rtsBool allow_migrate);
void checkBlockingQueues (Capability *cap, StgTSO *tso);
void wakeBlockingQueue (Capability *cap, StgBlockingQueue *bq);
void tryWakeupThread (Capability *cap, StgTSO *tso);
+void migrateThread (Capability *from, StgTSO *tso, Capability *to);
// Wakes up a thread on a Capability (probably a different Capability
// from the one held by the current Task).
@@ -32,8 +33,6 @@ void wakeupThreadOnCapability (Capability *cap,
void updateThunk (Capability *cap, StgTSO *tso,
StgClosure *thunk, StgClosure *val);
-void removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso);
-
rtsBool removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso);
rtsBool removeThreadFromDeQueue (Capability *cap, StgTSO **head, StgTSO **tail, StgTSO *tso);
diff --git a/rts/sm/Compact.c b/rts/sm/Compact.c
index 6de42efae4..7eeb90f7ca 100644
--- a/rts/sm/Compact.c
+++ b/rts/sm/Compact.c
@@ -472,7 +472,6 @@ thread_TSO (StgTSO *tso)
if ( tso->why_blocked == BlockedOnMVar
|| tso->why_blocked == BlockedOnBlackHole
|| tso->why_blocked == BlockedOnMsgThrowTo
- || tso->why_blocked == BlockedOnMsgWakeup
) {
thread_(&tso->block_info.closure);
}
@@ -625,7 +624,6 @@ thread_obj (StgInfoTable *info, StgPtr p)
case CONSTR:
case PRIM:
case MUT_PRIM:
- case IND_PERM:
case MUT_VAR_CLEAN:
case MUT_VAR_DIRTY:
case BLACKHOLE:
@@ -664,6 +662,8 @@ thread_obj (StgInfoTable *info, StgPtr p)
return p + sizeofW(StgMVar);
}
+ case IND:
+ case IND_PERM:
case IND_OLDGEN:
case IND_OLDGEN_PERM:
thread(&((StgInd *)p)->indirectee);
diff --git a/rts/sm/Scav.c b/rts/sm/Scav.c
index e6234f6c0f..ae9e81cff5 100644
--- a/rts/sm/Scav.c
+++ b/rts/sm/Scav.c
@@ -84,7 +84,6 @@ scavengeTSO (StgTSO *tso)
evacuate((StgClosure **)&tso->_link);
if ( tso->why_blocked == BlockedOnMVar
|| tso->why_blocked == BlockedOnBlackHole
- || tso->why_blocked == BlockedOnMsgWakeup
|| tso->why_blocked == BlockedOnMsgThrowTo
|| tso->why_blocked == NotBlocked
) {
@@ -896,6 +895,7 @@ scavenge_mark_stack(void)
// no "old" generation.
break;
+ case IND:
case IND_OLDGEN:
case IND_OLDGEN_PERM:
case BLACKHOLE:
@@ -1407,7 +1407,6 @@ scavenge_mutable_list(bdescr *bd, generation *gen)
evacuate((StgClosure **)&tso->_link);
if ( tso->why_blocked == BlockedOnMVar
|| tso->why_blocked == BlockedOnBlackHole
- || tso->why_blocked == BlockedOnMsgWakeup
|| tso->why_blocked == BlockedOnMsgThrowTo
|| tso->why_blocked == NotBlocked
) {