summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon Marlow <marlowsd@gmail.com>2010-04-01 09:16:05 +0000
committerSimon Marlow <marlowsd@gmail.com>2010-04-01 09:16:05 +0000
commitf4692220c7cbdadaa633f50eb2b30b59edb30183 (patch)
tree3d29f1b4770bedb7c69c31b2828f9b5acca3e2c3
parent7c4cb84efd774a21f11fb03118feb0434282ecf3 (diff)
downloadhaskell-f4692220c7cbdadaa633f50eb2b30b59edb30183.tar.gz
Change the representation of the MVar blocked queue
The list of threads blocked on an MVar is now represented as a list of separately allocated objects rather than being linked through the TSOs themselves. This lets us remove a TSO from the list in O(1) time rather than O(n) time, by marking the list object. Removing this linear component fixes some pathalogical performance cases where many threads were blocked on an MVar and became unreachable simultaneously (nofib/smp/threads007), or when sending an asynchronous exception to a TSO in a long list of thread blocked on an MVar. MVar performance has actually improved by a few percent as a result of this change, slightly to my surprise. This is the final cleanup in the sequence, which let me remove the old way of waking up threads (unblockOne(), MSG_WAKEUP) in favour of the new way (tryWakeupThread and MSG_TRY_WAKEUP, which is idempotent). It is now the case that only the Capability that owns a TSO may modify its state (well, almost), and this simplifies various things. More of the RTS is based on message-passing between Capabilities now.
-rw-r--r--includes/mkDerivedConstants.c4
-rw-r--r--includes/rts/Constants.h8
-rw-r--r--includes/rts/storage/Block.h3
-rw-r--r--includes/rts/storage/Closures.h14
-rw-r--r--includes/rts/storage/TSO.h1
-rw-r--r--includes/stg/MiscClosures.h9
-rw-r--r--rts/Capability.h8
-rw-r--r--rts/HeapStackCheck.cmm10
-rw-r--r--rts/Inlines.c1
-rw-r--r--rts/Messages.c19
-rw-r--r--rts/PrimOps.cmm350
-rw-r--r--rts/RaiseAsync.c141
-rw-r--r--rts/Schedule.c39
-rw-r--r--rts/StgMiscClosures.cmm9
-rw-r--r--rts/Threads.c173
-rw-r--r--rts/Threads.h3
-rw-r--r--rts/sm/Compact.c4
-rw-r--r--rts/sm/Scav.c3
18 files changed, 362 insertions, 437 deletions
diff --git a/includes/mkDerivedConstants.c b/includes/mkDerivedConstants.c
index 92685cae3a..7efcf47d48 100644
--- a/includes/mkDerivedConstants.c
+++ b/includes/mkDerivedConstants.c
@@ -372,6 +372,10 @@ main(int argc, char *argv[])
closure_field(StgMVar,tail);
closure_field(StgMVar,value);
+ closure_size(StgMVarTSOQueue);
+ closure_field(StgMVarTSOQueue, link);
+ closure_field(StgMVarTSOQueue, tso);
+
closure_size(StgBCO);
closure_field(StgBCO, instrs);
closure_field(StgBCO, literals);
diff --git a/includes/rts/Constants.h b/includes/rts/Constants.h
index bfc77fa361..354abbbdd9 100644
--- a/includes/rts/Constants.h
+++ b/includes/rts/Constants.h
@@ -227,8 +227,12 @@
/* same as above but don't unblock async exceptions in resumeThread() */
/* Involved in a message sent to tso->msg_cap */
-#define BlockedOnMsgWakeup 12
-#define BlockedOnMsgThrowTo 13
+#define BlockedOnMsgThrowTo 12
+
+/* The thread is not on any run queues, but can be woken up
+ by tryWakeupThread() */
+#define ThreadMigrating 13
+
/*
* These constants are returned to the scheduler by a thread that has
* stopped for one reason or another. See typedef StgThreadReturnCode
diff --git a/includes/rts/storage/Block.h b/includes/rts/storage/Block.h
index 3114bea014..d6a4d4c487 100644
--- a/includes/rts/storage/Block.h
+++ b/includes/rts/storage/Block.h
@@ -109,7 +109,8 @@ typedef struct bdescr_ {
#else
-INLINE_HEADER bdescr *Bdescr(StgPtr p)
+EXTERN_INLINE bdescr *Bdescr(StgPtr p);
+EXTERN_INLINE bdescr *Bdescr(StgPtr p)
{
return (bdescr *)
((((W_)p & MBLOCK_MASK & ~BLOCK_MASK) >> (BLOCK_SHIFT-BDESCR_SHIFT))
diff --git a/includes/rts/storage/Closures.h b/includes/rts/storage/Closures.h
index 802746868c..a0ff738aa7 100644
--- a/includes/rts/storage/Closures.h
+++ b/includes/rts/storage/Closures.h
@@ -305,11 +305,17 @@ typedef struct {
/* Concurrent communication objects */
+typedef struct StgMVarTSOQueue_ {
+ StgHeader header;
+ struct StgMVarTSOQueue_ *link;
+ struct StgTSO_ *tso;
+} StgMVarTSOQueue;
+
typedef struct {
- StgHeader header;
- struct StgTSO_ *head;
- struct StgTSO_ *tail;
- StgClosure* value;
+ StgHeader header;
+ struct StgMVarTSOQueue_ *head;
+ struct StgMVarTSOQueue_ *tail;
+ StgClosure* value;
} StgMVar;
diff --git a/includes/rts/storage/TSO.h b/includes/rts/storage/TSO.h
index abe621564d..0e9883f1a6 100644
--- a/includes/rts/storage/TSO.h
+++ b/includes/rts/storage/TSO.h
@@ -82,7 +82,6 @@ typedef struct StgTSO_ {
/*
Currently used for linking TSOs on:
* cap->run_queue_{hd,tl}
- * MVAR queue
* (non-THREADED_RTS); the blocked_queue
* and pointing to the relocated version of a ThreadRelocated
diff --git a/includes/stg/MiscClosures.h b/includes/stg/MiscClosures.h
index 4abb67dc55..0aa6f99675 100644
--- a/includes/stg/MiscClosures.h
+++ b/includes/stg/MiscClosures.h
@@ -42,10 +42,10 @@
# define RTS_FUN(f) RTS_FUN_INFO(f##_info)
# define RTS_THUNK(f) RTS_THUNK_INFO(f##_info)
#else
-# define RTS_RET(f) RTS_INFO(f##_info) RTS_FUN_DECL(f##_ret)
-# define RTS_ENTRY(f) RTS_INFO(f##_info) RTS_FUN_DECL(f##_entry)
-# define RTS_FUN(f) RTS_FUN_INFO(f##_info) RTS_FUN_DECL(f##_entry)
-# define RTS_THUNK(f) RTS_THUNK_INFO(f##_info) RTS_FUN_DECL(f##_entry)
+# define RTS_RET(f) RTS_INFO(f##_info); RTS_FUN_DECL(f##_ret)
+# define RTS_ENTRY(f) RTS_INFO(f##_info); RTS_FUN_DECL(f##_entry)
+# define RTS_FUN(f) RTS_FUN_INFO(f##_info); RTS_FUN_DECL(f##_entry)
+# define RTS_THUNK(f) RTS_THUNK_INFO(f##_info); RTS_FUN_DECL(f##_entry)
#endif
/* Stack frames */
@@ -109,7 +109,6 @@ RTS_ENTRY(stg_MUT_ARR_PTRS_FROZEN0);
RTS_ENTRY(stg_MUT_VAR_CLEAN);
RTS_ENTRY(stg_MUT_VAR_DIRTY);
RTS_ENTRY(stg_END_TSO_QUEUE);
-RTS_ENTRY(stg_MSG_WAKEUP);
RTS_ENTRY(stg_MSG_TRY_WAKEUP);
RTS_ENTRY(stg_MSG_THROWTO);
RTS_ENTRY(stg_MSG_BLACKHOLE);
diff --git a/rts/Capability.h b/rts/Capability.h
index e12b8ce6e9..d8eba0d733 100644
--- a/rts/Capability.h
+++ b/rts/Capability.h
@@ -199,9 +199,9 @@ extern volatile StgWord waiting_for_gc;
//
void waitForReturnCapability (Capability **cap/*in/out*/, Task *task);
-INLINE_HEADER void recordMutableCap (StgClosure *p, Capability *cap, nat gen);
+EXTERN_INLINE void recordMutableCap (StgClosure *p, Capability *cap, nat gen);
-INLINE_HEADER void recordClosureMutated (Capability *cap, StgClosure *p);
+EXTERN_INLINE void recordClosureMutated (Capability *cap, StgClosure *p);
#if defined(THREADED_RTS)
@@ -291,7 +291,7 @@ INLINE_HEADER rtsBool emptyInbox(Capability *cap);;
* INLINE functions... private below here
* -------------------------------------------------------------------------- */
-INLINE_HEADER void
+EXTERN_INLINE void
recordMutableCap (StgClosure *p, Capability *cap, nat gen)
{
bdescr *bd;
@@ -310,7 +310,7 @@ recordMutableCap (StgClosure *p, Capability *cap, nat gen)
*bd->free++ = (StgWord)p;
}
-INLINE_HEADER void
+EXTERN_INLINE void
recordClosureMutated (Capability *cap, StgClosure *p)
{
bdescr *bd;
diff --git a/rts/HeapStackCheck.cmm b/rts/HeapStackCheck.cmm
index f8bccc091d..f8195768fb 100644
--- a/rts/HeapStackCheck.cmm
+++ b/rts/HeapStackCheck.cmm
@@ -481,9 +481,13 @@ INFO_TABLE_RET( stg_gc_gen, RET_DYN )
stg_gc_gen
{
+ // Hack; see Note [mvar-heap-check] in PrimOps.cmm
+ if (R10 == stg_putMVarzh || R10 == stg_takeMVarzh) {
+ unlockClosure(R1, stg_MVAR_DIRTY_info)
+ }
SAVE_EVERYTHING;
GC_GENERIC
-}
+}
// A heap check at an unboxed tuple return point. The return address
// is on the stack, and we can find it by using the offsets given
@@ -583,11 +587,7 @@ INFO_TABLE_RET( stg_block_takemvar, RET_SMALL, P_ unused )
// code fragment executed just before we return to the scheduler
stg_block_takemvar_finally
{
-#ifdef THREADED_RTS
unlockClosure(R3, stg_MVAR_DIRTY_info);
-#else
- SET_INFO(R3, stg_MVAR_DIRTY_info);
-#endif
jump StgReturn;
}
diff --git a/rts/Inlines.c b/rts/Inlines.c
index fa521e4217..ccb30bf76d 100644
--- a/rts/Inlines.c
+++ b/rts/Inlines.c
@@ -5,3 +5,4 @@
#include "PosixSource.h"
#include "Rts.h"
#include "Schedule.h"
+#include "Capability.h"
diff --git a/rts/Messages.c b/rts/Messages.c
index ae5d5d1abc..5a1e5bd3c4 100644
--- a/rts/Messages.c
+++ b/rts/Messages.c
@@ -28,8 +28,7 @@ void sendMessage(Capability *from_cap, Capability *to_cap, Message *msg)
#ifdef DEBUG
{
const StgInfoTable *i = msg->header.info;
- if (i != &stg_MSG_WAKEUP_info &&
- i != &stg_MSG_THROWTO_info &&
+ if (i != &stg_MSG_THROWTO_info &&
i != &stg_MSG_BLACKHOLE_info &&
i != &stg_MSG_TRY_WAKEUP_info &&
i != &stg_IND_info && // can happen if a MSG_BLACKHOLE is revoked
@@ -71,21 +70,7 @@ executeMessage (Capability *cap, Message *m)
loop:
write_barrier(); // allow m->header to be modified by another thread
i = m->header.info;
- if (i == &stg_MSG_WAKEUP_info)
- {
- // the plan is to eventually get rid of these and use
- // TRY_WAKEUP instead.
- MessageWakeup *w = (MessageWakeup *)m;
- StgTSO *tso = w->tso;
- debugTraceCap(DEBUG_sched, cap, "message: wakeup thread %ld",
- (lnat)tso->id);
- ASSERT(tso->cap == cap);
- ASSERT(tso->why_blocked == BlockedOnMsgWakeup);
- ASSERT(tso->block_info.closure == (StgClosure *)m);
- tso->why_blocked = NotBlocked;
- appendToRunQueue(cap, tso);
- }
- else if (i == &stg_MSG_TRY_WAKEUP_info)
+ if (i == &stg_MSG_TRY_WAKEUP_info)
{
StgTSO *tso = ((MessageWakeup *)m)->tso;
debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld",
diff --git a/rts/PrimOps.cmm b/rts/PrimOps.cmm
index 5c575f695b..03eb490b6b 100644
--- a/rts/PrimOps.cmm
+++ b/rts/PrimOps.cmm
@@ -1140,7 +1140,7 @@ stg_newMVarzh
stg_takeMVarzh
{
- W_ mvar, val, info, tso;
+ W_ mvar, val, info, tso, q;
/* args: R1 = MVar closure */
mvar = R1;
@@ -1159,72 +1159,85 @@ stg_takeMVarzh
* and wait until we're woken up.
*/
if (StgMVar_value(mvar) == stg_END_TSO_QUEUE_closure) {
+
+ // Note [mvar-heap-check] We want to do the heap check in the
+ // branch here, to avoid the conditional in the common case.
+ // However, we've already locked the MVar above, so we better
+ // be careful to unlock it again if the the heap check fails.
+ // Unfortunately we don't have an easy way to inject any code
+ // into the heap check generated by the code generator, so we
+ // have to do it in stg_gc_gen (see HeapStackCheck.cmm).
+ HP_CHK_GEN_TICKY(SIZEOF_StgMVarTSOQueue, R1_PTR, stg_takeMVarzh);
+
+ q = Hp - SIZEOF_StgMVarTSOQueue + WDS(1);
+
+ StgHeader_info(q) = stg_MVAR_TSO_QUEUE_info;
+ StgMVarTSOQueue_link(q) = END_TSO_QUEUE;
+ StgMVarTSOQueue_tso(q) = CurrentTSO;
+
if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
- StgMVar_head(mvar) = CurrentTSO;
+ StgMVar_head(mvar) = q;
} else {
- foreign "C" setTSOLink(MyCapability() "ptr",
- StgMVar_tail(mvar) "ptr",
- CurrentTSO) [];
+ StgMVarTSOQueue_link(StgMVar_tail(mvar)) = q;
+ foreign "C" recordClosureMutated(MyCapability() "ptr",
+ StgMVar_tail(mvar)) [];
}
- StgTSO__link(CurrentTSO) = stg_END_TSO_QUEUE_closure;
+ StgTSO__link(CurrentTSO) = q;
StgTSO_block_info(CurrentTSO) = mvar;
- // write barrier for throwTo(), which looks at block_info
- // if why_blocked==BlockedOnMVar.
- prim %write_barrier() [];
StgTSO_why_blocked(CurrentTSO) = BlockedOnMVar::I16;
- StgMVar_tail(mvar) = CurrentTSO;
+ StgMVar_tail(mvar) = q;
R1 = mvar;
jump stg_block_takemvar;
- }
-
- /* we got the value... */
- val = StgMVar_value(mvar);
-
- if (StgMVar_head(mvar) != stg_END_TSO_QUEUE_closure)
- {
- /* There are putMVar(s) waiting...
- * wake up the first thread on the queue
- */
- ASSERT(StgTSO_why_blocked(StgMVar_head(mvar)) == BlockedOnMVar::I16);
-
- /* actually perform the putMVar for the thread that we just woke up */
- tso = StgMVar_head(mvar);
- PerformPut(tso,StgMVar_value(mvar));
-
- if (TO_W_(StgTSO_dirty(tso)) == 0) {
- foreign "C" dirty_TSO(MyCapability() "ptr", tso "ptr") [];
- }
-
- ("ptr" tso) = foreign "C" unblockOne_(MyCapability() "ptr",
- StgMVar_head(mvar) "ptr", 1) [];
- StgMVar_head(mvar) = tso;
-
- if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
- StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
- }
-
- unlockClosure(mvar, stg_MVAR_DIRTY_info);
- RET_P(val);
- }
- else
- {
- /* No further putMVars, MVar is now empty */
- StgMVar_value(mvar) = stg_END_TSO_QUEUE_closure;
-
- unlockClosure(mvar, stg_MVAR_DIRTY_info);
+ }
+
+ /* we got the value... */
+ val = StgMVar_value(mvar);
+
+ q = StgMVar_head(mvar);
+loop:
+ if (q == stg_END_TSO_QUEUE_closure) {
+ /* No further putMVars, MVar is now empty */
+ StgMVar_value(mvar) = stg_END_TSO_QUEUE_closure;
+ unlockClosure(mvar, stg_MVAR_DIRTY_info);
+ RET_P(val);
+ }
+ if (StgHeader_info(q) == stg_IND_info ||
+ StgHeader_info(q) == stg_MSG_NULL_info) {
+ q = StgInd_indirectee(q);
+ goto loop;
+ }
+
+ // There are putMVar(s) waiting... wake up the first thread on the queue
+
+ tso = StgMVarTSOQueue_tso(q);
+ ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16);
+ ASSERT(StgTSO_block_info(tso) == mvar);
+ // actually perform the putMVar for the thread that we just woke up
+ PerformPut(tso,StgMVar_value(mvar));
+
+ StgMVar_head(mvar) = StgMVarTSOQueue_link(q);
+ if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
+ StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
+ }
+
+ // indicate that the putMVar has now completed:
+ StgTSO__link(tso) = stg_END_TSO_QUEUE_closure;
+
+ // no need to mark the TSO dirty, we have only written END_TSO_QUEUE.
- RET_P(val);
- }
+ foreign "C" tryWakeupThread(MyCapability() "ptr", tso) [];
+
+ unlockClosure(mvar, stg_MVAR_DIRTY_info);
+ RET_P(val);
}
stg_tryTakeMVarzh
{
- W_ mvar, val, info, tso;
+ W_ mvar, val, info, tso, q;
/* args: R1 = MVar closure */
-
mvar = R1;
#if defined(THREADED_RTS)
@@ -1232,7 +1245,10 @@ stg_tryTakeMVarzh
#else
info = GET_INFO(mvar);
#endif
-
+
+ /* If the MVar is empty, put ourselves on its blocking queue,
+ * and wait until we're woken up.
+ */
if (StgMVar_value(mvar) == stg_END_TSO_QUEUE_closure) {
#if defined(THREADED_RTS)
unlockClosure(mvar, info);
@@ -1242,51 +1258,56 @@ stg_tryTakeMVarzh
*/
RET_NP(0, stg_NO_FINALIZER_closure);
}
-
+
if (info == stg_MVAR_CLEAN_info) {
- foreign "C" dirty_MVAR(BaseReg "ptr", mvar "ptr");
+ foreign "C" dirty_MVAR(BaseReg "ptr", mvar "ptr") [];
}
/* we got the value... */
val = StgMVar_value(mvar);
-
- if (StgMVar_head(mvar) != stg_END_TSO_QUEUE_closure) {
-
- /* There are putMVar(s) waiting...
- * wake up the first thread on the queue
- */
- ASSERT(StgTSO_why_blocked(StgMVar_head(mvar)) == BlockedOnMVar::I16);
-
- /* actually perform the putMVar for the thread that we just woke up */
- tso = StgMVar_head(mvar);
- PerformPut(tso,StgMVar_value(mvar));
- if (TO_W_(StgTSO_dirty(tso)) == 0) {
- foreign "C" dirty_TSO(MyCapability() "ptr", tso "ptr") [];
- }
-
- ("ptr" tso) = foreign "C" unblockOne_(MyCapability() "ptr",
- StgMVar_head(mvar) "ptr", 1) [];
- StgMVar_head(mvar) = tso;
-
- if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
- StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
- }
+
+ q = StgMVar_head(mvar);
+loop:
+ if (q == stg_END_TSO_QUEUE_closure) {
+ /* No further putMVars, MVar is now empty */
+ StgMVar_value(mvar) = stg_END_TSO_QUEUE_closure;
unlockClosure(mvar, stg_MVAR_DIRTY_info);
+ RET_NP(1, val);
}
- else
- {
- /* No further putMVars, MVar is now empty */
- StgMVar_value(mvar) = stg_END_TSO_QUEUE_closure;
- unlockClosure(mvar, stg_MVAR_DIRTY_info);
+ if (StgHeader_info(q) == stg_IND_info ||
+ StgHeader_info(q) == stg_MSG_NULL_info) {
+ q = StgInd_indirectee(q);
+ goto loop;
}
- RET_NP(1, val);
+ // There are putMVar(s) waiting... wake up the first thread on the queue
+
+ tso = StgMVarTSOQueue_tso(q);
+ ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16);
+ ASSERT(StgTSO_block_info(tso) == mvar);
+ // actually perform the putMVar for the thread that we just woke up
+ PerformPut(tso,StgMVar_value(mvar));
+
+ StgMVar_head(mvar) = StgMVarTSOQueue_link(q);
+ if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
+ StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
+ }
+
+ // indicate that the putMVar has now completed:
+ StgTSO__link(tso) = stg_END_TSO_QUEUE_closure;
+
+ // no need to mark the TSO dirty, we have only written END_TSO_QUEUE.
+
+ foreign "C" tryWakeupThread(MyCapability() "ptr", tso) [];
+
+ unlockClosure(mvar, stg_MVAR_DIRTY_info);
+ RET_P(val);
}
stg_putMVarzh
{
- W_ mvar, val, info, tso;
+ W_ mvar, val, info, tso, q;
/* args: R1 = MVar, R2 = value */
mvar = R1;
@@ -1303,76 +1324,92 @@ stg_putMVarzh
}
if (StgMVar_value(mvar) != stg_END_TSO_QUEUE_closure) {
+
+ // see Note [mvar-heap-check] above
+ HP_CHK_GEN_TICKY(SIZEOF_StgMVarTSOQueue, R1_PTR, stg_putMVarzh);
+
+ q = Hp - SIZEOF_StgMVarTSOQueue + WDS(1);
+
+ StgHeader_info(q) = stg_MVAR_TSO_QUEUE_info;
+ StgMVarTSOQueue_link(q) = END_TSO_QUEUE;
+ StgMVarTSOQueue_tso(q) = CurrentTSO;
+
if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
- StgMVar_head(mvar) = CurrentTSO;
+ StgMVar_head(mvar) = q;
} else {
- foreign "C" setTSOLink(MyCapability() "ptr",
- StgMVar_tail(mvar) "ptr",
- CurrentTSO) [];
+ StgMVarTSOQueue_link(StgMVar_tail(mvar)) = q;
+ foreign "C" recordClosureMutated(MyCapability() "ptr",
+ StgMVar_tail(mvar)) [];
}
- StgTSO__link(CurrentTSO) = stg_END_TSO_QUEUE_closure;
+ StgTSO__link(CurrentTSO) = q;
StgTSO_block_info(CurrentTSO) = mvar;
- // write barrier for throwTo(), which looks at block_info
- // if why_blocked==BlockedOnMVar.
- prim %write_barrier() [];
StgTSO_why_blocked(CurrentTSO) = BlockedOnMVar::I16;
- StgMVar_tail(mvar) = CurrentTSO;
-
+ StgMVar_tail(mvar) = q;
+
R1 = mvar;
R2 = val;
jump stg_block_putmvar;
}
- if (StgMVar_head(mvar) != stg_END_TSO_QUEUE_closure) {
-
- /* There are takeMVar(s) waiting: wake up the first one
- */
- ASSERT(StgTSO_why_blocked(StgMVar_head(mvar)) == BlockedOnMVar::I16);
-
- /* actually perform the takeMVar */
- tso = StgMVar_head(mvar);
- PerformTake(tso, val);
- if (TO_W_(StgTSO_dirty(tso)) == 0) {
- foreign "C" dirty_TSO(MyCapability() "ptr", tso "ptr") [];
- }
-
- ("ptr" tso) = foreign "C" unblockOne_(MyCapability() "ptr",
- StgMVar_head(mvar) "ptr", 1) [];
- StgMVar_head(mvar) = tso;
-
- if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
- StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
- }
-
- unlockClosure(mvar, stg_MVAR_DIRTY_info);
- jump %ENTRY_CODE(Sp(0));
- }
- else
- {
+ q = StgMVar_head(mvar);
+loop:
+ if (q == stg_END_TSO_QUEUE_closure) {
/* No further takes, the MVar is now full. */
StgMVar_value(mvar) = val;
-
unlockClosure(mvar, stg_MVAR_DIRTY_info);
jump %ENTRY_CODE(Sp(0));
}
+ if (StgHeader_info(q) == stg_IND_info ||
+ StgHeader_info(q) == stg_MSG_NULL_info) {
+ q = StgInd_indirectee(q);
+ goto loop;
+ }
+
+ // There are takeMVar(s) waiting: wake up the first one
+
+ tso = StgMVarTSOQueue_tso(q);
+ ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16);
+ ASSERT(StgTSO_block_info(tso) == mvar);
+ // actually perform the takeMVar
+ PerformTake(tso, val);
+
+ if (TO_W_(StgTSO_dirty(tso)) == 0) {
+ foreign "C" dirty_TSO(MyCapability() "ptr", tso "ptr") [];
+ }
+
+ StgMVar_head(mvar) = StgMVarTSOQueue_link(q);
+ if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
+ StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
+ }
+
+ // indicate that the takeMVar has now completed:
+ StgTSO__link(tso) = stg_END_TSO_QUEUE_closure;
- /* ToDo: yield afterward for better communication performance? */
+ foreign "C" tryWakeupThread(MyCapability() "ptr", tso) [];
+
+ unlockClosure(mvar, stg_MVAR_DIRTY_info);
+ jump %ENTRY_CODE(Sp(0));
}
stg_tryPutMVarzh
{
- W_ mvar, info, tso;
+ W_ mvar, val, info, tso, q;
/* args: R1 = MVar, R2 = value */
mvar = R1;
+ val = R2;
#if defined(THREADED_RTS)
- ("ptr" info) = foreign "C" lockClosure(mvar "ptr") [R2];
+ ("ptr" info) = foreign "C" lockClosure(mvar "ptr") [];
#else
info = GET_INFO(mvar);
#endif
+ if (info == stg_MVAR_CLEAN_info) {
+ foreign "C" dirty_MVAR(BaseReg "ptr", mvar "ptr");
+ }
+
if (StgMVar_value(mvar) != stg_END_TSO_QUEUE_closure) {
#if defined(THREADED_RTS)
unlockClosure(mvar, info);
@@ -1380,43 +1417,46 @@ stg_tryPutMVarzh
RET_N(0);
}
- if (info == stg_MVAR_CLEAN_info) {
- foreign "C" dirty_MVAR(BaseReg "ptr", mvar "ptr");
- }
-
- if (StgMVar_head(mvar) != stg_END_TSO_QUEUE_closure) {
-
- /* There are takeMVar(s) waiting: wake up the first one
- */
- ASSERT(StgTSO_why_blocked(StgMVar_head(mvar)) == BlockedOnMVar::I16);
-
- /* actually perform the takeMVar */
- tso = StgMVar_head(mvar);
- PerformTake(tso, R2);
- if (TO_W_(StgTSO_dirty(tso)) == 0) {
- foreign "C" dirty_TSO(MyCapability() "ptr", tso "ptr") [];
- }
-
- ("ptr" tso) = foreign "C" unblockOne_(MyCapability() "ptr",
- StgMVar_head(mvar) "ptr", 1) [];
- StgMVar_head(mvar) = tso;
-
- if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
- StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
- }
-
+ q = StgMVar_head(mvar);
+loop:
+ if (q == stg_END_TSO_QUEUE_closure) {
+ /* No further takes, the MVar is now full. */
+ StgMVar_value(mvar) = val;
unlockClosure(mvar, stg_MVAR_DIRTY_info);
+ jump %ENTRY_CODE(Sp(0));
+ }
+ if (StgHeader_info(q) == stg_IND_info ||
+ StgHeader_info(q) == stg_MSG_NULL_info) {
+ q = StgInd_indirectee(q);
+ goto loop;
}
- else
- {
- /* No further takes, the MVar is now full. */
- StgMVar_value(mvar) = R2;
- unlockClosure(mvar, stg_MVAR_DIRTY_info);
+ /* There are takeMVar(s) waiting: wake up the first one
+ */
+ // There are takeMVar(s) waiting: wake up the first one
+
+ tso = StgMVarTSOQueue_tso(q);
+ ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16);
+ ASSERT(StgTSO_block_info(tso) == mvar);
+ // actually perform the takeMVar
+ PerformTake(tso, val);
+
+ if (TO_W_(StgTSO_dirty(tso)) == 0) {
+ foreign "C" dirty_TSO(MyCapability() "ptr", tso "ptr") [];
}
- RET_N(1);
- /* ToDo: yield afterward for better communication performance? */
+ StgMVar_head(mvar) = StgMVarTSOQueue_link(q);
+ if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
+ StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
+ }
+
+ // indicate that the takeMVar has now completed:
+ StgTSO__link(tso) = stg_END_TSO_QUEUE_closure;
+
+ foreign "C" tryWakeupThread(MyCapability() "ptr", tso) [];
+
+ unlockClosure(mvar, stg_MVAR_DIRTY_info);
+ jump %ENTRY_CODE(Sp(0));
}
diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c
index f974f8c0d7..bebbcd4722 100644
--- a/rts/RaiseAsync.c
+++ b/rts/RaiseAsync.c
@@ -31,6 +31,8 @@ static void raiseAsync (Capability *cap,
static void removeFromQueues(Capability *cap, StgTSO *tso);
+static void removeFromMVarBlockedQueue (StgTSO *tso);
+
static void blockedThrowTo (Capability *cap,
StgTSO *target, MessageThrowTo *msg);
@@ -124,7 +126,7 @@ suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here)
Currently we send a message if the target belongs to another
Capability, and it is
- - NotBlocked, BlockedOnMsgWakeup, BlockedOnMsgThrowTo,
+ - NotBlocked, BlockedOnMsgThrowTo,
BlockedOnCCall
- or it is masking exceptions (TSO_BLOCKEX)
@@ -221,67 +223,7 @@ check_target:
switch (status) {
case NotBlocked:
- case BlockedOnMsgWakeup:
- /* if status==NotBlocked, and target->cap == cap, then
- we own this TSO and can raise the exception.
-
- How do we establish this condition? Very carefully.
-
- Let
- P = (status == NotBlocked)
- Q = (tso->cap == cap)
-
- Now, if P & Q are true, then the TSO is locked and owned by
- this capability. No other OS thread can steal it.
-
- If P==0 and Q==1: the TSO is blocked, but attached to this
- capabilty, and it can be stolen by another capability.
-
- If P==1 and Q==0: the TSO is runnable on another
- capability. At any time, the TSO may change from runnable
- to blocked and vice versa, while it remains owned by
- another capability.
-
- Suppose we test like this:
-
- p = P
- q = Q
- if (p && q) ...
-
- this is defeated by another capability stealing a blocked
- TSO from us to wake it up (Schedule.c:unblockOne()). The
- other thread is doing
-
- Q = 0
- P = 1
-
- assuming arbitrary reordering, we could see this
- interleaving:
-
- start: P==0 && Q==1
- P = 1
- p = P
- q = Q
- Q = 0
- if (p && q) ...
-
- so we need a memory barrier:
-
- p = P
- mb()
- q = Q
- if (p && q) ...
-
- this avoids the problematic case. There are other cases
- to consider, but this is the tricky one.
-
- Note that we must be sure that unblockOne() does the
- writes in the correct order: Q before P. The memory
- barrier ensures that if we have seen the write to P, we
- have also seen the write to Q.
- */
{
- write_barrier();
if ((target->flags & TSO_BLOCKEX) == 0) {
// It's on our run queue and not blocking exceptions
raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
@@ -389,18 +331,26 @@ check_target:
goto retry;
}
+ if (target->_link == END_TSO_QUEUE) {
+ // the MVar operation has already completed. There is a
+ // MSG_TRY_WAKEUP on the way, but we can just wake up the
+ // thread now anyway and ignore the message when it
+ // arrives.
+ unlockClosure((StgClosure *)mvar, info);
+ tryWakeupThread(cap, target);
+ goto retry;
+ }
+
if ((target->flags & TSO_BLOCKEX) &&
((target->flags & TSO_INTERRUPTIBLE) == 0)) {
blockedThrowTo(cap,target,msg);
unlockClosure((StgClosure *)mvar, info);
return THROWTO_BLOCKED;
} else {
- removeThreadFromMVarQueue(cap, mvar, target);
+ // revoke the MVar operation
+ removeFromMVarBlockedQueue(target);
raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
- if (info == &stg_MVAR_CLEAN_info) {
- dirty_MVAR(&cap->r,(StgClosure*)mvar);
- }
- unlockClosure((StgClosure *)mvar, &stg_MVAR_DIRTY_info);
+ unlockClosure((StgClosure *)mvar, info);
return THROWTO_SUCCESS;
}
}
@@ -588,11 +538,54 @@ awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso)
-------------------------------------------------------------------------- */
static void
+removeFromMVarBlockedQueue (StgTSO *tso)
+{
+ StgMVar *mvar = (StgMVar*)tso->block_info.closure;
+ StgMVarTSOQueue *q = (StgMVarTSOQueue*)tso->_link;
+
+ if (q == (StgMVarTSOQueue*)END_TSO_QUEUE) {
+ // already removed from this MVar
+ return;
+ }
+
+ // Assume the MVar is locked. (not assertable; sometimes it isn't
+ // actually WHITEHOLE'd).
+
+ // We want to remove the MVAR_TSO_QUEUE object from the queue. It
+ // isn't doubly-linked so we can't actually remove it; instead we
+ // just overwrite it with an IND if possible and let the GC short
+ // it out. However, we have to be careful to maintain the deque
+ // structure:
+
+ if (mvar->head == q) {
+ mvar->head = q->link;
+ q->header.info = &stg_IND_info;
+ if (mvar->tail == q) {
+ mvar->tail = (StgMVarTSOQueue*)END_TSO_QUEUE;
+ }
+ }
+ else if (mvar->tail == q) {
+ // we can't replace it with an IND in this case, because then
+ // we lose the tail pointer when the GC shorts out the IND.
+ // So we use MSG_NULL as a kind of non-dupable indirection;
+ // these are ignored by takeMVar/putMVar.
+ q->header.info = &stg_MSG_NULL_info;
+ }
+ else {
+ q->header.info = &stg_IND_info;
+ }
+
+ // revoke the MVar operation
+ tso->_link = END_TSO_QUEUE;
+}
+
+static void
removeFromQueues(Capability *cap, StgTSO *tso)
{
switch (tso->why_blocked) {
case NotBlocked:
+ case ThreadMigrating:
return;
case BlockedOnSTM:
@@ -605,22 +598,13 @@ removeFromQueues(Capability *cap, StgTSO *tso)
goto done;
case BlockedOnMVar:
- removeThreadFromMVarQueue(cap, (StgMVar *)tso->block_info.closure, tso);
- // we aren't doing a write barrier here: the MVar is supposed to
- // be already locked, so replacing the info pointer would unlock it.
+ removeFromMVarBlockedQueue(tso);
goto done;
case BlockedOnBlackHole:
// nothing to do
goto done;
- case BlockedOnMsgWakeup:
- {
- // kill the message, atomically:
- OVERWRITE_INFO(tso->block_info.wakeup, &stg_IND_info);
- break;
- }
-
case BlockedOnMsgThrowTo:
{
MessageThrowTo *m = tso->block_info.throwto;
@@ -659,7 +643,8 @@ removeFromQueues(Capability *cap, StgTSO *tso)
}
done:
- unblockOne(cap, tso);
+ tso->why_blocked = NotBlocked;
+ appendToRunQueue(cap, tso);
}
/* -----------------------------------------------------------------------------
@@ -733,7 +718,7 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
ASSERT(tso->cap == cap);
// wake it up
- if (tso->why_blocked != NotBlocked && tso->why_blocked != BlockedOnMsgWakeup) {
+ if (tso->why_blocked != NotBlocked) {
tso->why_blocked = NotBlocked;
appendToRunQueue(cap,tso);
}
diff --git a/rts/Schedule.c b/rts/Schedule.c
index d7d57411b7..f7b26a4876 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -125,7 +125,7 @@ static Capability *schedule (Capability *initialCapability, Task *task);
static void schedulePreLoop (void);
static void scheduleFindWork (Capability *cap);
#if defined(THREADED_RTS)
-static void scheduleYield (Capability **pcap, Task *task, rtsBool);
+static void scheduleYield (Capability **pcap, Task *task);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
@@ -204,7 +204,6 @@ schedule (Capability *initialCapability, Task *task)
rtsBool ready_to_gc;
#if defined(THREADED_RTS)
rtsBool first = rtsTrue;
- rtsBool force_yield = rtsFalse;
#endif
cap = initialCapability;
@@ -328,9 +327,7 @@ schedule (Capability *initialCapability, Task *task)
// ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
}
- yield:
- scheduleYield(&cap,task,force_yield);
- force_yield = rtsFalse;
+ scheduleYield(&cap,task);
if (emptyRunQueue(cap)) continue; // look for work again
#endif
@@ -490,19 +487,6 @@ run_thread:
traceEventStopThread(cap, t, ret);
-#if defined(THREADED_RTS)
- // If ret is ThreadBlocked, and this Task is bound to the TSO that
- // blocked, we are in limbo - the TSO is now owned by whatever it
- // is blocked on, and may in fact already have been woken up,
- // perhaps even on a different Capability. It may be the case
- // that task->cap != cap. We better yield this Capability
- // immediately and return to normaility.
- if (ret == ThreadBlocked) {
- force_yield = rtsTrue;
- goto yield;
- }
-#endif
-
ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
ASSERT(t->cap == cap);
@@ -639,23 +623,13 @@ shouldYieldCapability (Capability *cap, Task *task)
// and also check the benchmarks in nofib/parallel for regressions.
static void
-scheduleYield (Capability **pcap, Task *task, rtsBool force_yield)
+scheduleYield (Capability **pcap, Task *task)
{
Capability *cap = *pcap;
// if we have work, and we don't need to give up the Capability, continue.
//
- // The force_yield flag is used when a bound thread blocks. This
- // is a particularly tricky situation: the current Task does not
- // own the TSO any more, since it is on some queue somewhere, and
- // might be woken up or manipulated by another thread at any time.
- // The TSO and Task might be migrated to another Capability.
- // Certain invariants might be in doubt, such as task->bound->cap
- // == cap. We have to yield the current Capability immediately,
- // no messing around.
- //
- if (!force_yield &&
- !shouldYieldCapability(cap,task) &&
+ if (!shouldYieldCapability(cap,task) &&
(!emptyRunQueue(cap) ||
!emptyInbox(cap) ||
sched_state >= SCHED_INTERRUPTING))
@@ -1891,8 +1865,7 @@ scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
if (cpu == cap->no) {
appendToRunQueue(cap,tso);
} else {
- traceEventMigrateThread (cap, tso, capabilities[cpu].no);
- wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
+ migrateThread(cap, tso, &capabilities[cpu]);
}
#else
appendToRunQueue(cap,tso);
@@ -2372,8 +2345,8 @@ deleteThread_(Capability *cap, StgTSO *tso)
if (tso->why_blocked == BlockedOnCCall ||
tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
- unblockOne(cap,tso);
tso->what_next = ThreadKilled;
+ appendToRunQueue(tso->cap, tso);
} else {
deleteThread(cap,tso);
}
diff --git a/rts/StgMiscClosures.cmm b/rts/StgMiscClosures.cmm
index 7e1e6a73d5..1dec6e68b7 100644
--- a/rts/StgMiscClosures.cmm
+++ b/rts/StgMiscClosures.cmm
@@ -491,8 +491,6 @@ CLOSURE(stg_NO_TREC_closure,stg_NO_TREC);
------------------------------------------------------------------------- */
// PRIM rather than CONSTR, because PRIM objects cannot be duplicated by the GC.
-INFO_TABLE_CONSTR(stg_MSG_WAKEUP,2,0,0,PRIM,"MSG_WAKEUP","MSG_WAKEUP")
-{ foreign "C" barf("MSG_WAKEUP object entered!") never returns; }
INFO_TABLE_CONSTR(stg_MSG_TRY_WAKEUP,2,0,0,PRIM,"MSG_TRY_WAKEUP","MSG_TRY_WAKEUP")
{ foreign "C" barf("MSG_TRY_WAKEUP object entered!") never returns; }
@@ -573,6 +571,13 @@ INFO_TABLE( stg_dummy_ret, 0, 0, CONSTR_NOCAF_STATIC, "DUMMY_RET", "DUMMY_RET")
CLOSURE(stg_dummy_ret_closure,stg_dummy_ret);
/* ----------------------------------------------------------------------------
+ MVAR_TSO_QUEUE
+ ------------------------------------------------------------------------- */
+
+INFO_TABLE_CONSTR(stg_MVAR_TSO_QUEUE,2,0,0,PRIM,"MVAR_TSO_QUEUE","MVAR_TSO_QUEUE")
+{ foreign "C" barf("MVAR_TSO_QUEUE object entered!") never returns; }
+
+/* ----------------------------------------------------------------------------
CHARLIKE and INTLIKE closures.
These are static representations of Chars and small Ints, so that
diff --git a/rts/Threads.c b/rts/Threads.c
index 05a13c7f3b..e8a835bdae 100644
--- a/rts/Threads.c
+++ b/rts/Threads.c
@@ -205,83 +205,21 @@ removeThreadFromDeQueue (Capability *cap,
barf("removeThreadFromMVarQueue: not found");
}
-void
-removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso)
-{
- // caller must do the write barrier, because replacing the info
- // pointer will unlock the MVar.
- removeThreadFromDeQueue (cap, &mvar->head, &mvar->tail, tso);
- tso->_link = END_TSO_QUEUE;
-}
-
/* ----------------------------------------------------------------------------
- unblockOne()
-
- unblock a single thread.
- ------------------------------------------------------------------------- */
-
-StgTSO *
-unblockOne (Capability *cap, StgTSO *tso)
-{
- return unblockOne_(cap,tso,rtsTrue); // allow migration
-}
-
-StgTSO *
-unblockOne_ (Capability *cap, StgTSO *tso,
- rtsBool allow_migrate USED_IF_THREADS)
-{
- StgTSO *next;
-
- // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO);
- ASSERT(tso->why_blocked != NotBlocked);
- ASSERT(tso->why_blocked != BlockedOnMsgWakeup ||
- tso->block_info.closure->header.info == &stg_IND_info);
-
- next = tso->_link;
- tso->_link = END_TSO_QUEUE;
+ tryWakeupThread()
-#if defined(THREADED_RTS)
- if (tso->cap == cap || (!tsoLocked(tso) &&
- allow_migrate &&
- RtsFlags.ParFlags.wakeupMigrate)) {
- // We are waking up this thread on the current Capability, which
- // might involve migrating it from the Capability it was last on.
- if (tso->bound) {
- ASSERT(tso->bound->task->cap == tso->cap);
- tso->bound->task->cap = cap;
- }
-
- tso->cap = cap;
- write_barrier();
- tso->why_blocked = NotBlocked;
- appendToRunQueue(cap,tso);
-
- // context-switch soonish so we can migrate the new thread if
- // necessary. NB. not contextSwitchCapability(cap), which would
- // force a context switch immediately.
- cap->context_switch = 1;
- } else {
- // we'll try to wake it up on the Capability it was last on.
- wakeupThreadOnCapability(cap, tso->cap, tso);
- }
-#else
- tso->why_blocked = NotBlocked;
- appendToRunQueue(cap,tso);
-
- // context-switch soonish so we can migrate the new thread if
- // necessary. NB. not contextSwitchCapability(cap), which would
- // force a context switch immediately.
- cap->context_switch = 1;
-#endif
-
- traceEventThreadWakeup (cap, tso, tso->cap->no);
+ Attempt to wake up a thread. tryWakeupThread is idempotent: it is
+ always safe to call it too many times, but it is not safe in
+ general to omit a call.
- return next;
-}
+ ------------------------------------------------------------------------- */
void
tryWakeupThread (Capability *cap, StgTSO *tso)
{
+
+ traceEventThreadWakeup (cap, tso, tso->cap->no);
+
#ifdef THREADED_RTS
if (tso->cap != cap)
{
@@ -298,6 +236,16 @@ tryWakeupThread (Capability *cap, StgTSO *tso)
switch (tso->why_blocked)
{
+ case BlockedOnMVar:
+ {
+ if (tso->_link == END_TSO_QUEUE) {
+ tso->block_info.closure = (StgClosure*)END_TSO_QUEUE;
+ goto unblock;
+ } else {
+ return;
+ }
+ }
+
case BlockedOnMsgThrowTo:
{
const StgInfoTable *i;
@@ -307,27 +255,45 @@ tryWakeupThread (Capability *cap, StgTSO *tso)
if (i != &stg_MSG_NULL_info) {
debugTraceCap(DEBUG_sched, cap, "thread %ld still blocked on throwto (%p)",
(lnat)tso->id, tso->block_info.throwto->header.info);
- break; // still blocked
+ return;
}
// remove the block frame from the stack
ASSERT(tso->sp[0] == (StgWord)&stg_block_throwto_info);
tso->sp += 3;
- // fall through...
+ goto unblock;
}
+
case BlockedOnBlackHole:
case BlockedOnSTM:
- {
- // just run the thread now, if the BH is not really available,
- // we'll block again.
- tso->why_blocked = NotBlocked;
- appendToRunQueue(cap,tso);
- break;
- }
+ case ThreadMigrating:
+ goto unblock;
+
default:
// otherwise, do nothing
- break;
+ return;
}
+
+unblock:
+ // just run the thread now, if the BH is not really available,
+ // we'll block again.
+ tso->why_blocked = NotBlocked;
+ appendToRunQueue(cap,tso);
+}
+
+/* ----------------------------------------------------------------------------
+ migrateThread
+ ------------------------------------------------------------------------- */
+
+void
+migrateThread (Capability *from, StgTSO *tso, Capability *to)
+{
+ traceEventMigrateThread (from, tso, to->no);
+ // ThreadMigrating tells the target cap that it needs to be added to
+ // the run queue when it receives the MSG_TRY_WAKEUP.
+ tso->what_next = ThreadMigrating;
+ tso->cap = to;
+ tryWakeupThread(from, tso);
}
/* ----------------------------------------------------------------------------
@@ -450,47 +416,6 @@ updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val)
}
}
-/* ----------------------------------------------------------------------------
- * Wake up a thread on a Capability.
- *
- * This is used when the current Task is running on a Capability and
- * wishes to wake up a thread on a different Capability.
- * ------------------------------------------------------------------------- */
-
-#ifdef THREADED_RTS
-
-void
-wakeupThreadOnCapability (Capability *cap,
- Capability *other_cap,
- StgTSO *tso)
-{
- MessageWakeup *msg;
-
- // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
- if (tso->bound) {
- ASSERT(tso->bound->task->cap == tso->cap);
- tso->bound->task->cap = other_cap;
- }
- tso->cap = other_cap;
-
- ASSERT(tso->why_blocked != BlockedOnMsgWakeup ||
- tso->block_info.closure->header.info == &stg_IND_info);
-
- ASSERT(tso->block_info.closure->header.info != &stg_MSG_WAKEUP_info);
-
- msg = (MessageWakeup*) allocate(cap, sizeofW(MessageWakeup));
- SET_HDR(msg, &stg_MSG_WAKEUP_info, CCS_SYSTEM);
- msg->tso = tso;
- tso->block_info.closure = (StgClosure *)msg;
- dirty_TSO(cap, tso);
- write_barrier();
- tso->why_blocked = BlockedOnMsgWakeup;
-
- sendMessage(cap, other_cap, (Message*)msg);
-}
-
-#endif /* THREADED_RTS */
-
/* ---------------------------------------------------------------------------
* rtsSupportsBoundThreads(): is the RTS built to support bound threads?
* used by Control.Concurrent for error checking.
@@ -549,15 +474,15 @@ printThreadBlockage(StgTSO *tso)
debugBelch("is blocked on a black hole %p",
((StgBlockingQueue*)tso->block_info.bh->bh));
break;
- case BlockedOnMsgWakeup:
- debugBelch("is blocked on a wakeup message");
- break;
case BlockedOnMsgThrowTo:
debugBelch("is blocked on a throwto message");
break;
case NotBlocked:
debugBelch("is not blocked");
break;
+ case ThreadMigrating:
+ debugBelch("is runnable, but not on the run queue");
+ break;
case BlockedOnCCall:
debugBelch("is blocked on an external call");
break;
diff --git a/rts/Threads.h b/rts/Threads.h
index 000cf1b8e2..e3680f2d50 100644
--- a/rts/Threads.h
+++ b/rts/Threads.h
@@ -19,6 +19,7 @@ StgTSO * unblockOne_ (Capability *cap, StgTSO *tso, rtsBool allow_migrate);
void checkBlockingQueues (Capability *cap, StgTSO *tso);
void wakeBlockingQueue (Capability *cap, StgBlockingQueue *bq);
void tryWakeupThread (Capability *cap, StgTSO *tso);
+void migrateThread (Capability *from, StgTSO *tso, Capability *to);
// Wakes up a thread on a Capability (probably a different Capability
// from the one held by the current Task).
@@ -32,8 +33,6 @@ void wakeupThreadOnCapability (Capability *cap,
void updateThunk (Capability *cap, StgTSO *tso,
StgClosure *thunk, StgClosure *val);
-void removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso);
-
rtsBool removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso);
rtsBool removeThreadFromDeQueue (Capability *cap, StgTSO **head, StgTSO **tail, StgTSO *tso);
diff --git a/rts/sm/Compact.c b/rts/sm/Compact.c
index 6de42efae4..7eeb90f7ca 100644
--- a/rts/sm/Compact.c
+++ b/rts/sm/Compact.c
@@ -472,7 +472,6 @@ thread_TSO (StgTSO *tso)
if ( tso->why_blocked == BlockedOnMVar
|| tso->why_blocked == BlockedOnBlackHole
|| tso->why_blocked == BlockedOnMsgThrowTo
- || tso->why_blocked == BlockedOnMsgWakeup
) {
thread_(&tso->block_info.closure);
}
@@ -625,7 +624,6 @@ thread_obj (StgInfoTable *info, StgPtr p)
case CONSTR:
case PRIM:
case MUT_PRIM:
- case IND_PERM:
case MUT_VAR_CLEAN:
case MUT_VAR_DIRTY:
case BLACKHOLE:
@@ -664,6 +662,8 @@ thread_obj (StgInfoTable *info, StgPtr p)
return p + sizeofW(StgMVar);
}
+ case IND:
+ case IND_PERM:
case IND_OLDGEN:
case IND_OLDGEN_PERM:
thread(&((StgInd *)p)->indirectee);
diff --git a/rts/sm/Scav.c b/rts/sm/Scav.c
index e6234f6c0f..ae9e81cff5 100644
--- a/rts/sm/Scav.c
+++ b/rts/sm/Scav.c
@@ -84,7 +84,6 @@ scavengeTSO (StgTSO *tso)
evacuate((StgClosure **)&tso->_link);
if ( tso->why_blocked == BlockedOnMVar
|| tso->why_blocked == BlockedOnBlackHole
- || tso->why_blocked == BlockedOnMsgWakeup
|| tso->why_blocked == BlockedOnMsgThrowTo
|| tso->why_blocked == NotBlocked
) {
@@ -896,6 +895,7 @@ scavenge_mark_stack(void)
// no "old" generation.
break;
+ case IND:
case IND_OLDGEN:
case IND_OLDGEN_PERM:
case BLACKHOLE:
@@ -1407,7 +1407,6 @@ scavenge_mutable_list(bdescr *bd, generation *gen)
evacuate((StgClosure **)&tso->_link);
if ( tso->why_blocked == BlockedOnMVar
|| tso->why_blocked == BlockedOnBlackHole
- || tso->why_blocked == BlockedOnMsgWakeup
|| tso->why_blocked == BlockedOnMsgThrowTo
|| tso->why_blocked == NotBlocked
) {