diff options
author | Simon Marlow <marlowsd@gmail.com> | 2010-03-29 14:44:56 +0000 |
---|---|---|
committer | Simon Marlow <marlowsd@gmail.com> | 2010-03-29 14:44:56 +0000 |
commit | 5d52d9b64c21dcf77849866584744722f8121389 (patch) | |
tree | 25aeafc9b761e73714c24ae414c0b1c41765c99f /rts/RaiseAsync.c | |
parent | 79957d77c1bff767f1041d3fabdeb94d92a52878 (diff) | |
download | haskell-5d52d9b64c21dcf77849866584744722f8121389.tar.gz |
New implementation of BLACKHOLEs
This replaces the global blackhole_queue with a clever scheme that
enables us to queue up blocked threads on the closure that they are
blocked on, while still avoiding atomic instructions in the common
case.
Advantages:
- gets rid of a locked global data structure and some tricky GC code
(replacing it with some per-thread data structures and different
tricky GC code :)
- wakeups are more prompt: parallel/concurrent performance should
benefit. I haven't seen anything dramatic in the parallel
benchmarks so far, but a couple of threading benchmarks do improve
a bit.
- waking up a thread blocked on a blackhole is now O(1) (e.g. if
it is the target of throwTo).
- less sharing and better separation of Capabilities: communication
is done with messages; the data structures are strictly owned by a
Capability and cannot be modified except by sending messages.
- this change will ultimately enable us to do more intelligent
scheduling when threads block on each other. This is what started
off the whole thing, but it isn't done yet (#3838).
I'll be documenting all this on the wiki in due course.
Diffstat (limited to 'rts/RaiseAsync.c')
-rw-r--r-- | rts/RaiseAsync.c | 175 |
1 files changed, 69 insertions, 106 deletions
diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c index d02a2567ff..d5a4918f34 100644 --- a/rts/RaiseAsync.c +++ b/rts/RaiseAsync.c @@ -18,6 +18,7 @@ #include "STM.h" #include "sm/Sanity.h" #include "Profiling.h" +#include "Messages.h" #if defined(mingw32_HOST_OS) #include "win32/IOManager.h" #endif @@ -66,13 +67,12 @@ void throwToSingleThreaded_(Capability *cap, StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically) { + tso = deRefTSO(tso); + // Thread already dead? if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) { return; } - while (tso->what_next == ThreadRelocated) { - tso = tso->_link; - } // Remove it from any blocking queues removeFromQueues(cap,tso); @@ -83,13 +83,12 @@ throwToSingleThreaded_(Capability *cap, StgTSO *tso, StgClosure *exception, void suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here) { + tso = deRefTSO(tso); + // Thread already dead? if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) { return; } - while (tso->what_next == ThreadRelocated) { - tso = tso->_link; - } // Remove it from any blocking queues removeFromQueues(cap,tso); @@ -164,7 +163,7 @@ throwTo (Capability *cap, // the Capability we hold msg = (MessageThrowTo *) allocate(cap, sizeofW(MessageThrowTo)); // message starts locked; the caller has to unlock it when it is // ready. 
- msg->header.info = &stg_WHITEHOLE_info; + SET_HDR(msg, &stg_WHITEHOLE_info, CCS_SYSTEM); msg->source = source; msg->target = target; msg->exception = exception; @@ -185,14 +184,24 @@ throwToMsg (Capability *cap, MessageThrowTo *msg) { StgWord status; StgTSO *target = msg->target; + Capability *target_cap; + goto check_target; + +retry: + write_barrier(); + debugTrace(DEBUG_sched, "throwTo: retrying..."); + +check_target: ASSERT(target != END_TSO_QUEUE); // follow ThreadRelocated links in the target first - while (target->what_next == ThreadRelocated) { - target = target->_link; - // No, it might be a WHITEHOLE: - // ASSERT(get_itbl(target)->type == TSO); + target = deRefTSO(target); + + // Thread already dead? + if (target->what_next == ThreadComplete + || target->what_next == ThreadKilled) { + return THROWTO_SUCCESS; } debugTraceCap(DEBUG_sched, cap, @@ -204,18 +213,10 @@ throwToMsg (Capability *cap, MessageThrowTo *msg) traceThreadStatus(DEBUG_sched, target); #endif - goto check_target; -retry: - write_barrier(); - debugTrace(DEBUG_sched, "throwTo: retrying..."); - -check_target: - ASSERT(target != END_TSO_QUEUE); - - // Thread already dead? - if (target->what_next == ThreadComplete - || target->what_next == ThreadKilled) { - return THROWTO_SUCCESS; + target_cap = target->cap; + if (target->cap != cap) { + throwToSendMsg(cap, target_cap, msg); + return THROWTO_BLOCKED; } status = target->why_blocked; @@ -282,28 +283,19 @@ check_target: have also seen the write to Q. 
*/ { - Capability *target_cap; - write_barrier(); - target_cap = target->cap; - if (target_cap != cap) { - throwToSendMsg(cap, target_cap, msg); - return THROWTO_BLOCKED; + if ((target->flags & TSO_BLOCKEX) == 0) { + // It's on our run queue and not blocking exceptions + raiseAsync(cap, target, msg->exception, rtsFalse, NULL); + return THROWTO_SUCCESS; } else { - if ((target->flags & TSO_BLOCKEX) == 0) { - // It's on our run queue and not blocking exceptions - raiseAsync(cap, target, msg->exception, rtsFalse, NULL); - return THROWTO_SUCCESS; - } else { - blockedThrowTo(cap,target,msg); - return THROWTO_BLOCKED; - } + blockedThrowTo(cap,target,msg); + return THROWTO_BLOCKED; } } case BlockedOnMsgThrowTo: { - Capability *target_cap; const StgInfoTable *i; MessageThrowTo *m; @@ -340,13 +332,6 @@ check_target: goto retry; } - target_cap = target->cap; - if (target_cap != cap) { - unlockClosure((StgClosure*)m, i); - throwToSendMsg(cap, target_cap, msg); - return THROWTO_BLOCKED; - } - if ((target->flags & TSO_BLOCKEX) && ((target->flags & TSO_INTERRUPTIBLE) == 0)) { unlockClosure((StgClosure*)m, i); @@ -358,7 +343,6 @@ check_target: unlockClosure((StgClosure*)m, &stg_IND_info); raiseAsync(cap, target, msg->exception, rtsFalse, NULL); - unblockOne(cap, target); return THROWTO_SUCCESS; } @@ -400,48 +384,30 @@ check_target: if ((target->flags & TSO_BLOCKEX) && ((target->flags & TSO_INTERRUPTIBLE) == 0)) { - Capability *target_cap = target->cap; - if (target->cap != cap) { - throwToSendMsg(cap,target_cap,msg); - } else { - blockedThrowTo(cap,target,msg); - } + blockedThrowTo(cap,target,msg); unlockClosure((StgClosure *)mvar, info); return THROWTO_BLOCKED; } else { removeThreadFromMVarQueue(cap, mvar, target); raiseAsync(cap, target, msg->exception, rtsFalse, NULL); - unblockOne(cap, target); - unlockClosure((StgClosure *)mvar, info); + if (info == &stg_MVAR_CLEAN_info) { + dirty_MVAR(&cap->r,(StgClosure*)mvar); + } + unlockClosure((StgClosure *)mvar, &stg_MVAR_DIRTY_info); 
return THROWTO_SUCCESS; } } case BlockedOnBlackHole: { - ACQUIRE_LOCK(&sched_mutex); - // double checking the status after the memory barrier: - if (target->why_blocked != BlockedOnBlackHole) { - RELEASE_LOCK(&sched_mutex); - goto retry; - } - - if (target->flags & TSO_BLOCKEX) { - Capability *target_cap = target->cap; - if (target->cap != cap) { - throwToSendMsg(cap,target_cap,msg); - } else { - blockedThrowTo(cap,target,msg); - } - RELEASE_LOCK(&sched_mutex); - return THROWTO_BLOCKED; // caller releases lock - } else { - removeThreadFromQueue(cap, &blackhole_queue, target); - raiseAsync(cap, target, msg->exception, rtsFalse, NULL); - unblockOne(cap, target); - RELEASE_LOCK(&sched_mutex); - return THROWTO_SUCCESS; - } + // Revoke the message by replacing it with IND. We're not + // locking anything here, so we might still get a TRY_WAKEUP + // message from the owner of the blackhole some time in the + // future, but that doesn't matter. + ASSERT(target->block_info.bh->header.info == &stg_MSG_BLACKHOLE_info); + OVERWRITE_INFO(target->block_info.bh, &stg_IND_info); + raiseAsync(cap, target, msg->exception, rtsFalse, NULL); + return THROWTO_SUCCESS; } case BlockedOnSTM: @@ -454,35 +420,19 @@ check_target: } if ((target->flags & TSO_BLOCKEX) && ((target->flags & TSO_INTERRUPTIBLE) == 0)) { - Capability *target_cap = target->cap; - if (target->cap != cap) { - throwToSendMsg(cap,target_cap,msg); - } else { - blockedThrowTo(cap,target,msg); - } + blockedThrowTo(cap,target,msg); unlockTSO(target); return THROWTO_BLOCKED; } else { raiseAsync(cap, target, msg->exception, rtsFalse, NULL); - unblockOne(cap, target); unlockTSO(target); return THROWTO_SUCCESS; } case BlockedOnCCall: case BlockedOnCCall_NoUnblockExc: - { - Capability *target_cap; - - target_cap = target->cap; - if (target_cap != cap) { - throwToSendMsg(cap, target_cap, msg); - return THROWTO_BLOCKED; - } - blockedThrowTo(cap,target,msg); return THROWTO_BLOCKED; - } #ifndef THREADEDED_RTS case BlockedOnRead: @@ 
-515,9 +465,9 @@ throwToSendMsg (Capability *cap STG_UNUSED, { #ifdef THREADED_RTS - debugTrace(DEBUG_sched, "throwTo: sending a throwto message to cap %lu", (unsigned long)target_cap->no); + debugTraceCap(DEBUG_sched, cap, "throwTo: sending a throwto message to cap %lu", (unsigned long)target_cap->no); - sendMessage(target_cap, (Message*)msg); + sendMessage(cap, target_cap, (Message*)msg); #endif } @@ -532,7 +482,7 @@ blockedThrowTo (Capability *cap, StgTSO *target, MessageThrowTo *msg) ASSERT(target->cap == cap); - msg->link = (Message*)target->blocked_exceptions; + msg->link = target->blocked_exceptions; target->blocked_exceptions = msg; dirty_TSO(cap,target); // we modified the blocked_exceptions queue } @@ -571,7 +521,7 @@ maybePerformBlockedException (Capability *cap, StgTSO *tso) if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE && (tso->flags & TSO_BLOCKEX) != 0) { - debugTrace(DEBUG_sched, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id); + debugTraceCap(DEBUG_sched, cap, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id); } if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE @@ -664,16 +614,18 @@ removeFromQueues(Capability *cap, StgTSO *tso) case BlockedOnMVar: removeThreadFromMVarQueue(cap, (StgMVar *)tso->block_info.closure, tso); + // we aren't doing a write barrier here: the MVar is supposed to + // be already locked, so replacing the info pointer would unlock it. goto done; case BlockedOnBlackHole: - removeThreadFromQueue(cap, &blackhole_queue, tso); + // nothing to do goto done; case BlockedOnMsgWakeup: { // kill the message, atomically: - tso->block_info.wakeup->header.info = &stg_IND_info; + OVERWRITE_INFO(tso->block_info.wakeup, &stg_IND_info); break; } @@ -725,7 +677,8 @@ removeFromQueues(Capability *cap, StgTSO *tso) * asynchronous exception in an existing thread. * * We first remove the thread from any queue on which it might be - * blocked. 
The possible blockages are MVARs and BLACKHOLE_BQs. + * blocked. The possible blockages are MVARs, BLOCKING_QUEUESs, and + * TSO blocked_exception queues. * * We strip the stack down to the innermost CATCH_FRAME, building * thunks in the heap for all the active computations, so they can @@ -764,8 +717,8 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, StgClosure *updatee; nat i; - debugTrace(DEBUG_sched, - "raising exception in thread %ld.", (long)tso->id); + debugTraceCap(DEBUG_sched, cap, + "raising exception in thread %ld.", (long)tso->id); #if defined(PROFILING) /* @@ -784,6 +737,15 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, tso->what_next != ThreadKilled && tso->what_next != ThreadRelocated); + // only if we own this TSO (except that deleteThread() calls this + ASSERT(tso->cap == cap); + + // wake it up + if (tso->why_blocked != NotBlocked && tso->why_blocked != BlockedOnMsgWakeup) { + tso->why_blocked = NotBlocked; + appendToRunQueue(cap,tso); + } + // mark it dirty; we're about to change its stack. dirty_TSO(cap, tso); @@ -871,7 +833,8 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, // Perform the update // TODO: this may waste some work, if the thunk has // already been updated by another thread. - UPD_IND(cap, ((StgUpdateFrame *)frame)->updatee, (StgClosure *)ap); + updateThunk(cap, tso, + ((StgUpdateFrame *)frame)->updatee, (StgClosure *)ap); } sp += sizeofW(StgUpdateFrame) - 1; @@ -963,8 +926,8 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, { StgTRecHeader *trec = tso -> trec; StgTRecHeader *outer = trec -> enclosing_trec; - debugTrace(DEBUG_stm, - "found atomically block delivering async exception"); + debugTraceCap(DEBUG_stm, cap, + "found atomically block delivering async exception"); stmAbortTransaction(cap, trec); stmFreeAbortedTRec(cap, trec); tso -> trec = outer; |