summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--includes/Constants.h30
-rw-r--r--includes/SMP.h15
-rw-r--r--includes/StgMiscClosures.h2
-rw-r--r--includes/TSO.h30
-rw-r--r--includes/mkDerivedConstants.c4
-rw-r--r--rts/Capability.c31
-rw-r--r--rts/Capability.h4
-rw-r--r--rts/Exception.cmm186
-rw-r--r--rts/Exception.h40
-rw-r--r--rts/GC.c15
-rw-r--r--rts/GCCompact.c4
-rw-r--r--rts/HCIncludes.h22
-rw-r--r--rts/HeapStackCheck.cmm25
-rw-r--r--rts/Makefile21
-rw-r--r--rts/RaiseAsync.c1015
-rw-r--r--rts/RaiseAsync.h71
-rw-r--r--rts/Schedule.c1646
-rw-r--r--rts/Schedule.h23
-rw-r--r--rts/ThreadLabels.c17
-rw-r--r--rts/ThreadLabels.h24
-rw-r--r--rts/Threads.c974
-rw-r--r--rts/Threads.h46
-rw-r--r--rts/Trace.h4
23 files changed, 2461 insertions, 1788 deletions
diff --git a/includes/Constants.h b/includes/Constants.h
index 4f3c35b744..ef2a4865e2 100644
--- a/includes/Constants.h
+++ b/includes/Constants.h
@@ -230,6 +230,36 @@
#define ThreadBlocked 4
#define ThreadFinished 5
+/*
+ * Flags for the tso->flags field.
+ *
+ * The TSO_DIRTY flag indicates that this TSO's stack should be
+ * scanned during garbage collection. The link field of a TSO is
+ * always scanned, so we don't have to dirty a TSO just for linking
+ * it on a different list.
+ *
+ * TSO_DIRTY is set by
+ * - schedule(), just before running a thread,
+ * - raiseAsync(), because it modifies a thread's stack
+ * - resumeThread(), just before running the thread again
+ * and unset by the garbage collector (only).
+ */
+#define TSO_DIRTY 1
+
+/*
+ * TSO_LOCKED is set when a TSO is locked to a particular Capability.
+ */
+#define TSO_LOCKED 2
+
+/*
+ * TSO_BLOCKEX: the TSO is blocking exceptions
+ *
+ * TSO_INTERRUPTIBLE: the TSO can be interrupted if it blocks
+ * interruptibly (eg. with BlockedOnMVar).
+ */
+#define TSO_BLOCKEX 4
+#define TSO_INTERRUPTIBLE 8
+
/* -----------------------------------------------------------------------------
RET_DYN stack frames
-------------------------------------------------------------------------- */
diff --git a/includes/SMP.h b/includes/SMP.h
index 5974c962ad..d985576cb4 100644
--- a/includes/SMP.h
+++ b/includes/SMP.h
@@ -155,6 +155,21 @@ xchg(StgPtr p, StgWord w)
return old;
}
+INLINE_HEADER StgInfoTable *
+lockClosure(StgClosure *p)
+{ return (StgInfoTable *)p->header.info; }
+
+INLINE_HEADER void
+unlockClosure(StgClosure *p STG_UNUSED, StgInfoTable *info STG_UNUSED)
+{ /* nothing */ }
+
#endif /* !THREADED_RTS */
+// Handy specialised versions of lockClosure()/unlockClosure()
+INLINE_HEADER void lockTSO(StgTSO *tso)
+{ lockClosure((StgClosure *)tso); }
+
+INLINE_HEADER void unlockTSO(StgTSO *tso)
+{ unlockClosure((StgClosure*)tso, (StgInfoTable*)&stg_TSO_info); }
+
#endif /* SMP_H */
diff --git a/includes/StgMiscClosures.h b/includes/StgMiscClosures.h
index 4a6a7c47c2..fcc973630a 100644
--- a/includes/StgMiscClosures.h
+++ b/includes/StgMiscClosures.h
@@ -490,6 +490,8 @@ RTS_FUN(stg_block_async_void);
RTS_ENTRY(stg_block_async_void_ret);
#endif
RTS_FUN(stg_block_stmwait);
+RTS_FUN(stg_block_throwto);
+RTS_RET_INFO(stg_block_throwto_info);
/* Entry/exit points from StgStartup.cmm */
diff --git a/includes/TSO.h b/includes/TSO.h
index d096d401cf..0c3e4eec38 100644
--- a/includes/TSO.h
+++ b/includes/TSO.h
@@ -77,27 +77,6 @@ typedef StgTSOStatBuf StgTSOGranInfo;
*/
typedef StgWord32 StgThreadID;
-/*
- * Flags for the tso->flags field.
- *
- * The TSO_DIRTY flag indicates that this TSO's stack should be
- * scanned during garbage collection. The link field of a TSO is
- * always scanned, so we don't have to dirty a TSO just for linking
- * it on a different list.
- *
- * TSO_DIRTY is set by
- * - schedule(), just before running a thread,
- * - raiseAsync(), because it modifies a thread's stack
- * - resumeThread(), just before running the thread again
- * and unset by the garbage collector (only).
- */
-#define TSO_DIRTY 1
-
-/*
- * TSO_LOCKED is set when a TSO is locked to a particular Capability.
- */
-#define TSO_LOCKED 2
-
#define tsoDirty(tso) ((tso)->flags & TSO_DIRTY)
#define tsoLocked(tso) ((tso)->flags & TSO_LOCKED)
@@ -127,6 +106,7 @@ typedef union {
StgWord target;
} StgTSOBlockInfo;
+
/*
* TSOs live on the heap, and therefore look just like heap objects.
* Large TSOs will live in their own "block group" allocated by the
@@ -151,13 +131,19 @@ typedef struct StgTSO_ {
StgWord16 why_blocked; /* Values defined in Constants.h */
StgWord32 flags;
StgTSOBlockInfo block_info;
- struct StgTSO_* blocked_exceptions;
StgThreadID id;
int saved_errno;
struct Task_* bound;
struct Capability_* cap;
struct StgTRecHeader_ * trec; /* STM transaction record */
+ /*
+ A list of threads blocked on this TSO waiting to throw
+ exceptions. In order to access this field, the TSO must be
+ locked using lockClosure/unlockClosure (see SMP.h).
+ */
+ struct StgTSO_ * blocked_exceptions;
+
#ifdef TICKY_TICKY
/* TICKY-specific stuff would go here. */
#endif
diff --git a/includes/mkDerivedConstants.c b/includes/mkDerivedConstants.c
index 27d4fa9e7b..62d949041b 100644
--- a/includes/mkDerivedConstants.c
+++ b/includes/mkDerivedConstants.c
@@ -18,6 +18,7 @@
* doesn't affect the offsets of anything else.
*/
#define PROFILING
+#define THREADED_RTS
#include "Rts.h"
#include "RtsFlags.h"
@@ -227,6 +228,7 @@ main(int argc, char *argv[])
def_offset("stgGCFun", FUN_OFFSET(stgGCFun));
field_offset(Capability, r);
+ field_offset(Capability, lock);
struct_field(bdescr, start);
struct_field(bdescr, free);
@@ -276,8 +278,10 @@ main(int argc, char *argv[])
closure_field(StgTSO, block_info);
closure_field(StgTSO, blocked_exceptions);
closure_field(StgTSO, id);
+ closure_field(StgTSO, cap);
closure_field(StgTSO, saved_errno);
closure_field(StgTSO, trec);
+ closure_field(StgTSO, flags);
closure_field_("StgTSO_CCCS", StgTSO, prof.CCCS);
tso_field(StgTSO, sp);
tso_field_offset(StgTSO, stack);
diff --git a/rts/Capability.c b/rts/Capability.c
index 0415092a03..2384262ae9 100644
--- a/rts/Capability.c
+++ b/rts/Capability.c
@@ -518,8 +518,10 @@ wakeupThreadOnCapability (Capability *cap, StgTSO *tso)
{
ASSERT(tso->cap == cap);
ASSERT(tso->bound ? tso->bound->cap == cap : 1);
+ ASSERT_LOCK_HELD(&cap->lock);
+
+ tso->cap = cap;
- ACQUIRE_LOCK(&cap->lock);
if (cap->running_task == NULL) {
// nobody is running this Capability, we can add our thread
// directly onto the run queue and start up a Task to run it.
@@ -535,6 +537,33 @@ wakeupThreadOnCapability (Capability *cap, StgTSO *tso)
// freed without first checking the wakeup queue (see
// releaseCapability_).
}
+}
+
+void
+wakeupThreadOnCapability_lock (Capability *cap, StgTSO *tso)
+{
+ ACQUIRE_LOCK(&cap->lock);
+ migrateThreadToCapability (cap, tso);
+ RELEASE_LOCK(&cap->lock);
+}
+
+void
+migrateThreadToCapability (Capability *cap, StgTSO *tso)
+{
+ // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
+ if (tso->bound) {
+ ASSERT(tso->bound->cap == tso->cap);
+ tso->bound->cap = cap;
+ }
+ tso->cap = cap;
+ wakeupThreadOnCapability(cap,tso);
+}
+
+void
+migrateThreadToCapability_lock (Capability *cap, StgTSO *tso)
+{
+ ACQUIRE_LOCK(&cap->lock);
+ migrateThreadToCapability (cap, tso);
RELEASE_LOCK(&cap->lock);
}
diff --git a/rts/Capability.h b/rts/Capability.h
index a2551d0cc5..641f37db01 100644
--- a/rts/Capability.h
+++ b/rts/Capability.h
@@ -199,6 +199,10 @@ void waitForCapability (Task *task, Mutex *mutex, Capability **pCap);
// from the one held by the current Task).
//
void wakeupThreadOnCapability (Capability *cap, StgTSO *tso);
+void wakeupThreadOnCapability_lock (Capability *cap, StgTSO *tso);
+
+void migrateThreadToCapability (Capability *cap, StgTSO *tso);
+void migrateThreadToCapability_lock (Capability *cap, StgTSO *tso);
// Wakes up a worker thread on just one Capability, used when we
// need to service some global event.
diff --git a/rts/Exception.cmm b/rts/Exception.cmm
index b5c29626b2..f4327b9ce2 100644
--- a/rts/Exception.cmm
+++ b/rts/Exception.cmm
@@ -11,6 +11,7 @@
* ---------------------------------------------------------------------------*/
#include "Cmm.h"
+#include "RaiseAsync.h"
/* -----------------------------------------------------------------------------
Exception Primitives
@@ -54,13 +55,13 @@ INFO_TABLE_RET( stg_unblockAsyncExceptionszh_ret,
{
// Not true: see comments above
// ASSERT(StgTSO_blocked_exceptions(CurrentTSO) != NULL);
-#if defined(GRAN) || defined(PAR)
- foreign "C" awakenBlockedQueue(MyCapability() "ptr", StgTSO_blocked_exceptions(CurrentTSO) "ptr",
- NULL "ptr");
-#else
- foreign "C" awakenBlockedQueue(MyCapability() "ptr", StgTSO_blocked_exceptions(CurrentTSO) "ptr");
-#endif
- StgTSO_blocked_exceptions(CurrentTSO) = NULL;
+
+ foreign "C" awakenBlockedExceptionQueue(MyCapability() "ptr",
+ CurrentTSO "ptr") [R1];
+
+ StgTSO_flags(CurrentTSO) = StgTSO_flags(CurrentTSO) &
+ ~(TSO_BLOCKEX::I32|TSO_INTERRUPTIBLE::I32);
+
#ifdef REG_R1
Sp_adj(1);
jump %ENTRY_CODE(Sp(0));
@@ -76,7 +77,10 @@ INFO_TABLE_RET( stg_blockAsyncExceptionszh_ret,
{
// Not true: see comments above
// ASSERT(StgTSO_blocked_exceptions(CurrentTSO) == NULL);
- StgTSO_blocked_exceptions(CurrentTSO) = END_TSO_QUEUE;
+
+ StgTSO_flags(CurrentTSO) =
+ StgTSO_flags(CurrentTSO) | TSO_BLOCKEX::I32 | TSO_INTERRUPTIBLE::I32;
+
#ifdef REG_R1
Sp_adj(1);
jump %ENTRY_CODE(Sp(0));
@@ -92,15 +96,18 @@ blockAsyncExceptionszh_fast
/* Args: R1 :: IO a */
STK_CHK_GEN( WDS(2)/* worst case */, R1_PTR, blockAsyncExceptionszh_fast);
- if (StgTSO_blocked_exceptions(CurrentTSO) == NULL) {
- StgTSO_blocked_exceptions(CurrentTSO) = END_TSO_QUEUE;
- /* avoid growing the stack unnecessarily */
- if (Sp(0) == stg_blockAsyncExceptionszh_ret_info) {
- Sp_adj(1);
- } else {
- Sp_adj(-1);
- Sp(0) = stg_unblockAsyncExceptionszh_ret_info;
- }
+ if ((TO_W_(StgTSO_flags(CurrentTSO)) & TSO_BLOCKEX) == 0) {
+
+ StgTSO_flags(CurrentTSO) =
+ StgTSO_flags(CurrentTSO) | TSO_BLOCKEX::I32 | TSO_INTERRUPTIBLE::I32;
+
+ /* avoid growing the stack unnecessarily */
+ if (Sp(0) == stg_blockAsyncExceptionszh_ret_info) {
+ Sp_adj(1);
+ } else {
+ Sp_adj(-1);
+ Sp(0) = stg_unblockAsyncExceptionszh_ret_info;
+ }
}
TICK_UNKNOWN_CALL();
TICK_SLOW_CALL_v();
@@ -112,22 +119,17 @@ unblockAsyncExceptionszh_fast
/* Args: R1 :: IO a */
STK_CHK_GEN( WDS(2), R1_PTR, unblockAsyncExceptionszh_fast);
- if (StgTSO_blocked_exceptions(CurrentTSO) != NULL) {
-#if defined(GRAN) || defined(PAR)
- foreign "C" awakenBlockedQueue(MyCapability() "ptr", StgTSO_blocked_exceptions(CurrentTSO) "ptr",
- StgTSO_block_info(CurrentTSO) "ptr");
-#else
- foreign "C" awakenBlockedQueue(MyCapability() "ptr", StgTSO_blocked_exceptions(CurrentTSO) "ptr");
-#endif
- StgTSO_blocked_exceptions(CurrentTSO) = NULL;
+ if (TO_W_(StgTSO_flags(CurrentTSO)) & TSO_BLOCKEX) {
+ foreign "C" awakenBlockedExceptionQueue(MyCapability() "ptr",
+ CurrentTSO "ptr") [R1];
- /* avoid growing the stack unnecessarily */
- if (Sp(0) == stg_unblockAsyncExceptionszh_ret_info) {
- Sp_adj(1);
- } else {
- Sp_adj(-1);
- Sp(0) = stg_blockAsyncExceptionszh_ret_info;
- }
+ /* avoid growing the stack unnecessarily */
+ if (Sp(0) == stg_unblockAsyncExceptionszh_ret_info) {
+ Sp_adj(1);
+ } else {
+ Sp_adj(-1);
+ Sp(0) = stg_blockAsyncExceptionszh_ret_info;
+ }
}
TICK_UNKNOWN_CALL();
TICK_SLOW_CALL_v();
@@ -135,74 +137,62 @@ unblockAsyncExceptionszh_fast
}
-#define interruptible(what_next) \
- ( what_next == BlockedOnMVar \
- || what_next == BlockedOnException \
- || what_next == BlockedOnRead \
- || what_next == BlockedOnWrite \
- || what_next == BlockedOnDelay \
- || what_next == BlockedOnDoProc)
-
killThreadzh_fast
{
- /* args: R1 = TSO to kill, R2 = Exception */
-
- W_ why_blocked;
-
- /* This thread may have been relocated.
- * (see Schedule.c:threadStackOverflow)
- */
- while:
- if (StgTSO_what_next(R1) == ThreadRelocated::I16) {
- R1 = StgTSO_link(R1);
- goto while;
- }
-
- /* Determine whether this thread is interruptible or not */
-
- /* If the target thread is currently blocking async exceptions,
- * we'll have to block until it's ready to accept them. The
- * exception is interruptible threads - ie. those that are blocked
- * on some resource.
- */
- why_blocked = TO_W_(StgTSO_why_blocked(R1));
- if (StgTSO_blocked_exceptions(R1) != NULL && !interruptible(why_blocked))
- {
- StgTSO_link(CurrentTSO) = StgTSO_blocked_exceptions(R1);
- StgTSO_blocked_exceptions(R1) = CurrentTSO;
-
- StgTSO_why_blocked(CurrentTSO) = BlockedOnException::I16;
- StgTSO_block_info(CurrentTSO) = R1;
-
- BLOCK( R1_PTR & R2_PTR, killThreadzh_fast );
- }
-
- /* Killed threads turn into zombies, which might be garbage
- * collected at a later date. That's why we don't have to
- * explicitly remove them from any queues they might be on.
- */
-
- /* We might have killed ourselves. In which case, better be *very*
- * careful. If the exception killed us, then return to the scheduler.
- * If the exception went to a catch frame, we'll just continue from
- * the handler.
- */
- if (R1 == CurrentTSO) {
+ /* args: R1 = TSO to kill, R2 = Exception */
+
+ W_ why_blocked;
+ W_ target;
+ W_ exception;
+
+ target = R1;
+ exception = R2;
+
+ STK_CHK_GEN( WDS(3), R1_PTR | R2_PTR, killThreadzh_fast);
+
+ /*
+ * We might have killed ourselves. In which case, better be *very*
+ * careful. If the exception killed us, then return to the scheduler.
+ * If the exception went to a catch frame, we'll just continue from
+ * the handler.
+ */
+ if (target == CurrentTSO) {
SAVE_THREAD_STATE();
- foreign "C" raiseAsync(MyCapability() "ptr", R1 "ptr", R2 "ptr");
+ /* ToDo: what if the current thread is blocking exceptions? */
+ foreign "C" throwToSingleThreaded(MyCapability() "ptr",
+ target "ptr", exception "ptr")[R1,R2];
if (StgTSO_what_next(CurrentTSO) == ThreadKilled::I16) {
- R1 = ThreadFinished;
- jump StgReturn;
+ R1 = ThreadFinished;
+ jump StgReturn;
} else {
- LOAD_THREAD_STATE();
- ASSERT(StgTSO_what_next(CurrentTSO) == ThreadRunGHC::I16);
- jump %ENTRY_CODE(Sp(0));
+ LOAD_THREAD_STATE();
+ ASSERT(StgTSO_what_next(CurrentTSO) == ThreadRunGHC::I16);
+ jump %ENTRY_CODE(Sp(0));
+ }
+ } else {
+ W_ out;
+ W_ retcode;
+ out = BaseReg + OFFSET_StgRegTable_rmp_tmp_w;
+
+ retcode = foreign "C" throwTo(MyCapability() "ptr",
+ CurrentTSO "ptr",
+ target "ptr",
+ exception "ptr",
+ out "ptr") [R1,R2];
+
+ switch [THROWTO_SUCCESS .. THROWTO_BLOCKED] (retcode) {
+
+ case THROWTO_SUCCESS: {
+ jump %ENTRY_CODE(Sp(0));
}
- } else {
- foreign "C" raiseAsync(MyCapability() "ptr", R1 "ptr", R2 "ptr");
- }
- jump %ENTRY_CODE(Sp(0));
+ case THROWTO_BLOCKED: {
+ R3 = W_[out];
+ // we must block, and call throwToReleaseTarget() before returning
+ jump stg_block_throwto;
+ }
+ }
+ }
}
/* -----------------------------------------------------------------------------
@@ -300,15 +290,14 @@ catchzh_fast
SET_HDR(Sp,stg_catch_frame_info,W_[CCCS]);
StgCatchFrame_handler(Sp) = R2;
- StgCatchFrame_exceptions_blocked(Sp) =
- (StgTSO_blocked_exceptions(CurrentTSO) != NULL);
+ StgCatchFrame_exceptions_blocked(Sp) = TO_W_(StgTSO_flags(CurrentTSO)) & TSO_BLOCKEX;
TICK_CATCHF_PUSHED();
/* Apply R1 to the realworld token */
TICK_UNKNOWN_CALL();
TICK_SLOW_CALL_v();
jump stg_ap_v_fast;
-}
+}
/* -----------------------------------------------------------------------------
* The raise infotable
@@ -423,9 +412,8 @@ retry_pop_stack:
/* Ensure that async excpetions are blocked when running the handler.
*/
- if (StgTSO_blocked_exceptions(CurrentTSO) == NULL) {
- StgTSO_blocked_exceptions(CurrentTSO) = END_TSO_QUEUE;
- }
+ StgTSO_flags(CurrentTSO) =
+ StgTSO_flags(CurrentTSO) | TSO_BLOCKEX::I32 | TSO_INTERRUPTIBLE::I32;
/* Call the handler, passing the exception value and a realworld
* token as arguments.
diff --git a/rts/Exception.h b/rts/Exception.h
deleted file mode 100644
index f7832f4045..0000000000
--- a/rts/Exception.h
+++ /dev/null
@@ -1,40 +0,0 @@
-/* -----------------------------------------------------------------------------
- *
- * (c) The GHC Team, 1998-2005
- *
- * Exception support
- *
- * ---------------------------------------------------------------------------*/
-
-#ifndef EXCEPTION_H
-#define EXCEPTION_H
-
-extern const StgRetInfoTable stg_blockAsyncExceptionszh_ret_info;
-extern const StgRetInfoTable stg_unblockAsyncExceptionszh_ret_info;
-
-/* Determine whether a thread is interruptible (ie. blocked
- * indefinitely). Interruptible threads can be sent an exception with
- * killThread# even if they have async exceptions blocked.
- */
-STATIC_INLINE int
-interruptible(StgTSO *t)
-{
- switch (t->why_blocked) {
- case BlockedOnMVar:
- case BlockedOnException:
- case BlockedOnRead:
- case BlockedOnWrite:
-#if defined(mingw32_HOST_OS)
- case BlockedOnDoProc:
-#endif
- case BlockedOnDelay:
- return 1;
- // NB. Threaded blocked on foreign calls (BlockedOnCCall) are
- // *not* interruptible. We can't send these threads an exception.
- default:
- return 0;
- }
-}
-
-#endif /* EXCEPTION_H */
-
diff --git a/rts/GC.c b/rts/GC.c
index 727027dd93..10f6a36599 100644
--- a/rts/GC.c
+++ b/rts/GC.c
@@ -44,6 +44,7 @@
#endif
#include "Trace.h"
#include "RetainerProfile.h"
+#include "RaiseAsync.h"
#include <string.h>
@@ -2631,10 +2632,8 @@ scavengeTSO (StgTSO *tso)
) {
tso->block_info.closure = evacuate(tso->block_info.closure);
}
- if ( tso->blocked_exceptions != NULL ) {
- tso->blocked_exceptions =
- (StgTSO *)evacuate((StgClosure *)tso->blocked_exceptions);
- }
+ tso->blocked_exceptions =
+ (StgTSO *)evacuate((StgClosure *)tso->blocked_exceptions);
// We don't always chase the link field: TSOs on the blackhole
// queue are not automatically alive, so the link field is a
@@ -4620,6 +4619,14 @@ threadPaused(Capability *cap, StgTSO *tso)
nat weight_pending = 0;
rtsBool prev_was_update_frame;
+ // Check to see whether we have threads waiting to raise
+ // exceptions, and we're not blocking exceptions, or are blocked
+ // interruptibly. This is important; if a thread is running with
+ // TSO_BLOCKEX and becomes blocked interruptibly, this is the only
+ // place we ensure that the blocked_exceptions get a chance.
+ maybePerformBlockedException (cap, tso);
+ if (tso->what_next == ThreadKilled) { return; }
+
stack_end = &tso->stack[tso->stack_size];
frame = (StgClosure *)tso->sp;
diff --git a/rts/GCCompact.c b/rts/GCCompact.c
index 45222c3b9b..7f91501101 100644
--- a/rts/GCCompact.c
+++ b/rts/GCCompact.c
@@ -403,9 +403,7 @@ thread_TSO (StgTSO *tso)
) {
thread_(&tso->block_info.closure);
}
- if ( tso->blocked_exceptions != NULL ) {
- thread_(&tso->blocked_exceptions);
- }
+ thread_(&tso->blocked_exceptions);
thread_(&tso->trec);
diff --git a/rts/HCIncludes.h b/rts/HCIncludes.h
new file mode 100644
index 0000000000..06cc61a8a5
--- /dev/null
+++ b/rts/HCIncludes.h
@@ -0,0 +1,22 @@
+/* includes for compiling .cmm files via-C */
+#include "Rts.h"
+#include "RtsFlags.h"
+#include "RtsUtils.h"
+#include "StgRun.h"
+#include "Schedule.h"
+#include "Printer.h"
+#include "Sanity.h"
+#include "STM.h"
+#include "Storage.h"
+#include "SchedAPI.h"
+#include "Timer.h"
+#include "ProfHeap.h"
+#include "LdvProfile.h"
+#include "Profiling.h"
+#include "OSThreads.h"
+#include "Apply.h"
+#include "SMP.h"
+#include "RaiseAsync.h"
+#include "ThreadLabels.h"
+#include "Threads.h"
+#include "Prelude.h"
diff --git a/rts/HeapStackCheck.cmm b/rts/HeapStackCheck.cmm
index 4e5dd24596..aae28cb77f 100644
--- a/rts/HeapStackCheck.cmm
+++ b/rts/HeapStackCheck.cmm
@@ -902,6 +902,31 @@ stg_block_blackhole
BLOCK_BUT_FIRST(stg_block_blackhole_finally);
}
+INFO_TABLE_RET( stg_block_throwto, 2/*framesize*/, 0/*bitmap*/, RET_SMALL )
+{
+ R2 = Sp(2);
+ R1 = Sp(1);
+ Sp_adj(3);
+ jump killThreadzh_fast;
+}
+
+stg_block_throwto_finally
+{
+#ifdef THREADED_RTS
+ foreign "C" throwToReleaseTarget (R3 "ptr");
+#endif
+ jump StgReturn;
+}
+
+stg_block_throwto
+{
+ Sp_adj(-3);
+ Sp(2) = R2;
+ Sp(1) = R1;
+ Sp(0) = stg_block_throwto_info;
+ BLOCK_BUT_FIRST(stg_block_throwto_finally);
+}
+
#ifdef mingw32_HOST_OS
INFO_TABLE_RET( stg_block_async, 0/*framesize*/, 0/*bitmap*/, RET_SMALL )
{
diff --git a/rts/Makefile b/rts/Makefile
index 67201cded5..b1111a027f 100644
--- a/rts/Makefile
+++ b/rts/Makefile
@@ -301,26 +301,7 @@ endif
# Compiling the cmm files
# ToDo: should we really include Rts.h here? Required for GNU_ATTRIBUTE().
-SRC_HC_OPTS += \
- -I. \
- -\#include Prelude.h \
- -\#include Rts.h \
- -\#include RtsFlags.h \
- -\#include RtsUtils.h \
- -\#include StgRun.h \
- -\#include Schedule.h \
- -\#include Printer.h \
- -\#include Sanity.h \
- -\#include STM.h \
- -\#include Storage.h \
- -\#include SchedAPI.h \
- -\#include Timer.h \
- -\#include ProfHeap.h \
- -\#include LdvProfile.h \
- -\#include Profiling.h \
- -\#include OSThreads.h \
- -\#include Apply.h \
- -\#include SMP.h
+SRC_HC_OPTS += -I. -\#include HCIncludes.h
ifeq "$(Windows)" "YES"
PrimOps_HC_OPTS += -\#include '<windows.h>' -\#include win32/AsyncIO.h
diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c
new file mode 100644
index 0000000000..9041c06cb2
--- /dev/null
+++ b/rts/RaiseAsync.c
@@ -0,0 +1,1015 @@
+/* ---------------------------------------------------------------------------
+ *
+ * (c) The GHC Team, 1998-2006
+ *
+ * Asynchronous exceptions
+ *
+ * --------------------------------------------------------------------------*/
+
+#include "PosixSource.h"
+#include "Rts.h"
+#include "Threads.h"
+#include "Trace.h"
+#include "RaiseAsync.h"
+#include "SMP.h"
+#include "Schedule.h"
+#include "Storage.h"
+#include "Updates.h"
+#include "STM.h"
+#include "Sanity.h"
+
+static void raiseAsync (Capability *cap,
+ StgTSO *tso,
+ StgClosure *exception,
+ rtsBool stop_at_atomically,
+ StgPtr stop_here);
+
+static void removeFromQueues(Capability *cap, StgTSO *tso);
+
+static void blockedThrowTo (StgTSO *source, StgTSO *target);
+
+static void performBlockedException (Capability *cap,
+ StgTSO *source, StgTSO *target);
+
+/* -----------------------------------------------------------------------------
+ throwToSingleThreaded
+
+ This version of throwTo is safe to use if and only if one of the
+ following holds:
+
+ - !THREADED_RTS
+
+ - all the other threads in the system are stopped (eg. during GC).
+
+ - we surely own the target TSO (eg. we just took it from the
+ run queue of the current capability, or we are running it).
+
+ It doesn't cater for blocking the source thread until the exception
+ has been raised.
+ -------------------------------------------------------------------------- */
+
+void
+throwToSingleThreaded(Capability *cap, StgTSO *tso, StgClosure *exception)
+{
+ throwToSingleThreaded_(cap, tso, exception, rtsFalse, NULL);
+}
+
+void
+throwToSingleThreaded_(Capability *cap, StgTSO *tso, StgClosure *exception,
+ rtsBool stop_at_atomically, StgPtr stop_here)
+{
+ // Thread already dead?
+ if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
+ return;
+ }
+
+ // Remove it from any blocking queues
+ removeFromQueues(cap,tso);
+
+ raiseAsync(cap, tso, exception, stop_at_atomically, stop_here);
+}
+
+void
+suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here)
+{
+ // Thread already dead?
+ if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
+ return;
+ }
+
+ // Remove it from any blocking queues
+ removeFromQueues(cap,tso);
+
+ raiseAsync(cap, tso, NULL, rtsFalse, stop_here);
+}
+
+/* -----------------------------------------------------------------------------
+ throwTo
+
+ This function may be used to throw an exception from one thread to
+ another, during the course of normal execution. This is a tricky
+ task: the target thread might be running on another CPU, or it
+ may be blocked and could be woken up at any point by another CPU.
+ We have some delicate synchronisation to do.
+
+ There is a completely safe fallback scheme: it is always possible
+ to just block the source TSO on the target TSO's blocked_exceptions
+ queue. This queue is locked using lockTSO()/unlockTSO(). It is
+ checked at regular intervals: before and after running a thread
+ (schedule() and threadPaused() respectively), and just before GC
+ (scheduleDoGC()). Activating a thread on this queue should be done
+ using maybePerformBlockedException(): this is done in the context
+ of the target thread, so the exception can be raised eagerly.
+
+ This fallback scheme works even if the target thread is complete or
+ killed: scheduleDoGC() will discover the blocked thread before the
+ target is GC'd.
+
+ Blocking the source thread on the target thread's blocked_exception
+ queue is also employed when the target thread is currently blocking
+ exceptions (ie. inside Control.Exception.block).
+
+ We could use the safe fallback scheme exclusively, but that
+ wouldn't be ideal: most calls to throwTo would block immediately,
+ possibly until the next GC, which might require the deadlock
+ detection mechanism to kick in. So we try to provide promptness
+ wherever possible.
+
+ We can promptly deliver the exception if the target thread is:
+
+ - runnable, on the same Capability as the source thread (because
+ we own the run queue and therefore the target thread).
+
+ - blocked, and we can obtain exclusive access to it. Obtaining
+ exclusive access to the thread depends on how it is blocked.
+
+ We must also be careful to not trip over threadStackOverflow(),
+ which might be moving the TSO to enlarge its stack.
+ lockTSO()/unlockTSO() are used here too.
+
+ Returns:
+
+ THROWTO_SUCCESS exception was raised, ok to continue
+
+ THROWTO_BLOCKED exception was not raised; block the source
+ thread then call throwToReleaseTarget() when
+ the source thread is properly tidied away.
+
+ -------------------------------------------------------------------------- */
+
+nat
+throwTo (Capability *cap, // the Capability we hold
+ StgTSO *source, // the TSO sending the exception
+ StgTSO *target, // the TSO receiving the exception
+ StgClosure *exception, // the exception closure
+ /*[out]*/ void **out USED_IF_THREADS)
+{
+ StgWord status;
+
+ // follow ThreadRelocated links in the target first
+ while (target->what_next == ThreadRelocated) {
+ target = target->link;
+ // No, it might be a WHITEHOLE:
+ // ASSERT(get_itbl(target)->type == TSO);
+ }
+
+ debugTrace(DEBUG_sched, "throwTo: from thread %d to thread %d",
+ source->id, target->id);
+
+#ifdef DEBUG
+ if (traceClass(DEBUG_sched)) {
+ debugTraceBegin("throwTo: target");
+ printThreadStatus(target);
+ debugTraceEnd();
+ }
+#endif
+
+ goto check_target;
+retry:
+ debugTrace(DEBUG_sched, "throwTo: retrying...");
+
+check_target:
+ // Thread already dead?
+ if (target->what_next == ThreadComplete
+ || target->what_next == ThreadKilled) {
+ return THROWTO_SUCCESS;
+ }
+
+ status = target->why_blocked;
+
+ switch (status) {
+ case NotBlocked:
+ /* if status==NotBlocked, and target->cap == cap, then
+ we own this TSO and can raise the exception.
+
+ How do we establish this condition? Very carefully.
+
+ Let
+ P = (status == NotBlocked)
+ Q = (tso->cap == cap)
+
+ Now, if P & Q are true, then the TSO is locked and owned by
+ this capability. No other OS thread can steal it.
+
+ If P==0 and Q==1: the TSO is blocked, but attached to this
+ capabilty, and it can be stolen by another capability.
+
+ If P==1 and Q==0: the TSO is runnable on another
+ capability. At any time, the TSO may change from runnable
+ to blocked and vice versa, while it remains owned by
+ another capability.
+
+ Suppose we test like this:
+
+ p = P
+ q = Q
+ if (p && q) ...
+
+ this is defeated by another capability stealing a blocked
+ TSO from us to wake it up (Schedule.c:unblockOne()). The
+ other thread is doing
+
+ Q = 0
+ P = 1
+
+ assuming arbitrary reordering, we could see this
+ interleaving:
+
+ start: P==0 && Q==1
+ P = 1
+ p = P
+ q = Q
+ Q = 0
+ if (p && q) ...
+
+ so we need a memory barrier:
+
+ p = P
+ mb()
+ q = Q
+ if (p && q) ...
+
+ this avoids the problematic case. There are other cases
+ to consider, but this is the tricky one.
+
+ Note that we must be sure that unblockOne() does the
+ writes in the correct order: Q before P. The memory
+ barrier ensures that if we have seen the write to P, we
+ have also seen the write to Q.
+ */
+ {
+ Capability *target_cap;
+
+ wb();
+ target_cap = target->cap;
+ if (target_cap == cap && (target->flags & TSO_BLOCKEX) == 0) {
+ // It's on our run queue and not blocking exceptions
+ raiseAsync(cap, target, exception, rtsFalse, NULL);
+ return THROWTO_SUCCESS;
+ } else {
+ // Otherwise, just block on the blocked_exceptions queue
+ // of the target thread. The queue will get looked at
+ // soon enough: it is checked before and after running a
+ // thread, and during GC.
+ lockTSO(target);
+
+ // Avoid race with threadStackOverflow, which may have
+ // just moved this TSO.
+ if (target->what_next == ThreadRelocated) {
+ unlockTSO(target);
+ target = target->link;
+ goto retry;
+ }
+ blockedThrowTo(source,target);
+ *out = target;
+ return THROWTO_BLOCKED;
+ }
+ }
+
+ case BlockedOnMVar:
+ {
+ /*
+ To establish ownership of this TSO, we need to acquire a
+ lock on the MVar that it is blocked on.
+ */
+ StgMVar *mvar;
+ StgInfoTable *info USED_IF_THREADS;
+
+ mvar = (StgMVar *)target->block_info.closure;
+
+ // ASSUMPTION: tso->block_info must always point to a
+ // closure. In the threaded RTS it does.
+ if (get_itbl(mvar)->type != MVAR) goto retry;
+
+ info = lockClosure((StgClosure *)mvar);
+
+ if (target->what_next == ThreadRelocated) {
+ target = target->link;
+ unlockClosure((StgClosure *)mvar,info);
+ goto retry;
+ }
+ // we have the MVar, let's check whether the thread
+ // is still blocked on the same MVar.
+ if (target->why_blocked != BlockedOnMVar
+ || (StgMVar *)target->block_info.closure != mvar) {
+ unlockClosure((StgClosure *)mvar, info);
+ goto retry;
+ }
+
+ if ((target->flags & TSO_BLOCKEX) &&
+ ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
+ lockClosure((StgClosure *)target);
+ blockedThrowTo(source,target);
+ unlockClosure((StgClosure *)mvar, info);
+ *out = target;
+ return THROWTO_BLOCKED; // caller releases TSO
+ } else {
+ removeThreadFromMVarQueue(mvar, target);
+ raiseAsync(cap, target, exception, rtsFalse, NULL);
+ unblockOne(cap, target);
+ unlockClosure((StgClosure *)mvar, info);
+ return THROWTO_SUCCESS;
+ }
+ }
+
+ case BlockedOnBlackHole:
+ {
+ ACQUIRE_LOCK(&sched_mutex);
+ // double checking the status after the memory barrier:
+ if (target->why_blocked != BlockedOnBlackHole) {
+ RELEASE_LOCK(&sched_mutex);
+ goto retry;
+ }
+
+ if (target->flags & TSO_BLOCKEX) {
+ lockTSO(target);
+ blockedThrowTo(source,target);
+ RELEASE_LOCK(&sched_mutex);
+ *out = target;
+ return THROWTO_BLOCKED; // caller releases TSO
+ } else {
+ removeThreadFromQueue(&blackhole_queue, target);
+ raiseAsync(cap, target, exception, rtsFalse, NULL);
+ unblockOne(cap, target);
+ RELEASE_LOCK(&sched_mutex);
+ return THROWTO_SUCCESS;
+ }
+ }
+
+ case BlockedOnException:
+ {
+ StgTSO *target2;
+ StgInfoTable *info;
+
+ /*
+ To obtain exclusive access to a BlockedOnException thread,
+ we must call lockClosure() on the TSO on which it is blocked.
+ Since the TSO might change underneath our feet, after we
+ call lockClosure() we must check that
+
+ (a) the closure we locked is actually a TSO
+ (b) the original thread is still BlockedOnException,
+ (c) the original thread is still blocked on the TSO we locked
+ and (d) the target thread has not been relocated.
+
+ We synchronise with threadStackOverflow() (which relocates
+ threads) using lockClosure()/unlockClosure().
+ */
+ target2 = target->block_info.tso;
+
+ info = lockClosure((StgClosure *)target2);
+ if (info != &stg_TSO_info) {
+ unlockClosure((StgClosure *)target2, info);
+ goto retry;
+ }
+ if (target->what_next == ThreadRelocated) {
+ target = target->link;
+ unlockTSO(target2);
+ goto retry;
+ }
+ if (target2->what_next == ThreadRelocated) {
+ target->block_info.tso = target2->link;
+ unlockTSO(target2);
+ goto retry;
+ }
+ if (target->why_blocked != BlockedOnException
+ || target->block_info.tso != target2) {
+ unlockTSO(target2);
+ goto retry;
+ }
+
+ /*
+ Now we have exclusive rights to the target TSO...
+
+ If it is blocking exceptions, add the source TSO to its
+ blocked_exceptions queue. Otherwise, raise the exception.
+ */
+ if ((target->flags & TSO_BLOCKEX) &&
+ ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
+ lockTSO(target);
+ blockedThrowTo(source,target);
+ unlockTSO(target2);
+ *out = target;
+ return THROWTO_BLOCKED;
+ } else {
+ removeThreadFromQueue(&target2->blocked_exceptions, target);
+ raiseAsync(cap, target, exception, rtsFalse, NULL);
+ unblockOne(cap, target);
+ unlockTSO(target2);
+ return THROWTO_SUCCESS;
+ }
+ }
+
+ case BlockedOnSTM:
+ barf("ToDo");
+
+ case BlockedOnCCall:
+ case BlockedOnCCall_NoUnblockExc:
+ // I don't think it's possible to acquire ownership of a
+ // BlockedOnCCall thread. We just assume that the target
+ // thread is blocking exceptions, and block on its
+ // blocked_exception queue.
+ lockTSO(target);
+ blockedThrowTo(source,target);
+ *out = target;
+ return THROWTO_BLOCKED;
+
+#ifndef THREADEDED_RTS
+ case BlockedOnRead:
+ case BlockedOnWrite:
+ case BlockedOnDelay:
+ if ((target->flags & TSO_BLOCKEX) &&
+ ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
+ blockedThrowTo(source,target);
+ return THROWTO_BLOCKED;
+ } else {
+ removeFromQueues(cap,target);
+ raiseAsync(cap, target, exception, rtsFalse, NULL);
+ return THROWTO_SUCCESS;
+ }
+#endif
+
+ default:
+ barf("throwTo: unrecognised why_blocked value");
+ }
+ barf("throwTo");
+}
+
+// Block a TSO on another TSO's blocked_exceptions queue.
+// Precondition: we hold an exclusive lock on the target TSO (this is
+// complex to achieve as there's no single lock on a TSO; see
+// throwTo()).
+static void
+blockedThrowTo (StgTSO *source, StgTSO *target)
+{
+ debugTrace(DEBUG_sched, "throwTo: blocking on thread %d", target->id);
+ source->link = target->blocked_exceptions;
+ target->blocked_exceptions = source;
+ dirtyTSO(target); // we modified the blocked_exceptions queue
+
+ source->block_info.tso = target;
+ wb(); // throwTo_exception *must* be visible if BlockedOnException is.
+ source->why_blocked = BlockedOnException;
+}
+
+
+#ifdef THREADED_RTS
+void
+throwToReleaseTarget (void *tso)
+{
+ unlockTSO((StgTSO *)tso);
+}
+#endif
+
+/* -----------------------------------------------------------------------------
+ Waking up threads blocked in throwTo
+
+ There are two ways to do this: maybePerformBlockedException() will
+ perform the throwTo() for the thread at the head of the queue
+ immediately, and leave the other threads on the queue.
+ maybePerformBlockedException() also checks the TSO_BLOCKEX flag
+ before raising an exception.
+
+ awakenBlockedExceptionQueue() will wake up all the threads in the
+ queue, but not perform any throwTo() immediately. This might be
+ more appropriate when the target thread is the one actually running
+ (see Exception.cmm).
+ -------------------------------------------------------------------------- */
+
+void
+maybePerformBlockedException (Capability *cap, StgTSO *tso)
+{
+ StgTSO *source;
+
+ if (tso->blocked_exceptions != END_TSO_QUEUE
+ && ((tso->flags & TSO_BLOCKEX) == 0
+ || ((tso->flags & TSO_INTERRUPTIBLE) && interruptible(tso)))) {
+
+ // Lock the TSO, this gives us exclusive access to the queue
+ lockTSO(tso);
+
+ // Check the queue again; it might have changed before we
+ // locked it.
+ if (tso->blocked_exceptions == END_TSO_QUEUE) {
+ unlockTSO(tso);
+ return;
+ }
+
+ // We unblock just the first thread on the queue, and perform
+ // its throw immediately.
+ source = tso->blocked_exceptions;
+ performBlockedException(cap, source, tso);
+ tso->blocked_exceptions = unblockOne_(cap, source,
+ rtsFalse/*no migrate*/);
+ unlockTSO(tso);
+ }
+}
+
+void
+awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso)
+{
+ if (tso->blocked_exceptions != END_TSO_QUEUE) {
+ lockTSO(tso);
+ awakenBlockedQueue(cap, tso->blocked_exceptions);
+ tso->blocked_exceptions = END_TSO_QUEUE;
+ unlockTSO(tso);
+ }
+}
+
+static void
+performBlockedException (Capability *cap, StgTSO *source, StgTSO *target)
+{
+ StgClosure *exception;
+
+ ASSERT(source->why_blocked == BlockedOnException);
+ ASSERT(source->block_info.tso->id == target->id);
+ ASSERT(source->sp[0] == (StgWord)&stg_block_throwto_info);
+ ASSERT(((StgTSO *)source->sp[1])->id == target->id);
+ // check ids not pointers, because the thread might be relocated
+
+ exception = (StgClosure *)source->sp[2];
+ throwToSingleThreaded(cap, target, exception);
+ source->sp += 3;
+}
+
+/* -----------------------------------------------------------------------------
+ Remove a thread from blocking queues.
+
+ This is for use when we raise an exception in another thread, which
+ may be blocked.
+ This has nothing to do with the UnblockThread event in GranSim. -- HWL
+ -------------------------------------------------------------------------- */
+
+#if defined(GRAN) || defined(PARALLEL_HASKELL)
+/*
+ NB: only the type of the blocking queue is different in GranSim and GUM
+ the operations on the queue-elements are the same
+ long live polymorphism!
+
+ Locks: sched_mutex is held upon entry and exit.
+
+*/
+static void
+removeFromQueues(Capability *cap, StgTSO *tso)
+{
+ StgBlockingQueueElement *t, **last;
+
+ switch (tso->why_blocked) {
+
+ case NotBlocked:
+ return; /* not blocked */
+
+ case BlockedOnSTM:
+ // Be careful: nothing to do here! We tell the scheduler that the thread
+ // is runnable and we leave it to the stack-walking code to abort the
+ // transaction while unwinding the stack. We should perhaps have a debugging
+ // test to make sure that this really happens and that the 'zombie' transaction
+ // does not get committed.
+ goto done;
+
+ case BlockedOnMVar:
+ ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
+ {
+ StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
+ StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
+
+ last = (StgBlockingQueueElement **)&mvar->head;
+ for (t = (StgBlockingQueueElement *)mvar->head;
+ t != END_BQ_QUEUE;
+ last = &t->link, last_tso = t, t = t->link) {
+ if (t == (StgBlockingQueueElement *)tso) {
+ *last = (StgBlockingQueueElement *)tso->link;
+ if (mvar->tail == tso) {
+ mvar->tail = (StgTSO *)last_tso;
+ }
+ goto done;
+ }
+ }
+ barf("removeFromQueues (MVAR): TSO not found");
+ }
+
+ case BlockedOnBlackHole:
+ ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
+ {
+ StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
+
+ last = &bq->blocking_queue;
+ for (t = bq->blocking_queue;
+ t != END_BQ_QUEUE;
+ last = &t->link, t = t->link) {
+ if (t == (StgBlockingQueueElement *)tso) {
+ *last = (StgBlockingQueueElement *)tso->link;
+ goto done;
+ }
+ }
+ barf("removeFromQueues (BLACKHOLE): TSO not found");
+ }
+
+ case BlockedOnException:
+ {
+ StgTSO *target = tso->block_info.tso;
+
+ ASSERT(get_itbl(target)->type == TSO);
+
+ while (target->what_next == ThreadRelocated) {
+ target = target2->link;
+ ASSERT(get_itbl(target)->type == TSO);
+ }
+
+ last = (StgBlockingQueueElement **)&target->blocked_exceptions;
+ for (t = (StgBlockingQueueElement *)target->blocked_exceptions;
+ t != END_BQ_QUEUE;
+ last = &t->link, t = t->link) {
+ ASSERT(get_itbl(t)->type == TSO);
+ if (t == (StgBlockingQueueElement *)tso) {
+ *last = (StgBlockingQueueElement *)tso->link;
+ goto done;
+ }
+ }
+ barf("removeFromQueues (Exception): TSO not found");
+ }
+
+ case BlockedOnRead:
+ case BlockedOnWrite:
+#if defined(mingw32_HOST_OS)
+ case BlockedOnDoProc:
+#endif
+ {
+ /* take TSO off blocked_queue */
+ StgBlockingQueueElement *prev = NULL;
+ for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE;
+ prev = t, t = t->link) {
+ if (t == (StgBlockingQueueElement *)tso) {
+ if (prev == NULL) {
+ blocked_queue_hd = (StgTSO *)t->link;
+ if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
+ blocked_queue_tl = END_TSO_QUEUE;
+ }
+ } else {
+ prev->link = t->link;
+ if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
+ blocked_queue_tl = (StgTSO *)prev;
+ }
+ }
+#if defined(mingw32_HOST_OS)
+ /* (Cooperatively) signal that the worker thread should abort
+ * the request.
+ */
+ abandonWorkRequest(tso->block_info.async_result->reqID);
+#endif
+ goto done;
+ }
+ }
+ barf("removeFromQueues (I/O): TSO not found");
+ }
+
+ case BlockedOnDelay:
+ {
+ /* take TSO off sleeping_queue */
+ StgBlockingQueueElement *prev = NULL;
+ for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE;
+ prev = t, t = t->link) {
+ if (t == (StgBlockingQueueElement *)tso) {
+ if (prev == NULL) {
+ sleeping_queue = (StgTSO *)t->link;
+ } else {
+ prev->link = t->link;
+ }
+ goto done;
+ }
+ }
+ barf("removeFromQueues (delay): TSO not found");
+ }
+
+ default:
+ barf("removeFromQueues");
+ }
+
+ done:
+ tso->link = END_TSO_QUEUE;
+ tso->why_blocked = NotBlocked;
+ tso->block_info.closure = NULL;
+ pushOnRunQueue(cap,tso);
+}
+#else
+static void
+removeFromQueues(Capability *cap, StgTSO *tso)
+{
+ switch (tso->why_blocked) {
+
+ case NotBlocked:
+ return;
+
+ case BlockedOnSTM:
+ // Be careful: nothing to do here! We tell the scheduler that the
+ // thread is runnable and we leave it to the stack-walking code to
+ // abort the transaction while unwinding the stack. We should
+ // perhaps have a debugging test to make sure that this really
+ // happens and that the 'zombie' transaction does not get
+ // committed.
+ goto done;
+
+ case BlockedOnMVar:
+ removeThreadFromMVarQueue((StgMVar *)tso->block_info.closure, tso);
+ goto done;
+
+ case BlockedOnBlackHole:
+ removeThreadFromQueue(&blackhole_queue, tso);
+ goto done;
+
+ case BlockedOnException:
+ {
+ StgTSO *target = tso->block_info.tso;
+
+ // NO: when called by threadPaused(), we probably have this
+ // TSO already locked (WHITEHOLEd) because we just placed
+ // ourselves on its queue.
+ // ASSERT(get_itbl(target)->type == TSO);
+
+ while (target->what_next == ThreadRelocated) {
+ target = target->link;
+ }
+
+ removeThreadFromQueue(&target->blocked_exceptions, tso);
+ goto done;
+ }
+
+#if !defined(THREADED_RTS)
+ case BlockedOnRead:
+ case BlockedOnWrite:
+#if defined(mingw32_HOST_OS)
+ case BlockedOnDoProc:
+#endif
+ removeThreadFromDeQueue(&blocked_queue_hd, &blocked_queue_tl, tso);
+#if defined(mingw32_HOST_OS)
+ /* (Cooperatively) signal that the worker thread should abort
+ * the request.
+ */
+ abandonWorkRequest(tso->block_info.async_result->reqID);
+#endif
+ goto done;
+
+ case BlockedOnDelay:
+ removeThreadFromQueue(&sleeping_queue, tso);
+ goto done;
+#endif
+
+ default:
+ barf("removeFromQueues");
+ }
+
+ done:
+ tso->link = END_TSO_QUEUE;
+ tso->why_blocked = NotBlocked;
+ tso->block_info.closure = NULL;
+ appendToRunQueue(cap,tso);
+
+ // We might have just migrated this TSO to our Capability:
+ if (tso->bound) {
+ tso->bound->cap = cap;
+ }
+ tso->cap = cap;
+}
+#endif
+
+/* -----------------------------------------------------------------------------
+ * raiseAsync()
+ *
+ * The following function implements the magic for raising an
+ * asynchronous exception in an existing thread.
+ *
+ * We first remove the thread from any queue on which it might be
+ * blocked. The possible blockages are MVARs and BLACKHOLE_BQs.
+ *
+ * We strip the stack down to the innermost CATCH_FRAME, building
+ * thunks in the heap for all the active computations, so they can
+ * be restarted if necessary. When we reach a CATCH_FRAME, we build
+ * an application of the handler to the exception, and push it on
+ * the top of the stack.
+ *
+ * How exactly do we save all the active computations? We create an
+ * AP_STACK for every UpdateFrame on the stack. Entering one of these
+ * AP_STACKs pushes everything from the corresponding update frame
+ * upwards onto the stack. (Actually, it pushes everything up to the
+ * next update frame plus a pointer to the next AP_STACK object.
+ * Entering the next AP_STACK object pushes more onto the stack until we
+ * reach the last AP_STACK object - at which point the stack should look
+ * exactly as it did when we killed the TSO and we can continue
+ * execution by entering the closure on top of the stack.
+ *
+ * We can also kill a thread entirely - this happens if either (a) the
+ * exception passed to raiseAsync is NULL, or (b) there's no
+ * CATCH_FRAME on the stack. In either case, we strip the entire
+ * stack and replace the thread with a zombie.
+ *
+ * ToDo: in THREADED_RTS mode, this function is only safe if either
+ * (a) we hold all the Capabilities (eg. in GC, or if there is only
+ * one Capability), or (b) we own the Capability that the TSO is
+ * currently blocked on or on the run queue of.
+ *
+ * -------------------------------------------------------------------------- */
+
+static void
+raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
+ rtsBool stop_at_atomically, StgPtr stop_here)
+{
+ StgRetInfoTable *info;
+ StgPtr sp, frame;
+ nat i;
+
+ debugTrace(DEBUG_sched,
+ "raising exception in thread %ld.", (long)tso->id);
+
+ // mark it dirty; we're about to change its stack.
+ dirtyTSO(tso);
+
+ sp = tso->sp;
+
+ // ASSUMES: the thread is not already complete or dead. Upper
+ // layers should deal with that.
+ ASSERT(tso->what_next != ThreadComplete && tso->what_next != ThreadKilled);
+
+ // The stack freezing code assumes there's a closure pointer on
+ // the top of the stack, so we have to arrange that this is the case...
+ //
+ if (sp[0] == (W_)&stg_enter_info) {
+ sp++;
+ } else {
+ sp--;
+ sp[0] = (W_)&stg_dummy_ret_closure;
+ }
+
+ frame = sp + 1;
+ while (stop_here == NULL || frame < stop_here) {
+
+ // 1. Let the top of the stack be the "current closure"
+ //
+ // 2. Walk up the stack until we find either an UPDATE_FRAME or a
+ // CATCH_FRAME.
+ //
+ // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
+ // current closure applied to the chunk of stack up to (but not
+ // including) the update frame. This closure becomes the "current
+ // closure". Go back to step 2.
+ //
+ // 4. If it's a CATCH_FRAME, then leave the exception handler on
+ // top of the stack applied to the exception.
+ //
+ // 5. If it's a STOP_FRAME, then kill the thread.
+ //
+ // NB: if we pass an ATOMICALLY_FRAME then abort the associated
+ // transaction
+
+ info = get_ret_itbl((StgClosure *)frame);
+
+ switch (info->i.type) {
+
+ case UPDATE_FRAME:
+ {
+ StgAP_STACK * ap;
+ nat words;
+
+ // First build an AP_STACK consisting of the stack chunk above the
+ // current update frame, with the top word on the stack as the
+ // fun field.
+ //
+ words = frame - sp - 1;
+ ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words));
+
+ ap->size = words;
+ ap->fun = (StgClosure *)sp[0];
+ sp++;
+ for(i=0; i < (nat)words; ++i) {
+ ap->payload[i] = (StgClosure *)*sp++;
+ }
+
+ SET_HDR(ap,&stg_AP_STACK_info,
+ ((StgClosure *)frame)->header.prof.ccs /* ToDo */);
+ TICK_ALLOC_UP_THK(words+1,0);
+
+ //IF_DEBUG(scheduler,
+ // debugBelch("sched: Updating ");
+ // printPtr((P_)((StgUpdateFrame *)frame)->updatee);
+ // debugBelch(" with ");
+ // printObj((StgClosure *)ap);
+ // );
+
+ // Replace the updatee with an indirection
+ //
+ // Warning: if we're in a loop, more than one update frame on
+ // the stack may point to the same object. Be careful not to
+ // overwrite an IND_OLDGEN in this case, because we'll screw
+ // up the mutable lists. To be on the safe side, don't
+ // overwrite any kind of indirection at all. See also
+ // threadSqueezeStack in GC.c, where we have to make a similar
+ // check.
+ //
+ if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
+ // revert the black hole
+ UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
+ (StgClosure *)ap);
+ }
+ sp += sizeofW(StgUpdateFrame) - 1;
+ sp[0] = (W_)ap; // push onto stack
+ frame = sp + 1;
+ continue; //no need to bump frame
+ }
+
+ case STOP_FRAME:
+ // We've stripped the entire stack, the thread is now dead.
+ tso->what_next = ThreadKilled;
+ tso->sp = frame + sizeofW(StgStopFrame);
+ return;
+
+ case CATCH_FRAME:
+ // If we find a CATCH_FRAME, and we've got an exception to raise,
+ // then build the THUNK raise(exception), and leave it on
+ // top of the CATCH_FRAME ready to enter.
+ //
+ {
+#ifdef PROFILING
+ StgCatchFrame *cf = (StgCatchFrame *)frame;
+#endif
+ StgThunk *raise;
+
+ if (exception == NULL) break;
+
+ // we've got an exception to raise, so let's pass it to the
+ // handler in this frame.
+ //
+ raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
+ TICK_ALLOC_SE_THK(1,0);
+ SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
+ raise->payload[0] = exception;
+
+ // throw away the stack from Sp up to the CATCH_FRAME.
+ //
+ sp = frame - 1;
+
+ /* Ensure that async excpetions are blocked now, so we don't get
+ * a surprise exception before we get around to executing the
+ * handler.
+ */
+ tso->flags |= TSO_BLOCKEX | TSO_INTERRUPTIBLE;
+
+ /* Put the newly-built THUNK on top of the stack, ready to execute
+ * when the thread restarts.
+ */
+ sp[0] = (W_)raise;
+ sp[-1] = (W_)&stg_enter_info;
+ tso->sp = sp-1;
+ tso->what_next = ThreadRunGHC;
+ IF_DEBUG(sanity, checkTSO(tso));
+ return;
+ }
+
+ case ATOMICALLY_FRAME:
+ if (stop_at_atomically) {
+ ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
+ stmCondemnTransaction(cap, tso -> trec);
+#ifdef REG_R1
+ tso->sp = frame;
+#else
+ // R1 is not a register: the return convention for IO in
+ // this case puts the return value on the stack, so we
+ // need to set up the stack to return to the atomically
+ // frame properly...
+ tso->sp = frame - 2;
+ tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
+ tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
+#endif
+ tso->what_next = ThreadRunGHC;
+ return;
+ }
+ // Not stop_at_atomically... fall through and abort the
+ // transaction.
+
+ case CATCH_RETRY_FRAME:
+ // IF we find an ATOMICALLY_FRAME then we abort the
+ // current transaction and propagate the exception. In
+ // this case (unlike ordinary exceptions) we do not care
+ // whether the transaction is valid or not because its
+ // possible validity cannot have caused the exception
+ // and will not be visible after the abort.
+ debugTrace(DEBUG_stm,
+ "found atomically block delivering async exception");
+
+ StgTRecHeader *trec = tso -> trec;
+ StgTRecHeader *outer = stmGetEnclosingTRec(trec);
+ stmAbortTransaction(cap, trec);
+ tso -> trec = outer;
+ break;
+
+ default:
+ break;
+ }
+
+ // move on to the next stack frame
+ frame += stack_frame_sizeW((StgClosure *)frame);
+ }
+
+ // if we got here, then we stopped at stop_here
+ ASSERT(stop_here != NULL);
+}
+
+
diff --git a/rts/RaiseAsync.h b/rts/RaiseAsync.h
new file mode 100644
index 0000000000..8e59d51d9f
--- /dev/null
+++ b/rts/RaiseAsync.h
@@ -0,0 +1,71 @@
+/* ---------------------------------------------------------------------------
+ *
+ * (c) The GHC Team, 1998-2006
+ *
+ * Asynchronous exceptions
+ *
+ * --------------------------------------------------------------------------*/
+
+#ifndef RAISEASYNC_H
+#define RAISEASYNC_H
+
+#define THROWTO_SUCCESS 0
+#define THROWTO_BLOCKED 1
+
+#ifndef CMINUSMINUS
+void throwToSingleThreaded (Capability *cap,
+ StgTSO *tso,
+ StgClosure *exception);
+
+void throwToSingleThreaded_ (Capability *cap,
+ StgTSO *tso,
+ StgClosure *exception,
+ rtsBool stop_at_atomically,
+ StgPtr stop_here);
+
+void suspendComputation (Capability *cap,
+ StgTSO *tso,
+ StgPtr stop_here);
+
+nat throwTo (Capability *cap, // the Capability we hold
+ StgTSO *source, // the TSO sending the exception
+ StgTSO *target, // the TSO receiving the exception
+ StgClosure *exception, // the exception closure
+ /*[out]*/ void **out // pass to throwToReleaseTarget()
+ );
+
+#ifdef THREADED_RTS
+void throwToReleaseTarget (void *tso);
+#endif
+
+void maybePerformBlockedException (Capability *cap, StgTSO *tso);
+void awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso);
+
+/* Determine whether a thread is interruptible (ie. blocked
+ * indefinitely). Interruptible threads can be sent an exception with
+ * killThread# even if they have async exceptions blocked.
+ */
+STATIC_INLINE int
+interruptible(StgTSO *t)
+{
+ switch (t->why_blocked) {
+ case BlockedOnMVar:
+ case BlockedOnException:
+ case BlockedOnRead:
+ case BlockedOnWrite:
+#if defined(mingw32_HOST_OS)
+ case BlockedOnDoProc:
+#endif
+ case BlockedOnDelay:
+ return 1;
+ // NB. Threaded blocked on foreign calls (BlockedOnCCall) are
+ // *not* interruptible. We can't send these threads an exception.
+ default:
+ return 0;
+ }
+}
+
+#endif /* CMINUSMINUS */
+
+#endif /* RAISEASYNC_H */
+
diff --git a/rts/Schedule.c b/rts/Schedule.c
index d9adda415c..11b9f87d59 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -1,6 +1,6 @@
/* ---------------------------------------------------------------------------
*
- * (c) The GHC Team, 1998-2005
+ * (c) The GHC Team, 1998-2006
*
* The scheduler and thread-related functionality
*
@@ -19,7 +19,6 @@
#include "Schedule.h"
#include "StgMiscClosures.h"
#include "Interpreter.h"
-#include "Exception.h"
#include "Printer.h"
#include "RtsSignals.h"
#include "Sanity.h"
@@ -51,6 +50,8 @@
#include "win32/IOManager.h"
#endif
#include "Trace.h"
+#include "RaiseAsync.h"
+#include "Threads.h"
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
@@ -140,23 +141,6 @@ nat recent_activity = ACTIVITY_YES;
*/
rtsBool sched_state = SCHED_RUNNING;
-/* Next thread ID to allocate.
- * LOCK: sched_mutex
- */
-static StgThreadID next_thread_id = 1;
-
-/* The smallest stack size that makes any sense is:
- * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
- * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
- * + 1 (the closure to enter)
- * + 1 (stg_ap_v_ret)
- * + 1 (spare slot req'd by stg_ap_v_ret)
- *
- * A thread with this stack will bomb immediately with a stack
- * overflow, which will increase its stack size.
- */
-#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
-
#if defined(GRAN)
StgTSO *CurrentTSO;
#endif
@@ -188,6 +172,10 @@ rtsTime TimeOfLastYield;
rtsBool emitSchedule = rtsTrue;
#endif
+#if !defined(mingw32_HOST_OS)
+#define FORKPROCESS_PRIMOP_SUPPORTED
+#endif
+
/* -----------------------------------------------------------------------------
* static function prototypes
* -------------------------------------------------------------------------- */
@@ -233,22 +221,16 @@ static Capability *scheduleDoGC(Capability *cap, Task *task,
rtsBool force_major,
void (*get_roots)(evac_fn));
-static void unblockThread(Capability *cap, StgTSO *tso);
static rtsBool checkBlackHoles(Capability *cap);
static void AllRoots(evac_fn evac);
static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
-static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
- rtsBool stop_at_atomically, StgPtr stop_here);
-
static void deleteThread (Capability *cap, StgTSO *tso);
static void deleteAllThreads (Capability *cap);
-#ifdef DEBUG
-static void printThreadBlockage(StgTSO *tso);
-static void printThreadStatus(StgTSO *tso);
-void printThreadQueue(StgTSO *tso);
+#ifdef FORKPROCESS_PRIMOP_SUPPORTED
+static void deleteThread_(Capability *cap, StgTSO *tso);
#endif
#if defined(PARALLEL_HASKELL)
@@ -596,6 +578,9 @@ run_thread:
startHeapProfTimer();
#endif
+ // Check for exceptions blocked on this thread
+ maybePerformBlockedException (cap, t);
+
// ----------------------------------------------------------------------
// Run the current thread
@@ -1019,7 +1004,8 @@ scheduleDetectDeadlock (Capability *cap, Task *task)
case BlockedOnBlackHole:
case BlockedOnException:
case BlockedOnMVar:
- raiseAsync(cap, task->tso, (StgClosure *)NonTermination_closure);
+ throwToSingleThreaded(cap, task->tso,
+ (StgClosure *)NonTermination_closure);
return;
default:
barf("deadlock: main thread blocked in a strange way");
@@ -1644,7 +1630,7 @@ static void
scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
{
debugTrace (DEBUG_sched,
- "--<< thread %ld (%s) stopped, StackOverflow\n",
+ "--<< thread %ld (%s) stopped, StackOverflow",
(long)t->id, whatNext_strs[t->what_next]);
/* just adjust the stack for this thread, then pop it back
@@ -1687,11 +1673,11 @@ scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
#ifdef DEBUG
if (t->what_next != prev_what_next) {
debugTrace(DEBUG_sched,
- "--<< thread %ld (%s) stopped to switch evaluators\n",
+ "--<< thread %ld (%s) stopped to switch evaluators",
(long)t->id, whatNext_strs[t->what_next]);
} else {
debugTrace(DEBUG_sched,
- "--<< thread %ld (%s) stopped, yielding\n",
+ "--<< thread %ld (%s) stopped, yielding",
(long)t->id, whatNext_strs[t->what_next]);
}
#endif
@@ -2024,6 +2010,18 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS,
next = t->link;
} else {
next = t->global_link;
+
+ // This is a good place to check for blocked
+ // exceptions. It might be the case that a thread is
+ // blocked on delivering an exception to a thread that
+ // is also blocked - we try to ensure that this
+ // doesn't happen in throwTo(), but it's too hard (or
+ // impossible) to close all the race holes, so we
+ // accept that some might get through and deal with
+ // them here. A GC will always happen at some point,
+ // even if the system is otherwise deadlocked.
+ maybePerformBlockedException (&capabilities[0], t);
+
if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
if (!stmValidateNestOfTransactions (t -> trec)) {
debugTrace(DEBUG_sched | DEBUG_stm,
@@ -2033,7 +2031,8 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS,
// ATOMICALLY_FRAME, aborting the (nested)
// transaction, and saving the stack of any
// partially-evaluated thunks on the heap.
- raiseAsync_(&capabilities[0], t, NULL, rtsTrue, NULL);
+ throwToSingleThreaded_(&capabilities[0], t,
+ NULL, rtsTrue, NULL);
#ifdef REG_R1
ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
@@ -2099,45 +2098,9 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS,
}
/* ---------------------------------------------------------------------------
- * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
- * used by Control.Concurrent for error checking.
- * ------------------------------------------------------------------------- */
-
-StgBool
-rtsSupportsBoundThreads(void)
-{
-#if defined(THREADED_RTS)
- return rtsTrue;
-#else
- return rtsFalse;
-#endif
-}
-
-/* ---------------------------------------------------------------------------
- * isThreadBound(tso): check whether tso is bound to an OS thread.
- * ------------------------------------------------------------------------- */
-
-StgBool
-isThreadBound(StgTSO* tso USED_IF_THREADS)
-{
-#if defined(THREADED_RTS)
- return (tso->bound != NULL);
-#endif
- return rtsFalse;
-}
-
-/* ---------------------------------------------------------------------------
* Singleton fork(). Do not copy any running threads.
* ------------------------------------------------------------------------- */
-#if !defined(mingw32_HOST_OS)
-#define FORKPROCESS_PRIMOP_SUPPORTED
-#endif
-
-#ifdef FORKPROCESS_PRIMOP_SUPPORTED
-static void
-deleteThread_(Capability *cap, StgTSO *tso);
-#endif
StgInt
forkProcess(HsStablePtr *entry
#ifndef FORKPROCESS_PRIMOP_SUPPORTED
@@ -2243,26 +2206,28 @@ forkProcess(HsStablePtr *entry
static void
deleteAllThreads ( Capability *cap )
{
- StgTSO* t, *next;
- debugTrace(DEBUG_sched,"deleting all threads");
- for (t = all_threads; t != END_TSO_QUEUE; t = next) {
- if (t->what_next == ThreadRelocated) {
- next = t->link;
- } else {
- next = t->global_link;
- deleteThread(cap,t);
- }
- }
+ // NOTE: only safe to call if we own all capabilities.
+
+ StgTSO* t, *next;
+ debugTrace(DEBUG_sched,"deleting all threads");
+ for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+ if (t->what_next == ThreadRelocated) {
+ next = t->link;
+ } else {
+ next = t->global_link;
+ deleteThread(cap,t);
+ }
+ }
- // The run queue now contains a bunch of ThreadKilled threads. We
- // must not throw these away: the main thread(s) will be in there
- // somewhere, and the main scheduler loop has to deal with it.
- // Also, the run queue is the only thing keeping these threads from
- // being GC'd, and we don't want the "main thread has been GC'd" panic.
+ // The run queue now contains a bunch of ThreadKilled threads. We
+ // must not throw these away: the main thread(s) will be in there
+ // somewhere, and the main scheduler loop has to deal with it.
+ // Also, the run queue is the only thing keeping these threads from
+ // being GC'd, and we don't want the "main thread has been GC'd" panic.
#if !defined(THREADED_RTS)
- ASSERT(blocked_queue_hd == END_TSO_QUEUE);
- ASSERT(sleeping_queue == END_TSO_QUEUE);
+ ASSERT(blocked_queue_hd == END_TSO_QUEUE);
+ ASSERT(sleeping_queue == END_TSO_QUEUE);
#endif
}
@@ -2337,9 +2302,10 @@ suspendThread (StgRegTable *reg)
threadPaused(cap,tso);
- if(tso->blocked_exceptions == NULL) {
+ if ((tso->flags & TSO_BLOCKEX) == 0) {
tso->why_blocked = BlockedOnCCall;
- tso->blocked_exceptions = END_TSO_QUEUE;
+ tso->flags |= TSO_BLOCKEX;
+ tso->flags &= ~TSO_INTERRUPTIBLE;
} else {
tso->why_blocked = BlockedOnCCall_NoUnblockExc;
}
@@ -2390,8 +2356,8 @@ resumeThread (void *task_)
debugTrace(DEBUG_sched, "thread %d: re-entering RTS", tso->id);
if (tso->why_blocked == BlockedOnCCall) {
- awakenBlockedQueue(cap,tso->blocked_exceptions);
- tso->blocked_exceptions = NULL;
+ awakenBlockedExceptionQueue(cap,tso);
+ tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
}
/* Reset blocking status */
@@ -2410,300 +2376,6 @@ resumeThread (void *task_)
}
/* ---------------------------------------------------------------------------
- * Comparing Thread ids.
- *
- * This is used from STG land in the implementation of the
- * instances of Eq/Ord for ThreadIds.
- * ------------------------------------------------------------------------ */
-
-int
-cmp_thread(StgPtr tso1, StgPtr tso2)
-{
- StgThreadID id1 = ((StgTSO *)tso1)->id;
- StgThreadID id2 = ((StgTSO *)tso2)->id;
-
- if (id1 < id2) return (-1);
- if (id1 > id2) return 1;
- return 0;
-}
-
-/* ---------------------------------------------------------------------------
- * Fetching the ThreadID from an StgTSO.
- *
- * This is used in the implementation of Show for ThreadIds.
- * ------------------------------------------------------------------------ */
-int
-rts_getThreadId(StgPtr tso)
-{
- return ((StgTSO *)tso)->id;
-}
-
-#ifdef DEBUG
-void
-labelThread(StgPtr tso, char *label)
-{
- int len;
- void *buf;
-
- /* Caveat: Once set, you can only set the thread name to "" */
- len = strlen(label)+1;
- buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
- strncpy(buf,label,len);
- /* Update will free the old memory for us */
- updateThreadLabel(((StgTSO *)tso)->id,buf);
-}
-#endif /* DEBUG */
-
-/* ---------------------------------------------------------------------------
- Create a new thread.
-
- The new thread starts with the given stack size. Before the
- scheduler can run, however, this thread needs to have a closure
- (and possibly some arguments) pushed on its stack. See
- pushClosure() in Schedule.h.
-
- createGenThread() and createIOThread() (in SchedAPI.h) are
- convenient packaged versions of this function.
-
- currently pri (priority) is only used in a GRAN setup -- HWL
- ------------------------------------------------------------------------ */
-#if defined(GRAN)
-/* currently pri (priority) is only used in a GRAN setup -- HWL */
-StgTSO *
-createThread(nat size, StgInt pri)
-#else
-StgTSO *
-createThread(Capability *cap, nat size)
-#endif
-{
- StgTSO *tso;
- nat stack_size;
-
- /* sched_mutex is *not* required */
-
- /* First check whether we should create a thread at all */
-#if defined(PARALLEL_HASKELL)
- /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
- if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
- threadsIgnored++;
- debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
- RtsFlags.ParFlags.maxThreads, advisory_thread_count);
- return END_TSO_QUEUE;
- }
- threadsCreated++;
-#endif
-
-#if defined(GRAN)
- ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
-#endif
-
- // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
-
- /* catch ridiculously small stack sizes */
- if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
- size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
- }
-
- stack_size = size - TSO_STRUCT_SIZEW;
-
- tso = (StgTSO *)allocateLocal(cap, size);
- TICK_ALLOC_TSO(stack_size, 0);
-
- SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
-#if defined(GRAN)
- SET_GRAN_HDR(tso, ThisPE);
-#endif
-
- // Always start with the compiled code evaluator
- tso->what_next = ThreadRunGHC;
-
- tso->why_blocked = NotBlocked;
- tso->blocked_exceptions = NULL;
- tso->flags = TSO_DIRTY;
-
- tso->saved_errno = 0;
- tso->bound = NULL;
- tso->cap = cap;
-
- tso->stack_size = stack_size;
- tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
- - TSO_STRUCT_SIZEW;
- tso->sp = (P_)&(tso->stack) + stack_size;
-
- tso->trec = NO_TREC;
-
-#ifdef PROFILING
- tso->prof.CCCS = CCS_MAIN;
-#endif
-
- /* put a stop frame on the stack */
- tso->sp -= sizeofW(StgStopFrame);
- SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
- tso->link = END_TSO_QUEUE;
-
- // ToDo: check this
-#if defined(GRAN)
- /* uses more flexible routine in GranSim */
- insertThread(tso, CurrentProc);
-#else
- /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
- * from its creation
- */
-#endif
-
-#if defined(GRAN)
- if (RtsFlags.GranFlags.GranSimStats.Full)
- DumpGranEvent(GR_START,tso);
-#elif defined(PARALLEL_HASKELL)
- if (RtsFlags.ParFlags.ParStats.Full)
- DumpGranEvent(GR_STARTQ,tso);
- /* HACk to avoid SCHEDULE
- LastTSO = tso; */
-#endif
-
- /* Link the new thread on the global thread list.
- */
- ACQUIRE_LOCK(&sched_mutex);
- tso->id = next_thread_id++; // while we have the mutex
- tso->global_link = all_threads;
- all_threads = tso;
- RELEASE_LOCK(&sched_mutex);
-
-#if defined(DIST)
- tso->dist.priority = MandatoryPriority; //by default that is...
-#endif
-
-#if defined(GRAN)
- tso->gran.pri = pri;
-# if defined(DEBUG)
- tso->gran.magic = TSO_MAGIC; // debugging only
-# endif
- tso->gran.sparkname = 0;
- tso->gran.startedat = CURRENT_TIME;
- tso->gran.exported = 0;
- tso->gran.basicblocks = 0;
- tso->gran.allocs = 0;
- tso->gran.exectime = 0;
- tso->gran.fetchtime = 0;
- tso->gran.fetchcount = 0;
- tso->gran.blocktime = 0;
- tso->gran.blockcount = 0;
- tso->gran.blockedat = 0;
- tso->gran.globalsparks = 0;
- tso->gran.localsparks = 0;
- if (RtsFlags.GranFlags.Light)
- tso->gran.clock = Now; /* local clock */
- else
- tso->gran.clock = 0;
-
- IF_DEBUG(gran,printTSO(tso));
-#elif defined(PARALLEL_HASKELL)
-# if defined(DEBUG)
- tso->par.magic = TSO_MAGIC; // debugging only
-# endif
- tso->par.sparkname = 0;
- tso->par.startedat = CURRENT_TIME;
- tso->par.exported = 0;
- tso->par.basicblocks = 0;
- tso->par.allocs = 0;
- tso->par.exectime = 0;
- tso->par.fetchtime = 0;
- tso->par.fetchcount = 0;
- tso->par.blocktime = 0;
- tso->par.blockcount = 0;
- tso->par.blockedat = 0;
- tso->par.globalsparks = 0;
- tso->par.localsparks = 0;
-#endif
-
-#if defined(GRAN)
- globalGranStats.tot_threads_created++;
- globalGranStats.threads_created_on_PE[CurrentProc]++;
- globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
- globalGranStats.tot_sq_probes++;
-#elif defined(PARALLEL_HASKELL)
- // collect parallel global statistics (currently done together with GC stats)
- if (RtsFlags.ParFlags.ParStats.Global &&
- RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
- //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime());
- globalParStats.tot_threads_created++;
- }
-#endif
-
-#if defined(GRAN)
- debugTrace(GRAN_DEBUG_pri,
- "==__ schedule: Created TSO %d (%p);",
- CurrentProc, tso, tso->id);
-#elif defined(PARALLEL_HASKELL)
- debugTrace(PAR_DEBUG_verbose,
- "==__ schedule: Created TSO %d (%p); %d threads active",
- (long)tso->id, tso, advisory_thread_count);
-#else
- debugTrace(DEBUG_sched,
- "created thread %ld, stack size = %lx words",
- (long)tso->id, (long)tso->stack_size);
-#endif
- return tso;
-}
-
-#if defined(PAR)
-/* RFP:
- all parallel thread creation calls should fall through the following routine.
-*/
-StgTSO *
-createThreadFromSpark(rtsSpark spark)
-{ StgTSO *tso;
- ASSERT(spark != (rtsSpark)NULL);
-// JB: TAKE CARE OF THIS COUNTER! BUGGY
- if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads)
- { threadsIgnored++;
- barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
- RtsFlags.ParFlags.maxThreads, advisory_thread_count);
- return END_TSO_QUEUE;
- }
- else
- { threadsCreated++;
- tso = createThread(RtsFlags.GcFlags.initialStkSize);
- if (tso==END_TSO_QUEUE)
- barf("createSparkThread: Cannot create TSO");
-#if defined(DIST)
- tso->priority = AdvisoryPriority;
-#endif
- pushClosure(tso,spark);
- addToRunQueue(tso);
- advisory_thread_count++; // JB: TAKE CARE OF THIS COUNTER! BUGGY
- }
- return tso;
-}
-#endif
-
-/*
- Turn a spark into a thread.
- ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
-*/
-#if 0
-StgTSO *
-activateSpark (rtsSpark spark)
-{
- StgTSO *tso;
-
- tso = createSparkThread(spark);
- if (RtsFlags.ParFlags.ParStats.Full) {
- //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
- IF_PAR_DEBUG(verbose,
- debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
- (StgClosure *)spark, info_type((StgClosure *)spark)));
- }
- // ToDo: fwd info on local/global spark to thread -- HWL
- // tso->gran.exported = spark->exported;
- // tso->gran.locked = !spark->global;
- // tso->gran.sparkname = spark->name;
-
- return tso;
-}
-#endif
-
-/* ---------------------------------------------------------------------------
* scheduleThread()
*
* scheduleThread puts a thread on the end of the runnable queue.
@@ -2731,12 +2403,7 @@ scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
if (cpu == cap->no) {
appendToRunQueue(cap,tso);
} else {
- Capability *target_cap = &capabilities[cpu];
- if (tso->bound) {
- tso->bound->cap = target_cap;
- }
- tso->cap = target_cap;
- wakeupThreadOnCapability(target_cap,tso);
+ migrateThreadToCapability_lock(&capabilities[cpu],tso);
}
#else
appendToRunQueue(cap,tso);
@@ -3072,19 +2739,25 @@ threadStackOverflow(Capability *cap, StgTSO *tso)
StgTSO *dest;
IF_DEBUG(sanity,checkTSO(tso));
+
+ // don't allow throwTo() to modify the blocked_exceptions queue
+ // while we are moving the TSO:
+ lockClosure((StgClosure *)tso);
+
if (tso->stack_size >= tso->max_stack_size) {
debugTrace(DEBUG_gc,
- "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
+ "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
(long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
IF_DEBUG(gc,
/* If we're debugging, just print out the top of the stack */
printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
tso->sp+64)));
- /* Send this thread the StackOverflow exception */
- raiseAsync(cap, tso, (StgClosure *)stackOverflow_closure);
- return tso;
+ // Send this thread the StackOverflow exception
+ unlockTSO(tso);
+ throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
+ return tso;
}
/* Try to double the current stack size. If that takes us over the
@@ -3098,7 +2771,7 @@ threadStackOverflow(Capability *cap, StgTSO *tso)
new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
debugTrace(DEBUG_sched,
- "increasing stack size from %ld words to %d.\n",
+ "increasing stack size from %ld words to %d.",
(long)tso->stack_size, new_stack_size);
dest = (StgTSO *)allocate(new_tso_size);
@@ -3133,7 +2806,10 @@ threadStackOverflow(Capability *cap, StgTSO *tso)
printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
tso->sp+64)));
- IF_DEBUG(sanity,checkTSO(tso));
+ unlockTSO(dest);
+ unlockTSO(tso);
+
+ IF_DEBUG(sanity,checkTSO(dest));
#if 0
IF_DEBUG(scheduler,printTSO(dest));
#endif
@@ -3142,301 +2818,6 @@ threadStackOverflow(Capability *cap, StgTSO *tso)
}
/* ---------------------------------------------------------------------------
- Wake up a queue that was blocked on some resource.
- ------------------------------------------------------------------------ */
-
-#if defined(GRAN)
-STATIC_INLINE void
-unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
-{
-}
-#elif defined(PARALLEL_HASKELL)
-STATIC_INLINE void
-unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
-{
- /* write RESUME events to log file and
- update blocked and fetch time (depending on type of the orig closure) */
- if (RtsFlags.ParFlags.ParStats.Full) {
- DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
- GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
- 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
- if (emptyRunQueue())
- emitSchedule = rtsTrue;
-
- switch (get_itbl(node)->type) {
- case FETCH_ME_BQ:
- ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
- break;
- case RBH:
- case FETCH_ME:
- case BLACKHOLE_BQ:
- ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
- break;
-#ifdef DIST
- case MVAR:
- break;
-#endif
- default:
- barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
- }
- }
-}
-#endif
-
-#if defined(GRAN)
-StgBlockingQueueElement *
-unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
-{
- StgTSO *tso;
- PEs node_loc, tso_loc;
-
- node_loc = where_is(node); // should be lifted out of loop
- tso = (StgTSO *)bqe; // wastes an assignment to get the type right
- tso_loc = where_is((StgClosure *)tso);
- if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
- /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
- ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
- CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
- // insertThread(tso, node_loc);
- new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
- ResumeThread,
- tso, node, (rtsSpark*)NULL);
- tso->link = END_TSO_QUEUE; // overwrite link just to be sure
- // len_local++;
- // len++;
- } else { // TSO is remote (actually should be FMBQ)
- CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
- RtsFlags.GranFlags.Costs.gunblocktime +
- RtsFlags.GranFlags.Costs.latency;
- new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
- UnblockThread,
- tso, node, (rtsSpark*)NULL);
- tso->link = END_TSO_QUEUE; // overwrite link just to be sure
- // len++;
- }
- /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
- IF_GRAN_DEBUG(bq,
- debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
- (node_loc==tso_loc ? "Local" : "Global"),
- tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
- tso->block_info.closure = NULL;
- debugTrace(DEBUG_sched, "-- waking up thread %ld (%p)\n",
- tso->id, tso));
-}
-#elif defined(PARALLEL_HASKELL)
-StgBlockingQueueElement *
-unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
-{
- StgBlockingQueueElement *next;
-
- switch (get_itbl(bqe)->type) {
- case TSO:
- ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
- /* if it's a TSO just push it onto the run_queue */
- next = bqe->link;
- ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
- APPEND_TO_RUN_QUEUE((StgTSO *)bqe);
- threadRunnable();
- unblockCount(bqe, node);
- /* reset blocking status after dumping event */
- ((StgTSO *)bqe)->why_blocked = NotBlocked;
- break;
-
- case BLOCKED_FETCH:
- /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
- next = bqe->link;
- bqe->link = (StgBlockingQueueElement *)PendingFetches;
- PendingFetches = (StgBlockedFetch *)bqe;
- break;
-
-# if defined(DEBUG)
- /* can ignore this case in a non-debugging setup;
- see comments on RBHSave closures above */
- case CONSTR:
- /* check that the closure is an RBHSave closure */
- ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
- get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
- get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
- break;
-
- default:
- barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
- get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
- (StgClosure *)bqe);
-# endif
- }
- IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
- return next;
-}
-#endif
-
-StgTSO *
-unblockOne(Capability *cap, StgTSO *tso)
-{
- StgTSO *next;
-
- ASSERT(get_itbl(tso)->type == TSO);
- ASSERT(tso->why_blocked != NotBlocked);
-
- tso->why_blocked = NotBlocked;
- next = tso->link;
- tso->link = END_TSO_QUEUE;
-
-#if defined(THREADED_RTS)
- if (tso->cap == cap || (!tsoLocked(tso) && RtsFlags.ParFlags.wakeupMigrate)) {
- // We are waking up this thread on the current Capability, which
- // might involve migrating it from the Capability it was last on.
- if (tso->bound) {
- ASSERT(tso->bound->cap == tso->cap);
- tso->bound->cap = cap;
- }
- tso->cap = cap;
- appendToRunQueue(cap,tso);
- // we're holding a newly woken thread, make sure we context switch
- // quickly so we can migrate it if necessary.
- context_switch = 1;
- } else {
- // we'll try to wake it up on the Capability it was last on.
- wakeupThreadOnCapability(tso->cap, tso);
- }
-#else
- appendToRunQueue(cap,tso);
- context_switch = 1;
-#endif
-
- debugTrace(DEBUG_sched,
- "waking up thread %ld on cap %d",
- (long)tso->id, tso->cap->no);
-
- return next;
-}
-
-
-#if defined(GRAN)
-void
-awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
-{
- StgBlockingQueueElement *bqe;
- PEs node_loc;
- nat len = 0;
-
- IF_GRAN_DEBUG(bq,
- debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
- node, CurrentProc, CurrentTime[CurrentProc],
- CurrentTSO->id, CurrentTSO));
-
- node_loc = where_is(node);
-
- ASSERT(q == END_BQ_QUEUE ||
- get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
- get_itbl(q)->type == CONSTR); // closure (type constructor)
- ASSERT(is_unique(node));
-
- /* FAKE FETCH: magically copy the node to the tso's proc;
- no Fetch necessary because in reality the node should not have been
- moved to the other PE in the first place
- */
- if (CurrentProc!=node_loc) {
- IF_GRAN_DEBUG(bq,
- debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
- node, node_loc, CurrentProc, CurrentTSO->id,
- // CurrentTSO, where_is(CurrentTSO),
- node->header.gran.procs));
- node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
- IF_GRAN_DEBUG(bq,
- debugBelch("## new bitmask of node %p is %#x\n",
- node, node->header.gran.procs));
- if (RtsFlags.GranFlags.GranSimStats.Global) {
- globalGranStats.tot_fake_fetches++;
- }
- }
-
- bqe = q;
- // ToDo: check: ASSERT(CurrentProc==node_loc);
- while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
- //next = bqe->link;
- /*
- bqe points to the current element in the queue
- next points to the next element in the queue
- */
- //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
- //tso_loc = where_is(tso);
- len++;
- bqe = unblockOne(bqe, node);
- }
-
- /* if this is the BQ of an RBH, we have to put back the info ripped out of
- the closure to make room for the anchor of the BQ */
- if (bqe!=END_BQ_QUEUE) {
- ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
- /*
- ASSERT((info_ptr==&RBH_Save_0_info) ||
- (info_ptr==&RBH_Save_1_info) ||
- (info_ptr==&RBH_Save_2_info));
- */
- /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
- ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
- ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
-
- IF_GRAN_DEBUG(bq,
- debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
- node, info_type(node)));
- }
-
- /* statistics gathering */
- if (RtsFlags.GranFlags.GranSimStats.Global) {
- // globalGranStats.tot_bq_processing_time += bq_processing_time;
- globalGranStats.tot_bq_len += len; // total length of all bqs awakened
- // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
- globalGranStats.tot_awbq++; // total no. of bqs awakened
- }
- IF_GRAN_DEBUG(bq,
- debugBelch("## BQ Stats of %p: [%d entries] %s\n",
- node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
-}
-#elif defined(PARALLEL_HASKELL)
-void
-awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
-{
- StgBlockingQueueElement *bqe;
-
- IF_PAR_DEBUG(verbose,
- debugBelch("##-_ AwBQ for node %p on [%x]: \n",
- node, mytid));
-#ifdef DIST
- //RFP
- if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
- IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
- return;
- }
-#endif
-
- ASSERT(q == END_BQ_QUEUE ||
- get_itbl(q)->type == TSO ||
- get_itbl(q)->type == BLOCKED_FETCH ||
- get_itbl(q)->type == CONSTR);
-
- bqe = q;
- while (get_itbl(bqe)->type==TSO ||
- get_itbl(bqe)->type==BLOCKED_FETCH) {
- bqe = unblockOne(bqe, node);
- }
-}
-
-#else /* !GRAN && !PARALLEL_HASKELL */
-
-void
-awakenBlockedQueue(Capability *cap, StgTSO *tso)
-{
- if (tso == NULL) return; // hack; see bug #1235728, and comments in
- // Exception.cmm
- while (tso != END_TSO_QUEUE) {
- tso = unblockOne(cap,tso);
- }
-}
-#endif
-
-/* ---------------------------------------------------------------------------
Interrupt execution
- usually called inside a signal handler so it mustn't do anything fancy.
------------------------------------------------------------------------ */
@@ -3481,316 +2862,6 @@ wakeUpRts(void)
}
/* -----------------------------------------------------------------------------
- Unblock a thread
-
- This is for use when we raise an exception in another thread, which
- may be blocked.
- This has nothing to do with the UnblockThread event in GranSim. -- HWL
- -------------------------------------------------------------------------- */
-
-#if defined(GRAN) || defined(PARALLEL_HASKELL)
-/*
- NB: only the type of the blocking queue is different in GranSim and GUM
- the operations on the queue-elements are the same
- long live polymorphism!
-
- Locks: sched_mutex is held upon entry and exit.
-
-*/
-static void
-unblockThread(Capability *cap, StgTSO *tso)
-{
- StgBlockingQueueElement *t, **last;
-
- switch (tso->why_blocked) {
-
- case NotBlocked:
- return; /* not blocked */
-
- case BlockedOnSTM:
- // Be careful: nothing to do here! We tell the scheduler that the thread
- // is runnable and we leave it to the stack-walking code to abort the
- // transaction while unwinding the stack. We should perhaps have a debugging
- // test to make sure that this really happens and that the 'zombie' transaction
- // does not get committed.
- goto done;
-
- case BlockedOnMVar:
- ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
- {
- StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
- StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
-
- last = (StgBlockingQueueElement **)&mvar->head;
- for (t = (StgBlockingQueueElement *)mvar->head;
- t != END_BQ_QUEUE;
- last = &t->link, last_tso = t, t = t->link) {
- if (t == (StgBlockingQueueElement *)tso) {
- *last = (StgBlockingQueueElement *)tso->link;
- if (mvar->tail == tso) {
- mvar->tail = (StgTSO *)last_tso;
- }
- goto done;
- }
- }
- barf("unblockThread (MVAR): TSO not found");
- }
-
- case BlockedOnBlackHole:
- ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
- {
- StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
-
- last = &bq->blocking_queue;
- for (t = bq->blocking_queue;
- t != END_BQ_QUEUE;
- last = &t->link, t = t->link) {
- if (t == (StgBlockingQueueElement *)tso) {
- *last = (StgBlockingQueueElement *)tso->link;
- goto done;
- }
- }
- barf("unblockThread (BLACKHOLE): TSO not found");
- }
-
- case BlockedOnException:
- {
- StgTSO *target = tso->block_info.tso;
-
- ASSERT(get_itbl(target)->type == TSO);
-
- if (target->what_next == ThreadRelocated) {
- target = target->link;
- ASSERT(get_itbl(target)->type == TSO);
- }
-
- ASSERT(target->blocked_exceptions != NULL);
-
- last = (StgBlockingQueueElement **)&target->blocked_exceptions;
- for (t = (StgBlockingQueueElement *)target->blocked_exceptions;
- t != END_BQ_QUEUE;
- last = &t->link, t = t->link) {
- ASSERT(get_itbl(t)->type == TSO);
- if (t == (StgBlockingQueueElement *)tso) {
- *last = (StgBlockingQueueElement *)tso->link;
- goto done;
- }
- }
- barf("unblockThread (Exception): TSO not found");
- }
-
- case BlockedOnRead:
- case BlockedOnWrite:
-#if defined(mingw32_HOST_OS)
- case BlockedOnDoProc:
-#endif
- {
- /* take TSO off blocked_queue */
- StgBlockingQueueElement *prev = NULL;
- for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE;
- prev = t, t = t->link) {
- if (t == (StgBlockingQueueElement *)tso) {
- if (prev == NULL) {
- blocked_queue_hd = (StgTSO *)t->link;
- if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
- blocked_queue_tl = END_TSO_QUEUE;
- }
- } else {
- prev->link = t->link;
- if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
- blocked_queue_tl = (StgTSO *)prev;
- }
- }
-#if defined(mingw32_HOST_OS)
- /* (Cooperatively) signal that the worker thread should abort
- * the request.
- */
- abandonWorkRequest(tso->block_info.async_result->reqID);
-#endif
- goto done;
- }
- }
- barf("unblockThread (I/O): TSO not found");
- }
-
- case BlockedOnDelay:
- {
- /* take TSO off sleeping_queue */
- StgBlockingQueueElement *prev = NULL;
- for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE;
- prev = t, t = t->link) {
- if (t == (StgBlockingQueueElement *)tso) {
- if (prev == NULL) {
- sleeping_queue = (StgTSO *)t->link;
- } else {
- prev->link = t->link;
- }
- goto done;
- }
- }
- barf("unblockThread (delay): TSO not found");
- }
-
- default:
- barf("unblockThread");
- }
-
- done:
- tso->link = END_TSO_QUEUE;
- tso->why_blocked = NotBlocked;
- tso->block_info.closure = NULL;
- pushOnRunQueue(cap,tso);
-}
-#else
-static void
-unblockThread(Capability *cap, StgTSO *tso)
-{
- StgTSO *t, **last;
-
- /* To avoid locking unnecessarily. */
- if (tso->why_blocked == NotBlocked) {
- return;
- }
-
- switch (tso->why_blocked) {
-
- case BlockedOnSTM:
- // Be careful: nothing to do here! We tell the scheduler that the thread
- // is runnable and we leave it to the stack-walking code to abort the
- // transaction while unwinding the stack. We should perhaps have a debugging
- // test to make sure that this really happens and that the 'zombie' transaction
- // does not get committed.
- goto done;
-
- case BlockedOnMVar:
- ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
- {
- StgTSO *last_tso = END_TSO_QUEUE;
- StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
-
- last = &mvar->head;
- for (t = mvar->head; t != END_TSO_QUEUE;
- last = &t->link, last_tso = t, t = t->link) {
- if (t == tso) {
- *last = tso->link;
- if (mvar->tail == tso) {
- mvar->tail = last_tso;
- }
- goto done;
- }
- }
- barf("unblockThread (MVAR): TSO not found");
- }
-
- case BlockedOnBlackHole:
- {
- last = &blackhole_queue;
- for (t = blackhole_queue; t != END_TSO_QUEUE;
- last = &t->link, t = t->link) {
- if (t == tso) {
- *last = tso->link;
- goto done;
- }
- }
- barf("unblockThread (BLACKHOLE): TSO not found");
- }
-
- case BlockedOnException:
- {
- StgTSO *target = tso->block_info.tso;
-
- ASSERT(get_itbl(target)->type == TSO);
-
- while (target->what_next == ThreadRelocated) {
- target = target->link;
- ASSERT(get_itbl(target)->type == TSO);
- }
-
- ASSERT(target->blocked_exceptions != NULL);
-
- last = &target->blocked_exceptions;
- for (t = target->blocked_exceptions; t != END_TSO_QUEUE;
- last = &t->link, t = t->link) {
- ASSERT(get_itbl(t)->type == TSO);
- if (t == tso) {
- *last = tso->link;
- goto done;
- }
- }
- barf("unblockThread (Exception): TSO not found");
- }
-
-#if !defined(THREADED_RTS)
- case BlockedOnRead:
- case BlockedOnWrite:
-#if defined(mingw32_HOST_OS)
- case BlockedOnDoProc:
-#endif
- {
- StgTSO *prev = NULL;
- for (t = blocked_queue_hd; t != END_TSO_QUEUE;
- prev = t, t = t->link) {
- if (t == tso) {
- if (prev == NULL) {
- blocked_queue_hd = t->link;
- if (blocked_queue_tl == t) {
- blocked_queue_tl = END_TSO_QUEUE;
- }
- } else {
- prev->link = t->link;
- if (blocked_queue_tl == t) {
- blocked_queue_tl = prev;
- }
- }
-#if defined(mingw32_HOST_OS)
- /* (Cooperatively) signal that the worker thread should abort
- * the request.
- */
- abandonWorkRequest(tso->block_info.async_result->reqID);
-#endif
- goto done;
- }
- }
- barf("unblockThread (I/O): TSO not found");
- }
-
- case BlockedOnDelay:
- {
- StgTSO *prev = NULL;
- for (t = sleeping_queue; t != END_TSO_QUEUE;
- prev = t, t = t->link) {
- if (t == tso) {
- if (prev == NULL) {
- sleeping_queue = t->link;
- } else {
- prev->link = t->link;
- }
- goto done;
- }
- }
- barf("unblockThread (delay): TSO not found");
- }
-#endif
-
- default:
- barf("unblockThread");
- }
-
- done:
- tso->link = END_TSO_QUEUE;
- tso->why_blocked = NotBlocked;
- tso->block_info.closure = NULL;
- appendToRunQueue(cap,tso);
-
- // We might have just migrated this TSO to our Capability:
- if (tso->bound) {
- tso->bound->cap = cap;
- }
- tso->cap = cap;
-}
-#endif
-
-/* -----------------------------------------------------------------------------
* checkBlackHoles()
*
* Check the blackhole_queue for threads that can be woken up. We do
@@ -3840,264 +2911,6 @@ checkBlackHoles (Capability *cap)
}
/* -----------------------------------------------------------------------------
- * raiseAsync()
- *
- * The following function implements the magic for raising an
- * asynchronous exception in an existing thread.
- *
- * We first remove the thread from any queue on which it might be
- * blocked. The possible blockages are MVARs and BLACKHOLE_BQs.
- *
- * We strip the stack down to the innermost CATCH_FRAME, building
- * thunks in the heap for all the active computations, so they can
- * be restarted if necessary. When we reach a CATCH_FRAME, we build
- * an application of the handler to the exception, and push it on
- * the top of the stack.
- *
- * How exactly do we save all the active computations? We create an
- * AP_STACK for every UpdateFrame on the stack. Entering one of these
- * AP_STACKs pushes everything from the corresponding update frame
- * upwards onto the stack. (Actually, it pushes everything up to the
- * next update frame plus a pointer to the next AP_STACK object.
- * Entering the next AP_STACK object pushes more onto the stack until we
- * reach the last AP_STACK object - at which point the stack should look
- * exactly as it did when we killed the TSO and we can continue
- * execution by entering the closure on top of the stack.
- *
- * We can also kill a thread entirely - this happens if either (a) the
- * exception passed to raiseAsync is NULL, or (b) there's no
- * CATCH_FRAME on the stack. In either case, we strip the entire
- * stack and replace the thread with a zombie.
- *
- * ToDo: in THREADED_RTS mode, this function is only safe if either
- * (a) we hold all the Capabilities (eg. in GC, or if there is only
- * one Capability), or (b) we own the Capability that the TSO is
- * currently blocked on or on the run queue of.
- *
- * -------------------------------------------------------------------------- */
-
-void
-raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception)
-{
- raiseAsync_(cap, tso, exception, rtsFalse, NULL);
-}
-
-void
-suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here)
-{
- raiseAsync_(cap, tso, NULL, rtsFalse, stop_here);
-}
-
-static void
-raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
- rtsBool stop_at_atomically, StgPtr stop_here)
-{
- StgRetInfoTable *info;
- StgPtr sp, frame;
- nat i;
-
- // Thread already dead?
- if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
- return;
- }
-
- debugTrace(DEBUG_sched,
- "raising exception in thread %ld.", (long)tso->id);
-
- // Remove it from any blocking queues
- unblockThread(cap,tso);
-
- // mark it dirty; we're about to change its stack.
- dirtyTSO(tso);
-
- sp = tso->sp;
-
- // The stack freezing code assumes there's a closure pointer on
- // the top of the stack, so we have to arrange that this is the case...
- //
- if (sp[0] == (W_)&stg_enter_info) {
- sp++;
- } else {
- sp--;
- sp[0] = (W_)&stg_dummy_ret_closure;
- }
-
- frame = sp + 1;
- while (stop_here == NULL || frame < stop_here) {
-
- // 1. Let the top of the stack be the "current closure"
- //
- // 2. Walk up the stack until we find either an UPDATE_FRAME or a
- // CATCH_FRAME.
- //
- // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
- // current closure applied to the chunk of stack up to (but not
- // including) the update frame. This closure becomes the "current
- // closure". Go back to step 2.
- //
- // 4. If it's a CATCH_FRAME, then leave the exception handler on
- // top of the stack applied to the exception.
- //
- // 5. If it's a STOP_FRAME, then kill the thread.
- //
- // NB: if we pass an ATOMICALLY_FRAME then abort the associated
- // transaction
-
- info = get_ret_itbl((StgClosure *)frame);
-
- switch (info->i.type) {
-
- case UPDATE_FRAME:
- {
- StgAP_STACK * ap;
- nat words;
-
- // First build an AP_STACK consisting of the stack chunk above the
- // current update frame, with the top word on the stack as the
- // fun field.
- //
- words = frame - sp - 1;
- ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words));
-
- ap->size = words;
- ap->fun = (StgClosure *)sp[0];
- sp++;
- for(i=0; i < (nat)words; ++i) {
- ap->payload[i] = (StgClosure *)*sp++;
- }
-
- SET_HDR(ap,&stg_AP_STACK_info,
- ((StgClosure *)frame)->header.prof.ccs /* ToDo */);
- TICK_ALLOC_UP_THK(words+1,0);
-
- //IF_DEBUG(scheduler,
- // debugBelch("sched: Updating ");
- // printPtr((P_)((StgUpdateFrame *)frame)->updatee);
- // debugBelch(" with ");
- // printObj((StgClosure *)ap);
- // );
-
- // Replace the updatee with an indirection
- //
- // Warning: if we're in a loop, more than one update frame on
- // the stack may point to the same object. Be careful not to
- // overwrite an IND_OLDGEN in this case, because we'll screw
- // up the mutable lists. To be on the safe side, don't
- // overwrite any kind of indirection at all. See also
- // threadSqueezeStack in GC.c, where we have to make a similar
- // check.
- //
- if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
- // revert the black hole
- UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
- (StgClosure *)ap);
- }
- sp += sizeofW(StgUpdateFrame) - 1;
- sp[0] = (W_)ap; // push onto stack
- frame = sp + 1;
- continue; //no need to bump frame
- }
-
- case STOP_FRAME:
- // We've stripped the entire stack, the thread is now dead.
- tso->what_next = ThreadKilled;
- tso->sp = frame + sizeofW(StgStopFrame);
- return;
-
- case CATCH_FRAME:
- // If we find a CATCH_FRAME, and we've got an exception to raise,
- // then build the THUNK raise(exception), and leave it on
- // top of the CATCH_FRAME ready to enter.
- //
- {
-#ifdef PROFILING
- StgCatchFrame *cf = (StgCatchFrame *)frame;
-#endif
- StgThunk *raise;
-
- if (exception == NULL) break;
-
- // we've got an exception to raise, so let's pass it to the
- // handler in this frame.
- //
- raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
- TICK_ALLOC_SE_THK(1,0);
- SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
- raise->payload[0] = exception;
-
- // throw away the stack from Sp up to the CATCH_FRAME.
- //
- sp = frame - 1;
-
- /* Ensure that async excpetions are blocked now, so we don't get
- * a surprise exception before we get around to executing the
- * handler.
- */
- if (tso->blocked_exceptions == NULL) {
- tso->blocked_exceptions = END_TSO_QUEUE;
- }
-
- /* Put the newly-built THUNK on top of the stack, ready to execute
- * when the thread restarts.
- */
- sp[0] = (W_)raise;
- sp[-1] = (W_)&stg_enter_info;
- tso->sp = sp-1;
- tso->what_next = ThreadRunGHC;
- IF_DEBUG(sanity, checkTSO(tso));
- return;
- }
-
- case ATOMICALLY_FRAME:
- if (stop_at_atomically) {
- ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
- stmCondemnTransaction(cap, tso -> trec);
-#ifdef REG_R1
- tso->sp = frame;
-#else
- // R1 is not a register: the return convention for IO in
- // this case puts the return value on the stack, so we
- // need to set up the stack to return to the atomically
- // frame properly...
- tso->sp = frame - 2;
- tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
- tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
-#endif
- tso->what_next = ThreadRunGHC;
- return;
- }
- // Not stop_at_atomically... fall through and abort the
- // transaction.
-
- case CATCH_RETRY_FRAME:
- // IF we find an ATOMICALLY_FRAME then we abort the
- // current transaction and propagate the exception. In
- // this case (unlike ordinary exceptions) we do not care
- // whether the transaction is valid or not because its
- // possible validity cannot have caused the exception
- // and will not be visible after the abort.
- debugTrace(DEBUG_stm,
- "found atomically block delivering async exception");
-
- StgTRecHeader *trec = tso -> trec;
- StgTRecHeader *outer = stmGetEnclosingTRec(trec);
- stmAbortTransaction(cap, trec);
- tso -> trec = outer;
- break;
-
- default:
- break;
- }
-
- // move on to the next stack frame
- frame += stack_frame_sizeW((StgClosure *)frame);
- }
-
- // if we got here, then we stopped at stop_here
- ASSERT(stop_here != NULL);
-}
-
-/* -----------------------------------------------------------------------------
Deleting threads
This is used for interruption (^C) and forking, and corresponds to
@@ -4108,10 +2921,15 @@ raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
static void
deleteThread (Capability *cap, StgTSO *tso)
{
- if (tso->why_blocked != BlockedOnCCall &&
- tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
- raiseAsync(cap,tso,NULL);
- }
+ // NOTE: must only be called on a TSO that we have exclusive
+ // access to, because we will call throwToSingleThreaded() below.
+ // The TSO must be on the run queue of the Capability we own, or
+ // we must own all Capabilities.
+
+ if (tso->why_blocked != BlockedOnCCall &&
+ tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
+ throwToSingleThreaded(cap,tso,NULL);
+ }
}
#ifdef FORKPROCESS_PRIMOP_SUPPORTED
@@ -4293,13 +3111,16 @@ resurrectThreads (StgTSO *threads)
case BlockedOnMVar:
case BlockedOnException:
/* Called by GC - sched_mutex lock is currently held. */
- raiseAsync(cap, tso,(StgClosure *)BlockedOnDeadMVar_closure);
+ throwToSingleThreaded(cap, tso,
+ (StgClosure *)BlockedOnDeadMVar_closure);
break;
case BlockedOnBlackHole:
- raiseAsync(cap, tso,(StgClosure *)NonTermination_closure);
+ throwToSingleThreaded(cap, tso,
+ (StgClosure *)NonTermination_closure);
break;
case BlockedOnSTM:
- raiseAsync(cap, tso,(StgClosure *)BlockedIndefinitely_closure);
+ throwToSingleThreaded(cap, tso,
+ (StgClosure *)BlockedIndefinitely_closure);
break;
case NotBlocked:
/* This might happen if the thread was blocked on a black hole
@@ -4312,298 +3133,3 @@ resurrectThreads (StgTSO *threads)
}
}
}
-
-/* ----------------------------------------------------------------------------
- * Debugging: why is a thread blocked
- * [Also provides useful information when debugging threaded programs
- * at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
- ------------------------------------------------------------------------- */
-
-#if DEBUG
-static void
-printThreadBlockage(StgTSO *tso)
-{
- switch (tso->why_blocked) {
- case BlockedOnRead:
- debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
- break;
- case BlockedOnWrite:
- debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
- break;
-#if defined(mingw32_HOST_OS)
- case BlockedOnDoProc:
- debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
- break;
-#endif
- case BlockedOnDelay:
- debugBelch("is blocked until %ld", (long)(tso->block_info.target));
- break;
- case BlockedOnMVar:
- debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
- break;
- case BlockedOnException:
- debugBelch("is blocked on delivering an exception to thread %d",
- tso->block_info.tso->id);
- break;
- case BlockedOnBlackHole:
- debugBelch("is blocked on a black hole");
- break;
- case NotBlocked:
- debugBelch("is not blocked");
- break;
-#if defined(PARALLEL_HASKELL)
- case BlockedOnGA:
- debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
- tso->block_info.closure, info_type(tso->block_info.closure));
- break;
- case BlockedOnGA_NoSend:
- debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
- tso->block_info.closure, info_type(tso->block_info.closure));
- break;
-#endif
- case BlockedOnCCall:
- debugBelch("is blocked on an external call");
- break;
- case BlockedOnCCall_NoUnblockExc:
- debugBelch("is blocked on an external call (exceptions were already blocked)");
- break;
- case BlockedOnSTM:
- debugBelch("is blocked on an STM operation");
- break;
- default:
- barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
- tso->why_blocked, tso->id, tso);
- }
-}
-
-void
-printThreadStatus(StgTSO *t)
-{
- debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
- {
- void *label = lookupThreadLabel(t->id);
- if (label) debugBelch("[\"%s\"] ",(char *)label);
- }
- if (t->what_next == ThreadRelocated) {
- debugBelch("has been relocated...\n");
- } else {
- switch (t->what_next) {
- case ThreadKilled:
- debugBelch("has been killed");
- break;
- case ThreadComplete:
- debugBelch("has completed");
- break;
- default:
- printThreadBlockage(t);
- }
- debugBelch("\n");
- }
-}
-
-void
-printAllThreads(void)
-{
- StgTSO *t, *next;
- nat i;
- Capability *cap;
-
-# if defined(GRAN)
- char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
- ullong_format_string(TIME_ON_PROC(CurrentProc),
- time_string, rtsFalse/*no commas!*/);
-
- debugBelch("all threads at [%s]:\n", time_string);
-# elif defined(PARALLEL_HASKELL)
- char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
- ullong_format_string(CURRENT_TIME,
- time_string, rtsFalse/*no commas!*/);
-
- debugBelch("all threads at [%s]:\n", time_string);
-# else
- debugBelch("all threads:\n");
-# endif
-
- for (i = 0; i < n_capabilities; i++) {
- cap = &capabilities[i];
- debugBelch("threads on capability %d:\n", cap->no);
- for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
- printThreadStatus(t);
- }
- }
-
- debugBelch("other threads:\n");
- for (t = all_threads; t != END_TSO_QUEUE; t = next) {
- if (t->why_blocked != NotBlocked) {
- printThreadStatus(t);
- }
- if (t->what_next == ThreadRelocated) {
- next = t->link;
- } else {
- next = t->global_link;
- }
- }
-}
-
-// useful from gdb
-void
-printThreadQueue(StgTSO *t)
-{
- nat i = 0;
- for (; t != END_TSO_QUEUE; t = t->link) {
- printThreadStatus(t);
- i++;
- }
- debugBelch("%d threads on queue\n", i);
-}
-
-/*
- Print a whole blocking queue attached to node (debugging only).
-*/
-# if defined(PARALLEL_HASKELL)
-void
-print_bq (StgClosure *node)
-{
- StgBlockingQueueElement *bqe;
- StgTSO *tso;
- rtsBool end;
-
- debugBelch("## BQ of closure %p (%s): ",
- node, info_type(node));
-
- /* should cover all closures that may have a blocking queue */
- ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
- get_itbl(node)->type == FETCH_ME_BQ ||
- get_itbl(node)->type == RBH ||
- get_itbl(node)->type == MVAR);
-
- ASSERT(node!=(StgClosure*)NULL); // sanity check
-
- print_bqe(((StgBlockingQueue*)node)->blocking_queue);
-}
-
-/*
- Print a whole blocking queue starting with the element bqe.
-*/
-void
-print_bqe (StgBlockingQueueElement *bqe)
-{
- rtsBool end;
-
- /*
- NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
- */
- for (end = (bqe==END_BQ_QUEUE);
- !end; // iterate until bqe points to a CONSTR
- end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
- bqe = end ? END_BQ_QUEUE : bqe->link) {
- ASSERT(bqe != END_BQ_QUEUE); // sanity check
- ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
- /* types of closures that may appear in a blocking queue */
- ASSERT(get_itbl(bqe)->type == TSO ||
- get_itbl(bqe)->type == BLOCKED_FETCH ||
- get_itbl(bqe)->type == CONSTR);
- /* only BQs of an RBH end with an RBH_Save closure */
- //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
-
- switch (get_itbl(bqe)->type) {
- case TSO:
- debugBelch(" TSO %u (%x),",
- ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
- break;
- case BLOCKED_FETCH:
- debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
- ((StgBlockedFetch *)bqe)->node,
- ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
- ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
- ((StgBlockedFetch *)bqe)->ga.weight);
- break;
- case CONSTR:
- debugBelch(" %s (IP %p),",
- (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
- get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
- get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
- "RBH_Save_?"), get_itbl(bqe));
- break;
- default:
- barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
- info_type((StgClosure *)bqe)); // , node, info_type(node));
- break;
- }
- } /* for */
- debugBelch("\n");
-}
-# elif defined(GRAN)
-void
-print_bq (StgClosure *node)
-{
- StgBlockingQueueElement *bqe;
- PEs node_loc, tso_loc;
- rtsBool end;
-
- /* should cover all closures that may have a blocking queue */
- ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
- get_itbl(node)->type == FETCH_ME_BQ ||
- get_itbl(node)->type == RBH);
-
- ASSERT(node!=(StgClosure*)NULL); // sanity check
- node_loc = where_is(node);
-
- debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
- node, info_type(node), node_loc);
-
- /*
- NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
- */
- for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
- !end; // iterate until bqe points to a CONSTR
- end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
- ASSERT(bqe != END_BQ_QUEUE); // sanity check
- ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
- /* types of closures that may appear in a blocking queue */
- ASSERT(get_itbl(bqe)->type == TSO ||
- get_itbl(bqe)->type == CONSTR);
- /* only BQs of an RBH end with an RBH_Save closure */
- ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
-
- tso_loc = where_is((StgClosure *)bqe);
- switch (get_itbl(bqe)->type) {
- case TSO:
- debugBelch(" TSO %d (%p) on [PE %d],",
- ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
- break;
- case CONSTR:
- debugBelch(" %s (IP %p),",
- (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
- get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
- get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
- "RBH_Save_?"), get_itbl(bqe));
- break;
- default:
- barf("Unexpected closure type %s in blocking queue of %p (%s)",
- info_type((StgClosure *)bqe), node, info_type(node));
- break;
- }
- } /* for */
- debugBelch("\n");
-}
-# endif
-
-#if defined(PARALLEL_HASKELL)
-static nat
-run_queue_len(void)
-{
- nat i;
- StgTSO *tso;
-
- for (i=0, tso=run_queue_hd;
- tso != END_TSO_QUEUE;
- i++, tso=tso->link) {
- /* nothing */
- }
-
- return i;
-}
-#endif
-
-#endif /* DEBUG */
diff --git a/rts/Schedule.h b/rts/Schedule.h
index e30e911e68..f82946e831 100644
--- a/rts/Schedule.h
+++ b/rts/Schedule.h
@@ -55,24 +55,7 @@ void wakeUpRts(void);
* Called from STG : yes
* Locks assumed : we own the Capability.
*/
-StgTSO * unblockOne(Capability *cap, StgTSO *tso);
-
-/* raiseAsync()
- *
- * Raises an exception asynchronously in the specified thread.
- *
- * Called from STG : yes
- * Locks assumed : none
- */
-void raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception);
-
-/* suspendComputation()
- *
- * A variant of raiseAsync(), this strips the stack of the specified
- * thread down to the stop_here point, leaving a current closure on
- * top of the stack at [stop_here - 1].
- */
-void suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here);
+StgTSO * unblockOne (Capability *cap, StgTSO *tso);
/* raiseExceptionHelper */
StgWord raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception);
@@ -176,8 +159,6 @@ extern rtsBool blackholes_need_checking;
extern Mutex RTS_VAR(sched_mutex);
#endif
-StgBool isThreadBound(StgTSO *tso);
-
SchedulerStatus rts_mainLazyIO(HaskellObj p, /*out*/HaskellObj *ret);
/* Called by shutdown_handler(). */
@@ -198,8 +179,6 @@ void print_bq (StgClosure *node);
void print_bqe (StgBlockingQueueElement *bqe);
#endif
-void labelThread(StgPtr tso, char *label);
-
/* -----------------------------------------------------------------------------
* Some convenient macros/inline functions...
*/
diff --git a/rts/ThreadLabels.c b/rts/ThreadLabels.c
index 9b9f1723ff..72cd5d3c4b 100644
--- a/rts/ThreadLabels.c
+++ b/rts/ThreadLabels.c
@@ -8,8 +8,10 @@
* ---------------------------------------------------------------------------*/
#include "PosixSource.h"
+#include "Rts.h"
#include "ThreadLabels.h"
#include "RtsUtils.h"
+#include "Hash.h"
#include <stdlib.h>
@@ -47,4 +49,19 @@ removeThreadLabel(StgWord key)
stgFree(old);
}
}
+
+void
+labelThread(StgPtr tso, char *label)
+{
+ int len;
+ void *buf;
+
+ /* Caveat: Once set, you can only set the thread name to "" */
+ len = strlen(label)+1;
+ buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
+ strncpy(buf,label,len);
+ /* Update will free the old memory for us */
+ updateThreadLabel(((StgTSO *)tso)->id,buf);
+}
+
#endif /* DEBUG */
diff --git a/rts/ThreadLabels.h b/rts/ThreadLabels.h
index 97d3d0d241..eaed22d281 100644
--- a/rts/ThreadLabels.h
+++ b/rts/ThreadLabels.h
@@ -1,27 +1,21 @@
/* -----------------------------------------------------------------------------
* ThreadLabels.h
*
- * (c) The GHC Team 2002-2003
+ * (c) The GHC Team 2002-2006
*
* Table of thread labels.
*
* ---------------------------------------------------------------------------*/
+
#ifndef __THREADLABELS_H__
#define __THREADLABELS_H__
-#include "Rts.h"
-#include "Hash.h"
-
-void
-initThreadLabelTable(void);
-
-void
-updateThreadLabel(StgWord key, void *data);
-
-void *
-lookupThreadLabel(StgWord key);
-
-void
-removeThreadLabel(StgWord key);
+#if defined(DEBUG)
+void initThreadLabelTable (void);
+void updateThreadLabel (StgWord key, void *data);
+void * lookupThreadLabel (StgWord key);
+void removeThreadLabel (StgWord key);
+void labelThread (StgPtr tso, char *label);
+#endif
#endif /* __THREADLABELS_H__ */
diff --git a/rts/Threads.c b/rts/Threads.c
new file mode 100644
index 0000000000..b550cc6aaa
--- /dev/null
+++ b/rts/Threads.c
@@ -0,0 +1,974 @@
+/* ---------------------------------------------------------------------------
+ *
+ * (c) The GHC Team, 2006
+ *
+ * Thread-related functionality
+ *
+ * --------------------------------------------------------------------------*/
+
+#include "PosixSource.h"
+#include "Rts.h"
+#include "SchedAPI.h"
+#include "Storage.h"
+#include "Threads.h"
+#include "RtsFlags.h"
+#include "STM.h"
+#include "Schedule.h"
+#include "Trace.h"
+#include "ThreadLabels.h"
+
+/* Next thread ID to allocate.
+ * LOCK: sched_mutex
+ */
+static StgThreadID next_thread_id = 1;
+
+/* The smallest stack size that makes any sense is:
+ * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
+ * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
+ * + 1 (the closure to enter)
+ * + 1 (stg_ap_v_ret)
+ * + 1 (spare slot req'd by stg_ap_v_ret)
+ *
+ * A thread with this stack will bomb immediately with a stack
+ * overflow, which will increase its stack size.
+ */
+#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
+
+/* ---------------------------------------------------------------------------
+ Create a new thread.
+
+ The new thread starts with the given stack size. Before the
+ scheduler can run, however, this thread needs to have a closure
+ (and possibly some arguments) pushed on its stack. See
+ pushClosure() in Schedule.h.
+
+ createGenThread() and createIOThread() (in SchedAPI.h) are
+ convenient packaged versions of this function.
+
+ currently pri (priority) is only used in a GRAN setup -- HWL
+ ------------------------------------------------------------------------ */
+#if defined(GRAN)
+/* currently pri (priority) is only used in a GRAN setup -- HWL */
+StgTSO *
+createThread(nat size, StgInt pri)
+#else
+StgTSO *
+createThread(Capability *cap, nat size)
+#endif
+{
+ StgTSO *tso;
+ nat stack_size;
+
+ /* sched_mutex is *not* required */
+
+ /* First check whether we should create a thread at all */
+#if defined(PARALLEL_HASKELL)
+ /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
+ if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
+ threadsIgnored++;
+ debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
+ RtsFlags.ParFlags.maxThreads, advisory_thread_count);
+ return END_TSO_QUEUE;
+ }
+ threadsCreated++;
+#endif
+
+#if defined(GRAN)
+ ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
+#endif
+
+ // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
+
+ /* catch ridiculously small stack sizes */
+ if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
+ size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
+ }
+
+ stack_size = size - TSO_STRUCT_SIZEW;
+
+ tso = (StgTSO *)allocateLocal(cap, size);
+ TICK_ALLOC_TSO(stack_size, 0);
+
+ SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
+#if defined(GRAN)
+ SET_GRAN_HDR(tso, ThisPE);
+#endif
+
+ // Always start with the compiled code evaluator
+ tso->what_next = ThreadRunGHC;
+
+ tso->why_blocked = NotBlocked;
+ tso->blocked_exceptions = END_TSO_QUEUE;
+ tso->flags = TSO_DIRTY;
+
+ tso->saved_errno = 0;
+ tso->bound = NULL;
+ tso->cap = cap;
+
+ tso->stack_size = stack_size;
+ tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
+ - TSO_STRUCT_SIZEW;
+ tso->sp = (P_)&(tso->stack) + stack_size;
+
+ tso->trec = NO_TREC;
+
+#ifdef PROFILING
+ tso->prof.CCCS = CCS_MAIN;
+#endif
+
+ /* put a stop frame on the stack */
+ tso->sp -= sizeofW(StgStopFrame);
+ SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
+ tso->link = END_TSO_QUEUE;
+
+ // ToDo: check this
+#if defined(GRAN)
+ /* uses more flexible routine in GranSim */
+ insertThread(tso, CurrentProc);
+#else
+ /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
+ * from its creation
+ */
+#endif
+
+#if defined(GRAN)
+ if (RtsFlags.GranFlags.GranSimStats.Full)
+ DumpGranEvent(GR_START,tso);
+#elif defined(PARALLEL_HASKELL)
+ if (RtsFlags.ParFlags.ParStats.Full)
+ DumpGranEvent(GR_STARTQ,tso);
+ /* HACk to avoid SCHEDULE
+ LastTSO = tso; */
+#endif
+
+ /* Link the new thread on the global thread list.
+ */
+ ACQUIRE_LOCK(&sched_mutex);
+ tso->id = next_thread_id++; // while we have the mutex
+ tso->global_link = all_threads;
+ all_threads = tso;
+ RELEASE_LOCK(&sched_mutex);
+
+#if defined(DIST)
+ tso->dist.priority = MandatoryPriority; //by default that is...
+#endif
+
+#if defined(GRAN)
+ tso->gran.pri = pri;
+# if defined(DEBUG)
+ tso->gran.magic = TSO_MAGIC; // debugging only
+# endif
+ tso->gran.sparkname = 0;
+ tso->gran.startedat = CURRENT_TIME;
+ tso->gran.exported = 0;
+ tso->gran.basicblocks = 0;
+ tso->gran.allocs = 0;
+ tso->gran.exectime = 0;
+ tso->gran.fetchtime = 0;
+ tso->gran.fetchcount = 0;
+ tso->gran.blocktime = 0;
+ tso->gran.blockcount = 0;
+ tso->gran.blockedat = 0;
+ tso->gran.globalsparks = 0;
+ tso->gran.localsparks = 0;
+ if (RtsFlags.GranFlags.Light)
+ tso->gran.clock = Now; /* local clock */
+ else
+ tso->gran.clock = 0;
+
+ IF_DEBUG(gran,printTSO(tso));
+#elif defined(PARALLEL_HASKELL)
+# if defined(DEBUG)
+ tso->par.magic = TSO_MAGIC; // debugging only
+# endif
+ tso->par.sparkname = 0;
+ tso->par.startedat = CURRENT_TIME;
+ tso->par.exported = 0;
+ tso->par.basicblocks = 0;
+ tso->par.allocs = 0;
+ tso->par.exectime = 0;
+ tso->par.fetchtime = 0;
+ tso->par.fetchcount = 0;
+ tso->par.blocktime = 0;
+ tso->par.blockcount = 0;
+ tso->par.blockedat = 0;
+ tso->par.globalsparks = 0;
+ tso->par.localsparks = 0;
+#endif
+
+#if defined(GRAN)
+ globalGranStats.tot_threads_created++;
+ globalGranStats.threads_created_on_PE[CurrentProc]++;
+ globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
+ globalGranStats.tot_sq_probes++;
+#elif defined(PARALLEL_HASKELL)
+ // collect parallel global statistics (currently done together with GC stats)
+ if (RtsFlags.ParFlags.ParStats.Global &&
+ RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+ //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime());
+ globalParStats.tot_threads_created++;
+ }
+#endif
+
+#if defined(GRAN)
+ debugTrace(GRAN_DEBUG_pri,
+ "==__ schedule: Created TSO %d (%p);",
+ CurrentProc, tso, tso->id);
+#elif defined(PARALLEL_HASKELL)
+ debugTrace(PAR_DEBUG_verbose,
+ "==__ schedule: Created TSO %d (%p); %d threads active",
+ (long)tso->id, tso, advisory_thread_count);
+#else
+ debugTrace(DEBUG_sched,
+ "created thread %ld, stack size = %lx words",
+ (long)tso->id, (long)tso->stack_size);
+#endif
+ return tso;
+}
+
+#if defined(PAR)
+/* RFP:
+ all parallel thread creation calls should fall through the following routine.
+*/
+StgTSO *
+createThreadFromSpark(rtsSpark spark)
+{ StgTSO *tso;
+ ASSERT(spark != (rtsSpark)NULL);
+// JB: TAKE CARE OF THIS COUNTER! BUGGY
+ if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads)
+ { threadsIgnored++;
+ barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
+ RtsFlags.ParFlags.maxThreads, advisory_thread_count);
+ return END_TSO_QUEUE;
+ }
+ else
+ { threadsCreated++;
+ tso = createThread(RtsFlags.GcFlags.initialStkSize);
+ if (tso==END_TSO_QUEUE)
+ barf("createSparkThread: Cannot create TSO");
+#if defined(DIST)
+ tso->priority = AdvisoryPriority;
+#endif
+ pushClosure(tso,spark);
+ addToRunQueue(tso);
+ advisory_thread_count++; // JB: TAKE CARE OF THIS COUNTER! BUGGY
+ }
+ return tso;
+}
+#endif
+
+/* ---------------------------------------------------------------------------
+ * Comparing Thread ids.
+ *
+ * This is used from STG land in the implementation of the
+ * instances of Eq/Ord for ThreadIds.
+ * ------------------------------------------------------------------------ */
+
+int
+cmp_thread(StgPtr tso1, StgPtr tso2)
+{
+ StgThreadID id1 = ((StgTSO *)tso1)->id;
+ StgThreadID id2 = ((StgTSO *)tso2)->id;
+
+ if (id1 < id2) return (-1);
+ if (id1 > id2) return 1;
+ return 0;
+}
+
+/* ---------------------------------------------------------------------------
+ * Fetching the ThreadID from an StgTSO.
+ *
+ * This is used in the implementation of Show for ThreadIds.
+ * ------------------------------------------------------------------------ */
+int
+rts_getThreadId(StgPtr tso)
+{
+ return ((StgTSO *)tso)->id;
+}
+
+/* -----------------------------------------------------------------------------
+ Remove a thread from a queue.
+ Fails fatally if the TSO is not on the queue.
+ -------------------------------------------------------------------------- */
+
+void
+removeThreadFromQueue (StgTSO **queue, StgTSO *tso)
+{
+ StgTSO *t, *prev;
+
+ prev = NULL;
+ for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->link) {
+ if (t == tso) {
+ if (prev) {
+ prev->link = t->link;
+ } else {
+ *queue = t->link;
+ }
+ return;
+ }
+ }
+ barf("removeThreadFromQueue: not found");
+}
+
+void
+removeThreadFromDeQueue (StgTSO **head, StgTSO **tail, StgTSO *tso)
+{
+ StgTSO *t, *prev;
+
+ prev = NULL;
+ for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->link) {
+ if (t == tso) {
+ if (prev) {
+ prev->link = t->link;
+ } else {
+ *head = t->link;
+ }
+ if (*tail == tso) {
+ if (prev) {
+ *tail = prev;
+ } else {
+ *tail = END_TSO_QUEUE;
+ }
+ }
+ return;
+ }
+ }
+ barf("removeThreadFromMVarQueue: not found");
+}
+
+void
+removeThreadFromMVarQueue (StgMVar *mvar, StgTSO *tso)
+{
+ removeThreadFromDeQueue (&mvar->head, &mvar->tail, tso);
+}
+
+/* ----------------------------------------------------------------------------
+ unblockOne()
+
+ unblock a single thread.
+ ------------------------------------------------------------------------- */
+
+#if defined(GRAN)
+STATIC_INLINE void
+unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
+{
+}
+#elif defined(PARALLEL_HASKELL)
+STATIC_INLINE void
+unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
+{
+ /* write RESUME events to log file and
+ update blocked and fetch time (depending on type of the orig closure) */
+ if (RtsFlags.ParFlags.ParStats.Full) {
+ DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
+ GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
+ 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
+ if (emptyRunQueue())
+ emitSchedule = rtsTrue;
+
+ switch (get_itbl(node)->type) {
+ case FETCH_ME_BQ:
+ ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
+ break;
+ case RBH:
+ case FETCH_ME:
+ case BLACKHOLE_BQ:
+ ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
+ break;
+#ifdef DIST
+ case MVAR:
+ break;
+#endif
+ default:
+ barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
+ }
+ }
+}
+#endif
+
+#if defined(GRAN)
+StgBlockingQueueElement *
+unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
+{
+ StgTSO *tso;
+ PEs node_loc, tso_loc;
+
+ node_loc = where_is(node); // should be lifted out of loop
+ tso = (StgTSO *)bqe; // wastes an assignment to get the type right
+ tso_loc = where_is((StgClosure *)tso);
+ if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
+ /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
+ ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
+ CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
+ // insertThread(tso, node_loc);
+ new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
+ ResumeThread,
+ tso, node, (rtsSpark*)NULL);
+ tso->link = END_TSO_QUEUE; // overwrite link just to be sure
+ // len_local++;
+ // len++;
+ } else { // TSO is remote (actually should be FMBQ)
+ CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
+ RtsFlags.GranFlags.Costs.gunblocktime +
+ RtsFlags.GranFlags.Costs.latency;
+ new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
+ UnblockThread,
+ tso, node, (rtsSpark*)NULL);
+ tso->link = END_TSO_QUEUE; // overwrite link just to be sure
+ // len++;
+ }
+ /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
+ IF_GRAN_DEBUG(bq,
+ debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
+ (node_loc==tso_loc ? "Local" : "Global"),
+ tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
+ tso->block_info.closure = NULL;
+ debugTrace(DEBUG_sched, "-- waking up thread %ld (%p)",
+ tso->id, tso));
+}
+#elif defined(PARALLEL_HASKELL)
+StgBlockingQueueElement *
+unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
+{
+ StgBlockingQueueElement *next;
+
+ switch (get_itbl(bqe)->type) {
+ case TSO:
+ ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
+ /* if it's a TSO just push it onto the run_queue */
+ next = bqe->link;
+ ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
+ APPEND_TO_RUN_QUEUE((StgTSO *)bqe);
+ threadRunnable();
+ unblockCount(bqe, node);
+ /* reset blocking status after dumping event */
+ ((StgTSO *)bqe)->why_blocked = NotBlocked;
+ break;
+
+ case BLOCKED_FETCH:
+ /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
+ next = bqe->link;
+ bqe->link = (StgBlockingQueueElement *)PendingFetches;
+ PendingFetches = (StgBlockedFetch *)bqe;
+ break;
+
+# if defined(DEBUG)
+ /* can ignore this case in a non-debugging setup;
+ see comments on RBHSave closures above */
+ case CONSTR:
+ /* check that the closure is an RBHSave closure */
+ ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
+ get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
+ get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
+ break;
+
+ default:
+ barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
+ get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
+ (StgClosure *)bqe);
+# endif
+ }
+ IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
+ return next;
+}
+#endif
+
+StgTSO *
+unblockOne (Capability *cap, StgTSO *tso)
+{
+ return unblockOne_(cap,tso,rtsTrue); // allow migration
+}
+
+StgTSO *
+unblockOne_ (Capability *cap, StgTSO *tso,
+ rtsBool allow_migrate USED_IF_THREADS)
+{
+ StgTSO *next;
+
+ ASSERT(get_itbl(tso)->type == TSO);
+ ASSERT(tso->why_blocked != NotBlocked);
+
+ tso->why_blocked = NotBlocked;
+ next = tso->link;
+ tso->link = END_TSO_QUEUE;
+
+#if defined(THREADED_RTS)
+ if (tso->cap == cap || (!tsoLocked(tso) &&
+ allow_migrate &&
+ RtsFlags.ParFlags.wakeupMigrate)) {
+ // We are waking up this thread on the current Capability, which
+ // might involve migrating it from the Capability it was last on.
+ if (tso->bound) {
+ ASSERT(tso->bound->cap == tso->cap);
+ tso->bound->cap = cap;
+ }
+ tso->cap = cap;
+ appendToRunQueue(cap,tso);
+ // we're holding a newly woken thread, make sure we context switch
+ // quickly so we can migrate it if necessary.
+ context_switch = 1;
+ } else {
+ // we'll try to wake it up on the Capability it was last on.
+ wakeupThreadOnCapability_lock(tso->cap, tso);
+ }
+#else
+ appendToRunQueue(cap,tso);
+ context_switch = 1;
+#endif
+
+ debugTrace(DEBUG_sched,
+ "waking up thread %ld on cap %d",
+ (long)tso->id, tso->cap->no);
+
+ return next;
+}
+
+/* ----------------------------------------------------------------------------
+ awakenBlockedQueue
+
+ wakes up all the threads on the specified queue.
+ ------------------------------------------------------------------------- */
+
+#if defined(GRAN)
+void
+awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
+{
+ StgBlockingQueueElement *bqe;
+ PEs node_loc;
+ nat len = 0;
+
+ IF_GRAN_DEBUG(bq,
+ debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
+ node, CurrentProc, CurrentTime[CurrentProc],
+ CurrentTSO->id, CurrentTSO));
+
+ node_loc = where_is(node);
+
+ ASSERT(q == END_BQ_QUEUE ||
+ get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
+ get_itbl(q)->type == CONSTR); // closure (type constructor)
+ ASSERT(is_unique(node));
+
+ /* FAKE FETCH: magically copy the node to the tso's proc;
+ no Fetch necessary because in reality the node should not have been
+ moved to the other PE in the first place
+ */
+ if (CurrentProc!=node_loc) {
+ IF_GRAN_DEBUG(bq,
+ debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
+ node, node_loc, CurrentProc, CurrentTSO->id,
+ // CurrentTSO, where_is(CurrentTSO),
+ node->header.gran.procs));
+ node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
+ IF_GRAN_DEBUG(bq,
+ debugBelch("## new bitmask of node %p is %#x\n",
+ node, node->header.gran.procs));
+ if (RtsFlags.GranFlags.GranSimStats.Global) {
+ globalGranStats.tot_fake_fetches++;
+ }
+ }
+
+ bqe = q;
+ // ToDo: check: ASSERT(CurrentProc==node_loc);
+ while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
+ //next = bqe->link;
+ /*
+ bqe points to the current element in the queue
+ next points to the next element in the queue
+ */
+ //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
+ //tso_loc = where_is(tso);
+ len++;
+ bqe = unblockOne(bqe, node);
+ }
+
+ /* if this is the BQ of an RBH, we have to put back the info ripped out of
+ the closure to make room for the anchor of the BQ */
+ if (bqe!=END_BQ_QUEUE) {
+ ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
+ /*
+ ASSERT((info_ptr==&RBH_Save_0_info) ||
+ (info_ptr==&RBH_Save_1_info) ||
+ (info_ptr==&RBH_Save_2_info));
+ */
+ /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
+ ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
+ ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
+
+ IF_GRAN_DEBUG(bq,
+ debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
+ node, info_type(node)));
+ }
+
+ /* statistics gathering */
+ if (RtsFlags.GranFlags.GranSimStats.Global) {
+ // globalGranStats.tot_bq_processing_time += bq_processing_time;
+ globalGranStats.tot_bq_len += len; // total length of all bqs awakened
+ // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
+ globalGranStats.tot_awbq++; // total no. of bqs awakened
+ }
+ IF_GRAN_DEBUG(bq,
+ debugBelch("## BQ Stats of %p: [%d entries] %s\n",
+ node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
+}
+#elif defined(PARALLEL_HASKELL)
+void
+awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
+{
+ StgBlockingQueueElement *bqe;
+
+ IF_PAR_DEBUG(verbose,
+ debugBelch("##-_ AwBQ for node %p on [%x]: \n",
+ node, mytid));
+#ifdef DIST
+ //RFP
+ if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
+ IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
+ return;
+ }
+#endif
+
+ ASSERT(q == END_BQ_QUEUE ||
+ get_itbl(q)->type == TSO ||
+ get_itbl(q)->type == BLOCKED_FETCH ||
+ get_itbl(q)->type == CONSTR);
+
+ bqe = q;
+ while (get_itbl(bqe)->type==TSO ||
+ get_itbl(bqe)->type==BLOCKED_FETCH) {
+ bqe = unblockOne(bqe, node);
+ }
+}
+
+#else /* !GRAN && !PARALLEL_HASKELL */
+
+void
+awakenBlockedQueue(Capability *cap, StgTSO *tso)
+{
+ while (tso != END_TSO_QUEUE) {
+ tso = unblockOne(cap,tso);
+ }
+}
+#endif
+
+
+/* ---------------------------------------------------------------------------
+ * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
+ * used by Control.Concurrent for error checking.
+ * ------------------------------------------------------------------------- */
+
+StgBool
+rtsSupportsBoundThreads(void)
+{
+#if defined(THREADED_RTS)
+ return rtsTrue;
+#else
+ return rtsFalse;
+#endif
+}
+
+/* ---------------------------------------------------------------------------
+ * isThreadBound(tso): check whether tso is bound to an OS thread.
+ * ------------------------------------------------------------------------- */
+
+StgBool
+isThreadBound(StgTSO* tso USED_IF_THREADS)
+{
+#if defined(THREADED_RTS)
+ return (tso->bound != NULL);
+#endif
+ return rtsFalse;
+}
+
+/* ----------------------------------------------------------------------------
+ * Debugging: why is a thread blocked
+ * ------------------------------------------------------------------------- */
+
+#if DEBUG
+void
+printThreadBlockage(StgTSO *tso)
+{
+ switch (tso->why_blocked) {
+ case BlockedOnRead:
+ debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
+ break;
+ case BlockedOnWrite:
+ debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
+ break;
+#if defined(mingw32_HOST_OS)
+ case BlockedOnDoProc:
+ debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
+ break;
+#endif
+ case BlockedOnDelay:
+ debugBelch("is blocked until %ld", (long)(tso->block_info.target));
+ break;
+ case BlockedOnMVar:
+ debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
+ break;
+ case BlockedOnException:
+ debugBelch("is blocked on delivering an exception to thread %d",
+ tso->block_info.tso->id);
+ break;
+ case BlockedOnBlackHole:
+ debugBelch("is blocked on a black hole");
+ break;
+ case NotBlocked:
+ debugBelch("is not blocked");
+ break;
+#if defined(PARALLEL_HASKELL)
+ case BlockedOnGA:
+ debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
+ tso->block_info.closure, info_type(tso->block_info.closure));
+ break;
+ case BlockedOnGA_NoSend:
+ debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
+ tso->block_info.closure, info_type(tso->block_info.closure));
+ break;
+#endif
+ case BlockedOnCCall:
+ debugBelch("is blocked on an external call");
+ break;
+ case BlockedOnCCall_NoUnblockExc:
+ debugBelch("is blocked on an external call (exceptions were already blocked)");
+ break;
+ case BlockedOnSTM:
+ debugBelch("is blocked on an STM operation");
+ break;
+ default:
+ barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
+ tso->why_blocked, tso->id, tso);
+ }
+}
+
+void
+printThreadStatus(StgTSO *t)
+{
+ debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
+ {
+ void *label = lookupThreadLabel(t->id);
+ if (label) debugBelch("[\"%s\"] ",(char *)label);
+ }
+ if (t->what_next == ThreadRelocated) {
+ debugBelch("has been relocated...\n");
+ } else {
+ switch (t->what_next) {
+ case ThreadKilled:
+ debugBelch("has been killed");
+ break;
+ case ThreadComplete:
+ debugBelch("has completed");
+ break;
+ default:
+ printThreadBlockage(t);
+ }
+ debugBelch("\n");
+ }
+}
+
+void
+printAllThreads(void)
+{
+ StgTSO *t, *next;
+ nat i;
+ Capability *cap;
+
+# if defined(GRAN)
+ char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
+ ullong_format_string(TIME_ON_PROC(CurrentProc),
+ time_string, rtsFalse/*no commas!*/);
+
+ debugBelch("all threads at [%s]:\n", time_string);
+# elif defined(PARALLEL_HASKELL)
+ char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
+ ullong_format_string(CURRENT_TIME,
+ time_string, rtsFalse/*no commas!*/);
+
+ debugBelch("all threads at [%s]:\n", time_string);
+# else
+ debugBelch("all threads:\n");
+# endif
+
+ for (i = 0; i < n_capabilities; i++) {
+ cap = &capabilities[i];
+ debugBelch("threads on capability %d:\n", cap->no);
+ for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
+ printThreadStatus(t);
+ }
+ }
+
+ debugBelch("other threads:\n");
+ for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+ if (t->why_blocked != NotBlocked) {
+ printThreadStatus(t);
+ }
+ if (t->what_next == ThreadRelocated) {
+ next = t->link;
+ } else {
+ next = t->global_link;
+ }
+ }
+}
+
+// useful from gdb
+void
+printThreadQueue(StgTSO *t)
+{
+ nat i = 0;
+ for (; t != END_TSO_QUEUE; t = t->link) {
+ printThreadStatus(t);
+ i++;
+ }
+ debugBelch("%d threads on queue\n", i);
+}
+
+/*
+ Print a whole blocking queue attached to node (debugging only).
+*/
+# if defined(PARALLEL_HASKELL)
+void
+print_bq (StgClosure *node)
+{
+ StgBlockingQueueElement *bqe;
+ StgTSO *tso;
+ rtsBool end;
+
+ debugBelch("## BQ of closure %p (%s): ",
+ node, info_type(node));
+
+ /* should cover all closures that may have a blocking queue */
+ ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
+ get_itbl(node)->type == FETCH_ME_BQ ||
+ get_itbl(node)->type == RBH ||
+ get_itbl(node)->type == MVAR);
+
+ ASSERT(node!=(StgClosure*)NULL); // sanity check
+
+ print_bqe(((StgBlockingQueue*)node)->blocking_queue);
+}
+
+/*
+ Print a whole blocking queue starting with the element bqe.
+*/
+void
+print_bqe (StgBlockingQueueElement *bqe)
+{
+ rtsBool end;
+
+ /*
+ NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
+ */
+ for (end = (bqe==END_BQ_QUEUE);
+ !end; // iterate until bqe points to a CONSTR
+ end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
+ bqe = end ? END_BQ_QUEUE : bqe->link) {
+ ASSERT(bqe != END_BQ_QUEUE); // sanity check
+ ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
+ /* types of closures that may appear in a blocking queue */
+ ASSERT(get_itbl(bqe)->type == TSO ||
+ get_itbl(bqe)->type == BLOCKED_FETCH ||
+ get_itbl(bqe)->type == CONSTR);
+ /* only BQs of an RBH end with an RBH_Save closure */
+ //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
+
+ switch (get_itbl(bqe)->type) {
+ case TSO:
+ debugBelch(" TSO %u (%x),",
+ ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
+ break;
+ case BLOCKED_FETCH:
+ debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
+ ((StgBlockedFetch *)bqe)->node,
+ ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
+ ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
+ ((StgBlockedFetch *)bqe)->ga.weight);
+ break;
+ case CONSTR:
+ debugBelch(" %s (IP %p),",
+ (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
+ get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
+ get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
+ "RBH_Save_?"), get_itbl(bqe));
+ break;
+ default:
+ barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
+ info_type((StgClosure *)bqe)); // , node, info_type(node));
+ break;
+ }
+ } /* for */
+ debugBelch("\n");
+}
+# elif defined(GRAN)
+void
+print_bq (StgClosure *node)
+{
+ StgBlockingQueueElement *bqe;
+ PEs node_loc, tso_loc;
+ rtsBool end;
+
+ /* should cover all closures that may have a blocking queue */
+ ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
+ get_itbl(node)->type == FETCH_ME_BQ ||
+ get_itbl(node)->type == RBH);
+
+ ASSERT(node!=(StgClosure*)NULL); // sanity check
+ node_loc = where_is(node);
+
+ debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
+ node, info_type(node), node_loc);
+
+ /*
+ NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
+ */
+ for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
+ !end; // iterate until bqe points to a CONSTR
+ end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
+ ASSERT(bqe != END_BQ_QUEUE); // sanity check
+ ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
+ /* types of closures that may appear in a blocking queue */
+ ASSERT(get_itbl(bqe)->type == TSO ||
+ get_itbl(bqe)->type == CONSTR);
+ /* only BQs of an RBH end with an RBH_Save closure */
+ ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
+
+ tso_loc = where_is((StgClosure *)bqe);
+ switch (get_itbl(bqe)->type) {
+ case TSO:
+ debugBelch(" TSO %d (%p) on [PE %d],",
+ ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
+ break;
+ case CONSTR:
+ debugBelch(" %s (IP %p),",
+ (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
+ get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
+ get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
+ "RBH_Save_?"), get_itbl(bqe));
+ break;
+ default:
+ barf("Unexpected closure type %s in blocking queue of %p (%s)",
+ info_type((StgClosure *)bqe), node, info_type(node));
+ break;
+ }
+ } /* for */
+ debugBelch("\n");
+}
+# endif
+
+#if defined(PARALLEL_HASKELL)
+nat
+run_queue_len(void)
+{
+ nat i;
+ StgTSO *tso;
+
+ for (i=0, tso=run_queue_hd;
+ tso != END_TSO_QUEUE;
+ i++, tso=tso->link) {
+ /* nothing */
+ }
+
+ return i;
+}
+#endif
+
+#endif /* DEBUG */
diff --git a/rts/Threads.h b/rts/Threads.h
new file mode 100644
index 0000000000..e331c50dae
--- /dev/null
+++ b/rts/Threads.h
@@ -0,0 +1,46 @@
+/* ---------------------------------------------------------------------------
+ *
+ * (c) The GHC Team, 2006
+ *
+ * Thread-related functionality
+ *
+ * --------------------------------------------------------------------------*/
+
+#ifndef THREADS_H
+#define THREADS_H
+
+#if defined(GRAN) || defined(PARALLEL_HASKELL)
+StgBlockingQueueElement * unblockOne (StgBlockingQueueElement *bqe,
+ StgClosure *node);
+#else
+StgTSO * unblockOne (Capability *cap, StgTSO *tso);
+StgTSO * unblockOne_ (Capability *cap, StgTSO *tso, rtsBool allow_migrate);
+#endif
+
+#if defined(GRAN) || defined(PARALLEL_HASKELL)
+void awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node);
+#else
+void awakenBlockedQueue (Capability *cap, StgTSO *tso);
+#endif
+
+void removeThreadFromMVarQueue (StgMVar *mvar, StgTSO *tso);
+void removeThreadFromQueue (StgTSO **queue, StgTSO *tso);
+void removeThreadFromDeQueue (StgTSO **head, StgTSO **tail, StgTSO *tso);
+
+StgBool isThreadBound (StgTSO* tso);
+
+#ifdef DEBUG
+void printThreadBlockage (StgTSO *tso);
+void printThreadStatus (StgTSO *t);
+void printAllThreads (void);
+void printThreadQueue (StgTSO *t);
+# if defined(PARALLEL_HASKELL)
+void print_bq (StgClosure *node);
+void print_bqe (StgBlockingQueueElement *bqe);
+nat run_queue_len (void);
+# elif defined(GRAN)
+void print_bq (StgClosure *node);
+# endif
+#endif
+
+#endif /* THREADS_H */
diff --git a/rts/Trace.h b/rts/Trace.h
index 19e492c26e..cf6c1411c0 100644
--- a/rts/Trace.h
+++ b/rts/Trace.h
@@ -45,11 +45,11 @@ void traceEnd (void);
#ifdef DEBUG
#define debugTrace(class, str, ...) trace(class,str, ## __VA_ARGS__)
// variable arg macros are C99, and supported by gcc.
-#define debugTraceBegin(class, str, ...) traceBegin(class,str, ## __VA_ARGS__)
+#define debugTraceBegin(str, ...) traceBegin(str, ## __VA_ARGS__)
#define debugTraceEnd() traceEnd()
#else
#define debugTrace(class, str, ...) /* nothing */
-#define debugTraceBegin(class, str, ...) /* nothing */
+#define debugTraceBegin(str, ...) /* nothing */
#define debugTraceEnd() /* nothing */
#endif