diff options
author | Simon Marlow <marlowsd@gmail.com> | 2010-03-29 14:44:56 +0000 |
---|---|---|
committer | Simon Marlow <marlowsd@gmail.com> | 2010-03-29 14:44:56 +0000 |
commit | 5d52d9b64c21dcf77849866584744722f8121389 (patch) | |
tree | 25aeafc9b761e73714c24ae414c0b1c41765c99f | |
parent | 79957d77c1bff767f1041d3fabdeb94d92a52878 (diff) | |
download | haskell-5d52d9b64c21dcf77849866584744722f8121389.tar.gz |
New implementation of BLACKHOLEs
This replaces the global blackhole_queue with a clever scheme that
enables us to queue up blocked threads on the closure that they are
blocked on, while still avoiding atomic instructions in the common
case.
Advantages:
- gets rid of a locked global data structure and some tricky GC code
(replacing it with some per-thread data structures and different
tricky GC code :)
- wakeups are more prompt: parallel/concurrent performance should
benefit. I haven't seen anything dramatic in the parallel
benchmarks so far, but a couple of threading benchmarks do improve
a bit.
- waking up a thread blocked on a blackhole is now O(1) (e.g. if
it is the target of throwTo).
- less sharing and better separation of Capabilities: communication
is done with messages, the data structures are strictly owned by a
Capability and cannot be modified except by sending messages.
- this change will utlimately enable us to do more intelligent
scheduling when threads block on each other. This is what started
off the whole thing, but it isn't done yet (#3838).
I'll be documenting all this on the wiki in due course.
47 files changed, 1208 insertions, 825 deletions
diff --git a/compiler/cmm/CLabel.hs b/compiler/cmm/CLabel.hs index 3ceb982e76..79544445d7 100644 --- a/compiler/cmm/CLabel.hs +++ b/compiler/cmm/CLabel.hs @@ -58,6 +58,7 @@ module CLabel ( mkSplitMarkerLabel, mkDirty_MUT_VAR_Label, mkUpdInfoLabel, + mkBHUpdInfoLabel, mkIndStaticInfoLabel, mkMainCapabilityLabel, mkMAP_FROZEN_infoLabel, @@ -400,6 +401,7 @@ mkStaticConEntryLabel name c = IdLabel name c StaticConEntry mkSplitMarkerLabel = CmmLabel rtsPackageId (fsLit "__stg_split_marker") CmmCode mkDirty_MUT_VAR_Label = CmmLabel rtsPackageId (fsLit "dirty_MUT_VAR") CmmCode mkUpdInfoLabel = CmmLabel rtsPackageId (fsLit "stg_upd_frame") CmmInfo +mkBHUpdInfoLabel = CmmLabel rtsPackageId (fsLit "stg_bh_upd_frame" ) CmmInfo mkIndStaticInfoLabel = CmmLabel rtsPackageId (fsLit "stg_IND_STATIC") CmmInfo mkMainCapabilityLabel = CmmLabel rtsPackageId (fsLit "MainCapability") CmmData mkMAP_FROZEN_infoLabel = CmmLabel rtsPackageId (fsLit "stg_MUT_ARR_PTRS_FROZEN0") CmmInfo diff --git a/compiler/codeGen/CgCallConv.hs b/compiler/codeGen/CgCallConv.hs index 8a1ae8be0c..b8294ea326 100644 --- a/compiler/codeGen/CgCallConv.hs +++ b/compiler/codeGen/CgCallConv.hs @@ -284,7 +284,6 @@ getSequelAmode OnStack -> do { sp_rel <- getSpRelOffset virt_sp ; returnFC (CmmLoad sp_rel bWord) } - UpdateCode -> returnFC (CmmLit (CmmLabel mkUpdInfoLabel)) CaseAlts lbl _ _ -> returnFC (CmmLit (CmmLabel lbl)) } diff --git a/compiler/codeGen/CgClosure.lhs b/compiler/codeGen/CgClosure.lhs index f0fe3d17b2..60ba7f8652 100644 --- a/compiler/codeGen/CgClosure.lhs +++ b/compiler/codeGen/CgClosure.lhs @@ -474,7 +474,12 @@ emitBlackHoleCode is_single_entry = do then do tickyBlackHole (not is_single_entry) let bh_info = CmmReg (CmmGlobal EagerBlackholeInfo) - stmtC (CmmStore (CmmReg nodeReg) bh_info) + stmtsC [ + CmmStore (cmmOffsetW (CmmReg nodeReg) fixedHdrSize) + (CmmReg (CmmGlobal CurrentTSO)), + CmmCall (CmmPrim MO_WriteBarrier) [] [] CmmUnsafe CmmMayReturn, + CmmStore (CmmReg nodeReg) bh_info + ] else nopC \end{code} @@ -489,17 +494,23 @@ setupUpdate closure_info code = code | not (isStaticClosure closure_info) - = if closureUpdReqd closure_info - then do { tickyPushUpdateFrame; pushUpdateFrame (CmmReg nodeReg) code } - else do { tickyUpdateFrameOmitted; code } - + = do + if not (closureUpdReqd closure_info) + then do tickyUpdateFrameOmitted; code + else do + tickyPushUpdateFrame + dflags <- getDynFlags + if not opt_SccProfilingOn && dopt Opt_EagerBlackHoling dflags + then pushBHUpdateFrame (CmmReg nodeReg) code + else pushUpdateFrame (CmmReg nodeReg) code + | otherwise -- A static closure = do { tickyUpdateBhCaf closure_info ; if closureUpdReqd closure_info then do -- Blackhole the (updatable) CAF: { upd_closure <- link_caf closure_info True - ; pushUpdateFrame upd_closure code } + ; pushBHUpdateFrame upd_closure code } else do { -- krc: removed some ticky-related code here. ; tickyUpdateFrameOmitted @@ -553,7 +564,8 @@ link_caf cl_info _is_upd = do { -- Alloc black hole specifying CC_HDR(Node) as the cost centre ; let use_cc = costCentreFrom (CmmReg nodeReg) blame_cc = use_cc - ; hp_offset <- allocDynClosure bh_cl_info use_cc blame_cc [] + tso = CmmReg (CmmGlobal CurrentTSO) + ; hp_offset <- allocDynClosure bh_cl_info use_cc blame_cc [(tso,fixedHdrSize)] ; hp_rel <- getHpRelOffset hp_offset -- Call the RTS function newCAF to add the CAF to the CafList diff --git a/compiler/codeGen/CgMonad.lhs b/compiler/codeGen/CgMonad.lhs index 83d2b72747..e5bca2aab1 100644 --- a/compiler/codeGen/CgMonad.lhs +++ b/compiler/codeGen/CgMonad.lhs @@ -169,7 +169,6 @@ block. \begin{code} data Sequel = OnStack -- Continuation is on the stack - | UpdateCode -- Continuation is update | CaseAlts CLabel -- Jump to this; if the continuation is for a vectored diff --git a/compiler/codeGen/CgStackery.lhs b/compiler/codeGen/CgStackery.lhs index 6683de4c8b..532127a147 100644 --- a/compiler/codeGen/CgStackery.lhs +++ b/compiler/codeGen/CgStackery.lhs @@ -17,7 +17,7 @@ module CgStackery ( setStackFrame, getStackFrame, mkVirtStkOffsets, mkStkAmodes, freeStackSlots, - pushUpdateFrame, emitPushUpdateFrame, + pushUpdateFrame, pushBHUpdateFrame, emitPushUpdateFrame, ) where #include "HsVersions.h" @@ -265,6 +265,14 @@ to reflect the frame pushed. \begin{code} pushUpdateFrame :: CmmExpr -> Code -> Code pushUpdateFrame updatee code + = pushSpecUpdateFrame mkUpdInfoLabel updatee code + +pushBHUpdateFrame :: CmmExpr -> Code -> Code +pushBHUpdateFrame updatee code + = pushSpecUpdateFrame mkBHUpdInfoLabel updatee code + +pushSpecUpdateFrame :: CLabel -> CmmExpr -> Code -> Code +pushSpecUpdateFrame lbl updatee code = do { when debugIsOn $ do { EndOfBlockInfo _ sequel <- getEndOfBlockInfo ; @@ -277,15 +285,25 @@ pushUpdateFrame updatee code -- The location of the lowest-address -- word of the update frame itself - ; setEndOfBlockInfo (EndOfBlockInfo vsp UpdateCode) $ - do { emitPushUpdateFrame frame_addr updatee + -- NB. we used to set the Sequel to 'UpdateCode' so + -- that we could jump directly to the update code if + -- we know that the next frame on the stack is an + -- update frame. However, the RTS can sometimes + -- change an update frame into something else (see + -- e.g. Note [upd-black-hole] in rts/sm/Scav.c), so we + -- no longer make this assumption. + ; setEndOfBlockInfo (EndOfBlockInfo vsp OnStack) $ + do { emitSpecPushUpdateFrame lbl frame_addr updatee ; code } } emitPushUpdateFrame :: CmmExpr -> CmmExpr -> Code -emitPushUpdateFrame frame_addr updatee = do +emitPushUpdateFrame = emitSpecPushUpdateFrame mkUpdInfoLabel + +emitSpecPushUpdateFrame :: CLabel -> CmmExpr -> CmmExpr -> Code +emitSpecPushUpdateFrame lbl frame_addr updatee = do stmtsC [ -- Set the info word - CmmStore frame_addr (mkLblExpr mkUpdInfoLabel) + CmmStore frame_addr (mkLblExpr lbl) , -- And the updatee CmmStore (cmmOffsetB frame_addr off_updatee) updatee ] initUpdFrameProf frame_addr diff --git a/includes/Rts.h b/includes/Rts.h index 3318402364..d79e9ad88e 100644 --- a/includes/Rts.h +++ b/includes/Rts.h @@ -106,10 +106,18 @@ void _assertFail(const char *filename, unsigned int linenum) else \ _assertFail(__FILE__, __LINE__) +#define CHECKM(predicate, msg, ...) \ + if (predicate) \ + /*null*/; \ + else \ + barf(msg, ##__VA_ARGS__) + #ifndef DEBUG #define ASSERT(predicate) /* nothing */ +#define ASSERTM(predicate,msg,...) /* nothing */ #else #define ASSERT(predicate) CHECK(predicate) +#define ASSERTM(predicate,msg,...) CHECKM(predicate,msg,##__VA_ARGS__) #endif /* DEBUG */ /* diff --git a/includes/mkDerivedConstants.c b/includes/mkDerivedConstants.c index 94157f035b..92685cae3a 100644 --- a/includes/mkDerivedConstants.c +++ b/includes/mkDerivedConstants.c @@ -291,6 +291,7 @@ main(int argc, char *argv[]) closure_field(StgTSO, trec); closure_field(StgTSO, flags); closure_field(StgTSO, dirty); + closure_field(StgTSO, bq); closure_field_("StgTSO_CCCS", StgTSO, prof.CCCS); tso_field(StgTSO, sp); tso_field_offset(StgTSO, stack); @@ -382,6 +383,17 @@ main(int argc, char *argv[]) closure_size(StgStableName); closure_field(StgStableName,sn); + closure_size(StgBlockingQueue); + closure_field(StgBlockingQueue, bh); + closure_field(StgBlockingQueue, owner); + closure_field(StgBlockingQueue, queue); + closure_field(StgBlockingQueue, link); + + closure_size(MessageBlackHole); + closure_field(MessageBlackHole, link); + closure_field(MessageBlackHole, tso); + closure_field(MessageBlackHole, bh); + struct_field_("RtsFlags_ProfFlags_showCCSOnException", RTS_FLAGS, ProfFlags.showCCSOnException); struct_field_("RtsFlags_DebugFlags_apply", diff --git a/includes/rts/storage/ClosureMacros.h b/includes/rts/storage/ClosureMacros.h index a115f6f38f..098c65db15 100644 --- a/includes/rts/storage/ClosureMacros.h +++ b/includes/rts/storage/ClosureMacros.h @@ -129,6 +129,12 @@ SET_HDR(c,info,costCentreStack); \ (c)->words = n_words; +// Use when changing a closure from one kind to another +#define OVERWRITE_INFO(c, new_info) \ + LDV_RECORD_DEAD_FILL_SLOP_DYNAMIC((StgClosure *)(c)); \ + SET_INFO((c), (new_info)); \ + LDV_RECORD_CREATE(c); + /* ----------------------------------------------------------------------------- How to get hold of the static link field for a static closure. -------------------------------------------------------------------------- */ @@ -249,7 +255,7 @@ INLINE_HEADER StgOffset THUNK_SELECTOR_sizeW ( void ) { return sizeofW(StgSelector); } INLINE_HEADER StgOffset BLACKHOLE_sizeW ( void ) -{ return sizeofW(StgHeader)+MIN_PAYLOAD_SIZE; } +{ return sizeofW(StgInd); } // a BLACKHOLE is a kind of indirection /* -------------------------------------------------------------------------- Sizes of closures diff --git a/includes/rts/storage/ClosureTypes.h b/includes/rts/storage/ClosureTypes.h index 6a76772d61..518d39bb11 100644 --- a/includes/rts/storage/ClosureTypes.h +++ b/includes/rts/storage/ClosureTypes.h @@ -62,7 +62,7 @@ #define UPDATE_FRAME 38 #define CATCH_FRAME 39 #define STOP_FRAME 40 -#define CAF_BLACKHOLE 41 +#define BLOCKING_QUEUE 41 #define BLACKHOLE 42 #define MVAR_CLEAN 43 #define MVAR_DIRTY 44 diff --git a/includes/rts/storage/Closures.h b/includes/rts/storage/Closures.h index d7498e2882..802746868c 100644 --- a/includes/rts/storage/Closures.h +++ b/includes/rts/storage/Closures.h @@ -127,6 +127,14 @@ typedef struct { StgInfoTable *saved_info; } StgIndStatic; +typedef struct StgBlockingQueue_ { + StgHeader header; + struct StgBlockingQueue_ *link; // here so it looks like an IND + StgClosure *bh; // the BLACKHOLE + StgTSO *owner; + struct MessageBlackHole_ *queue; +} StgBlockingQueue; + typedef struct { StgHeader header; StgWord words; @@ -433,10 +441,17 @@ typedef struct MessageWakeup_ { typedef struct MessageThrowTo_ { StgHeader header; - Message *link; + struct MessageThrowTo_ *link; StgTSO *source; StgTSO *target; StgClosure *exception; } MessageThrowTo; +typedef struct MessageBlackHole_ { + StgHeader header; + struct MessageBlackHole_ *link; + StgTSO *tso; + StgClosure *bh; +} MessageBlackHole; + #endif /* RTS_STORAGE_CLOSURES_H */ diff --git a/includes/rts/storage/TSO.h b/includes/rts/storage/TSO.h index e2015f28ac..e07be88ac5 100644 --- a/includes/rts/storage/TSO.h +++ b/includes/rts/storage/TSO.h @@ -46,6 +46,7 @@ typedef struct { /* Reason for thread being blocked. See comment above struct StgTso_. */ typedef union { StgClosure *closure; + struct MessageBlackHole_ *bh; struct MessageThrowTo_ *throwto; struct MessageWakeup_ *wakeup; StgInt fd; /* StgInt instead of int, so that it's the same size as the ptrs */ @@ -78,12 +79,17 @@ typedef struct StgTSO_ { */ struct StgTSO_* _link; /* + Currently used for linking TSOs on: + * cap->run_queue_{hd,tl} + * MVAR queue + * (non-THREADED_RTS); the blocked_queue + * and pointing to the relocated version of a ThreadRelocated + NOTE!!! do not modify _link directly, it is subject to a write barrier for generational GC. Instead use the setTSOLink() function. Exceptions to this rule are: * setting the link field to END_TSO_QUEUE - * putting a TSO on the blackhole_queue * setting the link field of the currently running TSO, as it will already be dirty. */ @@ -127,6 +133,12 @@ typedef struct StgTSO_ { */ struct MessageThrowTo_ * blocked_exceptions; + /* + A list of StgBlockingQueue objects, representing threads blocked + on thunks that are under evaluation by this thread. + */ + struct StgBlockingQueue_ *bq; + #ifdef TICKY_TICKY /* TICKY-specific stuff would go here. */ #endif @@ -152,6 +164,18 @@ typedef struct StgTSO_ { void dirty_TSO (Capability *cap, StgTSO *tso); void setTSOLink (Capability *cap, StgTSO *tso, StgTSO *target); +// Apply to a TSO before looking at it if you are not sure whether it +// might be ThreadRelocated or not (basically, that's most of the time +// unless the TSO is the current TSO). +// +INLINE_HEADER StgTSO * deRefTSO(StgTSO *tso) +{ + while (tso->what_next == ThreadRelocated) { + tso = tso->_link; + } + return tso; +} + /* ----------------------------------------------------------------------------- Invariants: diff --git a/includes/stg/MiscClosures.h b/includes/stg/MiscClosures.h index 42e878f945..9834c4bbf3 100644 --- a/includes/stg/MiscClosures.h +++ b/includes/stg/MiscClosures.h @@ -44,6 +44,7 @@ /* Stack frames */ RTS_RET_INFO(stg_upd_frame_info); +RTS_RET_INFO(stg_bh_upd_frame_info); RTS_RET_INFO(stg_marked_upd_frame_info); RTS_RET_INFO(stg_noupd_frame_info); RTS_RET_INFO(stg_catch_frame_info); @@ -54,6 +55,7 @@ RTS_RET_INFO(stg_catch_stm_frame_info); RTS_RET_INFO(stg_unblockAsyncExceptionszh_ret_info); RTS_ENTRY(stg_upd_frame_ret); +RTS_ENTRY(stg_bh_upd_frame_ret); RTS_ENTRY(stg_marked_upd_frame_ret); // RTS_FUN(stg_interp_constr_entry); @@ -90,12 +92,12 @@ RTS_INFO(stg_IND_STATIC_info); RTS_INFO(stg_IND_PERM_info); RTS_INFO(stg_IND_OLDGEN_info); RTS_INFO(stg_IND_OLDGEN_PERM_info); -RTS_INFO(stg_CAF_UNENTERED_info); -RTS_INFO(stg_CAF_ENTERED_info); -RTS_INFO(stg_WHITEHOLE_info); RTS_INFO(stg_BLACKHOLE_info); -RTS_INFO(__stg_EAGER_BLACKHOLE_info); RTS_INFO(stg_CAF_BLACKHOLE_info); +RTS_INFO(__stg_EAGER_BLACKHOLE_info); +RTS_INFO(stg_WHITEHOLE_info); +RTS_INFO(stg_BLOCKING_QUEUE_CLEAN_info); +RTS_INFO(stg_BLOCKING_QUEUE_DIRTY_info); RTS_FUN_INFO(stg_BCO_info); RTS_INFO(stg_EVACUATED_info); @@ -115,7 +117,9 @@ RTS_INFO(stg_MUT_VAR_CLEAN_info); RTS_INFO(stg_MUT_VAR_DIRTY_info); RTS_INFO(stg_END_TSO_QUEUE_info); RTS_INFO(stg_MSG_WAKEUP_info); +RTS_INFO(stg_MSG_TRY_WAKEUP_info); RTS_INFO(stg_MSG_THROWTO_info); +RTS_INFO(stg_MSG_BLACKHOLE_info); RTS_INFO(stg_MUT_CONS_info); RTS_INFO(stg_catch_info); RTS_INFO(stg_PAP_info); @@ -142,12 +146,10 @@ RTS_ENTRY(stg_IND_STATIC_entry); RTS_ENTRY(stg_IND_PERM_entry); RTS_ENTRY(stg_IND_OLDGEN_entry); RTS_ENTRY(stg_IND_OLDGEN_PERM_entry); -RTS_ENTRY(stg_CAF_UNENTERED_entry); -RTS_ENTRY(stg_CAF_ENTERED_entry); RTS_ENTRY(stg_WHITEHOLE_entry); RTS_ENTRY(stg_BLACKHOLE_entry); -RTS_ENTRY(__stg_EAGER_BLACKHOLE_entry); RTS_ENTRY(stg_CAF_BLACKHOLE_entry); +RTS_ENTRY(__stg_EAGER_BLACKHOLE_entry); RTS_ENTRY(stg_BCO_entry); RTS_ENTRY(stg_EVACUATED_entry); RTS_ENTRY(stg_WEAK_entry); @@ -166,7 +168,9 @@ RTS_ENTRY(stg_MUT_VAR_CLEAN_entry); RTS_ENTRY(stg_MUT_VAR_DIRTY_entry); RTS_ENTRY(stg_END_TSO_QUEUE_entry); RTS_ENTRY(stg_MSG_WAKEUP_entry); +RTS_ENTRY(stg_MSG_TRY_WAKEUP_entry); RTS_ENTRY(stg_MSG_THROWTO_entry); +RTS_ENTRY(stg_MSG_BLACKHOLE_entry); RTS_ENTRY(stg_MUT_CONS_entry); RTS_ENTRY(stg_catch_entry); RTS_ENTRY(stg_PAP_entry); @@ -404,6 +408,8 @@ RTS_FUN(stg_PAP_apply); RTS_RET_INFO(stg_enter_info); RTS_ENTRY(stg_enter_ret); +RTS_RET_INFO(stg_enter_checkbh_info); +RTS_ENTRY(stg_enter_checkbh_ret); RTS_RET_INFO(stg_gc_void_info); RTS_ENTRY(stg_gc_void_ret); diff --git a/rts/Capability.c b/rts/Capability.c index 5f54ecae4d..f5e77a900f 100644 --- a/rts/Capability.c +++ b/rts/Capability.c @@ -62,9 +62,8 @@ Capability * rts_unsafeGetMyCapability (void) STATIC_INLINE rtsBool globalWorkToDo (void) { - return blackholes_need_checking - || sched_state >= SCHED_INTERRUPTING - ; + return sched_state >= SCHED_INTERRUPTING + || recent_activity == ACTIVITY_INACTIVE; // need to check for deadlock } #endif @@ -637,43 +636,6 @@ yieldCapability (Capability** pCap, Task *task) } /* ---------------------------------------------------------------------------- - * Wake up a thread on a Capability. - * - * This is used when the current Task is running on a Capability and - * wishes to wake up a thread on a different Capability. - * ------------------------------------------------------------------------- */ - -void -wakeupThreadOnCapability (Capability *cap, - Capability *other_cap, - StgTSO *tso) -{ - MessageWakeup *msg; - - // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability) - if (tso->bound) { - ASSERT(tso->bound->task->cap == tso->cap); - tso->bound->task->cap = other_cap; - } - tso->cap = other_cap; - - ASSERT(tso->why_blocked != BlockedOnMsgWakeup || - tso->block_info.closure->header.info == &stg_IND_info); - - ASSERT(tso->block_info.closure->header.info != &stg_MSG_WAKEUP_info); - - msg = (MessageWakeup*) allocate(cap, sizeofW(MessageWakeup)); - msg->header.info = &stg_MSG_WAKEUP_info; - msg->tso = tso; - tso->block_info.closure = (StgClosure *)msg; - dirty_TSO(cap, tso); - write_barrier(); - tso->why_blocked = BlockedOnMsgWakeup; - - sendMessage(other_cap, (Message*)msg); -} - -/* ---------------------------------------------------------------------------- * prodCapability * * If a Capability is currently idle, wake up a Task on it. Used to @@ -906,24 +868,3 @@ markCapabilities (evac_fn evac, void *user) Messages -------------------------------------------------------------------------- */ -#ifdef THREADED_RTS - -void sendMessage(Capability *cap, Message *msg) -{ - ACQUIRE_LOCK(&cap->lock); - - msg->link = cap->inbox; - cap->inbox = msg; - - if (cap->running_task == NULL) { - cap->running_task = myTask(); - // precond for releaseCapability_() - releaseCapability_(cap,rtsFalse); - } else { - contextSwitchCapability(cap); - } - - RELEASE_LOCK(&cap->lock); -} - -#endif // THREADED_RTS diff --git a/rts/Capability.h b/rts/Capability.h index 4030b5efd4..e12b8ce6e9 100644 --- a/rts/Capability.h +++ b/rts/Capability.h @@ -201,6 +201,8 @@ void waitForReturnCapability (Capability **cap/*in/out*/, Task *task); INLINE_HEADER void recordMutableCap (StgClosure *p, Capability *cap, nat gen); +INLINE_HEADER void recordClosureMutated (Capability *cap, StgClosure *p); + #if defined(THREADED_RTS) // Gives up the current capability IFF there is a higher-priority @@ -222,12 +224,6 @@ void yieldCapability (Capability** pCap, Task *task); // void waitForCapability (Task *task, Mutex *mutex, Capability **pCap); -// Wakes up a thread on a Capability (probably a different Capability -// from the one held by the current Task). -// -void wakeupThreadOnCapability (Capability *my_cap, Capability *other_cap, - StgTSO *tso); - // Wakes up a worker thread on just one Capability, used when we // need to service some global event. // @@ -289,8 +285,6 @@ void traverseSparkQueues (evac_fn evac, void *user); INLINE_HEADER rtsBool emptyInbox(Capability *cap);; -void sendMessage (Capability *cap, Message *msg); - #endif // THREADED_RTS /* ----------------------------------------------------------------------------- @@ -316,6 +310,15 @@ recordMutableCap (StgClosure *p, Capability *cap, nat gen) *bd->free++ = (StgWord)p; } +INLINE_HEADER void +recordClosureMutated (Capability *cap, StgClosure *p) +{ + bdescr *bd; + bd = Bdescr((StgPtr)p); + if (bd->gen_no != 0) recordMutableCap(p,cap,bd->gen_no); +} + + #if defined(THREADED_RTS) INLINE_HEADER rtsBool emptySparkPoolCap (Capability *cap) diff --git a/rts/ClosureFlags.c b/rts/ClosureFlags.c index 358cb40ed3..cebd124dd2 100644 --- a/rts/ClosureFlags.c +++ b/rts/ClosureFlags.c @@ -62,8 +62,8 @@ StgWord16 closure_flags[] = { [UPDATE_FRAME] = ( _BTM ), [CATCH_FRAME] = ( _BTM ), [STOP_FRAME] = ( _BTM ), - [CAF_BLACKHOLE] = ( _BTM|_NS| _UPT ), [BLACKHOLE] = ( _NS| _UPT ), + [BLOCKING_QUEUE] = ( _NS| _MUT|_UPT ), [MVAR_CLEAN] = (_HNF| _NS| _MUT|_UPT ), [MVAR_DIRTY] = (_HNF| _NS| _MUT|_UPT ), [ARR_WORDS] = (_HNF| _NS| _UPT ), diff --git a/rts/FrontPanel.c b/rts/FrontPanel.c index ebba4056fb..da42548eb8 100644 --- a/rts/FrontPanel.c +++ b/rts/FrontPanel.c @@ -662,8 +662,6 @@ residencyCensus( void ) type = Thunk; break; - case CAF_BLACKHOLE: - case EAGER_BLACKHOLE: case BLACKHOLE: /* case BLACKHOLE_BQ: FIXME: case does not exist */ size = sizeW_fromITBL(info); diff --git a/rts/HeapStackCheck.cmm b/rts/HeapStackCheck.cmm index 5bdf600a1c..f8bccc091d 100644 --- a/rts/HeapStackCheck.cmm +++ b/rts/HeapStackCheck.cmm @@ -159,6 +159,24 @@ __stg_gc_enter_1 } /* ----------------------------------------------------------------------------- + stg_enter_checkbh is just like stg_enter, except that we also call + checkBlockingQueues(). The point of this is that the GC can + replace an stg_marked_upd_frame with an stg_enter_checkbh if it + finds that the BLACKHOLE has already been updated by another + thread. It would be unsafe to use stg_enter, because there might + be an orphaned BLOCKING_QUEUE now. + -------------------------------------------------------------------------- */ + +INFO_TABLE_RET( stg_enter_checkbh, RET_SMALL, P_ unused) +{ + R1 = Sp(1); + Sp_adj(2); + foreign "C" checkBlockingQueues(MyCapability() "ptr", + CurrentTSO) [R1]; + ENTER(); +} + +/* ----------------------------------------------------------------------------- Heap checks in Primitive case alternatives A primitive case alternative is entered with a value either in @@ -593,11 +611,7 @@ INFO_TABLE_RET( stg_block_putmvar, RET_SMALL, P_ unused1, P_ unused2 ) // code fragment executed just before we return to the scheduler stg_block_putmvar_finally { -#ifdef THREADED_RTS unlockClosure(R3, stg_MVAR_DIRTY_info); -#else - SET_INFO(R3, stg_MVAR_DIRTY_info); -#endif jump StgReturn; } @@ -611,24 +625,12 @@ stg_block_putmvar BLOCK_BUT_FIRST(stg_block_putmvar_finally); } -// code fragment executed just before we return to the scheduler -stg_block_blackhole_finally -{ -#if defined(THREADED_RTS) - // The last thing we do is release sched_lock, which is - // preventing other threads from accessing blackhole_queue and - // picking up this thread before we are finished with it. - RELEASE_LOCK(sched_mutex "ptr"); -#endif - jump StgReturn; -} - stg_block_blackhole { Sp_adj(-2); Sp(1) = R1; Sp(0) = stg_enter_info; - BLOCK_BUT_FIRST(stg_block_blackhole_finally); + BLOCK_GENERIC; } INFO_TABLE_RET( stg_block_throwto, RET_SMALL, P_ unused, P_ unused ) diff --git a/rts/Interpreter.c b/rts/Interpreter.c index 9071912f2d..16a8e242bd 100644 --- a/rts/Interpreter.c +++ b/rts/Interpreter.c @@ -21,6 +21,7 @@ #include "Disassembler.h" #include "Interpreter.h" #include "ThreadPaused.h" +#include "Threads.h" #include <string.h> /* for memcpy */ #ifdef HAVE_ERRNO_H @@ -443,7 +444,8 @@ do_return: // to a PAP by the GC, violating the invariant that PAPs // always contain a tagged pointer to the function. INTERP_TICK(it_retto_UPDATE); - UPD_IND(cap, ((StgUpdateFrame *)Sp)->updatee, tagged_obj); + updateThunk(cap, cap->r.rCurrentTSO, + ((StgUpdateFrame *)Sp)->updatee, tagged_obj); Sp += sizeofW(StgUpdateFrame); goto do_return; diff --git a/rts/LdvProfile.c b/rts/LdvProfile.c index 412fd05579..799d418145 100644 --- a/rts/LdvProfile.c +++ b/rts/LdvProfile.c @@ -140,7 +140,7 @@ processHeapClosureForDead( StgClosure *c ) case FUN_1_1: case FUN_0_2: case BLACKHOLE: - case CAF_BLACKHOLE: + case BLOCKING_QUEUE: case IND_PERM: case IND_OLDGEN_PERM: /* diff --git a/rts/Linker.c b/rts/Linker.c index 4e8bafface..06240812c2 100644 --- a/rts/Linker.c +++ b/rts/Linker.c @@ -877,7 +877,10 @@ typedef struct _RtsSymbolVal { SymI_HasProto(stable_ptr_table) \ SymI_HasProto(stackOverflow) \ SymI_HasProto(stg_CAF_BLACKHOLE_info) \ + SymI_HasProto(stg_BLACKHOLE_info) \ SymI_HasProto(__stg_EAGER_BLACKHOLE_info) \ + SymI_HasProto(stg_BLOCKING_QUEUE_CLEAN_info) \ + SymI_HasProto(stg_BLOCKING_QUEUE_DIRTY_info) \ SymI_HasProto(startTimer) \ SymI_HasProto(stg_MVAR_CLEAN_info) \ SymI_HasProto(stg_MVAR_DIRTY_info) \ @@ -941,6 +944,7 @@ typedef struct _RtsSymbolVal { SymI_HasProto(stg_sel_8_upd_info) \ SymI_HasProto(stg_sel_9_upd_info) \ SymI_HasProto(stg_upd_frame_info) \ + SymI_HasProto(stg_bh_upd_frame_info) \ SymI_HasProto(suspendThread) \ SymI_HasProto(stg_takeMVarzh) \ SymI_HasProto(stg_threadStatuszh) \ diff --git a/rts/Messages.c b/rts/Messages.c new file mode 100644 index 0000000000..2b40f76e93 --- /dev/null +++ b/rts/Messages.c @@ -0,0 +1,296 @@ +/* --------------------------------------------------------------------------- + * + * (c) The GHC Team, 2010 + * + * Inter-Capability message passing + * + * --------------------------------------------------------------------------*/ + +#include "Rts.h" +#include "Messages.h" +#include "Trace.h" +#include "Capability.h" +#include "Schedule.h" +#include "Threads.h" +#include "RaiseAsync.h" +#include "sm/Storage.h" + +/* ---------------------------------------------------------------------------- + Send a message to another Capability + ------------------------------------------------------------------------- */ + +#ifdef THREADED_RTS + +void sendMessage(Capability *from_cap, Capability *to_cap, Message *msg) +{ + ACQUIRE_LOCK(&to_cap->lock); + +#ifdef DEBUG + { + const StgInfoTable *i = msg->header.info; + if (i != &stg_MSG_WAKEUP_info && + i != &stg_MSG_THROWTO_info && + i != &stg_MSG_BLACKHOLE_info && + i != &stg_MSG_TRY_WAKEUP_info && + i != &stg_IND_info && // can happen if a MSG_BLACKHOLE is revoked + i != &stg_WHITEHOLE_info) { + barf("sendMessage: %p", i); + } + } +#endif + + msg->link = to_cap->inbox; + to_cap->inbox = msg; + + recordClosureMutated(from_cap,(StgClosure*)msg); + + if (to_cap->running_task == NULL) { + to_cap->running_task = myTask(); + // precond for releaseCapability_() + releaseCapability_(to_cap,rtsFalse); + } else { + contextSwitchCapability(to_cap); + } + + RELEASE_LOCK(&to_cap->lock); +} + +#endif /* THREADED_RTS */ + +/* ---------------------------------------------------------------------------- + Handle a message + ------------------------------------------------------------------------- */ + +#ifdef THREADED_RTS + +void +executeMessage (Capability *cap, Message *m) +{ + const StgInfoTable *i; + +loop: + write_barrier(); // allow m->header to be modified by another thread + i = m->header.info; + if (i == &stg_MSG_WAKEUP_info) + { + // the plan is to eventually get rid of these and use + // TRY_WAKEUP instead. + MessageWakeup *w = (MessageWakeup *)m; + StgTSO *tso = w->tso; + debugTraceCap(DEBUG_sched, cap, "message: wakeup thread %ld", + (lnat)tso->id); + ASSERT(tso->cap == cap); + ASSERT(tso->why_blocked == BlockedOnMsgWakeup); + ASSERT(tso->block_info.closure == (StgClosure *)m); + tso->why_blocked = NotBlocked; + appendToRunQueue(cap, tso); + } + else if (i == &stg_MSG_TRY_WAKEUP_info) + { + StgTSO *tso = ((MessageWakeup *)m)->tso; + debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld", + (lnat)tso->id); + tryWakeupThread(cap, tso); + } + else if (i == &stg_MSG_THROWTO_info) + { + MessageThrowTo *t = (MessageThrowTo *)m; + nat r; + const StgInfoTable *i; + + i = lockClosure((StgClosure*)m); + if (i != &stg_MSG_THROWTO_info) { + unlockClosure((StgClosure*)m, i); + goto loop; + } + + debugTraceCap(DEBUG_sched, cap, "message: throwTo %ld -> %ld", + (lnat)t->source->id, (lnat)t->target->id); + + ASSERT(t->source->why_blocked == BlockedOnMsgThrowTo); + ASSERT(t->source->block_info.closure == (StgClosure *)m); + + r = throwToMsg(cap, t); + + switch (r) { + case THROWTO_SUCCESS: + ASSERT(t->source->sp[0] == (StgWord)&stg_block_throwto_info); + t->source->sp += 3; + unblockOne(cap, t->source); + // this message is done + unlockClosure((StgClosure*)m, &stg_IND_info); + break; + case THROWTO_BLOCKED: + // unlock the message + unlockClosure((StgClosure*)m, &stg_MSG_THROWTO_info); + break; + } + } + else if (i == &stg_MSG_BLACKHOLE_info) + { + nat r; + MessageBlackHole *b = (MessageBlackHole*)m; + + r = messageBlackHole(cap, b); + if (r == 0) { + tryWakeupThread(cap, b->tso); + } + return; + } + else if (i == &stg_IND_info) + { + // message was revoked + return; + } + else if (i == &stg_WHITEHOLE_info) + { + goto loop; + } + else + { + barf("executeMessage: %p", i); + } +} + +#endif + +/* ---------------------------------------------------------------------------- + Handle a MSG_BLACKHOLE message + + This is called from two places: either we just entered a BLACKHOLE + (stg_BLACKHOLE_info), or we received a MSG_BLACKHOLE in our + cap->inbox. + + We need to establish whether the BLACKHOLE belongs to + this Capability, and + - if so, arrange to block the current thread on it + - otherwise, forward the message to the right place + + Returns: + - 0 if the blocked thread can be woken up by the caller + - 1 if the thread is still blocked, and we promise to send a MSG_TRY_WAKEUP + at some point in the future. + + ------------------------------------------------------------------------- */ + +nat messageBlackHole(Capability *cap, MessageBlackHole *msg) +{ + const StgInfoTable *info; + StgClosure *p; + StgBlockingQueue *bq; + StgClosure *bh = msg->bh; + StgTSO *owner; + + debugTraceCap(DEBUG_sched, cap, "message: thread %d blocking on blackhole %p", + (lnat)msg->tso->id, msg->bh); + + info = bh->header.info; + + // If we got this message in our inbox, it might be that the + // BLACKHOLE has already been updated, and GC has shorted out the + // indirection, so the pointer no longer points to a BLACKHOLE at + // all. + if (info != &stg_BLACKHOLE_info && + info != &stg_CAF_BLACKHOLE_info && + info != &stg_WHITEHOLE_info) { + // if it is a WHITEHOLE, then a thread is in the process of + // trying to BLACKHOLE it. But we know that it was once a + // BLACKHOLE, so there is at least a valid pointer in the + // payload, so we can carry on. + return 0; + } + + // we know at this point that the closure +loop: + p = ((StgInd*)bh)->indirectee; + info = p->header.info; + + if (info == &stg_IND_info) + { + // This could happen, if e.g. we got a BLOCKING_QUEUE that has + // just been replaced with an IND by another thread in + // updateThunk(). In which case, if we read the indirectee + // again we should get the value. + goto loop; + } + + else if (info == &stg_TSO_info) + { + owner = deRefTSO((StgTSO *)p); + +#ifdef THREADED_RTS + if (owner->cap != cap) { + sendMessage(cap, owner->cap, (Message*)msg); + debugTraceCap(DEBUG_sched, cap, "forwarding message to cap %d", owner->cap->no); + return 1; + } +#endif + // owner is the owner of the BLACKHOLE, and resides on this + // Capability. msg->tso is the first thread to block on this + // BLACKHOLE, so we first create a BLOCKING_QUEUE object. + + bq = (StgBlockingQueue*)allocate(cap, sizeofW(StgBlockingQueue)); + + // initialise the BLOCKING_QUEUE object + SET_HDR(bq, &stg_BLOCKING_QUEUE_DIRTY_info, CCS_SYSTEM); + bq->bh = bh; + bq->queue = msg; + bq->owner = owner; + + msg->link = (MessageBlackHole*)END_TSO_QUEUE; + + // All BLOCKING_QUEUES are linked in a list on owner->bq, so + // that we can search through them in the event that there is + // a collision to update a BLACKHOLE and a BLOCKING_QUEUE + // becomes orphaned (see updateThunk()). + bq->link = owner->bq; + owner->bq = bq; + dirty_TSO(cap, owner); // we modified owner->bq + + // point to the BLOCKING_QUEUE from the BLACKHOLE + write_barrier(); // make the BQ visible + ((StgInd*)bh)->indirectee = (StgClosure *)bq; + recordClosureMutated(cap,bh); // bh was mutated + + debugTraceCap(DEBUG_sched, cap, "thread %d blocked on thread %d", + (lnat)msg->tso->id, (lnat)owner->id); + + return 1; // blocked + } + else if (info == &stg_BLOCKING_QUEUE_CLEAN_info || + info == &stg_BLOCKING_QUEUE_DIRTY_info) + { + StgBlockingQueue *bq = (StgBlockingQueue *)p; + + ASSERT(bq->bh == bh); + + owner = deRefTSO(bq->owner); + + ASSERT(owner != END_TSO_QUEUE); + +#ifdef THREADED_RTS + if (owner->cap != cap) { + sendMessage(cap, owner->cap, (Message*)msg); + debugTraceCap(DEBUG_sched, cap, "forwarding message to cap %d", owner->cap->no); + return 1; + } +#endif + + msg->link = bq->queue; + bq->queue = msg; + recordClosureMutated(cap,(StgClosure*)msg); + + if (info == &stg_BLOCKING_QUEUE_CLEAN_info) { + bq->header.info = &stg_BLOCKING_QUEUE_DIRTY_info; + recordClosureMutated(cap,bq); + } + + debugTraceCap(DEBUG_sched, cap, "thread %d blocked on thread %d", + (lnat)msg->tso->id, (lnat)owner->id); + + return 1; // blocked + } + + return 0; // not blocked +} + diff --git a/rts/Messages.h b/rts/Messages.h new file mode 100644 index 0000000000..15c037954b --- /dev/null +++ b/rts/Messages.h @@ -0,0 +1,18 @@ +/* --------------------------------------------------------------------------- + * + * (c) The GHC Team, 2010 + * + * Inter-Capability message passing + * + * --------------------------------------------------------------------------*/ + +BEGIN_RTS_PRIVATE + +nat messageBlackHole(Capability *cap, MessageBlackHole *msg); + +#ifdef THREADED_RTS +void executeMessage (Capability *cap, Message *m); +void sendMessage (Capability *from_cap, Capability *to_cap, Message *msg); +#endif + +END_RTS_PRIVATE diff --git a/rts/PrimOps.cmm b/rts/PrimOps.cmm index 4b5e106710..5c575f695b 100644 --- a/rts/PrimOps.cmm +++ b/rts/PrimOps.cmm @@ -1204,11 +1204,7 @@ stg_takeMVarzh StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure; } -#if defined(THREADED_RTS) unlockClosure(mvar, stg_MVAR_DIRTY_info); -#else - SET_INFO(mvar,stg_MVAR_DIRTY_info); -#endif RET_P(val); } else @@ -1216,11 +1212,7 @@ stg_takeMVarzh /* No further putMVars, MVar is now empty */ StgMVar_value(mvar) = stg_END_TSO_QUEUE_closure; -#if defined(THREADED_RTS) unlockClosure(mvar, stg_MVAR_DIRTY_info); -#else - SET_INFO(mvar,stg_MVAR_DIRTY_info); -#endif RET_P(val); } @@ -1279,21 +1271,13 @@ stg_tryTakeMVarzh if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) { StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure; } -#if defined(THREADED_RTS) unlockClosure(mvar, stg_MVAR_DIRTY_info); -#else - SET_INFO(mvar,stg_MVAR_DIRTY_info); -#endif } else { /* No further putMVars, MVar is now empty */ StgMVar_value(mvar) = stg_END_TSO_QUEUE_closure; -#if defined(THREADED_RTS) unlockClosure(mvar, stg_MVAR_DIRTY_info); -#else - SET_INFO(mvar,stg_MVAR_DIRTY_info); -#endif } RET_NP(1, val); @@ -1360,11 +1344,7 @@ stg_putMVarzh StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure; } -#if defined(THREADED_RTS) unlockClosure(mvar, stg_MVAR_DIRTY_info); -#else - SET_INFO(mvar,stg_MVAR_DIRTY_info); -#endif jump %ENTRY_CODE(Sp(0)); } else @@ -1372,11 +1352,7 @@ stg_putMVarzh /* No further takes, the MVar is now full. */ StgMVar_value(mvar) = val; -#if defined(THREADED_RTS) unlockClosure(mvar, stg_MVAR_DIRTY_info); -#else - SET_INFO(mvar,stg_MVAR_DIRTY_info); -#endif jump %ENTRY_CODE(Sp(0)); } @@ -1429,22 +1405,14 @@ stg_tryPutMVarzh StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure; } -#if defined(THREADED_RTS) unlockClosure(mvar, stg_MVAR_DIRTY_info); -#else - SET_INFO(mvar,stg_MVAR_DIRTY_info); -#endif } else { /* No further takes, the MVar is now full. */ StgMVar_value(mvar) = R2; -#if defined(THREADED_RTS) unlockClosure(mvar, stg_MVAR_DIRTY_info); -#else - SET_INFO(mvar,stg_MVAR_DIRTY_info); -#endif } RET_N(1); diff --git a/rts/Printer.c b/rts/Printer.c index e9813299d8..6eecfabbfb 100644 --- a/rts/Printer.c +++ b/rts/Printer.c @@ -257,6 +257,12 @@ printClosure( StgClosure *obj ) debugBelch(")\n"); break; + case BLACKHOLE: + debugBelch("BLACKHOLE("); + printPtr((StgPtr)((StgInd*)obj)->indirectee); + debugBelch(")\n"); + break; + /* Cannot happen -- use default case. case RET_BCO: case RET_SMALL: @@ -296,14 +302,6 @@ printClosure( StgClosure *obj ) break; } - case CAF_BLACKHOLE: - debugBelch("CAF_BH"); - break; - - case BLACKHOLE: - debugBelch("BH\n"); - break; - case ARR_WORDS: { StgWord i; @@ -1122,8 +1120,8 @@ char *closure_type_names[] = { [UPDATE_FRAME] = "UPDATE_FRAME", [CATCH_FRAME] = "CATCH_FRAME", [STOP_FRAME] = "STOP_FRAME", - [CAF_BLACKHOLE] = "CAF_BLACKHOLE", [BLACKHOLE] = "BLACKHOLE", + [BLOCKING_QUEUE] = "BLOCKING_QUEUE", [MVAR_CLEAN] = "MVAR_CLEAN", [MVAR_DIRTY] = "MVAR_DIRTY", [ARR_WORDS] = "ARR_WORDS", diff --git a/rts/ProfHeap.c b/rts/ProfHeap.c index e90051c5e6..4a2816c29d 100644 --- a/rts/ProfHeap.c +++ b/rts/ProfHeap.c @@ -878,8 +878,8 @@ heapCensusChain( Census *census, bdescr *bd ) case IND_PERM: case IND_OLDGEN: case IND_OLDGEN_PERM: - case CAF_BLACKHOLE: case BLACKHOLE: + case BLOCKING_QUEUE: case FUN_1_0: case FUN_0_1: case FUN_1_1: diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c index d02a2567ff..d5a4918f34 100644 --- a/rts/RaiseAsync.c +++ b/rts/RaiseAsync.c @@ -18,6 +18,7 @@ #include "STM.h" #include "sm/Sanity.h" #include "Profiling.h" +#include "Messages.h" #if defined(mingw32_HOST_OS) #include "win32/IOManager.h" #endif @@ -66,13 +67,12 @@ void throwToSingleThreaded_(Capability *cap, StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically) { + tso = deRefTSO(tso); + // Thread already dead? if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) { return; } - while (tso->what_next == ThreadRelocated) { - tso = tso->_link; - } // Remove it from any blocking queues removeFromQueues(cap,tso); @@ -83,13 +83,12 @@ throwToSingleThreaded_(Capability *cap, StgTSO *tso, StgClosure *exception, void suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here) { + tso = deRefTSO(tso); + // Thread already dead? if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) { return; } - while (tso->what_next == ThreadRelocated) { - tso = tso->_link; - } // Remove it from any blocking queues removeFromQueues(cap,tso); @@ -164,7 +163,7 @@ throwTo (Capability *cap, // the Capability we hold msg = (MessageThrowTo *) allocate(cap, sizeofW(MessageThrowTo)); // message starts locked; the caller has to unlock it when it is // ready. - msg->header.info = &stg_WHITEHOLE_info; + SET_HDR(msg, &stg_WHITEHOLE_info, CCS_SYSTEM); msg->source = source; msg->target = target; msg->exception = exception; @@ -185,14 +184,24 @@ throwToMsg (Capability *cap, MessageThrowTo *msg) { StgWord status; StgTSO *target = msg->target; + Capability *target_cap; + goto check_target; + +retry: + write_barrier(); + debugTrace(DEBUG_sched, "throwTo: retrying..."); + +check_target: ASSERT(target != END_TSO_QUEUE); // follow ThreadRelocated links in the target first - while (target->what_next == ThreadRelocated) { - target = target->_link; - // No, it might be a WHITEHOLE: - // ASSERT(get_itbl(target)->type == TSO); + target = deRefTSO(target); + + // Thread already dead? + if (target->what_next == ThreadComplete + || target->what_next == ThreadKilled) { + return THROWTO_SUCCESS; } debugTraceCap(DEBUG_sched, cap, @@ -204,18 +213,10 @@ throwToMsg (Capability *cap, MessageThrowTo *msg) traceThreadStatus(DEBUG_sched, target); #endif - goto check_target; -retry: - write_barrier(); - debugTrace(DEBUG_sched, "throwTo: retrying..."); - -check_target: - ASSERT(target != END_TSO_QUEUE); - - // Thread already dead? - if (target->what_next == ThreadComplete - || target->what_next == ThreadKilled) { - return THROWTO_SUCCESS; + target_cap = target->cap; + if (target->cap != cap) { + throwToSendMsg(cap, target_cap, msg); + return THROWTO_BLOCKED; } status = target->why_blocked; @@ -282,28 +283,19 @@ check_target: have also seen the write to Q. */ { - Capability *target_cap; - write_barrier(); - target_cap = target->cap; - if (target_cap != cap) { - throwToSendMsg(cap, target_cap, msg); - return THROWTO_BLOCKED; + if ((target->flags & TSO_BLOCKEX) == 0) { + // It's on our run queue and not blocking exceptions + raiseAsync(cap, target, msg->exception, rtsFalse, NULL); + return THROWTO_SUCCESS; } else { - if ((target->flags & TSO_BLOCKEX) == 0) { - // It's on our run queue and not blocking exceptions - raiseAsync(cap, target, msg->exception, rtsFalse, NULL); - return THROWTO_SUCCESS; - } else { - blockedThrowTo(cap,target,msg); - return THROWTO_BLOCKED; - } + blockedThrowTo(cap,target,msg); + return THROWTO_BLOCKED; } } case BlockedOnMsgThrowTo: { - Capability *target_cap; const StgInfoTable *i; MessageThrowTo *m; @@ -340,13 +332,6 @@ check_target: goto retry; } - target_cap = target->cap; - if (target_cap != cap) { - unlockClosure((StgClosure*)m, i); - throwToSendMsg(cap, target_cap, msg); - return THROWTO_BLOCKED; - } - if ((target->flags & TSO_BLOCKEX) && ((target->flags & TSO_INTERRUPTIBLE) == 0)) { unlockClosure((StgClosure*)m, i); @@ -358,7 +343,6 @@ check_target: unlockClosure((StgClosure*)m, &stg_IND_info); raiseAsync(cap, target, msg->exception, rtsFalse, NULL); - unblockOne(cap, target); return THROWTO_SUCCESS; } @@ -400,48 +384,30 @@ check_target: if ((target->flags & TSO_BLOCKEX) && ((target->flags & TSO_INTERRUPTIBLE) == 0)) { - Capability *target_cap = target->cap; - if (target->cap != cap) { - throwToSendMsg(cap,target_cap,msg); - } else { - blockedThrowTo(cap,target,msg); - } + blockedThrowTo(cap,target,msg); unlockClosure((StgClosure *)mvar, info); return THROWTO_BLOCKED; } else { removeThreadFromMVarQueue(cap, mvar, target); raiseAsync(cap, target, msg->exception, rtsFalse, NULL); - unblockOne(cap, target); - unlockClosure((StgClosure *)mvar, info); + if (info == &stg_MVAR_CLEAN_info) { + dirty_MVAR(&cap->r,(StgClosure*)mvar); + } + unlockClosure((StgClosure *)mvar, &stg_MVAR_DIRTY_info); return THROWTO_SUCCESS; } } case BlockedOnBlackHole: { - ACQUIRE_LOCK(&sched_mutex); - // double checking the status after the memory barrier: - if (target->why_blocked != BlockedOnBlackHole) { - RELEASE_LOCK(&sched_mutex); - goto retry; - } - - if (target->flags & TSO_BLOCKEX) { - Capability *target_cap = target->cap; - if (target->cap != cap) { - throwToSendMsg(cap,target_cap,msg); - } else { - blockedThrowTo(cap,target,msg); - } - RELEASE_LOCK(&sched_mutex); - return THROWTO_BLOCKED; // caller releases lock - } else { - removeThreadFromQueue(cap, &blackhole_queue, target); - raiseAsync(cap, target, msg->exception, rtsFalse, NULL); - unblockOne(cap, target); - RELEASE_LOCK(&sched_mutex); - return THROWTO_SUCCESS; - } + // Revoke the message by replacing it with IND. We're not + // locking anything here, so we might still get a TRY_WAKEUP + // message from the owner of the blackhole some time in the + // future, but that doesn't matter. + ASSERT(target->block_info.bh->header.info == &stg_MSG_BLACKHOLE_info); + OVERWRITE_INFO(target->block_info.bh, &stg_IND_info); + raiseAsync(cap, target, msg->exception, rtsFalse, NULL); + return THROWTO_SUCCESS; } case BlockedOnSTM: @@ -454,35 +420,19 @@ check_target: } if ((target->flags & TSO_BLOCKEX) && ((target->flags & TSO_INTERRUPTIBLE) == 0)) { - Capability *target_cap = target->cap; - if (target->cap != cap) { - throwToSendMsg(cap,target_cap,msg); - } else { - blockedThrowTo(cap,target,msg); - } + blockedThrowTo(cap,target,msg); unlockTSO(target); return THROWTO_BLOCKED; } else { raiseAsync(cap, target, msg->exception, rtsFalse, NULL); - unblockOne(cap, target); unlockTSO(target); return THROWTO_SUCCESS; } case BlockedOnCCall: case BlockedOnCCall_NoUnblockExc: - { - Capability *target_cap; - - target_cap = target->cap; - if (target_cap != cap) { - throwToSendMsg(cap, target_cap, msg); - return THROWTO_BLOCKED; - } - blockedThrowTo(cap,target,msg); return THROWTO_BLOCKED; - } #ifndef THREADEDED_RTS case BlockedOnRead: @@ -515,9 +465,9 @@ throwToSendMsg (Capability *cap STG_UNUSED, { #ifdef THREADED_RTS - debugTrace(DEBUG_sched, "throwTo: sending a throwto message to cap %lu", (unsigned long)target_cap->no); + debugTraceCap(DEBUG_sched, cap, "throwTo: sending a throwto message to cap %lu", (unsigned long)target_cap->no); - sendMessage(target_cap, (Message*)msg); + sendMessage(cap, target_cap, (Message*)msg); #endif } @@ -532,7 +482,7 @@ blockedThrowTo (Capability *cap, StgTSO *target, MessageThrowTo *msg) ASSERT(target->cap == cap); - msg->link = (Message*)target->blocked_exceptions; + msg->link = target->blocked_exceptions; target->blocked_exceptions = msg; dirty_TSO(cap,target); // we modified the blocked_exceptions queue } @@ -571,7 +521,7 @@ maybePerformBlockedException (Capability *cap, StgTSO *tso) if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE && (tso->flags & TSO_BLOCKEX) != 0) { - debugTrace(DEBUG_sched, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id); + debugTraceCap(DEBUG_sched, cap, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id); } if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE @@ -664,16 +614,18 @@ removeFromQueues(Capability *cap, StgTSO *tso) case BlockedOnMVar: removeThreadFromMVarQueue(cap, (StgMVar *)tso->block_info.closure, tso); + // we aren't doing a write barrier here: the MVar is supposed to + // be already locked, so replacing the info pointer would unlock it. goto done; case BlockedOnBlackHole: - removeThreadFromQueue(cap, &blackhole_queue, tso); + // nothing to do goto done; case BlockedOnMsgWakeup: { // kill the message, atomically: - tso->block_info.wakeup->header.info = &stg_IND_info; + OVERWRITE_INFO(tso->block_info.wakeup, &stg_IND_info); break; } @@ -725,7 +677,8 @@ removeFromQueues(Capability *cap, StgTSO *tso) * asynchronous exception in an existing thread. * * We first remove the thread from any queue on which it might be - * blocked. The possible blockages are MVARs and BLACKHOLE_BQs. + * blocked. The possible blockages are MVARs, BLOCKING_QUEUESs, and + * TSO blocked_exception queues. * * We strip the stack down to the innermost CATCH_FRAME, building * thunks in the heap for all the active computations, so they can @@ -764,8 +717,8 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, StgClosure *updatee; nat i; - debugTrace(DEBUG_sched, - "raising exception in thread %ld.", (long)tso->id); + debugTraceCap(DEBUG_sched, cap, + "raising exception in thread %ld.", (long)tso->id); #if defined(PROFILING) /* @@ -784,6 +737,15 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, tso->what_next != ThreadKilled && tso->what_next != ThreadRelocated); + // only if we own this TSO (except that deleteThread() calls this + ASSERT(tso->cap == cap); + + // wake it up + if (tso->why_blocked != NotBlocked && tso->why_blocked != BlockedOnMsgWakeup) { + tso->why_blocked = NotBlocked; + appendToRunQueue(cap,tso); + } + // mark it dirty; we're about to change its stack. dirty_TSO(cap, tso); @@ -871,7 +833,8 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, // Perform the update // TODO: this may waste some work, if the thunk has // already been updated by another thread. - UPD_IND(cap, ((StgUpdateFrame *)frame)->updatee, (StgClosure *)ap); + updateThunk(cap, tso, + ((StgUpdateFrame *)frame)->updatee, (StgClosure *)ap); } sp += sizeofW(StgUpdateFrame) - 1; @@ -963,8 +926,8 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, { StgTRecHeader *trec = tso -> trec; StgTRecHeader *outer = trec -> enclosing_trec; - debugTrace(DEBUG_stm, - "found atomically block delivering async exception"); + debugTraceCap(DEBUG_stm, cap, + "found atomically block delivering async exception"); stmAbortTransaction(cap, trec); stmFreeAbortedTRec(cap, trec); tso -> trec = outer; diff --git a/rts/RetainerProfile.c b/rts/RetainerProfile.c index ba4d1465c8..b5db15a7b6 100644 --- a/rts/RetainerProfile.c +++ b/rts/RetainerProfile.c @@ -453,8 +453,6 @@ push( StgClosure *c, retainer c_child_r, StgClosure **first_child ) // no child, no SRT case CONSTR_0_1: case CONSTR_0_2: - case CAF_BLACKHOLE: - case BLACKHOLE: case ARR_WORDS: *first_child = NULL; return; @@ -470,6 +468,7 @@ push( StgClosure *c, retainer c_child_r, StgClosure **first_child ) case IND_PERM: case IND_OLDGEN_PERM: case IND_OLDGEN: + case BLACKHOLE: *first_child = ((StgInd *)c)->indirectee; return; case CONSTR_1_0: @@ -916,8 +915,6 @@ pop( StgClosure **c, StgClosure **cp, retainer *r ) // no child (fixed), no SRT case CONSTR_0_1: case CONSTR_0_2: - case CAF_BLACKHOLE: - case BLACKHOLE: case ARR_WORDS: // one child (fixed), no SRT case MUT_VAR_CLEAN: @@ -1059,13 +1056,11 @@ isRetainer( StgClosure *c ) case FUN_0_2: // partial applications case PAP: - // blackholes - case CAF_BLACKHOLE: - case BLACKHOLE: // indirection case IND_PERM: case IND_OLDGEN_PERM: case IND_OLDGEN: + case BLACKHOLE: // static objects case CONSTR_STATIC: case FUN_STATIC: @@ -377,7 +377,7 @@ static void unpark_tso(Capability *cap, StgTSO *tso) { lockTSO(tso); if (tso -> why_blocked == BlockedOnSTM) { TRACE("unpark_tso on tso=%p", tso); - unblockOne(cap,tso); + tryWakeupThread(cap,tso); } else { TRACE("spurious unpark_tso on tso=%p", tso); } diff --git a/rts/Schedule.c b/rts/Schedule.c index f3982b148b..72f6d44a8c 100644 --- a/rts/Schedule.c +++ b/rts/Schedule.c @@ -39,6 +39,7 @@ #include "Threads.h" #include "Timer.h" #include "ThreadPaused.h" +#include "Messages.h" #ifdef HAVE_SYS_TYPES_H #include <sys/types.h> @@ -66,17 +67,6 @@ StgTSO *blocked_queue_tl = NULL; StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table? #endif -/* Threads blocked on blackholes. - * LOCK: sched_mutex+capability, or all capabilities - */ -StgTSO *blackhole_queue = NULL; - -/* The blackhole_queue should be checked for threads to wake up. See - * Schedule.h for more thorough comment. - * LOCK: none (doesn't matter if we miss an update) - */ -rtsBool blackholes_need_checking = rtsFalse; - /* Set to true when the latest garbage collection failed to reclaim * enough space, and the runtime should proceed to shut itself down in * an orderly fashion (emitting profiling info etc.) @@ -140,7 +130,6 @@ static void scheduleYield (Capability **pcap, Task *task, rtsBool); static void scheduleStartSignalHandlers (Capability *cap); static void scheduleCheckBlockedThreads (Capability *cap); static void scheduleProcessInbox(Capability *cap); -static void scheduleCheckBlackHoles (Capability *cap); static void scheduleDetectDeadlock (Capability *cap, Task *task); static void schedulePushWork(Capability *cap, Task *task); #if defined(THREADED_RTS) @@ -159,8 +148,6 @@ static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc); static Capability *scheduleDoGC(Capability *cap, Task *task, rtsBool force_major); -static rtsBool checkBlackHoles(Capability *cap); - static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso); static StgTSO *threadStackUnderflow(Capability *cap, Task *task, StgTSO *tso); @@ -433,9 +420,6 @@ run_thread: startHeapProfTimer(); - // Check for exceptions blocked on this thread - maybePerformBlockedException (cap, t); - // ---------------------------------------------------------------------- // Run the current thread @@ -506,13 +490,6 @@ run_thread: // happened. So find the new location: t = cap->r.rCurrentTSO; - // We have run some Haskell code: there might be blackhole-blocked - // threads to wake up now. - // Lock-free test here should be ok, we're just setting a flag. - if ( blackhole_queue != END_TSO_QUEUE ) { - blackholes_need_checking = rtsTrue; - } - // And save the current errno in this thread. // XXX: possibly bogus for SMP because this thread might already // be running again, see code below. @@ -612,12 +589,6 @@ scheduleFindWork (Capability *cap) { scheduleStartSignalHandlers(cap); - // Only check the black holes here if we've nothing else to do. - // During normal execution, the black hole list only gets checked - // at GC time, to avoid repeatedly traversing this possibly long - // list each time around the scheduler. - if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); } - scheduleProcessInbox(cap); scheduleCheckBlockedThreads(cap); @@ -674,7 +645,6 @@ scheduleYield (Capability **pcap, Task *task, rtsBool force_yield) !shouldYieldCapability(cap,task) && (!emptyRunQueue(cap) || !emptyInbox(cap) || - blackholes_need_checking || sched_state >= SCHED_INTERRUPTING)) return; @@ -863,125 +833,11 @@ scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS) // if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) ) { - awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking ); + awaitEvent (emptyRunQueue(cap)); } #endif } - -/* ---------------------------------------------------------------------------- - * Check for threads woken up by other Capabilities - * ------------------------------------------------------------------------- */ - -#if defined(THREADED_RTS) -static void -executeMessage (Capability *cap, Message *m) -{ - const StgInfoTable *i; - -loop: - write_barrier(); // allow m->header to be modified by another thread - i = m->header.info; - if (i == &stg_MSG_WAKEUP_info) - { - MessageWakeup *w = (MessageWakeup *)m; - StgTSO *tso = w->tso; - debugTraceCap(DEBUG_sched, cap, "message: wakeup thread %ld", - (lnat)tso->id); - ASSERT(tso->cap == cap); - ASSERT(tso->why_blocked == BlockedOnMsgWakeup); - ASSERT(tso->block_info.closure == (StgClosure *)m); - tso->why_blocked = NotBlocked; - appendToRunQueue(cap, tso); - } - else if (i == &stg_MSG_THROWTO_info) - { - MessageThrowTo *t = (MessageThrowTo *)m; - nat r; - const StgInfoTable *i; - - i = lockClosure((StgClosure*)m); - if (i != &stg_MSG_THROWTO_info) { - unlockClosure((StgClosure*)m, i); - goto loop; - } - - debugTraceCap(DEBUG_sched, cap, "message: throwTo %ld -> %ld", - (lnat)t->source->id, (lnat)t->target->id); - - ASSERT(t->source->why_blocked == BlockedOnMsgThrowTo); - ASSERT(t->source->block_info.closure == (StgClosure *)m); - - r = throwToMsg(cap, t); - - switch (r) { - case THROWTO_SUCCESS: - ASSERT(t->source->sp[0] == (StgWord)&stg_block_throwto_info); - t->source->sp += 3; - unblockOne(cap, t->source); - // this message is done - unlockClosure((StgClosure*)m, &stg_IND_info); - break; - case THROWTO_BLOCKED: - // unlock the message - unlockClosure((StgClosure*)m, &stg_MSG_THROWTO_info); - break; - } - } - else if (i == &stg_IND_info) - { - // message was revoked - return; - } - else if (i == &stg_WHITEHOLE_info) - { - goto loop; - } - else - { - barf("executeMessage: %p", i); - } -} -#endif - -static void -scheduleProcessInbox (Capability *cap USED_IF_THREADS) -{ -#if defined(THREADED_RTS) - Message *m; - - while (!emptyInbox(cap)) { - ACQUIRE_LOCK(&cap->lock); - m = cap->inbox; - cap->inbox = m->link; - RELEASE_LOCK(&cap->lock); - executeMessage(cap, (Message *)m); - } -#endif -} - -/* ---------------------------------------------------------------------------- - * Check for threads blocked on BLACKHOLEs that can be woken up - * ------------------------------------------------------------------------- */ -static void -scheduleCheckBlackHoles (Capability *cap) -{ - if ( blackholes_need_checking ) // check without the lock first - { - ACQUIRE_LOCK(&sched_mutex); - if ( blackholes_need_checking ) { - blackholes_need_checking = rtsFalse; - // important that we reset the flag *before* checking the - // blackhole queue, otherwise we could get deadlock. This - // happens as follows: we wake up a thread that - // immediately runs on another Capability, blocks on a - // blackhole, and then we reset the blackholes_need_checking flag. - checkBlackHoles(cap); - } - RELEASE_LOCK(&sched_mutex); - } -} - /* ---------------------------------------------------------------------------- * Detect deadlock conditions and attempt to resolve them. * ------------------------------------------------------------------------- */ @@ -1090,6 +946,26 @@ scheduleSendPendingMessages(void) #endif /* ---------------------------------------------------------------------------- + * Process message in the current Capability's inbox + * ------------------------------------------------------------------------- */ + +static void +scheduleProcessInbox (Capability *cap USED_IF_THREADS) +{ +#if defined(THREADED_RTS) + Message *m; + + while (!emptyInbox(cap)) { + ACQUIRE_LOCK(&cap->lock); + m = cap->inbox; + cap->inbox = m->link; + RELEASE_LOCK(&cap->lock); + executeMessage(cap, (Message *)m); + } +#endif +} + +/* ---------------------------------------------------------------------------- * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS) * ------------------------------------------------------------------------- */ @@ -1499,9 +1375,6 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major) debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads"); } - // do this while the other Capabilities stop: - if (cap) scheduleCheckBlackHoles(cap); - if (gc_type == PENDING_GC_SEQ) { // single-threaded GC: grab all the capabilities @@ -1529,11 +1402,6 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major) waitForGcThreads(cap); } -#else /* !THREADED_RTS */ - - // do this while the other Capabilities stop: - if (cap) scheduleCheckBlackHoles(cap); - #endif IF_DEBUG(scheduler, printAllThreads()); @@ -2093,8 +1961,6 @@ initScheduler(void) sleeping_queue = END_TSO_QUEUE; #endif - blackhole_queue = END_TSO_QUEUE; - sched_state = SCHED_RUNNING; recent_activity = ACTIVITY_YES; @@ -2347,8 +2213,9 @@ threadStackOverflow(Capability *cap, StgTSO *tso) * of the stack, so we don't attempt to scavenge any part of the * dead TSO's stack. */ - tso->what_next = ThreadRelocated; setTSOLink(cap,tso,dest); + write_barrier(); // other threads seeing ThreadRelocated will look at _link + tso->what_next = ThreadRelocated; tso->sp = (P_)&(tso->stack[tso->stack_size]); tso->why_blocked = NotBlocked; @@ -2405,8 +2272,9 @@ threadStackUnderflow (Capability *cap, Task *task, StgTSO *tso) debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu", (long)tso->id, tso_size_w, tso_sizeW(new_tso)); - tso->what_next = ThreadRelocated; tso->_link = new_tso; // no write barrier reqd: same generation + write_barrier(); // other threads seeing ThreadRelocated will look at _link + tso->what_next = ThreadRelocated; // The TSO attached to this Task may have moved, so update the // pointer to it. @@ -2458,57 +2326,6 @@ void wakeUpRts(void) #endif /* ----------------------------------------------------------------------------- - * checkBlackHoles() - * - * Check the blackhole_queue for threads that can be woken up. We do - * this periodically: before every GC, and whenever the run queue is - * empty. - * - * An elegant solution might be to just wake up all the blocked - * threads with awakenBlockedQueue occasionally: they'll go back to - * sleep again if the object is still a BLACKHOLE. Unfortunately this - * doesn't give us a way to tell whether we've actually managed to - * wake up any threads, so we would be busy-waiting. - * - * -------------------------------------------------------------------------- */ - -static rtsBool -checkBlackHoles (Capability *cap) -{ - StgTSO **prev, *t; - rtsBool any_woke_up = rtsFalse; - StgHalfWord type; - - // blackhole_queue is global: - ASSERT_LOCK_HELD(&sched_mutex); - - debugTrace(DEBUG_sched, "checking threads blocked on black holes"); - - // ASSUMES: sched_mutex - prev = &blackhole_queue; - t = blackhole_queue; - while (t != END_TSO_QUEUE) { - if (t->what_next == ThreadRelocated) { - t = t->_link; - continue; - } - ASSERT(t->why_blocked == BlockedOnBlackHole); - type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type; - if (type != BLACKHOLE && type != CAF_BLACKHOLE) { - IF_DEBUG(sanity,checkTSO(t)); - t = unblockOne(cap, t); - *prev = t; - any_woke_up = rtsTrue; - } else { - prev = &t->_link; - t = t->_link; - } - } - - return any_woke_up; -} - -/* ----------------------------------------------------------------------------- Deleting threads This is used for interruption (^C) and forking, and corresponds to @@ -2517,7 +2334,7 @@ checkBlackHoles (Capability *cap) -------------------------------------------------------------------------- */ static void -deleteThread (Capability *cap, StgTSO *tso) +deleteThread (Capability *cap STG_UNUSED, StgTSO *tso) { // NOTE: must only be called on a TSO that we have exclusive // access to, because we will call throwToSingleThreaded() below. @@ -2526,7 +2343,7 @@ deleteThread (Capability *cap, StgTSO *tso) if (tso->why_blocked != BlockedOnCCall && tso->why_blocked != BlockedOnCCall_NoUnblockExc) { - throwToSingleThreaded(cap,tso,NULL); + throwToSingleThreaded(tso->cap,tso,NULL); } } @@ -2599,8 +2416,8 @@ raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception) SET_HDR(raise_closure, &stg_raise_info, CCCS); raise_closure->payload[0] = exception; } - UPD_IND(cap, ((StgUpdateFrame *)p)->updatee, - (StgClosure *)raise_closure); + updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee, + (StgClosure *)raise_closure); p = next; continue; diff --git a/rts/Schedule.h b/rts/Schedule.h index 2412285d29..0db2b1ed84 100644 --- a/rts/Schedule.h +++ b/rts/Schedule.h @@ -86,15 +86,6 @@ extern StgTSO *blocked_queue_hd, *blocked_queue_tl; extern StgTSO *sleeping_queue; #endif -/* Set to rtsTrue if there are threads on the blackhole_queue, and - * it is possible that one or more of them may be available to run. - * This flag is set to rtsFalse after we've checked the queue, and - * set to rtsTrue just before we run some Haskell code. It is used - * to decide whether we should yield the Capability or not. - * Locks required : none (see scheduleCheckBlackHoles()). - */ -extern rtsBool blackholes_need_checking; - extern rtsBool heap_overflow; #if defined(THREADED_RTS) diff --git a/rts/StgMiscClosures.cmm b/rts/StgMiscClosures.cmm index f111875760..830bde5665 100644 --- a/rts/StgMiscClosures.cmm +++ b/rts/StgMiscClosures.cmm @@ -283,96 +283,105 @@ INFO_TABLE(stg_IND_OLDGEN_PERM,1,0,IND_OLDGEN_PERM,"IND_OLDGEN_PERM","IND_OLDGEN waiting for the evaluation of the closure to finish. ------------------------------------------------------------------------- */ -/* Note: a BLACKHOLE must be big enough to be - * overwritten with an indirection/evacuee/catch. Thus we claim it - * has 1 non-pointer word of payload. - */ -INFO_TABLE(stg_BLACKHOLE,0,1,BLACKHOLE,"BLACKHOLE","BLACKHOLE") +INFO_TABLE(stg_BLACKHOLE,1,0,BLACKHOLE,"BLACKHOLE","BLACKHOLE") { - TICK_ENT_BH(); - -#ifdef THREADED_RTS - // foreign "C" debugBelch("BLACKHOLE entry\n"); -#endif - - /* Actually this is not necessary because R1 is about to be destroyed. */ - LDV_ENTER(R1); + W_ r, p, info, bq, msg, owner, bd; -#if defined(THREADED_RTS) - ACQUIRE_LOCK(sched_mutex "ptr"); - // released in stg_block_blackhole_finally -#endif - - /* Put ourselves on the blackhole queue */ - StgTSO__link(CurrentTSO) = W_[blackhole_queue]; - W_[blackhole_queue] = CurrentTSO; - - /* jot down why and on what closure we are blocked */ - StgTSO_why_blocked(CurrentTSO) = BlockedOnBlackHole::I16; - StgTSO_block_info(CurrentTSO) = R1; + TICK_ENT_DYN_IND(); /* tick */ - jump stg_block_blackhole; +retry: + p = StgInd_indirectee(R1); + if (GETTAG(p) != 0) { + R1 = p; + jump %ENTRY_CODE(Sp(0)); + } + + info = StgHeader_info(p); + if (info == stg_IND_info) { + // This could happen, if e.g. we got a BLOCKING_QUEUE that has + // just been replaced with an IND by another thread in + // wakeBlockingQueue(). + goto retry; + } + + if (info == stg_TSO_info || + info == stg_BLOCKING_QUEUE_CLEAN_info || + info == stg_BLOCKING_QUEUE_DIRTY_info) + { + ("ptr" msg) = foreign "C" allocate(MyCapability() "ptr", + BYTES_TO_WDS(SIZEOF_MessageBlackHole)) [R1]; + + StgHeader_info(msg) = stg_MSG_BLACKHOLE_info; + MessageBlackHole_tso(msg) = CurrentTSO; + MessageBlackHole_bh(msg) = R1; + + (r) = foreign "C" messageBlackHole(MyCapability() "ptr", msg "ptr") [R1]; + + if (r == 0) { + goto retry; + } else { + StgTSO_why_blocked(CurrentTSO) = BlockedOnBlackHole::I16; + StgTSO_block_info(CurrentTSO) = msg; + jump stg_block_blackhole; + } + } + else + { + R1 = p; + ENTER(); + } } -/* identical to BLACKHOLEs except for the infotag */ -INFO_TABLE(stg_CAF_BLACKHOLE,0,1,CAF_BLACKHOLE,"CAF_BLACKHOLE","CAF_BLACKHOLE") +INFO_TABLE(__stg_EAGER_BLACKHOLE,1,0,BLACKHOLE,"BLACKHOLE","BLACKHOLE") { - TICK_ENT_BH(); - LDV_ENTER(R1); - -#if defined(THREADED_RTS) - // foreign "C" debugBelch("BLACKHOLE entry\n"); -#endif - -#if defined(THREADED_RTS) - ACQUIRE_LOCK(sched_mutex "ptr"); - // released in stg_block_blackhole_finally -#endif - - /* Put ourselves on the blackhole queue */ - StgTSO__link(CurrentTSO) = W_[blackhole_queue]; - W_[blackhole_queue] = CurrentTSO; - - /* jot down why and on what closure we are blocked */ - StgTSO_why_blocked(CurrentTSO) = BlockedOnBlackHole::I16; - StgTSO_block_info(CurrentTSO) = R1; - - jump stg_block_blackhole; + jump ENTRY_LBL(stg_BLACKHOLE); } -INFO_TABLE(__stg_EAGER_BLACKHOLE,0,1,BLACKHOLE,"EAGER_BLACKHOLE","EAGER_BLACKHOLE") +// CAF_BLACKHOLE is allocated when entering a CAF. The reason it is +// distinct from BLACKHOLE is so that we can tell the difference +// between an update frame on the stack that points to a CAF under +// evaluation, and one that points to a closure that is under +// evaluation by another thread (a BLACKHOLE). See threadPaused(). +// +INFO_TABLE(stg_CAF_BLACKHOLE,1,0,BLACKHOLE,"BLACKHOLE","BLACKHOLE") { - TICK_ENT_BH(); - -#ifdef THREADED_RTS - // foreign "C" debugBelch("BLACKHOLE entry\n"); -#endif - - /* Actually this is not necessary because R1 is about to be destroyed. */ - LDV_ENTER(R1); - -#if defined(THREADED_RTS) - ACQUIRE_LOCK(sched_mutex "ptr"); - // released in stg_block_blackhole_finally -#endif - - /* Put ourselves on the blackhole queue */ - StgTSO__link(CurrentTSO) = W_[blackhole_queue]; - W_[blackhole_queue] = CurrentTSO; + jump ENTRY_LBL(stg_BLACKHOLE); +} - /* jot down why and on what closure we are blocked */ - StgTSO_why_blocked(CurrentTSO) = BlockedOnBlackHole::I16; - StgTSO_block_info(CurrentTSO) = R1; +INFO_TABLE(stg_BLOCKING_QUEUE_CLEAN,4,0,BLOCKING_QUEUE,"BLOCKING_QUEUE","BLOCKING_QUEUE") +{ foreign "C" barf("BLOCKING_QUEUE_CLEAN object entered!") never returns; } + - jump stg_block_blackhole; -} +INFO_TABLE(stg_BLOCKING_QUEUE_DIRTY,4,0,BLOCKING_QUEUE,"BLOCKING_QUEUE","BLOCKING_QUEUE") +{ foreign "C" barf("BLOCKING_QUEUE_DIRTY object entered!") never returns; } + /* ---------------------------------------------------------------------------- Whiteholes are used for the "locked" state of a closure (see lockClosure()) ------------------------------------------------------------------------- */ INFO_TABLE(stg_WHITEHOLE, 0,0, WHITEHOLE, "WHITEHOLE", "WHITEHOLE") -{ foreign "C" barf("WHITEHOLE object entered!") never returns; } +{ +#if defined(THREADED_RTS) + W_ info, i; + + i = 0; +loop: + // spin until the WHITEHOLE is updated + info = StgHeader_info(R1); + if (info == stg_WHITEHOLE_info) { + i = i + 1; + if (i == SPIN_COUNT) { + i = 0; + foreign "C" yieldThread() [R1]; + } + goto loop; + } + jump %ENTRY_CODE(info); +#else + foreign "C" barf("WHITEHOLE object entered!") never returns; +#endif +} /* ---------------------------------------------------------------------------- Some static info tables for things that don't get entered, and @@ -485,9 +494,15 @@ CLOSURE(stg_NO_TREC_closure,stg_NO_TREC); INFO_TABLE_CONSTR(stg_MSG_WAKEUP,2,0,0,PRIM,"MSG_WAKEUP","MSG_WAKEUP") { foreign "C" barf("MSG_WAKEUP object entered!") never returns; } +INFO_TABLE_CONSTR(stg_MSG_TRY_WAKEUP,2,0,0,PRIM,"MSG_TRY_WAKEUP","MSG_TRY_WAKEUP") +{ foreign "C" barf("MSG_TRY_WAKEUP object entered!") never returns; } + INFO_TABLE_CONSTR(stg_MSG_THROWTO,4,0,0,PRIM,"MSG_THROWTO","MSG_THROWTO") { foreign "C" barf("MSG_THROWTO object entered!") never returns; } +INFO_TABLE_CONSTR(stg_MSG_BLACKHOLE,3,0,0,PRIM,"MSG_BLACKHOLE","MSG_BLACKHOLE") +{ foreign "C" barf("MSG_BLACKHOLE object entered!") never returns; } + /* ---------------------------------------------------------------------------- END_TSO_QUEUE diff --git a/rts/ThreadPaused.c b/rts/ThreadPaused.c index 75712b04d6..7aee59dcd5 100644 --- a/rts/ThreadPaused.c +++ b/rts/ThreadPaused.c @@ -14,6 +14,7 @@ #include "Updates.h" #include "RaiseAsync.h" #include "Trace.h" +#include "Threads.h" #include <string.h> // for memmove() @@ -75,7 +76,7 @@ stackSqueeze(Capability *cap, StgTSO *tso, StgPtr bottom) * screw us up if we don't check. */ if (upd->updatee != updatee && !closure_IND(upd->updatee)) { - UPD_IND(cap, upd->updatee, updatee); + updateThunk(cap, tso, upd->updatee, updatee); } // now mark this update frame as a stack gap. The gap @@ -196,7 +197,7 @@ threadPaused(Capability *cap, StgTSO *tso) maybePerformBlockedException (cap, tso); if (tso->what_next == ThreadKilled) { return; } - // NB. Blackholing is *not* optional, we must either do lazy + // NB. Blackholing is *compulsory*, we must either do lazy // blackholing, or eager blackholing consistently. See Note // [upd-black-hole] in sm/Scav.c. @@ -229,8 +230,9 @@ threadPaused(Capability *cap, StgTSO *tso) #ifdef THREADED_RTS retry: #endif - if (closure_flags[INFO_PTR_TO_STRUCT(bh_info)->type] & _IND - || bh_info == &stg_BLACKHOLE_info) { + if (bh_info == &stg_BLACKHOLE_info || + bh_info == &stg_WHITEHOLE_info) + { debugTrace(DEBUG_squeeze, "suspending duplicate work: %ld words of stack", (long)((StgPtr)frame - tso->sp)); @@ -245,6 +247,7 @@ threadPaused(Capability *cap, StgTSO *tso) // the value to the frame underneath: tso->sp = (StgPtr)frame + sizeofW(StgUpdateFrame) - 2; tso->sp[1] = (StgWord)bh; + ASSERT(bh->header.info != &stg_TSO_info); tso->sp[0] = (W_)&stg_enter_info; // And continue with threadPaused; there might be @@ -254,33 +257,40 @@ threadPaused(Capability *cap, StgTSO *tso) continue; } - if (bh->header.info != &stg_CAF_BLACKHOLE_info) { - // zero out the slop so that the sanity checker can tell - // where the next closure is. - DEBUG_FILL_SLOP(bh); -#ifdef PROFILING - // @LDV profiling - // We pretend that bh is now dead. - LDV_recordDead_FILL_SLOP_DYNAMIC((StgClosure *)bh); -#endif - // an EAGER_BLACKHOLE gets turned into a BLACKHOLE here. + // zero out the slop so that the sanity checker can tell + // where the next closure is. + DEBUG_FILL_SLOP(bh); + + // @LDV profiling + // We pretend that bh is now dead. + LDV_RECORD_DEAD_FILL_SLOP_DYNAMIC((StgClosure *)bh); + + // an EAGER_BLACKHOLE or CAF_BLACKHOLE gets turned into a + // BLACKHOLE here. #ifdef THREADED_RTS - cur_bh_info = (const StgInfoTable *) - cas((StgVolatilePtr)&bh->header.info, - (StgWord)bh_info, - (StgWord)&stg_BLACKHOLE_info); - - if (cur_bh_info != bh_info) { - bh_info = cur_bh_info; - goto retry; - } -#else - SET_INFO(bh,&stg_BLACKHOLE_info); + // first we turn it into a WHITEHOLE to claim it, and if + // successful we write our TSO and then the BLACKHOLE info pointer. + cur_bh_info = (const StgInfoTable *) + cas((StgVolatilePtr)&bh->header.info, + (StgWord)bh_info, + (StgWord)&stg_WHITEHOLE_info); + + if (cur_bh_info != bh_info) { + bh_info = cur_bh_info; + goto retry; + } #endif - // We pretend that bh has just been created. - LDV_RECORD_CREATE(bh); - } + // The payload of the BLACKHOLE points to the TSO + ((StgInd *)bh)->indirectee = (StgClosure *)tso; + write_barrier(); + SET_INFO(bh,&stg_BLACKHOLE_info); + + // .. and we need a write barrier, since we just mutated the closure: + recordClosureMutated(cap,bh); + + // We pretend that bh has just been created. + LDV_RECORD_CREATE(bh); frame = (StgClosure *) ((StgUpdateFrame *)frame + 1); if (prev_was_update_frame) { diff --git a/rts/Threads.c b/rts/Threads.c index f824d021d4..0c3e591665 100644 --- a/rts/Threads.c +++ b/rts/Threads.c @@ -9,11 +9,16 @@ #include "PosixSource.h" #include "Rts.h" +#include "Capability.h" +#include "Updates.h" #include "Threads.h" #include "STM.h" #include "Schedule.h" #include "Trace.h" #include "ThreadLabels.h" +#include "Updates.h" +#include "Messages.h" +#include "sm/Storage.h" /* Next thread ID to allocate. * LOCK: sched_mutex @@ -74,7 +79,9 @@ createThread(Capability *cap, nat size) tso->what_next = ThreadRunGHC; tso->why_blocked = NotBlocked; + tso->block_info.closure = (StgClosure *)END_TSO_QUEUE; tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE; + tso->bq = (StgBlockingQueue *)END_TSO_QUEUE; tso->flags = 0; tso->dirty = 1; @@ -146,7 +153,7 @@ rts_getThreadId(StgPtr tso) Fails fatally if the TSO is not on the queue. -------------------------------------------------------------------------- */ -void +rtsBool // returns True if we modified queue removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso) { StgTSO *t, *prev; @@ -156,28 +163,32 @@ removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso) if (t == tso) { if (prev) { setTSOLink(cap,prev,t->_link); + return rtsFalse; } else { *queue = t->_link; + return rtsTrue; } - return; } } barf("removeThreadFromQueue: not found"); } -void +rtsBool // returns True if we modified head or tail removeThreadFromDeQueue (Capability *cap, StgTSO **head, StgTSO **tail, StgTSO *tso) { StgTSO *t, *prev; + rtsBool flag = rtsFalse; prev = NULL; for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) { if (t == tso) { if (prev) { setTSOLink(cap,prev,t->_link); + flag = rtsFalse; } else { *head = t->_link; + flag = rtsTrue; } if (*tail == tso) { if (prev) { @@ -185,8 +196,10 @@ removeThreadFromDeQueue (Capability *cap, } else { *tail = END_TSO_QUEUE; } - } - return; + return rtsTrue; + } else { + return flag; + } } } barf("removeThreadFromMVarQueue: not found"); @@ -195,7 +208,10 @@ removeThreadFromDeQueue (Capability *cap, void removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso) { + // caller must do the write barrier, because replacing the info + // pointer will unlock the MVar. removeThreadFromDeQueue (cap, &mvar->head, &mvar->tail, tso); + tso->_link = END_TSO_QUEUE; } /* ---------------------------------------------------------------------------- @@ -263,6 +279,38 @@ unblockOne_ (Capability *cap, StgTSO *tso, return next; } +void +tryWakeupThread (Capability *cap, StgTSO *tso) +{ +#ifdef THREADED_RTS + if (tso->cap != cap) + { + MessageWakeup *msg; + msg = (MessageWakeup *)allocate(cap,sizeofW(MessageWakeup)); + SET_HDR(msg, &stg_MSG_TRY_WAKEUP_info, CCS_SYSTEM); + msg->tso = tso; + sendMessage(cap, tso->cap, (Message*)msg); + return; + } +#endif + + switch (tso->why_blocked) + { + case BlockedOnBlackHole: + case BlockedOnSTM: + { + // just run the thread now, if the BH is not really available, + // we'll block again. + tso->why_blocked = NotBlocked; + appendToRunQueue(cap,tso); + break; + } + default: + // otherwise, do nothing + break; + } +} + /* ---------------------------------------------------------------------------- awakenBlockedQueue @@ -270,13 +318,160 @@ unblockOne_ (Capability *cap, StgTSO *tso, ------------------------------------------------------------------------- */ void -awakenBlockedQueue(Capability *cap, StgTSO *tso) +wakeBlockingQueue(Capability *cap, StgBlockingQueue *bq) { - while (tso != END_TSO_QUEUE) { - tso = unblockOne(cap,tso); + MessageBlackHole *msg; + const StgInfoTable *i; + + ASSERT(bq->header.info == &stg_BLOCKING_QUEUE_DIRTY_info || + bq->header.info == &stg_BLOCKING_QUEUE_CLEAN_info ); + + for (msg = bq->queue; msg != (MessageBlackHole*)END_TSO_QUEUE; + msg = msg->link) { + i = msg->header.info; + if (i != &stg_IND_info) { + ASSERT(i == &stg_MSG_BLACKHOLE_info); + tryWakeupThread(cap,msg->tso); + } + } + + // overwrite the BQ with an indirection so it will be + // collected at the next GC. +#if defined(DEBUG) && !defined(THREADED_RTS) + // XXX FILL_SLOP, but not if THREADED_RTS because in that case + // another thread might be looking at this BLOCKING_QUEUE and + // checking the owner field at the same time. + bq->bh = 0; bq->queue = 0; bq->owner = 0; +#endif + OVERWRITE_INFO(bq, &stg_IND_info); +} + +// If we update a closure that we know we BLACKHOLE'd, and the closure +// no longer points to the current TSO as its owner, then there may be +// an orphaned BLOCKING_QUEUE closure with blocked threads attached to +// it. We therefore traverse the BLOCKING_QUEUEs attached to the +// current TSO to see if any can now be woken up. +void +checkBlockingQueues (Capability *cap, StgTSO *tso) +{ + StgBlockingQueue *bq, *next; + StgClosure *p; + + debugTraceCap(DEBUG_sched, cap, + "collision occurred; checking blocking queues for thread %ld", + (lnat)tso->id); + + for (bq = tso->bq; bq != (StgBlockingQueue*)END_TSO_QUEUE; bq = next) { + next = bq->link; + + if (bq->header.info == &stg_IND_info) { + // ToDo: could short it out right here, to avoid + // traversing this IND multiple times. + continue; + } + + p = bq->bh; + + if (p->header.info != &stg_BLACKHOLE_info || + ((StgInd *)p)->indirectee != (StgClosure*)bq) + { + wakeBlockingQueue(cap,bq); + } } } +/* ---------------------------------------------------------------------------- + updateThunk + + Update a thunk with a value. In order to do this, we need to know + which TSO owns (or is evaluating) the thunk, in case we need to + awaken any threads that are blocked on it. + ------------------------------------------------------------------------- */ + +void +updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val) +{ + StgClosure *v; + StgTSO *owner; + const StgInfoTable *i; + + i = thunk->header.info; + if (i != &stg_BLACKHOLE_info && + i != &stg_CAF_BLACKHOLE_info && + i != &stg_WHITEHOLE_info) { + updateWithIndirection(cap, thunk, val); + return; + } + + v = ((StgInd*)thunk)->indirectee; + + updateWithIndirection(cap, thunk, val); + + i = v->header.info; + if (i == &stg_TSO_info) { + owner = deRefTSO((StgTSO*)v); + if (owner != tso) { + checkBlockingQueues(cap, tso); + } + return; + } + + if (i != &stg_BLOCKING_QUEUE_CLEAN_info && + i != &stg_BLOCKING_QUEUE_DIRTY_info) { + checkBlockingQueues(cap, tso); + return; + } + + owner = deRefTSO(((StgBlockingQueue*)v)->owner); + + if (owner != tso) { + checkBlockingQueues(cap, tso); + } else { + wakeBlockingQueue(cap, (StgBlockingQueue*)v); + } +} + +/* ---------------------------------------------------------------------------- + * Wake up a thread on a Capability. + * + * This is used when the current Task is running on a Capability and + * wishes to wake up a thread on a different Capability. + * ------------------------------------------------------------------------- */ + +#ifdef THREADED_RTS + +void +wakeupThreadOnCapability (Capability *cap, + Capability *other_cap, + StgTSO *tso) +{ + MessageWakeup *msg; + + // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability) + if (tso->bound) { + ASSERT(tso->bound->task->cap == tso->cap); + tso->bound->task->cap = other_cap; + } + tso->cap = other_cap; + + ASSERT(tso->why_blocked != BlockedOnMsgWakeup || + tso->block_info.closure->header.info == &stg_IND_info); + + ASSERT(tso->block_info.closure->header.info != &stg_MSG_WAKEUP_info); + + msg = (MessageWakeup*) allocate(cap, sizeofW(MessageWakeup)); + SET_HDR(msg, &stg_MSG_WAKEUP_info, CCS_SYSTEM); + msg->tso = tso; + tso->block_info.closure = (StgClosure *)msg; + dirty_TSO(cap, tso); + write_barrier(); + tso->why_blocked = BlockedOnMsgWakeup; + + sendMessage(cap, other_cap, (Message*)msg); +} + +#endif /* THREADED_RTS */ + /* --------------------------------------------------------------------------- * rtsSupportsBoundThreads(): is the RTS built to support bound threads? * used by Control.Concurrent for error checking. @@ -332,7 +527,8 @@ printThreadBlockage(StgTSO *tso) debugBelch("is blocked on an MVar @ %p", tso->block_info.closure); break; case BlockedOnBlackHole: - debugBelch("is blocked on a black hole"); + debugBelch("is blocked on a black hole %p", + ((StgBlockingQueue*)tso->block_info.bh->bh)); break; case BlockedOnMsgWakeup: debugBelch("is blocked on a wakeup message"); diff --git a/rts/Threads.h b/rts/Threads.h index dfe879e7bb..000cf1b8e2 100644 --- a/rts/Threads.h +++ b/rts/Threads.h @@ -16,11 +16,26 @@ BEGIN_RTS_PRIVATE StgTSO * unblockOne (Capability *cap, StgTSO *tso); StgTSO * unblockOne_ (Capability *cap, StgTSO *tso, rtsBool allow_migrate); -void awakenBlockedQueue (Capability *cap, StgTSO *tso); +void checkBlockingQueues (Capability *cap, StgTSO *tso); +void wakeBlockingQueue (Capability *cap, StgBlockingQueue *bq); +void tryWakeupThread (Capability *cap, StgTSO *tso); + +// Wakes up a thread on a Capability (probably a different Capability +// from the one held by the current Task). +// +#ifdef THREADED_RTS +void wakeupThreadOnCapability (Capability *cap, + Capability *other_cap, + StgTSO *tso); +#endif + +void updateThunk (Capability *cap, StgTSO *tso, + StgClosure *thunk, StgClosure *val); void removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso); -void removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso); -void removeThreadFromDeQueue (Capability *cap, StgTSO **head, StgTSO **tail, StgTSO *tso); + +rtsBool removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso); +rtsBool removeThreadFromDeQueue (Capability *cap, StgTSO **head, StgTSO **tail, StgTSO *tso); StgBool isThreadBound (StgTSO* tso); diff --git a/rts/Timer.c b/rts/Timer.c index a5d42fbc9d..dddc75414d 100644 --- a/rts/Timer.c +++ b/rts/Timer.c @@ -76,8 +76,6 @@ handle_tick(int unused STG_UNUSED) ticks_to_gc = RtsFlags.GcFlags.idleGCDelayTime / RtsFlags.MiscFlags.tickInterval; recent_activity = ACTIVITY_INACTIVE; - blackholes_need_checking = rtsTrue; - /* hack: re-use the blackholes_need_checking flag */ wakeUpRts(); } } diff --git a/rts/Updates.cmm b/rts/Updates.cmm index e0fd7c30d5..7af59657c1 100644 --- a/rts/Updates.cmm +++ b/rts/Updates.cmm @@ -16,10 +16,11 @@ #include "Updates.h" -/* on entry to the update code - (1) R1 points to the closure being returned - (2) Sp points to the update frame -*/ +#if defined(PROFILING) +#define UPD_FRAME_PARAMS W_ unused1, W_ unused2, P_ unused3 +#else +#define UPD_FRAME_PARAMS P_ unused1 +#endif /* The update fragment has been tuned so as to generate good code with gcc, which accounts for some of the strangeness in the @@ -30,38 +31,69 @@ code), since we don't mind duplicating this jump. */ -#define UPD_FRAME_ENTRY_TEMPLATE \ - { \ - W_ updatee; \ - \ - updatee = StgUpdateFrame_updatee(Sp); \ - \ - /* remove the update frame from the stack */ \ - Sp = Sp + SIZEOF_StgUpdateFrame; \ - \ - /* ToDo: it might be a PAP, so we should check... */ \ - TICK_UPD_CON_IN_NEW(sizeW_fromITBL(%GET_STD_INFO(updatee))); \ - \ - updateWithIndirection(stg_IND_direct_info, \ - updatee, \ - R1, \ - jump %ENTRY_CODE(Sp(0))); \ - } - -#if defined(PROFILING) -#define UPD_FRAME_PARAMS W_ unused1, W_ unused2, P_ unused3 -#else -#define UPD_FRAME_PARAMS P_ unused1 -#endif - -/* this bitmap indicates that the first word of an update frame is a - * non-pointer - this is the update frame link. (for profiling, - * there's a cost-centre-stack in there too). - */ +/* on entry to the update code + (1) R1 points to the closure being returned + (2) Sp points to the update frame +*/ INFO_TABLE_RET( stg_upd_frame, UPDATE_FRAME, UPD_FRAME_PARAMS) -UPD_FRAME_ENTRY_TEMPLATE +{ + W_ updatee; + + updatee = StgUpdateFrame_updatee(Sp); + + /* remove the update frame from the stack */ + Sp = Sp + SIZEOF_StgUpdateFrame; + + /* ToDo: it might be a PAP, so we should check... */ + TICK_UPD_CON_IN_NEW(sizeW_fromITBL(%GET_STD_INFO(updatee))); + + updateWithIndirection(updatee, + R1, + jump %ENTRY_CODE(Sp(0))); +} INFO_TABLE_RET( stg_marked_upd_frame, UPDATE_FRAME, UPD_FRAME_PARAMS) -UPD_FRAME_ENTRY_TEMPLATE +{ + W_ updatee, v, i, tso, link; + + // we know the closure is a BLACKHOLE + updatee = StgUpdateFrame_updatee(Sp); + v = StgInd_indirectee(updatee); + + // remove the update frame from the stack + Sp = Sp + SIZEOF_StgUpdateFrame; + + if (GETTAG(v) != 0) { + // updated by someone else: discard our value and use the + // other one to increase sharing, but check the blocking + // queues to see if any threads were waiting on this BLACKHOLE. + R1 = v; + foreign "C" checkBlockingQueues(MyCapability() "ptr", + CurrentTSO "ptr") [R1]; + jump %ENTRY_CODE(Sp(0)); + } + + // common case: it is still our BLACKHOLE + if (v == CurrentTSO) { + updateWithIndirection(updatee, + R1, + jump %ENTRY_CODE(Sp(0))); + } + + // The other cases are all handled by the generic code + foreign "C" updateThunk (MyCapability() "ptr", CurrentTSO "ptr", + updatee "ptr", R1 "ptr") [R1]; + + jump %ENTRY_CODE(Sp(0)); +} + +// Special update frame code for CAFs and eager-blackholed thunks: it +// knows how to update blackholes, but is distinct from +// stg_marked_upd_frame so that lazy blackholing won't treat it as the +// high watermark. +INFO_TABLE_RET (stg_bh_upd_frame, UPDATE_FRAME, UPD_FRAME_PARAMS) +{ + jump stg_marked_upd_frame_info; +} diff --git a/rts/Updates.h b/rts/Updates.h index 2b3c35df0c..de9276c909 100644 --- a/rts/Updates.h +++ b/rts/Updates.h @@ -48,8 +48,7 @@ BEGIN_RTS_PRIVATE W_ sz; \ W_ i; \ inf = %GET_STD_INFO(p); \ - if (%INFO_TYPE(inf) != HALF_W_(BLACKHOLE) \ - && %INFO_TYPE(inf) != HALF_W_(CAF_BLACKHOLE)) { \ + if (%INFO_TYPE(inf) != HALF_W_(BLACKHOLE)) { \ if (%INFO_TYPE(inf) == HALF_W_(THUNK_SELECTOR)) { \ sz = BYTES_TO_WDS(SIZEOF_StgSelector_NoThunkHdr); \ } else { \ @@ -82,7 +81,6 @@ FILL_SLOP(StgClosure *p) switch (inf->type) { case BLACKHOLE: - case CAF_BLACKHOLE: goto no_slop; // we already filled in the slop when we overwrote the thunk // with BLACKHOLE, and also an evacuated BLACKHOLE is only the @@ -127,23 +125,21 @@ no_slop: */ #ifdef CMINUSMINUS -#define updateWithIndirection(ind_info, p1, p2, and_then) \ +#define updateWithIndirection(p1, p2, and_then) \ W_ bd; \ \ DEBUG_FILL_SLOP(p1); \ LDV_RECORD_DEAD_FILL_SLOP_DYNAMIC(p1); \ StgInd_indirectee(p1) = p2; \ prim %write_barrier() []; \ + SET_INFO(p1, stg_BLACKHOLE_info); \ + LDV_RECORD_CREATE(p1); \ bd = Bdescr(p1); \ if (bdescr_gen_no(bd) != 0 :: bits16) { \ recordMutableCap(p1, TO_W_(bdescr_gen_no(bd)), R1); \ - SET_INFO(p1, stg_IND_OLDGEN_info); \ - LDV_RECORD_CREATE(p1); \ TICK_UPD_OLD_IND(); \ and_then; \ } else { \ - SET_INFO(p1, ind_info); \ - LDV_RECORD_CREATE(p1); \ TICK_UPD_NEW_IND(); \ and_then; \ } @@ -151,7 +147,6 @@ no_slop: #else /* !CMINUSMINUS */ INLINE_HEADER void updateWithIndirection (Capability *cap, - const StgInfoTable *ind_info, StgClosure *p1, StgClosure *p2) { @@ -164,25 +159,19 @@ INLINE_HEADER void updateWithIndirection (Capability *cap, LDV_RECORD_DEAD_FILL_SLOP_DYNAMIC(p1); ((StgInd *)p1)->indirectee = p2; write_barrier(); + SET_INFO(p1, &stg_BLACKHOLE_info); + LDV_RECORD_CREATE(p1); bd = Bdescr((StgPtr)p1); if (bd->gen_no != 0) { recordMutableCap(p1, cap, bd->gen_no); - SET_INFO(p1, &stg_IND_OLDGEN_info); TICK_UPD_OLD_IND(); } else { - SET_INFO(p1, ind_info); - LDV_RECORD_CREATE(p1); TICK_UPD_NEW_IND(); } } #endif /* CMINUSMINUS */ -#define UPD_IND(cap, updclosure, heapptr) \ - updateWithIndirection(cap, &stg_IND_info, \ - updclosure, \ - heapptr) - #ifndef CMINUSMINUS END_RTS_PRIVATE #endif diff --git a/rts/posix/OSMem.c b/rts/posix/OSMem.c index 79c7fbfffd..608345b309 100644 --- a/rts/posix/OSMem.c +++ b/rts/posix/OSMem.c @@ -131,8 +131,9 @@ my_mmap (void *addr, lnat size) (errno == EINVAL && sizeof(void*)==4 && size >= 0xc0000000)) { // If we request more than 3Gig, then we get EINVAL // instead of ENOMEM (at least on Linux). - errorBelch("out of memory (requested %lu bytes)", size); - stg_exit(EXIT_FAILURE); + barf("out of memory (requested %lu bytes)", size); +// abort(); +// stg_exit(EXIT_FAILURE); } else { barf("getMBlock: mmap: %s", strerror(errno)); } diff --git a/rts/sm/Compact.c b/rts/sm/Compact.c index 39284f9112..6de42efae4 100644 --- a/rts/sm/Compact.c +++ b/rts/sm/Compact.c @@ -628,8 +628,8 @@ thread_obj (StgInfoTable *info, StgPtr p) case IND_PERM: case MUT_VAR_CLEAN: case MUT_VAR_DIRTY: - case CAF_BLACKHOLE: case BLACKHOLE: + case BLOCKING_QUEUE: { StgPtr end; @@ -967,9 +967,6 @@ compact(StgClosure *static_objects) // any threads resurrected during this GC thread((void *)&resurrected_threads); - // the blackhole queue - thread((void *)&blackhole_queue); - // the task list { Task *task; diff --git a/rts/sm/Evac.c b/rts/sm/Evac.c index d5c9b8a4a5..37cbee59e0 100644 --- a/rts/sm/Evac.c +++ b/rts/sm/Evac.c @@ -139,7 +139,6 @@ copy_tag_nolock(StgClosure **p, const StgInfoTable *info, nat i; to = alloc_for_copy(size,gen); - *p = TAG_CLOSURE(tag,(StgClosure*)to); from = (StgPtr)src; to[0] = (W_)info; @@ -150,6 +149,7 @@ copy_tag_nolock(StgClosure **p, const StgInfoTable *info, // if somebody else reads the forwarding pointer, we better make // sure there's a closure at the end of it. write_barrier(); + *p = TAG_CLOSURE(tag,(StgClosure*)to); src->header.info = (const StgInfoTable *)MK_FORWARDING_PTR(to); // if (to+size+2 < bd->start + BLOCK_SIZE_W) { @@ -166,7 +166,7 @@ copy_tag_nolock(StgClosure **p, const StgInfoTable *info, /* Special version of copy() for when we only want to copy the info * pointer of an object, but reserve some padding after it. This is - * used to optimise evacuation of BLACKHOLEs. + * used to optimise evacuation of TSOs. */ static rtsBool copyPart(StgClosure **p, StgClosure *src, nat size_to_reserve, @@ -195,7 +195,6 @@ spin: #endif to = alloc_for_copy(size_to_reserve, gen); - *p = (StgClosure *)to; from = (StgPtr)src; to[0] = info; @@ -203,10 +202,9 @@ spin: to[i] = from[i]; } -#if defined(PARALLEL_GC) write_barrier(); -#endif src->header.info = (const StgInfoTable*)MK_FORWARDING_PTR(to); + *p = (StgClosure *)to; #ifdef PROFILING // We store the size of the just evacuated object in the LDV word so that @@ -366,7 +364,7 @@ loop: tag = GET_CLOSURE_TAG(q); q = UNTAG_CLOSURE(q); - ASSERT(LOOKS_LIKE_CLOSURE_PTR(q)); + ASSERTM(LOOKS_LIKE_CLOSURE_PTR(q), "invalid closure, info=%p", q->header.info); if (!HEAP_ALLOCED_GC(q)) { @@ -629,21 +627,42 @@ loop: copy_tag_nolock(p,info,q,sizeW_fromITBL(INFO_PTR_TO_STRUCT(info)),gen,tag); return; + case BLACKHOLE: + { + StgClosure *r; + const StgInfoTable *i; + r = ((StgInd*)q)->indirectee; + if (GET_CLOSURE_TAG(r) == 0) { + i = r->header.info; + if (IS_FORWARDING_PTR(i)) { + r = (StgClosure *)UN_FORWARDING_PTR(i); + i = r->header.info; + } + if (i == &stg_TSO_info + || i == &stg_WHITEHOLE_info + || i == &stg_BLOCKING_QUEUE_CLEAN_info + || i == &stg_BLOCKING_QUEUE_DIRTY_info) { + copy(p,info,q,sizeofW(StgInd),gen); + return; + } + ASSERT(i != &stg_IND_info); + } + q = r; + *p = r; + goto loop; + } + + case BLOCKING_QUEUE: case WEAK: case PRIM: case MUT_PRIM: - copy_tag(p,info,q,sizeW_fromITBL(INFO_PTR_TO_STRUCT(info)),gen,tag); + copy(p,info,q,sizeW_fromITBL(INFO_PTR_TO_STRUCT(info)),gen); return; case BCO: copy(p,info,q,bco_sizeW((StgBCO *)q),gen); return; - case CAF_BLACKHOLE: - case BLACKHOLE: - copyPart(p,q,BLACKHOLE_sizeW(),sizeofW(StgHeader),gen); - return; - case THUNK_SELECTOR: eval_thunk_selector(p, (StgSelector *)q, rtsTrue); return; @@ -756,11 +775,7 @@ unchain_thunk_selectors(StgSelector *p, StgClosure *val) prev = NULL; while (p) { -#ifdef THREADED_RTS ASSERT(p->header.info == &stg_WHITEHOLE_info); -#else - ASSERT(p->header.info == &stg_BLACKHOLE_info); -#endif // val must be in to-space. Not always: when we recursively // invoke eval_thunk_selector(), the recursive calls will not // evacuate the value (because we want to select on the value, @@ -783,7 +798,13 @@ unchain_thunk_selectors(StgSelector *p, StgClosure *val) // indirection pointing to itself, and we want the program // to deadlock if it ever enters this closure, so // BLACKHOLE is correct. - SET_INFO(p, &stg_BLACKHOLE_info); + + // XXX we do not have BLACKHOLEs any more; replace with + // a THUNK_SELECTOR again. This will go into a loop if it is + // entered, and should result in a NonTermination exception. + ((StgThunk *)p)->payload[0] = val; + write_barrier(); + SET_INFO(p, &stg_sel_0_upd_info); } else { ((StgInd *)p)->indirectee = val; write_barrier(); @@ -813,7 +834,7 @@ eval_thunk_selector (StgClosure **q, StgSelector * p, rtsBool evac) prev_thunk_selector = NULL; // this is a chain of THUNK_SELECTORs that we are going to update // to point to the value of the current THUNK_SELECTOR. Each - // closure on the chain is a BLACKHOLE, and points to the next in the + // closure on the chain is a WHITEHOLE, and points to the next in the // chain with payload[0]. selector_chain: @@ -851,7 +872,7 @@ selector_chain: } - // BLACKHOLE the selector thunk, since it is now under evaluation. + // WHITEHOLE the selector thunk, since it is now under evaluation. // This is important to stop us going into an infinite loop if // this selector thunk eventually refers to itself. #if defined(THREADED_RTS) @@ -884,7 +905,7 @@ selector_chain: #else // Save the real info pointer (NOTE: not the same as get_itbl()). info_ptr = (StgWord)p->header.info; - SET_INFO(p,&stg_BLACKHOLE_info); + SET_INFO(p,&stg_WHITEHOLE_info); #endif field = INFO_PTR_TO_STRUCT(info_ptr)->layout.selector_offset; @@ -936,11 +957,7 @@ selector_loop: // the original selector thunk, p. SET_INFO(p, (StgInfoTable *)info_ptr); LDV_RECORD_DEAD_FILL_SLOP_DYNAMIC((StgClosure *)p); -#if defined(THREADED_RTS) SET_INFO(p, &stg_WHITEHOLE_info); -#else - SET_INFO(p, &stg_BLACKHOLE_info); -#endif #endif // the closure in val is now the "value" of the @@ -998,6 +1015,33 @@ selector_loop: selectee = UNTAG_CLOSURE( ((StgInd *)selectee)->indirectee ); goto selector_loop; + case BLACKHOLE: + { + StgClosure *r; + const StgInfoTable *i; + r = ((StgInd*)selectee)->indirectee; + + // establish whether this BH has been updated, and is now an + // indirection, as in evacuate(). + if (GET_CLOSURE_TAG(r) == 0) { + i = r->header.info; + if (IS_FORWARDING_PTR(i)) { + r = (StgClosure *)UN_FORWARDING_PTR(i); + i = r->header.info; + } + if (i == &stg_TSO_info + || i == &stg_WHITEHOLE_info + || i == &stg_BLOCKING_QUEUE_CLEAN_info + || i == &stg_BLOCKING_QUEUE_DIRTY_info) { + goto bale_out; + } + ASSERT(i != &stg_IND_info); + } + + selectee = UNTAG_CLOSURE( ((StgInd *)selectee)->indirectee ); + goto selector_loop; + } + case THUNK_SELECTOR: { StgClosure *val; @@ -1032,8 +1076,6 @@ selector_loop: case THUNK_1_1: case THUNK_0_2: case THUNK_STATIC: - case CAF_BLACKHOLE: - case BLACKHOLE: // not evaluated yet goto bale_out; diff --git a/rts/sm/GC.c b/rts/sm/GC.c index ae6fc998da..4d63724ba0 100644 --- a/rts/sm/GC.c +++ b/rts/sm/GC.c @@ -395,13 +395,6 @@ SET_GCT(gc_threads[0]); // The other threads are now stopped. We might recurse back to // here, but from now on this is the only thread. - // if any blackholes are alive, make the threads that wait on - // them alive too. - if (traverseBlackholeQueue()) { - inc_running(); - continue; - } - // must be last... invariant is that everything is fully // scavenged at this point. if (traverseWeakPtrList()) { // returns rtsTrue if evaced something diff --git a/rts/sm/MarkWeak.c b/rts/sm/MarkWeak.c index 0ac807ff79..e65c176c0a 100644 --- a/rts/sm/MarkWeak.c +++ b/rts/sm/MarkWeak.c @@ -210,21 +210,6 @@ traverseWeakPtrList(void) } } - /* Finally, we can update the blackhole_queue. This queue - * simply strings together TSOs blocked on black holes, it is - * not intended to keep anything alive. Hence, we do not follow - * pointers on the blackhole_queue until now, when we have - * determined which TSOs are otherwise reachable. We know at - * this point that all TSOs have been evacuated, however. - */ - { - StgTSO **pt; - for (pt = &blackhole_queue; *pt != END_TSO_QUEUE; pt = &((*pt)->_link)) { - *pt = (StgTSO *)isAlive((StgClosure *)*pt); - ASSERT(*pt != NULL); - } - } - weak_stage = WeakDone; // *now* we're done, return rtsTrue; // but one more round of scavenging, please } @@ -310,49 +295,6 @@ static rtsBool tidyThreadList (generation *gen) } /* ----------------------------------------------------------------------------- - The blackhole queue - - Threads on this list behave like weak pointers during the normal - phase of garbage collection: if the blackhole is reachable, then - the thread is reachable too. - -------------------------------------------------------------------------- */ -rtsBool -traverseBlackholeQueue (void) -{ - StgTSO *prev, *t, *tmp; - rtsBool flag; - nat type; - - flag = rtsFalse; - prev = NULL; - - for (t = blackhole_queue; t != END_TSO_QUEUE; prev=t, t = t->_link) { - // if the thread is not yet alive... - if (! (tmp = (StgTSO *)isAlive((StgClosure*)t))) { - // if the closure it is blocked on is either (a) a - // reachable BLAKCHOLE or (b) not a BLACKHOLE, then we - // make the thread alive. - if (!isAlive(t->block_info.closure)) { - type = get_itbl(t->block_info.closure)->type; - if (type == BLACKHOLE || type == CAF_BLACKHOLE) { - continue; - } - } - evacuate((StgClosure **)&t); - if (prev) { - prev->_link = t; - } else { - blackhole_queue = t; - } - // no write barrier when on the blackhole queue, - // because we traverse the whole queue on every GC. - flag = rtsTrue; - } - } - return flag; -} - -/* ----------------------------------------------------------------------------- Evacuate every weak pointer object on the weak_ptr_list, and update the link fields. diff --git a/rts/sm/MarkWeak.h b/rts/sm/MarkWeak.h index 018dd6cd79..5c05ab2499 100644 --- a/rts/sm/MarkWeak.h +++ b/rts/sm/MarkWeak.h @@ -23,7 +23,6 @@ extern StgTSO *exception_threads; void initWeakForGC ( void ); rtsBool traverseWeakPtrList ( void ); void markWeakPtrList ( void ); -rtsBool traverseBlackholeQueue ( void ); END_RTS_PRIVATE diff --git a/rts/sm/Sanity.c b/rts/sm/Sanity.c index 11d5424431..14230779d7 100644 --- a/rts/sm/Sanity.c +++ b/rts/sm/Sanity.c @@ -306,7 +306,6 @@ checkClosure( StgClosure* p ) case IND_OLDGEN: case IND_OLDGEN_PERM: case BLACKHOLE: - case CAF_BLACKHOLE: case PRIM: case MUT_PRIM: case MUT_VAR_CLEAN: @@ -323,6 +322,23 @@ checkClosure( StgClosure* p ) return sizeW_fromITBL(info); } + case BLOCKING_QUEUE: + { + StgBlockingQueue *bq = (StgBlockingQueue *)p; + + // NO: the BH might have been updated now + // ASSERT(get_itbl(bq->bh)->type == BLACKHOLE); + ASSERT(LOOKS_LIKE_CLOSURE_PTR(bq->bh)); + + ASSERT(get_itbl(bq->owner)->type == TSO); + ASSERT(bq->queue == END_TSO_QUEUE || get_itbl(bq->queue)->type == TSO); + ASSERT(bq->link == (StgBlockingQueue*)END_TSO_QUEUE || + get_itbl(bq->link)->type == IND || + get_itbl(bq->link)->type == BLOCKING_QUEUE); + + return sizeofW(StgBlockingQueue); + } + case BCO: { StgBCO *bco = (StgBCO *)p; ASSERT(LOOKS_LIKE_CLOSURE_PTR(bco->instrs)); @@ -516,6 +532,11 @@ checkTSO(StgTSO *tso) return; } + ASSERT(tso->_link == END_TSO_QUEUE || get_itbl(tso->_link)->type == TSO); + ASSERT(LOOKS_LIKE_CLOSURE_PTR(tso->block_info.closure)); + ASSERT(LOOKS_LIKE_CLOSURE_PTR(tso->bq)); + ASSERT(LOOKS_LIKE_CLOSURE_PTR(tso->blocked_exceptions)); + ASSERT(stack <= sp && sp < stack_end); checkStackChunk(sp, stack_end); @@ -539,9 +560,7 @@ checkGlobalTSOList (rtsBool checkTSOs) if (checkTSOs) checkTSO(tso); - while (tso->what_next == ThreadRelocated) { - tso = tso->_link; - } + tso = deRefTSO(tso); // If this TSO is dirty and in an old generation, it better // be on the mutable list. diff --git a/rts/sm/Scav.c b/rts/sm/Scav.c index 1b671a097c..75c186c972 100644 --- a/rts/sm/Scav.c +++ b/rts/sm/Scav.c @@ -46,17 +46,6 @@ static void scavenge_large_bitmap (StgPtr p, Scavenge a TSO. -------------------------------------------------------------------------- */ -STATIC_INLINE void -scavenge_TSO_link (StgTSO *tso) -{ - // We don't always chase the link field: TSOs on the blackhole - // queue are not automatically alive, so the link field is a - // "weak" pointer in that case. - if (tso->why_blocked != BlockedOnBlackHole) { - evacuate((StgClosure **)&tso->_link); - } -} - static void scavengeTSO (StgTSO *tso) { @@ -87,7 +76,18 @@ scavengeTSO (StgTSO *tso) ) { evacuate(&tso->block_info.closure); } +#ifdef THREADED_RTS + // in the THREADED_RTS, block_info.closure must always point to a + // valid closure, because we assume this in throwTo(). In the + // non-threaded RTS it might be a FD (for + // BlockedOnRead/BlockedOnWrite) or a time value (BlockedOnDelay) + else { + tso->block_info.closure = (StgClosure *)END_TSO_QUEUE; + } +#endif + evacuate((StgClosure **)&tso->blocked_exceptions); + evacuate((StgClosure **)&tso->bq); // scavange current transaction record evacuate((StgClosure **)&tso->trec); @@ -97,10 +97,10 @@ scavengeTSO (StgTSO *tso) if (gct->failed_to_evac) { tso->dirty = 1; - scavenge_TSO_link(tso); + evacuate((StgClosure **)&tso->_link); } else { tso->dirty = 0; - scavenge_TSO_link(tso); + evacuate((StgClosure **)&tso->_link); if (gct->failed_to_evac) { tso->flags |= TSO_LINK_DIRTY; } else { @@ -570,6 +570,7 @@ scavenge_block (bdescr *bd) } // fall through case IND_OLDGEN_PERM: + case BLACKHOLE: evacuate(&((StgInd *)p)->indirectee); p += sizeofW(StgInd); break; @@ -588,10 +589,25 @@ scavenge_block (bdescr *bd) p += sizeofW(StgMutVar); break; - case CAF_BLACKHOLE: - case BLACKHOLE: - p += BLACKHOLE_sizeW(); - break; + case BLOCKING_QUEUE: + { + StgBlockingQueue *bq = (StgBlockingQueue *)p; + + gct->eager_promotion = rtsFalse; + evacuate(&bq->bh); + evacuate((StgClosure**)&bq->owner); + evacuate((StgClosure**)&bq->queue); + evacuate((StgClosure**)&bq->link); + gct->eager_promotion = saved_eager_promotion; + + if (gct->failed_to_evac) { + bq->header.info = &stg_BLOCKING_QUEUE_DIRTY_info; + } else { + bq->header.info = &stg_BLOCKING_QUEUE_CLEAN_info; + } + p += sizeofW(StgBlockingQueue); + break; + } case THUNK_SELECTOR: { @@ -884,6 +900,7 @@ scavenge_mark_stack(void) case IND_OLDGEN: case IND_OLDGEN_PERM: + case BLACKHOLE: evacuate(&((StgInd *)p)->indirectee); break; @@ -901,8 +918,25 @@ scavenge_mark_stack(void) break; } - case CAF_BLACKHOLE: - case BLACKHOLE: + case BLOCKING_QUEUE: + { + StgBlockingQueue *bq = (StgBlockingQueue *)p; + + gct->eager_promotion = rtsFalse; + evacuate(&bq->bh); + evacuate((StgClosure**)&bq->owner); + evacuate((StgClosure**)&bq->queue); + evacuate((StgClosure**)&bq->link); + gct->eager_promotion = saved_eager_promotion; + + if (gct->failed_to_evac) { + bq->header.info = &stg_BLOCKING_QUEUE_DIRTY_info; + } else { + bq->header.info = &stg_BLOCKING_QUEUE_CLEAN_info; + } + break; + } + case ARR_WORDS: break; @@ -1122,10 +1156,25 @@ scavenge_one(StgPtr p) break; } - case CAF_BLACKHOLE: - case BLACKHOLE: - break; - + case BLOCKING_QUEUE: + { + StgBlockingQueue *bq = (StgBlockingQueue *)p; + + gct->eager_promotion = rtsFalse; + evacuate(&bq->bh); + evacuate((StgClosure**)&bq->owner); + evacuate((StgClosure**)&bq->queue); + evacuate((StgClosure**)&bq->link); + gct->eager_promotion = saved_eager_promotion; + + if (gct->failed_to_evac) { + bq->header.info = &stg_BLOCKING_QUEUE_DIRTY_info; + } else { + bq->header.info = &stg_BLOCKING_QUEUE_CLEAN_info; + } + break; + } + case THUNK_SELECTOR: { StgSelector *s = (StgSelector *)p; @@ -1239,6 +1288,7 @@ scavenge_one(StgPtr p) // on the large-object list and then gets updated. See #3424. case IND_OLDGEN: case IND_OLDGEN_PERM: + case BLACKHOLE: case IND_STATIC: evacuate(&((StgInd *)p)->indirectee); @@ -1300,7 +1350,7 @@ scavenge_mutable_list(bdescr *bd, generation *gen) #ifdef DEBUG switch (get_itbl((StgClosure *)p)->type) { case MUT_VAR_CLEAN: - barf("MUT_VAR_CLEAN on mutable list"); + // can happen due to concurrent writeMutVars case MUT_VAR_DIRTY: mutlist_MUTVARS++; break; case MUT_ARR_PTRS_CLEAN: @@ -1356,7 +1406,7 @@ scavenge_mutable_list(bdescr *bd, generation *gen) // this assertion would be invalid: // ASSERT(tso->flags & TSO_LINK_DIRTY); - scavenge_TSO_link(tso); + evacuate((StgClosure **)&tso->_link); if (gct->failed_to_evac) { recordMutableGen_GC((StgClosure *)p,gen->no); gct->failed_to_evac = rtsFalse; @@ -1576,10 +1626,12 @@ scavenge_stack(StgPtr p, StgPtr stack_end) // before GC, but that seems like overkill. // // Scavenging this update frame as normal would be disastrous; - // the updatee would end up pointing to the value. So we turn - // the indirection into an IND_PERM, so that evacuate will - // copy the indirection into the old generation instead of - // discarding it. + // the updatee would end up pointing to the value. So we + // check whether the value after evacuation is a BLACKHOLE, + // and if not, we change the update frame to an stg_enter + // frame that simply returns the value. Hence, blackholing is + // compulsory (otherwise we would have to check for thunks + // too). // // Note [upd-black-hole] // One slight hiccup is that the THUNK_SELECTOR machinery can @@ -1590,22 +1642,17 @@ scavenge_stack(StgPtr p, StgPtr stack_end) // the updatee is never a THUNK_SELECTOR and we're ok. // NB. this is a new invariant: blackholing is not optional. { - nat type; - const StgInfoTable *i; - StgClosure *updatee; - - updatee = ((StgUpdateFrame *)p)->updatee; - i = updatee->header.info; - if (!IS_FORWARDING_PTR(i)) { - type = get_itbl(updatee)->type; - if (type == IND) { - updatee->header.info = &stg_IND_PERM_info; - } else if (type == IND_OLDGEN) { - updatee->header.info = &stg_IND_OLDGEN_PERM_info; - } + StgUpdateFrame *frame = (StgUpdateFrame *)p; + StgClosure *v; + + evacuate(&frame->updatee); + v = frame->updatee; + if (GET_CLOSURE_TAG(v) != 0 || + (get_itbl(v)->type != BLACKHOLE)) { + // blackholing is compulsory, see above. + frame->header.info = (const StgInfoTable*)&stg_enter_checkbh_info; } - evacuate(&((StgUpdateFrame *)p)->updatee); - ASSERT(GET_CLOSURE_TAG(((StgUpdateFrame *)p)->updatee) == 0); + ASSERT(v->header.info != &stg_TSO_info); p += sizeofW(StgUpdateFrame); continue; } diff --git a/rts/sm/Storage.c b/rts/sm/Storage.c index 6aedb96a59..02344003b8 100644 --- a/rts/sm/Storage.c +++ b/rts/sm/Storage.c @@ -107,7 +107,7 @@ initStorage( void ) * doing something reasonable. */ /* We use the NOT_NULL variant or gcc warns that the test is always true */ - ASSERT(LOOKS_LIKE_INFO_PTR_NOT_NULL((StgWord)&stg_BLACKHOLE_info)); + ASSERT(LOOKS_LIKE_INFO_PTR_NOT_NULL((StgWord)&stg_BLOCKING_QUEUE_CLEAN_info)); ASSERT(LOOKS_LIKE_CLOSURE_PTR(&stg_dummy_ret_closure)); ASSERT(!HEAP_ALLOCED(&stg_dummy_ret_closure)); @@ -229,13 +229,13 @@ freeStorage (void) The entry code for every CAF does the following: - - builds a CAF_BLACKHOLE in the heap - - pushes an update frame pointing to the CAF_BLACKHOLE + - builds a BLACKHOLE in the heap + - pushes an update frame pointing to the BLACKHOLE - invokes UPD_CAF(), which: - calls newCaf, below - - updates the CAF with a static indirection to the CAF_BLACKHOLE + - updates the CAF with a static indirection to the BLACKHOLE - Why do we build a BLACKHOLE in the heap rather than just updating + Why do we build an BLACKHOLE in the heap rather than just updating the thunk directly? It's so that we only need one kind of update frame - otherwise we'd need a static version of the update frame too. @@ -699,11 +699,9 @@ void dirty_MUT_VAR(StgRegTable *reg, StgClosure *p) { Capability *cap = regTableToCapability(reg); - bdescr *bd; if (p->header.info == &stg_MUT_VAR_CLEAN_info) { p->header.info = &stg_MUT_VAR_DIRTY_info; - bd = Bdescr((StgPtr)p); - if (bd->gen_no > 0) recordMutableCap(p,cap,bd->gen_no); + recordClosureMutated(cap,p); } } @@ -716,11 +714,9 @@ dirty_MUT_VAR(StgRegTable *reg, StgClosure *p) void setTSOLink (Capability *cap, StgTSO *tso, StgTSO *target) { - bdescr *bd; if (tso->dirty == 0 && (tso->flags & TSO_LINK_DIRTY) == 0) { tso->flags |= TSO_LINK_DIRTY; - bd = Bdescr((StgPtr)tso); - if (bd->gen_no > 0) recordMutableCap((StgClosure*)tso,cap,bd->gen_no); + recordClosureMutated(cap,(StgClosure*)tso); } tso->_link = target; } @@ -728,10 +724,8 @@ setTSOLink (Capability *cap, StgTSO *tso, StgTSO *target) void dirty_TSO (Capability *cap, StgTSO *tso) { - bdescr *bd; if (tso->dirty == 0 && (tso->flags & TSO_LINK_DIRTY) == 0) { - bd = Bdescr((StgPtr)tso); - if (bd->gen_no > 0) recordMutableCap((StgClosure*)tso,cap,bd->gen_no); + recordClosureMutated(cap,(StgClosure*)tso); } tso->dirty = 1; } @@ -747,10 +741,7 @@ dirty_TSO (Capability *cap, StgTSO *tso) void dirty_MVAR(StgRegTable *reg, StgClosure *p) { - Capability *cap = regTableToCapability(reg); - bdescr *bd; - bd = Bdescr((StgPtr)p); - if (bd->gen_no > 0) recordMutableCap(p,cap,bd->gen_no); + recordClosureMutated(regTableToCapability(reg),p); } /* ----------------------------------------------------------------------------- diff --git a/utils/genapply/GenApply.hs b/utils/genapply/GenApply.hs index 765bfb3be6..16d33940fb 100644 --- a/utils/genapply/GenApply.hs +++ b/utils/genapply/GenApply.hs @@ -564,8 +564,8 @@ genApply regstatus args = -- else: text "case AP,", text " AP_STACK,", - text " CAF_BLACKHOLE,", text " BLACKHOLE,", + text " WHITEHOLE,", text " THUNK,", text " THUNK_1_0,", text " THUNK_0_1,", |