diff options
-rw-r--r-- | includes/Constants.h | 30 | ||||
-rw-r--r-- | includes/SMP.h | 15 | ||||
-rw-r--r-- | includes/StgMiscClosures.h | 2 | ||||
-rw-r--r-- | includes/TSO.h | 30 | ||||
-rw-r--r-- | includes/mkDerivedConstants.c | 4 | ||||
-rw-r--r-- | rts/Capability.c | 31 | ||||
-rw-r--r-- | rts/Capability.h | 4 | ||||
-rw-r--r-- | rts/Exception.cmm | 186 | ||||
-rw-r--r-- | rts/Exception.h | 40 | ||||
-rw-r--r-- | rts/GC.c | 15 | ||||
-rw-r--r-- | rts/GCCompact.c | 4 | ||||
-rw-r--r-- | rts/HCIncludes.h | 22 | ||||
-rw-r--r-- | rts/HeapStackCheck.cmm | 25 | ||||
-rw-r--r-- | rts/Makefile | 21 | ||||
-rw-r--r-- | rts/RaiseAsync.c | 1015 | ||||
-rw-r--r-- | rts/RaiseAsync.h | 71 | ||||
-rw-r--r-- | rts/Schedule.c | 1646 | ||||
-rw-r--r-- | rts/Schedule.h | 23 | ||||
-rw-r--r-- | rts/ThreadLabels.c | 17 | ||||
-rw-r--r-- | rts/ThreadLabels.h | 24 | ||||
-rw-r--r-- | rts/Threads.c | 974 | ||||
-rw-r--r-- | rts/Threads.h | 46 | ||||
-rw-r--r-- | rts/Trace.h | 4 |
23 files changed, 2461 insertions, 1788 deletions
diff --git a/includes/Constants.h b/includes/Constants.h index 4f3c35b744..ef2a4865e2 100644 --- a/includes/Constants.h +++ b/includes/Constants.h @@ -230,6 +230,36 @@ #define ThreadBlocked 4 #define ThreadFinished 5 +/* + * Flags for the tso->flags field. + * + * The TSO_DIRTY flag indicates that this TSO's stack should be + * scanned during garbage collection. The link field of a TSO is + * always scanned, so we don't have to dirty a TSO just for linking + * it on a different list. + * + * TSO_DIRTY is set by + * - schedule(), just before running a thread, + * - raiseAsync(), because it modifies a thread's stack + * - resumeThread(), just before running the thread again + * and unset by the garbage collector (only). + */ +#define TSO_DIRTY 1 + +/* + * TSO_LOCKED is set when a TSO is locked to a particular Capability. + */ +#define TSO_LOCKED 2 + +/* + * TSO_BLOCKEX: the TSO is blocking exceptions + * + * TSO_INTERRUPTIBLE: the TSO can be interrupted if it blocks + * interruptibly (eg. with BlockedOnMVar). 
+ */ +#define TSO_BLOCKEX 4 +#define TSO_INTERRUPTIBLE 8 + /* ----------------------------------------------------------------------------- RET_DYN stack frames -------------------------------------------------------------------------- */ diff --git a/includes/SMP.h b/includes/SMP.h index 5974c962ad..d985576cb4 100644 --- a/includes/SMP.h +++ b/includes/SMP.h @@ -155,6 +155,21 @@ xchg(StgPtr p, StgWord w) return old; } +INLINE_HEADER StgInfoTable * +lockClosure(StgClosure *p) +{ return (StgInfoTable *)p->header.info; } + +INLINE_HEADER void +unlockClosure(StgClosure *p STG_UNUSED, StgInfoTable *info STG_UNUSED) +{ /* nothing */ } + #endif /* !THREADED_RTS */ +// Handy specialised versions of lockClosure()/unlockClosure() +INLINE_HEADER void lockTSO(StgTSO *tso) +{ lockClosure((StgClosure *)tso); } + +INLINE_HEADER void unlockTSO(StgTSO *tso) +{ unlockClosure((StgClosure*)tso, (StgInfoTable*)&stg_TSO_info); } + #endif /* SMP_H */ diff --git a/includes/StgMiscClosures.h b/includes/StgMiscClosures.h index 4a6a7c47c2..fcc973630a 100644 --- a/includes/StgMiscClosures.h +++ b/includes/StgMiscClosures.h @@ -490,6 +490,8 @@ RTS_FUN(stg_block_async_void); RTS_ENTRY(stg_block_async_void_ret); #endif RTS_FUN(stg_block_stmwait); +RTS_FUN(stg_block_throwto); +RTS_RET_INFO(stg_block_throwto_info); /* Entry/exit points from StgStartup.cmm */ diff --git a/includes/TSO.h b/includes/TSO.h index d096d401cf..0c3e4eec38 100644 --- a/includes/TSO.h +++ b/includes/TSO.h @@ -77,27 +77,6 @@ typedef StgTSOStatBuf StgTSOGranInfo; */ typedef StgWord32 StgThreadID; -/* - * Flags for the tso->flags field. - * - * The TSO_DIRTY flag indicates that this TSO's stack should be - * scanned during garbage collection. The link field of a TSO is - * always scanned, so we don't have to dirty a TSO just for linking - * it on a different list. 
- * - * TSO_DIRTY is set by - * - schedule(), just before running a thread, - * - raiseAsync(), because it modifies a thread's stack - * - resumeThread(), just before running the thread again - * and unset by the garbage collector (only). - */ -#define TSO_DIRTY 1 - -/* - * TSO_LOCKED is set when a TSO is locked to a particular Capability. - */ -#define TSO_LOCKED 2 - #define tsoDirty(tso) ((tso)->flags & TSO_DIRTY) #define tsoLocked(tso) ((tso)->flags & TSO_LOCKED) @@ -127,6 +106,7 @@ typedef union { StgWord target; } StgTSOBlockInfo; + /* * TSOs live on the heap, and therefore look just like heap objects. * Large TSOs will live in their own "block group" allocated by the @@ -151,13 +131,19 @@ typedef struct StgTSO_ { StgWord16 why_blocked; /* Values defined in Constants.h */ StgWord32 flags; StgTSOBlockInfo block_info; - struct StgTSO_* blocked_exceptions; StgThreadID id; int saved_errno; struct Task_* bound; struct Capability_* cap; struct StgTRecHeader_ * trec; /* STM transaction record */ + /* + A list of threads blocked on this TSO waiting to throw + exceptions. In order to access this field, the TSO must be + locked using lockClosure/unlockClosure (see SMP.h). + */ + struct StgTSO_ * blocked_exceptions; + #ifdef TICKY_TICKY /* TICKY-specific stuff would go here. */ #endif diff --git a/includes/mkDerivedConstants.c b/includes/mkDerivedConstants.c index 27d4fa9e7b..62d949041b 100644 --- a/includes/mkDerivedConstants.c +++ b/includes/mkDerivedConstants.c @@ -18,6 +18,7 @@ * doesn't affect the offsets of anything else. 
*/ #define PROFILING +#define THREADED_RTS #include "Rts.h" #include "RtsFlags.h" @@ -227,6 +228,7 @@ main(int argc, char *argv[]) def_offset("stgGCFun", FUN_OFFSET(stgGCFun)); field_offset(Capability, r); + field_offset(Capability, lock); struct_field(bdescr, start); struct_field(bdescr, free); @@ -276,8 +278,10 @@ main(int argc, char *argv[]) closure_field(StgTSO, block_info); closure_field(StgTSO, blocked_exceptions); closure_field(StgTSO, id); + closure_field(StgTSO, cap); closure_field(StgTSO, saved_errno); closure_field(StgTSO, trec); + closure_field(StgTSO, flags); closure_field_("StgTSO_CCCS", StgTSO, prof.CCCS); tso_field(StgTSO, sp); tso_field_offset(StgTSO, stack); diff --git a/rts/Capability.c b/rts/Capability.c index 0415092a03..2384262ae9 100644 --- a/rts/Capability.c +++ b/rts/Capability.c @@ -518,8 +518,10 @@ wakeupThreadOnCapability (Capability *cap, StgTSO *tso) { ASSERT(tso->cap == cap); ASSERT(tso->bound ? tso->bound->cap == cap : 1); + ASSERT_LOCK_HELD(&cap->lock); + + tso->cap = cap; - ACQUIRE_LOCK(&cap->lock); if (cap->running_task == NULL) { // nobody is running this Capability, we can add our thread // directly onto the run queue and start up a Task to run it. @@ -535,6 +537,33 @@ wakeupThreadOnCapability (Capability *cap, StgTSO *tso) // freed without first checking the wakeup queue (see // releaseCapability_). 
} +} + +void +wakeupThreadOnCapability_lock (Capability *cap, StgTSO *tso) +{ + ACQUIRE_LOCK(&cap->lock); + migrateThreadToCapability (cap, tso); + RELEASE_LOCK(&cap->lock); +} + +void +migrateThreadToCapability (Capability *cap, StgTSO *tso) +{ + // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability) + if (tso->bound) { + ASSERT(tso->bound->cap == tso->cap); + tso->bound->cap = cap; + } + tso->cap = cap; + wakeupThreadOnCapability(cap,tso); +} + +void +migrateThreadToCapability_lock (Capability *cap, StgTSO *tso) +{ + ACQUIRE_LOCK(&cap->lock); + migrateThreadToCapability (cap, tso); RELEASE_LOCK(&cap->lock); } diff --git a/rts/Capability.h b/rts/Capability.h index a2551d0cc5..641f37db01 100644 --- a/rts/Capability.h +++ b/rts/Capability.h @@ -199,6 +199,10 @@ void waitForCapability (Task *task, Mutex *mutex, Capability **pCap); // from the one held by the current Task). // void wakeupThreadOnCapability (Capability *cap, StgTSO *tso); +void wakeupThreadOnCapability_lock (Capability *cap, StgTSO *tso); + +void migrateThreadToCapability (Capability *cap, StgTSO *tso); +void migrateThreadToCapability_lock (Capability *cap, StgTSO *tso); // Wakes up a worker thread on just one Capability, used when we // need to service some global event. 
diff --git a/rts/Exception.cmm b/rts/Exception.cmm index b5c29626b2..f4327b9ce2 100644 --- a/rts/Exception.cmm +++ b/rts/Exception.cmm @@ -11,6 +11,7 @@ * ---------------------------------------------------------------------------*/ #include "Cmm.h" +#include "RaiseAsync.h" /* ----------------------------------------------------------------------------- Exception Primitives @@ -54,13 +55,13 @@ INFO_TABLE_RET( stg_unblockAsyncExceptionszh_ret, { // Not true: see comments above // ASSERT(StgTSO_blocked_exceptions(CurrentTSO) != NULL); -#if defined(GRAN) || defined(PAR) - foreign "C" awakenBlockedQueue(MyCapability() "ptr", StgTSO_blocked_exceptions(CurrentTSO) "ptr", - NULL "ptr"); -#else - foreign "C" awakenBlockedQueue(MyCapability() "ptr", StgTSO_blocked_exceptions(CurrentTSO) "ptr"); -#endif - StgTSO_blocked_exceptions(CurrentTSO) = NULL; + + foreign "C" awakenBlockedExceptionQueue(MyCapability() "ptr", + CurrentTSO "ptr") [R1]; + + StgTSO_flags(CurrentTSO) = StgTSO_flags(CurrentTSO) & + ~(TSO_BLOCKEX::I32|TSO_INTERRUPTIBLE::I32); + #ifdef REG_R1 Sp_adj(1); jump %ENTRY_CODE(Sp(0)); @@ -76,7 +77,10 @@ INFO_TABLE_RET( stg_blockAsyncExceptionszh_ret, { // Not true: see comments above // ASSERT(StgTSO_blocked_exceptions(CurrentTSO) == NULL); - StgTSO_blocked_exceptions(CurrentTSO) = END_TSO_QUEUE; + + StgTSO_flags(CurrentTSO) = + StgTSO_flags(CurrentTSO) | TSO_BLOCKEX::I32 | TSO_INTERRUPTIBLE::I32; + #ifdef REG_R1 Sp_adj(1); jump %ENTRY_CODE(Sp(0)); @@ -92,15 +96,18 @@ blockAsyncExceptionszh_fast /* Args: R1 :: IO a */ STK_CHK_GEN( WDS(2)/* worst case */, R1_PTR, blockAsyncExceptionszh_fast); - if (StgTSO_blocked_exceptions(CurrentTSO) == NULL) { - StgTSO_blocked_exceptions(CurrentTSO) = END_TSO_QUEUE; - /* avoid growing the stack unnecessarily */ - if (Sp(0) == stg_blockAsyncExceptionszh_ret_info) { - Sp_adj(1); - } else { - Sp_adj(-1); - Sp(0) = stg_unblockAsyncExceptionszh_ret_info; - } + if ((TO_W_(StgTSO_flags(CurrentTSO)) & TSO_BLOCKEX) == 0) { + + 
StgTSO_flags(CurrentTSO) = + StgTSO_flags(CurrentTSO) | TSO_BLOCKEX::I32 | TSO_INTERRUPTIBLE::I32; + + /* avoid growing the stack unnecessarily */ + if (Sp(0) == stg_blockAsyncExceptionszh_ret_info) { + Sp_adj(1); + } else { + Sp_adj(-1); + Sp(0) = stg_unblockAsyncExceptionszh_ret_info; + } } TICK_UNKNOWN_CALL(); TICK_SLOW_CALL_v(); @@ -112,22 +119,17 @@ unblockAsyncExceptionszh_fast /* Args: R1 :: IO a */ STK_CHK_GEN( WDS(2), R1_PTR, unblockAsyncExceptionszh_fast); - if (StgTSO_blocked_exceptions(CurrentTSO) != NULL) { -#if defined(GRAN) || defined(PAR) - foreign "C" awakenBlockedQueue(MyCapability() "ptr", StgTSO_blocked_exceptions(CurrentTSO) "ptr", - StgTSO_block_info(CurrentTSO) "ptr"); -#else - foreign "C" awakenBlockedQueue(MyCapability() "ptr", StgTSO_blocked_exceptions(CurrentTSO) "ptr"); -#endif - StgTSO_blocked_exceptions(CurrentTSO) = NULL; + if (TO_W_(StgTSO_flags(CurrentTSO)) & TSO_BLOCKEX) { + foreign "C" awakenBlockedExceptionQueue(MyCapability() "ptr", + CurrentTSO "ptr") [R1]; - /* avoid growing the stack unnecessarily */ - if (Sp(0) == stg_unblockAsyncExceptionszh_ret_info) { - Sp_adj(1); - } else { - Sp_adj(-1); - Sp(0) = stg_blockAsyncExceptionszh_ret_info; - } + /* avoid growing the stack unnecessarily */ + if (Sp(0) == stg_unblockAsyncExceptionszh_ret_info) { + Sp_adj(1); + } else { + Sp_adj(-1); + Sp(0) = stg_blockAsyncExceptionszh_ret_info; + } } TICK_UNKNOWN_CALL(); TICK_SLOW_CALL_v(); @@ -135,74 +137,62 @@ unblockAsyncExceptionszh_fast } -#define interruptible(what_next) \ - ( what_next == BlockedOnMVar \ - || what_next == BlockedOnException \ - || what_next == BlockedOnRead \ - || what_next == BlockedOnWrite \ - || what_next == BlockedOnDelay \ - || what_next == BlockedOnDoProc) - killThreadzh_fast { - /* args: R1 = TSO to kill, R2 = Exception */ - - W_ why_blocked; - - /* This thread may have been relocated. 
- * (see Schedule.c:threadStackOverflow) - */ - while: - if (StgTSO_what_next(R1) == ThreadRelocated::I16) { - R1 = StgTSO_link(R1); - goto while; - } - - /* Determine whether this thread is interruptible or not */ - - /* If the target thread is currently blocking async exceptions, - * we'll have to block until it's ready to accept them. The - * exception is interruptible threads - ie. those that are blocked - * on some resource. - */ - why_blocked = TO_W_(StgTSO_why_blocked(R1)); - if (StgTSO_blocked_exceptions(R1) != NULL && !interruptible(why_blocked)) - { - StgTSO_link(CurrentTSO) = StgTSO_blocked_exceptions(R1); - StgTSO_blocked_exceptions(R1) = CurrentTSO; - - StgTSO_why_blocked(CurrentTSO) = BlockedOnException::I16; - StgTSO_block_info(CurrentTSO) = R1; - - BLOCK( R1_PTR & R2_PTR, killThreadzh_fast ); - } - - /* Killed threads turn into zombies, which might be garbage - * collected at a later date. That's why we don't have to - * explicitly remove them from any queues they might be on. - */ - - /* We might have killed ourselves. In which case, better be *very* - * careful. If the exception killed us, then return to the scheduler. - * If the exception went to a catch frame, we'll just continue from - * the handler. - */ - if (R1 == CurrentTSO) { + /* args: R1 = TSO to kill, R2 = Exception */ + + W_ why_blocked; + W_ target; + W_ exception; + + target = R1; + exception = R2; + + STK_CHK_GEN( WDS(3), R1_PTR | R2_PTR, killThreadzh_fast); + + /* + * We might have killed ourselves. In which case, better be *very* + * careful. If the exception killed us, then return to the scheduler. + * If the exception went to a catch frame, we'll just continue from + * the handler. + */ + if (target == CurrentTSO) { SAVE_THREAD_STATE(); - foreign "C" raiseAsync(MyCapability() "ptr", R1 "ptr", R2 "ptr"); + /* ToDo: what if the current thread is blocking exceptions? 
*/ + foreign "C" throwToSingleThreaded(MyCapability() "ptr", + target "ptr", exception "ptr")[R1,R2]; if (StgTSO_what_next(CurrentTSO) == ThreadKilled::I16) { - R1 = ThreadFinished; - jump StgReturn; + R1 = ThreadFinished; + jump StgReturn; } else { - LOAD_THREAD_STATE(); - ASSERT(StgTSO_what_next(CurrentTSO) == ThreadRunGHC::I16); - jump %ENTRY_CODE(Sp(0)); + LOAD_THREAD_STATE(); + ASSERT(StgTSO_what_next(CurrentTSO) == ThreadRunGHC::I16); + jump %ENTRY_CODE(Sp(0)); + } + } else { + W_ out; + W_ retcode; + out = BaseReg + OFFSET_StgRegTable_rmp_tmp_w; + + retcode = foreign "C" throwTo(MyCapability() "ptr", + CurrentTSO "ptr", + target "ptr", + exception "ptr", + out "ptr") [R1,R2]; + + switch [THROWTO_SUCCESS .. THROWTO_BLOCKED] (retcode) { + + case THROWTO_SUCCESS: { + jump %ENTRY_CODE(Sp(0)); } - } else { - foreign "C" raiseAsync(MyCapability() "ptr", R1 "ptr", R2 "ptr"); - } - jump %ENTRY_CODE(Sp(0)); + case THROWTO_BLOCKED: { + R3 = W_[out]; + // we must block, and call throwToReleaseTarget() before returning + jump stg_block_throwto; + } + } + } } /* ----------------------------------------------------------------------------- @@ -300,15 +290,14 @@ catchzh_fast SET_HDR(Sp,stg_catch_frame_info,W_[CCCS]); StgCatchFrame_handler(Sp) = R2; - StgCatchFrame_exceptions_blocked(Sp) = - (StgTSO_blocked_exceptions(CurrentTSO) != NULL); + StgCatchFrame_exceptions_blocked(Sp) = TO_W_(StgTSO_flags(CurrentTSO)) & TSO_BLOCKEX; TICK_CATCHF_PUSHED(); /* Apply R1 to the realworld token */ TICK_UNKNOWN_CALL(); TICK_SLOW_CALL_v(); jump stg_ap_v_fast; -} +} /* ----------------------------------------------------------------------------- * The raise infotable @@ -423,9 +412,8 @@ retry_pop_stack: /* Ensure that async excpetions are blocked when running the handler. 
*/ - if (StgTSO_blocked_exceptions(CurrentTSO) == NULL) { - StgTSO_blocked_exceptions(CurrentTSO) = END_TSO_QUEUE; - } + StgTSO_flags(CurrentTSO) = + StgTSO_flags(CurrentTSO) | TSO_BLOCKEX::I32 | TSO_INTERRUPTIBLE::I32; /* Call the handler, passing the exception value and a realworld * token as arguments. diff --git a/rts/Exception.h b/rts/Exception.h deleted file mode 100644 index f7832f4045..0000000000 --- a/rts/Exception.h +++ /dev/null @@ -1,40 +0,0 @@ -/* ----------------------------------------------------------------------------- - * - * (c) The GHC Team, 1998-2005 - * - * Exception support - * - * ---------------------------------------------------------------------------*/ - -#ifndef EXCEPTION_H -#define EXCEPTION_H - -extern const StgRetInfoTable stg_blockAsyncExceptionszh_ret_info; -extern const StgRetInfoTable stg_unblockAsyncExceptionszh_ret_info; - -/* Determine whether a thread is interruptible (ie. blocked - * indefinitely). Interruptible threads can be sent an exception with - * killThread# even if they have async exceptions blocked. - */ -STATIC_INLINE int -interruptible(StgTSO *t) -{ - switch (t->why_blocked) { - case BlockedOnMVar: - case BlockedOnException: - case BlockedOnRead: - case BlockedOnWrite: -#if defined(mingw32_HOST_OS) - case BlockedOnDoProc: -#endif - case BlockedOnDelay: - return 1; - // NB. Threaded blocked on foreign calls (BlockedOnCCall) are - // *not* interruptible. We can't send these threads an exception. 
- default: - return 0; - } -} - -#endif /* EXCEPTION_H */ - @@ -44,6 +44,7 @@ #endif #include "Trace.h" #include "RetainerProfile.h" +#include "RaiseAsync.h" #include <string.h> @@ -2631,10 +2632,8 @@ scavengeTSO (StgTSO *tso) ) { tso->block_info.closure = evacuate(tso->block_info.closure); } - if ( tso->blocked_exceptions != NULL ) { - tso->blocked_exceptions = - (StgTSO *)evacuate((StgClosure *)tso->blocked_exceptions); - } + tso->blocked_exceptions = + (StgTSO *)evacuate((StgClosure *)tso->blocked_exceptions); // We don't always chase the link field: TSOs on the blackhole // queue are not automatically alive, so the link field is a @@ -4620,6 +4619,14 @@ threadPaused(Capability *cap, StgTSO *tso) nat weight_pending = 0; rtsBool prev_was_update_frame; + // Check to see whether we have threads waiting to raise + // exceptions, and we're not blocking exceptions, or are blocked + // interruptibly. This is important; if a thread is running with + // TSO_BLOCKEX and becomes blocked interruptibly, this is the only + // place we ensure that the blocked_exceptions get a chance. 
+ maybePerformBlockedException (cap, tso); + if (tso->what_next == ThreadKilled) { return; } + stack_end = &tso->stack[tso->stack_size]; frame = (StgClosure *)tso->sp; diff --git a/rts/GCCompact.c b/rts/GCCompact.c index 45222c3b9b..7f91501101 100644 --- a/rts/GCCompact.c +++ b/rts/GCCompact.c @@ -403,9 +403,7 @@ thread_TSO (StgTSO *tso) ) { thread_(&tso->block_info.closure); } - if ( tso->blocked_exceptions != NULL ) { - thread_(&tso->blocked_exceptions); - } + thread_(&tso->blocked_exceptions); thread_(&tso->trec); diff --git a/rts/HCIncludes.h b/rts/HCIncludes.h new file mode 100644 index 0000000000..06cc61a8a5 --- /dev/null +++ b/rts/HCIncludes.h @@ -0,0 +1,22 @@ +/* includes for compiling .cmm files via-C */ +#include "Rts.h" +#include "RtsFlags.h" +#include "RtsUtils.h" +#include "StgRun.h" +#include "Schedule.h" +#include "Printer.h" +#include "Sanity.h" +#include "STM.h" +#include "Storage.h" +#include "SchedAPI.h" +#include "Timer.h" +#include "ProfHeap.h" +#include "LdvProfile.h" +#include "Profiling.h" +#include "OSThreads.h" +#include "Apply.h" +#include "SMP.h" +#include "RaiseAsync.h" +#include "ThreadLabels.h" +#include "Threads.h" +#include "Prelude.h" diff --git a/rts/HeapStackCheck.cmm b/rts/HeapStackCheck.cmm index 4e5dd24596..aae28cb77f 100644 --- a/rts/HeapStackCheck.cmm +++ b/rts/HeapStackCheck.cmm @@ -902,6 +902,31 @@ stg_block_blackhole BLOCK_BUT_FIRST(stg_block_blackhole_finally); } +INFO_TABLE_RET( stg_block_throwto, 2/*framesize*/, 0/*bitmap*/, RET_SMALL ) +{ + R2 = Sp(2); + R1 = Sp(1); + Sp_adj(3); + jump killThreadzh_fast; +} + +stg_block_throwto_finally +{ +#ifdef THREADED_RTS + foreign "C" throwToReleaseTarget (R3 "ptr"); +#endif + jump StgReturn; +} + +stg_block_throwto +{ + Sp_adj(-3); + Sp(2) = R2; + Sp(1) = R1; + Sp(0) = stg_block_throwto_info; + BLOCK_BUT_FIRST(stg_block_throwto_finally); +} + #ifdef mingw32_HOST_OS INFO_TABLE_RET( stg_block_async, 0/*framesize*/, 0/*bitmap*/, RET_SMALL ) { diff --git a/rts/Makefile 
b/rts/Makefile index 67201cded5..b1111a027f 100644 --- a/rts/Makefile +++ b/rts/Makefile @@ -301,26 +301,7 @@ endif # Compiling the cmm files # ToDo: should we really include Rts.h here? Required for GNU_ATTRIBUTE(). -SRC_HC_OPTS += \ - -I. \ - -\#include Prelude.h \ - -\#include Rts.h \ - -\#include RtsFlags.h \ - -\#include RtsUtils.h \ - -\#include StgRun.h \ - -\#include Schedule.h \ - -\#include Printer.h \ - -\#include Sanity.h \ - -\#include STM.h \ - -\#include Storage.h \ - -\#include SchedAPI.h \ - -\#include Timer.h \ - -\#include ProfHeap.h \ - -\#include LdvProfile.h \ - -\#include Profiling.h \ - -\#include OSThreads.h \ - -\#include Apply.h \ - -\#include SMP.h +SRC_HC_OPTS += -I. -\#include HCIncludes.h ifeq "$(Windows)" "YES" PrimOps_HC_OPTS += -\#include '<windows.h>' -\#include win32/AsyncIO.h diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c new file mode 100644 index 0000000000..9041c06cb2 --- /dev/null +++ b/rts/RaiseAsync.c @@ -0,0 +1,1015 @@ +/* --------------------------------------------------------------------------- + * + * (c) The GHC Team, 1998-2006 + * + * Asynchronous exceptions + * + * --------------------------------------------------------------------------*/ + +#include "PosixSource.h" +#include "Rts.h" +#include "Threads.h" +#include "Trace.h" +#include "RaiseAsync.h" +#include "SMP.h" +#include "Schedule.h" +#include "Storage.h" +#include "Updates.h" +#include "STM.h" +#include "Sanity.h" + +static void raiseAsync (Capability *cap, + StgTSO *tso, + StgClosure *exception, + rtsBool stop_at_atomically, + StgPtr stop_here); + +static void removeFromQueues(Capability *cap, StgTSO *tso); + +static void blockedThrowTo (StgTSO *source, StgTSO *target); + +static void performBlockedException (Capability *cap, + StgTSO *source, StgTSO *target); + +/* ----------------------------------------------------------------------------- + throwToSingleThreaded + + This version of throwTo is safe to use if and only if one of the + following holds: 
+ + - !THREADED_RTS + + - all the other threads in the system are stopped (eg. during GC). + + - we surely own the target TSO (eg. we just took it from the + run queue of the current capability, or we are running it). + + It doesn't cater for blocking the source thread until the exception + has been raised. + -------------------------------------------------------------------------- */ + +void +throwToSingleThreaded(Capability *cap, StgTSO *tso, StgClosure *exception) +{ + throwToSingleThreaded_(cap, tso, exception, rtsFalse, NULL); +} + +void +throwToSingleThreaded_(Capability *cap, StgTSO *tso, StgClosure *exception, + rtsBool stop_at_atomically, StgPtr stop_here) +{ + // Thread already dead? + if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) { + return; + } + + // Remove it from any blocking queues + removeFromQueues(cap,tso); + + raiseAsync(cap, tso, exception, stop_at_atomically, stop_here); +} + +void +suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here) +{ + // Thread already dead? + if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) { + return; + } + + // Remove it from any blocking queues + removeFromQueues(cap,tso); + + raiseAsync(cap, tso, NULL, rtsFalse, stop_here); +} + +/* ----------------------------------------------------------------------------- + throwTo + + This function may be used to throw an exception from one thread to + another, during the course of normal execution. This is a tricky + task: the target thread might be running on another CPU, or it + may be blocked and could be woken up at any point by another CPU. + We have some delicate synchronisation to do. + + There is a completely safe fallback scheme: it is always possible + to just block the source TSO on the target TSO's blocked_exceptions + queue. This queue is locked using lockTSO()/unlockTSO(). 
It is + checked at regular intervals: before and after running a thread + (schedule() and threadPaused() respectively), and just before GC + (scheduleDoGC()). Activating a thread on this queue should be done + using maybePerformBlockedException(): this is done in the context + of the target thread, so the exception can be raised eagerly. + + This fallback scheme works even if the target thread is complete or + killed: scheduleDoGC() will discover the blocked thread before the + target is GC'd. + + Blocking the source thread on the target thread's blocked_exception + queue is also employed when the target thread is currently blocking + exceptions (ie. inside Control.Exception.block). + + We could use the safe fallback scheme exclusively, but that + wouldn't be ideal: most calls to throwTo would block immediately, + possibly until the next GC, which might require the deadlock + detection mechanism to kick in. So we try to provide promptness + wherever possible. + + We can promptly deliver the exception if the target thread is: + + - runnable, on the same Capability as the source thread (because + we own the run queue and therefore the target thread). + + - blocked, and we can obtain exclusive access to it. Obtaining + exclusive access to the thread depends on how it is blocked. + + We must also be careful to not trip over threadStackOverflow(), + which might be moving the TSO to enlarge its stack. + lockTSO()/unlockTSO() are used here too. + + Returns: + + THROWTO_SUCCESS exception was raised, ok to continue + + THROWTO_BLOCKED exception was not raised; block the source + thread then call throwToReleaseTarget() when + the source thread is properly tidied away. 
+ + -------------------------------------------------------------------------- */ + +nat +throwTo (Capability *cap, // the Capability we hold + StgTSO *source, // the TSO sending the exception + StgTSO *target, // the TSO receiving the exception + StgClosure *exception, // the exception closure + /*[out]*/ void **out USED_IF_THREADS) +{ + StgWord status; + + // follow ThreadRelocated links in the target first + while (target->what_next == ThreadRelocated) { + target = target->link; + // No, it might be a WHITEHOLE: + // ASSERT(get_itbl(target)->type == TSO); + } + + debugTrace(DEBUG_sched, "throwTo: from thread %d to thread %d", + source->id, target->id); + +#ifdef DEBUG + if (traceClass(DEBUG_sched)) { + debugTraceBegin("throwTo: target"); + printThreadStatus(target); + debugTraceEnd(); + } +#endif + + goto check_target; +retry: + debugTrace(DEBUG_sched, "throwTo: retrying..."); + +check_target: + // Thread already dead? + if (target->what_next == ThreadComplete + || target->what_next == ThreadKilled) { + return THROWTO_SUCCESS; + } + + status = target->why_blocked; + + switch (status) { + case NotBlocked: + /* if status==NotBlocked, and target->cap == cap, then + we own this TSO and can raise the exception. + + How do we establish this condition? Very carefully. + + Let + P = (status == NotBlocked) + Q = (tso->cap == cap) + + Now, if P & Q are true, then the TSO is locked and owned by + this capability. No other OS thread can steal it. + + If P==0 and Q==1: the TSO is blocked, but attached to this + capability, and it can be stolen by another capability. + + If P==1 and Q==0: the TSO is runnable on another + capability. At any time, the TSO may change from runnable + to blocked and vice versa, while it remains owned by + another capability. + + Suppose we test like this: + + p = P + q = Q + if (p && q) ... + + this is defeated by another capability stealing a blocked + TSO from us to wake it up (Schedule.c:unblockOne()). 
The + other thread is doing + + Q = 0 + P = 1 + + assuming arbitrary reordering, we could see this + interleaving: + + start: P==0 && Q==1 + P = 1 + p = P + q = Q + Q = 0 + if (p && q) ... + + so we need a memory barrier: + + p = P + mb() + q = Q + if (p && q) ... + + this avoids the problematic case. There are other cases + to consider, but this is the tricky one. + + Note that we must be sure that unblockOne() does the + writes in the correct order: Q before P. The memory + barrier ensures that if we have seen the write to P, we + have also seen the write to Q. + */ + { + Capability *target_cap; + + wb(); + target_cap = target->cap; + if (target_cap == cap && (target->flags & TSO_BLOCKEX) == 0) { + // It's on our run queue and not blocking exceptions + raiseAsync(cap, target, exception, rtsFalse, NULL); + return THROWTO_SUCCESS; + } else { + // Otherwise, just block on the blocked_exceptions queue + // of the target thread. The queue will get looked at + // soon enough: it is checked before and after running a + // thread, and during GC. + lockTSO(target); + + // Avoid race with threadStackOverflow, which may have + // just moved this TSO. + if (target->what_next == ThreadRelocated) { + unlockTSO(target); + target = target->link; + goto retry; + } + blockedThrowTo(source,target); + *out = target; + return THROWTO_BLOCKED; + } + } + + case BlockedOnMVar: + { + /* + To establish ownership of this TSO, we need to acquire a + lock on the MVar that it is blocked on. + */ + StgMVar *mvar; + StgInfoTable *info USED_IF_THREADS; + + mvar = (StgMVar *)target->block_info.closure; + + // ASSUMPTION: tso->block_info must always point to a + // closure. In the threaded RTS it does. 
+ if (get_itbl(mvar)->type != MVAR) goto retry; + + info = lockClosure((StgClosure *)mvar); + + if (target->what_next == ThreadRelocated) { + target = target->link; + unlockClosure((StgClosure *)mvar,info); + goto retry; + } + // we have the MVar, let's check whether the thread + // is still blocked on the same MVar. + if (target->why_blocked != BlockedOnMVar + || (StgMVar *)target->block_info.closure != mvar) { + unlockClosure((StgClosure *)mvar, info); + goto retry; + } + + if ((target->flags & TSO_BLOCKEX) && + ((target->flags & TSO_INTERRUPTIBLE) == 0)) { + lockClosure((StgClosure *)target); + blockedThrowTo(source,target); + unlockClosure((StgClosure *)mvar, info); + *out = target; + return THROWTO_BLOCKED; // caller releases TSO + } else { + removeThreadFromMVarQueue(mvar, target); + raiseAsync(cap, target, exception, rtsFalse, NULL); + unblockOne(cap, target); + unlockClosure((StgClosure *)mvar, info); + return THROWTO_SUCCESS; + } + } + + case BlockedOnBlackHole: + { + ACQUIRE_LOCK(&sched_mutex); + // double checking the status after the memory barrier: + if (target->why_blocked != BlockedOnBlackHole) { + RELEASE_LOCK(&sched_mutex); + goto retry; + } + + if (target->flags & TSO_BLOCKEX) { + lockTSO(target); + blockedThrowTo(source,target); + RELEASE_LOCK(&sched_mutex); + *out = target; + return THROWTO_BLOCKED; // caller releases TSO + } else { + removeThreadFromQueue(&blackhole_queue, target); + raiseAsync(cap, target, exception, rtsFalse, NULL); + unblockOne(cap, target); + RELEASE_LOCK(&sched_mutex); + return THROWTO_SUCCESS; + } + } + + case BlockedOnException: + { + StgTSO *target2; + StgInfoTable *info; + + /* + To obtain exclusive access to a BlockedOnException thread, + we must call lockClosure() on the TSO on which it is blocked. 
+ Since the TSO might change underneath our feet, after we + call lockClosure() we must check that + + (a) the closure we locked is actually a TSO + (b) the original thread is still BlockedOnException, + (c) the original thread is still blocked on the TSO we locked + and (d) the target thread has not been relocated. + + We synchronise with threadStackOverflow() (which relocates + threads) using lockClosure()/unlockClosure(). + */ + target2 = target->block_info.tso; + + info = lockClosure((StgClosure *)target2); + if (info != &stg_TSO_info) { + unlockClosure((StgClosure *)target2, info); + goto retry; + } + if (target->what_next == ThreadRelocated) { + target = target->link; + unlockTSO(target2); + goto retry; + } + if (target2->what_next == ThreadRelocated) { + target->block_info.tso = target2->link; + unlockTSO(target2); + goto retry; + } + if (target->why_blocked != BlockedOnException + || target->block_info.tso != target2) { + unlockTSO(target2); + goto retry; + } + + /* + Now we have exclusive rights to the target TSO... + + If it is blocking exceptions, add the source TSO to its + blocked_exceptions queue. Otherwise, raise the exception. + */ + if ((target->flags & TSO_BLOCKEX) && + ((target->flags & TSO_INTERRUPTIBLE) == 0)) { + lockTSO(target); + blockedThrowTo(source,target); + unlockTSO(target2); + *out = target; + return THROWTO_BLOCKED; + } else { + removeThreadFromQueue(&target2->blocked_exceptions, target); + raiseAsync(cap, target, exception, rtsFalse, NULL); + unblockOne(cap, target); + unlockTSO(target2); + return THROWTO_SUCCESS; + } + } + + case BlockedOnSTM: + barf("ToDo"); + + case BlockedOnCCall: + case BlockedOnCCall_NoUnblockExc: + // I don't think it's possible to acquire ownership of a + // BlockedOnCCall thread. We just assume that the target + // thread is blocking exceptions, and block on its + // blocked_exception queue. 
+ lockTSO(target); + blockedThrowTo(source,target); + *out = target; + return THROWTO_BLOCKED; + +#ifndef THREADED_RTS + case BlockedOnRead: + case BlockedOnWrite: + case BlockedOnDelay: + if ((target->flags & TSO_BLOCKEX) && + ((target->flags & TSO_INTERRUPTIBLE) == 0)) { + blockedThrowTo(source,target); + return THROWTO_BLOCKED; + } else { + removeFromQueues(cap,target); + raiseAsync(cap, target, exception, rtsFalse, NULL); + return THROWTO_SUCCESS; + } +#endif + + default: + barf("throwTo: unrecognised why_blocked value"); + } + barf("throwTo"); +} + +// Block a TSO on another TSO's blocked_exceptions queue. +// Precondition: we hold an exclusive lock on the target TSO (this is +// complex to achieve as there's no single lock on a TSO; see +// throwTo()). +static void +blockedThrowTo (StgTSO *source, StgTSO *target) +{ + debugTrace(DEBUG_sched, "throwTo: blocking on thread %d", target->id); + source->link = target->blocked_exceptions; + target->blocked_exceptions = source; + dirtyTSO(target); // we modified the blocked_exceptions queue + + source->block_info.tso = target; + wb(); // throwTo_exception *must* be visible if BlockedOnException is. + source->why_blocked = BlockedOnException; +} + + +#ifdef THREADED_RTS +void +throwToReleaseTarget (void *tso) +{ + unlockTSO((StgTSO *)tso); +} +#endif + +/* ----------------------------------------------------------------------------- + Waking up threads blocked in throwTo + + There are two ways to do this: maybePerformBlockedException() will + perform the throwTo() for the thread at the head of the queue + immediately, and leave the other threads on the queue. + maybePerformBlockedException() also checks the TSO_BLOCKEX flag + before raising an exception. + + awakenBlockedExceptionQueue() will wake up all the threads in the + queue, but not perform any throwTo() immediately. This might be + more appropriate when the target thread is the one actually running + (see Exception.cmm).
+ -------------------------------------------------------------------------- */ + +void +maybePerformBlockedException (Capability *cap, StgTSO *tso) +{ + StgTSO *source; + + if (tso->blocked_exceptions != END_TSO_QUEUE + && ((tso->flags & TSO_BLOCKEX) == 0 + || ((tso->flags & TSO_INTERRUPTIBLE) && interruptible(tso)))) { + + // Lock the TSO, this gives us exclusive access to the queue + lockTSO(tso); + + // Check the queue again; it might have changed before we + // locked it. + if (tso->blocked_exceptions == END_TSO_QUEUE) { + unlockTSO(tso); + return; + } + + // We unblock just the first thread on the queue, and perform + // its throw immediately. + source = tso->blocked_exceptions; + performBlockedException(cap, source, tso); + tso->blocked_exceptions = unblockOne_(cap, source, + rtsFalse/*no migrate*/); + unlockTSO(tso); + } +} + +void +awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso) +{ + if (tso->blocked_exceptions != END_TSO_QUEUE) { + lockTSO(tso); + awakenBlockedQueue(cap, tso->blocked_exceptions); + tso->blocked_exceptions = END_TSO_QUEUE; + unlockTSO(tso); + } +} + +static void +performBlockedException (Capability *cap, StgTSO *source, StgTSO *target) +{ + StgClosure *exception; + + ASSERT(source->why_blocked == BlockedOnException); + ASSERT(source->block_info.tso->id == target->id); + ASSERT(source->sp[0] == (StgWord)&stg_block_throwto_info); + ASSERT(((StgTSO *)source->sp[1])->id == target->id); + // check ids not pointers, because the thread might be relocated + + exception = (StgClosure *)source->sp[2]; + throwToSingleThreaded(cap, target, exception); + source->sp += 3; +} + +/* ----------------------------------------------------------------------------- + Remove a thread from blocking queues. + + This is for use when we raise an exception in another thread, which + may be blocked. + This has nothing to do with the UnblockThread event in GranSim. 
-- HWL + -------------------------------------------------------------------------- */ + +#if defined(GRAN) || defined(PARALLEL_HASKELL) +/* + NB: only the type of the blocking queue is different in GranSim and GUM + the operations on the queue-elements are the same + long live polymorphism! + + Locks: sched_mutex is held upon entry and exit. + +*/ +static void +removeFromQueues(Capability *cap, StgTSO *tso) +{ + StgBlockingQueueElement *t, **last; + + switch (tso->why_blocked) { + + case NotBlocked: + return; /* not blocked */ + + case BlockedOnSTM: + // Be careful: nothing to do here! We tell the scheduler that the thread + // is runnable and we leave it to the stack-walking code to abort the + // transaction while unwinding the stack. We should perhaps have a debugging + // test to make sure that this really happens and that the 'zombie' transaction + // does not get committed. + goto done; + + case BlockedOnMVar: + ASSERT(get_itbl(tso->block_info.closure)->type == MVAR); + { + StgBlockingQueueElement *last_tso = END_BQ_QUEUE; + StgMVar *mvar = (StgMVar *)(tso->block_info.closure); + + last = (StgBlockingQueueElement **)&mvar->head; + for (t = (StgBlockingQueueElement *)mvar->head; + t != END_BQ_QUEUE; + last = &t->link, last_tso = t, t = t->link) { + if (t == (StgBlockingQueueElement *)tso) { + *last = (StgBlockingQueueElement *)tso->link; + if (mvar->tail == tso) { + mvar->tail = (StgTSO *)last_tso; + } + goto done; + } + } + barf("removeFromQueues (MVAR): TSO not found"); + } + + case BlockedOnBlackHole: + ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ); + { + StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure); + + last = &bq->blocking_queue; + for (t = bq->blocking_queue; + t != END_BQ_QUEUE; + last = &t->link, t = t->link) { + if (t == (StgBlockingQueueElement *)tso) { + *last = (StgBlockingQueueElement *)tso->link; + goto done; + } + } + barf("removeFromQueues (BLACKHOLE): TSO not found"); + } + + case BlockedOnException: 
+ { + StgTSO *target = tso->block_info.tso; + + ASSERT(get_itbl(target)->type == TSO); + + while (target->what_next == ThreadRelocated) { + target = target->link; + ASSERT(get_itbl(target)->type == TSO); + } + + last = (StgBlockingQueueElement **)&target->blocked_exceptions; + for (t = (StgBlockingQueueElement *)target->blocked_exceptions; + t != END_BQ_QUEUE; + last = &t->link, t = t->link) { + ASSERT(get_itbl(t)->type == TSO); + if (t == (StgBlockingQueueElement *)tso) { + *last = (StgBlockingQueueElement *)tso->link; + goto done; + } + } + barf("removeFromQueues (Exception): TSO not found"); + } + + case BlockedOnRead: + case BlockedOnWrite: +#if defined(mingw32_HOST_OS) + case BlockedOnDoProc: +#endif + { + /* take TSO off blocked_queue */ + StgBlockingQueueElement *prev = NULL; + for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; + prev = t, t = t->link) { + if (t == (StgBlockingQueueElement *)tso) { + if (prev == NULL) { + blocked_queue_hd = (StgTSO *)t->link; + if ((StgBlockingQueueElement *)blocked_queue_tl == t) { + blocked_queue_tl = END_TSO_QUEUE; + } + } else { + prev->link = t->link; + if ((StgBlockingQueueElement *)blocked_queue_tl == t) { + blocked_queue_tl = (StgTSO *)prev; + } + } +#if defined(mingw32_HOST_OS) + /* (Cooperatively) signal that the worker thread should abort + * the request.
+ */ + abandonWorkRequest(tso->block_info.async_result->reqID); +#endif + goto done; + } + } + barf("removeFromQueues (I/O): TSO not found"); + } + + case BlockedOnDelay: + { + /* take TSO off sleeping_queue */ + StgBlockingQueueElement *prev = NULL; + for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; + prev = t, t = t->link) { + if (t == (StgBlockingQueueElement *)tso) { + if (prev == NULL) { + sleeping_queue = (StgTSO *)t->link; + } else { + prev->link = t->link; + } + goto done; + } + } + barf("removeFromQueues (delay): TSO not found"); + } + + default: + barf("removeFromQueues"); + } + + done: + tso->link = END_TSO_QUEUE; + tso->why_blocked = NotBlocked; + tso->block_info.closure = NULL; + pushOnRunQueue(cap,tso); +} +#else +static void +removeFromQueues(Capability *cap, StgTSO *tso) +{ + switch (tso->why_blocked) { + + case NotBlocked: + return; + + case BlockedOnSTM: + // Be careful: nothing to do here! We tell the scheduler that the + // thread is runnable and we leave it to the stack-walking code to + // abort the transaction while unwinding the stack. We should + // perhaps have a debugging test to make sure that this really + // happens and that the 'zombie' transaction does not get + // committed. + goto done; + + case BlockedOnMVar: + removeThreadFromMVarQueue((StgMVar *)tso->block_info.closure, tso); + goto done; + + case BlockedOnBlackHole: + removeThreadFromQueue(&blackhole_queue, tso); + goto done; + + case BlockedOnException: + { + StgTSO *target = tso->block_info.tso; + + // NO: when called by threadPaused(), we probably have this + // TSO already locked (WHITEHOLEd) because we just placed + // ourselves on its queue. 
+ // ASSERT(get_itbl(target)->type == TSO); + + while (target->what_next == ThreadRelocated) { + target = target->link; + } + + removeThreadFromQueue(&target->blocked_exceptions, tso); + goto done; + } + +#if !defined(THREADED_RTS) + case BlockedOnRead: + case BlockedOnWrite: +#if defined(mingw32_HOST_OS) + case BlockedOnDoProc: +#endif + removeThreadFromDeQueue(&blocked_queue_hd, &blocked_queue_tl, tso); +#if defined(mingw32_HOST_OS) + /* (Cooperatively) signal that the worker thread should abort + * the request. + */ + abandonWorkRequest(tso->block_info.async_result->reqID); +#endif + goto done; + + case BlockedOnDelay: + removeThreadFromQueue(&sleeping_queue, tso); + goto done; +#endif + + default: + barf("removeFromQueues"); + } + + done: + tso->link = END_TSO_QUEUE; + tso->why_blocked = NotBlocked; + tso->block_info.closure = NULL; + appendToRunQueue(cap,tso); + + // We might have just migrated this TSO to our Capability: + if (tso->bound) { + tso->bound->cap = cap; + } + tso->cap = cap; +} +#endif + +/* ----------------------------------------------------------------------------- + * raiseAsync() + * + * The following function implements the magic for raising an + * asynchronous exception in an existing thread. + * + * We first remove the thread from any queue on which it might be + * blocked. The possible blockages are MVARs and BLACKHOLE_BQs. + * + * We strip the stack down to the innermost CATCH_FRAME, building + * thunks in the heap for all the active computations, so they can + * be restarted if necessary. When we reach a CATCH_FRAME, we build + * an application of the handler to the exception, and push it on + * the top of the stack. + * + * How exactly do we save all the active computations? We create an + * AP_STACK for every UpdateFrame on the stack. Entering one of these + * AP_STACKs pushes everything from the corresponding update frame + * upwards onto the stack. 
(Actually, it pushes everything up to the + * next update frame plus a pointer to the next AP_STACK object. + * Entering the next AP_STACK object pushes more onto the stack until we + * reach the last AP_STACK object - at which point the stack should look + * exactly as it did when we killed the TSO and we can continue + * execution by entering the closure on top of the stack. + * + * We can also kill a thread entirely - this happens if either (a) the + * exception passed to raiseAsync is NULL, or (b) there's no + * CATCH_FRAME on the stack. In either case, we strip the entire + * stack and replace the thread with a zombie. + * + * ToDo: in THREADED_RTS mode, this function is only safe if either + * (a) we hold all the Capabilities (eg. in GC, or if there is only + * one Capability), or (b) we own the Capability that the TSO is + * currently blocked on or on the run queue of. + * + * -------------------------------------------------------------------------- */ + +static void +raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, + rtsBool stop_at_atomically, StgPtr stop_here) +{ + StgRetInfoTable *info; + StgPtr sp, frame; + nat i; + + debugTrace(DEBUG_sched, + "raising exception in thread %ld.", (long)tso->id); + + // mark it dirty; we're about to change its stack. + dirtyTSO(tso); + + sp = tso->sp; + + // ASSUMES: the thread is not already complete or dead. Upper + // layers should deal with that. + ASSERT(tso->what_next != ThreadComplete && tso->what_next != ThreadKilled); + + // The stack freezing code assumes there's a closure pointer on + // the top of the stack, so we have to arrange that this is the case... + // + if (sp[0] == (W_)&stg_enter_info) { + sp++; + } else { + sp--; + sp[0] = (W_)&stg_dummy_ret_closure; + } + + frame = sp + 1; + while (stop_here == NULL || frame < stop_here) { + + // 1. Let the top of the stack be the "current closure" + // + // 2. Walk up the stack until we find either an UPDATE_FRAME or a + // CATCH_FRAME. + // + // 3. 
If it's an UPDATE_FRAME, then make an AP_STACK containing the + // current closure applied to the chunk of stack up to (but not + // including) the update frame. This closure becomes the "current + // closure". Go back to step 2. + // + // 4. If it's a CATCH_FRAME, then leave the exception handler on + // top of the stack applied to the exception. + // + // 5. If it's a STOP_FRAME, then kill the thread. + // + // NB: if we pass an ATOMICALLY_FRAME then abort the associated + // transaction + + info = get_ret_itbl((StgClosure *)frame); + + switch (info->i.type) { + + case UPDATE_FRAME: + { + StgAP_STACK * ap; + nat words; + + // First build an AP_STACK consisting of the stack chunk above the + // current update frame, with the top word on the stack as the + // fun field. + // + words = frame - sp - 1; + ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words)); + + ap->size = words; + ap->fun = (StgClosure *)sp[0]; + sp++; + for(i=0; i < (nat)words; ++i) { + ap->payload[i] = (StgClosure *)*sp++; + } + + SET_HDR(ap,&stg_AP_STACK_info, + ((StgClosure *)frame)->header.prof.ccs /* ToDo */); + TICK_ALLOC_UP_THK(words+1,0); + + //IF_DEBUG(scheduler, + // debugBelch("sched: Updating "); + // printPtr((P_)((StgUpdateFrame *)frame)->updatee); + // debugBelch(" with "); + // printObj((StgClosure *)ap); + // ); + + // Replace the updatee with an indirection + // + // Warning: if we're in a loop, more than one update frame on + // the stack may point to the same object. Be careful not to + // overwrite an IND_OLDGEN in this case, because we'll screw + // up the mutable lists. To be on the safe side, don't + // overwrite any kind of indirection at all. See also + // threadSqueezeStack in GC.c, where we have to make a similar + // check. 
+ // + if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) { + // revert the black hole + UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee, + (StgClosure *)ap); + } + sp += sizeofW(StgUpdateFrame) - 1; + sp[0] = (W_)ap; // push onto stack + frame = sp + 1; + continue; //no need to bump frame + } + + case STOP_FRAME: + // We've stripped the entire stack, the thread is now dead. + tso->what_next = ThreadKilled; + tso->sp = frame + sizeofW(StgStopFrame); + return; + + case CATCH_FRAME: + // If we find a CATCH_FRAME, and we've got an exception to raise, + // then build the THUNK raise(exception), and leave it on + // top of the CATCH_FRAME ready to enter. + // + { +#ifdef PROFILING + StgCatchFrame *cf = (StgCatchFrame *)frame; +#endif + StgThunk *raise; + + if (exception == NULL) break; + + // we've got an exception to raise, so let's pass it to the + // handler in this frame. + // + raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1); + TICK_ALLOC_SE_THK(1,0); + SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs); + raise->payload[0] = exception; + + // throw away the stack from Sp up to the CATCH_FRAME. + // + sp = frame - 1; + + /* Ensure that async exceptions are blocked now, so we don't get + * a surprise exception before we get around to executing the + * handler. + */ + tso->flags |= TSO_BLOCKEX | TSO_INTERRUPTIBLE; + + /* Put the newly-built THUNK on top of the stack, ready to execute + * when the thread restarts. + */ + sp[0] = (W_)raise; + sp[-1] = (W_)&stg_enter_info; + tso->sp = sp-1; + tso->what_next = ThreadRunGHC; + IF_DEBUG(sanity, checkTSO(tso)); + return; + } + + case ATOMICALLY_FRAME: + if (stop_at_atomically) { + ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC); + stmCondemnTransaction(cap, tso -> trec); +#ifdef REG_R1 + tso->sp = frame; +#else + // R1 is not a register: the return convention for IO in + // this case puts the return value on the stack, so we + // need to set up the stack to return to the atomically + // frame properly...
+ tso->sp = frame - 2; + tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not? + tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info; +#endif + tso->what_next = ThreadRunGHC; + return; + } + // Not stop_at_atomically... fall through and abort the + // transaction. + + case CATCH_RETRY_FRAME: + // IF we find an ATOMICALLY_FRAME then we abort the + // current transaction and propagate the exception. In + // this case (unlike ordinary exceptions) we do not care + // whether the transaction is valid or not because its + // possible validity cannot have caused the exception + // and will not be visible after the abort. + debugTrace(DEBUG_stm, + "found atomically block delivering async exception"); + + StgTRecHeader *trec = tso -> trec; + StgTRecHeader *outer = stmGetEnclosingTRec(trec); + stmAbortTransaction(cap, trec); + tso -> trec = outer; + break; + + default: + break; + } + + // move on to the next stack frame + frame += stack_frame_sizeW((StgClosure *)frame); + } + + // if we got here, then we stopped at stop_here + ASSERT(stop_here != NULL); +} + + diff --git a/rts/RaiseAsync.h b/rts/RaiseAsync.h new file mode 100644 index 0000000000..8e59d51d9f --- /dev/null +++ b/rts/RaiseAsync.h @@ -0,0 +1,71 @@ +/* --------------------------------------------------------------------------- + * + * (c) The GHC Team, 1998-2006 + * + * Asynchronous exceptions + * + * --------------------------------------------------------------------------*/ + +#ifndef RAISEASYNC_H +#define RAISEASYNC_H + +#define THROWTO_SUCCESS 0 +#define THROWTO_BLOCKED 1 + +#ifndef CMINUSMINUS +void throwToSingleThreaded (Capability *cap, + StgTSO *tso, + StgClosure *exception); + +void throwToSingleThreaded_ (Capability *cap, + StgTSO *tso, + StgClosure *exception, + rtsBool stop_at_atomically, + StgPtr stop_here); + +void suspendComputation (Capability *cap, + StgTSO *tso, + StgPtr stop_here); + +nat throwTo (Capability *cap, // the Capability we hold + StgTSO *source, // the TSO sending the exception + 
StgTSO *target, // the TSO receiving the exception + StgClosure *exception, // the exception closure + /*[out]*/ void **out // pass to throwToReleaseTarget() + ); + +#ifdef THREADED_RTS +void throwToReleaseTarget (void *tso); +#endif + +void maybePerformBlockedException (Capability *cap, StgTSO *tso); +void awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso); + +/* Determine whether a thread is interruptible (ie. blocked + * indefinitely). Interruptible threads can be sent an exception with + * killThread# even if they have async exceptions blocked. + */ +STATIC_INLINE int +interruptible(StgTSO *t) +{ + switch (t->why_blocked) { + case BlockedOnMVar: + case BlockedOnException: + case BlockedOnRead: + case BlockedOnWrite: +#if defined(mingw32_HOST_OS) + case BlockedOnDoProc: +#endif + case BlockedOnDelay: + return 1; + // NB. Threads blocked on foreign calls (BlockedOnCCall) are + // *not* interruptible. We can't send these threads an exception. + default: + return 0; + } +} + +#endif /* CMINUSMINUS */ + +#endif /* RAISEASYNC_H */ + diff --git a/rts/Schedule.c b/rts/Schedule.c index d9adda415c..11b9f87d59 100644 --- a/rts/Schedule.c +++ b/rts/Schedule.c @@ -1,6 +1,6 @@ /* --------------------------------------------------------------------------- * - * (c) The GHC Team, 1998-2005 + * (c) The GHC Team, 1998-2006 * * The scheduler and thread-related functionality * @@ -19,7 +19,6 @@ #include "Schedule.h" #include "StgMiscClosures.h" #include "Interpreter.h" -#include "Exception.h" #include "Printer.h" #include "RtsSignals.h" #include "Sanity.h" @@ -51,6 +50,8 @@ #include "win32/IOManager.h" #endif #include "Trace.h" +#include "RaiseAsync.h" +#include "Threads.h" #ifdef HAVE_SYS_TYPES_H #include <sys/types.h> @@ -140,23 +141,6 @@ nat recent_activity = ACTIVITY_YES; */ rtsBool sched_state = SCHED_RUNNING; -/* Next thread ID to allocate.
- * LOCK: sched_mutex - */ -static StgThreadID next_thread_id = 1; - -/* The smallest stack size that makes any sense is: - * RESERVED_STACK_WORDS (so we can get back from the stack overflow) - * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame) - * + 1 (the closure to enter) - * + 1 (stg_ap_v_ret) - * + 1 (spare slot req'd by stg_ap_v_ret) - * - * A thread with this stack will bomb immediately with a stack - * overflow, which will increase its stack size. - */ -#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3) - #if defined(GRAN) StgTSO *CurrentTSO; #endif @@ -188,6 +172,10 @@ rtsTime TimeOfLastYield; rtsBool emitSchedule = rtsTrue; #endif +#if !defined(mingw32_HOST_OS) +#define FORKPROCESS_PRIMOP_SUPPORTED +#endif + /* ----------------------------------------------------------------------------- * static function prototypes * -------------------------------------------------------------------------- */ @@ -233,22 +221,16 @@ static Capability *scheduleDoGC(Capability *cap, Task *task, rtsBool force_major, void (*get_roots)(evac_fn)); -static void unblockThread(Capability *cap, StgTSO *tso); static rtsBool checkBlackHoles(Capability *cap); static void AllRoots(evac_fn evac); static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso); -static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, - rtsBool stop_at_atomically, StgPtr stop_here); - static void deleteThread (Capability *cap, StgTSO *tso); static void deleteAllThreads (Capability *cap); -#ifdef DEBUG -static void printThreadBlockage(StgTSO *tso); -static void printThreadStatus(StgTSO *tso); -void printThreadQueue(StgTSO *tso); +#ifdef FORKPROCESS_PRIMOP_SUPPORTED +static void deleteThread_(Capability *cap, StgTSO *tso); #endif #if defined(PARALLEL_HASKELL) @@ -596,6 +578,9 @@ run_thread: startHeapProfTimer(); #endif + // Check for exceptions blocked on this thread + maybePerformBlockedException (cap, t); + // 
---------------------------------------------------------------------- // Run the current thread @@ -1019,7 +1004,8 @@ scheduleDetectDeadlock (Capability *cap, Task *task) case BlockedOnBlackHole: case BlockedOnException: case BlockedOnMVar: - raiseAsync(cap, task->tso, (StgClosure *)NonTermination_closure); + throwToSingleThreaded(cap, task->tso, + (StgClosure *)NonTermination_closure); return; default: barf("deadlock: main thread blocked in a strange way"); @@ -1644,7 +1630,7 @@ static void scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t) { debugTrace (DEBUG_sched, - "--<< thread %ld (%s) stopped, StackOverflow\n", + "--<< thread %ld (%s) stopped, StackOverflow", (long)t->id, whatNext_strs[t->what_next]); /* just adjust the stack for this thread, then pop it back @@ -1687,11 +1673,11 @@ scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next ) #ifdef DEBUG if (t->what_next != prev_what_next) { debugTrace(DEBUG_sched, - "--<< thread %ld (%s) stopped to switch evaluators\n", + "--<< thread %ld (%s) stopped to switch evaluators", (long)t->id, whatNext_strs[t->what_next]); } else { debugTrace(DEBUG_sched, - "--<< thread %ld (%s) stopped, yielding\n", + "--<< thread %ld (%s) stopped, yielding", (long)t->id, whatNext_strs[t->what_next]); } #endif @@ -2024,6 +2010,18 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, next = t->link; } else { next = t->global_link; + + // This is a good place to check for blocked + // exceptions. It might be the case that a thread is + // blocked on delivering an exception to a thread that + // is also blocked - we try to ensure that this + // doesn't happen in throwTo(), but it's too hard (or + // impossible) to close all the race holes, so we + // accept that some might get through and deal with + // them here. A GC will always happen at some point, + // even if the system is otherwise deadlocked. 
+ maybePerformBlockedException (&capabilities[0], t); + if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) { if (!stmValidateNestOfTransactions (t -> trec)) { debugTrace(DEBUG_sched | DEBUG_stm, @@ -2033,7 +2031,8 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, // ATOMICALLY_FRAME, aborting the (nested) // transaction, and saving the stack of any // partially-evaluated thunks on the heap. - raiseAsync_(&capabilities[0], t, NULL, rtsTrue, NULL); + throwToSingleThreaded_(&capabilities[0], t, + NULL, rtsTrue, NULL); #ifdef REG_R1 ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME); @@ -2099,45 +2098,9 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, } /* --------------------------------------------------------------------------- - * rtsSupportsBoundThreads(): is the RTS built to support bound threads? - * used by Control.Concurrent for error checking. - * ------------------------------------------------------------------------- */ - -StgBool -rtsSupportsBoundThreads(void) -{ -#if defined(THREADED_RTS) - return rtsTrue; -#else - return rtsFalse; -#endif -} - -/* --------------------------------------------------------------------------- - * isThreadBound(tso): check whether tso is bound to an OS thread. - * ------------------------------------------------------------------------- */ - -StgBool -isThreadBound(StgTSO* tso USED_IF_THREADS) -{ -#if defined(THREADED_RTS) - return (tso->bound != NULL); -#endif - return rtsFalse; -} - -/* --------------------------------------------------------------------------- * Singleton fork(). Do not copy any running threads. 
* ------------------------------------------------------------------------- */ -#if !defined(mingw32_HOST_OS) -#define FORKPROCESS_PRIMOP_SUPPORTED -#endif - -#ifdef FORKPROCESS_PRIMOP_SUPPORTED -static void -deleteThread_(Capability *cap, StgTSO *tso); -#endif StgInt forkProcess(HsStablePtr *entry #ifndef FORKPROCESS_PRIMOP_SUPPORTED @@ -2243,26 +2206,28 @@ forkProcess(HsStablePtr *entry static void deleteAllThreads ( Capability *cap ) { - StgTSO* t, *next; - debugTrace(DEBUG_sched,"deleting all threads"); - for (t = all_threads; t != END_TSO_QUEUE; t = next) { - if (t->what_next == ThreadRelocated) { - next = t->link; - } else { - next = t->global_link; - deleteThread(cap,t); - } - } + // NOTE: only safe to call if we own all capabilities. + + StgTSO* t, *next; + debugTrace(DEBUG_sched,"deleting all threads"); + for (t = all_threads; t != END_TSO_QUEUE; t = next) { + if (t->what_next == ThreadRelocated) { + next = t->link; + } else { + next = t->global_link; + deleteThread(cap,t); + } + } - // The run queue now contains a bunch of ThreadKilled threads. We - // must not throw these away: the main thread(s) will be in there - // somewhere, and the main scheduler loop has to deal with it. - // Also, the run queue is the only thing keeping these threads from - // being GC'd, and we don't want the "main thread has been GC'd" panic. + // The run queue now contains a bunch of ThreadKilled threads. We + // must not throw these away: the main thread(s) will be in there + // somewhere, and the main scheduler loop has to deal with it. + // Also, the run queue is the only thing keeping these threads from + // being GC'd, and we don't want the "main thread has been GC'd" panic. 
#if !defined(THREADED_RTS) - ASSERT(blocked_queue_hd == END_TSO_QUEUE); - ASSERT(sleeping_queue == END_TSO_QUEUE); + ASSERT(blocked_queue_hd == END_TSO_QUEUE); + ASSERT(sleeping_queue == END_TSO_QUEUE); #endif } @@ -2337,9 +2302,10 @@ suspendThread (StgRegTable *reg) threadPaused(cap,tso); - if(tso->blocked_exceptions == NULL) { + if ((tso->flags & TSO_BLOCKEX) == 0) { tso->why_blocked = BlockedOnCCall; - tso->blocked_exceptions = END_TSO_QUEUE; + tso->flags |= TSO_BLOCKEX; + tso->flags &= ~TSO_INTERRUPTIBLE; } else { tso->why_blocked = BlockedOnCCall_NoUnblockExc; } @@ -2390,8 +2356,8 @@ resumeThread (void *task_) debugTrace(DEBUG_sched, "thread %d: re-entering RTS", tso->id); if (tso->why_blocked == BlockedOnCCall) { - awakenBlockedQueue(cap,tso->blocked_exceptions); - tso->blocked_exceptions = NULL; + awakenBlockedExceptionQueue(cap,tso); + tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE); } /* Reset blocking status */ @@ -2410,300 +2376,6 @@ resumeThread (void *task_) } /* --------------------------------------------------------------------------- - * Comparing Thread ids. - * - * This is used from STG land in the implementation of the - * instances of Eq/Ord for ThreadIds. - * ------------------------------------------------------------------------ */ - -int -cmp_thread(StgPtr tso1, StgPtr tso2) -{ - StgThreadID id1 = ((StgTSO *)tso1)->id; - StgThreadID id2 = ((StgTSO *)tso2)->id; - - if (id1 < id2) return (-1); - if (id1 > id2) return 1; - return 0; -} - -/* --------------------------------------------------------------------------- - * Fetching the ThreadID from an StgTSO. - * - * This is used in the implementation of Show for ThreadIds. 
- * ------------------------------------------------------------------------ */ -int -rts_getThreadId(StgPtr tso) -{ - return ((StgTSO *)tso)->id; -} - -#ifdef DEBUG -void -labelThread(StgPtr tso, char *label) -{ - int len; - void *buf; - - /* Caveat: Once set, you can only set the thread name to "" */ - len = strlen(label)+1; - buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()"); - strncpy(buf,label,len); - /* Update will free the old memory for us */ - updateThreadLabel(((StgTSO *)tso)->id,buf); -} -#endif /* DEBUG */ - -/* --------------------------------------------------------------------------- - Create a new thread. - - The new thread starts with the given stack size. Before the - scheduler can run, however, this thread needs to have a closure - (and possibly some arguments) pushed on its stack. See - pushClosure() in Schedule.h. - - createGenThread() and createIOThread() (in SchedAPI.h) are - convenient packaged versions of this function. - - currently pri (priority) is only used in a GRAN setup -- HWL - ------------------------------------------------------------------------ */ -#if defined(GRAN) -/* currently pri (priority) is only used in a GRAN setup -- HWL */ -StgTSO * -createThread(nat size, StgInt pri) -#else -StgTSO * -createThread(Capability *cap, nat size) -#endif -{ - StgTSO *tso; - nat stack_size; - - /* sched_mutex is *not* required */ - - /* First check whether we should create a thread at all */ -#if defined(PARALLEL_HASKELL) - /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */ - if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) { - threadsIgnored++; - debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n", - RtsFlags.ParFlags.maxThreads, advisory_thread_count); - return END_TSO_QUEUE; - } - threadsCreated++; -#endif - -#if defined(GRAN) - ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0); -#endif - - // ToDo: check 
whether size = stack_size - TSO_STRUCT_SIZEW - - /* catch ridiculously small stack sizes */ - if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) { - size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW; - } - - stack_size = size - TSO_STRUCT_SIZEW; - - tso = (StgTSO *)allocateLocal(cap, size); - TICK_ALLOC_TSO(stack_size, 0); - - SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM); -#if defined(GRAN) - SET_GRAN_HDR(tso, ThisPE); -#endif - - // Always start with the compiled code evaluator - tso->what_next = ThreadRunGHC; - - tso->why_blocked = NotBlocked; - tso->blocked_exceptions = NULL; - tso->flags = TSO_DIRTY; - - tso->saved_errno = 0; - tso->bound = NULL; - tso->cap = cap; - - tso->stack_size = stack_size; - tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) - - TSO_STRUCT_SIZEW; - tso->sp = (P_)&(tso->stack) + stack_size; - - tso->trec = NO_TREC; - -#ifdef PROFILING - tso->prof.CCCS = CCS_MAIN; -#endif - - /* put a stop frame on the stack */ - tso->sp -= sizeofW(StgStopFrame); - SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM); - tso->link = END_TSO_QUEUE; - - // ToDo: check this -#if defined(GRAN) - /* uses more flexible routine in GranSim */ - insertThread(tso, CurrentProc); -#else - /* In a non-GranSim setup the pushing of a TSO onto the runq is separated - * from its creation - */ -#endif - -#if defined(GRAN) - if (RtsFlags.GranFlags.GranSimStats.Full) - DumpGranEvent(GR_START,tso); -#elif defined(PARALLEL_HASKELL) - if (RtsFlags.ParFlags.ParStats.Full) - DumpGranEvent(GR_STARTQ,tso); - /* HACk to avoid SCHEDULE - LastTSO = tso; */ -#endif - - /* Link the new thread on the global thread list. - */ - ACQUIRE_LOCK(&sched_mutex); - tso->id = next_thread_id++; // while we have the mutex - tso->global_link = all_threads; - all_threads = tso; - RELEASE_LOCK(&sched_mutex); - -#if defined(DIST) - tso->dist.priority = MandatoryPriority; //by default that is... 
-#endif - -#if defined(GRAN) - tso->gran.pri = pri; -# if defined(DEBUG) - tso->gran.magic = TSO_MAGIC; // debugging only -# endif - tso->gran.sparkname = 0; - tso->gran.startedat = CURRENT_TIME; - tso->gran.exported = 0; - tso->gran.basicblocks = 0; - tso->gran.allocs = 0; - tso->gran.exectime = 0; - tso->gran.fetchtime = 0; - tso->gran.fetchcount = 0; - tso->gran.blocktime = 0; - tso->gran.blockcount = 0; - tso->gran.blockedat = 0; - tso->gran.globalsparks = 0; - tso->gran.localsparks = 0; - if (RtsFlags.GranFlags.Light) - tso->gran.clock = Now; /* local clock */ - else - tso->gran.clock = 0; - - IF_DEBUG(gran,printTSO(tso)); -#elif defined(PARALLEL_HASKELL) -# if defined(DEBUG) - tso->par.magic = TSO_MAGIC; // debugging only -# endif - tso->par.sparkname = 0; - tso->par.startedat = CURRENT_TIME; - tso->par.exported = 0; - tso->par.basicblocks = 0; - tso->par.allocs = 0; - tso->par.exectime = 0; - tso->par.fetchtime = 0; - tso->par.fetchcount = 0; - tso->par.blocktime = 0; - tso->par.blockcount = 0; - tso->par.blockedat = 0; - tso->par.globalsparks = 0; - tso->par.localsparks = 0; -#endif - -#if defined(GRAN) - globalGranStats.tot_threads_created++; - globalGranStats.threads_created_on_PE[CurrentProc]++; - globalGranStats.tot_sq_len += spark_queue_len(CurrentProc); - globalGranStats.tot_sq_probes++; -#elif defined(PARALLEL_HASKELL) - // collect parallel global statistics (currently done together with GC stats) - if (RtsFlags.ParFlags.ParStats.Global && - RtsFlags.GcFlags.giveStats > NO_GC_STATS) { - //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); - globalParStats.tot_threads_created++; - } -#endif - -#if defined(GRAN) - debugTrace(GRAN_DEBUG_pri, - "==__ schedule: Created TSO %d (%p);", - CurrentProc, tso, tso->id); -#elif defined(PARALLEL_HASKELL) - debugTrace(PAR_DEBUG_verbose, - "==__ schedule: Created TSO %d (%p); %d threads active", - (long)tso->id, tso, advisory_thread_count); -#else - debugTrace(DEBUG_sched, - "created thread %ld, stack 
size = %lx words", - (long)tso->id, (long)tso->stack_size); -#endif - return tso; -} - -#if defined(PAR) -/* RFP: - all parallel thread creation calls should fall through the following routine. -*/ -StgTSO * -createThreadFromSpark(rtsSpark spark) -{ StgTSO *tso; - ASSERT(spark != (rtsSpark)NULL); -// JB: TAKE CARE OF THIS COUNTER! BUGGY - if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) - { threadsIgnored++; - barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)", - RtsFlags.ParFlags.maxThreads, advisory_thread_count); - return END_TSO_QUEUE; - } - else - { threadsCreated++; - tso = createThread(RtsFlags.GcFlags.initialStkSize); - if (tso==END_TSO_QUEUE) - barf("createSparkThread: Cannot create TSO"); -#if defined(DIST) - tso->priority = AdvisoryPriority; -#endif - pushClosure(tso,spark); - addToRunQueue(tso); - advisory_thread_count++; // JB: TAKE CARE OF THIS COUNTER! BUGGY - } - return tso; -} -#endif - -/* - Turn a spark into a thread. - ToDo: fix for SMP (needs to acquire SCHED_MUTEX!) -*/ -#if 0 -StgTSO * -activateSpark (rtsSpark spark) -{ - StgTSO *tso; - - tso = createSparkThread(spark); - if (RtsFlags.ParFlags.ParStats.Full) { - //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ... - IF_PAR_DEBUG(verbose, - debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n", - (StgClosure *)spark, info_type((StgClosure *)spark))); - } - // ToDo: fwd info on local/global spark to thread -- HWL - // tso->gran.exported = spark->exported; - // tso->gran.locked = !spark->global; - // tso->gran.sparkname = spark->name; - - return tso; -} -#endif - -/* --------------------------------------------------------------------------- * scheduleThread() * * scheduleThread puts a thread on the end of the runnable queue. 
@@ -2731,12 +2403,7 @@ scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso) if (cpu == cap->no) { appendToRunQueue(cap,tso); } else { - Capability *target_cap = &capabilities[cpu]; - if (tso->bound) { - tso->bound->cap = target_cap; - } - tso->cap = target_cap; - wakeupThreadOnCapability(target_cap,tso); + migrateThreadToCapability_lock(&capabilities[cpu],tso); } #else appendToRunQueue(cap,tso); @@ -3072,19 +2739,25 @@ threadStackOverflow(Capability *cap, StgTSO *tso) StgTSO *dest; IF_DEBUG(sanity,checkTSO(tso)); + + // don't allow throwTo() to modify the blocked_exceptions queue + // while we are moving the TSO: + lockClosure((StgClosure *)tso); + if (tso->stack_size >= tso->max_stack_size) { debugTrace(DEBUG_gc, - "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n", + "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)", (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size); IF_DEBUG(gc, /* If we're debugging, just print out the top of the stack */ printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, tso->sp+64))); - /* Send this thread the StackOverflow exception */ - raiseAsync(cap, tso, (StgClosure *)stackOverflow_closure); - return tso; + // Send this thread the StackOverflow exception + unlockTSO(tso); + throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure); + return tso; } /* Try to double the current stack size. 
If that takes us over the @@ -3098,7 +2771,7 @@ threadStackOverflow(Capability *cap, StgTSO *tso) new_stack_size = new_tso_size - TSO_STRUCT_SIZEW; debugTrace(DEBUG_sched, - "increasing stack size from %ld words to %d.\n", + "increasing stack size from %ld words to %d.", (long)tso->stack_size, new_stack_size); dest = (StgTSO *)allocate(new_tso_size); @@ -3133,7 +2806,10 @@ threadStackOverflow(Capability *cap, StgTSO *tso) printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, tso->sp+64))); - IF_DEBUG(sanity,checkTSO(tso)); + unlockTSO(dest); + unlockTSO(tso); + + IF_DEBUG(sanity,checkTSO(dest)); #if 0 IF_DEBUG(scheduler,printTSO(dest)); #endif @@ -3142,301 +2818,6 @@ threadStackOverflow(Capability *cap, StgTSO *tso) } /* --------------------------------------------------------------------------- - Wake up a queue that was blocked on some resource. - ------------------------------------------------------------------------ */ - -#if defined(GRAN) -STATIC_INLINE void -unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node ) -{ -} -#elif defined(PARALLEL_HASKELL) -STATIC_INLINE void -unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node ) -{ - /* write RESUME events to log file and - update blocked and fetch time (depending on type of the orig closure) */ - if (RtsFlags.ParFlags.ParStats.Full) { - DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, - GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure, - 0, 0 /* spark_queue_len(ADVISORY_POOL) */); - if (emptyRunQueue()) - emitSchedule = rtsTrue; - - switch (get_itbl(node)->type) { - case FETCH_ME_BQ: - ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat; - break; - case RBH: - case FETCH_ME: - case BLACKHOLE_BQ: - ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat; - break; -#ifdef DIST - case MVAR: - break; -#endif - default: - barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue"); - } - } -} -#endif - -#if defined(GRAN) 
-StgBlockingQueueElement * -unblockOne(StgBlockingQueueElement *bqe, StgClosure *node) -{ - StgTSO *tso; - PEs node_loc, tso_loc; - - node_loc = where_is(node); // should be lifted out of loop - tso = (StgTSO *)bqe; // wastes an assignment to get the type right - tso_loc = where_is((StgClosure *)tso); - if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local - /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */ - ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc); - CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime; - // insertThread(tso, node_loc); - new_event(tso_loc, tso_loc, CurrentTime[CurrentProc], - ResumeThread, - tso, node, (rtsSpark*)NULL); - tso->link = END_TSO_QUEUE; // overwrite link just to be sure - // len_local++; - // len++; - } else { // TSO is remote (actually should be FMBQ) - CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime + - RtsFlags.GranFlags.Costs.gunblocktime + - RtsFlags.GranFlags.Costs.latency; - new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc], - UnblockThread, - tso, node, (rtsSpark*)NULL); - tso->link = END_TSO_QUEUE; // overwrite link just to be sure - // len++; - } - /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */ - IF_GRAN_DEBUG(bq, - debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,", - (node_loc==tso_loc ? "Local" : "Global"), - tso->id, tso, CurrentProc, tso->block_info.closure, tso->link)); - tso->block_info.closure = NULL; - debugTrace(DEBUG_sched, "-- waking up thread %ld (%p)\n", - tso->id, tso)); -} -#elif defined(PARALLEL_HASKELL) -StgBlockingQueueElement * -unblockOne(StgBlockingQueueElement *bqe, StgClosure *node) -{ - StgBlockingQueueElement *next; - - switch (get_itbl(bqe)->type) { - case TSO: - ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked); - /* if it's a TSO just push it onto the run_queue */ - next = bqe->link; - ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging? 
- APPEND_TO_RUN_QUEUE((StgTSO *)bqe); - threadRunnable(); - unblockCount(bqe, node); - /* reset blocking status after dumping event */ - ((StgTSO *)bqe)->why_blocked = NotBlocked; - break; - - case BLOCKED_FETCH: - /* if it's a BLOCKED_FETCH put it on the PendingFetches list */ - next = bqe->link; - bqe->link = (StgBlockingQueueElement *)PendingFetches; - PendingFetches = (StgBlockedFetch *)bqe; - break; - -# if defined(DEBUG) - /* can ignore this case in a non-debugging setup; - see comments on RBHSave closures above */ - case CONSTR: - /* check that the closure is an RBHSave closure */ - ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info || - get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info || - get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info); - break; - - default: - barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n", - get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), - (StgClosure *)bqe); -# endif - } - IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe))); - return next; -} -#endif - -StgTSO * -unblockOne(Capability *cap, StgTSO *tso) -{ - StgTSO *next; - - ASSERT(get_itbl(tso)->type == TSO); - ASSERT(tso->why_blocked != NotBlocked); - - tso->why_blocked = NotBlocked; - next = tso->link; - tso->link = END_TSO_QUEUE; - -#if defined(THREADED_RTS) - if (tso->cap == cap || (!tsoLocked(tso) && RtsFlags.ParFlags.wakeupMigrate)) { - // We are waking up this thread on the current Capability, which - // might involve migrating it from the Capability it was last on. - if (tso->bound) { - ASSERT(tso->bound->cap == tso->cap); - tso->bound->cap = cap; - } - tso->cap = cap; - appendToRunQueue(cap,tso); - // we're holding a newly woken thread, make sure we context switch - // quickly so we can migrate it if necessary. - context_switch = 1; - } else { - // we'll try to wake it up on the Capability it was last on. 
- wakeupThreadOnCapability(tso->cap, tso); - } -#else - appendToRunQueue(cap,tso); - context_switch = 1; -#endif - - debugTrace(DEBUG_sched, - "waking up thread %ld on cap %d", - (long)tso->id, tso->cap->no); - - return next; -} - - -#if defined(GRAN) -void -awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node) -{ - StgBlockingQueueElement *bqe; - PEs node_loc; - nat len = 0; - - IF_GRAN_DEBUG(bq, - debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \ - node, CurrentProc, CurrentTime[CurrentProc], - CurrentTSO->id, CurrentTSO)); - - node_loc = where_is(node); - - ASSERT(q == END_BQ_QUEUE || - get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave - get_itbl(q)->type == CONSTR); // closure (type constructor) - ASSERT(is_unique(node)); - - /* FAKE FETCH: magically copy the node to the tso's proc; - no Fetch necessary because in reality the node should not have been - moved to the other PE in the first place - */ - if (CurrentProc!=node_loc) { - IF_GRAN_DEBUG(bq, - debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n", - node, node_loc, CurrentProc, CurrentTSO->id, - // CurrentTSO, where_is(CurrentTSO), - node->header.gran.procs)); - node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc); - IF_GRAN_DEBUG(bq, - debugBelch("## new bitmask of node %p is %#x\n", - node, node->header.gran.procs)); - if (RtsFlags.GranFlags.GranSimStats.Global) { - globalGranStats.tot_fake_fetches++; - } - } - - bqe = q; - // ToDo: check: ASSERT(CurrentProc==node_loc); - while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) { - //next = bqe->link; - /* - bqe points to the current element in the queue - next points to the next element in the queue - */ - //tso = (StgTSO *)bqe; // wastes an assignment to get the type right - //tso_loc = where_is(tso); - len++; - bqe = unblockOne(bqe, node); - } - - /* if this is the BQ of an RBH, we have to put back the info 
ripped out of - the closure to make room for the anchor of the BQ */ - if (bqe!=END_BQ_QUEUE) { - ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR); - /* - ASSERT((info_ptr==&RBH_Save_0_info) || - (info_ptr==&RBH_Save_1_info) || - (info_ptr==&RBH_Save_2_info)); - */ - /* cf. convertToRBH in RBH.c for writing the RBHSave closure */ - ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0]; - ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1]; - - IF_GRAN_DEBUG(bq, - debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n", - node, info_type(node))); - } - - /* statistics gathering */ - if (RtsFlags.GranFlags.GranSimStats.Global) { - // globalGranStats.tot_bq_processing_time += bq_processing_time; - globalGranStats.tot_bq_len += len; // total length of all bqs awakened - // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only - globalGranStats.tot_awbq++; // total no. of bqs awakened - } - IF_GRAN_DEBUG(bq, - debugBelch("## BQ Stats of %p: [%d entries] %s\n", - node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : "")); -} -#elif defined(PARALLEL_HASKELL) -void -awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node) -{ - StgBlockingQueueElement *bqe; - - IF_PAR_DEBUG(verbose, - debugBelch("##-_ AwBQ for node %p on [%x]: \n", - node, mytid)); -#ifdef DIST - //RFP - if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) { - IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. 
RFP (BUG?)\n")); - return; - } -#endif - - ASSERT(q == END_BQ_QUEUE || - get_itbl(q)->type == TSO || - get_itbl(q)->type == BLOCKED_FETCH || - get_itbl(q)->type == CONSTR); - - bqe = q; - while (get_itbl(bqe)->type==TSO || - get_itbl(bqe)->type==BLOCKED_FETCH) { - bqe = unblockOne(bqe, node); - } -} - -#else /* !GRAN && !PARALLEL_HASKELL */ - -void -awakenBlockedQueue(Capability *cap, StgTSO *tso) -{ - if (tso == NULL) return; // hack; see bug #1235728, and comments in - // Exception.cmm - while (tso != END_TSO_QUEUE) { - tso = unblockOne(cap,tso); - } -} -#endif - -/* --------------------------------------------------------------------------- Interrupt execution - usually called inside a signal handler so it mustn't do anything fancy. ------------------------------------------------------------------------ */ @@ -3481,316 +2862,6 @@ wakeUpRts(void) } /* ----------------------------------------------------------------------------- - Unblock a thread - - This is for use when we raise an exception in another thread, which - may be blocked. - This has nothing to do with the UnblockThread event in GranSim. -- HWL - -------------------------------------------------------------------------- */ - -#if defined(GRAN) || defined(PARALLEL_HASKELL) -/* - NB: only the type of the blocking queue is different in GranSim and GUM - the operations on the queue-elements are the same - long live polymorphism! - - Locks: sched_mutex is held upon entry and exit. - -*/ -static void -unblockThread(Capability *cap, StgTSO *tso) -{ - StgBlockingQueueElement *t, **last; - - switch (tso->why_blocked) { - - case NotBlocked: - return; /* not blocked */ - - case BlockedOnSTM: - // Be careful: nothing to do here! We tell the scheduler that the thread - // is runnable and we leave it to the stack-walking code to abort the - // transaction while unwinding the stack. 
We should perhaps have a debugging - // test to make sure that this really happens and that the 'zombie' transaction - // does not get committed. - goto done; - - case BlockedOnMVar: - ASSERT(get_itbl(tso->block_info.closure)->type == MVAR); - { - StgBlockingQueueElement *last_tso = END_BQ_QUEUE; - StgMVar *mvar = (StgMVar *)(tso->block_info.closure); - - last = (StgBlockingQueueElement **)&mvar->head; - for (t = (StgBlockingQueueElement *)mvar->head; - t != END_BQ_QUEUE; - last = &t->link, last_tso = t, t = t->link) { - if (t == (StgBlockingQueueElement *)tso) { - *last = (StgBlockingQueueElement *)tso->link; - if (mvar->tail == tso) { - mvar->tail = (StgTSO *)last_tso; - } - goto done; - } - } - barf("unblockThread (MVAR): TSO not found"); - } - - case BlockedOnBlackHole: - ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ); - { - StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure); - - last = &bq->blocking_queue; - for (t = bq->blocking_queue; - t != END_BQ_QUEUE; - last = &t->link, t = t->link) { - if (t == (StgBlockingQueueElement *)tso) { - *last = (StgBlockingQueueElement *)tso->link; - goto done; - } - } - barf("unblockThread (BLACKHOLE): TSO not found"); - } - - case BlockedOnException: - { - StgTSO *target = tso->block_info.tso; - - ASSERT(get_itbl(target)->type == TSO); - - if (target->what_next == ThreadRelocated) { - target = target->link; - ASSERT(get_itbl(target)->type == TSO); - } - - ASSERT(target->blocked_exceptions != NULL); - - last = (StgBlockingQueueElement **)&target->blocked_exceptions; - for (t = (StgBlockingQueueElement *)target->blocked_exceptions; - t != END_BQ_QUEUE; - last = &t->link, t = t->link) { - ASSERT(get_itbl(t)->type == TSO); - if (t == (StgBlockingQueueElement *)tso) { - *last = (StgBlockingQueueElement *)tso->link; - goto done; - } - } - barf("unblockThread (Exception): TSO not found"); - } - - case BlockedOnRead: - case BlockedOnWrite: -#if defined(mingw32_HOST_OS) - case BlockedOnDoProc: 
-#endif - { - /* take TSO off blocked_queue */ - StgBlockingQueueElement *prev = NULL; - for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; - prev = t, t = t->link) { - if (t == (StgBlockingQueueElement *)tso) { - if (prev == NULL) { - blocked_queue_hd = (StgTSO *)t->link; - if ((StgBlockingQueueElement *)blocked_queue_tl == t) { - blocked_queue_tl = END_TSO_QUEUE; - } - } else { - prev->link = t->link; - if ((StgBlockingQueueElement *)blocked_queue_tl == t) { - blocked_queue_tl = (StgTSO *)prev; - } - } -#if defined(mingw32_HOST_OS) - /* (Cooperatively) signal that the worker thread should abort - * the request. - */ - abandonWorkRequest(tso->block_info.async_result->reqID); -#endif - goto done; - } - } - barf("unblockThread (I/O): TSO not found"); - } - - case BlockedOnDelay: - { - /* take TSO off sleeping_queue */ - StgBlockingQueueElement *prev = NULL; - for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; - prev = t, t = t->link) { - if (t == (StgBlockingQueueElement *)tso) { - if (prev == NULL) { - sleeping_queue = (StgTSO *)t->link; - } else { - prev->link = t->link; - } - goto done; - } - } - barf("unblockThread (delay): TSO not found"); - } - - default: - barf("unblockThread"); - } - - done: - tso->link = END_TSO_QUEUE; - tso->why_blocked = NotBlocked; - tso->block_info.closure = NULL; - pushOnRunQueue(cap,tso); -} -#else -static void -unblockThread(Capability *cap, StgTSO *tso) -{ - StgTSO *t, **last; - - /* To avoid locking unnecessarily. */ - if (tso->why_blocked == NotBlocked) { - return; - } - - switch (tso->why_blocked) { - - case BlockedOnSTM: - // Be careful: nothing to do here! We tell the scheduler that the thread - // is runnable and we leave it to the stack-walking code to abort the - // transaction while unwinding the stack. We should perhaps have a debugging - // test to make sure that this really happens and that the 'zombie' transaction - // does not get committed. 
- goto done; - - case BlockedOnMVar: - ASSERT(get_itbl(tso->block_info.closure)->type == MVAR); - { - StgTSO *last_tso = END_TSO_QUEUE; - StgMVar *mvar = (StgMVar *)(tso->block_info.closure); - - last = &mvar->head; - for (t = mvar->head; t != END_TSO_QUEUE; - last = &t->link, last_tso = t, t = t->link) { - if (t == tso) { - *last = tso->link; - if (mvar->tail == tso) { - mvar->tail = last_tso; - } - goto done; - } - } - barf("unblockThread (MVAR): TSO not found"); - } - - case BlockedOnBlackHole: - { - last = &blackhole_queue; - for (t = blackhole_queue; t != END_TSO_QUEUE; - last = &t->link, t = t->link) { - if (t == tso) { - *last = tso->link; - goto done; - } - } - barf("unblockThread (BLACKHOLE): TSO not found"); - } - - case BlockedOnException: - { - StgTSO *target = tso->block_info.tso; - - ASSERT(get_itbl(target)->type == TSO); - - while (target->what_next == ThreadRelocated) { - target = target->link; - ASSERT(get_itbl(target)->type == TSO); - } - - ASSERT(target->blocked_exceptions != NULL); - - last = &target->blocked_exceptions; - for (t = target->blocked_exceptions; t != END_TSO_QUEUE; - last = &t->link, t = t->link) { - ASSERT(get_itbl(t)->type == TSO); - if (t == tso) { - *last = tso->link; - goto done; - } - } - barf("unblockThread (Exception): TSO not found"); - } - -#if !defined(THREADED_RTS) - case BlockedOnRead: - case BlockedOnWrite: -#if defined(mingw32_HOST_OS) - case BlockedOnDoProc: -#endif - { - StgTSO *prev = NULL; - for (t = blocked_queue_hd; t != END_TSO_QUEUE; - prev = t, t = t->link) { - if (t == tso) { - if (prev == NULL) { - blocked_queue_hd = t->link; - if (blocked_queue_tl == t) { - blocked_queue_tl = END_TSO_QUEUE; - } - } else { - prev->link = t->link; - if (blocked_queue_tl == t) { - blocked_queue_tl = prev; - } - } -#if defined(mingw32_HOST_OS) - /* (Cooperatively) signal that the worker thread should abort - * the request. 
- */ - abandonWorkRequest(tso->block_info.async_result->reqID); -#endif - goto done; - } - } - barf("unblockThread (I/O): TSO not found"); - } - - case BlockedOnDelay: - { - StgTSO *prev = NULL; - for (t = sleeping_queue; t != END_TSO_QUEUE; - prev = t, t = t->link) { - if (t == tso) { - if (prev == NULL) { - sleeping_queue = t->link; - } else { - prev->link = t->link; - } - goto done; - } - } - barf("unblockThread (delay): TSO not found"); - } -#endif - - default: - barf("unblockThread"); - } - - done: - tso->link = END_TSO_QUEUE; - tso->why_blocked = NotBlocked; - tso->block_info.closure = NULL; - appendToRunQueue(cap,tso); - - // We might have just migrated this TSO to our Capability: - if (tso->bound) { - tso->bound->cap = cap; - } - tso->cap = cap; -} -#endif - -/* ----------------------------------------------------------------------------- * checkBlackHoles() * * Check the blackhole_queue for threads that can be woken up. We do @@ -3840,264 +2911,6 @@ checkBlackHoles (Capability *cap) } /* ----------------------------------------------------------------------------- - * raiseAsync() - * - * The following function implements the magic for raising an - * asynchronous exception in an existing thread. - * - * We first remove the thread from any queue on which it might be - * blocked. The possible blockages are MVARs and BLACKHOLE_BQs. - * - * We strip the stack down to the innermost CATCH_FRAME, building - * thunks in the heap for all the active computations, so they can - * be restarted if necessary. When we reach a CATCH_FRAME, we build - * an application of the handler to the exception, and push it on - * the top of the stack. - * - * How exactly do we save all the active computations? We create an - * AP_STACK for every UpdateFrame on the stack. Entering one of these - * AP_STACKs pushes everything from the corresponding update frame - * upwards onto the stack. 
(Actually, it pushes everything up to the - * next update frame plus a pointer to the next AP_STACK object. - * Entering the next AP_STACK object pushes more onto the stack until we - * reach the last AP_STACK object - at which point the stack should look - * exactly as it did when we killed the TSO and we can continue - * execution by entering the closure on top of the stack. - * - * We can also kill a thread entirely - this happens if either (a) the - * exception passed to raiseAsync is NULL, or (b) there's no - * CATCH_FRAME on the stack. In either case, we strip the entire - * stack and replace the thread with a zombie. - * - * ToDo: in THREADED_RTS mode, this function is only safe if either - * (a) we hold all the Capabilities (eg. in GC, or if there is only - * one Capability), or (b) we own the Capability that the TSO is - * currently blocked on or on the run queue of. - * - * -------------------------------------------------------------------------- */ - -void -raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception) -{ - raiseAsync_(cap, tso, exception, rtsFalse, NULL); -} - -void -suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here) -{ - raiseAsync_(cap, tso, NULL, rtsFalse, stop_here); -} - -static void -raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, - rtsBool stop_at_atomically, StgPtr stop_here) -{ - StgRetInfoTable *info; - StgPtr sp, frame; - nat i; - - // Thread already dead? - if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) { - return; - } - - debugTrace(DEBUG_sched, - "raising exception in thread %ld.", (long)tso->id); - - // Remove it from any blocking queues - unblockThread(cap,tso); - - // mark it dirty; we're about to change its stack. - dirtyTSO(tso); - - sp = tso->sp; - - // The stack freezing code assumes there's a closure pointer on - // the top of the stack, so we have to arrange that this is the case... 
- // - if (sp[0] == (W_)&stg_enter_info) { - sp++; - } else { - sp--; - sp[0] = (W_)&stg_dummy_ret_closure; - } - - frame = sp + 1; - while (stop_here == NULL || frame < stop_here) { - - // 1. Let the top of the stack be the "current closure" - // - // 2. Walk up the stack until we find either an UPDATE_FRAME or a - // CATCH_FRAME. - // - // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the - // current closure applied to the chunk of stack up to (but not - // including) the update frame. This closure becomes the "current - // closure". Go back to step 2. - // - // 4. If it's a CATCH_FRAME, then leave the exception handler on - // top of the stack applied to the exception. - // - // 5. If it's a STOP_FRAME, then kill the thread. - // - // NB: if we pass an ATOMICALLY_FRAME then abort the associated - // transaction - - info = get_ret_itbl((StgClosure *)frame); - - switch (info->i.type) { - - case UPDATE_FRAME: - { - StgAP_STACK * ap; - nat words; - - // First build an AP_STACK consisting of the stack chunk above the - // current update frame, with the top word on the stack as the - // fun field. - // - words = frame - sp - 1; - ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words)); - - ap->size = words; - ap->fun = (StgClosure *)sp[0]; - sp++; - for(i=0; i < (nat)words; ++i) { - ap->payload[i] = (StgClosure *)*sp++; - } - - SET_HDR(ap,&stg_AP_STACK_info, - ((StgClosure *)frame)->header.prof.ccs /* ToDo */); - TICK_ALLOC_UP_THK(words+1,0); - - //IF_DEBUG(scheduler, - // debugBelch("sched: Updating "); - // printPtr((P_)((StgUpdateFrame *)frame)->updatee); - // debugBelch(" with "); - // printObj((StgClosure *)ap); - // ); - - // Replace the updatee with an indirection - // - // Warning: if we're in a loop, more than one update frame on - // the stack may point to the same object. Be careful not to - // overwrite an IND_OLDGEN in this case, because we'll screw - // up the mutable lists. 
To be on the safe side, don't - // overwrite any kind of indirection at all. See also - // threadSqueezeStack in GC.c, where we have to make a similar - // check. - // - if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) { - // revert the black hole - UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee, - (StgClosure *)ap); - } - sp += sizeofW(StgUpdateFrame) - 1; - sp[0] = (W_)ap; // push onto stack - frame = sp + 1; - continue; //no need to bump frame - } - - case STOP_FRAME: - // We've stripped the entire stack, the thread is now dead. - tso->what_next = ThreadKilled; - tso->sp = frame + sizeofW(StgStopFrame); - return; - - case CATCH_FRAME: - // If we find a CATCH_FRAME, and we've got an exception to raise, - // then build the THUNK raise(exception), and leave it on - // top of the CATCH_FRAME ready to enter. - // - { -#ifdef PROFILING - StgCatchFrame *cf = (StgCatchFrame *)frame; -#endif - StgThunk *raise; - - if (exception == NULL) break; - - // we've got an exception to raise, so let's pass it to the - // handler in this frame. - // - raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1); - TICK_ALLOC_SE_THK(1,0); - SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs); - raise->payload[0] = exception; - - // throw away the stack from Sp up to the CATCH_FRAME. - // - sp = frame - 1; - - /* Ensure that async excpetions are blocked now, so we don't get - * a surprise exception before we get around to executing the - * handler. - */ - if (tso->blocked_exceptions == NULL) { - tso->blocked_exceptions = END_TSO_QUEUE; - } - - /* Put the newly-built THUNK on top of the stack, ready to execute - * when the thread restarts. 
- */ - sp[0] = (W_)raise; - sp[-1] = (W_)&stg_enter_info; - tso->sp = sp-1; - tso->what_next = ThreadRunGHC; - IF_DEBUG(sanity, checkTSO(tso)); - return; - } - - case ATOMICALLY_FRAME: - if (stop_at_atomically) { - ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC); - stmCondemnTransaction(cap, tso -> trec); -#ifdef REG_R1 - tso->sp = frame; -#else - // R1 is not a register: the return convention for IO in - // this case puts the return value on the stack, so we - // need to set up the stack to return to the atomically - // frame properly... - tso->sp = frame - 2; - tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not? - tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info; -#endif - tso->what_next = ThreadRunGHC; - return; - } - // Not stop_at_atomically... fall through and abort the - // transaction. - - case CATCH_RETRY_FRAME: - // IF we find an ATOMICALLY_FRAME then we abort the - // current transaction and propagate the exception. In - // this case (unlike ordinary exceptions) we do not care - // whether the transaction is valid or not because its - // possible validity cannot have caused the exception - // and will not be visible after the abort. 
- debugTrace(DEBUG_stm, - "found atomically block delivering async exception"); - - StgTRecHeader *trec = tso -> trec; - StgTRecHeader *outer = stmGetEnclosingTRec(trec); - stmAbortTransaction(cap, trec); - tso -> trec = outer; - break; - - default: - break; - } - - // move on to the next stack frame - frame += stack_frame_sizeW((StgClosure *)frame); - } - - // if we got here, then we stopped at stop_here - ASSERT(stop_here != NULL); -} - -/* ----------------------------------------------------------------------------- Deleting threads This is used for interruption (^C) and forking, and corresponds to @@ -4108,10 +2921,15 @@ raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, static void deleteThread (Capability *cap, StgTSO *tso) { - if (tso->why_blocked != BlockedOnCCall && - tso->why_blocked != BlockedOnCCall_NoUnblockExc) { - raiseAsync(cap,tso,NULL); - } + // NOTE: must only be called on a TSO that we have exclusive + // access to, because we will call throwToSingleThreaded() below. + // The TSO must be on the run queue of the Capability we own, or + // we must own all Capabilities. + + if (tso->why_blocked != BlockedOnCCall && + tso->why_blocked != BlockedOnCCall_NoUnblockExc) { + throwToSingleThreaded(cap,tso,NULL); + } } #ifdef FORKPROCESS_PRIMOP_SUPPORTED @@ -4293,13 +3111,16 @@ resurrectThreads (StgTSO *threads) case BlockedOnMVar: case BlockedOnException: /* Called by GC - sched_mutex lock is currently held. 
*/ - raiseAsync(cap, tso,(StgClosure *)BlockedOnDeadMVar_closure); + throwToSingleThreaded(cap, tso, + (StgClosure *)BlockedOnDeadMVar_closure); break; case BlockedOnBlackHole: - raiseAsync(cap, tso,(StgClosure *)NonTermination_closure); + throwToSingleThreaded(cap, tso, + (StgClosure *)NonTermination_closure); break; case BlockedOnSTM: - raiseAsync(cap, tso,(StgClosure *)BlockedIndefinitely_closure); + throwToSingleThreaded(cap, tso, + (StgClosure *)BlockedIndefinitely_closure); break; case NotBlocked: /* This might happen if the thread was blocked on a black hole @@ -4312,298 +3133,3 @@ resurrectThreads (StgTSO *threads) } } } - -/* ---------------------------------------------------------------------------- - * Debugging: why is a thread blocked - * [Also provides useful information when debugging threaded programs - * at the Haskell source code level, so enable outside of DEBUG. --sof 7/02] - ------------------------------------------------------------------------- */ - -#if DEBUG -static void -printThreadBlockage(StgTSO *tso) -{ - switch (tso->why_blocked) { - case BlockedOnRead: - debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd)); - break; - case BlockedOnWrite: - debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd)); - break; -#if defined(mingw32_HOST_OS) - case BlockedOnDoProc: - debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID); - break; -#endif - case BlockedOnDelay: - debugBelch("is blocked until %ld", (long)(tso->block_info.target)); - break; - case BlockedOnMVar: - debugBelch("is blocked on an MVar @ %p", tso->block_info.closure); - break; - case BlockedOnException: - debugBelch("is blocked on delivering an exception to thread %d", - tso->block_info.tso->id); - break; - case BlockedOnBlackHole: - debugBelch("is blocked on a black hole"); - break; - case NotBlocked: - debugBelch("is not blocked"); - break; -#if defined(PARALLEL_HASKELL) - case BlockedOnGA: - debugBelch("is blocked 
on global address; local FM_BQ is %p (%s)", - tso->block_info.closure, info_type(tso->block_info.closure)); - break; - case BlockedOnGA_NoSend: - debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)", - tso->block_info.closure, info_type(tso->block_info.closure)); - break; -#endif - case BlockedOnCCall: - debugBelch("is blocked on an external call"); - break; - case BlockedOnCCall_NoUnblockExc: - debugBelch("is blocked on an external call (exceptions were already blocked)"); - break; - case BlockedOnSTM: - debugBelch("is blocked on an STM operation"); - break; - default: - barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)", - tso->why_blocked, tso->id, tso); - } -} - -void -printThreadStatus(StgTSO *t) -{ - debugBelch("\tthread %4d @ %p ", t->id, (void *)t); - { - void *label = lookupThreadLabel(t->id); - if (label) debugBelch("[\"%s\"] ",(char *)label); - } - if (t->what_next == ThreadRelocated) { - debugBelch("has been relocated...\n"); - } else { - switch (t->what_next) { - case ThreadKilled: - debugBelch("has been killed"); - break; - case ThreadComplete: - debugBelch("has completed"); - break; - default: - printThreadBlockage(t); - } - debugBelch("\n"); - } -} - -void -printAllThreads(void) -{ - StgTSO *t, *next; - nat i; - Capability *cap; - -# if defined(GRAN) - char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN]; - ullong_format_string(TIME_ON_PROC(CurrentProc), - time_string, rtsFalse/*no commas!*/); - - debugBelch("all threads at [%s]:\n", time_string); -# elif defined(PARALLEL_HASKELL) - char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN]; - ullong_format_string(CURRENT_TIME, - time_string, rtsFalse/*no commas!*/); - - debugBelch("all threads at [%s]:\n", time_string); -# else - debugBelch("all threads:\n"); -# endif - - for (i = 0; i < n_capabilities; i++) { - cap = &capabilities[i]; - debugBelch("threads on capability %d:\n", cap->no); - for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) { - 
printThreadStatus(t); - } - } - - debugBelch("other threads:\n"); - for (t = all_threads; t != END_TSO_QUEUE; t = next) { - if (t->why_blocked != NotBlocked) { - printThreadStatus(t); - } - if (t->what_next == ThreadRelocated) { - next = t->link; - } else { - next = t->global_link; - } - } -} - -// useful from gdb -void -printThreadQueue(StgTSO *t) -{ - nat i = 0; - for (; t != END_TSO_QUEUE; t = t->link) { - printThreadStatus(t); - i++; - } - debugBelch("%d threads on queue\n", i); -} - -/* - Print a whole blocking queue attached to node (debugging only). -*/ -# if defined(PARALLEL_HASKELL) -void -print_bq (StgClosure *node) -{ - StgBlockingQueueElement *bqe; - StgTSO *tso; - rtsBool end; - - debugBelch("## BQ of closure %p (%s): ", - node, info_type(node)); - - /* should cover all closures that may have a blocking queue */ - ASSERT(get_itbl(node)->type == BLACKHOLE_BQ || - get_itbl(node)->type == FETCH_ME_BQ || - get_itbl(node)->type == RBH || - get_itbl(node)->type == MVAR); - - ASSERT(node!=(StgClosure*)NULL); // sanity check - - print_bqe(((StgBlockingQueue*)node)->blocking_queue); -} - -/* - Print a whole blocking queue starting with the element bqe. -*/ -void -print_bqe (StgBlockingQueueElement *bqe) -{ - rtsBool end; - - /* - NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure; - */ - for (end = (bqe==END_BQ_QUEUE); - !end; // iterate until bqe points to a CONSTR - end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), - bqe = end ? 
END_BQ_QUEUE : bqe->link) { - ASSERT(bqe != END_BQ_QUEUE); // sanity check - ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check - /* types of closures that may appear in a blocking queue */ - ASSERT(get_itbl(bqe)->type == TSO || - get_itbl(bqe)->type == BLOCKED_FETCH || - get_itbl(bqe)->type == CONSTR); - /* only BQs of an RBH end with an RBH_Save closure */ - //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH); - - switch (get_itbl(bqe)->type) { - case TSO: - debugBelch(" TSO %u (%x),", - ((StgTSO *)bqe)->id, ((StgTSO *)bqe)); - break; - case BLOCKED_FETCH: - debugBelch(" BF (node=%p, ga=((%x, %d, %x)),", - ((StgBlockedFetch *)bqe)->node, - ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid, - ((StgBlockedFetch *)bqe)->ga.payload.gc.slot, - ((StgBlockedFetch *)bqe)->ga.weight); - break; - case CONSTR: - debugBelch(" %s (IP %p),", - (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" : - get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" : - get_itbl(bqe) == &stg_RBH_Save_2_info ? 
"RBH_Save_2" : - "RBH_Save_?"), get_itbl(bqe)); - break; - default: - barf("Unexpected closure type %s in blocking queue", // of %p (%s)", - info_type((StgClosure *)bqe)); // , node, info_type(node)); - break; - } - } /* for */ - debugBelch("\n"); -} -# elif defined(GRAN) -void -print_bq (StgClosure *node) -{ - StgBlockingQueueElement *bqe; - PEs node_loc, tso_loc; - rtsBool end; - - /* should cover all closures that may have a blocking queue */ - ASSERT(get_itbl(node)->type == BLACKHOLE_BQ || - get_itbl(node)->type == FETCH_ME_BQ || - get_itbl(node)->type == RBH); - - ASSERT(node!=(StgClosure*)NULL); // sanity check - node_loc = where_is(node); - - debugBelch("## BQ of closure %p (%s) on [PE %d]: ", - node, info_type(node), node_loc); - - /* - NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure; - */ - for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE); - !end; // iterate until bqe points to a CONSTR - end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) { - ASSERT(bqe != END_BQ_QUEUE); // sanity check - ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check - /* types of closures that may appear in a blocking queue */ - ASSERT(get_itbl(bqe)->type == TSO || - get_itbl(bqe)->type == CONSTR); - /* only BQs of an RBH end with an RBH_Save closure */ - ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH); - - tso_loc = where_is((StgClosure *)bqe); - switch (get_itbl(bqe)->type) { - case TSO: - debugBelch(" TSO %d (%p) on [PE %d],", - ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc); - break; - case CONSTR: - debugBelch(" %s (IP %p),", - (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" : - get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" : - get_itbl(bqe) == &stg_RBH_Save_2_info ? 
"RBH_Save_2" : - "RBH_Save_?"), get_itbl(bqe)); - break; - default: - barf("Unexpected closure type %s in blocking queue of %p (%s)", - info_type((StgClosure *)bqe), node, info_type(node)); - break; - } - } /* for */ - debugBelch("\n"); -} -# endif - -#if defined(PARALLEL_HASKELL) -static nat -run_queue_len(void) -{ - nat i; - StgTSO *tso; - - for (i=0, tso=run_queue_hd; - tso != END_TSO_QUEUE; - i++, tso=tso->link) { - /* nothing */ - } - - return i; -} -#endif - -#endif /* DEBUG */ diff --git a/rts/Schedule.h b/rts/Schedule.h index e30e911e68..f82946e831 100644 --- a/rts/Schedule.h +++ b/rts/Schedule.h @@ -55,24 +55,7 @@ void wakeUpRts(void); * Called from STG : yes * Locks assumed : we own the Capability. */ -StgTSO * unblockOne(Capability *cap, StgTSO *tso); - -/* raiseAsync() - * - * Raises an exception asynchronously in the specified thread. - * - * Called from STG : yes - * Locks assumed : none - */ -void raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception); - -/* suspendComputation() - * - * A variant of raiseAsync(), this strips the stack of the specified - * thread down to the stop_here point, leaving a current closure on - * top of the stack at [stop_here - 1]. - */ -void suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here); +StgTSO * unblockOne (Capability *cap, StgTSO *tso); /* raiseExceptionHelper */ StgWord raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception); @@ -176,8 +159,6 @@ extern rtsBool blackholes_need_checking; extern Mutex RTS_VAR(sched_mutex); #endif -StgBool isThreadBound(StgTSO *tso); - SchedulerStatus rts_mainLazyIO(HaskellObj p, /*out*/HaskellObj *ret); /* Called by shutdown_handler(). */ @@ -198,8 +179,6 @@ void print_bq (StgClosure *node); void print_bqe (StgBlockingQueueElement *bqe); #endif -void labelThread(StgPtr tso, char *label); - /* ----------------------------------------------------------------------------- * Some convenient macros/inline functions... 
*/
diff --git a/rts/ThreadLabels.c b/rts/ThreadLabels.c
index 9b9f1723ff..72cd5d3c4b 100644
--- a/rts/ThreadLabels.c
+++ b/rts/ThreadLabels.c
@@ -8,8 +8,10 @@
  * ---------------------------------------------------------------------------*/
 
 #include "PosixSource.h"
+#include "Rts.h"
 #include "ThreadLabels.h"
 #include "RtsUtils.h"
+#include "Hash.h"
 
 #include <stdlib.h>
 
@@ -47,4 +49,19 @@ removeThreadLabel(StgWord key)
     stgFree(old);
   }
 }
+/* labelThread(): store a freshly allocated copy of `label` as the label of thread `tso`. */
+void
+labelThread(StgPtr tso, char *label)
+{
+  int len;
+  void *buf;
+
+  /* Caveat: Once set, you can only set the thread name to "" */
+  len = strlen(label)+1;   /* +1 leaves room for the terminating NUL */
+  buf = stgMallocBytes(len * sizeof(char), "ThreadLabels.c:labelThread()");
+  memcpy(buf,label,len);   /* length is known exactly; the NUL is copied as well */
+  /* Update will free the old memory for us */
+  updateThreadLabel(((StgTSO *)tso)->id,buf);
+}
+
 #endif /* DEBUG */
diff --git a/rts/ThreadLabels.h b/rts/ThreadLabels.h
index 97d3d0d241..eaed22d281 100644
--- a/rts/ThreadLabels.h
+++ b/rts/ThreadLabels.h
@@ -1,27 +1,21 @@
 /* -----------------------------------------------------------------------------
  * ThreadLabels.h
  *
- * (c) The GHC Team 2002-2003
+ * (c) The GHC Team 2002-2006
  *
  * Table of thread labels.
* * ---------------------------------------------------------------------------*/ + #ifndef __THREADLABELS_H__ #define __THREADLABELS_H__ -#include "Rts.h" -#include "Hash.h" - -void -initThreadLabelTable(void); - -void -updateThreadLabel(StgWord key, void *data); - -void * -lookupThreadLabel(StgWord key); - -void -removeThreadLabel(StgWord key); +#if defined(DEBUG) +void initThreadLabelTable (void); +void updateThreadLabel (StgWord key, void *data); +void * lookupThreadLabel (StgWord key); +void removeThreadLabel (StgWord key); +void labelThread (StgPtr tso, char *label); +#endif #endif /* __THREADLABELS_H__ */ diff --git a/rts/Threads.c b/rts/Threads.c new file mode 100644 index 0000000000..b550cc6aaa --- /dev/null +++ b/rts/Threads.c @@ -0,0 +1,974 @@ +/* --------------------------------------------------------------------------- + * + * (c) The GHC Team, 2006 + * + * Thread-related functionality + * + * --------------------------------------------------------------------------*/ + +#include "PosixSource.h" +#include "Rts.h" +#include "SchedAPI.h" +#include "Storage.h" +#include "Threads.h" +#include "RtsFlags.h" +#include "STM.h" +#include "Schedule.h" +#include "Trace.h" +#include "ThreadLabels.h" + +/* Next thread ID to allocate. + * LOCK: sched_mutex + */ +static StgThreadID next_thread_id = 1; + +/* The smallest stack size that makes any sense is: + * RESERVED_STACK_WORDS (so we can get back from the stack overflow) + * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame) + * + 1 (the closure to enter) + * + 1 (stg_ap_v_ret) + * + 1 (spare slot req'd by stg_ap_v_ret) + * + * A thread with this stack will bomb immediately with a stack + * overflow, which will increase its stack size. + */ +#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3) + +/* --------------------------------------------------------------------------- + Create a new thread. + + The new thread starts with the given stack size. 
Before the + scheduler can run, however, this thread needs to have a closure + (and possibly some arguments) pushed on its stack. See + pushClosure() in Schedule.h. + + createGenThread() and createIOThread() (in SchedAPI.h) are + convenient packaged versions of this function. + + currently pri (priority) is only used in a GRAN setup -- HWL + ------------------------------------------------------------------------ */ +#if defined(GRAN) +/* currently pri (priority) is only used in a GRAN setup -- HWL */ +StgTSO * +createThread(nat size, StgInt pri) +#else +StgTSO * +createThread(Capability *cap, nat size) +#endif +{ + StgTSO *tso; + nat stack_size; + + /* sched_mutex is *not* required */ + + /* First check whether we should create a thread at all */ +#if defined(PARALLEL_HASKELL) + /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */ + if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) { + threadsIgnored++; + debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n", + RtsFlags.ParFlags.maxThreads, advisory_thread_count); + return END_TSO_QUEUE; + } + threadsCreated++; +#endif + +#if defined(GRAN) + ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0); +#endif + + // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW + + /* catch ridiculously small stack sizes */ + if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) { + size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW; + } + + stack_size = size - TSO_STRUCT_SIZEW; + + tso = (StgTSO *)allocateLocal(cap, size); + TICK_ALLOC_TSO(stack_size, 0); + + SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM); +#if defined(GRAN) + SET_GRAN_HDR(tso, ThisPE); +#endif + + // Always start with the compiled code evaluator + tso->what_next = ThreadRunGHC; + + tso->why_blocked = NotBlocked; + tso->blocked_exceptions = END_TSO_QUEUE; + tso->flags = TSO_DIRTY; + + tso->saved_errno = 0; + tso->bound = NULL; + tso->cap = cap; + + tso->stack_size = stack_size; + 
tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) + - TSO_STRUCT_SIZEW; + tso->sp = (P_)&(tso->stack) + stack_size; + + tso->trec = NO_TREC; + +#ifdef PROFILING + tso->prof.CCCS = CCS_MAIN; +#endif + + /* put a stop frame on the stack */ + tso->sp -= sizeofW(StgStopFrame); + SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM); + tso->link = END_TSO_QUEUE; + + // ToDo: check this +#if defined(GRAN) + /* uses more flexible routine in GranSim */ + insertThread(tso, CurrentProc); +#else + /* In a non-GranSim setup the pushing of a TSO onto the runq is separated + * from its creation + */ +#endif + +#if defined(GRAN) + if (RtsFlags.GranFlags.GranSimStats.Full) + DumpGranEvent(GR_START,tso); +#elif defined(PARALLEL_HASKELL) + if (RtsFlags.ParFlags.ParStats.Full) + DumpGranEvent(GR_STARTQ,tso); + /* HACk to avoid SCHEDULE + LastTSO = tso; */ +#endif + + /* Link the new thread on the global thread list. + */ + ACQUIRE_LOCK(&sched_mutex); + tso->id = next_thread_id++; // while we have the mutex + tso->global_link = all_threads; + all_threads = tso; + RELEASE_LOCK(&sched_mutex); + +#if defined(DIST) + tso->dist.priority = MandatoryPriority; //by default that is... 
+#endif + +#if defined(GRAN) + tso->gran.pri = pri; +# if defined(DEBUG) + tso->gran.magic = TSO_MAGIC; // debugging only +# endif + tso->gran.sparkname = 0; + tso->gran.startedat = CURRENT_TIME; + tso->gran.exported = 0; + tso->gran.basicblocks = 0; + tso->gran.allocs = 0; + tso->gran.exectime = 0; + tso->gran.fetchtime = 0; + tso->gran.fetchcount = 0; + tso->gran.blocktime = 0; + tso->gran.blockcount = 0; + tso->gran.blockedat = 0; + tso->gran.globalsparks = 0; + tso->gran.localsparks = 0; + if (RtsFlags.GranFlags.Light) + tso->gran.clock = Now; /* local clock */ + else + tso->gran.clock = 0; + + IF_DEBUG(gran,printTSO(tso)); +#elif defined(PARALLEL_HASKELL) +# if defined(DEBUG) + tso->par.magic = TSO_MAGIC; // debugging only +# endif + tso->par.sparkname = 0; + tso->par.startedat = CURRENT_TIME; + tso->par.exported = 0; + tso->par.basicblocks = 0; + tso->par.allocs = 0; + tso->par.exectime = 0; + tso->par.fetchtime = 0; + tso->par.fetchcount = 0; + tso->par.blocktime = 0; + tso->par.blockcount = 0; + tso->par.blockedat = 0; + tso->par.globalsparks = 0; + tso->par.localsparks = 0; +#endif + +#if defined(GRAN) + globalGranStats.tot_threads_created++; + globalGranStats.threads_created_on_PE[CurrentProc]++; + globalGranStats.tot_sq_len += spark_queue_len(CurrentProc); + globalGranStats.tot_sq_probes++; +#elif defined(PARALLEL_HASKELL) + // collect parallel global statistics (currently done together with GC stats) + if (RtsFlags.ParFlags.ParStats.Global && + RtsFlags.GcFlags.giveStats > NO_GC_STATS) { + //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); + globalParStats.tot_threads_created++; + } +#endif + +#if defined(GRAN) + debugTrace(GRAN_DEBUG_pri, + "==__ schedule: Created TSO %d (%p);", + CurrentProc, tso, tso->id); +#elif defined(PARALLEL_HASKELL) + debugTrace(PAR_DEBUG_verbose, + "==__ schedule: Created TSO %d (%p); %d threads active", + (long)tso->id, tso, advisory_thread_count); +#else + debugTrace(DEBUG_sched, + "created thread %ld, stack 
size = %lx words", + (long)tso->id, (long)tso->stack_size); +#endif + return tso; +} + +#if defined(PAR) +/* RFP: + all parallel thread creation calls should fall through the following routine. +*/ +StgTSO * +createThreadFromSpark(rtsSpark spark) +{ StgTSO *tso; + ASSERT(spark != (rtsSpark)NULL); +// JB: TAKE CARE OF THIS COUNTER! BUGGY + if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) + { threadsIgnored++; + barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)", + RtsFlags.ParFlags.maxThreads, advisory_thread_count); + return END_TSO_QUEUE; + } + else + { threadsCreated++; + tso = createThread(RtsFlags.GcFlags.initialStkSize); + if (tso==END_TSO_QUEUE) + barf("createSparkThread: Cannot create TSO"); +#if defined(DIST) + tso->priority = AdvisoryPriority; +#endif + pushClosure(tso,spark); + addToRunQueue(tso); + advisory_thread_count++; // JB: TAKE CARE OF THIS COUNTER! BUGGY + } + return tso; +} +#endif + +/* --------------------------------------------------------------------------- + * Comparing Thread ids. + * + * This is used from STG land in the implementation of the + * instances of Eq/Ord for ThreadIds. + * ------------------------------------------------------------------------ */ + +int +cmp_thread(StgPtr tso1, StgPtr tso2) +{ + StgThreadID id1 = ((StgTSO *)tso1)->id; + StgThreadID id2 = ((StgTSO *)tso2)->id; + + if (id1 < id2) return (-1); + if (id1 > id2) return 1; + return 0; +} + +/* --------------------------------------------------------------------------- + * Fetching the ThreadID from an StgTSO. + * + * This is used in the implementation of Show for ThreadIds. + * ------------------------------------------------------------------------ */ +int +rts_getThreadId(StgPtr tso) +{ + return ((StgTSO *)tso)->id; +} + +/* ----------------------------------------------------------------------------- + Remove a thread from a queue. + Fails fatally if the TSO is not on the queue. 
+   -------------------------------------------------------------------------- */
+
+void
+removeThreadFromQueue (StgTSO **queue, StgTSO *tso)
+{
+    StgTSO *t, *prev;
+
+    /* prev trails t by one element; it is still NULL while t is the head */
+    prev = NULL;
+    for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->link) {
+        if (t == tso) {
+            if (prev) {
+                prev->link = t->link;
+            } else {
+                *queue = t->link;   /* tso was the first element */
+            }
+            return;
+        }
+    }
+    barf("removeThreadFromQueue: not found");
+}
+
+/*
+ * removeThreadFromDeQueue(): like removeThreadFromQueue(), but for a
+ * queue that is tracked by both a head and a tail pointer.  The TSO is
+ * unlinked from the ->link chain starting at *head; if it was also the
+ * element *tail pointed at, *tail is moved back to its predecessor, or
+ * set to END_TSO_QUEUE when there was no predecessor.  Calls barf() if
+ * tso is not found on the queue.
+ */
+void
+removeThreadFromDeQueue (StgTSO **head, StgTSO **tail, StgTSO *tso)
+{
+    StgTSO *t, *prev;
+
+    prev = NULL;
+    for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->link) {
+        if (t == tso) {
+            if (prev) {
+                prev->link = t->link;
+            } else {
+                *head = t->link;
+            }
+            /* keep the tail pointer valid when the last element goes away */
+            if (*tail == tso) {
+                if (prev) {
+                    *tail = prev;
+                } else {
+                    *tail = END_TSO_QUEUE;
+                }
+            }
+            return;
+        }
+    }
+    /* report under this function's own name, not the MVar wrapper's */
+    barf("removeThreadFromDeQueue: not found");
+}
+
+/*
+ * removeThreadFromMVarQueue(): remove tso from the queue of the given
+ * MVar, i.e. the head/tail queue held in mvar->head and mvar->tail.
+ */
+void
+removeThreadFromMVarQueue (StgMVar *mvar, StgTSO *tso)
+{
+    removeThreadFromDeQueue (&mvar->head, &mvar->tail, tso);
+}
+
+/* ----------------------------------------------------------------------------
+   unblockOne()
+
+   unblock a single thread.
+ ------------------------------------------------------------------------- */ + +#if defined(GRAN) +STATIC_INLINE void +unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node ) +{ +} +#elif defined(PARALLEL_HASKELL) +STATIC_INLINE void +unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node ) +{ + /* write RESUME events to log file and + update blocked and fetch time (depending on type of the orig closure) */ + if (RtsFlags.ParFlags.ParStats.Full) { + DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, + GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure, + 0, 0 /* spark_queue_len(ADVISORY_POOL) */); + if (emptyRunQueue()) + emitSchedule = rtsTrue; + + switch (get_itbl(node)->type) { + case FETCH_ME_BQ: + ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat; + break; + case RBH: + case FETCH_ME: + case BLACKHOLE_BQ: + ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat; + break; +#ifdef DIST + case MVAR: + break; +#endif + default: + barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue"); + } + } +} +#endif + +#if defined(GRAN) +StgBlockingQueueElement * +unblockOne(StgBlockingQueueElement *bqe, StgClosure *node) +{ + StgTSO *tso; + PEs node_loc, tso_loc; + + node_loc = where_is(node); // should be lifted out of loop + tso = (StgTSO *)bqe; // wastes an assignment to get the type right + tso_loc = where_is((StgClosure *)tso); + if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local + /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */ + ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc); + CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime; + // insertThread(tso, node_loc); + new_event(tso_loc, tso_loc, CurrentTime[CurrentProc], + ResumeThread, + tso, node, (rtsSpark*)NULL); + tso->link = END_TSO_QUEUE; // overwrite link just to be sure + // len_local++; + // len++; + } else { // TSO is remote (actually should be FMBQ) + CurrentTime[CurrentProc] += 
RtsFlags.GranFlags.Costs.mpacktime + + RtsFlags.GranFlags.Costs.gunblocktime + + RtsFlags.GranFlags.Costs.latency; + new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc], + UnblockThread, + tso, node, (rtsSpark*)NULL); + tso->link = END_TSO_QUEUE; // overwrite link just to be sure + // len++; + } + /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */ + IF_GRAN_DEBUG(bq, + debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,", + (node_loc==tso_loc ? "Local" : "Global"), + tso->id, tso, CurrentProc, tso->block_info.closure, tso->link)); + tso->block_info.closure = NULL; + debugTrace(DEBUG_sched, "-- waking up thread %ld (%p)", + tso->id, tso)); +} +#elif defined(PARALLEL_HASKELL) +StgBlockingQueueElement * +unblockOne(StgBlockingQueueElement *bqe, StgClosure *node) +{ + StgBlockingQueueElement *next; + + switch (get_itbl(bqe)->type) { + case TSO: + ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked); + /* if it's a TSO just push it onto the run_queue */ + next = bqe->link; + ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging? 
+ APPEND_TO_RUN_QUEUE((StgTSO *)bqe); + threadRunnable(); + unblockCount(bqe, node); + /* reset blocking status after dumping event */ + ((StgTSO *)bqe)->why_blocked = NotBlocked; + break; + + case BLOCKED_FETCH: + /* if it's a BLOCKED_FETCH put it on the PendingFetches list */ + next = bqe->link; + bqe->link = (StgBlockingQueueElement *)PendingFetches; + PendingFetches = (StgBlockedFetch *)bqe; + break; + +# if defined(DEBUG) + /* can ignore this case in a non-debugging setup; + see comments on RBHSave closures above */ + case CONSTR: + /* check that the closure is an RBHSave closure */ + ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info || + get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info || + get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info); + break; + + default: + barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n", + get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), + (StgClosure *)bqe); +# endif + } + IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe))); + return next; +} +#endif + +StgTSO * +unblockOne (Capability *cap, StgTSO *tso) +{ + return unblockOne_(cap,tso,rtsTrue); // allow migration +} + +StgTSO * +unblockOne_ (Capability *cap, StgTSO *tso, + rtsBool allow_migrate USED_IF_THREADS) +{ + StgTSO *next; + + ASSERT(get_itbl(tso)->type == TSO); + ASSERT(tso->why_blocked != NotBlocked); + + tso->why_blocked = NotBlocked; + next = tso->link; + tso->link = END_TSO_QUEUE; + +#if defined(THREADED_RTS) + if (tso->cap == cap || (!tsoLocked(tso) && + allow_migrate && + RtsFlags.ParFlags.wakeupMigrate)) { + // We are waking up this thread on the current Capability, which + // might involve migrating it from the Capability it was last on. 
+ if (tso->bound) { + ASSERT(tso->bound->cap == tso->cap); + tso->bound->cap = cap; + } + tso->cap = cap; + appendToRunQueue(cap,tso); + // we're holding a newly woken thread, make sure we context switch + // quickly so we can migrate it if necessary. + context_switch = 1; + } else { + // we'll try to wake it up on the Capability it was last on. + wakeupThreadOnCapability_lock(tso->cap, tso); + } +#else + appendToRunQueue(cap,tso); + context_switch = 1; +#endif + + debugTrace(DEBUG_sched, + "waking up thread %ld on cap %d", + (long)tso->id, tso->cap->no); + + return next; +} + +/* ---------------------------------------------------------------------------- + awakenBlockedQueue + + wakes up all the threads on the specified queue. + ------------------------------------------------------------------------- */ + +#if defined(GRAN) +void +awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node) +{ + StgBlockingQueueElement *bqe; + PEs node_loc; + nat len = 0; + + IF_GRAN_DEBUG(bq, + debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \ + node, CurrentProc, CurrentTime[CurrentProc], + CurrentTSO->id, CurrentTSO)); + + node_loc = where_is(node); + + ASSERT(q == END_BQ_QUEUE || + get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave + get_itbl(q)->type == CONSTR); // closure (type constructor) + ASSERT(is_unique(node)); + + /* FAKE FETCH: magically copy the node to the tso's proc; + no Fetch necessary because in reality the node should not have been + moved to the other PE in the first place + */ + if (CurrentProc!=node_loc) { + IF_GRAN_DEBUG(bq, + debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n", + node, node_loc, CurrentProc, CurrentTSO->id, + // CurrentTSO, where_is(CurrentTSO), + node->header.gran.procs)); + node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc); + IF_GRAN_DEBUG(bq, + debugBelch("## new bitmask of node %p is %#x\n", + node, 
node->header.gran.procs)); + if (RtsFlags.GranFlags.GranSimStats.Global) { + globalGranStats.tot_fake_fetches++; + } + } + + bqe = q; + // ToDo: check: ASSERT(CurrentProc==node_loc); + while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) { + //next = bqe->link; + /* + bqe points to the current element in the queue + next points to the next element in the queue + */ + //tso = (StgTSO *)bqe; // wastes an assignment to get the type right + //tso_loc = where_is(tso); + len++; + bqe = unblockOne(bqe, node); + } + + /* if this is the BQ of an RBH, we have to put back the info ripped out of + the closure to make room for the anchor of the BQ */ + if (bqe!=END_BQ_QUEUE) { + ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR); + /* + ASSERT((info_ptr==&RBH_Save_0_info) || + (info_ptr==&RBH_Save_1_info) || + (info_ptr==&RBH_Save_2_info)); + */ + /* cf. convertToRBH in RBH.c for writing the RBHSave closure */ + ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0]; + ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1]; + + IF_GRAN_DEBUG(bq, + debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n", + node, info_type(node))); + } + + /* statistics gathering */ + if (RtsFlags.GranFlags.GranSimStats.Global) { + // globalGranStats.tot_bq_processing_time += bq_processing_time; + globalGranStats.tot_bq_len += len; // total length of all bqs awakened + // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only + globalGranStats.tot_awbq++; // total no. of bqs awakened + } + IF_GRAN_DEBUG(bq, + debugBelch("## BQ Stats of %p: [%d entries] %s\n", + node, len, (bqe!=END_BQ_QUEUE) ? 
"RBH" : "")); +} +#elif defined(PARALLEL_HASKELL) +void +awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node) +{ + StgBlockingQueueElement *bqe; + + IF_PAR_DEBUG(verbose, + debugBelch("##-_ AwBQ for node %p on [%x]: \n", + node, mytid)); +#ifdef DIST + //RFP + if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) { + IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n")); + return; + } +#endif + + ASSERT(q == END_BQ_QUEUE || + get_itbl(q)->type == TSO || + get_itbl(q)->type == BLOCKED_FETCH || + get_itbl(q)->type == CONSTR); + + bqe = q; + while (get_itbl(bqe)->type==TSO || + get_itbl(bqe)->type==BLOCKED_FETCH) { + bqe = unblockOne(bqe, node); + } +} + +#else /* !GRAN && !PARALLEL_HASKELL */ + +void +awakenBlockedQueue(Capability *cap, StgTSO *tso) +{ + while (tso != END_TSO_QUEUE) { + tso = unblockOne(cap,tso); + } +} +#endif + + +/* --------------------------------------------------------------------------- + * rtsSupportsBoundThreads(): is the RTS built to support bound threads? + * used by Control.Concurrent for error checking. + * ------------------------------------------------------------------------- */ + +StgBool +rtsSupportsBoundThreads(void) +{ +#if defined(THREADED_RTS) + return rtsTrue; +#else + return rtsFalse; +#endif +} + +/* --------------------------------------------------------------------------- + * isThreadBound(tso): check whether tso is bound to an OS thread. 
+ * ------------------------------------------------------------------------- */
+
+StgBool
+isThreadBound(StgTSO* tso USED_IF_THREADS)
+{
+#if defined(THREADED_RTS)
+  return (tso->bound != NULL);
+#endif
+  /* only reached when THREADED_RTS is not defined: tso is not inspected */
+  return rtsFalse;
+}
+
+/* ----------------------------------------------------------------------------
+ * Debugging: why is a thread blocked
+ * ------------------------------------------------------------------------- */
+
+#if DEBUG
+/*
+ * printThreadBlockage(): print, via debugBelch() and without a trailing
+ * newline, a phrase describing tso->why_blocked.  A why_blocked value
+ * that matches none of the cases is reported through barf().
+ */
+void
+printThreadBlockage(StgTSO *tso)
+{
+  switch (tso->why_blocked) {
+  case BlockedOnRead:
+    debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
+    break;
+  case BlockedOnWrite:
+    debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
+    break;
+#if defined(mingw32_HOST_OS)
+    case BlockedOnDoProc:
+    debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
+    break;
+#endif
+  case BlockedOnDelay:
+    debugBelch("is blocked until %ld", (long)(tso->block_info.target));
+    break;
+  case BlockedOnMVar:
+    debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
+    break;
+  case BlockedOnException:
+    debugBelch("is blocked on delivering an exception to thread %d",
+               tso->block_info.tso->id);
+    break;
+  case BlockedOnBlackHole:
+    debugBelch("is blocked on a black hole");
+    break;
+  case NotBlocked:
+    debugBelch("is not blocked");
+    break;
+#if defined(PARALLEL_HASKELL)
+  case BlockedOnGA:
+    debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
+               tso->block_info.closure, info_type(tso->block_info.closure));
+    break;
+  case BlockedOnGA_NoSend:
+    debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
+               tso->block_info.closure, info_type(tso->block_info.closure));
+    break;
+#endif
+  case BlockedOnCCall:
+    debugBelch("is blocked on an external call");
+    break;
+  case BlockedOnCCall_NoUnblockExc:
+    debugBelch("is blocked on an external call (exceptions were already blocked)");
+    break;
+  case BlockedOnSTM:
+    debugBelch("is blocked on an STM operation");
+    break;
+  default:
+    /* the TSO address is a pointer, so it is printed with %p, not %d */
+    barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%p)",
+         tso->why_blocked, tso->id, (void *)tso);
+  }
+}
+
+/*
+ * printThreadStatus(): print one line for thread t: its id and address,
+ * its label if lookupThreadLabel() returns one, and then either
+ * "has been relocated...", "has been killed", "has completed", or the
+ * text produced by printThreadBlockage().
+ */
+void
+printThreadStatus(StgTSO *t)
+{
+    debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
+    {
+      void *label = lookupThreadLabel(t->id);
+      if (label) debugBelch("[\"%s\"] ",(char *)label);
+    }
+    if (t->what_next == ThreadRelocated) {
+        debugBelch("has been relocated...\n");
+    } else {
+        switch (t->what_next) {
+        case ThreadKilled:
+            debugBelch("has been killed");
+            break;
+        case ThreadComplete:
+            debugBelch("has completed");
+            break;
+        default:
+            printThreadBlockage(t);
+        }
+        debugBelch("\n");
+    }
+}
+
+/*
+ * printAllThreads(): print the status of every thread on each
+ * Capability's run queue, followed by every thread on the all_threads
+ * list whose why_blocked is not NotBlocked.
+ */
+void
+printAllThreads(void)
+{
+  StgTSO *t, *next;
+  nat i;
+  Capability *cap;
+
+# if defined(GRAN)
+  char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
+  ullong_format_string(TIME_ON_PROC(CurrentProc),
+                       time_string, rtsFalse/*no commas!*/);
+
+  debugBelch("all threads at [%s]:\n", time_string);
+# elif defined(PARALLEL_HASKELL)
+  char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
+  ullong_format_string(CURRENT_TIME,
+                       time_string, rtsFalse/*no commas!*/);
+
+  debugBelch("all threads at [%s]:\n", time_string);
+# else
+  debugBelch("all threads:\n");
+# endif
+
+  for (i = 0; i < n_capabilities; i++) {
+      cap = &capabilities[i];
+      debugBelch("threads on capability %d:\n", cap->no);
+      for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
+          printThreadStatus(t);
+      }
+  }
+
+  debugBelch("other threads:\n");
+  for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+      if (t->why_blocked != NotBlocked) {
+          printThreadStatus(t);
+      }
+      /* a relocated TSO is followed through ->link, any other through ->global_link */
+      if (t->what_next == ThreadRelocated) {
+          next = t->link;
+      } else {
+          next = t->global_link;
+      }
+  }
+}
+
+// useful from gdb
+// printThreadQueue(): print the status of every thread reachable from t
+// through ->link, then the number of threads visited.
+void
+printThreadQueue(StgTSO *t)
+{
+    nat i = 0;
+    for (; t != END_TSO_QUEUE; t = t->link) {
+        printThreadStatus(t);
+        i++;
+    }
+    debugBelch("%d threads on queue\n", i);
+}
+
+/*
+   Print a whole blocking queue attached to node (debugging only).
+*/ +# if defined(PARALLEL_HASKELL) +void +print_bq (StgClosure *node) +{ + StgBlockingQueueElement *bqe; + StgTSO *tso; + rtsBool end; + + debugBelch("## BQ of closure %p (%s): ", + node, info_type(node)); + + /* should cover all closures that may have a blocking queue */ + ASSERT(get_itbl(node)->type == BLACKHOLE_BQ || + get_itbl(node)->type == FETCH_ME_BQ || + get_itbl(node)->type == RBH || + get_itbl(node)->type == MVAR); + + ASSERT(node!=(StgClosure*)NULL); // sanity check + + print_bqe(((StgBlockingQueue*)node)->blocking_queue); +} + +/* + Print a whole blocking queue starting with the element bqe. +*/ +void +print_bqe (StgBlockingQueueElement *bqe) +{ + rtsBool end; + + /* + NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure; + */ + for (end = (bqe==END_BQ_QUEUE); + !end; // iterate until bqe points to a CONSTR + end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), + bqe = end ? END_BQ_QUEUE : bqe->link) { + ASSERT(bqe != END_BQ_QUEUE); // sanity check + ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check + /* types of closures that may appear in a blocking queue */ + ASSERT(get_itbl(bqe)->type == TSO || + get_itbl(bqe)->type == BLOCKED_FETCH || + get_itbl(bqe)->type == CONSTR); + /* only BQs of an RBH end with an RBH_Save closure */ + //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH); + + switch (get_itbl(bqe)->type) { + case TSO: + debugBelch(" TSO %u (%x),", + ((StgTSO *)bqe)->id, ((StgTSO *)bqe)); + break; + case BLOCKED_FETCH: + debugBelch(" BF (node=%p, ga=((%x, %d, %x)),", + ((StgBlockedFetch *)bqe)->node, + ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid, + ((StgBlockedFetch *)bqe)->ga.payload.gc.slot, + ((StgBlockedFetch *)bqe)->ga.weight); + break; + case CONSTR: + debugBelch(" %s (IP %p),", + (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" : + get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" : + get_itbl(bqe) == &stg_RBH_Save_2_info ? 
"RBH_Save_2" : + "RBH_Save_?"), get_itbl(bqe)); + break; + default: + barf("Unexpected closure type %s in blocking queue", // of %p (%s)", + info_type((StgClosure *)bqe)); // , node, info_type(node)); + break; + } + } /* for */ + debugBelch("\n"); +} +# elif defined(GRAN) +void +print_bq (StgClosure *node) +{ + StgBlockingQueueElement *bqe; + PEs node_loc, tso_loc; + rtsBool end; + + /* should cover all closures that may have a blocking queue */ + ASSERT(get_itbl(node)->type == BLACKHOLE_BQ || + get_itbl(node)->type == FETCH_ME_BQ || + get_itbl(node)->type == RBH); + + ASSERT(node!=(StgClosure*)NULL); // sanity check + node_loc = where_is(node); + + debugBelch("## BQ of closure %p (%s) on [PE %d]: ", + node, info_type(node), node_loc); + + /* + NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure; + */ + for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE); + !end; // iterate until bqe points to a CONSTR + end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) { + ASSERT(bqe != END_BQ_QUEUE); // sanity check + ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check + /* types of closures that may appear in a blocking queue */ + ASSERT(get_itbl(bqe)->type == TSO || + get_itbl(bqe)->type == CONSTR); + /* only BQs of an RBH end with an RBH_Save closure */ + ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH); + + tso_loc = where_is((StgClosure *)bqe); + switch (get_itbl(bqe)->type) { + case TSO: + debugBelch(" TSO %d (%p) on [PE %d],", + ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc); + break; + case CONSTR: + debugBelch(" %s (IP %p),", + (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" : + get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" : + get_itbl(bqe) == &stg_RBH_Save_2_info ? 
"RBH_Save_2" : + "RBH_Save_?"), get_itbl(bqe)); + break; + default: + barf("Unexpected closure type %s in blocking queue of %p (%s)", + info_type((StgClosure *)bqe), node, info_type(node)); + break; + } + } /* for */ + debugBelch("\n"); +} +# endif + +#if defined(PARALLEL_HASKELL) +nat +run_queue_len(void) +{ + nat i; + StgTSO *tso; + + for (i=0, tso=run_queue_hd; + tso != END_TSO_QUEUE; + i++, tso=tso->link) { + /* nothing */ + } + + return i; +} +#endif + +#endif /* DEBUG */ diff --git a/rts/Threads.h b/rts/Threads.h new file mode 100644 index 0000000000..e331c50dae --- /dev/null +++ b/rts/Threads.h @@ -0,0 +1,46 @@ +/* --------------------------------------------------------------------------- + * + * (c) The GHC Team, 2006 + * + * Thread-related functionality + * + * --------------------------------------------------------------------------*/ + +#ifndef THREADS_H +#define THREADS_H + +#if defined(GRAN) || defined(PARALLEL_HASKELL) +StgBlockingQueueElement * unblockOne (StgBlockingQueueElement *bqe, + StgClosure *node); +#else +StgTSO * unblockOne (Capability *cap, StgTSO *tso); +StgTSO * unblockOne_ (Capability *cap, StgTSO *tso, rtsBool allow_migrate); +#endif + +#if defined(GRAN) || defined(PARALLEL_HASKELL) +void awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node); +#else +void awakenBlockedQueue (Capability *cap, StgTSO *tso); +#endif + +void removeThreadFromMVarQueue (StgMVar *mvar, StgTSO *tso); +void removeThreadFromQueue (StgTSO **queue, StgTSO *tso); +void removeThreadFromDeQueue (StgTSO **head, StgTSO **tail, StgTSO *tso); + +StgBool isThreadBound (StgTSO* tso); + +#ifdef DEBUG +void printThreadBlockage (StgTSO *tso); +void printThreadStatus (StgTSO *t); +void printAllThreads (void); +void printThreadQueue (StgTSO *t); +# if defined(PARALLEL_HASKELL) +void print_bq (StgClosure *node); +void print_bqe (StgBlockingQueueElement *bqe); +nat run_queue_len (void); +# elif defined(GRAN) +void print_bq (StgClosure *node); +# endif +#endif + 
+#endif /* THREADS_H */ diff --git a/rts/Trace.h b/rts/Trace.h index 19e492c26e..cf6c1411c0 100644 --- a/rts/Trace.h +++ b/rts/Trace.h @@ -45,11 +45,11 @@ void traceEnd (void); #ifdef DEBUG #define debugTrace(class, str, ...) trace(class,str, ## __VA_ARGS__) // variable arg macros are C99, and supported by gcc. -#define debugTraceBegin(class, str, ...) traceBegin(class,str, ## __VA_ARGS__) +#define debugTraceBegin(str, ...) traceBegin(str, ## __VA_ARGS__) #define debugTraceEnd() traceEnd() #else #define debugTrace(class, str, ...) /* nothing */ -#define debugTraceBegin(class, str, ...) /* nothing */ +#define debugTraceBegin(str, ...) /* nothing */ #define debugTraceEnd() /* nothing */ #endif |