author     Simon Marlow <marlowsd@gmail.com>   2011-12-06 11:38:07 +0000
committer  Simon Marlow <marlowsd@gmail.com>   2011-12-06 15:19:18 +0000
commit     8b75acd3ca25165536f18976c8d80cb62ad613e4 (patch)
tree       ccb87f6f5df2af15ca2ca8f65e5163b1f34886b8 /rts/Schedule.c
parent     657773c8e59917fda05ee08065ec566aebb50a5f (diff)
download   haskell-8b75acd3ca25165536f18976c8d80cb62ad613e4.tar.gz
Make forkProcess work with +RTS -N
Consider this experimental for the time being. There are a lot of
things that could go wrong, but I've verified that at least it works
on the test cases we have.

I also did some API cleanups while I was here. Previously we had:

    Capability * rts_eval (Capability *cap, HaskellObj p, /*out*/HaskellObj *ret);

but this API is particularly error-prone: if you forget to discard the
Capability * you passed in and use the return value instead, then
you're in for subtle bugs with +RTS -N later on. So I changed all
these functions to this form:

    void rts_eval (/* inout */ Capability **cap,
                   /* in    */ HaskellObj p,
                   /* out   */ HaskellObj *ret)

It's much harder to use this version incorrectly, because you have to
pass the Capability in by reference.
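To make the bug class concrete, here is a hedged sketch of embedder code
against both forms of the API (each half presumes the corresponding
before/after version of RtsAPI.h; p and ret are placeholders, while
rts_lock() and rts_unlock() are the standard RTS API entry points used
elsewhere in this patch):

    #include "HsFFI.h"
    #include "RtsAPI.h"

    /* Old API: the Capability returned by rts_eval() must replace the
       one passed in. */
    void eval_old (HaskellObj p, HaskellObj *ret)
    {
        Capability *cap = rts_lock();
        cap = rts_eval(cap, p, ret);  /* drop this assignment and `cap` is
                                         stale: with +RTS -N, rts_unlock()
                                         below then releases the wrong
                                         Capability */
        rts_unlock(cap);
    }

    /* New API: the Capability is updated through the pointer, so no
       stale copy is left around to misuse. */
    void eval_new (HaskellObj p, HaskellObj *ret)
    {
        Capability *cap = rts_lock();
        rts_eval(&cap, p, ret);
        rts_unlock(cap);
    }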
Diffstat (limited to 'rts/Schedule.c')
-rw-r--r--  rts/Schedule.c  268
1 file changed, 182 insertions(+), 86 deletions(-)
diff --git a/rts/Schedule.c b/rts/Schedule.c
index cd704d2871..70f6a3fc00 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -40,6 +40,7 @@
#include "Timer.h"
#include "ThreadPaused.h"
#include "Messages.h"
+#include "Stable.h"
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
@@ -130,6 +131,10 @@ static void scheduleFindWork (Capability *cap);
#if defined(THREADED_RTS)
static void scheduleYield (Capability **pcap, Task *task);
#endif
+#if defined(THREADED_RTS)
+static nat requestSync (Capability **pcap, Task *task, nat sync_type);
+static void acquireAllCapabilities(Capability *cap, Task *task);
+#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleProcessInbox(Capability *cap);
@@ -617,7 +622,7 @@ shouldYieldCapability (Capability *cap, Task *task)
// - the thread at the head of the run queue cannot be run
// by this Task (it is bound to another Task, or it is unbound
// and this task is bound).
- return (waiting_for_gc ||
+ return (pending_sync ||
cap->returning_tasks_hd != NULL ||
(!emptyRunQueue(cap) && (task->incall->tso == NULL
? cap->run_queue_hd->bound != NULL
@@ -1319,6 +1324,72 @@ scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
}
/* -----------------------------------------------------------------------------
+ * Start a synchronisation of all capabilities
+ * -------------------------------------------------------------------------- */
+
+// Returns:
+// 0 if we successfully got a sync
+// non-0 if there was another sync request in progress,
+// and we yielded to it. The value returned is the
+// type of the other sync request.
+//
+#if defined(THREADED_RTS)
+static nat requestSync (Capability **pcap, Task *task, nat sync_type)
+{
+ nat prev_pending_sync;
+
+ prev_pending_sync = cas(&pending_sync, 0, sync_type);
+
+ if (prev_pending_sync)
+ {
+ do {
+ debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
+ prev_pending_sync);
+ ASSERT(*pcap);
+ yieldCapability(pcap,task);
+ } while (pending_sync);
+ return prev_pending_sync; // NOTE: task->cap might have changed now
+ }
+ else
+ {
+ return 0;
+ }
+}
+
+//
+// Grab all the capabilities except the one we already hold. Used
+// when synchronising before a single-threaded GC (SYNC_SEQ_GC), and
+// before a fork (SYNC_FORK).
+//
+// Only call this after requestSync(), otherwise a deadlock might
+// ensue if another thread is trying to synchronise.
+//
+static void acquireAllCapabilities(Capability *cap, Task *task)
+{
+ Capability *tmpcap;
+ nat i;
+
+ for (i=0; i < n_capabilities; i++) {
+ debugTrace(DEBUG_sched, "grabbing all the capabilities (%d/%d)", i, n_capabilities);
+ tmpcap = &capabilities[i];
+ if (tmpcap != cap) {
+ // we better hope this task doesn't get migrated to
+ // another Capability while we're waiting for this one.
+ // It won't, because load balancing happens while we have
+ // all the Capabilities, but even so it's a slightly
+ // unsavoury invariant.
+ task->cap = tmpcap;
+ waitForReturnCapability(&tmpcap, task);
+ if (tmpcap != &capabilities[i]) {
+ barf("acquireAllCapabilities: got the wrong capability");
+ }
+ }
+ }
+}
+
+#endif
+
+/* -----------------------------------------------------------------------------
* Perform a garbage collection if necessary
* -------------------------------------------------------------------------- */
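Taken together, requestSync() and acquireAllCapabilities() form a small
stop-the-world protocol, used twice in the rest of this patch (in
scheduleDoGC() and forkProcess()). A hedged sketch of the intended call
sequence; sync_point_work() is a placeholder, the rest comes from this
patch:

    static void stop_the_world (Capability **pcap, Task *task, nat sync_type)
    {
        nat sync;

        // Bid for the sync until our cas() on pending_sync wins. A
        // non-zero result means we yielded to someone else's sync; the
        // caller decides whether that sync already did its work (as the
        // GC path below does for SYNC_GC_SEQ/SYNC_GC_PAR).
        do {
            sync = requestSync(pcap, task, sync_type);
        } while (sync);

        // Safe only once requestSync() has succeeded, per the comment
        // above: two tasks doing this concurrently would deadlock.
        acquireAllCapabilities(*pcap, task);

        sync_point_work();   // placeholder: GC, fork(), ...

        // Let the other capabilities run again. scheduleDoGC() resets
        // this before the GC itself; forkProcess() before calling fork().
        pending_sync = 0;
    }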
@@ -1327,10 +1398,8 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
{
rtsBool heap_census;
#ifdef THREADED_RTS
- /* extern static volatile StgWord waiting_for_gc;
- lives inside capability.c */
- rtsBool gc_type, prev_pending_gc;
- nat i;
+ rtsBool gc_type;
+ nat i, sync;
#endif
if (sched_state == SCHED_SHUTTING_DOWN) {
@@ -1346,9 +1415,9 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
&& N >= RtsFlags.ParFlags.parGcGen
&& ! oldest_gen->mark)
{
- gc_type = PENDING_GC_PAR;
+ gc_type = SYNC_GC_PAR;
} else {
- gc_type = PENDING_GC_SEQ;
+ gc_type = SYNC_GC_SEQ;
}
// In order to GC, there must be no threads running Haskell code.
@@ -1363,26 +1432,25 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
//
/* Other capabilities are prevented from running yet more Haskell
- threads if waiting_for_gc is set. Tested inside
+ threads if pending_sync is set. Tested inside
yieldCapability() and releaseCapability() in Capability.c */
- prev_pending_gc = cas(&waiting_for_gc, 0, gc_type);
- if (prev_pending_gc) {
- do {
- debugTrace(DEBUG_sched, "someone else is trying to GC (%d)...",
- prev_pending_gc);
- ASSERT(cap);
- yieldCapability(&cap,task);
- } while (waiting_for_gc);
- return cap; // NOTE: task->cap might have changed here
- }
+ do {
+ sync = requestSync(&cap, task, gc_type);
+ if (sync == SYNC_GC_SEQ || sync == SYNC_GC_PAR) {
+ // someone else had a pending sync request for a GC, so
+ // let's assume GC has been done and we don't need to GC
+ // again.
+ return cap;
+ }
+ } while (sync);
interruptAllCapabilities();
// The final shutdown GC is always single-threaded, because it's
// possible that some of the Capabilities have no worker threads.
- if (gc_type == PENDING_GC_SEQ)
+ if (gc_type == SYNC_GC_SEQ)
{
traceEventRequestSeqGc(cap);
}
@@ -1392,25 +1460,10 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
}
- if (gc_type == PENDING_GC_SEQ)
+ if (gc_type == SYNC_GC_SEQ)
{
// single-threaded GC: grab all the capabilities
- for (i=0; i < n_capabilities; i++) {
- debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
- if (cap != &capabilities[i]) {
- Capability *pcap = &capabilities[i];
- // we better hope this task doesn't get migrated to
- // another Capability while we're waiting for this one.
- // It won't, because load balancing happens while we have
- // all the Capabilities, but even so it's a slightly
- // unsavoury invariant.
- task->cap = pcap;
- waitForReturnCapability(&pcap, task);
- if (pcap != &capabilities[i]) {
- barf("scheduleDoGC: got the wrong capability");
- }
- }
- }
+ acquireAllCapabilities(cap,task);
}
else
{
@@ -1455,9 +1508,9 @@ delete_threads_and_gc:
traceEventGcStart(cap);
#if defined(THREADED_RTS)
- // reset waiting_for_gc *before* GC, so that when the GC threads
+ // reset pending_sync *before* GC, so that when the GC threads
// emerge they don't immediately re-enter the GC.
- waiting_for_gc = 0;
+ pending_sync = 0;
GarbageCollect(force_major || heap_census, heap_census, gc_type, cap);
#else
GarbageCollect(force_major || heap_census, heap_census, 0, cap);
@@ -1494,7 +1547,7 @@ delete_threads_and_gc:
}
#if defined(THREADED_RTS)
- if (gc_type == PENDING_GC_PAR)
+ if (gc_type == SYNC_GC_PAR)
{
releaseGCThreads(cap);
}
@@ -1526,7 +1579,7 @@ delete_threads_and_gc:
#endif
#if defined(THREADED_RTS)
- if (gc_type == PENDING_GC_SEQ) {
+ if (gc_type == SYNC_GC_SEQ) {
// release our stash of capabilities.
for (i = 0; i < n_capabilities; i++) {
if (cap != &capabilities[i]) {
@@ -1561,26 +1614,41 @@ forkProcess(HsStablePtr *entry
StgTSO* t,*next;
Capability *cap;
nat g;
-
-#if defined(THREADED_RTS)
- if (RtsFlags.ParFlags.nNodes > 1) {
- errorBelch("forking not supported with +RTS -N<n> greater than 1");
- stg_exit(EXIT_FAILURE);
- }
+ Task *task = NULL;
+ nat i;
+#ifdef THREADED_RTS
+ nat sync;
#endif
debugTrace(DEBUG_sched, "forking!");
- // ToDo: for SMP, we should probably acquire *all* the capabilities
- cap = rts_lock();
-
+ task = newBoundTask();
+
+ cap = NULL;
+ waitForReturnCapability(&cap, task);
+
+#ifdef THREADED_RTS
+ do {
+ sync = requestSync(&cap, task, SYNC_FORK);
+ } while (sync);
+
+ acquireAllCapabilities(cap,task);
+
+ pending_sync = 0;
+#endif
+
// no funny business: hold locks while we fork, otherwise if some
// other thread is holding a lock when the fork happens, the data
// structure protected by the lock will forever be in an
// inconsistent state in the child. See also #1391.
ACQUIRE_LOCK(&sched_mutex);
- ACQUIRE_LOCK(&cap->lock);
- ACQUIRE_LOCK(&cap->running_task->lock);
+ ACQUIRE_LOCK(&sm_mutex);
+ ACQUIRE_LOCK(&stable_mutex);
+ ACQUIRE_LOCK(&task->lock);
+
+ for (i=0; i < n_capabilities; i++) {
+ ACQUIRE_LOCK(&capabilities[i].lock);
+ }
stopTimer(); // See #4074
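The "no funny business" comment is the crux of the fork story (#1391).
A hedged, RTS-independent illustration of the hazard it guards against:
if some other thread holds a lock at the moment of fork(), the child
inherits that lock in the locked state with no owner left to release
it, and the child's next attempt to take it blocks forever. Taking
every lock first rules that out (pthread names are for illustration
only):

    #include <pthread.h>
    #include <sys/types.h>
    #include <unistd.h>

    static pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;

    static pid_t quiesced_fork (void)
    {
        pthread_mutex_lock(&m);    /* quiesce: nobody else holds m now */
        pid_t pid = fork();
        if (pid != 0) {
            pthread_mutex_unlock(&m);   /* parent carries on as before */
        }
        /* child-side handling is covered at the initMutex() calls below */
        return pid;
    }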
@@ -1595,19 +1663,30 @@ forkProcess(HsStablePtr *entry
startTimer(); // #4074
RELEASE_LOCK(&sched_mutex);
- RELEASE_LOCK(&cap->lock);
- RELEASE_LOCK(&cap->running_task->lock);
+ RELEASE_LOCK(&sm_mutex);
+ RELEASE_LOCK(&stable_mutex);
+ RELEASE_LOCK(&task->lock);
+
+ for (i=0; i < n_capabilities; i++) {
+ releaseCapability_(&capabilities[i],rtsFalse);
+ RELEASE_LOCK(&capabilities[i].lock);
+ }
+ boundTaskExiting(task);
// just return the pid
- rts_unlock(cap);
- return pid;
+ return pid;
} else { // child
#if defined(THREADED_RTS)
initMutex(&sched_mutex);
- initMutex(&cap->lock);
- initMutex(&cap->running_task->lock);
+ initMutex(&sm_mutex);
+ initMutex(&stable_mutex);
+ initMutex(&task->lock);
+
+ for (i=0; i < n_capabilities; i++) {
+ initMutex(&capabilities[i].lock);
+ }
#endif
#ifdef TRACING
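A hedged reading of the initMutex() calls above: in the child the locks
are re-created rather than released, which leaves each mutex in a known
unlocked state regardless of how fork() treated it, and sidesteps any
question of lock ownership in a process where only the forking thread
survives. The pthreads analogue, continuing the earlier sketch:

    /* child branch of quiesced_fork() */
    if (pid == 0) {
        pthread_mutex_init(&m, NULL);   /* re-create, rather than unlock */
    }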
@@ -1626,7 +1705,7 @@ forkProcess(HsStablePtr *entry
// don't allow threads to catch the ThreadKilled
// exception, but we do want to raiseAsync() because these
// threads may be evaluating thunks that we need later.
- deleteThread_(cap,t);
+ deleteThread_(t->cap,t);
// stop the GC from updating the InCall to point to
// the TSO. This is only necessary because the
@@ -1637,44 +1716,58 @@ forkProcess(HsStablePtr *entry
}
}
- // Empty the run queue. It seems tempting to let all the
- // killed threads stay on the run queue as zombies to be
- // cleaned up later, but some of them correspond to bound
- // threads for which the corresponding Task does not exist.
- cap->run_queue_hd = END_TSO_QUEUE;
- cap->run_queue_tl = END_TSO_QUEUE;
-
- // Any suspended C-calling Tasks are no more, their OS threads
- // don't exist now:
- cap->suspended_ccalls = NULL;
-
- // Empty the threads lists. Otherwise, the garbage
+ discardTasksExcept(task);
+
+ for (i=0; i < n_capabilities; i++) {
+ cap = &capabilities[i];
+
+ // Empty the run queue. It seems tempting to let all the
+ // killed threads stay on the run queue as zombies to be
+ // cleaned up later, but some of them may correspond to
+ // bound threads for which the corresponding Task does not
+ // exist.
+ cap->run_queue_hd = END_TSO_QUEUE;
+ cap->run_queue_tl = END_TSO_QUEUE;
+
+ // Any suspended C-calling Tasks are no more, their OS threads
+ // don't exist now:
+ cap->suspended_ccalls = NULL;
+
+#if defined(THREADED_RTS)
+ // Wipe our spare workers list, they no longer exist. New
+ // workers will be created if necessary.
+ cap->spare_workers = NULL;
+ cap->n_spare_workers = 0;
+ cap->returning_tasks_hd = NULL;
+ cap->returning_tasks_tl = NULL;
+#endif
+
+ // Release all caps except 0, we'll use that for starting
+ // the IO manager and running the client action below.
+ if (cap->no != 0) {
+ task->cap = cap;
+ releaseCapability(cap);
+ }
+ }
+ cap = &capabilities[0];
+ task->cap = cap;
+
+ // Empty the threads lists. Otherwise, the garbage
// collector may attempt to resurrect some of these threads.
for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
generations[g].threads = END_TSO_QUEUE;
}
- discardTasksExcept(cap->running_task);
-
-#if defined(THREADED_RTS)
- // Wipe our spare workers list, they no longer exist. New
- // workers will be created if necessary.
- cap->spare_workers = NULL;
- cap->n_spare_workers = 0;
- cap->returning_tasks_hd = NULL;
- cap->returning_tasks_tl = NULL;
-#endif
-
// On Unix, all timers are reset in the child, so we need to start
// the timer again.
initTimer();
startTimer();
#if defined(THREADED_RTS)
- cap = ioManagerStartCap(cap);
+ ioManagerStartCap(&cap);
#endif
- cap = rts_evalStableIO(cap, entry, NULL); // run the action
+ rts_evalStableIO(&cap, entry, NULL); // run the action
rts_checkSchedStatus("forkProcess",cap);
rts_unlock(cap);
@@ -1928,11 +2021,14 @@ scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
#endif
}
-Capability *
-scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
+void
+scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
{
Task *task;
DEBUG_ONLY( StgThreadID id );
+ Capability *cap;
+
+ cap = *pcap;
// We already created/initialised the Task
task = cap->running_task;
@@ -1957,7 +2053,7 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
- return cap;
+ *pcap = cap;
}
/* ----------------------------------------------------------------------------
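The scheduleWaitThread() hunk above shows the mechanical recipe used
throughout this patch for converting a Capability-returning function to
the inout style. A hedged template (some_entry_point() is a
placeholder):

    void some_entry_point (Capability **pcap)
    {
        Capability *cap = *pcap;   /* was: a Capability *cap parameter */

        /* ... body unchanged; cap may be rebound along the way,
           e.g. by yieldCapability(&cap, task) ... */

        *pcap = cap;               /* was: return cap; */
    }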