summaryrefslogtreecommitdiff
path: root/rts
diff options
context:
space:
mode:
authorSimon Marlow <marlowsd@gmail.com>2008-09-09 13:32:23 +0000
committerSimon Marlow <marlowsd@gmail.com>2008-09-09 13:32:23 +0000
commitd572aed64d9c40dcc38a49b09d18f301555e4efb (patch)
treef44fa2291ccc16f953a55470a375583d31a0d85f /rts
parent4318aa600015f0d7b4d977cb67071f3f8e7c3b0b (diff)
downloadhaskell-d572aed64d9c40dcc38a49b09d18f301555e4efb.tar.gz
Fix race condition in wakeupThreadOnCapability() (#2574)
wakeupThreadOnCapbility() is used to signal another capability that there is a thread waiting to be added to its run queue. It adds the thread to the (locked) wakeup queue on the remote capability. In order to do this, it has to modify the TSO's link field, which has a write barrier. The write barrier might put the TSO on the mutable list, and the bug was that it was using the mutable list of the *target* capability, which we do not have exclusive access to. We should be using the current Capabilty's mutable list in this case.
Diffstat (limited to 'rts')
-rw-r--r--rts/Capability.c63
-rw-r--r--rts/Capability.h9
-rw-r--r--rts/Schedule.c4
-rw-r--r--rts/Schedule.h15
-rw-r--r--rts/Threads.c2
5 files changed, 39 insertions, 54 deletions
diff --git a/rts/Capability.c b/rts/Capability.c
index 0f03621003..1f1a1aea2a 100644
--- a/rts/Capability.c
+++ b/rts/Capability.c
@@ -540,57 +540,40 @@ yieldCapability (Capability** pCap, Task *task)
* ------------------------------------------------------------------------- */
void
-wakeupThreadOnCapability (Capability *cap, StgTSO *tso)
+wakeupThreadOnCapability (Capability *my_cap,
+ Capability *other_cap,
+ StgTSO *tso)
{
- ASSERT(tso->cap == cap);
- ASSERT(tso->bound ? tso->bound->cap == cap : 1);
- ASSERT_LOCK_HELD(&cap->lock);
+ ACQUIRE_LOCK(&other_cap->lock);
- tso->cap = cap;
+ // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
+ if (tso->bound) {
+ ASSERT(tso->bound->cap == tso->cap);
+ tso->bound->cap = other_cap;
+ }
+ tso->cap = other_cap;
+
+ ASSERT(tso->bound ? tso->bound->cap == other_cap : 1);
- if (cap->running_task == NULL) {
+ if (other_cap->running_task == NULL) {
// nobody is running this Capability, we can add our thread
// directly onto the run queue and start up a Task to run it.
- appendToRunQueue(cap,tso);
- // start it up
- cap->running_task = myTask(); // precond for releaseCapability_()
- trace(TRACE_sched, "resuming capability %d", cap->no);
- releaseCapability_(cap);
+ other_cap->running_task = myTask();
+ // precond for releaseCapability_() and appendToRunQueue()
+
+ appendToRunQueue(other_cap,tso);
+
+ trace(TRACE_sched, "resuming capability %d", other_cap->no);
+ releaseCapability_(other_cap);
} else {
- appendToWakeupQueue(cap,tso);
+ appendToWakeupQueue(my_cap,other_cap,tso);
// someone is running on this Capability, so it cannot be
// freed without first checking the wakeup queue (see
// releaseCapability_).
}
-}
-void
-wakeupThreadOnCapability_lock (Capability *cap, StgTSO *tso)
-{
- ACQUIRE_LOCK(&cap->lock);
- migrateThreadToCapability (cap, tso);
- RELEASE_LOCK(&cap->lock);
-}
-
-void
-migrateThreadToCapability (Capability *cap, StgTSO *tso)
-{
- // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
- if (tso->bound) {
- ASSERT(tso->bound->cap == tso->cap);
- tso->bound->cap = cap;
- }
- tso->cap = cap;
- wakeupThreadOnCapability(cap,tso);
-}
-
-void
-migrateThreadToCapability_lock (Capability *cap, StgTSO *tso)
-{
- ACQUIRE_LOCK(&cap->lock);
- migrateThreadToCapability (cap, tso);
- RELEASE_LOCK(&cap->lock);
+ RELEASE_LOCK(&other_cap->lock);
}
/* ----------------------------------------------------------------------------
@@ -818,7 +801,7 @@ markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta)
}
#if defined(THREADED_RTS)
- markSparkQueue (evac, user, cap);
+ traverseSparkQueue (evac, user, cap);
#endif
}
diff --git a/rts/Capability.h b/rts/Capability.h
index f13afe2117..94306eb6c0 100644
--- a/rts/Capability.h
+++ b/rts/Capability.h
@@ -202,11 +202,8 @@ void waitForCapability (Task *task, Mutex *mutex, Capability **pCap);
// Wakes up a thread on a Capability (probably a different Capability
// from the one held by the current Task).
//
-void wakeupThreadOnCapability (Capability *cap, StgTSO *tso);
-void wakeupThreadOnCapability_lock (Capability *cap, StgTSO *tso);
-
-void migrateThreadToCapability (Capability *cap, StgTSO *tso);
-void migrateThreadToCapability_lock (Capability *cap, StgTSO *tso);
+void wakeupThreadOnCapability (Capability *my_cap, Capability *other_cap,
+ StgTSO *tso);
// Wakes up a worker thread on just one Capability, used when we
// need to service some global event.
@@ -252,6 +249,8 @@ recordMutableCap (StgClosure *p, Capability *cap, nat gen)
{
bdescr *bd;
+ // We must own this Capability in order to modify its mutable list.
+ ASSERT(cap->running_task == myTask());
bd = cap->mut_lists[gen];
if (bd->free >= bd->start + BLOCK_SIZE_W) {
bdescr *new_bd;
diff --git a/rts/Schedule.c b/rts/Schedule.c
index 1b5afefc4f..f8a8748d8d 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -1871,7 +1871,7 @@ scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
if (cpu == cap->no) {
appendToRunQueue(cap,tso);
} else {
- migrateThreadToCapability_lock(&capabilities[cpu],tso);
+ wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
}
#else
appendToRunQueue(cap,tso);
@@ -2312,8 +2312,6 @@ checkBlackHoles (Capability *cap)
if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
IF_DEBUG(sanity,checkTSO(t));
t = unblockOne(cap, t);
- // urk, the threads migrate to the current capability
- // here, but we'd like to keep them on the original one.
*prev = t;
any_woke_up = rtsTrue;
} else {
diff --git a/rts/Schedule.h b/rts/Schedule.h
index 59bdb9e999..45aa000758 100644
--- a/rts/Schedule.h
+++ b/rts/Schedule.h
@@ -237,16 +237,21 @@ appendToBlockedQueue(StgTSO *tso)
#endif
#if defined(THREADED_RTS)
+// Assumes: my_cap is owned by the current Task. We hold
+// other_cap->lock, but we do not necessarily own other_cap; another
+// Task may be running on it.
INLINE_HEADER void
-appendToWakeupQueue (Capability *cap, StgTSO *tso)
+appendToWakeupQueue (Capability *my_cap, Capability *other_cap, StgTSO *tso)
{
ASSERT(tso->_link == END_TSO_QUEUE);
- if (cap->wakeup_queue_hd == END_TSO_QUEUE) {
- cap->wakeup_queue_hd = tso;
+ if (other_cap->wakeup_queue_hd == END_TSO_QUEUE) {
+ other_cap->wakeup_queue_hd = tso;
} else {
- setTSOLink(cap, cap->wakeup_queue_tl, tso);
+ // my_cap is passed to setTSOLink() because it may need to
+ // write to the mutable list.
+ setTSOLink(my_cap, other_cap->wakeup_queue_tl, tso);
}
- cap->wakeup_queue_tl = tso;
+ other_cap->wakeup_queue_tl = tso;
}
#endif
diff --git a/rts/Threads.c b/rts/Threads.c
index b7f62c8f07..281cb65e48 100644
--- a/rts/Threads.c
+++ b/rts/Threads.c
@@ -510,7 +510,7 @@ unblockOne_ (Capability *cap, StgTSO *tso,
context_switch = 1;
} else {
// we'll try to wake it up on the Capability it was last on.
- wakeupThreadOnCapability_lock(tso->cap, tso);
+ wakeupThreadOnCapability(cap, tso->cap, tso);
}
#else
appendToRunQueue(cap,tso);