summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--rts/Capability.c63
-rw-r--r--rts/Capability.h9
-rw-r--r--rts/Schedule.c4
-rw-r--r--rts/Schedule.h15
-rw-r--r--rts/Threads.c2
5 files changed, 39 insertions, 54 deletions
diff --git a/rts/Capability.c b/rts/Capability.c
index 0f03621003..1f1a1aea2a 100644
--- a/rts/Capability.c
+++ b/rts/Capability.c
@@ -540,57 +540,40 @@ yieldCapability (Capability** pCap, Task *task)
* ------------------------------------------------------------------------- */
void
-wakeupThreadOnCapability (Capability *cap, StgTSO *tso)
+wakeupThreadOnCapability (Capability *my_cap,
+ Capability *other_cap,
+ StgTSO *tso)
{
- ASSERT(tso->cap == cap);
- ASSERT(tso->bound ? tso->bound->cap == cap : 1);
- ASSERT_LOCK_HELD(&cap->lock);
+ ACQUIRE_LOCK(&other_cap->lock);
- tso->cap = cap;
+ // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
+ if (tso->bound) {
+ ASSERT(tso->bound->cap == tso->cap);
+ tso->bound->cap = other_cap;
+ }
+ tso->cap = other_cap;
+
+ ASSERT(tso->bound ? tso->bound->cap == other_cap : 1);
- if (cap->running_task == NULL) {
+ if (other_cap->running_task == NULL) {
// nobody is running this Capability, we can add our thread
// directly onto the run queue and start up a Task to run it.
- appendToRunQueue(cap,tso);
- // start it up
- cap->running_task = myTask(); // precond for releaseCapability_()
- trace(TRACE_sched, "resuming capability %d", cap->no);
- releaseCapability_(cap);
+ other_cap->running_task = myTask();
+ // precond for releaseCapability_() and appendToRunQueue()
+
+ appendToRunQueue(other_cap,tso);
+
+ trace(TRACE_sched, "resuming capability %d", other_cap->no);
+ releaseCapability_(other_cap);
} else {
- appendToWakeupQueue(cap,tso);
+ appendToWakeupQueue(my_cap,other_cap,tso);
// someone is running on this Capability, so it cannot be
// freed without first checking the wakeup queue (see
// releaseCapability_).
}
-}
-void
-wakeupThreadOnCapability_lock (Capability *cap, StgTSO *tso)
-{
- ACQUIRE_LOCK(&cap->lock);
- migrateThreadToCapability (cap, tso);
- RELEASE_LOCK(&cap->lock);
-}
-
-void
-migrateThreadToCapability (Capability *cap, StgTSO *tso)
-{
- // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
- if (tso->bound) {
- ASSERT(tso->bound->cap == tso->cap);
- tso->bound->cap = cap;
- }
- tso->cap = cap;
- wakeupThreadOnCapability(cap,tso);
-}
-
-void
-migrateThreadToCapability_lock (Capability *cap, StgTSO *tso)
-{
- ACQUIRE_LOCK(&cap->lock);
- migrateThreadToCapability (cap, tso);
- RELEASE_LOCK(&cap->lock);
+ RELEASE_LOCK(&other_cap->lock);
}
/* ----------------------------------------------------------------------------
@@ -818,7 +801,7 @@ markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta)
}
#if defined(THREADED_RTS)
- markSparkQueue (evac, user, cap);
+ traverseSparkQueue (evac, user, cap);
#endif
}
diff --git a/rts/Capability.h b/rts/Capability.h
index f13afe2117..94306eb6c0 100644
--- a/rts/Capability.h
+++ b/rts/Capability.h
@@ -202,11 +202,8 @@ void waitForCapability (Task *task, Mutex *mutex, Capability **pCap);
// Wakes up a thread on a Capability (probably a different Capability
// from the one held by the current Task).
//
-void wakeupThreadOnCapability (Capability *cap, StgTSO *tso);
-void wakeupThreadOnCapability_lock (Capability *cap, StgTSO *tso);
-
-void migrateThreadToCapability (Capability *cap, StgTSO *tso);
-void migrateThreadToCapability_lock (Capability *cap, StgTSO *tso);
+void wakeupThreadOnCapability (Capability *my_cap, Capability *other_cap,
+ StgTSO *tso);
// Wakes up a worker thread on just one Capability, used when we
// need to service some global event.
@@ -252,6 +249,8 @@ recordMutableCap (StgClosure *p, Capability *cap, nat gen)
{
bdescr *bd;
+ // We must own this Capability in order to modify its mutable list.
+ ASSERT(cap->running_task == myTask());
bd = cap->mut_lists[gen];
if (bd->free >= bd->start + BLOCK_SIZE_W) {
bdescr *new_bd;
diff --git a/rts/Schedule.c b/rts/Schedule.c
index 1b5afefc4f..f8a8748d8d 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -1871,7 +1871,7 @@ scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
if (cpu == cap->no) {
appendToRunQueue(cap,tso);
} else {
- migrateThreadToCapability_lock(&capabilities[cpu],tso);
+ wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
}
#else
appendToRunQueue(cap,tso);
@@ -2312,8 +2312,6 @@ checkBlackHoles (Capability *cap)
if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
IF_DEBUG(sanity,checkTSO(t));
t = unblockOne(cap, t);
- // urk, the threads migrate to the current capability
- // here, but we'd like to keep them on the original one.
*prev = t;
any_woke_up = rtsTrue;
} else {
diff --git a/rts/Schedule.h b/rts/Schedule.h
index 59bdb9e999..45aa000758 100644
--- a/rts/Schedule.h
+++ b/rts/Schedule.h
@@ -237,16 +237,21 @@ appendToBlockedQueue(StgTSO *tso)
#endif
#if defined(THREADED_RTS)
+// Assumes: my_cap is owned by the current Task. We hold
+// other_cap->lock, but we do not necessarily own other_cap; another
+// Task may be running on it.
INLINE_HEADER void
-appendToWakeupQueue (Capability *cap, StgTSO *tso)
+appendToWakeupQueue (Capability *my_cap, Capability *other_cap, StgTSO *tso)
{
ASSERT(tso->_link == END_TSO_QUEUE);
- if (cap->wakeup_queue_hd == END_TSO_QUEUE) {
- cap->wakeup_queue_hd = tso;
+ if (other_cap->wakeup_queue_hd == END_TSO_QUEUE) {
+ other_cap->wakeup_queue_hd = tso;
} else {
- setTSOLink(cap, cap->wakeup_queue_tl, tso);
+ // my_cap is passed to setTSOLink() because it may need to
+ // write to the mutable list.
+ setTSOLink(my_cap, other_cap->wakeup_queue_tl, tso);
}
- cap->wakeup_queue_tl = tso;
+ other_cap->wakeup_queue_tl = tso;
}
#endif
diff --git a/rts/Threads.c b/rts/Threads.c
index b7f62c8f07..281cb65e48 100644
--- a/rts/Threads.c
+++ b/rts/Threads.c
@@ -510,7 +510,7 @@ unblockOne_ (Capability *cap, StgTSO *tso,
context_switch = 1;
} else {
// we'll try to wake it up on the Capability it was last on.
- wakeupThreadOnCapability_lock(tso->cap, tso);
+ wakeupThreadOnCapability(cap, tso->cap, tso);
}
#else
appendToRunQueue(cap,tso);