diff options
author | Simon Marlow <marlowsd@gmail.com> | 2010-03-11 09:57:44 +0000 |
---|---|---|
committer | Simon Marlow <marlowsd@gmail.com> | 2010-03-11 09:57:44 +0000 |
commit | 7408b39235bccdcde48df2a73337ff976fbc09b7 (patch) | |
tree | cf20c372fdc5787170d53df36fc24ecf8113c89e /rts/RaiseAsync.c | |
parent | 12cfec943127f0c81e1ffa1ca5ce46e888e3027c (diff) | |
download | haskell-7408b39235bccdcde48df2a73337ff976fbc09b7.tar.gz |
Use message-passing to implement throwTo in the RTS
This replaces some complicated locking schemes with message-passing
in the implementation of throwTo. The benefits are
- previously it was impossible to guarantee that a throwTo from
a thread running on one CPU to a thread running on another CPU
would be noticed, and we had to rely on the GC to pick up these
forgotten exceptions. This no longer happens.
- the locking regime is simpler (though the code is about the same
size)
- threads can be unblocked from a blocked_exceptions queue without
having to traverse the whole queue now. It's a rare case, but
replaces an O(n) operation with an O(1).
- generally we move in the direction of sharing less between
Capabilities (aka HECs), which will become important with other
changes we have planned.
Also in this patch I replaced several STM-specific closure types with
a generic MUT_PRIM closure type, which allowed a lot of code in the GC
and other places to go away, hence the line-count reduction. The
message-passing changes resulted in about a net zero line-count
difference.
Diffstat (limited to 'rts/RaiseAsync.c')
-rw-r--r-- | rts/RaiseAsync.c | 512 |
1 files changed, 273 insertions, 239 deletions
diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c index ca5e5ea4c1..d54f823d6e 100644 --- a/rts/RaiseAsync.c +++ b/rts/RaiseAsync.c @@ -30,10 +30,14 @@ static void raiseAsync (Capability *cap, static void removeFromQueues(Capability *cap, StgTSO *tso); -static void blockedThrowTo (Capability *cap, StgTSO *source, StgTSO *target); +static void blockedThrowTo (Capability *cap, + StgTSO *target, MessageThrowTo *msg); -static void performBlockedException (Capability *cap, - StgTSO *source, StgTSO *target); +static void throwToSendMsg (Capability *cap USED_IF_THREADS, + Capability *target_cap USED_IF_THREADS, + MessageThrowTo *msg USED_IF_THREADS); + +static void performBlockedException (Capability *cap, MessageThrowTo *msg); /* ----------------------------------------------------------------------------- throwToSingleThreaded @@ -96,59 +100,85 @@ suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here) may be blocked and could be woken up at any point by another CPU. We have some delicate synchronisation to do. - There is a completely safe fallback scheme: it is always possible - to just block the source TSO on the target TSO's blocked_exceptions - queue. This queue is locked using lockTSO()/unlockTSO(). It is - checked at regular intervals: before and after running a thread - (schedule() and threadPaused() respectively), and just before GC - (scheduleDoGC()). Activating a thread on this queue should be done - using maybePerformBlockedException(): this is done in the context - of the target thread, so the exception can be raised eagerly. - - This fallback scheme works even if the target thread is complete or - killed: scheduleDoGC() will discover the blocked thread before the - target is GC'd. - - Blocking the source thread on the target thread's blocked_exception - queue is also employed when the target thread is currently blocking - exceptions (ie. inside Control.Exception.block). - - We could use the safe fallback scheme exclusively, but that - wouldn't be ideal: most calls to throwTo would block immediately, - possibly until the next GC, which might require the deadlock - detection mechanism to kick in. So we try to provide promptness - wherever possible. - - We can promptly deliver the exception if the target thread is: - - - runnable, on the same Capability as the source thread (because - we own the run queue and therefore the target thread). - - - blocked, and we can obtain exclusive access to it. Obtaining - exclusive access to the thread depends on how it is blocked. - - We must also be careful to not trip over threadStackOverflow(), - which might be moving the TSO to enlarge its stack. - lockTSO()/unlockTSO() are used here too. - + The underlying scheme when multiple Capabilities are in use is + message passing: when the target of a throwTo is on another + Capability, we send a message (a MessageThrowTo closure) to that + Capability. + + If the throwTo needs to block because the target TSO is masking + exceptions (the TSO_BLOCKEX flag), then the message is placed on + the blocked_exceptions queue attached to the target TSO. When the + target TSO enters the unmasked state again, it must check the + queue. The blocked_exceptions queue is not locked; only the + Capability owning the TSO may modify it. + + To make things simpler for throwTo, we always create the message + first before deciding what to do. The message may get sent, or it + may get attached to a TSO's blocked_exceptions queue, or the + exception may get thrown immediately and the message dropped, + depending on the current state of the target. + + Currently we send a message if the target belongs to another + Capability, and it is + + - NotBlocked, BlockedOnMsgWakeup, BlockedOnMsgThrowTo, + BlockedOnCCall + + - or it is masking exceptions (TSO_BLOCKEX) + + Currently, if the target is BlockedOnMVar, BlockedOnSTM, or + BlockedOnBlackHole then we acquire ownership of the TSO by locking + its parent container (e.g. the MVar) and then raise the exception. + We might change these cases to be more message-passing-like in the + future. + Returns: - THROWTO_SUCCESS exception was raised, ok to continue + NULL exception was raised, ok to continue - THROWTO_BLOCKED exception was not raised; block the source - thread then call throwToReleaseTarget() when - the source thread is properly tidied away. + MessageThrowTo * exception was not raised; the source TSO + should now put itself in the state + BlockedOnMsgThrowTo, and when it is ready + it should unlock the mssage using + unlockClosure(msg, &stg_MSG_THROWTO_info); + If it decides not to raise the exception after + all, it can revoke it safely with + unlockClosure(msg, &stg_IND_info); -------------------------------------------------------------------------- */ -nat +MessageThrowTo * throwTo (Capability *cap, // the Capability we hold StgTSO *source, // the TSO sending the exception (or NULL) StgTSO *target, // the TSO receiving the exception - StgClosure *exception, // the exception closure - /*[out]*/ void **out USED_IF_THREADS) + StgClosure *exception) // the exception closure +{ + MessageThrowTo *msg; + + msg = (MessageThrowTo *) allocate(cap, sizeofW(MessageThrowTo)); + // message starts locked; the caller has to unlock it when it is + // ready. + msg->header.info = &stg_WHITEHOLE_info; + msg->source = source; + msg->target = target; + msg->exception = exception; + + switch (throwToMsg(cap, msg)) + { + case THROWTO_SUCCESS: + return NULL; + case THROWTO_BLOCKED: + default: + return msg; + } +} + + +nat +throwToMsg (Capability *cap, MessageThrowTo *msg) { StgWord status; + StgTSO *target = msg->target; ASSERT(target != END_TSO_QUEUE); @@ -159,13 +189,10 @@ throwTo (Capability *cap, // the Capability we hold // ASSERT(get_itbl(target)->type == TSO); } - if (source != NULL) { - debugTrace(DEBUG_sched, "throwTo: from thread %lu to thread %lu", - (unsigned long)source->id, (unsigned long)target->id); - } else { - debugTrace(DEBUG_sched, "throwTo: from RTS to thread %lu", - (unsigned long)target->id); - } + debugTraceCap(DEBUG_sched, cap, + "throwTo: from thread %lu to thread %lu", + (unsigned long)msg->source->id, + (unsigned long)msg->target->id); #ifdef DEBUG traceThreadStatus(DEBUG_sched, target); @@ -173,6 +200,7 @@ throwTo (Capability *cap, // the Capability we hold goto check_target; retry: + write_barrier(); debugTrace(DEBUG_sched, "throwTo: retrying..."); check_target: @@ -188,6 +216,7 @@ check_target: switch (status) { case NotBlocked: + case BlockedOnMsgWakeup: /* if status==NotBlocked, and target->cap == cap, then we own this TSO and can raise the exception. @@ -251,37 +280,80 @@ check_target: write_barrier(); target_cap = target->cap; - if (target_cap == cap && (target->flags & TSO_BLOCKEX) == 0) { - // It's on our run queue and not blocking exceptions - raiseAsync(cap, target, exception, rtsFalse, NULL); - return THROWTO_SUCCESS; - } else { - // Otherwise, just block on the blocked_exceptions queue - // of the target thread. The queue will get looked at - // soon enough: it is checked before and after running a - // thread, and during GC. - lockTSO(target); - - // Avoid race with threadStackOverflow, which may have - // just moved this TSO. - if (target->what_next == ThreadRelocated) { - unlockTSO(target); - target = target->_link; - goto retry; - } - // check again for ThreadComplete and ThreadKilled. This - // cooperates with scheduleHandleThreadFinished to ensure - // that we never miss any threads that are throwing an - // exception to a thread in the process of terminating. - if (target->what_next == ThreadComplete - || target->what_next == ThreadKilled) { - unlockTSO(target); + if (target_cap != cap) { + throwToSendMsg(cap, target_cap, msg); + return THROWTO_BLOCKED; + } else { + if ((target->flags & TSO_BLOCKEX) == 0) { + // It's on our run queue and not blocking exceptions + raiseAsync(cap, target, msg->exception, rtsFalse, NULL); return THROWTO_SUCCESS; + } else { + blockedThrowTo(cap,target,msg); + return THROWTO_BLOCKED; } - blockedThrowTo(cap,source,target); - *out = target; - return THROWTO_BLOCKED; - } + } + } + + case BlockedOnMsgThrowTo: + { + Capability *target_cap; + const StgInfoTable *i; + MessageThrowTo *m; + + m = target->block_info.throwto; + + // target is local to this cap, but has sent a throwto + // message to another cap. + // + // The source message is locked. We need to revoke the + // target's message so that we can raise the exception, so + // we attempt to lock it. + + // There's a possibility of a deadlock if two threads are both + // trying to throwTo each other (or more generally, a cycle of + // threads). To break the symmetry we compare the addresses + // of the MessageThrowTo objects, and the one for which m < + // msg gets to spin, while the other can only try to lock + // once, but must then back off and unlock both before trying + // again. + if (m < msg) { + i = lockClosure((StgClosure *)m); + } else { + i = tryLockClosure((StgClosure *)m); + if (i == NULL) { +// debugBelch("collision\n"); + throwToSendMsg(cap, target->cap, msg); + return THROWTO_BLOCKED; + } + } + + if (i != &stg_MSG_THROWTO_info) { + // if it's an IND, this TSO has been woken up by another Cap + unlockClosure((StgClosure*)m, i); + goto retry; + } + + target_cap = target->cap; + if (target_cap != cap) { + unlockClosure((StgClosure*)m, i); + throwToSendMsg(cap, target_cap, msg); + return THROWTO_BLOCKED; + } + + if ((target->flags & TSO_BLOCKEX) && + ((target->flags & TSO_INTERRUPTIBLE) == 0)) { + unlockClosure((StgClosure*)m, i); + blockedThrowTo(cap,target,msg); + return THROWTO_BLOCKED; + } + + // nobody else can wake up this TSO after we claim the message + unlockClosure((StgClosure*)m, &stg_IND_info); + + raiseAsync(cap, target, msg->exception, rtsFalse, NULL); + unblockOne(cap, target); + return THROWTO_SUCCESS; } case BlockedOnMVar: @@ -322,14 +394,17 @@ check_target: if ((target->flags & TSO_BLOCKEX) && ((target->flags & TSO_INTERRUPTIBLE) == 0)) { - lockClosure((StgClosure *)target); - blockedThrowTo(cap,source,target); + Capability *target_cap = target->cap; + if (target->cap != cap) { + throwToSendMsg(cap,target_cap,msg); + } else { + blockedThrowTo(cap,target,msg); + } unlockClosure((StgClosure *)mvar, info); - *out = target; - return THROWTO_BLOCKED; // caller releases TSO + return THROWTO_BLOCKED; } else { removeThreadFromMVarQueue(cap, mvar, target); - raiseAsync(cap, target, exception, rtsFalse, NULL); + raiseAsync(cap, target, msg->exception, rtsFalse, NULL); unblockOne(cap, target); unlockClosure((StgClosure *)mvar, info); return THROWTO_SUCCESS; @@ -346,84 +421,23 @@ check_target: } if (target->flags & TSO_BLOCKEX) { - lockTSO(target); - blockedThrowTo(cap,source,target); + Capability *target_cap = target->cap; + if (target->cap != cap) { + throwToSendMsg(cap,target_cap,msg); + } else { + blockedThrowTo(cap,target,msg); + } RELEASE_LOCK(&sched_mutex); - *out = target; - return THROWTO_BLOCKED; // caller releases TSO + return THROWTO_BLOCKED; // caller releases lock } else { removeThreadFromQueue(cap, &blackhole_queue, target); - raiseAsync(cap, target, exception, rtsFalse, NULL); + raiseAsync(cap, target, msg->exception, rtsFalse, NULL); unblockOne(cap, target); RELEASE_LOCK(&sched_mutex); return THROWTO_SUCCESS; } } - case BlockedOnException: - { - StgTSO *target2; - StgInfoTable *info; - - /* - To obtain exclusive access to a BlockedOnException thread, - we must call lockClosure() on the TSO on which it is blocked. - Since the TSO might change underneath our feet, after we - call lockClosure() we must check that - - (a) the closure we locked is actually a TSO - (b) the original thread is still BlockedOnException, - (c) the original thread is still blocked on the TSO we locked - and (d) the target thread has not been relocated. - - We synchronise with threadStackOverflow() (which relocates - threads) using lockClosure()/unlockClosure(). - */ - target2 = target->block_info.tso; - - info = lockClosure((StgClosure *)target2); - if (info != &stg_TSO_info) { - unlockClosure((StgClosure *)target2, info); - goto retry; - } - if (target->what_next == ThreadRelocated) { - target = target->_link; - unlockTSO(target2); - goto retry; - } - if (target2->what_next == ThreadRelocated) { - target->block_info.tso = target2->_link; - unlockTSO(target2); - goto retry; - } - if (target->why_blocked != BlockedOnException - || target->block_info.tso != target2) { - unlockTSO(target2); - goto retry; - } - - /* - Now we have exclusive rights to the target TSO... - - If it is blocking exceptions, add the source TSO to its - blocked_exceptions queue. Otherwise, raise the exception. - */ - if ((target->flags & TSO_BLOCKEX) && - ((target->flags & TSO_INTERRUPTIBLE) == 0)) { - lockTSO(target); - blockedThrowTo(cap,source,target); - unlockTSO(target2); - *out = target; - return THROWTO_BLOCKED; - } else { - removeThreadFromQueue(cap, &target2->blocked_exceptions, target); - raiseAsync(cap, target, exception, rtsFalse, NULL); - unblockOne(cap, target); - unlockTSO(target2); - return THROWTO_SUCCESS; - } - } - case BlockedOnSTM: lockTSO(target); // Unblocking BlockedOnSTM threads requires the TSO to be @@ -434,11 +448,16 @@ check_target: } if ((target->flags & TSO_BLOCKEX) && ((target->flags & TSO_INTERRUPTIBLE) == 0)) { - blockedThrowTo(cap,source,target); - *out = target; + Capability *target_cap = target->cap; + if (target->cap != cap) { + throwToSendMsg(cap,target_cap,msg); + } else { + blockedThrowTo(cap,target,msg); + } + unlockTSO(target); return THROWTO_BLOCKED; } else { - raiseAsync(cap, target, exception, rtsFalse, NULL); + raiseAsync(cap, target, msg->exception, rtsFalse, NULL); unblockOne(cap, target); unlockTSO(target); return THROWTO_SUCCESS; @@ -446,19 +465,18 @@ check_target: case BlockedOnCCall: case BlockedOnCCall_NoUnblockExc: - // I don't think it's possible to acquire ownership of a - // BlockedOnCCall thread. We just assume that the target - // thread is blocking exceptions, and block on its - // blocked_exception queue. - lockTSO(target); - if (target->why_blocked != BlockedOnCCall && - target->why_blocked != BlockedOnCCall_NoUnblockExc) { - unlockTSO(target); - goto retry; - } - blockedThrowTo(cap,source,target); - *out = target; + { + Capability *target_cap; + + target_cap = target->cap; + if (target_cap != cap) { + throwToSendMsg(cap, target_cap, msg); + return THROWTO_BLOCKED; + } + + blockedThrowTo(cap,target,msg); return THROWTO_BLOCKED; + } #ifndef THREADEDED_RTS case BlockedOnRead: @@ -469,11 +487,11 @@ check_target: #endif if ((target->flags & TSO_BLOCKEX) && ((target->flags & TSO_INTERRUPTIBLE) == 0)) { - blockedThrowTo(cap,source,target); + blockedThrowTo(cap,target,msg); return THROWTO_BLOCKED; } else { removeFromQueues(cap,target); - raiseAsync(cap, target, exception, rtsFalse, NULL); + raiseAsync(cap, target, msg->exception, rtsFalse, NULL); return THROWTO_SUCCESS; } #endif @@ -484,33 +502,34 @@ check_target: barf("throwTo"); } -// Block a TSO on another TSO's blocked_exceptions queue. -// Precondition: we hold an exclusive lock on the target TSO (this is -// complex to achieve as there's no single lock on a TSO; see -// throwTo()). static void -blockedThrowTo (Capability *cap, StgTSO *source, StgTSO *target) +throwToSendMsg (Capability *cap STG_UNUSED, + Capability *target_cap USED_IF_THREADS, + MessageThrowTo *msg USED_IF_THREADS) + { - if (source != NULL) { - debugTrace(DEBUG_sched, "throwTo: blocking on thread %lu", (unsigned long)target->id); - setTSOLink(cap, source, target->blocked_exceptions); - target->blocked_exceptions = source; - dirty_TSO(cap,target); // we modified the blocked_exceptions queue - - source->block_info.tso = target; - write_barrier(); // throwTo_exception *must* be visible if BlockedOnException is. - source->why_blocked = BlockedOnException; - } -} +#ifdef THREADED_RTS + debugTrace(DEBUG_sched, "throwTo: sending a throwto message to cap %lu", (unsigned long)target_cap->no); + sendMessage(target_cap, (Message*)msg); +#endif +} -#ifdef THREADED_RTS -void -throwToReleaseTarget (void *tso) +// Block a throwTo message on the target TSO's blocked_exceptions +// queue. The current Capability must own the target TSO in order to +// modify the blocked_exceptions queue. +static void +blockedThrowTo (Capability *cap, StgTSO *target, MessageThrowTo *msg) { - unlockTSO((StgTSO *)tso); + debugTraceCap(DEBUG_sched, cap, "throwTo: blocking on thread %lu", + (unsigned long)target->id); + + ASSERT(target->cap == cap); + + msg->link = (Message*)target->blocked_exceptions; + target->blocked_exceptions = msg; + dirty_TSO(cap,target); // we modified the blocked_exceptions queue } -#endif /* ----------------------------------------------------------------------------- Waking up threads blocked in throwTo @@ -532,10 +551,11 @@ throwToReleaseTarget (void *tso) int maybePerformBlockedException (Capability *cap, StgTSO *tso) { - StgTSO *source; + MessageThrowTo *msg; + const StgInfoTable *i; if (tso->what_next == ThreadComplete || tso->what_next == ThreadFinished) { - if (tso->blocked_exceptions != END_TSO_QUEUE) { + if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) { awakenBlockedExceptionQueue(cap,tso); return 1; } else { @@ -543,32 +563,30 @@ maybePerformBlockedException (Capability *cap, StgTSO *tso) } } - if (tso->blocked_exceptions != END_TSO_QUEUE && + if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE && (tso->flags & TSO_BLOCKEX) != 0) { debugTrace(DEBUG_sched, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id); } - if (tso->blocked_exceptions != END_TSO_QUEUE + if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE && ((tso->flags & TSO_BLOCKEX) == 0 || ((tso->flags & TSO_INTERRUPTIBLE) && interruptible(tso)))) { - // Lock the TSO, this gives us exclusive access to the queue - lockTSO(tso); - - // Check the queue again; it might have changed before we - // locked it. - if (tso->blocked_exceptions == END_TSO_QUEUE) { - unlockTSO(tso); - return 0; - } - // We unblock just the first thread on the queue, and perform // its throw immediately. - source = tso->blocked_exceptions; - performBlockedException(cap, source, tso); - tso->blocked_exceptions = unblockOne_(cap, source, - rtsFalse/*no migrate*/); - unlockTSO(tso); + loop: + msg = tso->blocked_exceptions; + if (msg == END_BLOCKED_EXCEPTIONS_QUEUE) return 0; + i = lockClosure((StgClosure*)msg); + tso->blocked_exceptions = (MessageThrowTo*)msg->link; + if (i == &stg_IND_info) { + unlockClosure((StgClosure*)msg,i); + goto loop; + } + + performBlockedException(cap, msg); + unblockOne_(cap, msg->source, rtsFalse/*no migrate*/); + unlockClosure((StgClosure*)msg,&stg_IND_info); return 1; } return 0; @@ -580,25 +598,34 @@ maybePerformBlockedException (Capability *cap, StgTSO *tso) void awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso) { - lockTSO(tso); - awakenBlockedQueue(cap, tso->blocked_exceptions); - tso->blocked_exceptions = END_TSO_QUEUE; - unlockTSO(tso); + MessageThrowTo *msg; + const StgInfoTable *i; + + for (msg = tso->blocked_exceptions; msg != END_BLOCKED_EXCEPTIONS_QUEUE; + msg = (MessageThrowTo*)msg->link) { + i = lockClosure((StgClosure *)msg); + if (i != &stg_IND_info) { + unblockOne_(cap, msg->source, rtsFalse/*no migrate*/); + } + unlockClosure((StgClosure *)msg,i); + } + tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE; } static void -performBlockedException (Capability *cap, StgTSO *source, StgTSO *target) +performBlockedException (Capability *cap, MessageThrowTo *msg) { - StgClosure *exception; + StgTSO *source; + + source = msg->source; - ASSERT(source->why_blocked == BlockedOnException); - ASSERT(source->block_info.tso->id == target->id); + ASSERT(source->why_blocked == BlockedOnMsgThrowTo); + ASSERT(source->block_info.closure == (StgClosure *)msg); ASSERT(source->sp[0] == (StgWord)&stg_block_throwto_info); - ASSERT(((StgTSO *)source->sp[1])->id == target->id); + ASSERT(((StgTSO *)source->sp[1])->id == msg->target->id); // check ids not pointers, because the thread might be relocated - exception = (StgClosure *)source->sp[2]; - throwToSingleThreaded(cap, target, exception); + throwToSingleThreaded(cap, msg->target, msg->exception); source->sp += 3; } @@ -637,22 +664,25 @@ removeFromQueues(Capability *cap, StgTSO *tso) removeThreadFromQueue(cap, &blackhole_queue, tso); goto done; - case BlockedOnException: - { - StgTSO *target = tso->block_info.tso; - - // NO: when called by threadPaused(), we probably have this - // TSO already locked (WHITEHOLEd) because we just placed - // ourselves on its queue. - // ASSERT(get_itbl(target)->type == TSO); - - while (target->what_next == ThreadRelocated) { - target = target->_link; - } - - removeThreadFromQueue(cap, &target->blocked_exceptions, tso); - goto done; - } + case BlockedOnMsgWakeup: + { + // kill the message, atomically: + tso->block_info.wakeup->header.info = &stg_IND_info; + break; + } + + case BlockedOnMsgThrowTo: + { + MessageThrowTo *m = tso->block_info.throwto; + // The message is locked by us, unless we got here via + // deleteAllThreads(), in which case we own all the + // capabilities. + // ASSERT(m->header.info == &stg_WHITEHOLE_info); + + // unlock and revoke it at the same time + unlockClosure((StgClosure*)m,&stg_IND_info); + break; + } #if !defined(THREADED_RTS) case BlockedOnRead: @@ -743,6 +773,10 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, } #endif + while (tso->what_next == ThreadRelocated) { + tso = tso->_link; + } + // mark it dirty; we're about to change its stack. dirty_TSO(cap, tso); |