summary refs log tree commit diff
diff options
context:
space:
mode:
authorSimon Marlow <marlowsd@gmail.com>2011-12-06 15:12:07 +0000
committerSimon Marlow <marlowsd@gmail.com>2011-12-06 16:00:27 +0000
commit92e7d6c92fdd14de424524564376d3522f2a40cc (patch)
tree5715d44012b452f5020ca14331a1fe50d5fd9600
parent8b75acd3ca25165536f18976c8d80cb62ad613e4 (diff)
downloadhaskell-92e7d6c92fdd14de424524564376d3522f2a40cc.tar.gz
Allow the number of capabilities to be increased at runtime (#3729)
At present the number of capabilities can only be *increased*, not decreased. The latter presents a few more challenges!
-rw-r--r--docs/users_guide/using.xml14
-rw-r--r--includes/rts/Threads.h10
-rw-r--r--rts/Capability.c66
-rw-r--r--rts/Capability.h6
-rw-r--r--rts/Linker.c1
-rw-r--r--rts/Schedule.c172
-rw-r--r--rts/Stats.c2
-rw-r--r--rts/Task.c28
-rw-r--r--rts/Task.h5
-rw-r--r--rts/Trace.c7
-rw-r--r--rts/Trace.h1
-rw-r--r--rts/eventlog/EventLog.c27
-rw-r--r--rts/eventlog/EventLog.h1
-rw-r--r--rts/sm/GC.c50
-rw-r--r--rts/sm/GC.h2
-rw-r--r--rts/sm/Storage.c70
-rw-r--r--rts/sm/Storage.h4
17 files changed, 358 insertions, 108 deletions
diff --git a/docs/users_guide/using.xml b/docs/users_guide/using.xml
index 5bd6f11148..441ad5914b 100644
--- a/docs/users_guide/using.xml
+++ b/docs/users_guide/using.xml
@@ -2112,8 +2112,10 @@ f "2" = 2
<sect2 id="parallel-options">
<title>RTS options for SMP parallelism</title>
- <para>To run a program on multiple CPUs, use the
- RTS <option>-N</option> option:</para>
+ <para>There are two ways to run a program on multiple
+ processors:
+ call <literal>GHC.Conc.setNumCapabilities</literal> from your
+ program, or use the RTS <option>-N</option> option.</para>
<variablelist>
<varlistentry>
@@ -2148,7 +2150,13 @@ f "2" = 2
<para>The current value of the <option>-N</option> option
is available to the Haskell program
- via <literal>GHC.Conc.numCapabilities</literal>.</para>
+ via <literal>GHC.Conc.getNumCapabilities</literal>, and
+ it may be changed while the program is running by
+ calling <literal>GHC.Conc.setNumCapabilities</literal>.
+ Note: in the current implementation,
+ the <option>-N</option> value may only
+ be <emphasis>increased</emphasis>, not decreased, by
+ calling <literal>GHC.Conc.setNumCapabilities</literal>.</para>
</listitem>
</varlistentry>
</variablelist>
diff --git a/includes/rts/Threads.h b/includes/rts/Threads.h
index d2c4aff984..061e38b32b 100644
--- a/includes/rts/Threads.h
+++ b/includes/rts/Threads.h
@@ -57,4 +57,14 @@ extern unsigned int n_capabilities;
extern Capability MainCapability;
#endif
+//
+// Change the number of capabilities (only supports increasing the
+// current value at the moment).
+//
+#if defined(THREADED_RTS)
+extern void setNumCapabilities (nat new);
+#else
+extern void setNumCapabilities (nat new) GNU_ATTRIBUTE(__noreturn__);
+#endif
+
#endif /* RTS_THREADS_H */
diff --git a/rts/Capability.c b/rts/Capability.c
index 4d23f71a86..7ce23a12e6 100644
--- a/rts/Capability.c
+++ b/rts/Capability.c
@@ -27,6 +27,8 @@
#include "STM.h"
#include "RtsUtils.h"
+#include <string.h>
+
// one global capability, this is the Capability for non-threaded
// builds, and for +RTS -N1
Capability MainCapability;
@@ -299,7 +301,6 @@ initCapabilities( void )
traceCapsetCreate(CAPSET_CLOCKDOMAIN_DEFAULT, CapsetTypeClockdomain);
#if defined(THREADED_RTS)
- nat i;
#ifndef REG_Base
// We can't support multiple CPUs if BaseReg is not a register
@@ -309,24 +310,10 @@ initCapabilities( void )
}
#endif
+ n_capabilities = 0;
+ moreCapabilities(0, RtsFlags.ParFlags.nNodes);
n_capabilities = RtsFlags.ParFlags.nNodes;
- if (n_capabilities == 1) {
- capabilities = &MainCapability;
- // THREADED_RTS must work on builds that don't have a mutable
- // BaseReg (eg. unregisterised), so in this case
- // capabilities[0] must coincide with &MainCapability.
- } else {
- capabilities = stgMallocBytes(n_capabilities * sizeof(Capability),
- "initCapabilities");
- }
-
- for (i = 0; i < n_capabilities; i++) {
- initCapability(&capabilities[i], i);
- }
-
- debugTrace(DEBUG_sched, "allocated %d capabilities", n_capabilities);
-
#else /* !THREADED_RTS */
n_capabilities = 1;
@@ -341,6 +328,46 @@ initCapabilities( void )
last_free_capability = &capabilities[0];
}
+Capability *
+moreCapabilities (nat from USED_IF_THREADS, nat to USED_IF_THREADS)
+{
+#if defined(THREADED_RTS)
+ nat i;
+ Capability *old_capabilities = capabilities;
+
+ if (to == 1) {
+ // THREADED_RTS must work on builds that don't have a mutable
+ // BaseReg (eg. unregisterised), so in this case
+ // capabilities[0] must coincide with &MainCapability.
+ capabilities = &MainCapability;
+ } else {
+ capabilities = stgMallocBytes(to * sizeof(Capability),
+ "moreCapabilities");
+
+ if (from > 0) {
+ memcpy(capabilities, old_capabilities, from * sizeof(Capability));
+ }
+ }
+
+ for (i = from; i < to; i++) {
+ initCapability(&capabilities[i], i);
+ }
+
+ last_free_capability = NULL;
+
+ debugTrace(DEBUG_sched, "allocated %d more capabilities", to - from);
+
+ // Return the old array to free later.
+ if (from > 1) {
+ return old_capabilities;
+ } else {
+ return NULL;
+ }
+#else
+ return NULL;
+#endif
+}
+
/* ----------------------------------------------------------------------------
* setContextSwitches: cause all capabilities to context switch as
* soon as possible.
@@ -426,7 +453,10 @@ releaseCapability_ (Capability* cap,
return;
}
- if (pending_sync == SYNC_GC_SEQ || pending_sync == SYNC_FORK) {
+ // If there is a pending sync, then we should just leave the
+ // Capability free. The thread trying to sync will be about to
+ // call waitForReturnCapability().
+ if (pending_sync != 0 && pending_sync != SYNC_GC_PAR) {
last_free_capability = cap; // needed?
debugTrace(DEBUG_sched, "sync pending, set capability %d free", cap->no);
return;
diff --git a/rts/Capability.h b/rts/Capability.h
index 033806b3be..a4655dd36d 100644
--- a/rts/Capability.h
+++ b/rts/Capability.h
@@ -165,6 +165,10 @@ regTableToCapability (StgRegTable *reg)
//
void initCapabilities (void);
+// Add and initialise more Capabilities
+//
+Capability * moreCapabilities (nat from, nat to);
+
// Release a capability. This is called by a Task that is exiting
// Haskell to make a foreign call, or in various other cases when we
// want to relinquish a Capability that we currently hold.
@@ -206,7 +210,7 @@ extern Capability *last_free_capability;
//
#define SYNC_GC_SEQ 1
#define SYNC_GC_PAR 2
-#define SYNC_FORK 3
+#define SYNC_OTHER 3
extern volatile StgWord pending_sync;
// Acquires a capability at a return point. If *cap is non-NULL, then
diff --git a/rts/Linker.c b/rts/Linker.c
index cf2d85a295..c1ea0dd206 100644
--- a/rts/Linker.c
+++ b/rts/Linker.c
@@ -848,6 +848,7 @@ typedef struct _RtsSymbolVal {
SymI_HasProto(stg_readTVarzh) \
SymI_HasProto(stg_readTVarIOzh) \
SymI_HasProto(resumeThread) \
+ SymI_HasProto(setNumCapabilities) \
SymI_HasProto(resolveObjs) \
SymI_HasProto(stg_retryzh) \
SymI_HasProto(rts_apply) \
diff --git a/rts/Schedule.c b/rts/Schedule.c
index 70f6a3fc00..13c886a071 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -134,6 +134,8 @@ static void scheduleYield (Capability **pcap, Task *task);
#if defined(THREADED_RTS)
static nat requestSync (Capability **pcap, Task *task, nat sync_type);
static void acquireAllCapabilities(Capability *cap, Task *task);
+static void releaseAllCapabilities(Capability *cap, Task *task);
+static void startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
@@ -1359,7 +1361,7 @@ static nat requestSync (Capability **pcap, Task *task, nat sync_type)
//
// Grab all the capabilities except the one we already hold. Used
// when synchronising before a single-threaded GC (SYNC_SEQ_GC), and
-// before a fork (SYNC_FORK).
+// before a fork (SYNC_OTHER).
//
// Only call this after requestSync(), otherwise a deadlock might
// ensue if another thread is trying to synchronise.
@@ -1380,13 +1382,26 @@ static void acquireAllCapabilities(Capability *cap, Task *task)
// unsavoury invariant.
task->cap = tmpcap;
waitForReturnCapability(&tmpcap, task);
- if (tmpcap != &capabilities[i]) {
+ if (tmpcap->no != i) {
barf("acquireAllCapabilities: got the wrong capability");
}
}
}
+ task->cap = cap;
}
+static void releaseAllCapabilities(Capability *cap, Task *task)
+{
+ nat i;
+
+ for (i = 0; i < n_capabilities; i++) {
+ if (cap->no != i) {
+ task->cap = &capabilities[i];
+ releaseCapability(&capabilities[i]);
+ }
+ }
+ task->cap = cap;
+}
#endif
/* -----------------------------------------------------------------------------
@@ -1581,17 +1596,7 @@ delete_threads_and_gc:
#if defined(THREADED_RTS)
if (gc_type == SYNC_GC_SEQ) {
// release our stash of capabilities.
- for (i = 0; i < n_capabilities; i++) {
- if (cap != &capabilities[i]) {
- task->cap = &capabilities[i];
- releaseCapability(&capabilities[i]);
- }
- }
- }
- if (cap) {
- task->cap = cap;
- } else {
- task->cap = NULL;
+ releaseAllCapabilities(cap, task);
}
#endif
@@ -1629,7 +1634,7 @@ forkProcess(HsStablePtr *entry
#ifdef THREADED_RTS
do {
- sync = requestSync(&cap, task, SYNC_FORK);
+ sync = requestSync(&cap, task, SYNC_OTHER);
} while (sync);
acquireAllCapabilities(cap,task);
@@ -1780,6 +1785,105 @@ forkProcess(HsStablePtr *entry
}
/* ---------------------------------------------------------------------------
+ * Increase the number of Capabilities
+ *
+ * Changing the number of Capabilities is very tricky! We can only do
+ * it with the system fully stopped, so we do a full sync with
+ * requestSync(SYNC_OTHER) and grab all the capabilities.
+ *
+ * Then we resize the appropriate data structures, and update all
+ * references to the old data structures which have now moved.
+ * Finally we release the Capabilities we are holding, and start
+ * worker Tasks on the new Capabilities we created.
+ *
+ * ------------------------------------------------------------------------- */
+
+void
+setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
+{
+#if !defined(THREADED_RTS)
+
+ barf("setNumCapabilities: not supported in the non-threaded RTS");
+
+#else
+ Task *task;
+ Capability *cap;
+ nat sync;
+ StgTSO* t;
+ nat g;
+ Capability *old_capabilities;
+
+ if (new_n_capabilities == n_capabilities) return;
+
+ if (new_n_capabilities < n_capabilities) {
+ barf("setNumCapabilities: reducing the number of Capabilities is not currently supported.");
+ }
+
+ debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
+ n_capabilities, new_n_capabilities);
+
+ cap = rts_lock();
+ task = cap->running_task;
+
+ do {
+ sync = requestSync(&cap, task, SYNC_OTHER);
+ } while (sync);
+
+ acquireAllCapabilities(cap,task);
+
+ pending_sync = 0;
+
+#if defined(TRACING)
+ // Allocate eventlog buffers for the new capabilities. Note this
+ // must be done before calling moreCapabilities(), because that
+ // will emit events to add the new capabilities to capsets.
+ tracingAddCapapilities(n_capabilities, new_n_capabilities);
+#endif
+
+ // Resize the capabilities array
+ // NB. after this, capabilities points somewhere new. Any pointers
+ // of type (Capability *) are now invalid.
+ old_capabilities = moreCapabilities(n_capabilities, new_n_capabilities);
+
+ // update our own cap pointer
+ cap = &capabilities[cap->no];
+
+ // Resize and update storage manager data structures
+ storageAddCapabilities(n_capabilities, new_n_capabilities);
+
+ // Update (Capability *) refs in the Task manager.
+ updateCapabilityRefs();
+
+ // Update (Capability *) refs from TSOs
+ for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
+ for (t = generations[g].threads; t != END_TSO_QUEUE; t = t->global_link) {
+ t->cap = &capabilities[t->cap->no];
+ }
+ }
+
+ // We're done: release the original Capabilities
+ releaseAllCapabilities(cap,task);
+
+ // Start worker tasks on the new Capabilities
+ startWorkerTasks(n_capabilities, new_n_capabilities);
+
+ // finally, update n_capabilities
+ n_capabilities = new_n_capabilities;
+
+ // We can't free the old array until now, because we access it
+ // while updating pointers in updateCapabilityRefs().
+ if (old_capabilities) {
+ stgFree(old_capabilities);
+ }
+
+ rts_unlock(cap);
+
+#endif // THREADED_RTS
+}
+
+
+
+/* ---------------------------------------------------------------------------
* Delete all the threads in the system
* ------------------------------------------------------------------------- */
@@ -2010,7 +2114,7 @@ scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
// move this thread from now on.
#if defined(THREADED_RTS)
- cpu %= RtsFlags.ParFlags.nNodes;
+ cpu %= n_capabilities;
if (cpu == cap->no) {
appendToRunQueue(cap,tso);
} else {
@@ -2086,6 +2190,26 @@ void scheduleWorker (Capability *cap, Task *task)
#endif
/* ---------------------------------------------------------------------------
+ * Start new worker tasks on Capabilities from--to
+ * -------------------------------------------------------------------------- */
+
+static void
+startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS)
+{
+#if defined(THREADED_RTS)
+ nat i;
+ Capability *cap;
+
+ for (i = from; i < to; i++) {
+ cap = &capabilities[i];
+ ACQUIRE_LOCK(&cap->lock);
+ startWorkerTask(cap);
+ RELEASE_LOCK(&cap->lock);
+ }
+#endif
+}
+
+/* ---------------------------------------------------------------------------
* initScheduler()
*
* Initialise the scheduler. This resets all the queues - if the
@@ -2122,26 +2246,16 @@ initScheduler(void)
initTaskManager();
- RELEASE_LOCK(&sched_mutex);
-
-#if defined(THREADED_RTS)
/*
* Eagerly start one worker to run each Capability, except for
* Capability 0. The idea is that we're probably going to start a
* bound thread on Capability 0 pretty soon, so we don't want a
* worker task hogging it.
*/
- {
- nat i;
- Capability *cap;
- for (i = 1; i < n_capabilities; i++) {
- cap = &capabilities[i];
- ACQUIRE_LOCK(&cap->lock);
- startWorkerTask(cap);
- RELEASE_LOCK(&cap->lock);
- }
- }
-#endif
+ startWorkerTasks(1, n_capabilities);
+
+ RELEASE_LOCK(&sched_mutex);
+
}
void
diff --git a/rts/Stats.c b/rts/Stats.c
index 9c68364717..61f31f1551 100644
--- a/rts/Stats.c
+++ b/rts/Stats.c
@@ -622,7 +622,7 @@ stat_exit(int alloc)
statsPrintf("\n Parallel GC work balance: %.2f (%ld / %ld, ideal %d)\n",
(double)GC_par_avg_copied / (double)GC_par_max_copied,
(lnat)GC_par_avg_copied, (lnat)GC_par_max_copied,
- RtsFlags.ParFlags.nNodes
+ n_capabilities
);
}
#endif
diff --git a/rts/Task.c b/rts/Task.c
index d72d8a9085..36dd0a94b9 100644
--- a/rts/Task.c
+++ b/rts/Task.c
@@ -325,6 +325,34 @@ discardTasksExcept (Task *keep)
RELEASE_LOCK(&all_tasks_mutex);
}
+//
+// After the capabilities[] array has moved, we have to adjust all
+// (Capability *) pointers to point to the new array. The old array
+// is still valid at this point.
+//
+void updateCapabilityRefs (void)
+{
+ Task *task;
+ InCall *incall;
+
+ ACQUIRE_LOCK(&all_tasks_mutex);
+
+ for (task = all_tasks; task != NULL; task=task->all_link) {
+ if (task->cap != NULL) {
+ task->cap = &capabilities[task->cap->no];
+ }
+
+ for (incall = task->incall; incall != NULL; incall = incall->prev_stack) {
+ if (incall->suspended_cap != NULL) {
+ incall->suspended_cap = &capabilities[incall->suspended_cap->no];
+ }
+ }
+ }
+
+ RELEASE_LOCK(&all_tasks_mutex);
+}
+
+
void
taskTimeStamp (Task *task USED_IF_THREADS)
{
diff --git a/rts/Task.h b/rts/Task.h
index 386e003d28..59a316bd81 100644
--- a/rts/Task.h
+++ b/rts/Task.h
@@ -235,6 +235,11 @@ void interruptWorkerTask (Task *task);
#endif /* THREADED_RTS */
+// Update any (Capability *) pointers belonging to Tasks after the
+// Capability array is moved/resized.
+//
+void updateCapabilityRefs (void);
+
// -----------------------------------------------------------------------------
// INLINE functions... private from here on down:
diff --git a/rts/Trace.c b/rts/Trace.c
index 1671bfeb36..df5147ca05 100644
--- a/rts/Trace.c
+++ b/rts/Trace.c
@@ -143,6 +143,13 @@ void resetTracing (void)
}
}
+void tracingAddCapapilities (nat from, nat to)
+{
+ if (eventlog_enabled) {
+ moreCapEventBufs(from,to);
+ }
+}
+
/* ---------------------------------------------------------------------------
Emitting trace messages/events
--------------------------------------------------------------------------- */
diff --git a/rts/Trace.h b/rts/Trace.h
index 8dacb80eda..0af3f3b88c 100644
--- a/rts/Trace.h
+++ b/rts/Trace.h
@@ -28,6 +28,7 @@ void initTracing (void);
void endTracing (void);
void freeTracing (void);
void resetTracing (void);
+void tracingAddCapapilities (nat from, nat to);
#endif /* TRACING */
diff --git a/rts/eventlog/EventLog.c b/rts/eventlog/EventLog.c
index 88fc64010d..af87492876 100644
--- a/rts/eventlog/EventLog.c
+++ b/rts/eventlog/EventLog.c
@@ -254,12 +254,8 @@ initEventLogging(void)
#else
n_caps = 1;
#endif
- capEventBuf = stgMallocBytes(n_caps * sizeof(EventsBuf),"initEventLogging");
+ moreCapEventBufs(0,n_caps);
- for (c = 0; c < n_caps; ++c) {
- // Init buffer for events.
- initEventsBuf(&capEventBuf[c], EVENT_LOG_SIZE, c);
- }
initEventsBuf(&eventBuf, EVENT_LOG_SIZE, (EventCapNo)(-1));
// Write in buffer: the header begin marker.
@@ -417,7 +413,26 @@ endEventLogging(void)
}
}
-void
+void
+moreCapEventBufs (nat from, nat to)
+{
+ nat c;
+
+ if (from > 0) {
+ capEventBuf = stgReallocBytes(capEventBuf, to * sizeof(EventsBuf),
+ "moreCapEventBufs");
+ } else {
+ capEventBuf = stgMallocBytes(to * sizeof(EventsBuf),
+ "moreCapEventBufs");
+ }
+
+ for (c = from; c < to; ++c) {
+ initEventsBuf(&capEventBuf[c], EVENT_LOG_SIZE, c);
+ }
+}
+
+
+void
freeEventLogging(void)
{
StgWord8 c;
diff --git a/rts/eventlog/EventLog.h b/rts/eventlog/EventLog.h
index 667f34867d..1858be8ecd 100644
--- a/rts/eventlog/EventLog.h
+++ b/rts/eventlog/EventLog.h
@@ -26,6 +26,7 @@ void endEventLogging(void);
void freeEventLogging(void);
void abortEventLogging(void); // #4512 - after fork child needs to abort
void flushEventLog(void); // event log inherited from parent
+void moreCapEventBufs (nat from, nat to);
/*
* Post a scheduler event to the capability's event buffer (an event
diff --git a/rts/sm/GC.c b/rts/sm/GC.c
index a4ac1fb01d..d30300d363 100644
--- a/rts/sm/GC.c
+++ b/rts/sm/GC.c
@@ -260,7 +260,7 @@ GarbageCollect (rtsBool force_major_gc,
* it with +RTS -gn0), or mark/compact/sweep GC.
*/
if (gc_type == SYNC_GC_PAR) {
- n_gc_threads = RtsFlags.ParFlags.nNodes;
+ n_gc_threads = n_capabilities;
} else {
n_gc_threads = 1;
}
@@ -854,29 +854,39 @@ new_gc_thread (nat n, gc_thread *t)
void
-initGcThreads (void)
+initGcThreads (nat from USED_IF_THREADS, nat to USED_IF_THREADS)
{
- if (gc_threads == NULL) {
#if defined(THREADED_RTS)
- nat i;
- gc_threads = stgMallocBytes (RtsFlags.ParFlags.nNodes *
- sizeof(gc_thread*),
- "alloc_gc_threads");
+ nat i;
- for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
- gc_threads[i] =
- stgMallocBytes(sizeof(gc_thread) +
- RtsFlags.GcFlags.generations * sizeof(gen_workspace),
- "alloc_gc_threads");
+ if (from > 0) {
+ gc_threads = stgReallocBytes (gc_threads, to * sizeof(gc_thread*),
+ "initGcThreads");
+ } else {
+ gc_threads = stgMallocBytes (to * sizeof(gc_thread*),
+ "initGcThreads");
+ }
- new_gc_thread(i, gc_threads[i]);
- }
+ // We have to update the gct->cap pointers to point to the new
+ // Capability array now.
+ for (i = 0; i < from; i++) {
+ gc_threads[i]->cap = &capabilities[gc_threads[i]->cap->no];
+ }
+
+ for (i = from; i < to; i++) {
+ gc_threads[i] =
+ stgMallocBytes(sizeof(gc_thread) +
+ RtsFlags.GcFlags.generations * sizeof(gen_workspace),
+ "alloc_gc_threads");
+
+ new_gc_thread(i, gc_threads[i]);
+ }
#else
- gc_threads = stgMallocBytes (sizeof(gc_thread*),"alloc_gc_threads");
- gc_threads[0] = gct;
- new_gc_thread(0,gc_threads[0]);
+ ASSERT(from == 0 && to == 1);
+ gc_threads = stgMallocBytes (sizeof(gc_thread*),"alloc_gc_threads");
+ gc_threads[0] = gct;
+ new_gc_thread(0,gc_threads[0]);
#endif
- }
}
void
@@ -1097,7 +1107,7 @@ gcWorkerThread (Capability *cap)
void
waitForGcThreads (Capability *cap USED_IF_THREADS)
{
- const nat n_threads = RtsFlags.ParFlags.nNodes;
+ const nat n_threads = n_capabilities;
const nat me = cap->no;
nat i, j;
rtsBool retry = rtsTrue;
@@ -1178,7 +1188,7 @@ shutdown_gc_threads (nat me USED_IF_THREADS)
void
releaseGCThreads (Capability *cap USED_IF_THREADS)
{
- const nat n_threads = RtsFlags.ParFlags.nNodes;
+ const nat n_threads = n_capabilities;
const nat me = cap->no;
nat i;
for (i=0; i < n_threads; i++) {
diff --git a/rts/sm/GC.h b/rts/sm/GC.h
index eb1802338b..4dc7347597 100644
--- a/rts/sm/GC.h
+++ b/rts/sm/GC.h
@@ -45,7 +45,7 @@ extern StgWord64 whitehole_spin;
#endif
void gcWorkerThread (Capability *cap);
-void initGcThreads (void);
+void initGcThreads (nat from, nat to);
void freeGcThreads (void);
#if defined(THREADED_RTS)
diff --git a/rts/sm/Storage.c b/rts/sm/Storage.c
index be3badfbe4..fe7bf435eb 100644
--- a/rts/sm/Storage.c
+++ b/rts/sm/Storage.c
@@ -60,7 +60,7 @@ nursery *nurseries = NULL; /* array of nurseries, size == n_capabilities */
Mutex sm_mutex;
#endif
-static void allocNurseries ( void );
+static void allocNurseries (nat from, nat to);
static void
initGeneration (generation *gen, int g)
@@ -94,7 +94,7 @@ initGeneration (generation *gen, int g)
void
initStorage( void )
{
- nat g, n;
+ nat g;
if (generations != NULL) {
// multi-init protection
@@ -146,9 +146,6 @@ initStorage( void )
g0 = &generations[0];
oldest_gen = &generations[RtsFlags.GcFlags.generations-1];
- nurseries = stgMallocBytes(n_capabilities * sizeof(struct nursery_),
- "initStorage: nurseries");
-
/* Set up the destination pointers in each younger gen. step */
for (g = 0; g < RtsFlags.GcFlags.generations-1; g++) {
generations[g].to = &generations[g+1];
@@ -168,14 +165,6 @@ initStorage( void )
generations[0].max_blocks = 0;
- /* The allocation area. Policy: keep the allocation area
- * small to begin with, even if we have a large suggested heap
- * size. Reason: we're going to do a major collection first, and we
- * don't want it to be a big one. This vague idea is borne out by
- * rigorous experimental evidence.
- */
- allocNurseries();
-
weak_ptr_list = NULL;
caf_list = END_OF_STATIC_LIST;
revertible_caf_list = END_OF_STATIC_LIST;
@@ -192,20 +181,44 @@ initStorage( void )
N = 0;
- // allocate a block for each mut list
- for (n = 0; n < n_capabilities; n++) {
- for (g = 1; g < RtsFlags.GcFlags.generations; g++) {
- capabilities[n].mut_lists[g] = allocBlock();
- }
- }
-
- initGcThreads();
+ storageAddCapabilities(0, n_capabilities);
IF_DEBUG(gc, statDescribeGens());
RELEASE_SM_LOCK;
}
+void storageAddCapabilities (nat from, nat to)
+{
+ nat n, g;
+
+ if (from > 0) {
+ nurseries = stgReallocBytes(nurseries, to * sizeof(struct nursery_),
+ "storageAddCapabilities");
+ } else {
+ nurseries = stgMallocBytes(to * sizeof(struct nursery_),
+ "storageAddCapabilities");
+ }
+
+ /* The allocation area. Policy: keep the allocation area
+ * small to begin with, even if we have a large suggested heap
+ * size. Reason: we're going to do a major collection first, and we
+ * don't want it to be a big one. This vague idea is borne out by
+ * rigorous experimental evidence.
+ */
+ allocNurseries(from, to);
+
+ // allocate a block for each mut list
+ for (n = from; n < to; n++) {
+ for (g = 1; g < RtsFlags.GcFlags.generations; g++) {
+ capabilities[n].mut_lists[g] = allocBlock();
+ }
+ }
+
+ initGcThreads(from, to);
+}
+
+
void
exitStorage (void)
{
@@ -445,11 +458,11 @@ allocNursery (bdescr *tail, nat blocks)
}
static void
-assignNurseriesToCapabilities (void)
+assignNurseriesToCapabilities (nat from, nat to)
{
nat i;
- for (i = 0; i < n_capabilities; i++) {
+ for (i = from; i < to; i++) {
capabilities[i].r.rNursery = &nurseries[i];
capabilities[i].r.rCurrentNursery = nurseries[i].blocks;
capabilities[i].r.rCurrentAlloc = NULL;
@@ -457,17 +470,17 @@ assignNurseriesToCapabilities (void)
}
static void
-allocNurseries( void )
+allocNurseries (nat from, nat to)
{
nat i;
- for (i = 0; i < n_capabilities; i++) {
- nurseries[i].blocks =
+ for (i = from; i < to; i++) {
+ nurseries[i].blocks =
allocNursery(NULL, RtsFlags.GcFlags.minAllocAreaSize);
nurseries[i].n_blocks =
RtsFlags.GcFlags.minAllocAreaSize;
}
- assignNurseriesToCapabilities();
+ assignNurseriesToCapabilities(from, to);
}
lnat // words allocated
@@ -493,8 +506,7 @@ clearNurseries (void)
void
resetNurseries (void)
{
- assignNurseriesToCapabilities();
-
+ assignNurseriesToCapabilities(0, n_capabilities);
}
lnat
diff --git a/rts/sm/Storage.h b/rts/sm/Storage.h
index d463d1a2ba..11afc26598 100644
--- a/rts/sm/Storage.h
+++ b/rts/sm/Storage.h
@@ -21,6 +21,10 @@ void initStorage(void);
void exitStorage(void);
void freeStorage(rtsBool free_heap);
+// Adding more Capabilities later: this function allocates nurseries
+// and initialises other storage-related things.
+void storageAddCapabilities (nat from, nat to);
+
/* -----------------------------------------------------------------------------
Storage manager state
-------------------------------------------------------------------------- */