diff options
author | Simon Marlow <marlowsd@gmail.com> | 2011-12-06 15:12:07 +0000 |
---|---|---|
committer | Simon Marlow <marlowsd@gmail.com> | 2011-12-06 16:00:27 +0000 |
commit | 92e7d6c92fdd14de424524564376d3522f2a40cc (patch) | |
tree | 5715d44012b452f5020ca14331a1fe50d5fd9600 /rts/Schedule.c | |
parent | 8b75acd3ca25165536f18976c8d80cb62ad613e4 (diff) | |
download | haskell-92e7d6c92fdd14de424524564376d3522f2a40cc.tar.gz |
Allow the number of capabilities to be increased at runtime (#3729)
At present the number of capabilities can only be *increased*, not
decreased. The latter presents a few more challenges!
Diffstat (limited to 'rts/Schedule.c')
-rw-r--r-- | rts/Schedule.c | 172 |
1 files changed, 143 insertions, 29 deletions
diff --git a/rts/Schedule.c b/rts/Schedule.c index 70f6a3fc00..13c886a071 100644 --- a/rts/Schedule.c +++ b/rts/Schedule.c @@ -134,6 +134,8 @@ static void scheduleYield (Capability **pcap, Task *task); #if defined(THREADED_RTS) static nat requestSync (Capability **pcap, Task *task, nat sync_type); static void acquireAllCapabilities(Capability *cap, Task *task); +static void releaseAllCapabilities(Capability *cap, Task *task); +static void startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS); #endif static void scheduleStartSignalHandlers (Capability *cap); static void scheduleCheckBlockedThreads (Capability *cap); @@ -1359,7 +1361,7 @@ static nat requestSync (Capability **pcap, Task *task, nat sync_type) // // Grab all the capabilities except the one we already hold. Used // when synchronising before a single-threaded GC (SYNC_SEQ_GC), and -// before a fork (SYNC_FORK). +// before a fork (SYNC_OTHER). // // Only call this after requestSync(), otherwise a deadlock might // ensue if another thread is trying to synchronise. @@ -1380,13 +1382,26 @@ static void acquireAllCapabilities(Capability *cap, Task *task) // unsavoury invariant. task->cap = tmpcap; waitForReturnCapability(&tmpcap, task); - if (tmpcap != &capabilities[i]) { + if (tmpcap->no != i) { barf("acquireAllCapabilities: got the wrong capability"); } } } + task->cap = cap; } +static void releaseAllCapabilities(Capability *cap, Task *task) +{ + nat i; + + for (i = 0; i < n_capabilities; i++) { + if (cap->no != i) { + task->cap = &capabilities[i]; + releaseCapability(&capabilities[i]); + } + } + task->cap = cap; +} #endif /* ----------------------------------------------------------------------------- @@ -1581,17 +1596,7 @@ delete_threads_and_gc: #if defined(THREADED_RTS) if (gc_type == SYNC_GC_SEQ) { // release our stash of capabilities. - for (i = 0; i < n_capabilities; i++) { - if (cap != &capabilities[i]) { - task->cap = &capabilities[i]; - releaseCapability(&capabilities[i]); - } - } - } - if (cap) { - task->cap = cap; - } else { - task->cap = NULL; + releaseAllCapabilities(cap, task); } #endif @@ -1629,7 +1634,7 @@ forkProcess(HsStablePtr *entry #ifdef THREADED_RTS do { - sync = requestSync(&cap, task, SYNC_FORK); + sync = requestSync(&cap, task, SYNC_OTHER); } while (sync); acquireAllCapabilities(cap,task); @@ -1780,6 +1785,105 @@ forkProcess(HsStablePtr *entry } /* --------------------------------------------------------------------------- + * Increase the number of Capabilities + * + * Changing the number of Capabilities is very tricky! We can only do + * it with the system fully stopped, so we do a full sync with + * requestSync(SYNC_OTHER) and grab all the capabilities. + * + * Then we resize the appropriate data structures, and update all + * references to the old data structures which have now moved. + * Finally we release the Capabilities we are holding, and start + * worker Tasks on the new Capabilities we created. + * + * ------------------------------------------------------------------------- */ + +void +setNumCapabilities (nat new_n_capabilities USED_IF_THREADS) +{ +#if !defined(THREADED_RTS) + + barf("setNumCapabilities: not supported in the non-threaded RTS"); + +#else + Task *task; + Capability *cap; + nat sync; + StgTSO* t; + nat g; + Capability *old_capabilities; + + if (new_n_capabilities == n_capabilities) return; + + if (new_n_capabilities < n_capabilities) { + barf("setNumCapabilities: reducing the number of Capabilities is not currently supported."); + } + + debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d", + n_capabilities, new_n_capabilities); + + cap = rts_lock(); + task = cap->running_task; + + do { + sync = requestSync(&cap, task, SYNC_OTHER); + } while (sync); + + acquireAllCapabilities(cap,task); + + pending_sync = 0; + +#if defined(TRACING) + // Allocate eventlog buffers for the new capabilities. Note this + // must be done before calling moreCapabilities(), because that + // will emit events to add the new capabilities to capsets. + tracingAddCapapilities(n_capabilities, new_n_capabilities); +#endif + + // Resize the capabilities array + // NB. after this, capabilities points somewhere new. Any pointers + // of type (Capability *) are now invalid. + old_capabilities = moreCapabilities(n_capabilities, new_n_capabilities); + + // update our own cap pointer + cap = &capabilities[cap->no]; + + // Resize and update storage manager data structures + storageAddCapabilities(n_capabilities, new_n_capabilities); + + // Update (Capability *) refs in the Task manager. + updateCapabilityRefs(); + + // Update (Capability *) refs from TSOs + for (g = 0; g < RtsFlags.GcFlags.generations; g++) { + for (t = generations[g].threads; t != END_TSO_QUEUE; t = t->global_link) { + t->cap = &capabilities[t->cap->no]; + } + } + + // We're done: release the original Capabilities + releaseAllCapabilities(cap,task); + + // Start worker tasks on the new Capabilities + startWorkerTasks(n_capabilities, new_n_capabilities); + + // finally, update n_capabilities + n_capabilities = new_n_capabilities; + + // We can't free the old array until now, because we access it + // while updating pointers in updateCapabilityRefs(). + if (old_capabilities) { + stgFree(old_capabilities); + } + + rts_unlock(cap); + +#endif // THREADED_RTS +} + + + +/* --------------------------------------------------------------------------- * Delete all the threads in the system * ------------------------------------------------------------------------- */ @@ -2010,7 +2114,7 @@ scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso) tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't // move this thread from now on. #if defined(THREADED_RTS) - cpu %= RtsFlags.ParFlags.nNodes; + cpu %= n_capabilities; if (cpu == cap->no) { appendToRunQueue(cap,tso); } else { @@ -2086,6 +2190,26 @@ void scheduleWorker (Capability *cap, Task *task) #endif /* --------------------------------------------------------------------------- + * Start new worker tasks on Capabilities from--to + * -------------------------------------------------------------------------- */ + +static void +startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS) +{ +#if defined(THREADED_RTS) + nat i; + Capability *cap; + + for (i = from; i < to; i++) { + cap = &capabilities[i]; + ACQUIRE_LOCK(&cap->lock); + startWorkerTask(cap); + RELEASE_LOCK(&cap->lock); + } +#endif +} + +/* --------------------------------------------------------------------------- * initScheduler() * * Initialise the scheduler. This resets all the queues - if the @@ -2122,26 +2246,16 @@ initScheduler(void) initTaskManager(); - RELEASE_LOCK(&sched_mutex); - -#if defined(THREADED_RTS) /* * Eagerly start one worker to run each Capability, except for * Capability 0. The idea is that we're probably going to start a * bound thread on Capability 0 pretty soon, so we don't want a * worker task hogging it. */ - { - nat i; - Capability *cap; - for (i = 1; i < n_capabilities; i++) { - cap = &capabilities[i]; - ACQUIRE_LOCK(&cap->lock); - startWorkerTask(cap); - RELEASE_LOCK(&cap->lock); - } - } -#endif + startWorkerTasks(1, n_capabilities); + + RELEASE_LOCK(&sched_mutex); + } void |