summaryrefslogtreecommitdiff
path: root/rts/Schedule.c
diff options
context:
space:
mode:
Diffstat (limited to 'rts/Schedule.c')
-rw-r--r--rts/Schedule.c89
1 files changed, 41 insertions, 48 deletions
diff --git a/rts/Schedule.c b/rts/Schedule.c
index 8002ac37dc..0444f0ca15 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -41,7 +41,8 @@
#include "Timer.h"
#include "ThreadPaused.h"
#include "Messages.h"
-#include "Stable.h"
+#include "StablePtr.h"
+#include "StableName.h"
#include "TopHandler.h"
#if defined(HAVE_SYS_TYPES_H)
@@ -67,7 +68,7 @@
* -------------------------------------------------------------------------- */
#if !defined(THREADED_RTS)
-// Blocked/sleeping thrads
+// Blocked/sleeping threads
StgTSO *blocked_queue_hd = NULL;
StgTSO *blocked_queue_tl = NULL;
StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
@@ -151,11 +152,11 @@ static bool scheduleHandleThreadFinished( Capability *cap, Task *task,
static bool scheduleNeedHeapProfile(bool ready_to_gc);
static void scheduleDoGC(Capability **pcap, Task *task, bool force_major);
-static void deleteThread (Capability *cap, StgTSO *tso);
-static void deleteAllThreads (Capability *cap);
+static void deleteThread (StgTSO *tso);
+static void deleteAllThreads (void);
#if defined(FORKPROCESS_PRIMOP_SUPPORTED)
-static void deleteThread_(Capability *cap, StgTSO *tso);
+static void deleteThread_(StgTSO *tso);
#endif
/* ---------------------------------------------------------------------------
@@ -180,9 +181,6 @@ schedule (Capability *initialCapability, Task *task)
StgThreadReturnCode ret;
uint32_t prev_what_next;
bool ready_to_gc;
-#if defined(THREADED_RTS)
- bool first = true;
-#endif
cap = initialCapability;
@@ -271,7 +269,7 @@ schedule (Capability *initialCapability, Task *task)
}
break;
default:
- barf("sched_state: %d", sched_state);
+ barf("sched_state: %" FMT_Word, sched_state);
}
scheduleFindWork(&cap);
@@ -292,16 +290,6 @@ schedule (Capability *initialCapability, Task *task)
// as a result of a console event having been delivered.
#if defined(THREADED_RTS)
- if (first)
- {
- // XXX: ToDo
- // // don't yield the first time, we want a chance to run this
- // // thread for a bit, even if there are others banging at the
- // // door.
- // first = false;
- // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
- }
-
scheduleYield(&cap,task);
if (emptyRunQueue(cap)) continue; // look for work again
@@ -360,7 +348,7 @@ schedule (Capability *initialCapability, Task *task)
// in a foreign call returns.
if (sched_state >= SCHED_INTERRUPTING &&
!(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
- deleteThread(cap,t);
+ deleteThread(t);
}
// If this capability is disabled, migrate the thread away rather
@@ -679,7 +667,11 @@ scheduleYield (Capability **pcap, Task *task)
// otherwise yield (sleep), and keep yielding if necessary.
do {
- didGcLast = yieldCapability(&cap,task, !didGcLast);
+ if (doIdleGCWork(cap, false)) {
+ didGcLast = false;
+ } else {
+ didGcLast = yieldCapability(&cap,task, !didGcLast);
+ }
}
while (shouldYieldCapability(cap,task,didGcLast));
@@ -701,8 +693,6 @@ static void
schedulePushWork(Capability *cap USED_IF_THREADS,
Task *task USED_IF_THREADS)
{
- /* following code not for PARALLEL_HASKELL. I kept the call general,
- future GUM versions might use pushing in a distributed setup */
#if defined(THREADED_RTS)
Capability *free_caps[n_capabilities], *cap0;
@@ -1263,7 +1253,7 @@ scheduleHandleThreadBlocked( StgTSO *t
* -------------------------------------------------------------------------- */
static bool
-scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
+scheduleHandleThreadFinished (Capability *cap, Task *task, StgTSO *t)
{
/* Need to check whether this was a main thread, and if so,
* return with the return value.
@@ -1352,7 +1342,7 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
* -------------------------------------------------------------------------- */
static bool
-scheduleNeedHeapProfile( bool ready_to_gc STG_UNUSED )
+scheduleNeedHeapProfile( bool ready_to_gc )
{
// When we have +RTS -i0 and we're heap profiling, do a census at
// every GC. This lets us get repeatable runs for debugging.
@@ -1738,10 +1728,8 @@ scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
// they have stopped mutating and are standing by for GC.
waitForGcThreads(cap, idle_cap);
-#if defined(THREADED_RTS)
// Stable point where we can do a global check on our spark counters
ASSERT(checkSparkCountInvariant());
-#endif
}
#endif
@@ -1756,7 +1744,7 @@ delete_threads_and_gc:
* Checking for major_gc ensures that the last GC is major.
*/
if (sched_state == SCHED_INTERRUPTING && major_gc) {
- deleteAllThreads(cap);
+ deleteAllThreads();
#if defined(THREADED_RTS)
// Discard all the sparks from every Capability. Why?
// They'll probably be GC'd anyway since we've killed all the
@@ -1800,6 +1788,9 @@ delete_threads_and_gc:
}
#endif
+ // Do any remaining idle GC work from the previous GC
+ doIdleGCWork(cap, true /* all of it */);
+
#if defined(THREADED_RTS)
// reset pending_sync *before* GC, so that when the GC threads
// emerge they don't immediately re-enter the GC.
@@ -1809,6 +1800,11 @@ delete_threads_and_gc:
GarbageCollect(collect_gen, heap_census, 0, cap, NULL);
#endif
+ // If we're shutting down, don't leave any idle GC work to do.
+ if (sched_state == SCHED_SHUTTING_DOWN) {
+ doIdleGCWork(cap, true /* all of it */);
+ }
+
traceSparkCounters(cap);
switch (recent_activity) {
@@ -1920,13 +1916,6 @@ delete_threads_and_gc:
throwToSelf(cap, main_thread, heapOverflow_closure);
}
}
-#if defined(SPARKBALANCE)
- /* JB
- Once we are all together... this would be the place to balance all
- spark pools. No concurrent stealing or adding of new sparks can
- occur. Should be defined in Sparks.c. */
- balanceSparkPoolsCaps(n_capabilities, capabilities);
-#endif
#if defined(THREADED_RTS)
stgFree(idle_cap);
@@ -1976,7 +1965,8 @@ forkProcess(HsStablePtr *entry
// inconsistent state in the child. See also #1391.
ACQUIRE_LOCK(&sched_mutex);
ACQUIRE_LOCK(&sm_mutex);
- ACQUIRE_LOCK(&stable_mutex);
+ ACQUIRE_LOCK(&stable_ptr_mutex);
+ ACQUIRE_LOCK(&stable_name_mutex);
ACQUIRE_LOCK(&task->lock);
for (i=0; i < n_capabilities; i++) {
@@ -2001,18 +1991,20 @@ forkProcess(HsStablePtr *entry
RELEASE_LOCK(&sched_mutex);
RELEASE_LOCK(&sm_mutex);
- RELEASE_LOCK(&stable_mutex);
+ RELEASE_LOCK(&stable_ptr_mutex);
+ RELEASE_LOCK(&stable_name_mutex);
RELEASE_LOCK(&task->lock);
+#if defined(THREADED_RTS)
+ /* N.B. releaseCapability_ below may need to take all_tasks_mutex */
+ RELEASE_LOCK(&all_tasks_mutex);
+#endif
+
for (i=0; i < n_capabilities; i++) {
releaseCapability_(capabilities[i],false);
RELEASE_LOCK(&capabilities[i]->lock);
}
-#if defined(THREADED_RTS)
- RELEASE_LOCK(&all_tasks_mutex);
-#endif
-
boundTaskExiting(task);
// just return the pid
@@ -2023,7 +2015,8 @@ forkProcess(HsStablePtr *entry
#if defined(THREADED_RTS)
initMutex(&sched_mutex);
initMutex(&sm_mutex);
- initMutex(&stable_mutex);
+ initMutex(&stable_ptr_mutex);
+ initMutex(&stable_name_mutex);
initMutex(&task->lock);
for (i=0; i < n_capabilities; i++) {
@@ -2049,7 +2042,7 @@ forkProcess(HsStablePtr *entry
// don't allow threads to catch the ThreadKilled
// exception, but we do want to raiseAsync() because these
// threads may be evaluating thunks that we need later.
- deleteThread_(t->cap,t);
+ deleteThread_(t);
// stop the GC from updating the InCall to point to
// the TSO. This is only necessary because the
@@ -2273,7 +2266,7 @@ setNumCapabilities (uint32_t new_n_capabilities USED_IF_THREADS)
* ------------------------------------------------------------------------- */
static void
-deleteAllThreads ( Capability *cap )
+deleteAllThreads ()
{
// NOTE: only safe to call if we own all capabilities.
@@ -2284,7 +2277,7 @@ deleteAllThreads ( Capability *cap )
for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
next = t->global_link;
- deleteThread(cap,t);
+ deleteThread(t);
}
}
@@ -2795,7 +2788,7 @@ void wakeUpRts(void)
-------------------------------------------------------------------------- */
static void
-deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
+deleteThread (StgTSO *tso)
{
// NOTE: must only be called on a TSO that we have exclusive
// access to, because we will call throwToSingleThreaded() below.
@@ -2810,7 +2803,7 @@ deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
#if defined(FORKPROCESS_PRIMOP_SUPPORTED)
static void
-deleteThread_(Capability *cap, StgTSO *tso)
+deleteThread_(StgTSO *tso)
{ // for forkProcess only:
// like deleteThread(), but we delete threads in foreign calls, too.
@@ -2819,7 +2812,7 @@ deleteThread_(Capability *cap, StgTSO *tso)
tso->what_next = ThreadKilled;
appendToRunQueue(tso->cap, tso);
} else {
- deleteThread(cap,tso);
+ deleteThread(tso);
}
}
#endif