diff options
Diffstat (limited to 'rts/Schedule.c')
-rw-r--r-- | rts/Schedule.c | 185 |
1 files changed, 91 insertions, 94 deletions
diff --git a/rts/Schedule.c b/rts/Schedule.c index 4cca469869..70e0246fbe 100644 --- a/rts/Schedule.c +++ b/rts/Schedule.c @@ -139,7 +139,7 @@ static void scheduleYield (Capability **pcap, Task *task, rtsBool); #endif static void scheduleStartSignalHandlers (Capability *cap); static void scheduleCheckBlockedThreads (Capability *cap); -static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS); +static void scheduleProcessInbox(Capability *cap); static void scheduleCheckBlackHoles (Capability *cap); static void scheduleDetectDeadlock (Capability *cap, Task *task); static void schedulePushWork(Capability *cap, Task *task); @@ -618,7 +618,7 @@ scheduleFindWork (Capability *cap) // list each time around the scheduler. if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); } - scheduleCheckWakeupThreads(cap); + scheduleProcessInbox(cap); scheduleCheckBlockedThreads(cap); @@ -673,7 +673,7 @@ scheduleYield (Capability **pcap, Task *task, rtsBool force_yield) if (!force_yield && !shouldYieldCapability(cap,task) && (!emptyRunQueue(cap) || - !emptyWakeupQueue(cap) || + !emptyInbox(cap) || blackholes_need_checking || sched_state >= SCHED_INTERRUPTING)) return; @@ -725,7 +725,9 @@ schedulePushWork(Capability *cap USED_IF_THREADS, for (i=0, n_free_caps=0; i < n_capabilities; i++) { cap0 = &capabilities[i]; if (cap != cap0 && tryGrabCapability(cap0,task)) { - if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) { + if (!emptyRunQueue(cap0) + || cap->returning_tasks_hd != NULL + || cap->inbox != (Message*)END_TSO_QUEUE) { // it already has some work, we just grabbed it at // the wrong moment. Or maybe it's deadlocked! releaseCapability(cap0); @@ -871,23 +873,89 @@ scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS) * Check for threads woken up by other Capabilities * ------------------------------------------------------------------------- */ +#if defined(THREADED_RTS) +static void +executeMessage (Capability *cap, Message *m) +{ + const StgInfoTable *i; + +loop: + write_barrier(); // allow m->header to be modified by another thread + i = m->header.info; + if (i == &stg_MSG_WAKEUP_info) + { + MessageWakeup *w = (MessageWakeup *)m; + StgTSO *tso = w->tso; + debugTraceCap(DEBUG_sched, cap, "message: wakeup thread %ld", + (lnat)tso->id); + ASSERT(tso->cap == cap); + ASSERT(tso->why_blocked == BlockedOnMsgWakeup); + ASSERT(tso->block_info.closure == (StgClosure *)m); + tso->why_blocked = NotBlocked; + appendToRunQueue(cap, tso); + } + else if (i == &stg_MSG_THROWTO_info) + { + MessageThrowTo *t = (MessageThrowTo *)m; + nat r; + const StgInfoTable *i; + + i = lockClosure((StgClosure*)m); + if (i != &stg_MSG_THROWTO_info) { + unlockClosure((StgClosure*)m, i); + goto loop; + } + + debugTraceCap(DEBUG_sched, cap, "message: throwTo %ld -> %ld", + (lnat)t->source->id, (lnat)t->target->id); + + ASSERT(t->source->why_blocked == BlockedOnMsgThrowTo); + ASSERT(t->source->block_info.closure == (StgClosure *)m); + + r = throwToMsg(cap, t); + + switch (r) { + case THROWTO_SUCCESS: + ASSERT(t->source->sp[0] == (StgWord)&stg_block_throwto_info); + t->source->sp += 3; + unblockOne(cap, t->source); + // this message is done + unlockClosure((StgClosure*)m, &stg_IND_info); + break; + case THROWTO_BLOCKED: + // unlock the message + unlockClosure((StgClosure*)m, &stg_MSG_THROWTO_info); + break; + } + } + else if (i == &stg_IND_info) + { + // message was revoked + return; + } + else if (i == &stg_WHITEHOLE_info) + { + goto loop; + } + else + { + barf("executeMessage: %p", i); + } +} +#endif + static void -scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS) +scheduleProcessInbox (Capability *cap USED_IF_THREADS) { #if defined(THREADED_RTS) - // Any threads that were woken up by other Capabilities get - // appended to our run queue. - if (!emptyWakeupQueue(cap)) { - ACQUIRE_LOCK(&cap->lock); - if (emptyRunQueue(cap)) { - cap->run_queue_hd = cap->wakeup_queue_hd; - cap->run_queue_tl = cap->wakeup_queue_tl; - } else { - setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd); - cap->run_queue_tl = cap->wakeup_queue_tl; - } - cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE; - RELEASE_LOCK(&cap->lock); + Message *m; + + while (!emptyInbox(cap)) { + ACQUIRE_LOCK(&cap->lock); + m = cap->inbox; + cap->inbox = m->link; + RELEASE_LOCK(&cap->lock); + executeMessage(cap, (Message *)m); } #endif } @@ -983,7 +1051,7 @@ scheduleDetectDeadlock (Capability *cap, Task *task) switch (task->incall->tso->why_blocked) { case BlockedOnSTM: case BlockedOnBlackHole: - case BlockedOnException: + case BlockedOnMsgThrowTo: case BlockedOnMVar: throwToSingleThreaded(cap, task->incall->tso, (StgClosure *)nonTermination_closure); @@ -1268,9 +1336,7 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t) */ // blocked exceptions can now complete, even if the thread was in - // blocked mode (see #2910). This unconditionally calls - // lockTSO(), which ensures that we don't miss any threads that - // are engaged in throwTo() with this thread as a target. + // blocked mode (see #2910). awakenBlockedExceptionQueue (cap, t); // @@ -1884,7 +1950,7 @@ resumeThread (void *task_) if (tso->why_blocked == BlockedOnCCall) { // avoid locking the TSO if we don't have to - if (tso->blocked_exceptions != END_TSO_QUEUE) { + if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) { awakenBlockedExceptionQueue(cap,tso); } tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE); @@ -2187,10 +2253,6 @@ threadStackOverflow(Capability *cap, StgTSO *tso) IF_DEBUG(sanity,checkTSO(tso)); - // don't allow throwTo() to modify the blocked_exceptions queue - // while we are moving the TSO: - lockClosure((StgClosure *)tso); - if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) { // NB. never raise a StackOverflow exception if the thread is @@ -2201,7 +2263,6 @@ threadStackOverflow(Capability *cap, StgTSO *tso) // if (tso->flags & TSO_SQUEEZED) { - unlockTSO(tso); return tso; } // #3677: In a stack overflow situation, stack squeezing may @@ -2223,7 +2284,6 @@ threadStackOverflow(Capability *cap, StgTSO *tso) tso->sp+64))); // Send this thread the StackOverflow exception - unlockTSO(tso); throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure); return tso; } @@ -2239,7 +2299,6 @@ threadStackOverflow(Capability *cap, StgTSO *tso) // the stack anyway. if ((tso->flags & TSO_SQUEEZED) && ((W_)(tso->sp - tso->stack) >= BLOCK_SIZE_W)) { - unlockTSO(tso); return tso; } @@ -2289,9 +2348,6 @@ threadStackOverflow(Capability *cap, StgTSO *tso) tso->sp = (P_)&(tso->stack[tso->stack_size]); tso->why_blocked = NotBlocked; - unlockTSO(dest); - unlockTSO(tso); - IF_DEBUG(sanity,checkTSO(dest)); #if 0 IF_DEBUG(scheduler,printTSO(dest)); @@ -2324,10 +2380,6 @@ threadStackUnderflow (Capability *cap, Task *task, StgTSO *tso) return tso; } - // don't allow throwTo() to modify the blocked_exceptions queue - // while we are moving the TSO: - lockClosure((StgClosure *)tso); - // this is the number of words we'll free free_w = round_to_mblocks(tso_size_w/2); @@ -2358,9 +2410,6 @@ threadStackUnderflow (Capability *cap, Task *task, StgTSO *tso) task->incall->tso = new_tso; } - unlockTSO(new_tso); - unlockTSO(tso); - IF_DEBUG(sanity,checkTSO(new_tso)); return new_tso; @@ -2691,61 +2740,9 @@ resurrectThreads (StgTSO *threads) * can wake up threads, remember...). */ continue; - case BlockedOnException: - // throwTo should never block indefinitely: if the target - // thread dies or completes, throwTo returns. - barf("resurrectThreads: thread BlockedOnException"); - break; default: - barf("resurrectThreads: thread blocked in a strange way"); + barf("resurrectThreads: thread blocked in a strange way: %d", + tso->why_blocked); } } } - -/* ----------------------------------------------------------------------------- - performPendingThrowTos is called after garbage collection, and - passed a list of threads that were found to have pending throwTos - (tso->blocked_exceptions was not empty), and were blocked. - Normally this doesn't happen, because we would deliver the - exception directly if the target thread is blocked, but there are - small windows where it might occur on a multiprocessor (see - throwTo()). - - NB. we must be holding all the capabilities at this point, just - like resurrectThreads(). - -------------------------------------------------------------------------- */ - -void -performPendingThrowTos (StgTSO *threads) -{ - StgTSO *tso, *next; - Capability *cap; - Task *task, *saved_task;; - generation *gen; - - task = myTask(); - cap = task->cap; - - for (tso = threads; tso != END_TSO_QUEUE; tso = next) { - next = tso->global_link; - - gen = Bdescr((P_)tso)->gen; - tso->global_link = gen->threads; - gen->threads = tso; - - debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id); - - // We must pretend this Capability belongs to the current Task - // for the time being, as invariants will be broken otherwise. - // In fact the current Task has exclusive access to the systme - // at this point, so this is just bookkeeping: - task->cap = tso->cap; - saved_task = tso->cap->running_task; - tso->cap->running_task = task; - maybePerformBlockedException(tso->cap, tso); - tso->cap->running_task = saved_task; - } - - // Restore our original Capability: - task->cap = cap; -} |