summaryrefslogtreecommitdiff
path: root/rts/RaiseAsync.c
diff options
context:
space:
mode:
authorSimon Marlow <marlowsd@gmail.com>2010-03-29 14:44:56 +0000
committerSimon Marlow <marlowsd@gmail.com>2010-03-29 14:44:56 +0000
commit5d52d9b64c21dcf77849866584744722f8121389 (patch)
tree25aeafc9b761e73714c24ae414c0b1c41765c99f /rts/RaiseAsync.c
parent79957d77c1bff767f1041d3fabdeb94d92a52878 (diff)
downloadhaskell-5d52d9b64c21dcf77849866584744722f8121389.tar.gz
New implementation of BLACKHOLEs
This replaces the global blackhole_queue with a clever scheme that enables us to queue up blocked threads on the closure that they are blocked on, while still avoiding atomic instructions in the common case. Advantages: - gets rid of a locked global data structure and some tricky GC code (replacing it with some per-thread data structures and different tricky GC code :) - wakeups are more prompt: parallel/concurrent performance should benefit. I haven't seen anything dramatic in the parallel benchmarks so far, but a couple of threading benchmarks do improve a bit. - waking up a thread blocked on a blackhole is now O(1) (e.g. if it is the target of throwTo). - less sharing and better separation of Capabilities: communication is done with messages, the data structures are strictly owned by a Capability and cannot be modified except by sending messages. - this change will utlimately enable us to do more intelligent scheduling when threads block on each other. This is what started off the whole thing, but it isn't done yet (#3838). I'll be documenting all this on the wiki in due course.
Diffstat (limited to 'rts/RaiseAsync.c')
-rw-r--r--rts/RaiseAsync.c175
1 files changed, 69 insertions, 106 deletions
diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c
index d02a2567ff..d5a4918f34 100644
--- a/rts/RaiseAsync.c
+++ b/rts/RaiseAsync.c
@@ -18,6 +18,7 @@
#include "STM.h"
#include "sm/Sanity.h"
#include "Profiling.h"
+#include "Messages.h"
#if defined(mingw32_HOST_OS)
#include "win32/IOManager.h"
#endif
@@ -66,13 +67,12 @@ void
throwToSingleThreaded_(Capability *cap, StgTSO *tso, StgClosure *exception,
rtsBool stop_at_atomically)
{
+ tso = deRefTSO(tso);
+
// Thread already dead?
if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
return;
}
- while (tso->what_next == ThreadRelocated) {
- tso = tso->_link;
- }
// Remove it from any blocking queues
removeFromQueues(cap,tso);
@@ -83,13 +83,12 @@ throwToSingleThreaded_(Capability *cap, StgTSO *tso, StgClosure *exception,
void
suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here)
{
+ tso = deRefTSO(tso);
+
// Thread already dead?
if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
return;
}
- while (tso->what_next == ThreadRelocated) {
- tso = tso->_link;
- }
// Remove it from any blocking queues
removeFromQueues(cap,tso);
@@ -164,7 +163,7 @@ throwTo (Capability *cap, // the Capability we hold
msg = (MessageThrowTo *) allocate(cap, sizeofW(MessageThrowTo));
// message starts locked; the caller has to unlock it when it is
// ready.
- msg->header.info = &stg_WHITEHOLE_info;
+ SET_HDR(msg, &stg_WHITEHOLE_info, CCS_SYSTEM);
msg->source = source;
msg->target = target;
msg->exception = exception;
@@ -185,14 +184,24 @@ throwToMsg (Capability *cap, MessageThrowTo *msg)
{
StgWord status;
StgTSO *target = msg->target;
+ Capability *target_cap;
+ goto check_target;
+
+retry:
+ write_barrier();
+ debugTrace(DEBUG_sched, "throwTo: retrying...");
+
+check_target:
ASSERT(target != END_TSO_QUEUE);
// follow ThreadRelocated links in the target first
- while (target->what_next == ThreadRelocated) {
- target = target->_link;
- // No, it might be a WHITEHOLE:
- // ASSERT(get_itbl(target)->type == TSO);
+ target = deRefTSO(target);
+
+ // Thread already dead?
+ if (target->what_next == ThreadComplete
+ || target->what_next == ThreadKilled) {
+ return THROWTO_SUCCESS;
}
debugTraceCap(DEBUG_sched, cap,
@@ -204,18 +213,10 @@ throwToMsg (Capability *cap, MessageThrowTo *msg)
traceThreadStatus(DEBUG_sched, target);
#endif
- goto check_target;
-retry:
- write_barrier();
- debugTrace(DEBUG_sched, "throwTo: retrying...");
-
-check_target:
- ASSERT(target != END_TSO_QUEUE);
-
- // Thread already dead?
- if (target->what_next == ThreadComplete
- || target->what_next == ThreadKilled) {
- return THROWTO_SUCCESS;
+ target_cap = target->cap;
+ if (target->cap != cap) {
+ throwToSendMsg(cap, target_cap, msg);
+ return THROWTO_BLOCKED;
}
status = target->why_blocked;
@@ -282,28 +283,19 @@ check_target:
have also seen the write to Q.
*/
{
- Capability *target_cap;
-
write_barrier();
- target_cap = target->cap;
- if (target_cap != cap) {
- throwToSendMsg(cap, target_cap, msg);
- return THROWTO_BLOCKED;
+ if ((target->flags & TSO_BLOCKEX) == 0) {
+ // It's on our run queue and not blocking exceptions
+ raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
+ return THROWTO_SUCCESS;
} else {
- if ((target->flags & TSO_BLOCKEX) == 0) {
- // It's on our run queue and not blocking exceptions
- raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
- return THROWTO_SUCCESS;
- } else {
- blockedThrowTo(cap,target,msg);
- return THROWTO_BLOCKED;
- }
+ blockedThrowTo(cap,target,msg);
+ return THROWTO_BLOCKED;
}
}
case BlockedOnMsgThrowTo:
{
- Capability *target_cap;
const StgInfoTable *i;
MessageThrowTo *m;
@@ -340,13 +332,6 @@ check_target:
goto retry;
}
- target_cap = target->cap;
- if (target_cap != cap) {
- unlockClosure((StgClosure*)m, i);
- throwToSendMsg(cap, target_cap, msg);
- return THROWTO_BLOCKED;
- }
-
if ((target->flags & TSO_BLOCKEX) &&
((target->flags & TSO_INTERRUPTIBLE) == 0)) {
unlockClosure((StgClosure*)m, i);
@@ -358,7 +343,6 @@ check_target:
unlockClosure((StgClosure*)m, &stg_IND_info);
raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
- unblockOne(cap, target);
return THROWTO_SUCCESS;
}
@@ -400,48 +384,30 @@ check_target:
if ((target->flags & TSO_BLOCKEX) &&
((target->flags & TSO_INTERRUPTIBLE) == 0)) {
- Capability *target_cap = target->cap;
- if (target->cap != cap) {
- throwToSendMsg(cap,target_cap,msg);
- } else {
- blockedThrowTo(cap,target,msg);
- }
+ blockedThrowTo(cap,target,msg);
unlockClosure((StgClosure *)mvar, info);
return THROWTO_BLOCKED;
} else {
removeThreadFromMVarQueue(cap, mvar, target);
raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
- unblockOne(cap, target);
- unlockClosure((StgClosure *)mvar, info);
+ if (info == &stg_MVAR_CLEAN_info) {
+ dirty_MVAR(&cap->r,(StgClosure*)mvar);
+ }
+ unlockClosure((StgClosure *)mvar, &stg_MVAR_DIRTY_info);
return THROWTO_SUCCESS;
}
}
case BlockedOnBlackHole:
{
- ACQUIRE_LOCK(&sched_mutex);
- // double checking the status after the memory barrier:
- if (target->why_blocked != BlockedOnBlackHole) {
- RELEASE_LOCK(&sched_mutex);
- goto retry;
- }
-
- if (target->flags & TSO_BLOCKEX) {
- Capability *target_cap = target->cap;
- if (target->cap != cap) {
- throwToSendMsg(cap,target_cap,msg);
- } else {
- blockedThrowTo(cap,target,msg);
- }
- RELEASE_LOCK(&sched_mutex);
- return THROWTO_BLOCKED; // caller releases lock
- } else {
- removeThreadFromQueue(cap, &blackhole_queue, target);
- raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
- unblockOne(cap, target);
- RELEASE_LOCK(&sched_mutex);
- return THROWTO_SUCCESS;
- }
+ // Revoke the message by replacing it with IND. We're not
+ // locking anything here, so we might still get a TRY_WAKEUP
+ // message from the owner of the blackhole some time in the
+ // future, but that doesn't matter.
+ ASSERT(target->block_info.bh->header.info == &stg_MSG_BLACKHOLE_info);
+ OVERWRITE_INFO(target->block_info.bh, &stg_IND_info);
+ raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
+ return THROWTO_SUCCESS;
}
case BlockedOnSTM:
@@ -454,35 +420,19 @@ check_target:
}
if ((target->flags & TSO_BLOCKEX) &&
((target->flags & TSO_INTERRUPTIBLE) == 0)) {
- Capability *target_cap = target->cap;
- if (target->cap != cap) {
- throwToSendMsg(cap,target_cap,msg);
- } else {
- blockedThrowTo(cap,target,msg);
- }
+ blockedThrowTo(cap,target,msg);
unlockTSO(target);
return THROWTO_BLOCKED;
} else {
raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
- unblockOne(cap, target);
unlockTSO(target);
return THROWTO_SUCCESS;
}
case BlockedOnCCall:
case BlockedOnCCall_NoUnblockExc:
- {
- Capability *target_cap;
-
- target_cap = target->cap;
- if (target_cap != cap) {
- throwToSendMsg(cap, target_cap, msg);
- return THROWTO_BLOCKED;
- }
-
blockedThrowTo(cap,target,msg);
return THROWTO_BLOCKED;
- }
#ifndef THREADEDED_RTS
case BlockedOnRead:
@@ -515,9 +465,9 @@ throwToSendMsg (Capability *cap STG_UNUSED,
{
#ifdef THREADED_RTS
- debugTrace(DEBUG_sched, "throwTo: sending a throwto message to cap %lu", (unsigned long)target_cap->no);
+ debugTraceCap(DEBUG_sched, cap, "throwTo: sending a throwto message to cap %lu", (unsigned long)target_cap->no);
- sendMessage(target_cap, (Message*)msg);
+ sendMessage(cap, target_cap, (Message*)msg);
#endif
}
@@ -532,7 +482,7 @@ blockedThrowTo (Capability *cap, StgTSO *target, MessageThrowTo *msg)
ASSERT(target->cap == cap);
- msg->link = (Message*)target->blocked_exceptions;
+ msg->link = target->blocked_exceptions;
target->blocked_exceptions = msg;
dirty_TSO(cap,target); // we modified the blocked_exceptions queue
}
@@ -571,7 +521,7 @@ maybePerformBlockedException (Capability *cap, StgTSO *tso)
if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE &&
(tso->flags & TSO_BLOCKEX) != 0) {
- debugTrace(DEBUG_sched, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id);
+ debugTraceCap(DEBUG_sched, cap, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id);
}
if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE
@@ -664,16 +614,18 @@ removeFromQueues(Capability *cap, StgTSO *tso)
case BlockedOnMVar:
removeThreadFromMVarQueue(cap, (StgMVar *)tso->block_info.closure, tso);
+ // we aren't doing a write barrier here: the MVar is supposed to
+ // be already locked, so replacing the info pointer would unlock it.
goto done;
case BlockedOnBlackHole:
- removeThreadFromQueue(cap, &blackhole_queue, tso);
+ // nothing to do
goto done;
case BlockedOnMsgWakeup:
{
// kill the message, atomically:
- tso->block_info.wakeup->header.info = &stg_IND_info;
+ OVERWRITE_INFO(tso->block_info.wakeup, &stg_IND_info);
break;
}
@@ -725,7 +677,8 @@ removeFromQueues(Capability *cap, StgTSO *tso)
* asynchronous exception in an existing thread.
*
* We first remove the thread from any queue on which it might be
- * blocked. The possible blockages are MVARs and BLACKHOLE_BQs.
+ * blocked. The possible blockages are MVARs, BLOCKING_QUEUESs, and
+ * TSO blocked_exception queues.
*
* We strip the stack down to the innermost CATCH_FRAME, building
* thunks in the heap for all the active computations, so they can
@@ -764,8 +717,8 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
StgClosure *updatee;
nat i;
- debugTrace(DEBUG_sched,
- "raising exception in thread %ld.", (long)tso->id);
+ debugTraceCap(DEBUG_sched, cap,
+ "raising exception in thread %ld.", (long)tso->id);
#if defined(PROFILING)
/*
@@ -784,6 +737,15 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
tso->what_next != ThreadKilled &&
tso->what_next != ThreadRelocated);
+ // only if we own this TSO (except that deleteThread() calls this
+ ASSERT(tso->cap == cap);
+
+ // wake it up
+ if (tso->why_blocked != NotBlocked && tso->why_blocked != BlockedOnMsgWakeup) {
+ tso->why_blocked = NotBlocked;
+ appendToRunQueue(cap,tso);
+ }
+
// mark it dirty; we're about to change its stack.
dirty_TSO(cap, tso);
@@ -871,7 +833,8 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
// Perform the update
// TODO: this may waste some work, if the thunk has
// already been updated by another thread.
- UPD_IND(cap, ((StgUpdateFrame *)frame)->updatee, (StgClosure *)ap);
+ updateThunk(cap, tso,
+ ((StgUpdateFrame *)frame)->updatee, (StgClosure *)ap);
}
sp += sizeofW(StgUpdateFrame) - 1;
@@ -963,8 +926,8 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
{
StgTRecHeader *trec = tso -> trec;
StgTRecHeader *outer = trec -> enclosing_trec;
- debugTrace(DEBUG_stm,
- "found atomically block delivering async exception");
+ debugTraceCap(DEBUG_stm, cap,
+ "found atomically block delivering async exception");
stmAbortTransaction(cap, trec);
stmFreeAbortedTRec(cap, trec);
tso -> trec = outer;