summaryrefslogtreecommitdiff
path: root/rts/Threads.c
diff options
context:
space:
mode:
authorSimon Marlow <marlowsd@gmail.com>2010-04-01 09:16:05 +0000
committerSimon Marlow <marlowsd@gmail.com>2010-04-01 09:16:05 +0000
commitf4692220c7cbdadaa633f50eb2b30b59edb30183 (patch)
tree3d29f1b4770bedb7c69c31b2828f9b5acca3e2c3 /rts/Threads.c
parent7c4cb84efd774a21f11fb03118feb0434282ecf3 (diff)
downloadhaskell-f4692220c7cbdadaa633f50eb2b30b59edb30183.tar.gz
Change the representation of the MVar blocked queue
The list of threads blocked on an MVar is now represented as a list of separately allocated objects rather than being linked through the TSOs themselves. This lets us remove a TSO from the list in O(1) time rather than O(n) time, by marking the list object. Removing this linear component fixes some pathalogical performance cases where many threads were blocked on an MVar and became unreachable simultaneously (nofib/smp/threads007), or when sending an asynchronous exception to a TSO in a long list of thread blocked on an MVar. MVar performance has actually improved by a few percent as a result of this change, slightly to my surprise. This is the final cleanup in the sequence, which let me remove the old way of waking up threads (unblockOne(), MSG_WAKEUP) in favour of the new way (tryWakeupThread and MSG_TRY_WAKEUP, which is idempotent). It is now the case that only the Capability that owns a TSO may modify its state (well, almost), and this simplifies various things. More of the RTS is based on message-passing between Capabilities now.
Diffstat (limited to 'rts/Threads.c')
-rw-r--r--rts/Threads.c173
1 files changed, 49 insertions, 124 deletions
diff --git a/rts/Threads.c b/rts/Threads.c
index 05a13c7f3b..e8a835bdae 100644
--- a/rts/Threads.c
+++ b/rts/Threads.c
@@ -205,83 +205,21 @@ removeThreadFromDeQueue (Capability *cap,
barf("removeThreadFromMVarQueue: not found");
}
-void
-removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso)
-{
- // caller must do the write barrier, because replacing the info
- // pointer will unlock the MVar.
- removeThreadFromDeQueue (cap, &mvar->head, &mvar->tail, tso);
- tso->_link = END_TSO_QUEUE;
-}
-
/* ----------------------------------------------------------------------------
- unblockOne()
-
- unblock a single thread.
- ------------------------------------------------------------------------- */
-
-StgTSO *
-unblockOne (Capability *cap, StgTSO *tso)
-{
- return unblockOne_(cap,tso,rtsTrue); // allow migration
-}
-
-StgTSO *
-unblockOne_ (Capability *cap, StgTSO *tso,
- rtsBool allow_migrate USED_IF_THREADS)
-{
- StgTSO *next;
-
- // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO);
- ASSERT(tso->why_blocked != NotBlocked);
- ASSERT(tso->why_blocked != BlockedOnMsgWakeup ||
- tso->block_info.closure->header.info == &stg_IND_info);
-
- next = tso->_link;
- tso->_link = END_TSO_QUEUE;
+ tryWakeupThread()
-#if defined(THREADED_RTS)
- if (tso->cap == cap || (!tsoLocked(tso) &&
- allow_migrate &&
- RtsFlags.ParFlags.wakeupMigrate)) {
- // We are waking up this thread on the current Capability, which
- // might involve migrating it from the Capability it was last on.
- if (tso->bound) {
- ASSERT(tso->bound->task->cap == tso->cap);
- tso->bound->task->cap = cap;
- }
-
- tso->cap = cap;
- write_barrier();
- tso->why_blocked = NotBlocked;
- appendToRunQueue(cap,tso);
-
- // context-switch soonish so we can migrate the new thread if
- // necessary. NB. not contextSwitchCapability(cap), which would
- // force a context switch immediately.
- cap->context_switch = 1;
- } else {
- // we'll try to wake it up on the Capability it was last on.
- wakeupThreadOnCapability(cap, tso->cap, tso);
- }
-#else
- tso->why_blocked = NotBlocked;
- appendToRunQueue(cap,tso);
-
- // context-switch soonish so we can migrate the new thread if
- // necessary. NB. not contextSwitchCapability(cap), which would
- // force a context switch immediately.
- cap->context_switch = 1;
-#endif
-
- traceEventThreadWakeup (cap, tso, tso->cap->no);
+ Attempt to wake up a thread. tryWakeupThread is idempotent: it is
+ always safe to call it too many times, but it is not safe in
+ general to omit a call.
- return next;
-}
+ ------------------------------------------------------------------------- */
void
tryWakeupThread (Capability *cap, StgTSO *tso)
{
+
+ traceEventThreadWakeup (cap, tso, tso->cap->no);
+
#ifdef THREADED_RTS
if (tso->cap != cap)
{
@@ -298,6 +236,16 @@ tryWakeupThread (Capability *cap, StgTSO *tso)
switch (tso->why_blocked)
{
+ case BlockedOnMVar:
+ {
+ if (tso->_link == END_TSO_QUEUE) {
+ tso->block_info.closure = (StgClosure*)END_TSO_QUEUE;
+ goto unblock;
+ } else {
+ return;
+ }
+ }
+
case BlockedOnMsgThrowTo:
{
const StgInfoTable *i;
@@ -307,27 +255,45 @@ tryWakeupThread (Capability *cap, StgTSO *tso)
if (i != &stg_MSG_NULL_info) {
debugTraceCap(DEBUG_sched, cap, "thread %ld still blocked on throwto (%p)",
(lnat)tso->id, tso->block_info.throwto->header.info);
- break; // still blocked
+ return;
}
// remove the block frame from the stack
ASSERT(tso->sp[0] == (StgWord)&stg_block_throwto_info);
tso->sp += 3;
- // fall through...
+ goto unblock;
}
+
case BlockedOnBlackHole:
case BlockedOnSTM:
- {
- // just run the thread now, if the BH is not really available,
- // we'll block again.
- tso->why_blocked = NotBlocked;
- appendToRunQueue(cap,tso);
- break;
- }
+ case ThreadMigrating:
+ goto unblock;
+
default:
// otherwise, do nothing
- break;
+ return;
}
+
+unblock:
+ // just run the thread now, if the BH is not really available,
+ // we'll block again.
+ tso->why_blocked = NotBlocked;
+ appendToRunQueue(cap,tso);
+}
+
+/* ----------------------------------------------------------------------------
+ migrateThread
+ ------------------------------------------------------------------------- */
+
+void
+migrateThread (Capability *from, StgTSO *tso, Capability *to)
+{
+ traceEventMigrateThread (from, tso, to->no);
+ // ThreadMigrating tells the target cap that it needs to be added to
+ // the run queue when it receives the MSG_TRY_WAKEUP.
+ tso->what_next = ThreadMigrating;
+ tso->cap = to;
+ tryWakeupThread(from, tso);
}
/* ----------------------------------------------------------------------------
@@ -450,47 +416,6 @@ updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val)
}
}
-/* ----------------------------------------------------------------------------
- * Wake up a thread on a Capability.
- *
- * This is used when the current Task is running on a Capability and
- * wishes to wake up a thread on a different Capability.
- * ------------------------------------------------------------------------- */
-
-#ifdef THREADED_RTS
-
-void
-wakeupThreadOnCapability (Capability *cap,
- Capability *other_cap,
- StgTSO *tso)
-{
- MessageWakeup *msg;
-
- // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
- if (tso->bound) {
- ASSERT(tso->bound->task->cap == tso->cap);
- tso->bound->task->cap = other_cap;
- }
- tso->cap = other_cap;
-
- ASSERT(tso->why_blocked != BlockedOnMsgWakeup ||
- tso->block_info.closure->header.info == &stg_IND_info);
-
- ASSERT(tso->block_info.closure->header.info != &stg_MSG_WAKEUP_info);
-
- msg = (MessageWakeup*) allocate(cap, sizeofW(MessageWakeup));
- SET_HDR(msg, &stg_MSG_WAKEUP_info, CCS_SYSTEM);
- msg->tso = tso;
- tso->block_info.closure = (StgClosure *)msg;
- dirty_TSO(cap, tso);
- write_barrier();
- tso->why_blocked = BlockedOnMsgWakeup;
-
- sendMessage(cap, other_cap, (Message*)msg);
-}
-
-#endif /* THREADED_RTS */
-
/* ---------------------------------------------------------------------------
* rtsSupportsBoundThreads(): is the RTS built to support bound threads?
* used by Control.Concurrent for error checking.
@@ -549,15 +474,15 @@ printThreadBlockage(StgTSO *tso)
debugBelch("is blocked on a black hole %p",
((StgBlockingQueue*)tso->block_info.bh->bh));
break;
- case BlockedOnMsgWakeup:
- debugBelch("is blocked on a wakeup message");
- break;
case BlockedOnMsgThrowTo:
debugBelch("is blocked on a throwto message");
break;
case NotBlocked:
debugBelch("is not blocked");
break;
+ case ThreadMigrating:
+ debugBelch("is runnable, but not on the run queue");
+ break;
case BlockedOnCCall:
debugBelch("is blocked on an external call");
break;