diff options
author | Simon Marlow <marlowsd@gmail.com> | 2010-04-01 09:16:05 +0000 |
---|---|---|
committer | Simon Marlow <marlowsd@gmail.com> | 2010-04-01 09:16:05 +0000 |
commit | f4692220c7cbdadaa633f50eb2b30b59edb30183 (patch) | |
tree | 3d29f1b4770bedb7c69c31b2828f9b5acca3e2c3 /rts/RaiseAsync.c | |
parent | 7c4cb84efd774a21f11fb03118feb0434282ecf3 (diff) | |
download | haskell-f4692220c7cbdadaa633f50eb2b30b59edb30183.tar.gz |
Change the representation of the MVar blocked queue
The list of threads blocked on an MVar is now represented as a list of
separately allocated objects rather than being linked through the TSOs
themselves. This lets us remove a TSO from the list in O(1) time
rather than O(n) time, by marking the list object. Removing this
linear component fixes some pathalogical performance cases where many
threads were blocked on an MVar and became unreachable simultaneously
(nofib/smp/threads007), or when sending an asynchronous exception to a
TSO in a long list of thread blocked on an MVar.
MVar performance has actually improved by a few percent as a result of
this change, slightly to my surprise.
This is the final cleanup in the sequence, which let me remove the old
way of waking up threads (unblockOne(), MSG_WAKEUP) in favour of the
new way (tryWakeupThread and MSG_TRY_WAKEUP, which is idempotent). It
is now the case that only the Capability that owns a TSO may modify
its state (well, almost), and this simplifies various things. More of
the RTS is based on message-passing between Capabilities now.
Diffstat (limited to 'rts/RaiseAsync.c')
-rw-r--r-- | rts/RaiseAsync.c | 141 |
1 files changed, 63 insertions, 78 deletions
diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c index f974f8c0d7..bebbcd4722 100644 --- a/rts/RaiseAsync.c +++ b/rts/RaiseAsync.c @@ -31,6 +31,8 @@ static void raiseAsync (Capability *cap, static void removeFromQueues(Capability *cap, StgTSO *tso); +static void removeFromMVarBlockedQueue (StgTSO *tso); + static void blockedThrowTo (Capability *cap, StgTSO *target, MessageThrowTo *msg); @@ -124,7 +126,7 @@ suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here) Currently we send a message if the target belongs to another Capability, and it is - - NotBlocked, BlockedOnMsgWakeup, BlockedOnMsgThrowTo, + - NotBlocked, BlockedOnMsgThrowTo, BlockedOnCCall - or it is masking exceptions (TSO_BLOCKEX) @@ -221,67 +223,7 @@ check_target: switch (status) { case NotBlocked: - case BlockedOnMsgWakeup: - /* if status==NotBlocked, and target->cap == cap, then - we own this TSO and can raise the exception. - - How do we establish this condition? Very carefully. - - Let - P = (status == NotBlocked) - Q = (tso->cap == cap) - - Now, if P & Q are true, then the TSO is locked and owned by - this capability. No other OS thread can steal it. - - If P==0 and Q==1: the TSO is blocked, but attached to this - capabilty, and it can be stolen by another capability. - - If P==1 and Q==0: the TSO is runnable on another - capability. At any time, the TSO may change from runnable - to blocked and vice versa, while it remains owned by - another capability. - - Suppose we test like this: - - p = P - q = Q - if (p && q) ... - - this is defeated by another capability stealing a blocked - TSO from us to wake it up (Schedule.c:unblockOne()). The - other thread is doing - - Q = 0 - P = 1 - - assuming arbitrary reordering, we could see this - interleaving: - - start: P==0 && Q==1 - P = 1 - p = P - q = Q - Q = 0 - if (p && q) ... - - so we need a memory barrier: - - p = P - mb() - q = Q - if (p && q) ... - - this avoids the problematic case. There are other cases - to consider, but this is the tricky one. - - Note that we must be sure that unblockOne() does the - writes in the correct order: Q before P. The memory - barrier ensures that if we have seen the write to P, we - have also seen the write to Q. - */ { - write_barrier(); if ((target->flags & TSO_BLOCKEX) == 0) { // It's on our run queue and not blocking exceptions raiseAsync(cap, target, msg->exception, rtsFalse, NULL); @@ -389,18 +331,26 @@ check_target: goto retry; } + if (target->_link == END_TSO_QUEUE) { + // the MVar operation has already completed. There is a + // MSG_TRY_WAKEUP on the way, but we can just wake up the + // thread now anyway and ignore the message when it + // arrives. + unlockClosure((StgClosure *)mvar, info); + tryWakeupThread(cap, target); + goto retry; + } + if ((target->flags & TSO_BLOCKEX) && ((target->flags & TSO_INTERRUPTIBLE) == 0)) { blockedThrowTo(cap,target,msg); unlockClosure((StgClosure *)mvar, info); return THROWTO_BLOCKED; } else { - removeThreadFromMVarQueue(cap, mvar, target); + // revoke the MVar operation + removeFromMVarBlockedQueue(target); raiseAsync(cap, target, msg->exception, rtsFalse, NULL); - if (info == &stg_MVAR_CLEAN_info) { - dirty_MVAR(&cap->r,(StgClosure*)mvar); - } - unlockClosure((StgClosure *)mvar, &stg_MVAR_DIRTY_info); + unlockClosure((StgClosure *)mvar, info); return THROWTO_SUCCESS; } } @@ -588,11 +538,54 @@ awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso) -------------------------------------------------------------------------- */ static void +removeFromMVarBlockedQueue (StgTSO *tso) +{ + StgMVar *mvar = (StgMVar*)tso->block_info.closure; + StgMVarTSOQueue *q = (StgMVarTSOQueue*)tso->_link; + + if (q == (StgMVarTSOQueue*)END_TSO_QUEUE) { + // already removed from this MVar + return; + } + + // Assume the MVar is locked. (not assertable; sometimes it isn't + // actually WHITEHOLE'd). + + // We want to remove the MVAR_TSO_QUEUE object from the queue. It + // isn't doubly-linked so we can't actually remove it; instead we + // just overwrite it with an IND if possible and let the GC short + // it out. However, we have to be careful to maintain the deque + // structure: + + if (mvar->head == q) { + mvar->head = q->link; + q->header.info = &stg_IND_info; + if (mvar->tail == q) { + mvar->tail = (StgMVarTSOQueue*)END_TSO_QUEUE; + } + } + else if (mvar->tail == q) { + // we can't replace it with an IND in this case, because then + // we lose the tail pointer when the GC shorts out the IND. + // So we use MSG_NULL as a kind of non-dupable indirection; + // these are ignored by takeMVar/putMVar. + q->header.info = &stg_MSG_NULL_info; + } + else { + q->header.info = &stg_IND_info; + } + + // revoke the MVar operation + tso->_link = END_TSO_QUEUE; +} + +static void removeFromQueues(Capability *cap, StgTSO *tso) { switch (tso->why_blocked) { case NotBlocked: + case ThreadMigrating: return; case BlockedOnSTM: @@ -605,22 +598,13 @@ removeFromQueues(Capability *cap, StgTSO *tso) goto done; case BlockedOnMVar: - removeThreadFromMVarQueue(cap, (StgMVar *)tso->block_info.closure, tso); - // we aren't doing a write barrier here: the MVar is supposed to - // be already locked, so replacing the info pointer would unlock it. + removeFromMVarBlockedQueue(tso); goto done; case BlockedOnBlackHole: // nothing to do goto done; - case BlockedOnMsgWakeup: - { - // kill the message, atomically: - OVERWRITE_INFO(tso->block_info.wakeup, &stg_IND_info); - break; - } - case BlockedOnMsgThrowTo: { MessageThrowTo *m = tso->block_info.throwto; @@ -659,7 +643,8 @@ removeFromQueues(Capability *cap, StgTSO *tso) } done: - unblockOne(cap, tso); + tso->why_blocked = NotBlocked; + appendToRunQueue(cap, tso); } /* ----------------------------------------------------------------------------- @@ -733,7 +718,7 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, ASSERT(tso->cap == cap); // wake it up - if (tso->why_blocked != NotBlocked && tso->why_blocked != BlockedOnMsgWakeup) { + if (tso->why_blocked != NotBlocked) { tso->why_blocked = NotBlocked; appendToRunQueue(cap,tso); } |