summaryrefslogtreecommitdiff
path: root/rts/Task.c
diff options
context:
space:
mode:
authorSimon Marlow <marlowsd@gmail.com>2010-03-09 14:31:11 +0000
committerSimon Marlow <marlowsd@gmail.com>2010-03-09 14:31:11 +0000
commit7effbbbbdfe7eb05c6402fa9337e358e7e9fadde (patch)
treed5896a7858db38265b77fc799fc54d5151737aab /rts/Task.c
parent4e8b07dbc753a5132c574926468ba886728c9049 (diff)
downloadhaskell-7effbbbbdfe7eb05c6402fa9337e358e7e9fadde.tar.gz
Split part of the Task struct into a separate struct InCall
The idea is that this leaves Tasks and OSThread in one-to-one correspondence. The part of a Task that represents a call into Haskell from C is split into a separate struct InCall, pointed to by the Task and the TSO bound to it. A given OSThread/Task thus always uses the same mutex and condition variable, rather than getting a new one for each callback. Conceptually it is simpler, although there are more types and indirections in a few places now. This improves callback performance by removing some of the locks that we had to take when making in-calls. Now we also keep the current Task in a thread-local variable if supported by the OS and gcc (currently only Linux).
Diffstat (limited to 'rts/Task.c')
-rw-r--r--rts/Task.c255
1 files changed, 163 insertions, 92 deletions
diff --git a/rts/Task.c b/rts/Task.c
index 9a8ebd6963..2921e9e181 100644
--- a/rts/Task.c
+++ b/rts/Task.c
@@ -26,12 +26,13 @@
// Task lists and global counters.
// Locks required: sched_mutex.
Task *all_tasks = NULL;
-static Task *task_free_list = NULL; // singly-linked
static nat taskCount;
-static nat tasksRunning;
-static nat workerCount;
static int tasksInitialized = 0;
+static void freeTask (Task *task);
+static Task * allocTask (void);
+static Task * newTask (rtsBool);
+
/* -----------------------------------------------------------------------------
* Remembering the current thread's Task
* -------------------------------------------------------------------------- */
@@ -39,7 +40,11 @@ static int tasksInitialized = 0;
// A thread-local-storage key that we can use to get access to the
// current thread's Task structure.
#if defined(THREADED_RTS)
+# if defined(MYTASK_USE_TLV)
+__thread Task *my_task;
+# else
ThreadLocalKey currentTaskKey;
+# endif
#else
Task *my_task;
#endif
@@ -53,10 +58,8 @@ initTaskManager (void)
{
if (!tasksInitialized) {
taskCount = 0;
- workerCount = 0;
- tasksRunning = 0;
tasksInitialized = 1;
-#if defined(THREADED_RTS)
+#if defined(THREADED_RTS) && !defined(MYTASK_USE_TLV)
newThreadLocalKey(&currentTaskKey);
#endif
}
@@ -66,29 +69,24 @@ nat
freeTaskManager (void)
{
Task *task, *next;
+ nat tasksRunning = 0;
ASSERT_LOCK_HELD(&sched_mutex);
- debugTrace(DEBUG_sched, "freeing task manager, %d tasks still running",
- tasksRunning);
-
for (task = all_tasks; task != NULL; task = next) {
next = task->all_link;
if (task->stopped) {
- // We only free resources if the Task is not in use. A
- // Task may still be in use if we have a Haskell thread in
- // a foreign call while we are attempting to shut down the
- // RTS (see conc059).
-#if defined(THREADED_RTS)
- closeCondition(&task->cond);
- closeMutex(&task->lock);
-#endif
- stgFree(task);
+ freeTask(task);
+ } else {
+ tasksRunning++;
}
}
+
+ debugTrace(DEBUG_sched, "freeing task manager, %d tasks still running",
+ tasksRunning);
+
all_tasks = NULL;
- task_free_list = NULL;
-#if defined(THREADED_RTS)
+#if defined(THREADED_RTS) && !defined(MYTASK_USE_TLV)
freeThreadLocalKey(&currentTaskKey);
#endif
@@ -97,9 +95,52 @@ freeTaskManager (void)
return tasksRunning;
}
+static Task *
+allocTask (void)
+{
+ Task *task;
+
+ task = myTask();
+ if (task != NULL) {
+ return task;
+ } else {
+ task = newTask(rtsFalse);
+#if defined(THREADED_RTS)
+ task->id = osThreadId();
+#endif
+ setMyTask(task);
+ return task;
+ }
+}
+
+static void
+freeTask (Task *task)
+{
+ InCall *incall, *next;
+
+ // We only free resources if the Task is not in use. A
+ // Task may still be in use if we have a Haskell thread in
+ // a foreign call while we are attempting to shut down the
+ // RTS (see conc059).
+#if defined(THREADED_RTS)
+ closeCondition(&task->cond);
+ closeMutex(&task->lock);
+#endif
+
+ for (incall = task->incall; incall != NULL; incall = next) {
+ next = incall->prev_stack;
+ stgFree(incall);
+ }
+ for (incall = task->spare_incalls; incall != NULL; incall = next) {
+ next = incall->next;
+ stgFree(incall);
+ }
+
+ stgFree(task);
+}
static Task*
-newTask (void)
+newTask (rtsBool worker)
{
#if defined(THREADED_RTS)
Ticks currentElapsedTime, currentUserTime;
@@ -109,12 +150,14 @@ newTask (void)
#define ROUND_TO_CACHE_LINE(x) ((((x)+63) / 64) * 64)
task = stgMallocBytes(ROUND_TO_CACHE_LINE(sizeof(Task)), "newTask");
- task->cap = NULL;
- task->stopped = rtsFalse;
- task->suspended_tso = NULL;
- task->tso = NULL;
- task->stat = NoStatus;
- task->ret = NULL;
+ task->cap = NULL;
+ task->worker = worker;
+ task->stopped = rtsFalse;
+ task->stat = NoStatus;
+ task->ret = NULL;
+ task->n_spare_incalls = 0;
+ task->spare_incalls = NULL;
+ task->incall = NULL;
#if defined(THREADED_RTS)
initCondition(&task->cond);
@@ -133,18 +176,65 @@ newTask (void)
task->elapsedtimestart = currentElapsedTime;
#endif
- task->prev = NULL;
task->next = NULL;
- task->return_link = NULL;
+
+ ACQUIRE_LOCK(&sched_mutex);
task->all_link = all_tasks;
all_tasks = task;
taskCount++;
+ RELEASE_LOCK(&sched_mutex);
+
return task;
}
+// avoid the spare_incalls list growing unboundedly
+#define MAX_SPARE_INCALLS 8
+
+static void
+newInCall (Task *task)
+{
+ InCall *incall;
+
+ if (task->spare_incalls != NULL) {
+ incall = task->spare_incalls;
+ task->spare_incalls = incall->next;
+ task->n_spare_incalls--;
+ } else {
+ incall = stgMallocBytes((sizeof(InCall)), "newBoundTask");
+ }
+
+ incall->tso = NULL;
+ incall->task = task;
+ incall->suspended_tso = NULL;
+ incall->suspended_cap = NULL;
+ incall->next = NULL;
+ incall->prev = NULL;
+ incall->prev_stack = task->incall;
+ task->incall = incall;
+}
+
+static void
+endInCall (Task *task)
+{
+ InCall *incall;
+
+ incall = task->incall;
+ incall->tso = NULL;
+ task->incall = task->incall->prev_stack;
+
+ if (task->n_spare_incalls >= MAX_SPARE_INCALLS) {
+ stgFree(incall);
+ } else {
+ incall->next = task->spare_incalls;
+ task->spare_incalls = incall;
+ task->n_spare_incalls++;
+ }
+}
+
+
Task *
newBoundTask (void)
{
@@ -155,31 +245,11 @@ newBoundTask (void)
stg_exit(EXIT_FAILURE);
}
- // ToDo: get rid of this lock in the common case. We could store
- // a free Task in thread-local storage, for example. That would
- // leave just one lock on the path into the RTS: cap->lock when
- // acquiring the Capability.
- ACQUIRE_LOCK(&sched_mutex);
-
- if (task_free_list == NULL) {
- task = newTask();
- } else {
- task = task_free_list;
- task_free_list = task->next;
- task->next = NULL;
- task->prev = NULL;
- task->stopped = rtsFalse;
- }
-#if defined(THREADED_RTS)
- task->id = osThreadId();
-#endif
- ASSERT(task->cap == NULL);
-
- tasksRunning++;
+ task = allocTask();
- taskEnter(task);
+ task->stopped = rtsFalse;
- RELEASE_LOCK(&sched_mutex);
+ newInCall(task);
debugTrace(DEBUG_sched, "new task (taskCount: %d)", taskCount);
return task;
@@ -188,27 +258,19 @@ newBoundTask (void)
void
boundTaskExiting (Task *task)
{
- task->tso = NULL;
task->stopped = rtsTrue;
- task->cap = NULL;
#if defined(THREADED_RTS)
ASSERT(osThreadId() == task->id);
#endif
ASSERT(myTask() == task);
- setMyTask(task->prev_stack);
-
- tasksRunning--;
- // sadly, we need a lock around the free task list. Todo: eliminate.
- ACQUIRE_LOCK(&sched_mutex);
- task->next = task_free_list;
- task_free_list = task;
- RELEASE_LOCK(&sched_mutex);
+ endInCall(task);
debugTrace(DEBUG_sched, "task exiting");
}
+
#ifdef THREADED_RTS
#define TASK_ID(t) (t)->id
#else
@@ -216,22 +278,20 @@ boundTaskExiting (Task *task)
#endif
void
-discardTask (Task *task)
+discardTasksExcept (Task *keep)
{
- ASSERT_LOCK_HELD(&sched_mutex);
- if (!task->stopped) {
- debugTrace(DEBUG_sched, "discarding task %ld", (long)TASK_ID(task));
- task->cap = NULL;
- if (task->tso == NULL) {
- workerCount--;
- } else {
- task->tso = NULL;
+ Task *task;
+
+ // Wipe the task list, except the current Task.
+ ACQUIRE_LOCK(&sched_mutex);
+ for (task = all_tasks; task != NULL; task=task->all_link) {
+ if (task != keep) {
+ debugTrace(DEBUG_sched, "discarding task %ld", (long)TASK_ID(task));
+ freeTask(task);
}
- task->stopped = rtsTrue;
- tasksRunning--;
- task->next = task_free_list;
- task_free_list = task;
}
+ all_tasks = keep;
+ RELEASE_LOCK(&sched_mutex);
}
void
@@ -270,33 +330,43 @@ workerTaskStop (Task *task)
task->cap = NULL;
taskTimeStamp(task);
task->stopped = rtsTrue;
- tasksRunning--;
- workerCount--;
-
- ACQUIRE_LOCK(&sched_mutex);
- task->next = task_free_list;
- task_free_list = task;
- RELEASE_LOCK(&sched_mutex);
}
#endif
#if defined(THREADED_RTS)
+static void OSThreadProcAttr
+workerStart(Task *task)
+{
+ Capability *cap;
+
+ // See startWorkerTask().
+ ACQUIRE_LOCK(&task->lock);
+ cap = task->cap;
+ RELEASE_LOCK(&task->lock);
+
+ if (RtsFlags.ParFlags.setAffinity) {
+ setThreadAffinity(cap->no, n_capabilities);
+ }
+
+ // set the thread-local pointer to the Task:
+ setMyTask(task);
+
+ newInCall(task);
+
+ scheduleWorker(cap,task);
+}
+
void
-startWorkerTask (Capability *cap,
- void OSThreadProcAttr (*taskStart)(Task *task))
+startWorkerTask (Capability *cap)
{
int r;
OSThreadId tid;
Task *task;
- workerCount++;
-
// A worker always gets a fresh Task structure.
- task = newTask();
-
- tasksRunning++;
+ task = newTask(rtsTrue);
// The lock here is to synchronise with taskStart(), to make sure
// that we have finished setting up the Task structure before the
@@ -311,7 +381,7 @@ startWorkerTask (Capability *cap,
ASSERT_LOCK_HELD(&cap->lock);
cap->running_task = task;
- r = createOSThread(&tid, (OSThreadProc *)taskStart, task);
+ r = createOSThread(&tid, (OSThreadProc*)workerStart, task);
if (r != 0) {
sysErrorBelch("failed to create OS thread");
stg_exit(EXIT_FAILURE);
@@ -350,8 +420,9 @@ printAllTasks(void)
if (task->cap) {
debugBelch("on capability %d, ", task->cap->no);
}
- if (task->tso) {
- debugBelch("bound to thread %lu", (unsigned long)task->tso->id);
+ if (task->incall->tso) {
+ debugBelch("bound to thread %lu",
+ (unsigned long)task->incall->tso->id);
} else {
debugBelch("worker");
}