summaryrefslogtreecommitdiff
path: root/rts
diff options
context:
space:
mode:
Diffstat (limited to 'rts')
-rw-r--r--rts/Capability.c40
-rw-r--r--rts/Capability.h2
-rw-r--r--rts/Schedule.c144
-rw-r--r--rts/Schedule.h11
-rw-r--r--rts/Stats.c2
-rw-r--r--rts/Task.c255
-rw-r--r--rts/Task.h133
-rw-r--r--rts/Threads.c4
-rw-r--r--rts/sm/Compact.c10
9 files changed, 328 insertions, 273 deletions
diff --git a/rts/Capability.c b/rts/Capability.c
index cf85372ce0..ce6ecebd72 100644
--- a/rts/Capability.c
+++ b/rts/Capability.c
@@ -173,10 +173,10 @@ STATIC_INLINE void
newReturningTask (Capability *cap, Task *task)
{
ASSERT_LOCK_HELD(&cap->lock);
- ASSERT(task->return_link == NULL);
+ ASSERT(task->next == NULL);
if (cap->returning_tasks_hd) {
- ASSERT(cap->returning_tasks_tl->return_link == NULL);
- cap->returning_tasks_tl->return_link = task;
+ ASSERT(cap->returning_tasks_tl->next == NULL);
+ cap->returning_tasks_tl->next = task;
} else {
cap->returning_tasks_hd = task;
}
@@ -190,11 +190,11 @@ popReturningTask (Capability *cap)
Task *task;
task = cap->returning_tasks_hd;
ASSERT(task);
- cap->returning_tasks_hd = task->return_link;
+ cap->returning_tasks_hd = task->next;
if (!cap->returning_tasks_hd) {
cap->returning_tasks_tl = NULL;
}
- task->return_link = NULL;
+ task->next = NULL;
return task;
}
#endif
@@ -220,7 +220,7 @@ initCapability( Capability *cap, nat i )
initMutex(&cap->lock);
cap->running_task = NULL; // indicates cap is free
cap->spare_workers = NULL;
- cap->suspended_ccalling_tasks = NULL;
+ cap->suspended_ccalls = NULL;
cap->returning_tasks_hd = NULL;
cap->returning_tasks_tl = NULL;
cap->wakeup_queue_hd = END_TSO_QUEUE;
@@ -342,7 +342,7 @@ giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
ASSERT_LOCK_HELD(&cap->lock);
ASSERT(task->cap == cap);
debugTrace(DEBUG_sched, "passing capability %d to %s %p",
- cap->no, task->tso ? "bound task" : "worker",
+ cap->no, task->incall->tso ? "bound task" : "worker",
(void *)task->id);
ACQUIRE_LOCK(&task->lock);
task->wakeup = rtsTrue;
@@ -398,7 +398,7 @@ releaseCapability_ (Capability* cap,
// assertion is false: in schedule() we force a yield after
// ThreadBlocked, but the thread may be back on the run queue
// by now.
- task = cap->run_queue_hd->bound;
+ task = cap->run_queue_hd->bound->task;
giveCapabilityToTask(cap,task);
return;
}
@@ -411,7 +411,7 @@ releaseCapability_ (Capability* cap,
if (sched_state < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) {
debugTrace(DEBUG_sched,
"starting new worker on capability %d", cap->no);
- startWorkerTask(cap, workerStart);
+ startWorkerTask(cap);
return;
}
}
@@ -462,9 +462,7 @@ releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS)
// in which case it is not replaced on the spare_worker queue.
// This happens when the system is shutting down (see
// Schedule.c:workerStart()).
- // Also, be careful to check that this task hasn't just exited
- // Haskell to do a foreign call (task->suspended_tso).
- if (!isBoundTask(task) && !task->stopped && !task->suspended_tso) {
+ if (!isBoundTask(task) && !task->stopped) {
task->next = cap->spare_workers;
cap->spare_workers = task;
}
@@ -612,7 +610,7 @@ yieldCapability (Capability** pCap, Task *task)
continue;
}
- if (task->tso == NULL) {
+ if (task->incall->tso == NULL) {
ASSERT(cap->spare_workers != NULL);
// if we're not at the front of the queue, release it
// again. This is unlikely to happen.
@@ -655,12 +653,12 @@ wakeupThreadOnCapability (Capability *my_cap,
// ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
if (tso->bound) {
- ASSERT(tso->bound->cap == tso->cap);
- tso->bound->cap = other_cap;
+ ASSERT(tso->bound->task->cap == tso->cap);
+ tso->bound->task->cap = other_cap;
}
tso->cap = other_cap;
- ASSERT(tso->bound ? tso->bound->cap == other_cap : 1);
+ ASSERT(tso->bound ? tso->bound->task->cap == other_cap : 1);
if (other_cap->running_task == NULL) {
// nobody is running this Capability, we can add our thread
@@ -781,7 +779,7 @@ shutdownCapability (Capability *cap, Task *task, rtsBool safe)
// that will try to return to code that has been unloaded.
// We can be a bit more relaxed when this is a standalone
// program that is about to terminate, and let safe=false.
- if (cap->suspended_ccalling_tasks && safe) {
+ if (cap->suspended_ccalls && safe) {
debugTrace(DEBUG_sched,
"thread(s) are involved in foreign calls, yielding");
cap->running_task = NULL;
@@ -871,7 +869,7 @@ markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta,
{
nat i;
Capability *cap;
- Task *task;
+ InCall *incall;
// Each GC thread is responsible for following roots from the
// Capability of the same number. There will usually be the same
@@ -886,9 +884,9 @@ markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta,
evac(user, (StgClosure **)(void *)&cap->wakeup_queue_hd);
evac(user, (StgClosure **)(void *)&cap->wakeup_queue_tl);
#endif
- for (task = cap->suspended_ccalling_tasks; task != NULL;
- task=task->next) {
- evac(user, (StgClosure **)(void *)&task->suspended_tso);
+ for (incall = cap->suspended_ccalls; incall != NULL;
+ incall=incall->next) {
+ evac(user, (StgClosure **)(void *)&incall->suspended_tso);
}
#if defined(THREADED_RTS)
diff --git a/rts/Capability.h b/rts/Capability.h
index 4b51548564..41974dc710 100644
--- a/rts/Capability.h
+++ b/rts/Capability.h
@@ -56,7 +56,7 @@ struct Capability_ {
// the suspended TSOs easily. Hence, when migrating a Task from
// the returning_tasks list, we must also migrate its entry from
// this list.
- Task *suspended_ccalling_tasks;
+ InCall *suspended_ccalls;
// One mutable list per generation, so we don't need to take any
// locks when updating an old-generation thunk. This also lets us
diff --git a/rts/Schedule.c b/rts/Schedule.c
index 66af8be7cb..4cca469869 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -312,7 +312,7 @@ schedule (Capability *initialCapability, Task *task)
// If we are a worker, just exit. If we're a bound thread
// then we will exit below when we've removed our TSO from
// the run queue.
- if (task->tso == NULL && emptyRunQueue(cap)) {
+ if (!isBoundTask(task) && emptyRunQueue(cap)) {
return cap;
}
break;
@@ -378,10 +378,10 @@ schedule (Capability *initialCapability, Task *task)
// Check whether we can run this thread in the current task.
// If not, we have to pass our capability to the right task.
{
- Task *bound = t->bound;
+ InCall *bound = t->bound;
if (bound) {
- if (bound == task) {
+ if (bound->task == task) {
// yes, the Haskell thread is bound to the current native thread
} else {
debugTrace(DEBUG_sched,
@@ -393,7 +393,7 @@ schedule (Capability *initialCapability, Task *task)
}
} else {
// The thread we want to run is unbound.
- if (task->tso) {
+ if (task->incall->tso) {
debugTrace(DEBUG_sched,
"this OS thread cannot run thread %lu",
(unsigned long)t->id);
@@ -441,7 +441,7 @@ run_thread:
ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
ASSERT(t->cap == cap);
- ASSERT(t->bound ? t->bound->cap == cap : 1);
+ ASSERT(t->bound ? t->bound->task->cap == cap : 1);
prev_what_next = t->what_next;
@@ -639,9 +639,9 @@ shouldYieldCapability (Capability *cap, Task *task)
// and this task it bound).
return (waiting_for_gc ||
cap->returning_tasks_hd != NULL ||
- (!emptyRunQueue(cap) && (task->tso == NULL
+ (!emptyRunQueue(cap) && (task->incall->tso == NULL
? cap->run_queue_hd->bound != NULL
- : cap->run_queue_hd->bound != task)));
+ : cap->run_queue_hd->bound != task->incall)));
}
// This is the single place where a Task goes to sleep. There are
@@ -768,7 +768,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
next = t->_link;
t->_link = END_TSO_QUEUE;
if (t->what_next == ThreadRelocated
- || t->bound == task // don't move my bound thread
+ || t->bound == task->incall // don't move my bound thread
|| tsoLocked(t)) { // don't move a locked thread
setTSOLink(cap, prev, t);
prev = t;
@@ -781,9 +781,9 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
} else {
appendToRunQueue(free_caps[i],t);
- traceEventMigrateThread (cap, t, free_caps[i]->no);
+ traceEventMigrateThread (cap, t, free_caps[i]->no);
- if (t->bound) { t->bound->cap = free_caps[i]; }
+ if (t->bound) { t->bound->task->cap = free_caps[i]; }
t->cap = free_caps[i];
i++;
}
@@ -979,13 +979,13 @@ scheduleDetectDeadlock (Capability *cap, Task *task)
/* Probably a real deadlock. Send the current main thread the
* Deadlock exception.
*/
- if (task->tso) {
- switch (task->tso->why_blocked) {
+ if (task->incall->tso) {
+ switch (task->incall->tso->why_blocked) {
case BlockedOnSTM:
case BlockedOnBlackHole:
case BlockedOnException:
case BlockedOnMVar:
- throwToSingleThreaded(cap, task->tso,
+ throwToSingleThreaded(cap, task->incall->tso,
(StgClosure *)nonTermination_closure);
return;
default:
@@ -1174,8 +1174,8 @@ scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
/* The TSO attached to this Task may have moved, so update the
* pointer to it.
*/
- if (task->tso == t) {
- task->tso = new_t;
+ if (task->incall->tso == t) {
+ task->incall->tso = new_t;
}
pushOnRunQueue(cap,new_t);
}
@@ -1285,7 +1285,7 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
if (t->bound) {
- if (t->bound != task) {
+ if (t->bound != task->incall) {
#if !defined(THREADED_RTS)
// Must be a bound thread that is not the topmost one. Leave
// it on the run queue until the stack has unwound to the
@@ -1302,12 +1302,12 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
#endif
}
- ASSERT(task->tso == t);
+ ASSERT(task->incall->tso == t);
if (t->what_next == ThreadComplete) {
if (task->ret) {
// NOTE: return val is tso->sp[1] (see StgStartup.hc)
- *(task->ret) = (StgClosure *)task->tso->sp[1];
+ *(task->ret) = (StgClosure *)task->incall->tso->sp[1];
}
task->stat = Success;
} else {
@@ -1325,7 +1325,7 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
}
}
#ifdef DEBUG
- removeThreadLabel((StgWord)task->tso->id);
+ removeThreadLabel((StgWord)task->incall->tso->id);
#endif
// We no longer consider this thread and task to be bound to
@@ -1336,7 +1336,7 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
// re-used). This was a real bug: the GC updated
// tso->bound->tso which lead to a deadlock.
t->bound = NULL;
- task->tso = NULL;
+ task->incall->tso = NULL;
return rtsTrue; // tells schedule() to return
}
@@ -1586,7 +1586,6 @@ forkProcess(HsStablePtr *entry
)
{
#ifdef FORKPROCESS_PRIMOP_SUPPORTED
- Task *task;
pid_t pid;
StgTSO* t,*next;
Capability *cap;
@@ -1661,7 +1660,7 @@ forkProcess(HsStablePtr *entry
// Any suspended C-calling Tasks are no more, their OS threads
// don't exist now:
- cap->suspended_ccalling_tasks = NULL;
+ cap->suspended_ccalls = NULL;
// Empty the threads lists. Otherwise, the garbage
// collector may attempt to resurrect some of these threads.
@@ -1669,17 +1668,7 @@ forkProcess(HsStablePtr *entry
generations[g].threads = END_TSO_QUEUE;
}
- // Wipe the task list, except the current Task.
- ACQUIRE_LOCK(&sched_mutex);
- for (task = all_tasks; task != NULL; task=task->all_link) {
- if (task != cap->running_task) {
-#if defined(THREADED_RTS)
- initMutex(&task->lock); // see #1391
-#endif
- discardTask(task);
- }
- }
- RELEASE_LOCK(&sched_mutex);
+ discardTasksExcept(cap->running_task);
#if defined(THREADED_RTS)
// Wipe our spare workers list, they no longer exist. New
@@ -1747,35 +1736,41 @@ deleteAllThreads ( Capability *cap )
}
/* -----------------------------------------------------------------------------
- Managing the suspended_ccalling_tasks list.
+ Managing the suspended_ccalls list.
Locks required: sched_mutex
-------------------------------------------------------------------------- */
STATIC_INLINE void
suspendTask (Capability *cap, Task *task)
{
- ASSERT(task->next == NULL && task->prev == NULL);
- task->next = cap->suspended_ccalling_tasks;
- task->prev = NULL;
- if (cap->suspended_ccalling_tasks) {
- cap->suspended_ccalling_tasks->prev = task;
- }
- cap->suspended_ccalling_tasks = task;
+ InCall *incall;
+
+ incall = task->incall;
+ ASSERT(incall->next == NULL && incall->prev == NULL);
+ incall->next = cap->suspended_ccalls;
+ incall->prev = NULL;
+ if (cap->suspended_ccalls) {
+ cap->suspended_ccalls->prev = incall;
+ }
+ cap->suspended_ccalls = incall;
}
STATIC_INLINE void
recoverSuspendedTask (Capability *cap, Task *task)
{
- if (task->prev) {
- task->prev->next = task->next;
+ InCall *incall;
+
+ incall = task->incall;
+ if (incall->prev) {
+ incall->prev->next = incall->next;
} else {
- ASSERT(cap->suspended_ccalling_tasks == task);
- cap->suspended_ccalling_tasks = task->next;
+ ASSERT(cap->suspended_ccalls == incall);
+ cap->suspended_ccalls = incall->next;
}
- if (task->next) {
- task->next->prev = task->prev;
+ if (incall->next) {
+ incall->next->prev = incall->prev;
}
- task->next = task->prev = NULL;
+ incall->next = incall->prev = NULL;
}
/* ---------------------------------------------------------------------------
@@ -1832,7 +1827,8 @@ suspendThread (StgRegTable *reg)
}
// Hand back capability
- task->suspended_tso = tso;
+ task->incall->suspended_tso = tso;
+ task->incall->suspended_cap = cap;
ACQUIRE_LOCK(&cap->lock);
@@ -1853,6 +1849,7 @@ StgRegTable *
resumeThread (void *task_)
{
StgTSO *tso;
+ InCall *incall;
Capability *cap;
Task *task = task_;
int saved_errno;
@@ -1865,18 +1862,22 @@ resumeThread (void *task_)
saved_winerror = GetLastError();
#endif
- cap = task->cap;
+ incall = task->incall;
+ cap = incall->suspended_cap;
+ task->cap = cap;
+
// Wait for permission to re-enter the RTS with the result.
waitForReturnCapability(&cap,task);
// we might be on a different capability now... but if so, our
- // entry on the suspended_ccalling_tasks list will also have been
+ // entry on the suspended_ccalls list will also have been
// migrated.
// Remove the thread from the suspended list
recoverSuspendedTask(cap,task);
- tso = task->suspended_tso;
- task->suspended_tso = NULL;
+ tso = incall->suspended_tso;
+ incall->suspended_tso = NULL;
+ incall->suspended_cap = NULL;
tso->_link = END_TSO_QUEUE; // no write barrier reqd
traceEventRunThread(cap, tso);
@@ -1954,10 +1955,10 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
// This TSO is now a bound thread; make the Task and TSO
// point to each other.
- tso->bound = task;
+ tso->bound = task->incall;
tso->cap = cap;
- task->tso = tso;
+ task->incall->tso = tso;
task->ret = ret;
task->stat = NoStatus;
@@ -1980,23 +1981,8 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
* ------------------------------------------------------------------------- */
#if defined(THREADED_RTS)
-void OSThreadProcAttr
-workerStart(Task *task)
+void scheduleWorker (Capability *cap, Task *task)
{
- Capability *cap;
-
- // See startWorkerTask().
- ACQUIRE_LOCK(&task->lock);
- cap = task->cap;
- RELEASE_LOCK(&task->lock);
-
- if (RtsFlags.ParFlags.setAffinity) {
- setThreadAffinity(cap->no, n_capabilities);
- }
-
- // set the thread-local pointer to the Task:
- taskEnter(task);
-
// schedule() runs without a lock.
cap = schedule(cap,task);
@@ -2062,6 +2048,8 @@ initScheduler(void)
initSparkPools();
#endif
+ RELEASE_LOCK(&sched_mutex);
+
#if defined(THREADED_RTS)
/*
* Eagerly start one worker to run each Capability, except for
@@ -2075,13 +2063,11 @@ initScheduler(void)
for (i = 1; i < n_capabilities; i++) {
cap = &capabilities[i];
ACQUIRE_LOCK(&cap->lock);
- startWorkerTask(cap, workerStart);
+ startWorkerTask(cap);
RELEASE_LOCK(&cap->lock);
}
}
#endif
-
- RELEASE_LOCK(&sched_mutex);
}
void
@@ -2102,7 +2088,7 @@ exitScheduler(
sched_state = SCHED_INTERRUPTING;
waitForReturnCapability(&task->cap,task);
scheduleDoGC(task->cap,task,rtsFalse);
- ASSERT(task->tso == NULL);
+ ASSERT(task->incall->tso == NULL);
releaseCapability(task->cap);
}
sched_state = SCHED_SHUTTING_DOWN;
@@ -2112,7 +2098,7 @@ exitScheduler(
nat i;
for (i = 0; i < n_capabilities; i++) {
- ASSERT(task->tso == NULL);
+ ASSERT(task->incall->tso == NULL);
shutdownCapability(&capabilities[i], task, wait_foreign);
}
}
@@ -2161,7 +2147,7 @@ performGC_(rtsBool force_major)
// We must grab a new Task here, because the existing Task may be
// associated with a particular Capability, and chained onto the
- // suspended_ccalling_tasks queue.
+ // suspended_ccalls queue.
task = newBoundTask();
waitForReturnCapability(&task->cap,task);
@@ -2368,8 +2354,8 @@ threadStackUnderflow (Capability *cap, Task *task, StgTSO *tso)
// The TSO attached to this Task may have moved, so update the
// pointer to it.
- if (task->tso == tso) {
- task->tso = new_tso;
+ if (task->incall->tso == tso) {
+ task->incall->tso = new_tso;
}
unlockTSO(new_tso);
diff --git a/rts/Schedule.h b/rts/Schedule.h
index 6751144be8..af322d804f 100644
--- a/rts/Schedule.h
+++ b/rts/Schedule.h
@@ -46,15 +46,8 @@ StgWord raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *excepti
/* findRetryFrameHelper */
StgWord findRetryFrameHelper (StgTSO *tso);
-/* workerStart()
- *
- * Entry point for a new worker task.
- * Called from STG : NO
- * Locks assumed : none
- */
-#if defined(THREADED_RTS)
-void OSThreadProcAttr workerStart(Task *task);
-#endif
+/* Entry point for a new worker */
+void scheduleWorker (Capability *cap, Task *task);
/* The state of the scheduler. This is used to control the sequence
* of events during shutdown, and when the runtime is interrupted
diff --git a/rts/Stats.c b/rts/Stats.c
index 20de32a808..58b113d74b 100644
--- a/rts/Stats.c
+++ b/rts/Stats.c
@@ -623,7 +623,7 @@ stat_exit(int alloc)
i++, task = task->all_link) {
statsPrintf(" Task %2d %-8s : %6.2fs (%6.2fs) %6.2fs (%6.2fs)\n",
i,
- (task->tso == NULL) ? "(worker)" : "(bound)",
+ (task->worker) ? "(worker)" : "(bound)",
TICK_TO_DBL(task->mut_time),
TICK_TO_DBL(task->mut_etime),
TICK_TO_DBL(task->gc_time),
diff --git a/rts/Task.c b/rts/Task.c
index 9a8ebd6963..2921e9e181 100644
--- a/rts/Task.c
+++ b/rts/Task.c
@@ -26,12 +26,13 @@
// Task lists and global counters.
// Locks required: sched_mutex.
Task *all_tasks = NULL;
-static Task *task_free_list = NULL; // singly-linked
static nat taskCount;
-static nat tasksRunning;
-static nat workerCount;
static int tasksInitialized = 0;
+static void freeTask (Task *task);
+static Task * allocTask (void);
+static Task * newTask (rtsBool);
+
/* -----------------------------------------------------------------------------
* Remembering the current thread's Task
* -------------------------------------------------------------------------- */
@@ -39,7 +40,11 @@ static int tasksInitialized = 0;
// A thread-local-storage key that we can use to get access to the
// current thread's Task structure.
#if defined(THREADED_RTS)
+# if defined(MYTASK_USE_TLV)
+__thread Task *my_task;
+# else
ThreadLocalKey currentTaskKey;
+# endif
#else
Task *my_task;
#endif
@@ -53,10 +58,8 @@ initTaskManager (void)
{
if (!tasksInitialized) {
taskCount = 0;
- workerCount = 0;
- tasksRunning = 0;
tasksInitialized = 1;
-#if defined(THREADED_RTS)
+#if defined(THREADED_RTS) && !defined(MYTASK_USE_TLV)
newThreadLocalKey(&currentTaskKey);
#endif
}
@@ -66,29 +69,24 @@ nat
freeTaskManager (void)
{
Task *task, *next;
+ nat tasksRunning = 0;
ASSERT_LOCK_HELD(&sched_mutex);
- debugTrace(DEBUG_sched, "freeing task manager, %d tasks still running",
- tasksRunning);
-
for (task = all_tasks; task != NULL; task = next) {
next = task->all_link;
if (task->stopped) {
- // We only free resources if the Task is not in use. A
- // Task may still be in use if we have a Haskell thread in
- // a foreign call while we are attempting to shut down the
- // RTS (see conc059).
-#if defined(THREADED_RTS)
- closeCondition(&task->cond);
- closeMutex(&task->lock);
-#endif
- stgFree(task);
+ freeTask(task);
+ } else {
+ tasksRunning++;
}
}
+
+ debugTrace(DEBUG_sched, "freeing task manager, %d tasks still running",
+ tasksRunning);
+
all_tasks = NULL;
- task_free_list = NULL;
-#if defined(THREADED_RTS)
+#if defined(THREADED_RTS) && !defined(MYTASK_USE_TLV)
freeThreadLocalKey(&currentTaskKey);
#endif
@@ -97,9 +95,52 @@ freeTaskManager (void)
return tasksRunning;
}
+static Task *
+allocTask (void)
+{
+ Task *task;
+
+ task = myTask();
+ if (task != NULL) {
+ return task;
+ } else {
+ task = newTask(rtsFalse);
+#if defined(THREADED_RTS)
+ task->id = osThreadId();
+#endif
+ setMyTask(task);
+ return task;
+ }
+}
+
+static void
+freeTask (Task *task)
+{
+ InCall *incall, *next;
+
+ // We only free resources if the Task is not in use. A
+ // Task may still be in use if we have a Haskell thread in
+ // a foreign call while we are attempting to shut down the
+ // RTS (see conc059).
+#if defined(THREADED_RTS)
+ closeCondition(&task->cond);
+ closeMutex(&task->lock);
+#endif
+
+ for (incall = task->incall; incall != NULL; incall = next) {
+ next = incall->prev_stack;
+ stgFree(incall);
+ }
+ for (incall = task->spare_incalls; incall != NULL; incall = next) {
+ next = incall->next;
+ stgFree(incall);
+ }
+
+ stgFree(task);
+}
static Task*
-newTask (void)
+newTask (rtsBool worker)
{
#if defined(THREADED_RTS)
Ticks currentElapsedTime, currentUserTime;
@@ -109,12 +150,14 @@ newTask (void)
#define ROUND_TO_CACHE_LINE(x) ((((x)+63) / 64) * 64)
task = stgMallocBytes(ROUND_TO_CACHE_LINE(sizeof(Task)), "newTask");
- task->cap = NULL;
- task->stopped = rtsFalse;
- task->suspended_tso = NULL;
- task->tso = NULL;
- task->stat = NoStatus;
- task->ret = NULL;
+ task->cap = NULL;
+ task->worker = worker;
+ task->stopped = rtsFalse;
+ task->stat = NoStatus;
+ task->ret = NULL;
+ task->n_spare_incalls = 0;
+ task->spare_incalls = NULL;
+ task->incall = NULL;
#if defined(THREADED_RTS)
initCondition(&task->cond);
@@ -133,18 +176,65 @@ newTask (void)
task->elapsedtimestart = currentElapsedTime;
#endif
- task->prev = NULL;
task->next = NULL;
- task->return_link = NULL;
+
+ ACQUIRE_LOCK(&sched_mutex);
task->all_link = all_tasks;
all_tasks = task;
taskCount++;
+ RELEASE_LOCK(&sched_mutex);
+
return task;
}
+// avoid the spare_incalls list growing unboundedly
+#define MAX_SPARE_INCALLS 8
+
+static void
+newInCall (Task *task)
+{
+ InCall *incall;
+
+ if (task->spare_incalls != NULL) {
+ incall = task->spare_incalls;
+ task->spare_incalls = incall->next;
+ task->n_spare_incalls--;
+ } else {
+ incall = stgMallocBytes((sizeof(InCall)), "newBoundTask");
+ }
+
+ incall->tso = NULL;
+ incall->task = task;
+ incall->suspended_tso = NULL;
+ incall->suspended_cap = NULL;
+ incall->next = NULL;
+ incall->prev = NULL;
+ incall->prev_stack = task->incall;
+ task->incall = incall;
+}
+
+static void
+endInCall (Task *task)
+{
+ InCall *incall;
+
+ incall = task->incall;
+ incall->tso = NULL;
+ task->incall = task->incall->prev_stack;
+
+ if (task->n_spare_incalls >= MAX_SPARE_INCALLS) {
+ stgFree(incall);
+ } else {
+ incall->next = task->spare_incalls;
+ task->spare_incalls = incall;
+ task->n_spare_incalls++;
+ }
+}
+
+
Task *
newBoundTask (void)
{
@@ -155,31 +245,11 @@ newBoundTask (void)
stg_exit(EXIT_FAILURE);
}
- // ToDo: get rid of this lock in the common case. We could store
- // a free Task in thread-local storage, for example. That would
- // leave just one lock on the path into the RTS: cap->lock when
- // acquiring the Capability.
- ACQUIRE_LOCK(&sched_mutex);
-
- if (task_free_list == NULL) {
- task = newTask();
- } else {
- task = task_free_list;
- task_free_list = task->next;
- task->next = NULL;
- task->prev = NULL;
- task->stopped = rtsFalse;
- }
-#if defined(THREADED_RTS)
- task->id = osThreadId();
-#endif
- ASSERT(task->cap == NULL);
-
- tasksRunning++;
+ task = allocTask();
- taskEnter(task);
+ task->stopped = rtsFalse;
- RELEASE_LOCK(&sched_mutex);
+ newInCall(task);
debugTrace(DEBUG_sched, "new task (taskCount: %d)", taskCount);
return task;
@@ -188,27 +258,19 @@ newBoundTask (void)
void
boundTaskExiting (Task *task)
{
- task->tso = NULL;
task->stopped = rtsTrue;
- task->cap = NULL;
#if defined(THREADED_RTS)
ASSERT(osThreadId() == task->id);
#endif
ASSERT(myTask() == task);
- setMyTask(task->prev_stack);
-
- tasksRunning--;
- // sadly, we need a lock around the free task list. Todo: eliminate.
- ACQUIRE_LOCK(&sched_mutex);
- task->next = task_free_list;
- task_free_list = task;
- RELEASE_LOCK(&sched_mutex);
+ endInCall(task);
debugTrace(DEBUG_sched, "task exiting");
}
+
#ifdef THREADED_RTS
#define TASK_ID(t) (t)->id
#else
@@ -216,22 +278,20 @@ boundTaskExiting (Task *task)
#endif
void
-discardTask (Task *task)
+discardTasksExcept (Task *keep)
{
- ASSERT_LOCK_HELD(&sched_mutex);
- if (!task->stopped) {
- debugTrace(DEBUG_sched, "discarding task %ld", (long)TASK_ID(task));
- task->cap = NULL;
- if (task->tso == NULL) {
- workerCount--;
- } else {
- task->tso = NULL;
+ Task *task;
+
+ // Wipe the task list, except the current Task.
+ ACQUIRE_LOCK(&sched_mutex);
+ for (task = all_tasks; task != NULL; task=task->all_link) {
+ if (task != keep) {
+ debugTrace(DEBUG_sched, "discarding task %ld", (long)TASK_ID(task));
+ freeTask(task);
}
- task->stopped = rtsTrue;
- tasksRunning--;
- task->next = task_free_list;
- task_free_list = task;
}
+ all_tasks = keep;
+ RELEASE_LOCK(&sched_mutex);
}
void
@@ -270,33 +330,43 @@ workerTaskStop (Task *task)
task->cap = NULL;
taskTimeStamp(task);
task->stopped = rtsTrue;
- tasksRunning--;
- workerCount--;
-
- ACQUIRE_LOCK(&sched_mutex);
- task->next = task_free_list;
- task_free_list = task;
- RELEASE_LOCK(&sched_mutex);
}
#endif
#if defined(THREADED_RTS)
+static void OSThreadProcAttr
+workerStart(Task *task)
+{
+ Capability *cap;
+
+ // See startWorkerTask().
+ ACQUIRE_LOCK(&task->lock);
+ cap = task->cap;
+ RELEASE_LOCK(&task->lock);
+
+ if (RtsFlags.ParFlags.setAffinity) {
+ setThreadAffinity(cap->no, n_capabilities);
+ }
+
+ // set the thread-local pointer to the Task:
+ setMyTask(task);
+
+ newInCall(task);
+
+ scheduleWorker(cap,task);
+}
+
void
-startWorkerTask (Capability *cap,
- void OSThreadProcAttr (*taskStart)(Task *task))
+startWorkerTask (Capability *cap)
{
int r;
OSThreadId tid;
Task *task;
- workerCount++;
-
// A worker always gets a fresh Task structure.
- task = newTask();
-
- tasksRunning++;
+ task = newTask(rtsTrue);
// The lock here is to synchronise with taskStart(), to make sure
// that we have finished setting up the Task structure before the
@@ -311,7 +381,7 @@ startWorkerTask (Capability *cap,
ASSERT_LOCK_HELD(&cap->lock);
cap->running_task = task;
- r = createOSThread(&tid, (OSThreadProc *)taskStart, task);
+ r = createOSThread(&tid, (OSThreadProc*)workerStart, task);
if (r != 0) {
sysErrorBelch("failed to create OS thread");
stg_exit(EXIT_FAILURE);
@@ -350,8 +420,9 @@ printAllTasks(void)
if (task->cap) {
debugBelch("on capability %d, ", task->cap->no);
}
- if (task->tso) {
- debugBelch("bound to thread %lu", (unsigned long)task->tso->id);
+ if (task->incall->tso) {
+ debugBelch("bound to thread %lu",
+ (unsigned long)task->incall->tso->id);
} else {
debugBelch("worker");
}
diff --git a/rts/Task.h b/rts/Task.h
index 9b5f0253cd..c2b58f2c45 100644
--- a/rts/Task.h
+++ b/rts/Task.h
@@ -17,24 +17,20 @@ BEGIN_RTS_PRIVATE
Definition of a Task
--------------------
- A task is an OSThread that runs Haskell code. Every OSThread
- created by the RTS for the purposes of running Haskell code is a
- Task, and OS threads that enter the Haskell RTS for the purposes of
- making a call-in are also Tasks.
+ A task is an OSThread that runs Haskell code. Every OSThread that
+ runs inside the RTS, whether as a worker created by the RTS or via
+ an in-call from C to Haskell, has an associated Task. The first
+ time an OS thread calls into Haskell it is allocated a Task, which
+ remains until the RTS is shut down.
+
+ There is a one-to-one relationship between OSThreads and Tasks.
+ The Task for an OSThread is kept in thread-local storage, and can
+ be retrieved at any time using myTask().
In the THREADED_RTS build, multiple Tasks may all be running
Haskell code simultaneously. A task relinquishes its Capability
when it is asked to evaluate an external (C) call.
- In general, there may be multiple Tasks associated with a given OS
- thread. A second Task is created when one Task makes a foreign
- call from Haskell, and subsequently calls back in to Haskell,
- creating a new bound thread.
-
- A particular Task structure can belong to more than one OS thread
- over its lifetime. This is to avoid creating an unbounded number
- of Task structures. The stats just accumulate.
-
Ownership of Task
-----------------
@@ -59,8 +55,8 @@ BEGIN_RTS_PRIVATE
(1) a bound Task, the TSO will be on a queue somewhere
(2) a worker task, on the spare_workers queue of task->cap.
- (b) making a foreign call. The Task will be on the
- suspended_ccalling_tasks list.
+ (b) making a foreign call. The InCall will be on the
+ suspended_ccalls list.
We re-establish ownership in each case by respectively
@@ -73,9 +69,46 @@ BEGIN_RTS_PRIVATE
ownership of the Task and a Capability.
*/
+// The InCall structure represents either a single in-call from C to
+// Haskell, or a worker thread.
+typedef struct InCall_ {
+ StgTSO * tso; // the bound TSO (or NULL for a worker)
+
+ StgTSO * suspended_tso; // the TSO is stashed here when we
+ // make a foreign call (NULL otherwise);
+
+ Capability *suspended_cap; // The capability that the
+ // suspended_tso is on, because
+ // we can't read this from the TSO
+ // without owning a Capability in the
+ // first place.
+
+ struct Task_ *task;
+
+ // When a Haskell thread makes a foreign call that re-enters
+ // Haskell, we end up with another Task associated with the
+ // current thread. We have to remember the whole stack of InCalls
+ // associated with the current Task so that we can correctly
+ // save & restore the InCall on entry to and exit from Haskell.
+ struct InCall_ *prev_stack;
+
+ // Links InCalls onto suspended_ccalls, spare_incalls
+ struct InCall_ *prev;
+ struct InCall_ *next;
+} InCall;
+
typedef struct Task_ {
#if defined(THREADED_RTS)
OSThreadId id; // The OS Thread ID of this task
+
+ Condition cond; // used for sleeping & waking up this task
+ Mutex lock; // lock for the condition variable
+
+ // this flag tells the task whether it should wait on task->cond
+ // or just continue immediately. It's a workaround for the fact
+ // that signalling a condition variable doesn't do anything if the
+ // thread is already running, but we want it to be sticky.
+ rtsBool wakeup;
#endif
// This points to the Capability that the Task "belongs" to. If
@@ -92,26 +125,18 @@ typedef struct Task_ {
// must be held when modifying task->cap.
struct Capability_ *cap;
+ // The current top-of-stack InCall
+ struct InCall_ *incall;
+
+ nat n_spare_incalls;
+ struct InCall_ *spare_incalls;
+
+ rtsBool worker; // == rtsTrue if this is a worker Task
rtsBool stopped; // this task has stopped or exited Haskell
- StgTSO * suspended_tso; // the TSO is stashed here when we
- // make a foreign call (NULL otherwise);
- // The following 3 fields are used by bound threads:
- StgTSO * tso; // the bound TSO (or NULL)
SchedulerStatus stat; // return status
StgClosure ** ret; // return value
-#if defined(THREADED_RTS)
- Condition cond; // used for sleeping & waking up this task
- Mutex lock; // lock for the condition variable
-
- // this flag tells the task whether it should wait on task->cond
- // or just continue immediately. It's a workaround for the fact
- // that signalling a condition variable doesn't do anything if the
- // thread is already running, but we want it to be sticky.
- rtsBool wakeup;
-#endif
-
// Stats that we collect about this task
// ToDo: we probably want to put this in a separate TaskStats
// structure, so we can share it between multiple Tasks. We don't
@@ -125,29 +150,19 @@ typedef struct Task_ {
Ticks gc_time;
Ticks gc_etime;
- // Links tasks onto various lists. (ToDo: do we need double
- // linking now?)
- struct Task_ *prev;
+ // Links tasks on the returning_tasks queue of a Capability, and
+ // on spare_workers.
struct Task_ *next;
- // Links tasks on the returning_tasks queue of a Capability.
- struct Task_ *return_link;
-
// Links tasks on the all_tasks list
struct Task_ *all_link;
- // When a Haskell thread makes a foreign call that re-enters
- // Haskell, we end up with another Task associated with the
- // current thread. We have to remember the whole stack of Tasks
- // associated with the current thread so that we can correctly
- // save & restore the thread-local current task pointer.
- struct Task_ *prev_stack;
} Task;
INLINE_HEADER rtsBool
isBoundTask (Task *task)
{
- return (task->tso != NULL);
+ return (task->incall->tso != NULL);
}
@@ -171,11 +186,6 @@ Task *newBoundTask (void);
//
void boundTaskExiting (Task *task);
-// This must be called when a new Task is associated with the current
-// thread. It sets up the thread-local current task pointer so that
-// myTask() can work.
-INLINE_HEADER void taskEnter (Task *task);
-
// Notify the task manager that a task has stopped. This is used
// mainly for stats-gathering purposes.
// Requires: sched_mutex.
@@ -194,7 +204,7 @@ void taskTimeStamp (Task *task);
// Put the task back on the free list, mark it stopped. Used by
// forkProcess().
//
-void discardTask (Task *task);
+void discardTasksExcept (Task *keep);
// Get the Task associated with the current OS thread (or NULL if none).
//
@@ -207,8 +217,7 @@ INLINE_HEADER Task *myTask (void);
// will become the running_task for that Capability.
// Requires: sched_mutex.
//
-void startWorkerTask (struct Capability_ *cap,
- void OSThreadProcAttr (*taskStart)(Task *task));
+void startWorkerTask (Capability *cap);
#endif /* THREADED_RTS */
@@ -218,7 +227,13 @@ void startWorkerTask (struct Capability_ *cap,
// A thread-local-storage key that we can use to get access to the
// current thread's Task structure.
#if defined(THREADED_RTS)
+#if defined(linux_HOST_OS) && \
+ (defined(i386_HOST_ARCH) || defined(x86_64_HOST_ARCH))
+#define MYTASK_USE_TLV
+extern __thread Task *my_task;
+#else
extern ThreadLocalKey currentTaskKey;
+#endif
#else
extern Task *my_task;
#endif
@@ -232,7 +247,7 @@ extern Task *my_task;
INLINE_HEADER Task *
myTask (void)
{
-#if defined(THREADED_RTS)
+#if defined(THREADED_RTS) && !defined(MYTASK_USE_TLV)
return getThreadLocalVar(&currentTaskKey);
#else
return my_task;
@@ -242,25 +257,13 @@ myTask (void)
INLINE_HEADER void
setMyTask (Task *task)
{
-#if defined(THREADED_RTS)
+#if defined(THREADED_RTS) && !defined(MYTASK_USE_TLV)
setThreadLocalVar(&currentTaskKey,task);
#else
my_task = task;
#endif
}
-// This must be called when a new Task is associated with the current
-// thread. It sets up the thread-local current task pointer so that
-// myTask() can work.
-INLINE_HEADER void
-taskEnter (Task *task)
-{
- // save the current value, just in case this Task has been created
- // as a result of re-entering the RTS (defaults to NULL):
- task->prev_stack = myTask();
- setMyTask(task);
-}
-
END_RTS_PRIVATE
#endif /* TASK_H */
diff --git a/rts/Threads.c b/rts/Threads.c
index 4f9560c36c..08b7aab66e 100644
--- a/rts/Threads.c
+++ b/rts/Threads.c
@@ -230,8 +230,8 @@ unblockOne_ (Capability *cap, StgTSO *tso,
// We are waking up this thread on the current Capability, which
// might involve migrating it from the Capability it was last on.
if (tso->bound) {
- ASSERT(tso->bound->cap == tso->cap);
- tso->bound->cap = cap;
+ ASSERT(tso->bound->task->cap == tso->cap);
+ tso->bound->task->cap = cap;
}
tso->cap = cap;
diff --git a/rts/sm/Compact.c b/rts/sm/Compact.c
index 315eda732d..e55ae2b7c2 100644
--- a/rts/sm/Compact.c
+++ b/rts/sm/Compact.c
@@ -1014,10 +1014,14 @@ compact(StgClosure *static_objects)
// the task list
{
Task *task;
+ InCall *incall;
for (task = all_tasks; task != NULL; task = task->all_link) {
- if (task->tso) {
- thread_(&task->tso);
- }
+ for (incall = task->incall; incall != NULL;
+ incall = incall->prev_stack) {
+ if (incall->tso) {
+ thread_(&incall->tso);
+ }
+ }
}
}