summaryrefslogtreecommitdiff
path: root/rts/Schedule.c
diff options
context:
space:
mode:
authorSimon Marlow <marlowsd@gmail.com>2011-12-06 15:12:07 +0000
committerSimon Marlow <marlowsd@gmail.com>2011-12-06 16:00:27 +0000
commit92e7d6c92fdd14de424524564376d3522f2a40cc (patch)
tree5715d44012b452f5020ca14331a1fe50d5fd9600 /rts/Schedule.c
parent8b75acd3ca25165536f18976c8d80cb62ad613e4 (diff)
downloadhaskell-92e7d6c92fdd14de424524564376d3522f2a40cc.tar.gz
Allow the number of capabilities to be increased at runtime (#3729)
At present the number of capabilities can only be *increased*, not decreased. The latter presents a few more challenges!
Diffstat (limited to 'rts/Schedule.c')
-rw-r--r--rts/Schedule.c172
1 files changed, 143 insertions, 29 deletions
diff --git a/rts/Schedule.c b/rts/Schedule.c
index 70f6a3fc00..13c886a071 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -134,6 +134,8 @@ static void scheduleYield (Capability **pcap, Task *task);
#if defined(THREADED_RTS)
static nat requestSync (Capability **pcap, Task *task, nat sync_type);
static void acquireAllCapabilities(Capability *cap, Task *task);
+static void releaseAllCapabilities(Capability *cap, Task *task);
+static void startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
@@ -1359,7 +1361,7 @@ static nat requestSync (Capability **pcap, Task *task, nat sync_type)
//
// Grab all the capabilities except the one we already hold. Used
// when synchronising before a single-threaded GC (SYNC_SEQ_GC), and
-// before a fork (SYNC_FORK).
+// before a fork (SYNC_OTHER).
//
// Only call this after requestSync(), otherwise a deadlock might
// ensue if another thread is trying to synchronise.
@@ -1380,13 +1382,26 @@ static void acquireAllCapabilities(Capability *cap, Task *task)
// unsavoury invariant.
task->cap = tmpcap;
waitForReturnCapability(&tmpcap, task);
- if (tmpcap != &capabilities[i]) {
+ if (tmpcap->no != i) {
barf("acquireAllCapabilities: got the wrong capability");
}
}
}
+ task->cap = cap;
}
+static void releaseAllCapabilities(Capability *cap, Task *task)
+{
+ nat i;
+
+ for (i = 0; i < n_capabilities; i++) {
+ if (cap->no != i) {
+ task->cap = &capabilities[i];
+ releaseCapability(&capabilities[i]);
+ }
+ }
+ task->cap = cap;
+}
#endif
/* -----------------------------------------------------------------------------
@@ -1581,17 +1596,7 @@ delete_threads_and_gc:
#if defined(THREADED_RTS)
if (gc_type == SYNC_GC_SEQ) {
// release our stash of capabilities.
- for (i = 0; i < n_capabilities; i++) {
- if (cap != &capabilities[i]) {
- task->cap = &capabilities[i];
- releaseCapability(&capabilities[i]);
- }
- }
- }
- if (cap) {
- task->cap = cap;
- } else {
- task->cap = NULL;
+ releaseAllCapabilities(cap, task);
}
#endif
@@ -1629,7 +1634,7 @@ forkProcess(HsStablePtr *entry
#ifdef THREADED_RTS
do {
- sync = requestSync(&cap, task, SYNC_FORK);
+ sync = requestSync(&cap, task, SYNC_OTHER);
} while (sync);
acquireAllCapabilities(cap,task);
@@ -1780,6 +1785,105 @@ forkProcess(HsStablePtr *entry
}
/* ---------------------------------------------------------------------------
+ * Increase the number of Capabilities
+ *
+ * Changing the number of Capabilities is very tricky! We can only do
+ * it with the system fully stopped, so we do a full sync with
+ * requestSync(SYNC_OTHER) and grab all the capabilities.
+ *
+ * Then we resize the appropriate data structures, and update all
+ * references to the old data structures which have now moved.
+ * Finally we release the Capabilities we are holding, and start
+ * worker Tasks on the new Capabilities we created.
+ *
+ * ------------------------------------------------------------------------- */
+
+void
+setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
+{
+#if !defined(THREADED_RTS)
+
+ barf("setNumCapabilities: not supported in the non-threaded RTS");
+
+#else
+ Task *task;
+ Capability *cap;
+ nat sync;
+ StgTSO* t;
+ nat g;
+ Capability *old_capabilities;
+
+ if (new_n_capabilities == n_capabilities) return;
+
+ if (new_n_capabilities < n_capabilities) {
+ barf("setNumCapabilities: reducing the number of Capabilities is not currently supported.");
+ }
+
+ debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
+ n_capabilities, new_n_capabilities);
+
+ cap = rts_lock();
+ task = cap->running_task;
+
+ do {
+ sync = requestSync(&cap, task, SYNC_OTHER);
+ } while (sync);
+
+ acquireAllCapabilities(cap,task);
+
+ pending_sync = 0;
+
+#if defined(TRACING)
+ // Allocate eventlog buffers for the new capabilities. Note this
+ // must be done before calling moreCapabilities(), because that
+ // will emit events to add the new capabilities to capsets.
+ tracingAddCapapilities(n_capabilities, new_n_capabilities);
+#endif
+
+ // Resize the capabilities array
+ // NB. after this, capabilities points somewhere new. Any pointers
+ // of type (Capability *) are now invalid.
+ old_capabilities = moreCapabilities(n_capabilities, new_n_capabilities);
+
+ // update our own cap pointer
+ cap = &capabilities[cap->no];
+
+ // Resize and update storage manager data structures
+ storageAddCapabilities(n_capabilities, new_n_capabilities);
+
+ // Update (Capability *) refs in the Task manager.
+ updateCapabilityRefs();
+
+ // Update (Capability *) refs from TSOs
+ for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
+ for (t = generations[g].threads; t != END_TSO_QUEUE; t = t->global_link) {
+ t->cap = &capabilities[t->cap->no];
+ }
+ }
+
+ // We're done: release the original Capabilities
+ releaseAllCapabilities(cap,task);
+
+ // Start worker tasks on the new Capabilities
+ startWorkerTasks(n_capabilities, new_n_capabilities);
+
+ // finally, update n_capabilities
+ n_capabilities = new_n_capabilities;
+
+ // We can't free the old array until now, because we access it
+ // while updating pointers in updateCapabilityRefs().
+ if (old_capabilities) {
+ stgFree(old_capabilities);
+ }
+
+ rts_unlock(cap);
+
+#endif // THREADED_RTS
+}
+
+
+
+/* ---------------------------------------------------------------------------
* Delete all the threads in the system
* ------------------------------------------------------------------------- */
@@ -2010,7 +2114,7 @@ scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
// move this thread from now on.
#if defined(THREADED_RTS)
- cpu %= RtsFlags.ParFlags.nNodes;
+ cpu %= n_capabilities;
if (cpu == cap->no) {
appendToRunQueue(cap,tso);
} else {
@@ -2086,6 +2190,26 @@ void scheduleWorker (Capability *cap, Task *task)
#endif
/* ---------------------------------------------------------------------------
+ * Start new worker tasks on Capabilities from--to
+ * -------------------------------------------------------------------------- */
+
+static void
+startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS)
+{
+#if defined(THREADED_RTS)
+ nat i;
+ Capability *cap;
+
+ for (i = from; i < to; i++) {
+ cap = &capabilities[i];
+ ACQUIRE_LOCK(&cap->lock);
+ startWorkerTask(cap);
+ RELEASE_LOCK(&cap->lock);
+ }
+#endif
+}
+
+/* ---------------------------------------------------------------------------
* initScheduler()
*
* Initialise the scheduler. This resets all the queues - if the
@@ -2122,26 +2246,16 @@ initScheduler(void)
initTaskManager();
- RELEASE_LOCK(&sched_mutex);
-
-#if defined(THREADED_RTS)
/*
* Eagerly start one worker to run each Capability, except for
* Capability 0. The idea is that we're probably going to start a
* bound thread on Capability 0 pretty soon, so we don't want a
* worker task hogging it.
*/
- {
- nat i;
- Capability *cap;
- for (i = 1; i < n_capabilities; i++) {
- cap = &capabilities[i];
- ACQUIRE_LOCK(&cap->lock);
- startWorkerTask(cap);
- RELEASE_LOCK(&cap->lock);
- }
- }
-#endif
+ startWorkerTasks(1, n_capabilities);
+
+ RELEASE_LOCK(&sched_mutex);
+
}
void