diff options
Diffstat (limited to 'rts')
-rw-r--r-- | rts/Capability.c | 40 | ||||
-rw-r--r-- | rts/Capability.h | 2 | ||||
-rw-r--r-- | rts/Schedule.c | 144 | ||||
-rw-r--r-- | rts/Schedule.h | 11 | ||||
-rw-r--r-- | rts/Stats.c | 2 | ||||
-rw-r--r-- | rts/Task.c | 255 | ||||
-rw-r--r-- | rts/Task.h | 133 | ||||
-rw-r--r-- | rts/Threads.c | 4 | ||||
-rw-r--r-- | rts/sm/Compact.c | 10 |
9 files changed, 328 insertions, 273 deletions
diff --git a/rts/Capability.c b/rts/Capability.c index cf85372ce0..ce6ecebd72 100644 --- a/rts/Capability.c +++ b/rts/Capability.c @@ -173,10 +173,10 @@ STATIC_INLINE void newReturningTask (Capability *cap, Task *task) { ASSERT_LOCK_HELD(&cap->lock); - ASSERT(task->return_link == NULL); + ASSERT(task->next == NULL); if (cap->returning_tasks_hd) { - ASSERT(cap->returning_tasks_tl->return_link == NULL); - cap->returning_tasks_tl->return_link = task; + ASSERT(cap->returning_tasks_tl->next == NULL); + cap->returning_tasks_tl->next = task; } else { cap->returning_tasks_hd = task; } @@ -190,11 +190,11 @@ popReturningTask (Capability *cap) Task *task; task = cap->returning_tasks_hd; ASSERT(task); - cap->returning_tasks_hd = task->return_link; + cap->returning_tasks_hd = task->next; if (!cap->returning_tasks_hd) { cap->returning_tasks_tl = NULL; } - task->return_link = NULL; + task->next = NULL; return task; } #endif @@ -220,7 +220,7 @@ initCapability( Capability *cap, nat i ) initMutex(&cap->lock); cap->running_task = NULL; // indicates cap is free cap->spare_workers = NULL; - cap->suspended_ccalling_tasks = NULL; + cap->suspended_ccalls = NULL; cap->returning_tasks_hd = NULL; cap->returning_tasks_tl = NULL; cap->wakeup_queue_hd = END_TSO_QUEUE; @@ -342,7 +342,7 @@ giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task) ASSERT_LOCK_HELD(&cap->lock); ASSERT(task->cap == cap); debugTrace(DEBUG_sched, "passing capability %d to %s %p", - cap->no, task->tso ? "bound task" : "worker", + cap->no, task->incall->tso ? "bound task" : "worker", (void *)task->id); ACQUIRE_LOCK(&task->lock); task->wakeup = rtsTrue; @@ -398,7 +398,7 @@ releaseCapability_ (Capability* cap, // assertion is false: in schedule() we force a yield after // ThreadBlocked, but the thread may be back on the run queue // by now. - task = cap->run_queue_hd->bound; + task = cap->run_queue_hd->bound->task; giveCapabilityToTask(cap,task); return; } @@ -411,7 +411,7 @@ releaseCapability_ (Capability* cap, if (sched_state < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) { debugTrace(DEBUG_sched, "starting new worker on capability %d", cap->no); - startWorkerTask(cap, workerStart); + startWorkerTask(cap); return; } } @@ -462,9 +462,7 @@ releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS) // in which case it is not replaced on the spare_worker queue. // This happens when the system is shutting down (see // Schedule.c:workerStart()). - // Also, be careful to check that this task hasn't just exited - // Haskell to do a foreign call (task->suspended_tso). - if (!isBoundTask(task) && !task->stopped && !task->suspended_tso) { + if (!isBoundTask(task) && !task->stopped) { task->next = cap->spare_workers; cap->spare_workers = task; } @@ -612,7 +610,7 @@ yieldCapability (Capability** pCap, Task *task) continue; } - if (task->tso == NULL) { + if (task->incall->tso == NULL) { ASSERT(cap->spare_workers != NULL); // if we're not at the front of the queue, release it // again. This is unlikely to happen. @@ -655,12 +653,12 @@ wakeupThreadOnCapability (Capability *my_cap, // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability) if (tso->bound) { - ASSERT(tso->bound->cap == tso->cap); - tso->bound->cap = other_cap; + ASSERT(tso->bound->task->cap == tso->cap); + tso->bound->task->cap = other_cap; } tso->cap = other_cap; - ASSERT(tso->bound ? tso->bound->cap == other_cap : 1); + ASSERT(tso->bound ? tso->bound->task->cap == other_cap : 1); if (other_cap->running_task == NULL) { // nobody is running this Capability, we can add our thread @@ -781,7 +779,7 @@ shutdownCapability (Capability *cap, Task *task, rtsBool safe) // that will try to return to code that has been unloaded. // We can be a bit more relaxed when this is a standalone // program that is about to terminate, and let safe=false. - if (cap->suspended_ccalling_tasks && safe) { + if (cap->suspended_ccalls && safe) { debugTrace(DEBUG_sched, "thread(s) are involved in foreign calls, yielding"); cap->running_task = NULL; @@ -871,7 +869,7 @@ markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta, { nat i; Capability *cap; - Task *task; + InCall *incall; // Each GC thread is responsible for following roots from the // Capability of the same number. There will usually be the same @@ -886,9 +884,9 @@ markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta, evac(user, (StgClosure **)(void *)&cap->wakeup_queue_hd); evac(user, (StgClosure **)(void *)&cap->wakeup_queue_tl); #endif - for (task = cap->suspended_ccalling_tasks; task != NULL; - task=task->next) { - evac(user, (StgClosure **)(void *)&task->suspended_tso); + for (incall = cap->suspended_ccalls; incall != NULL; + incall=incall->next) { + evac(user, (StgClosure **)(void *)&incall->suspended_tso); } #if defined(THREADED_RTS) diff --git a/rts/Capability.h b/rts/Capability.h index 4b51548564..41974dc710 100644 --- a/rts/Capability.h +++ b/rts/Capability.h @@ -56,7 +56,7 @@ struct Capability_ { // the suspended TSOs easily. Hence, when migrating a Task from // the returning_tasks list, we must also migrate its entry from // this list. - Task *suspended_ccalling_tasks; + InCall *suspended_ccalls; // One mutable list per generation, so we don't need to take any // locks when updating an old-generation thunk. This also lets us diff --git a/rts/Schedule.c b/rts/Schedule.c index 66af8be7cb..4cca469869 100644 --- a/rts/Schedule.c +++ b/rts/Schedule.c @@ -312,7 +312,7 @@ schedule (Capability *initialCapability, Task *task) // If we are a worker, just exit. If we're a bound thread // then we will exit below when we've removed our TSO from // the run queue. - if (task->tso == NULL && emptyRunQueue(cap)) { + if (!isBoundTask(task) && emptyRunQueue(cap)) { return cap; } break; @@ -378,10 +378,10 @@ schedule (Capability *initialCapability, Task *task) // Check whether we can run this thread in the current task. // If not, we have to pass our capability to the right task. { - Task *bound = t->bound; + InCall *bound = t->bound; if (bound) { - if (bound == task) { + if (bound->task == task) { // yes, the Haskell thread is bound to the current native thread } else { debugTrace(DEBUG_sched, @@ -393,7 +393,7 @@ schedule (Capability *initialCapability, Task *task) } } else { // The thread we want to run is unbound. - if (task->tso) { + if (task->incall->tso) { debugTrace(DEBUG_sched, "this OS thread cannot run thread %lu", (unsigned long)t->id); @@ -441,7 +441,7 @@ run_thread: ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task); ASSERT(t->cap == cap); - ASSERT(t->bound ? t->bound->cap == cap : 1); + ASSERT(t->bound ? t->bound->task->cap == cap : 1); prev_what_next = t->what_next; @@ -639,9 +639,9 @@ shouldYieldCapability (Capability *cap, Task *task) // and this task it bound). return (waiting_for_gc || cap->returning_tasks_hd != NULL || - (!emptyRunQueue(cap) && (task->tso == NULL + (!emptyRunQueue(cap) && (task->incall->tso == NULL ? cap->run_queue_hd->bound != NULL - : cap->run_queue_hd->bound != task))); + : cap->run_queue_hd->bound != task->incall))); } // This is the single place where a Task goes to sleep. There are @@ -768,7 +768,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS, next = t->_link; t->_link = END_TSO_QUEUE; if (t->what_next == ThreadRelocated - || t->bound == task // don't move my bound thread + || t->bound == task->incall // don't move my bound thread || tsoLocked(t)) { // don't move a locked thread setTSOLink(cap, prev, t); prev = t; @@ -781,9 +781,9 @@ schedulePushWork(Capability *cap USED_IF_THREADS, } else { appendToRunQueue(free_caps[i],t); - traceEventMigrateThread (cap, t, free_caps[i]->no); + traceEventMigrateThread (cap, t, free_caps[i]->no); - if (t->bound) { t->bound->cap = free_caps[i]; } + if (t->bound) { t->bound->task->cap = free_caps[i]; } t->cap = free_caps[i]; i++; } @@ -979,13 +979,13 @@ scheduleDetectDeadlock (Capability *cap, Task *task) /* Probably a real deadlock. Send the current main thread the * Deadlock exception. */ - if (task->tso) { - switch (task->tso->why_blocked) { + if (task->incall->tso) { + switch (task->incall->tso->why_blocked) { case BlockedOnSTM: case BlockedOnBlackHole: case BlockedOnException: case BlockedOnMVar: - throwToSingleThreaded(cap, task->tso, + throwToSingleThreaded(cap, task->incall->tso, (StgClosure *)nonTermination_closure); return; default: @@ -1174,8 +1174,8 @@ scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t) /* The TSO attached to this Task may have moved, so update the * pointer to it. */ - if (task->tso == t) { - task->tso = new_t; + if (task->incall->tso == t) { + task->incall->tso = new_t; } pushOnRunQueue(cap,new_t); } @@ -1285,7 +1285,7 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t) if (t->bound) { - if (t->bound != task) { + if (t->bound != task->incall) { #if !defined(THREADED_RTS) // Must be a bound thread that is not the topmost one. Leave // it on the run queue until the stack has unwound to the @@ -1302,12 +1302,12 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t) #endif } - ASSERT(task->tso == t); + ASSERT(task->incall->tso == t); if (t->what_next == ThreadComplete) { if (task->ret) { // NOTE: return val is tso->sp[1] (see StgStartup.hc) - *(task->ret) = (StgClosure *)task->tso->sp[1]; + *(task->ret) = (StgClosure *)task->incall->tso->sp[1]; } task->stat = Success; } else { @@ -1325,7 +1325,7 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t) } } #ifdef DEBUG - removeThreadLabel((StgWord)task->tso->id); + removeThreadLabel((StgWord)task->incall->tso->id); #endif // We no longer consider this thread and task to be bound to @@ -1336,7 +1336,7 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t) // re-used). This was a real bug: the GC updated // tso->bound->tso which lead to a deadlock. t->bound = NULL; - task->tso = NULL; + task->incall->tso = NULL; return rtsTrue; // tells schedule() to return } @@ -1586,7 +1586,6 @@ forkProcess(HsStablePtr *entry ) { #ifdef FORKPROCESS_PRIMOP_SUPPORTED - Task *task; pid_t pid; StgTSO* t,*next; Capability *cap; @@ -1661,7 +1660,7 @@ forkProcess(HsStablePtr *entry // Any suspended C-calling Tasks are no more, their OS threads // don't exist now: - cap->suspended_ccalling_tasks = NULL; + cap->suspended_ccalls = NULL; // Empty the threads lists. Otherwise, the garbage // collector may attempt to resurrect some of these threads. @@ -1669,17 +1668,7 @@ forkProcess(HsStablePtr *entry generations[g].threads = END_TSO_QUEUE; } - // Wipe the task list, except the current Task. - ACQUIRE_LOCK(&sched_mutex); - for (task = all_tasks; task != NULL; task=task->all_link) { - if (task != cap->running_task) { -#if defined(THREADED_RTS) - initMutex(&task->lock); // see #1391 -#endif - discardTask(task); - } - } - RELEASE_LOCK(&sched_mutex); + discardTasksExcept(cap->running_task); #if defined(THREADED_RTS) // Wipe our spare workers list, they no longer exist. New @@ -1747,35 +1736,41 @@ deleteAllThreads ( Capability *cap ) } /* ----------------------------------------------------------------------------- - Managing the suspended_ccalling_tasks list. + Managing the suspended_ccalls list. Locks required: sched_mutex -------------------------------------------------------------------------- */ STATIC_INLINE void suspendTask (Capability *cap, Task *task) { - ASSERT(task->next == NULL && task->prev == NULL); - task->next = cap->suspended_ccalling_tasks; - task->prev = NULL; - if (cap->suspended_ccalling_tasks) { - cap->suspended_ccalling_tasks->prev = task; - } - cap->suspended_ccalling_tasks = task; + InCall *incall; + + incall = task->incall; + ASSERT(incall->next == NULL && incall->prev == NULL); + incall->next = cap->suspended_ccalls; + incall->prev = NULL; + if (cap->suspended_ccalls) { + cap->suspended_ccalls->prev = incall; + } + cap->suspended_ccalls = incall; } STATIC_INLINE void recoverSuspendedTask (Capability *cap, Task *task) { - if (task->prev) { - task->prev->next = task->next; + InCall *incall; + + incall = task->incall; + if (incall->prev) { + incall->prev->next = incall->next; } else { - ASSERT(cap->suspended_ccalling_tasks == task); - cap->suspended_ccalling_tasks = task->next; + ASSERT(cap->suspended_ccalls == incall); + cap->suspended_ccalls = incall->next; } - if (task->next) { - task->next->prev = task->prev; + if (incall->next) { + incall->next->prev = incall->prev; } - task->next = task->prev = NULL; + incall->next = incall->prev = NULL; } /* --------------------------------------------------------------------------- @@ -1832,7 +1827,8 @@ suspendThread (StgRegTable *reg) } // Hand back capability - task->suspended_tso = tso; + task->incall->suspended_tso = tso; + task->incall->suspended_cap = cap; ACQUIRE_LOCK(&cap->lock); @@ -1853,6 +1849,7 @@ StgRegTable * resumeThread (void *task_) { StgTSO *tso; + InCall *incall; Capability *cap; Task *task = task_; int saved_errno; @@ -1865,18 +1862,22 @@ resumeThread (void *task_) saved_winerror = GetLastError(); #endif - cap = task->cap; + incall = task->incall; + cap = incall->suspended_cap; + task->cap = cap; + // Wait for permission to re-enter the RTS with the result. waitForReturnCapability(&cap,task); // we might be on a different capability now... but if so, our - // entry on the suspended_ccalling_tasks list will also have been + // entry on the suspended_ccalls list will also have been // migrated. // Remove the thread from the suspended list recoverSuspendedTask(cap,task); - tso = task->suspended_tso; - task->suspended_tso = NULL; + tso = incall->suspended_tso; + incall->suspended_tso = NULL; + incall->suspended_cap = NULL; tso->_link = END_TSO_QUEUE; // no write barrier reqd traceEventRunThread(cap, tso); @@ -1954,10 +1955,10 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap) // This TSO is now a bound thread; make the Task and TSO // point to each other. - tso->bound = task; + tso->bound = task->incall; tso->cap = cap; - task->tso = tso; + task->incall->tso = tso; task->ret = ret; task->stat = NoStatus; @@ -1980,23 +1981,8 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap) * ------------------------------------------------------------------------- */ #if defined(THREADED_RTS) -void OSThreadProcAttr -workerStart(Task *task) +void scheduleWorker (Capability *cap, Task *task) { - Capability *cap; - - // See startWorkerTask(). - ACQUIRE_LOCK(&task->lock); - cap = task->cap; - RELEASE_LOCK(&task->lock); - - if (RtsFlags.ParFlags.setAffinity) { - setThreadAffinity(cap->no, n_capabilities); - } - - // set the thread-local pointer to the Task: - taskEnter(task); - // schedule() runs without a lock. cap = schedule(cap,task); @@ -2062,6 +2048,8 @@ initScheduler(void) initSparkPools(); #endif + RELEASE_LOCK(&sched_mutex); + #if defined(THREADED_RTS) /* * Eagerly start one worker to run each Capability, except for @@ -2075,13 +2063,11 @@ initScheduler(void) for (i = 1; i < n_capabilities; i++) { cap = &capabilities[i]; ACQUIRE_LOCK(&cap->lock); - startWorkerTask(cap, workerStart); + startWorkerTask(cap); RELEASE_LOCK(&cap->lock); } } #endif - - RELEASE_LOCK(&sched_mutex); } void @@ -2102,7 +2088,7 @@ exitScheduler( sched_state = SCHED_INTERRUPTING; waitForReturnCapability(&task->cap,task); scheduleDoGC(task->cap,task,rtsFalse); - ASSERT(task->tso == NULL); + ASSERT(task->incall->tso == NULL); releaseCapability(task->cap); } sched_state = SCHED_SHUTTING_DOWN; @@ -2112,7 +2098,7 @@ exitScheduler( nat i; for (i = 0; i < n_capabilities; i++) { - ASSERT(task->tso == NULL); + ASSERT(task->incall->tso == NULL); shutdownCapability(&capabilities[i], task, wait_foreign); } } @@ -2161,7 +2147,7 @@ performGC_(rtsBool force_major) // We must grab a new Task here, because the existing Task may be // associated with a particular Capability, and chained onto the - // suspended_ccalling_tasks queue. + // suspended_ccalls queue. task = newBoundTask(); waitForReturnCapability(&task->cap,task); @@ -2368,8 +2354,8 @@ threadStackUnderflow (Capability *cap, Task *task, StgTSO *tso) // The TSO attached to this Task may have moved, so update the // pointer to it. - if (task->tso == tso) { - task->tso = new_tso; + if (task->incall->tso == tso) { + task->incall->tso = new_tso; } unlockTSO(new_tso); diff --git a/rts/Schedule.h b/rts/Schedule.h index 6751144be8..af322d804f 100644 --- a/rts/Schedule.h +++ b/rts/Schedule.h @@ -46,15 +46,8 @@ StgWord raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *excepti /* findRetryFrameHelper */ StgWord findRetryFrameHelper (StgTSO *tso); -/* workerStart() - * - * Entry point for a new worker task. - * Called from STG : NO - * Locks assumed : none - */ -#if defined(THREADED_RTS) -void OSThreadProcAttr workerStart(Task *task); -#endif +/* Entry point for a new worker */ +void scheduleWorker (Capability *cap, Task *task); /* The state of the scheduler. This is used to control the sequence * of events during shutdown, and when the runtime is interrupted diff --git a/rts/Stats.c b/rts/Stats.c index 20de32a808..58b113d74b 100644 --- a/rts/Stats.c +++ b/rts/Stats.c @@ -623,7 +623,7 @@ stat_exit(int alloc) i++, task = task->all_link) { statsPrintf(" Task %2d %-8s : %6.2fs (%6.2fs) %6.2fs (%6.2fs)\n", i, - (task->tso == NULL) ? "(worker)" : "(bound)", + (task->worker) ? "(worker)" : "(bound)", TICK_TO_DBL(task->mut_time), TICK_TO_DBL(task->mut_etime), TICK_TO_DBL(task->gc_time), diff --git a/rts/Task.c b/rts/Task.c index 9a8ebd6963..2921e9e181 100644 --- a/rts/Task.c +++ b/rts/Task.c @@ -26,12 +26,13 @@ // Task lists and global counters. // Locks required: sched_mutex. Task *all_tasks = NULL; -static Task *task_free_list = NULL; // singly-linked static nat taskCount; -static nat tasksRunning; -static nat workerCount; static int tasksInitialized = 0; +static void freeTask (Task *task); +static Task * allocTask (void); +static Task * newTask (rtsBool); + /* ----------------------------------------------------------------------------- * Remembering the current thread's Task * -------------------------------------------------------------------------- */ @@ -39,7 +40,11 @@ static int tasksInitialized = 0; // A thread-local-storage key that we can use to get access to the // current thread's Task structure. #if defined(THREADED_RTS) +# if defined(MYTASK_USE_TLV) +__thread Task *my_task; +# else ThreadLocalKey currentTaskKey; +# endif #else Task *my_task; #endif @@ -53,10 +58,8 @@ initTaskManager (void) { if (!tasksInitialized) { taskCount = 0; - workerCount = 0; - tasksRunning = 0; tasksInitialized = 1; -#if defined(THREADED_RTS) +#if defined(THREADED_RTS) && !defined(MYTASK_USE_TLV) newThreadLocalKey(¤tTaskKey); #endif } @@ -66,29 +69,24 @@ nat freeTaskManager (void) { Task *task, *next; + nat tasksRunning = 0; ASSERT_LOCK_HELD(&sched_mutex); - debugTrace(DEBUG_sched, "freeing task manager, %d tasks still running", - tasksRunning); - for (task = all_tasks; task != NULL; task = next) { next = task->all_link; if (task->stopped) { - // We only free resources if the Task is not in use. A - // Task may still be in use if we have a Haskell thread in - // a foreign call while we are attempting to shut down the - // RTS (see conc059). -#if defined(THREADED_RTS) - closeCondition(&task->cond); - closeMutex(&task->lock); -#endif - stgFree(task); + freeTask(task); + } else { + tasksRunning++; } } + + debugTrace(DEBUG_sched, "freeing task manager, %d tasks still running", + tasksRunning); + all_tasks = NULL; - task_free_list = NULL; -#if defined(THREADED_RTS) +#if defined(THREADED_RTS) && !defined(MYTASK_USE_TLV) freeThreadLocalKey(¤tTaskKey); #endif @@ -97,9 +95,52 @@ freeTaskManager (void) return tasksRunning; } +static Task * +allocTask (void) +{ + Task *task; + + task = myTask(); + if (task != NULL) { + return task; + } else { + task = newTask(rtsFalse); +#if defined(THREADED_RTS) + task->id = osThreadId(); +#endif + setMyTask(task); + return task; + } +} + +static void +freeTask (Task *task) +{ + InCall *incall, *next; + + // We only free resources if the Task is not in use. A + // Task may still be in use if we have a Haskell thread in + // a foreign call while we are attempting to shut down the + // RTS (see conc059). +#if defined(THREADED_RTS) + closeCondition(&task->cond); + closeMutex(&task->lock); +#endif + + for (incall = task->incall; incall != NULL; incall = next) { + next = incall->prev_stack; + stgFree(incall); + } + for (incall = task->spare_incalls; incall != NULL; incall = next) { + next = incall->next; + stgFree(incall); + } + + stgFree(task); +} static Task* -newTask (void) +newTask (rtsBool worker) { #if defined(THREADED_RTS) Ticks currentElapsedTime, currentUserTime; @@ -109,12 +150,14 @@ newTask (void) #define ROUND_TO_CACHE_LINE(x) ((((x)+63) / 64) * 64) task = stgMallocBytes(ROUND_TO_CACHE_LINE(sizeof(Task)), "newTask"); - task->cap = NULL; - task->stopped = rtsFalse; - task->suspended_tso = NULL; - task->tso = NULL; - task->stat = NoStatus; - task->ret = NULL; + task->cap = NULL; + task->worker = worker; + task->stopped = rtsFalse; + task->stat = NoStatus; + task->ret = NULL; + task->n_spare_incalls = 0; + task->spare_incalls = NULL; + task->incall = NULL; #if defined(THREADED_RTS) initCondition(&task->cond); @@ -133,18 +176,65 @@ newTask (void) task->elapsedtimestart = currentElapsedTime; #endif - task->prev = NULL; task->next = NULL; - task->return_link = NULL; + + ACQUIRE_LOCK(&sched_mutex); task->all_link = all_tasks; all_tasks = task; taskCount++; + RELEASE_LOCK(&sched_mutex); + return task; } +// avoid the spare_incalls list growing unboundedly +#define MAX_SPARE_INCALLS 8 + +static void +newInCall (Task *task) +{ + InCall *incall; + + if (task->spare_incalls != NULL) { + incall = task->spare_incalls; + task->spare_incalls = incall->next; + task->n_spare_incalls--; + } else { + incall = stgMallocBytes((sizeof(InCall)), "newBoundTask"); + } + + incall->tso = NULL; + incall->task = task; + incall->suspended_tso = NULL; + incall->suspended_cap = NULL; + incall->next = NULL; + incall->prev = NULL; + incall->prev_stack = task->incall; + task->incall = incall; +} + +static void +endInCall (Task *task) +{ + InCall *incall; + + incall = task->incall; + incall->tso = NULL; + task->incall = task->incall->prev_stack; + + if (task->n_spare_incalls >= MAX_SPARE_INCALLS) { + stgFree(incall); + } else { + incall->next = task->spare_incalls; + task->spare_incalls = incall; + task->n_spare_incalls++; + } +} + + Task * newBoundTask (void) { @@ -155,31 +245,11 @@ newBoundTask (void) stg_exit(EXIT_FAILURE); } - // ToDo: get rid of this lock in the common case. We could store - // a free Task in thread-local storage, for example. That would - // leave just one lock on the path into the RTS: cap->lock when - // acquiring the Capability. - ACQUIRE_LOCK(&sched_mutex); - - if (task_free_list == NULL) { - task = newTask(); - } else { - task = task_free_list; - task_free_list = task->next; - task->next = NULL; - task->prev = NULL; - task->stopped = rtsFalse; - } -#if defined(THREADED_RTS) - task->id = osThreadId(); -#endif - ASSERT(task->cap == NULL); - - tasksRunning++; + task = allocTask(); - taskEnter(task); + task->stopped = rtsFalse; - RELEASE_LOCK(&sched_mutex); + newInCall(task); debugTrace(DEBUG_sched, "new task (taskCount: %d)", taskCount); return task; @@ -188,27 +258,19 @@ newBoundTask (void) void boundTaskExiting (Task *task) { - task->tso = NULL; task->stopped = rtsTrue; - task->cap = NULL; #if defined(THREADED_RTS) ASSERT(osThreadId() == task->id); #endif ASSERT(myTask() == task); - setMyTask(task->prev_stack); - - tasksRunning--; - // sadly, we need a lock around the free task list. Todo: eliminate. - ACQUIRE_LOCK(&sched_mutex); - task->next = task_free_list; - task_free_list = task; - RELEASE_LOCK(&sched_mutex); + endInCall(task); debugTrace(DEBUG_sched, "task exiting"); } + #ifdef THREADED_RTS #define TASK_ID(t) (t)->id #else @@ -216,22 +278,20 @@ boundTaskExiting (Task *task) #endif void -discardTask (Task *task) +discardTasksExcept (Task *keep) { - ASSERT_LOCK_HELD(&sched_mutex); - if (!task->stopped) { - debugTrace(DEBUG_sched, "discarding task %ld", (long)TASK_ID(task)); - task->cap = NULL; - if (task->tso == NULL) { - workerCount--; - } else { - task->tso = NULL; + Task *task; + + // Wipe the task list, except the current Task. + ACQUIRE_LOCK(&sched_mutex); + for (task = all_tasks; task != NULL; task=task->all_link) { + if (task != keep) { + debugTrace(DEBUG_sched, "discarding task %ld", (long)TASK_ID(task)); + freeTask(task); } - task->stopped = rtsTrue; - tasksRunning--; - task->next = task_free_list; - task_free_list = task; } + all_tasks = keep; + RELEASE_LOCK(&sched_mutex); } void @@ -270,33 +330,43 @@ workerTaskStop (Task *task) task->cap = NULL; taskTimeStamp(task); task->stopped = rtsTrue; - tasksRunning--; - workerCount--; - - ACQUIRE_LOCK(&sched_mutex); - task->next = task_free_list; - task_free_list = task; - RELEASE_LOCK(&sched_mutex); } #endif #if defined(THREADED_RTS) +static void OSThreadProcAttr +workerStart(Task *task) +{ + Capability *cap; + + // See startWorkerTask(). + ACQUIRE_LOCK(&task->lock); + cap = task->cap; + RELEASE_LOCK(&task->lock); + + if (RtsFlags.ParFlags.setAffinity) { + setThreadAffinity(cap->no, n_capabilities); + } + + // set the thread-local pointer to the Task: + setMyTask(task); + + newInCall(task); + + scheduleWorker(cap,task); +} + void -startWorkerTask (Capability *cap, - void OSThreadProcAttr (*taskStart)(Task *task)) +startWorkerTask (Capability *cap) { int r; OSThreadId tid; Task *task; - workerCount++; - // A worker always gets a fresh Task structure. - task = newTask(); - - tasksRunning++; + task = newTask(rtsTrue); // The lock here is to synchronise with taskStart(), to make sure // that we have finished setting up the Task structure before the @@ -311,7 +381,7 @@ startWorkerTask (Capability *cap, ASSERT_LOCK_HELD(&cap->lock); cap->running_task = task; - r = createOSThread(&tid, (OSThreadProc *)taskStart, task); + r = createOSThread(&tid, (OSThreadProc*)workerStart, task); if (r != 0) { sysErrorBelch("failed to create OS thread"); stg_exit(EXIT_FAILURE); @@ -350,8 +420,9 @@ printAllTasks(void) if (task->cap) { debugBelch("on capability %d, ", task->cap->no); } - if (task->tso) { - debugBelch("bound to thread %lu", (unsigned long)task->tso->id); + if (task->incall->tso) { + debugBelch("bound to thread %lu", + (unsigned long)task->incall->tso->id); } else { debugBelch("worker"); } diff --git a/rts/Task.h b/rts/Task.h index 9b5f0253cd..c2b58f2c45 100644 --- a/rts/Task.h +++ b/rts/Task.h @@ -17,24 +17,20 @@ BEGIN_RTS_PRIVATE Definition of a Task -------------------- - A task is an OSThread that runs Haskell code. Every OSThread - created by the RTS for the purposes of running Haskell code is a - Task, and OS threads that enter the Haskell RTS for the purposes of - making a call-in are also Tasks. + A task is an OSThread that runs Haskell code. Every OSThread that + runs inside the RTS, whether as a worker created by the RTS or via + an in-call from C to Haskell, has an associated Task. The first + time an OS thread calls into Haskell it is allocated a Task, which + remains until the RTS is shut down. + + There is a one-to-one relationship between OSThreads and Tasks. + The Task for an OSThread is kept in thread-local storage, and can + be retrieved at any time using myTask(). In the THREADED_RTS build, multiple Tasks may all be running Haskell code simultaneously. A task relinquishes its Capability when it is asked to evaluate an external (C) call. - In general, there may be multiple Tasks associated with a given OS - thread. A second Task is created when one Task makes a foreign - call from Haskell, and subsequently calls back in to Haskell, - creating a new bound thread. - - A particular Task structure can belong to more than one OS thread - over its lifetime. This is to avoid creating an unbounded number - of Task structures. The stats just accumulate. - Ownership of Task ----------------- @@ -59,8 +55,8 @@ BEGIN_RTS_PRIVATE (1) a bound Task, the TSO will be on a queue somewhere (2) a worker task, on the spare_workers queue of task->cap. - (b) making a foreign call. The Task will be on the - suspended_ccalling_tasks list. + (b) making a foreign call. The InCall will be on the + suspended_ccalls list. We re-establish ownership in each case by respectively @@ -73,9 +69,46 @@ BEGIN_RTS_PRIVATE ownership of the Task and a Capability. */ +// The InCall structure represents either a single in-call from C to +// Haskell, or a worker thread. +typedef struct InCall_ { + StgTSO * tso; // the bound TSO (or NULL for a worker) + + StgTSO * suspended_tso; // the TSO is stashed here when we + // make a foreign call (NULL otherwise); + + Capability *suspended_cap; // The capability that the + // suspended_tso is on, because + // we can't read this from the TSO + // without owning a Capability in the + // first place. + + struct Task_ *task; + + // When a Haskell thread makes a foreign call that re-enters + // Haskell, we end up with another Task associated with the + // current thread. We have to remember the whole stack of InCalls + // associated with the current Task so that we can correctly + // save & restore the InCall on entry to and exit from Haskell. + struct InCall_ *prev_stack; + + // Links InCalls onto suspended_ccalls, spare_incalls + struct InCall_ *prev; + struct InCall_ *next; +} InCall; + typedef struct Task_ { #if defined(THREADED_RTS) OSThreadId id; // The OS Thread ID of this task + + Condition cond; // used for sleeping & waking up this task + Mutex lock; // lock for the condition variable + + // this flag tells the task whether it should wait on task->cond + // or just continue immediately. It's a workaround for the fact + // that signalling a condition variable doesn't do anything if the + // thread is already running, but we want it to be sticky. + rtsBool wakeup; #endif // This points to the Capability that the Task "belongs" to. If @@ -92,26 +125,18 @@ typedef struct Task_ { // must be held when modifying task->cap. struct Capability_ *cap; + // The current top-of-stack InCall + struct InCall_ *incall; + + nat n_spare_incalls; + struct InCall_ *spare_incalls; + + rtsBool worker; // == rtsTrue if this is a worker Task rtsBool stopped; // this task has stopped or exited Haskell - StgTSO * suspended_tso; // the TSO is stashed here when we - // make a foreign call (NULL otherwise); - // The following 3 fields are used by bound threads: - StgTSO * tso; // the bound TSO (or NULL) SchedulerStatus stat; // return status StgClosure ** ret; // return value -#if defined(THREADED_RTS) - Condition cond; // used for sleeping & waking up this task - Mutex lock; // lock for the condition variable - - // this flag tells the task whether it should wait on task->cond - // or just continue immediately. It's a workaround for the fact - // that signalling a condition variable doesn't do anything if the - // thread is already running, but we want it to be sticky. - rtsBool wakeup; -#endif - // Stats that we collect about this task // ToDo: we probably want to put this in a separate TaskStats // structure, so we can share it between multiple Tasks. We don't @@ -125,29 +150,19 @@ typedef struct Task_ { Ticks gc_time; Ticks gc_etime; - // Links tasks onto various lists. (ToDo: do we need double - // linking now?) - struct Task_ *prev; + // Links tasks on the returning_tasks queue of a Capability, and + // on spare_workers. struct Task_ *next; - // Links tasks on the returning_tasks queue of a Capability. - struct Task_ *return_link; - // Links tasks on the all_tasks list struct Task_ *all_link; - // When a Haskell thread makes a foreign call that re-enters - // Haskell, we end up with another Task associated with the - // current thread. We have to remember the whole stack of Tasks - // associated with the current thread so that we can correctly - // save & restore the thread-local current task pointer. - struct Task_ *prev_stack; } Task; INLINE_HEADER rtsBool isBoundTask (Task *task) { - return (task->tso != NULL); + return (task->incall->tso != NULL); } @@ -171,11 +186,6 @@ Task *newBoundTask (void); // void boundTaskExiting (Task *task); -// This must be called when a new Task is associated with the current -// thread. It sets up the thread-local current task pointer so that -// myTask() can work. -INLINE_HEADER void taskEnter (Task *task); - // Notify the task manager that a task has stopped. This is used // mainly for stats-gathering purposes. // Requires: sched_mutex. @@ -194,7 +204,7 @@ void taskTimeStamp (Task *task); // Put the task back on the free list, mark it stopped. Used by // forkProcess(). // -void discardTask (Task *task); +void discardTasksExcept (Task *keep); // Get the Task associated with the current OS thread (or NULL if none). // @@ -207,8 +217,7 @@ INLINE_HEADER Task *myTask (void); // will become the running_task for that Capability. // Requires: sched_mutex. // -void startWorkerTask (struct Capability_ *cap, - void OSThreadProcAttr (*taskStart)(Task *task)); +void startWorkerTask (Capability *cap); #endif /* THREADED_RTS */ @@ -218,7 +227,13 @@ void startWorkerTask (struct Capability_ *cap, // A thread-local-storage key that we can use to get access to the // current thread's Task structure. #if defined(THREADED_RTS) +#if defined(linux_HOST_OS) && \ + (defined(i386_HOST_ARCH) || defined(x86_64_HOST_ARCH)) +#define MYTASK_USE_TLV +extern __thread Task *my_task; +#else extern ThreadLocalKey currentTaskKey; +#endif #else extern Task *my_task; #endif @@ -232,7 +247,7 @@ extern Task *my_task; INLINE_HEADER Task * myTask (void) { -#if defined(THREADED_RTS) +#if defined(THREADED_RTS) && !defined(MYTASK_USE_TLV) return getThreadLocalVar(¤tTaskKey); #else return my_task; @@ -242,25 +257,13 @@ myTask (void) INLINE_HEADER void setMyTask (Task *task) { -#if defined(THREADED_RTS) +#if defined(THREADED_RTS) && !defined(MYTASK_USE_TLV) setThreadLocalVar(¤tTaskKey,task); #else my_task = task; #endif } -// This must be called when a new Task is associated with the current -// thread. It sets up the thread-local current task pointer so that -// myTask() can work. -INLINE_HEADER void -taskEnter (Task *task) -{ - // save the current value, just in case this Task has been created - // as a result of re-entering the RTS (defaults to NULL): - task->prev_stack = myTask(); - setMyTask(task); -} - END_RTS_PRIVATE #endif /* TASK_H */ diff --git a/rts/Threads.c b/rts/Threads.c index 4f9560c36c..08b7aab66e 100644 --- a/rts/Threads.c +++ b/rts/Threads.c @@ -230,8 +230,8 @@ unblockOne_ (Capability *cap, StgTSO *tso, // We are waking up this thread on the current Capability, which // might involve migrating it from the Capability it was last on. if (tso->bound) { - ASSERT(tso->bound->cap == tso->cap); - tso->bound->cap = cap; + ASSERT(tso->bound->task->cap == tso->cap); + tso->bound->task->cap = cap; } tso->cap = cap; diff --git a/rts/sm/Compact.c b/rts/sm/Compact.c index 315eda732d..e55ae2b7c2 100644 --- a/rts/sm/Compact.c +++ b/rts/sm/Compact.c @@ -1014,10 +1014,14 @@ compact(StgClosure *static_objects) // the task list { Task *task; + InCall *incall; for (task = all_tasks; task != NULL; task = task->all_link) { - if (task->tso) { - thread_(&task->tso); - } + for (incall = task->incall; incall != NULL; + incall = incall->prev_stack) { + if (incall->tso) { + thread_(&incall->tso); + } + } } } |