diff options
-rw-r--r-- | rts/Capability.c | 63 | ||||
-rw-r--r-- | rts/Capability.h | 9 | ||||
-rw-r--r-- | rts/Schedule.c | 4 | ||||
-rw-r--r-- | rts/Schedule.h | 15 | ||||
-rw-r--r-- | rts/Threads.c | 2 |
5 files changed, 39 insertions, 54 deletions
diff --git a/rts/Capability.c b/rts/Capability.c index 0f03621003..1f1a1aea2a 100644 --- a/rts/Capability.c +++ b/rts/Capability.c @@ -540,57 +540,40 @@ yieldCapability (Capability** pCap, Task *task) * ------------------------------------------------------------------------- */ void -wakeupThreadOnCapability (Capability *cap, StgTSO *tso) +wakeupThreadOnCapability (Capability *my_cap, + Capability *other_cap, + StgTSO *tso) { - ASSERT(tso->cap == cap); - ASSERT(tso->bound ? tso->bound->cap == cap : 1); - ASSERT_LOCK_HELD(&cap->lock); + ACQUIRE_LOCK(&other_cap->lock); - tso->cap = cap; + // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability) + if (tso->bound) { + ASSERT(tso->bound->cap == tso->cap); + tso->bound->cap = other_cap; + } + tso->cap = other_cap; + + ASSERT(tso->bound ? tso->bound->cap == other_cap : 1); - if (cap->running_task == NULL) { + if (other_cap->running_task == NULL) { // nobody is running this Capability, we can add our thread // directly onto the run queue and start up a Task to run it. - appendToRunQueue(cap,tso); - // start it up - cap->running_task = myTask(); // precond for releaseCapability_() - trace(TRACE_sched, "resuming capability %d", cap->no); - releaseCapability_(cap); + other_cap->running_task = myTask(); + // precond for releaseCapability_() and appendToRunQueue() + + appendToRunQueue(other_cap,tso); + + trace(TRACE_sched, "resuming capability %d", other_cap->no); + releaseCapability_(other_cap); } else { - appendToWakeupQueue(cap,tso); + appendToWakeupQueue(my_cap,other_cap,tso); // someone is running on this Capability, so it cannot be // freed without first checking the wakeup queue (see // releaseCapability_). } -} -void -wakeupThreadOnCapability_lock (Capability *cap, StgTSO *tso) -{ - ACQUIRE_LOCK(&cap->lock); - migrateThreadToCapability (cap, tso); - RELEASE_LOCK(&cap->lock); -} - -void -migrateThreadToCapability (Capability *cap, StgTSO *tso) -{ - // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability) - if (tso->bound) { - ASSERT(tso->bound->cap == tso->cap); - tso->bound->cap = cap; - } - tso->cap = cap; - wakeupThreadOnCapability(cap,tso); -} - -void -migrateThreadToCapability_lock (Capability *cap, StgTSO *tso) -{ - ACQUIRE_LOCK(&cap->lock); - migrateThreadToCapability (cap, tso); - RELEASE_LOCK(&cap->lock); + RELEASE_LOCK(&other_cap->lock); } /* ---------------------------------------------------------------------------- @@ -818,7 +801,7 @@ markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta) } #if defined(THREADED_RTS) - markSparkQueue (evac, user, cap); + traverseSparkQueue (evac, user, cap); #endif } diff --git a/rts/Capability.h b/rts/Capability.h index f13afe2117..94306eb6c0 100644 --- a/rts/Capability.h +++ b/rts/Capability.h @@ -202,11 +202,8 @@ void waitForCapability (Task *task, Mutex *mutex, Capability **pCap); // Wakes up a thread on a Capability (probably a different Capability // from the one held by the current Task). // -void wakeupThreadOnCapability (Capability *cap, StgTSO *tso); -void wakeupThreadOnCapability_lock (Capability *cap, StgTSO *tso); - -void migrateThreadToCapability (Capability *cap, StgTSO *tso); -void migrateThreadToCapability_lock (Capability *cap, StgTSO *tso); +void wakeupThreadOnCapability (Capability *my_cap, Capability *other_cap, + StgTSO *tso); // Wakes up a worker thread on just one Capability, used when we // need to service some global event. @@ -252,6 +249,8 @@ recordMutableCap (StgClosure *p, Capability *cap, nat gen) { bdescr *bd; + // We must own this Capability in order to modify its mutable list. + ASSERT(cap->running_task == myTask()); bd = cap->mut_lists[gen]; if (bd->free >= bd->start + BLOCK_SIZE_W) { bdescr *new_bd; diff --git a/rts/Schedule.c b/rts/Schedule.c index 1b5afefc4f..f8a8748d8d 100644 --- a/rts/Schedule.c +++ b/rts/Schedule.c @@ -1871,7 +1871,7 @@ scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso) if (cpu == cap->no) { appendToRunQueue(cap,tso); } else { - migrateThreadToCapability_lock(&capabilities[cpu],tso); + wakeupThreadOnCapability(cap, &capabilities[cpu], tso); } #else appendToRunQueue(cap,tso); @@ -2312,8 +2312,6 @@ checkBlackHoles (Capability *cap) if (type != BLACKHOLE && type != CAF_BLACKHOLE) { IF_DEBUG(sanity,checkTSO(t)); t = unblockOne(cap, t); - // urk, the threads migrate to the current capability - // here, but we'd like to keep them on the original one. *prev = t; any_woke_up = rtsTrue; } else { diff --git a/rts/Schedule.h b/rts/Schedule.h index 59bdb9e999..45aa000758 100644 --- a/rts/Schedule.h +++ b/rts/Schedule.h @@ -237,16 +237,21 @@ appendToBlockedQueue(StgTSO *tso) #endif #if defined(THREADED_RTS) +// Assumes: my_cap is owned by the current Task. We hold +// other_cap->lock, but we do not necessarily own other_cap; another +// Task may be running on it. INLINE_HEADER void -appendToWakeupQueue (Capability *cap, StgTSO *tso) +appendToWakeupQueue (Capability *my_cap, Capability *other_cap, StgTSO *tso) { ASSERT(tso->_link == END_TSO_QUEUE); - if (cap->wakeup_queue_hd == END_TSO_QUEUE) { - cap->wakeup_queue_hd = tso; + if (other_cap->wakeup_queue_hd == END_TSO_QUEUE) { + other_cap->wakeup_queue_hd = tso; } else { - setTSOLink(cap, cap->wakeup_queue_tl, tso); + // my_cap is passed to setTSOLink() because it may need to + // write to the mutable list. + setTSOLink(my_cap, other_cap->wakeup_queue_tl, tso); } - cap->wakeup_queue_tl = tso; + other_cap->wakeup_queue_tl = tso; } #endif diff --git a/rts/Threads.c b/rts/Threads.c index b7f62c8f07..281cb65e48 100644 --- a/rts/Threads.c +++ b/rts/Threads.c @@ -510,7 +510,7 @@ unblockOne_ (Capability *cap, StgTSO *tso, context_switch = 1; } else { // we'll try to wake it up on the Capability it was last on. - wakeupThreadOnCapability_lock(tso->cap, tso); + wakeupThreadOnCapability(cap, tso->cap, tso); } #else appendToRunQueue(cap,tso); |