summaryrefslogtreecommitdiff
path: root/rts/RaiseAsync.c
diff options
context:
space:
mode:
authorSimon Marlow <marlowsd@gmail.com>2010-03-11 09:57:44 +0000
committerSimon Marlow <marlowsd@gmail.com>2010-03-11 09:57:44 +0000
commit7408b39235bccdcde48df2a73337ff976fbc09b7 (patch)
treecf20c372fdc5787170d53df36fc24ecf8113c89e /rts/RaiseAsync.c
parent12cfec943127f0c81e1ffa1ca5ce46e888e3027c (diff)
downloadhaskell-7408b39235bccdcde48df2a73337ff976fbc09b7.tar.gz
Use message-passing to implement throwTo in the RTS
This replaces some complicated locking schemes with message-passing in the implementation of throwTo. The benefits are - previously it was impossible to guarantee that a throwTo from a thread running on one CPU to a thread running on another CPU would be noticed, and we had to rely on the GC to pick up these forgotten exceptions. This no longer happens. - the locking regime is simpler (though the code is about the same size) - threads can be unblocked from a blocked_exceptions queue without having to traverse the whole queue now. It's a rare case, but replaces an O(n) operation with an O(1). - generally we move in the direction of sharing less between Capabilities (aka HECs), which will become important with other changes we have planned. Also in this patch I replaced several STM-specific closure types with a generic MUT_PRIM closure type, which allowed a lot of code in the GC and other places to go away, hence the line-count reduction. The message-passing changes resulted in about a net zero line-count difference.
Diffstat (limited to 'rts/RaiseAsync.c')
-rw-r--r--rts/RaiseAsync.c512
1 files changed, 273 insertions, 239 deletions
diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c
index ca5e5ea4c1..d54f823d6e 100644
--- a/rts/RaiseAsync.c
+++ b/rts/RaiseAsync.c
@@ -30,10 +30,14 @@ static void raiseAsync (Capability *cap,
static void removeFromQueues(Capability *cap, StgTSO *tso);
-static void blockedThrowTo (Capability *cap, StgTSO *source, StgTSO *target);
+static void blockedThrowTo (Capability *cap,
+ StgTSO *target, MessageThrowTo *msg);
-static void performBlockedException (Capability *cap,
- StgTSO *source, StgTSO *target);
+static void throwToSendMsg (Capability *cap USED_IF_THREADS,
+ Capability *target_cap USED_IF_THREADS,
+ MessageThrowTo *msg USED_IF_THREADS);
+
+static void performBlockedException (Capability *cap, MessageThrowTo *msg);
/* -----------------------------------------------------------------------------
throwToSingleThreaded
@@ -96,59 +100,85 @@ suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here)
may be blocked and could be woken up at any point by another CPU.
We have some delicate synchronisation to do.
- There is a completely safe fallback scheme: it is always possible
- to just block the source TSO on the target TSO's blocked_exceptions
- queue. This queue is locked using lockTSO()/unlockTSO(). It is
- checked at regular intervals: before and after running a thread
- (schedule() and threadPaused() respectively), and just before GC
- (scheduleDoGC()). Activating a thread on this queue should be done
- using maybePerformBlockedException(): this is done in the context
- of the target thread, so the exception can be raised eagerly.
-
- This fallback scheme works even if the target thread is complete or
- killed: scheduleDoGC() will discover the blocked thread before the
- target is GC'd.
-
- Blocking the source thread on the target thread's blocked_exception
- queue is also employed when the target thread is currently blocking
- exceptions (ie. inside Control.Exception.block).
-
- We could use the safe fallback scheme exclusively, but that
- wouldn't be ideal: most calls to throwTo would block immediately,
- possibly until the next GC, which might require the deadlock
- detection mechanism to kick in. So we try to provide promptness
- wherever possible.
-
- We can promptly deliver the exception if the target thread is:
-
- - runnable, on the same Capability as the source thread (because
- we own the run queue and therefore the target thread).
-
- - blocked, and we can obtain exclusive access to it. Obtaining
- exclusive access to the thread depends on how it is blocked.
-
- We must also be careful to not trip over threadStackOverflow(),
- which might be moving the TSO to enlarge its stack.
- lockTSO()/unlockTSO() are used here too.
-
+ The underlying scheme when multiple Capabilities are in use is
+ message passing: when the target of a throwTo is on another
+ Capability, we send a message (a MessageThrowTo closure) to that
+ Capability.
+
+ If the throwTo needs to block because the target TSO is masking
+ exceptions (the TSO_BLOCKEX flag), then the message is placed on
+ the blocked_exceptions queue attached to the target TSO. When the
+ target TSO enters the unmasked state again, it must check the
+ queue. The blocked_exceptions queue is not locked; only the
+ Capability owning the TSO may modify it.
+
+ To make things simpler for throwTo, we always create the message
+ first before deciding what to do. The message may get sent, or it
+ may get attached to a TSO's blocked_exceptions queue, or the
+ exception may get thrown immediately and the message dropped,
+ depending on the current state of the target.
+
+ Currently we send a message if the target belongs to another
+ Capability, and it is
+
+ - NotBlocked, BlockedOnMsgWakeup, BlockedOnMsgThrowTo,
+ BlockedOnCCall
+
+ - or it is masking exceptions (TSO_BLOCKEX)
+
+ Currently, if the target is BlockedOnMVar, BlockedOnSTM, or
+ BlockedOnBlackHole then we acquire ownership of the TSO by locking
+ its parent container (e.g. the MVar) and then raise the exception.
+ We might change these cases to be more message-passing-like in the
+ future.
+
Returns:
- THROWTO_SUCCESS exception was raised, ok to continue
+ NULL exception was raised, ok to continue
- THROWTO_BLOCKED exception was not raised; block the source
- thread then call throwToReleaseTarget() when
- the source thread is properly tidied away.
+ MessageThrowTo * exception was not raised; the source TSO
+ should now put itself in the state
+ BlockedOnMsgThrowTo, and when it is ready
+ it should unlock the mssage using
+ unlockClosure(msg, &stg_MSG_THROWTO_info);
+ If it decides not to raise the exception after
+ all, it can revoke it safely with
+ unlockClosure(msg, &stg_IND_info);
-------------------------------------------------------------------------- */
-nat
+MessageThrowTo *
throwTo (Capability *cap, // the Capability we hold
StgTSO *source, // the TSO sending the exception (or NULL)
StgTSO *target, // the TSO receiving the exception
- StgClosure *exception, // the exception closure
- /*[out]*/ void **out USED_IF_THREADS)
+ StgClosure *exception) // the exception closure
+{
+ MessageThrowTo *msg;
+
+ msg = (MessageThrowTo *) allocate(cap, sizeofW(MessageThrowTo));
+ // message starts locked; the caller has to unlock it when it is
+ // ready.
+ msg->header.info = &stg_WHITEHOLE_info;
+ msg->source = source;
+ msg->target = target;
+ msg->exception = exception;
+
+ switch (throwToMsg(cap, msg))
+ {
+ case THROWTO_SUCCESS:
+ return NULL;
+ case THROWTO_BLOCKED:
+ default:
+ return msg;
+ }
+}
+
+
+nat
+throwToMsg (Capability *cap, MessageThrowTo *msg)
{
StgWord status;
+ StgTSO *target = msg->target;
ASSERT(target != END_TSO_QUEUE);
@@ -159,13 +189,10 @@ throwTo (Capability *cap, // the Capability we hold
// ASSERT(get_itbl(target)->type == TSO);
}
- if (source != NULL) {
- debugTrace(DEBUG_sched, "throwTo: from thread %lu to thread %lu",
- (unsigned long)source->id, (unsigned long)target->id);
- } else {
- debugTrace(DEBUG_sched, "throwTo: from RTS to thread %lu",
- (unsigned long)target->id);
- }
+ debugTraceCap(DEBUG_sched, cap,
+ "throwTo: from thread %lu to thread %lu",
+ (unsigned long)msg->source->id,
+ (unsigned long)msg->target->id);
#ifdef DEBUG
traceThreadStatus(DEBUG_sched, target);
@@ -173,6 +200,7 @@ throwTo (Capability *cap, // the Capability we hold
goto check_target;
retry:
+ write_barrier();
debugTrace(DEBUG_sched, "throwTo: retrying...");
check_target:
@@ -188,6 +216,7 @@ check_target:
switch (status) {
case NotBlocked:
+ case BlockedOnMsgWakeup:
/* if status==NotBlocked, and target->cap == cap, then
we own this TSO and can raise the exception.
@@ -251,37 +280,80 @@ check_target:
write_barrier();
target_cap = target->cap;
- if (target_cap == cap && (target->flags & TSO_BLOCKEX) == 0) {
- // It's on our run queue and not blocking exceptions
- raiseAsync(cap, target, exception, rtsFalse, NULL);
- return THROWTO_SUCCESS;
- } else {
- // Otherwise, just block on the blocked_exceptions queue
- // of the target thread. The queue will get looked at
- // soon enough: it is checked before and after running a
- // thread, and during GC.
- lockTSO(target);
-
- // Avoid race with threadStackOverflow, which may have
- // just moved this TSO.
- if (target->what_next == ThreadRelocated) {
- unlockTSO(target);
- target = target->_link;
- goto retry;
- }
- // check again for ThreadComplete and ThreadKilled. This
- // cooperates with scheduleHandleThreadFinished to ensure
- // that we never miss any threads that are throwing an
- // exception to a thread in the process of terminating.
- if (target->what_next == ThreadComplete
- || target->what_next == ThreadKilled) {
- unlockTSO(target);
+ if (target_cap != cap) {
+ throwToSendMsg(cap, target_cap, msg);
+ return THROWTO_BLOCKED;
+ } else {
+ if ((target->flags & TSO_BLOCKEX) == 0) {
+ // It's on our run queue and not blocking exceptions
+ raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
return THROWTO_SUCCESS;
+ } else {
+ blockedThrowTo(cap,target,msg);
+ return THROWTO_BLOCKED;
}
- blockedThrowTo(cap,source,target);
- *out = target;
- return THROWTO_BLOCKED;
- }
+ }
+ }
+
+ case BlockedOnMsgThrowTo:
+ {
+ Capability *target_cap;
+ const StgInfoTable *i;
+ MessageThrowTo *m;
+
+ m = target->block_info.throwto;
+
+ // target is local to this cap, but has sent a throwto
+ // message to another cap.
+ //
+ // The source message is locked. We need to revoke the
+ // target's message so that we can raise the exception, so
+ // we attempt to lock it.
+
+ // There's a possibility of a deadlock if two threads are both
+ // trying to throwTo each other (or more generally, a cycle of
+ // threads). To break the symmetry we compare the addresses
+ // of the MessageThrowTo objects, and the one for which m <
+ // msg gets to spin, while the other can only try to lock
+ // once, but must then back off and unlock both before trying
+ // again.
+ if (m < msg) {
+ i = lockClosure((StgClosure *)m);
+ } else {
+ i = tryLockClosure((StgClosure *)m);
+ if (i == NULL) {
+// debugBelch("collision\n");
+ throwToSendMsg(cap, target->cap, msg);
+ return THROWTO_BLOCKED;
+ }
+ }
+
+ if (i != &stg_MSG_THROWTO_info) {
+ // if it's an IND, this TSO has been woken up by another Cap
+ unlockClosure((StgClosure*)m, i);
+ goto retry;
+ }
+
+ target_cap = target->cap;
+ if (target_cap != cap) {
+ unlockClosure((StgClosure*)m, i);
+ throwToSendMsg(cap, target_cap, msg);
+ return THROWTO_BLOCKED;
+ }
+
+ if ((target->flags & TSO_BLOCKEX) &&
+ ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
+ unlockClosure((StgClosure*)m, i);
+ blockedThrowTo(cap,target,msg);
+ return THROWTO_BLOCKED;
+ }
+
+ // nobody else can wake up this TSO after we claim the message
+ unlockClosure((StgClosure*)m, &stg_IND_info);
+
+ raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
+ unblockOne(cap, target);
+ return THROWTO_SUCCESS;
}
case BlockedOnMVar:
@@ -322,14 +394,17 @@ check_target:
if ((target->flags & TSO_BLOCKEX) &&
((target->flags & TSO_INTERRUPTIBLE) == 0)) {
- lockClosure((StgClosure *)target);
- blockedThrowTo(cap,source,target);
+ Capability *target_cap = target->cap;
+ if (target->cap != cap) {
+ throwToSendMsg(cap,target_cap,msg);
+ } else {
+ blockedThrowTo(cap,target,msg);
+ }
unlockClosure((StgClosure *)mvar, info);
- *out = target;
- return THROWTO_BLOCKED; // caller releases TSO
+ return THROWTO_BLOCKED;
} else {
removeThreadFromMVarQueue(cap, mvar, target);
- raiseAsync(cap, target, exception, rtsFalse, NULL);
+ raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
unblockOne(cap, target);
unlockClosure((StgClosure *)mvar, info);
return THROWTO_SUCCESS;
@@ -346,84 +421,23 @@ check_target:
}
if (target->flags & TSO_BLOCKEX) {
- lockTSO(target);
- blockedThrowTo(cap,source,target);
+ Capability *target_cap = target->cap;
+ if (target->cap != cap) {
+ throwToSendMsg(cap,target_cap,msg);
+ } else {
+ blockedThrowTo(cap,target,msg);
+ }
RELEASE_LOCK(&sched_mutex);
- *out = target;
- return THROWTO_BLOCKED; // caller releases TSO
+ return THROWTO_BLOCKED; // caller releases lock
} else {
removeThreadFromQueue(cap, &blackhole_queue, target);
- raiseAsync(cap, target, exception, rtsFalse, NULL);
+ raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
unblockOne(cap, target);
RELEASE_LOCK(&sched_mutex);
return THROWTO_SUCCESS;
}
}
- case BlockedOnException:
- {
- StgTSO *target2;
- StgInfoTable *info;
-
- /*
- To obtain exclusive access to a BlockedOnException thread,
- we must call lockClosure() on the TSO on which it is blocked.
- Since the TSO might change underneath our feet, after we
- call lockClosure() we must check that
-
- (a) the closure we locked is actually a TSO
- (b) the original thread is still BlockedOnException,
- (c) the original thread is still blocked on the TSO we locked
- and (d) the target thread has not been relocated.
-
- We synchronise with threadStackOverflow() (which relocates
- threads) using lockClosure()/unlockClosure().
- */
- target2 = target->block_info.tso;
-
- info = lockClosure((StgClosure *)target2);
- if (info != &stg_TSO_info) {
- unlockClosure((StgClosure *)target2, info);
- goto retry;
- }
- if (target->what_next == ThreadRelocated) {
- target = target->_link;
- unlockTSO(target2);
- goto retry;
- }
- if (target2->what_next == ThreadRelocated) {
- target->block_info.tso = target2->_link;
- unlockTSO(target2);
- goto retry;
- }
- if (target->why_blocked != BlockedOnException
- || target->block_info.tso != target2) {
- unlockTSO(target2);
- goto retry;
- }
-
- /*
- Now we have exclusive rights to the target TSO...
-
- If it is blocking exceptions, add the source TSO to its
- blocked_exceptions queue. Otherwise, raise the exception.
- */
- if ((target->flags & TSO_BLOCKEX) &&
- ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
- lockTSO(target);
- blockedThrowTo(cap,source,target);
- unlockTSO(target2);
- *out = target;
- return THROWTO_BLOCKED;
- } else {
- removeThreadFromQueue(cap, &target2->blocked_exceptions, target);
- raiseAsync(cap, target, exception, rtsFalse, NULL);
- unblockOne(cap, target);
- unlockTSO(target2);
- return THROWTO_SUCCESS;
- }
- }
-
case BlockedOnSTM:
lockTSO(target);
// Unblocking BlockedOnSTM threads requires the TSO to be
@@ -434,11 +448,16 @@ check_target:
}
if ((target->flags & TSO_BLOCKEX) &&
((target->flags & TSO_INTERRUPTIBLE) == 0)) {
- blockedThrowTo(cap,source,target);
- *out = target;
+ Capability *target_cap = target->cap;
+ if (target->cap != cap) {
+ throwToSendMsg(cap,target_cap,msg);
+ } else {
+ blockedThrowTo(cap,target,msg);
+ }
+ unlockTSO(target);
return THROWTO_BLOCKED;
} else {
- raiseAsync(cap, target, exception, rtsFalse, NULL);
+ raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
unblockOne(cap, target);
unlockTSO(target);
return THROWTO_SUCCESS;
@@ -446,19 +465,18 @@ check_target:
case BlockedOnCCall:
case BlockedOnCCall_NoUnblockExc:
- // I don't think it's possible to acquire ownership of a
- // BlockedOnCCall thread. We just assume that the target
- // thread is blocking exceptions, and block on its
- // blocked_exception queue.
- lockTSO(target);
- if (target->why_blocked != BlockedOnCCall &&
- target->why_blocked != BlockedOnCCall_NoUnblockExc) {
- unlockTSO(target);
- goto retry;
- }
- blockedThrowTo(cap,source,target);
- *out = target;
+ {
+ Capability *target_cap;
+
+ target_cap = target->cap;
+ if (target_cap != cap) {
+ throwToSendMsg(cap, target_cap, msg);
+ return THROWTO_BLOCKED;
+ }
+
+ blockedThrowTo(cap,target,msg);
return THROWTO_BLOCKED;
+ }
#ifndef THREADEDED_RTS
case BlockedOnRead:
@@ -469,11 +487,11 @@ check_target:
#endif
if ((target->flags & TSO_BLOCKEX) &&
((target->flags & TSO_INTERRUPTIBLE) == 0)) {
- blockedThrowTo(cap,source,target);
+ blockedThrowTo(cap,target,msg);
return THROWTO_BLOCKED;
} else {
removeFromQueues(cap,target);
- raiseAsync(cap, target, exception, rtsFalse, NULL);
+ raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
return THROWTO_SUCCESS;
}
#endif
@@ -484,33 +502,34 @@ check_target:
barf("throwTo");
}
-// Block a TSO on another TSO's blocked_exceptions queue.
-// Precondition: we hold an exclusive lock on the target TSO (this is
-// complex to achieve as there's no single lock on a TSO; see
-// throwTo()).
static void
-blockedThrowTo (Capability *cap, StgTSO *source, StgTSO *target)
+throwToSendMsg (Capability *cap STG_UNUSED,
+ Capability *target_cap USED_IF_THREADS,
+ MessageThrowTo *msg USED_IF_THREADS)
+
{
- if (source != NULL) {
- debugTrace(DEBUG_sched, "throwTo: blocking on thread %lu", (unsigned long)target->id);
- setTSOLink(cap, source, target->blocked_exceptions);
- target->blocked_exceptions = source;
- dirty_TSO(cap,target); // we modified the blocked_exceptions queue
-
- source->block_info.tso = target;
- write_barrier(); // throwTo_exception *must* be visible if BlockedOnException is.
- source->why_blocked = BlockedOnException;
- }
-}
+#ifdef THREADED_RTS
+ debugTrace(DEBUG_sched, "throwTo: sending a throwto message to cap %lu", (unsigned long)target_cap->no);
+ sendMessage(target_cap, (Message*)msg);
+#endif
+}
-#ifdef THREADED_RTS
-void
-throwToReleaseTarget (void *tso)
+// Block a throwTo message on the target TSO's blocked_exceptions
+// queue. The current Capability must own the target TSO in order to
+// modify the blocked_exceptions queue.
+static void
+blockedThrowTo (Capability *cap, StgTSO *target, MessageThrowTo *msg)
{
- unlockTSO((StgTSO *)tso);
+ debugTraceCap(DEBUG_sched, cap, "throwTo: blocking on thread %lu",
+ (unsigned long)target->id);
+
+ ASSERT(target->cap == cap);
+
+ msg->link = (Message*)target->blocked_exceptions;
+ target->blocked_exceptions = msg;
+ dirty_TSO(cap,target); // we modified the blocked_exceptions queue
}
-#endif
/* -----------------------------------------------------------------------------
Waking up threads blocked in throwTo
@@ -532,10 +551,11 @@ throwToReleaseTarget (void *tso)
int
maybePerformBlockedException (Capability *cap, StgTSO *tso)
{
- StgTSO *source;
+ MessageThrowTo *msg;
+ const StgInfoTable *i;
if (tso->what_next == ThreadComplete || tso->what_next == ThreadFinished) {
- if (tso->blocked_exceptions != END_TSO_QUEUE) {
+ if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
awakenBlockedExceptionQueue(cap,tso);
return 1;
} else {
@@ -543,32 +563,30 @@ maybePerformBlockedException (Capability *cap, StgTSO *tso)
}
}
- if (tso->blocked_exceptions != END_TSO_QUEUE &&
+ if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE &&
(tso->flags & TSO_BLOCKEX) != 0) {
debugTrace(DEBUG_sched, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id);
}
- if (tso->blocked_exceptions != END_TSO_QUEUE
+ if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE
&& ((tso->flags & TSO_BLOCKEX) == 0
|| ((tso->flags & TSO_INTERRUPTIBLE) && interruptible(tso)))) {
- // Lock the TSO, this gives us exclusive access to the queue
- lockTSO(tso);
-
- // Check the queue again; it might have changed before we
- // locked it.
- if (tso->blocked_exceptions == END_TSO_QUEUE) {
- unlockTSO(tso);
- return 0;
- }
-
// We unblock just the first thread on the queue, and perform
// its throw immediately.
- source = tso->blocked_exceptions;
- performBlockedException(cap, source, tso);
- tso->blocked_exceptions = unblockOne_(cap, source,
- rtsFalse/*no migrate*/);
- unlockTSO(tso);
+ loop:
+ msg = tso->blocked_exceptions;
+ if (msg == END_BLOCKED_EXCEPTIONS_QUEUE) return 0;
+ i = lockClosure((StgClosure*)msg);
+ tso->blocked_exceptions = (MessageThrowTo*)msg->link;
+ if (i == &stg_IND_info) {
+ unlockClosure((StgClosure*)msg,i);
+ goto loop;
+ }
+
+ performBlockedException(cap, msg);
+ unblockOne_(cap, msg->source, rtsFalse/*no migrate*/);
+ unlockClosure((StgClosure*)msg,&stg_IND_info);
return 1;
}
return 0;
@@ -580,25 +598,34 @@ maybePerformBlockedException (Capability *cap, StgTSO *tso)
void
awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso)
{
- lockTSO(tso);
- awakenBlockedQueue(cap, tso->blocked_exceptions);
- tso->blocked_exceptions = END_TSO_QUEUE;
- unlockTSO(tso);
+ MessageThrowTo *msg;
+ const StgInfoTable *i;
+
+ for (msg = tso->blocked_exceptions; msg != END_BLOCKED_EXCEPTIONS_QUEUE;
+ msg = (MessageThrowTo*)msg->link) {
+ i = lockClosure((StgClosure *)msg);
+ if (i != &stg_IND_info) {
+ unblockOne_(cap, msg->source, rtsFalse/*no migrate*/);
+ }
+ unlockClosure((StgClosure *)msg,i);
+ }
+ tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
}
static void
-performBlockedException (Capability *cap, StgTSO *source, StgTSO *target)
+performBlockedException (Capability *cap, MessageThrowTo *msg)
{
- StgClosure *exception;
+ StgTSO *source;
+
+ source = msg->source;
- ASSERT(source->why_blocked == BlockedOnException);
- ASSERT(source->block_info.tso->id == target->id);
+ ASSERT(source->why_blocked == BlockedOnMsgThrowTo);
+ ASSERT(source->block_info.closure == (StgClosure *)msg);
ASSERT(source->sp[0] == (StgWord)&stg_block_throwto_info);
- ASSERT(((StgTSO *)source->sp[1])->id == target->id);
+ ASSERT(((StgTSO *)source->sp[1])->id == msg->target->id);
// check ids not pointers, because the thread might be relocated
- exception = (StgClosure *)source->sp[2];
- throwToSingleThreaded(cap, target, exception);
+ throwToSingleThreaded(cap, msg->target, msg->exception);
source->sp += 3;
}
@@ -637,22 +664,25 @@ removeFromQueues(Capability *cap, StgTSO *tso)
removeThreadFromQueue(cap, &blackhole_queue, tso);
goto done;
- case BlockedOnException:
- {
- StgTSO *target = tso->block_info.tso;
-
- // NO: when called by threadPaused(), we probably have this
- // TSO already locked (WHITEHOLEd) because we just placed
- // ourselves on its queue.
- // ASSERT(get_itbl(target)->type == TSO);
-
- while (target->what_next == ThreadRelocated) {
- target = target->_link;
- }
-
- removeThreadFromQueue(cap, &target->blocked_exceptions, tso);
- goto done;
- }
+ case BlockedOnMsgWakeup:
+ {
+ // kill the message, atomically:
+ tso->block_info.wakeup->header.info = &stg_IND_info;
+ break;
+ }
+
+ case BlockedOnMsgThrowTo:
+ {
+ MessageThrowTo *m = tso->block_info.throwto;
+ // The message is locked by us, unless we got here via
+ // deleteAllThreads(), in which case we own all the
+ // capabilities.
+ // ASSERT(m->header.info == &stg_WHITEHOLE_info);
+
+ // unlock and revoke it at the same time
+ unlockClosure((StgClosure*)m,&stg_IND_info);
+ break;
+ }
#if !defined(THREADED_RTS)
case BlockedOnRead:
@@ -743,6 +773,10 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
}
#endif
+ while (tso->what_next == ThreadRelocated) {
+ tso = tso->_link;
+ }
+
// mark it dirty; we're about to change its stack.
dirty_TSO(cap, tso);