diff options
author | Simon Marlow <marlowsd@gmail.com> | 2010-04-01 09:16:05 +0000 |
---|---|---|
committer | Simon Marlow <marlowsd@gmail.com> | 2010-04-01 09:16:05 +0000 |
commit | f4692220c7cbdadaa633f50eb2b30b59edb30183 (patch) | |
tree | 3d29f1b4770bedb7c69c31b2828f9b5acca3e2c3 /rts/Threads.c | |
parent | 7c4cb84efd774a21f11fb03118feb0434282ecf3 (diff) | |
download | haskell-f4692220c7cbdadaa633f50eb2b30b59edb30183.tar.gz |
Change the representation of the MVar blocked queue
The list of threads blocked on an MVar is now represented as a list of
separately allocated objects rather than being linked through the TSOs
themselves. This lets us remove a TSO from the list in O(1) time
rather than O(n) time, by marking the list object. Removing this
linear component fixes some pathalogical performance cases where many
threads were blocked on an MVar and became unreachable simultaneously
(nofib/smp/threads007), or when sending an asynchronous exception to a
TSO in a long list of thread blocked on an MVar.
MVar performance has actually improved by a few percent as a result of
this change, slightly to my surprise.
This is the final cleanup in the sequence, which let me remove the old
way of waking up threads (unblockOne(), MSG_WAKEUP) in favour of the
new way (tryWakeupThread and MSG_TRY_WAKEUP, which is idempotent). It
is now the case that only the Capability that owns a TSO may modify
its state (well, almost), and this simplifies various things. More of
the RTS is based on message-passing between Capabilities now.
Diffstat (limited to 'rts/Threads.c')
-rw-r--r-- | rts/Threads.c | 173 |
1 files changed, 49 insertions, 124 deletions
diff --git a/rts/Threads.c b/rts/Threads.c index 05a13c7f3b..e8a835bdae 100644 --- a/rts/Threads.c +++ b/rts/Threads.c @@ -205,83 +205,21 @@ removeThreadFromDeQueue (Capability *cap, barf("removeThreadFromMVarQueue: not found"); } -void -removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso) -{ - // caller must do the write barrier, because replacing the info - // pointer will unlock the MVar. - removeThreadFromDeQueue (cap, &mvar->head, &mvar->tail, tso); - tso->_link = END_TSO_QUEUE; -} - /* ---------------------------------------------------------------------------- - unblockOne() - - unblock a single thread. - ------------------------------------------------------------------------- */ - -StgTSO * -unblockOne (Capability *cap, StgTSO *tso) -{ - return unblockOne_(cap,tso,rtsTrue); // allow migration -} - -StgTSO * -unblockOne_ (Capability *cap, StgTSO *tso, - rtsBool allow_migrate USED_IF_THREADS) -{ - StgTSO *next; - - // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO); - ASSERT(tso->why_blocked != NotBlocked); - ASSERT(tso->why_blocked != BlockedOnMsgWakeup || - tso->block_info.closure->header.info == &stg_IND_info); - - next = tso->_link; - tso->_link = END_TSO_QUEUE; + tryWakeupThread() -#if defined(THREADED_RTS) - if (tso->cap == cap || (!tsoLocked(tso) && - allow_migrate && - RtsFlags.ParFlags.wakeupMigrate)) { - // We are waking up this thread on the current Capability, which - // might involve migrating it from the Capability it was last on. - if (tso->bound) { - ASSERT(tso->bound->task->cap == tso->cap); - tso->bound->task->cap = cap; - } - - tso->cap = cap; - write_barrier(); - tso->why_blocked = NotBlocked; - appendToRunQueue(cap,tso); - - // context-switch soonish so we can migrate the new thread if - // necessary. NB. not contextSwitchCapability(cap), which would - // force a context switch immediately. - cap->context_switch = 1; - } else { - // we'll try to wake it up on the Capability it was last on. - wakeupThreadOnCapability(cap, tso->cap, tso); - } -#else - tso->why_blocked = NotBlocked; - appendToRunQueue(cap,tso); - - // context-switch soonish so we can migrate the new thread if - // necessary. NB. not contextSwitchCapability(cap), which would - // force a context switch immediately. - cap->context_switch = 1; -#endif - - traceEventThreadWakeup (cap, tso, tso->cap->no); + Attempt to wake up a thread. tryWakeupThread is idempotent: it is + always safe to call it too many times, but it is not safe in + general to omit a call. - return next; -} + ------------------------------------------------------------------------- */ void tryWakeupThread (Capability *cap, StgTSO *tso) { + + traceEventThreadWakeup (cap, tso, tso->cap->no); + #ifdef THREADED_RTS if (tso->cap != cap) { @@ -298,6 +236,16 @@ tryWakeupThread (Capability *cap, StgTSO *tso) switch (tso->why_blocked) { + case BlockedOnMVar: + { + if (tso->_link == END_TSO_QUEUE) { + tso->block_info.closure = (StgClosure*)END_TSO_QUEUE; + goto unblock; + } else { + return; + } + } + case BlockedOnMsgThrowTo: { const StgInfoTable *i; @@ -307,27 +255,45 @@ tryWakeupThread (Capability *cap, StgTSO *tso) if (i != &stg_MSG_NULL_info) { debugTraceCap(DEBUG_sched, cap, "thread %ld still blocked on throwto (%p)", (lnat)tso->id, tso->block_info.throwto->header.info); - break; // still blocked + return; } // remove the block frame from the stack ASSERT(tso->sp[0] == (StgWord)&stg_block_throwto_info); tso->sp += 3; - // fall through... + goto unblock; } + case BlockedOnBlackHole: case BlockedOnSTM: - { - // just run the thread now, if the BH is not really available, - // we'll block again. - tso->why_blocked = NotBlocked; - appendToRunQueue(cap,tso); - break; - } + case ThreadMigrating: + goto unblock; + default: // otherwise, do nothing - break; + return; } + +unblock: + // just run the thread now, if the BH is not really available, + // we'll block again. + tso->why_blocked = NotBlocked; + appendToRunQueue(cap,tso); +} + +/* ---------------------------------------------------------------------------- + migrateThread + ------------------------------------------------------------------------- */ + +void +migrateThread (Capability *from, StgTSO *tso, Capability *to) +{ + traceEventMigrateThread (from, tso, to->no); + // ThreadMigrating tells the target cap that it needs to be added to + // the run queue when it receives the MSG_TRY_WAKEUP. + tso->what_next = ThreadMigrating; + tso->cap = to; + tryWakeupThread(from, tso); } /* ---------------------------------------------------------------------------- @@ -450,47 +416,6 @@ updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val) } } -/* ---------------------------------------------------------------------------- - * Wake up a thread on a Capability. - * - * This is used when the current Task is running on a Capability and - * wishes to wake up a thread on a different Capability. - * ------------------------------------------------------------------------- */ - -#ifdef THREADED_RTS - -void -wakeupThreadOnCapability (Capability *cap, - Capability *other_cap, - StgTSO *tso) -{ - MessageWakeup *msg; - - // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability) - if (tso->bound) { - ASSERT(tso->bound->task->cap == tso->cap); - tso->bound->task->cap = other_cap; - } - tso->cap = other_cap; - - ASSERT(tso->why_blocked != BlockedOnMsgWakeup || - tso->block_info.closure->header.info == &stg_IND_info); - - ASSERT(tso->block_info.closure->header.info != &stg_MSG_WAKEUP_info); - - msg = (MessageWakeup*) allocate(cap, sizeofW(MessageWakeup)); - SET_HDR(msg, &stg_MSG_WAKEUP_info, CCS_SYSTEM); - msg->tso = tso; - tso->block_info.closure = (StgClosure *)msg; - dirty_TSO(cap, tso); - write_barrier(); - tso->why_blocked = BlockedOnMsgWakeup; - - sendMessage(cap, other_cap, (Message*)msg); -} - -#endif /* THREADED_RTS */ - /* --------------------------------------------------------------------------- * rtsSupportsBoundThreads(): is the RTS built to support bound threads? * used by Control.Concurrent for error checking. @@ -549,15 +474,15 @@ printThreadBlockage(StgTSO *tso) debugBelch("is blocked on a black hole %p", ((StgBlockingQueue*)tso->block_info.bh->bh)); break; - case BlockedOnMsgWakeup: - debugBelch("is blocked on a wakeup message"); - break; case BlockedOnMsgThrowTo: debugBelch("is blocked on a throwto message"); break; case NotBlocked: debugBelch("is not blocked"); break; + case ThreadMigrating: + debugBelch("is runnable, but not on the run queue"); + break; case BlockedOnCCall: debugBelch("is blocked on an external call"); break; |