author    Simon Marlow <marlowsd@gmail.com>  2010-03-29 14:44:56 +0000
committer Simon Marlow <marlowsd@gmail.com>  2010-03-29 14:44:56 +0000
commit    5d52d9b64c21dcf77849866584744722f8121389 (patch)
tree      25aeafc9b761e73714c24ae414c0b1c41765c99f
parent    79957d77c1bff767f1041d3fabdeb94d92a52878 (diff)
New implementation of BLACKHOLEs
This replaces the global blackhole_queue with a clever scheme that enables us to queue up blocked threads on the closure that they are blocked on, while still avoiding atomic instructions in the common case.

Advantages:

 - gets rid of a locked global data structure and some tricky GC code
   (replacing it with some per-thread data structures and different
   tricky GC code :)

 - wakeups are more prompt: parallel/concurrent performance should
   benefit. I haven't seen anything dramatic in the parallel
   benchmarks so far, but a couple of threading benchmarks do improve
   a bit.

 - waking up a thread blocked on a blackhole is now O(1) (e.g. if it
   is the target of throwTo).

 - less sharing and better separation of Capabilities: communication
   is done with messages, the data structures are strictly owned by a
   Capability and cannot be modified except by sending messages.

 - this change will ultimately enable us to do more intelligent
   scheduling when threads block on each other. This is what started
   off the whole thing, but it isn't done yet (#3838).

I'll be documenting all this on the wiki in due course.
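The heart of the scheme is the per-Capability inbox: blocking and waking are expressed as messages pushed onto the owning Capability's inbox instead of edits to a shared global queue. A rough C sketch of the shape of it, much simplified from the real rts/Messages.c added below (the real Message carries a closure header, per-kind payloads, and locking):

typedef struct Message_ {
    struct Message_ *link;   /* intrusive list threading the inbox */
    /* payload varies by kind: MSG_BLACKHOLE carries (tso, bh), ... */
} Message;

typedef struct Capability_ {
    Message *inbox;          /* messages sent by other Capabilities */
    /* ... run queue, lock, registers ... */
} Capability;

/* Sending is a locked push onto the target's inbox; the receiver
 * drains the inbox from its scheduler loop and acts on each message. */
static void send(Capability *to, Message *msg)
{
    /* ACQUIRE_LOCK(&to->lock) in the real RTS */
    msg->link = to->inbox;
    to->inbox = msg;
    /* RELEASE_LOCK(&to->lock), then prod the target if it is idle */
}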
 compiler/cmm/CLabel.hs               |   2
 compiler/codeGen/CgCallConv.hs       |   1
 compiler/codeGen/CgClosure.lhs       |  26
 compiler/codeGen/CgMonad.lhs         |   1
 compiler/codeGen/CgStackery.lhs      |  28
 includes/Rts.h                       |   8
 includes/mkDerivedConstants.c        |  12
 includes/rts/storage/ClosureMacros.h |   8
 includes/rts/storage/ClosureTypes.h  |   2
 includes/rts/storage/Closures.h      |  17
 includes/rts/storage/TSO.h           |  26
 includes/stg/MiscClosures.h          |  20
 rts/Capability.c                     |  63
 rts/Capability.h                     |  19
 rts/ClosureFlags.c                   |   2
 rts/FrontPanel.c                     |   2
 rts/HeapStackCheck.cmm               |  36
 rts/Interpreter.c                    |   4
 rts/LdvProfile.c                     |   2
 rts/Linker.c                         |   4
 rts/Messages.c                       | 296
 rts/Messages.h                       |  18
 rts/PrimOps.cmm                      |  32
 rts/Printer.c                        |  16
 rts/ProfHeap.c                       |   2
 rts/RaiseAsync.c                     | 175
 rts/RetainerProfile.c                |   9
 rts/STM.c                            |   2
 rts/Schedule.c                       | 243
 rts/Schedule.h                       |   9
 rts/StgMiscClosures.cmm              | 161
 rts/ThreadPaused.c                   |  66
 rts/Threads.c                        | 214
 rts/Threads.h                        |  21
 rts/Timer.c                          |   2
 rts/Updates.cmm                      | 100
 rts/Updates.h                        |  23
 rts/posix/OSMem.c                    |   5
 rts/sm/Compact.c                     |   5
 rts/sm/Evac.c                        |  94
 rts/sm/GC.c                          |   7
 rts/sm/MarkWeak.c                    |  58
 rts/sm/MarkWeak.h                    |   1
 rts/sm/Sanity.c                      |  27
 rts/sm/Scav.c                        | 135
 rts/sm/Storage.c                     |  27
 utils/genapply/GenApply.hs           |   2
47 files changed, 1208 insertions, 825 deletions
diff --git a/compiler/cmm/CLabel.hs b/compiler/cmm/CLabel.hs
index 3ceb982e76..79544445d7 100644
--- a/compiler/cmm/CLabel.hs
+++ b/compiler/cmm/CLabel.hs
@@ -58,6 +58,7 @@ module CLabel (
mkSplitMarkerLabel,
mkDirty_MUT_VAR_Label,
mkUpdInfoLabel,
+ mkBHUpdInfoLabel,
mkIndStaticInfoLabel,
mkMainCapabilityLabel,
mkMAP_FROZEN_infoLabel,
@@ -400,6 +401,7 @@ mkStaticConEntryLabel name c = IdLabel name c StaticConEntry
mkSplitMarkerLabel = CmmLabel rtsPackageId (fsLit "__stg_split_marker") CmmCode
mkDirty_MUT_VAR_Label = CmmLabel rtsPackageId (fsLit "dirty_MUT_VAR") CmmCode
mkUpdInfoLabel = CmmLabel rtsPackageId (fsLit "stg_upd_frame") CmmInfo
+mkBHUpdInfoLabel = CmmLabel rtsPackageId (fsLit "stg_bh_upd_frame" ) CmmInfo
mkIndStaticInfoLabel = CmmLabel rtsPackageId (fsLit "stg_IND_STATIC") CmmInfo
mkMainCapabilityLabel = CmmLabel rtsPackageId (fsLit "MainCapability") CmmData
mkMAP_FROZEN_infoLabel = CmmLabel rtsPackageId (fsLit "stg_MUT_ARR_PTRS_FROZEN0") CmmInfo
diff --git a/compiler/codeGen/CgCallConv.hs b/compiler/codeGen/CgCallConv.hs
index 8a1ae8be0c..b8294ea326 100644
--- a/compiler/codeGen/CgCallConv.hs
+++ b/compiler/codeGen/CgCallConv.hs
@@ -284,7 +284,6 @@ getSequelAmode
OnStack -> do { sp_rel <- getSpRelOffset virt_sp
; returnFC (CmmLoad sp_rel bWord) }
- UpdateCode -> returnFC (CmmLit (CmmLabel mkUpdInfoLabel))
CaseAlts lbl _ _ -> returnFC (CmmLit (CmmLabel lbl))
}
diff --git a/compiler/codeGen/CgClosure.lhs b/compiler/codeGen/CgClosure.lhs
index f0fe3d17b2..60ba7f8652 100644
--- a/compiler/codeGen/CgClosure.lhs
+++ b/compiler/codeGen/CgClosure.lhs
@@ -474,7 +474,12 @@ emitBlackHoleCode is_single_entry = do
then do
tickyBlackHole (not is_single_entry)
let bh_info = CmmReg (CmmGlobal EagerBlackholeInfo)
- stmtC (CmmStore (CmmReg nodeReg) bh_info)
+ stmtsC [
+ CmmStore (cmmOffsetW (CmmReg nodeReg) fixedHdrSize)
+ (CmmReg (CmmGlobal CurrentTSO)),
+ CmmCall (CmmPrim MO_WriteBarrier) [] [] CmmUnsafe CmmMayReturn,
+ CmmStore (CmmReg nodeReg) bh_info
+ ]
else
nopC
\end{code}
@@ -489,17 +494,23 @@ setupUpdate closure_info code
= code
| not (isStaticClosure closure_info)
- = if closureUpdReqd closure_info
- then do { tickyPushUpdateFrame; pushUpdateFrame (CmmReg nodeReg) code }
- else do { tickyUpdateFrameOmitted; code }
-
+ = do
+ if not (closureUpdReqd closure_info)
+ then do tickyUpdateFrameOmitted; code
+ else do
+ tickyPushUpdateFrame
+ dflags <- getDynFlags
+ if not opt_SccProfilingOn && dopt Opt_EagerBlackHoling dflags
+ then pushBHUpdateFrame (CmmReg nodeReg) code
+ else pushUpdateFrame (CmmReg nodeReg) code
+
| otherwise -- A static closure
= do { tickyUpdateBhCaf closure_info
; if closureUpdReqd closure_info
then do -- Blackhole the (updatable) CAF:
{ upd_closure <- link_caf closure_info True
- ; pushUpdateFrame upd_closure code }
+ ; pushBHUpdateFrame upd_closure code }
else do
{ -- krc: removed some ticky-related code here.
; tickyUpdateFrameOmitted
@@ -553,7 +564,8 @@ link_caf cl_info _is_upd = do
{ -- Alloc black hole specifying CC_HDR(Node) as the cost centre
; let use_cc = costCentreFrom (CmmReg nodeReg)
blame_cc = use_cc
- ; hp_offset <- allocDynClosure bh_cl_info use_cc blame_cc []
+ tso = CmmReg (CmmGlobal CurrentTSO)
+ ; hp_offset <- allocDynClosure bh_cl_info use_cc blame_cc [(tso,fixedHdrSize)]
; hp_rel <- getHpRelOffset hp_offset
-- Call the RTS function newCAF to add the CAF to the CafList
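The two hunks above make every blackholed thunk record its owner: the payload word (the indirectee, once the closure is viewed as an StgInd) is set to CurrentTSO, and the write barrier ensures the owner is visible before the BLACKHOLE info pointer is. What the generated Cmm does, approximated in C (names illustrative):

/* sketch of the store ordering for eagerly blackholing thunk p */
static void eagerBlackHole(Capability *cap, StgClosure *p)
{
    ((StgInd *)p)->indirectee = (StgClosure *)cap->r.rCurrentTSO; /* owner first */
    write_barrier();                           /* order the two stores */
    SET_INFO(p, &__stg_EAGER_BLACKHOLE_info);  /* publish the blackhole last */
}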
diff --git a/compiler/codeGen/CgMonad.lhs b/compiler/codeGen/CgMonad.lhs
index 83d2b72747..e5bca2aab1 100644
--- a/compiler/codeGen/CgMonad.lhs
+++ b/compiler/codeGen/CgMonad.lhs
@@ -169,7 +169,6 @@ block.
\begin{code}
data Sequel
= OnStack -- Continuation is on the stack
- | UpdateCode -- Continuation is update
| CaseAlts
CLabel -- Jump to this; if the continuation is for a vectored
diff --git a/compiler/codeGen/CgStackery.lhs b/compiler/codeGen/CgStackery.lhs
index 6683de4c8b..532127a147 100644
--- a/compiler/codeGen/CgStackery.lhs
+++ b/compiler/codeGen/CgStackery.lhs
@@ -17,7 +17,7 @@ module CgStackery (
setStackFrame, getStackFrame,
mkVirtStkOffsets, mkStkAmodes,
freeStackSlots,
- pushUpdateFrame, emitPushUpdateFrame,
+ pushUpdateFrame, pushBHUpdateFrame, emitPushUpdateFrame,
) where
#include "HsVersions.h"
@@ -265,6 +265,14 @@ to reflect the frame pushed.
\begin{code}
pushUpdateFrame :: CmmExpr -> Code -> Code
pushUpdateFrame updatee code
+ = pushSpecUpdateFrame mkUpdInfoLabel updatee code
+
+pushBHUpdateFrame :: CmmExpr -> Code -> Code
+pushBHUpdateFrame updatee code
+ = pushSpecUpdateFrame mkBHUpdInfoLabel updatee code
+
+pushSpecUpdateFrame :: CLabel -> CmmExpr -> Code -> Code
+pushSpecUpdateFrame lbl updatee code
= do {
when debugIsOn $ do
{ EndOfBlockInfo _ sequel <- getEndOfBlockInfo ;
@@ -277,15 +285,25 @@ pushUpdateFrame updatee code
-- The location of the lowest-address
-- word of the update frame itself
- ; setEndOfBlockInfo (EndOfBlockInfo vsp UpdateCode) $
- do { emitPushUpdateFrame frame_addr updatee
+ -- NB. we used to set the Sequel to 'UpdateCode' so
+ -- that we could jump directly to the update code if
+ -- we know that the next frame on the stack is an
+ -- update frame. However, the RTS can sometimes
+ -- change an update frame into something else (see
+ -- e.g. Note [upd-black-hole] in rts/sm/Scav.c), so we
+ -- no longer make this assumption.
+ ; setEndOfBlockInfo (EndOfBlockInfo vsp OnStack) $
+ do { emitSpecPushUpdateFrame lbl frame_addr updatee
; code }
}
emitPushUpdateFrame :: CmmExpr -> CmmExpr -> Code
-emitPushUpdateFrame frame_addr updatee = do
+emitPushUpdateFrame = emitSpecPushUpdateFrame mkUpdInfoLabel
+
+emitSpecPushUpdateFrame :: CLabel -> CmmExpr -> CmmExpr -> Code
+emitSpecPushUpdateFrame lbl frame_addr updatee = do
stmtsC [ -- Set the info word
- CmmStore frame_addr (mkLblExpr mkUpdInfoLabel)
+ CmmStore frame_addr (mkLblExpr lbl)
, -- And the updatee
CmmStore (cmmOffsetB frame_addr off_updatee) updatee ]
initUpdFrameProf frame_addr
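pushUpdateFrame and the new pushBHUpdateFrame now share one worker, pushSpecUpdateFrame, and differ only in which info pointer is written into the frame. Viewed from C, the frame being pushed looks roughly like this (a sketch; the real layout is StgUpdateFrame in the RTS headers):

typedef struct {
    const StgInfoTable *header; /* stg_upd_frame_info or stg_bh_upd_frame_info */
    StgClosure *updatee;        /* the thunk to overwrite with the result */
} UpdateFrameSketch;

The bh variant marks frames whose updatee was eagerly blackholed, so the update path knows the updatee may already carry a blocking queue.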
diff --git a/includes/Rts.h b/includes/Rts.h
index 3318402364..d79e9ad88e 100644
--- a/includes/Rts.h
+++ b/includes/Rts.h
@@ -106,10 +106,18 @@ void _assertFail(const char *filename, unsigned int linenum)
else \
_assertFail(__FILE__, __LINE__)
+#define CHECKM(predicate, msg, ...) \
+ if (predicate) \
+ /*null*/; \
+ else \
+ barf(msg, ##__VA_ARGS__)
+
#ifndef DEBUG
#define ASSERT(predicate) /* nothing */
+#define ASSERTM(predicate,msg,...) /* nothing */
#else
#define ASSERT(predicate) CHECK(predicate)
+#define ASSERTM(predicate,msg,...) CHECKM(predicate,msg,##__VA_ARGS__)
#endif /* DEBUG */
/*
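CHECKM and ASSERTM are message-carrying variants of CHECK and ASSERT: on failure they barf with a printf-style message, and like ASSERT, ASSERTM compiles to nothing unless DEBUG is defined. A hypothetical call site:

#include "Rts.h"

static void sanityCheckBlocked(StgTSO *tso)
{
    /* like ASSERT, but explains itself when it fires */
    ASSERTM(tso->why_blocked == BlockedOnBlackHole,
            "unexpected why_blocked: %d", (int)tso->why_blocked);
}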
diff --git a/includes/mkDerivedConstants.c b/includes/mkDerivedConstants.c
index 94157f035b..92685cae3a 100644
--- a/includes/mkDerivedConstants.c
+++ b/includes/mkDerivedConstants.c
@@ -291,6 +291,7 @@ main(int argc, char *argv[])
closure_field(StgTSO, trec);
closure_field(StgTSO, flags);
closure_field(StgTSO, dirty);
+ closure_field(StgTSO, bq);
closure_field_("StgTSO_CCCS", StgTSO, prof.CCCS);
tso_field(StgTSO, sp);
tso_field_offset(StgTSO, stack);
@@ -382,6 +383,17 @@ main(int argc, char *argv[])
closure_size(StgStableName);
closure_field(StgStableName,sn);
+ closure_size(StgBlockingQueue);
+ closure_field(StgBlockingQueue, bh);
+ closure_field(StgBlockingQueue, owner);
+ closure_field(StgBlockingQueue, queue);
+ closure_field(StgBlockingQueue, link);
+
+ closure_size(MessageBlackHole);
+ closure_field(MessageBlackHole, link);
+ closure_field(MessageBlackHole, tso);
+ closure_field(MessageBlackHole, bh);
+
struct_field_("RtsFlags_ProfFlags_showCCSOnException",
RTS_FLAGS, ProfFlags.showCCSOnException);
struct_field_("RtsFlags_DebugFlags_apply",
diff --git a/includes/rts/storage/ClosureMacros.h b/includes/rts/storage/ClosureMacros.h
index a115f6f38f..098c65db15 100644
--- a/includes/rts/storage/ClosureMacros.h
+++ b/includes/rts/storage/ClosureMacros.h
@@ -129,6 +129,12 @@
SET_HDR(c,info,costCentreStack); \
(c)->words = n_words;
+// Use when changing a closure from one kind to another
+#define OVERWRITE_INFO(c, new_info) \
+ LDV_RECORD_DEAD_FILL_SLOP_DYNAMIC((StgClosure *)(c)); \
+ SET_INFO((c), (new_info)); \
+ LDV_RECORD_CREATE(c);
+
/* -----------------------------------------------------------------------------
How to get hold of the static link field for a static closure.
-------------------------------------------------------------------------- */
@@ -249,7 +255,7 @@ INLINE_HEADER StgOffset THUNK_SELECTOR_sizeW ( void )
{ return sizeofW(StgSelector); }
INLINE_HEADER StgOffset BLACKHOLE_sizeW ( void )
-{ return sizeofW(StgHeader)+MIN_PAYLOAD_SIZE; }
+{ return sizeofW(StgInd); } // a BLACKHOLE is a kind of indirection
/* --------------------------------------------------------------------------
Sizes of closures
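OVERWRITE_INFO exists to retarget a live closure to a different kind in place; the LDV calls around SET_INFO keep heap-profiling lifetimes consistent (a dead event for the old kind, a create event for the new one). A hypothetical use, mirroring how this patch revokes pending messages elsewhere in the diff:

#include "Rts.h"

static void killWakeupMessage(StgTSO *tso)
{
    /* the message becomes an inert IND: readers see an indirection
       rather than a pending wakeup */
    OVERWRITE_INFO(tso->block_info.wakeup, &stg_IND_info);
}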
diff --git a/includes/rts/storage/ClosureTypes.h b/includes/rts/storage/ClosureTypes.h
index 6a76772d61..518d39bb11 100644
--- a/includes/rts/storage/ClosureTypes.h
+++ b/includes/rts/storage/ClosureTypes.h
@@ -62,7 +62,7 @@
#define UPDATE_FRAME 38
#define CATCH_FRAME 39
#define STOP_FRAME 40
-#define CAF_BLACKHOLE 41
+#define BLOCKING_QUEUE 41
#define BLACKHOLE 42
#define MVAR_CLEAN 43
#define MVAR_DIRTY 44
diff --git a/includes/rts/storage/Closures.h b/includes/rts/storage/Closures.h
index d7498e2882..802746868c 100644
--- a/includes/rts/storage/Closures.h
+++ b/includes/rts/storage/Closures.h
@@ -127,6 +127,14 @@ typedef struct {
StgInfoTable *saved_info;
} StgIndStatic;
+typedef struct StgBlockingQueue_ {
+ StgHeader header;
+ struct StgBlockingQueue_ *link; // here so it looks like an IND
+ StgClosure *bh; // the BLACKHOLE
+ StgTSO *owner;
+ struct MessageBlackHole_ *queue;
+} StgBlockingQueue;
+
typedef struct {
StgHeader header;
StgWord words;
@@ -433,10 +441,17 @@ typedef struct MessageWakeup_ {
typedef struct MessageThrowTo_ {
StgHeader header;
- Message *link;
+ struct MessageThrowTo_ *link;
StgTSO *source;
StgTSO *target;
StgClosure *exception;
} MessageThrowTo;
+typedef struct MessageBlackHole_ {
+ StgHeader header;
+ struct MessageBlackHole_ *link;
+ StgTSO *tso;
+ StgClosure *bh;
+} MessageBlackHole;
+
#endif /* RTS_STORAGE_CLOSURES_H */
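How the two new structures cooperate, as a sketch built only on the definitions above: a BLACKHOLE's indirectee points either at the owner TSO (no thread has blocked yet) or at a BLOCKING_QUEUE whose queue field chains the MessageBlackHoles of the blocked threads.

#include "Rts.h"

static void printBlockers(StgClosure *bh)
{
    StgClosure *p = ((StgInd *)bh)->indirectee;
    if (get_itbl(p)->type == BLOCKING_QUEUE) {
        StgBlockingQueue *bq = (StgBlockingQueue *)p;
        MessageBlackHole *m;
        for (m = bq->queue; m != (MessageBlackHole *)END_TSO_QUEUE;
             m = m->link) {
            debugBelch("thread %lu blocked here\n",
                       (unsigned long)m->tso->id);
        }
    }
    /* otherwise p is the owner TSO, or already the final value */
}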
diff --git a/includes/rts/storage/TSO.h b/includes/rts/storage/TSO.h
index e2015f28ac..e07be88ac5 100644
--- a/includes/rts/storage/TSO.h
+++ b/includes/rts/storage/TSO.h
@@ -46,6 +46,7 @@ typedef struct {
/* Reason for thread being blocked. See comment above struct StgTso_. */
typedef union {
StgClosure *closure;
+ struct MessageBlackHole_ *bh;
struct MessageThrowTo_ *throwto;
struct MessageWakeup_ *wakeup;
StgInt fd; /* StgInt instead of int, so that it's the same size as the ptrs */
@@ -78,12 +79,17 @@ typedef struct StgTSO_ {
*/
struct StgTSO_* _link;
/*
+ Currently used for linking TSOs on:
+ * cap->run_queue_{hd,tl}
+ * MVAR queue
+ * (non-THREADED_RTS); the blocked_queue
+ * and pointing to the relocated version of a ThreadRelocated
+
NOTE!!! do not modify _link directly, it is subject to
a write barrier for generational GC. Instead use the
setTSOLink() function. Exceptions to this rule are:
* setting the link field to END_TSO_QUEUE
- * putting a TSO on the blackhole_queue
* setting the link field of the currently running TSO, as it
will already be dirty.
*/
@@ -127,6 +133,12 @@ typedef struct StgTSO_ {
*/
struct MessageThrowTo_ * blocked_exceptions;
+ /*
+ A list of StgBlockingQueue objects, representing threads blocked
+ on thunks that are under evaluation by this thread.
+ */
+ struct StgBlockingQueue_ *bq;
+
#ifdef TICKY_TICKY
/* TICKY-specific stuff would go here. */
#endif
@@ -152,6 +164,18 @@ typedef struct StgTSO_ {
void dirty_TSO (Capability *cap, StgTSO *tso);
void setTSOLink (Capability *cap, StgTSO *tso, StgTSO *target);
+// Apply to a TSO before looking at it if you are not sure whether it
+// might be ThreadRelocated or not (basically, that's most of the time
+// unless the TSO is the current TSO).
+//
+INLINE_HEADER StgTSO * deRefTSO(StgTSO *tso)
+{
+ while (tso->what_next == ThreadRelocated) {
+ tso = tso->_link;
+ }
+ return tso;
+}
+
/* -----------------------------------------------------------------------------
Invariants:
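A typical (hypothetical) use of the new deRefTSO helper: a TSO pointer read from a shared structure may point at a relocated stub after a stack enlargement, so the forwarding links must be chased before any field is trusted.

#include "Rts.h"

static rtsBool ownedByMyCapability(Capability *cap, StgTSO *owner)
{
    owner = deRefTSO(owner);   /* skip over ThreadRelocated stubs */
    return owner->cap == cap ? rtsTrue : rtsFalse;
}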
diff --git a/includes/stg/MiscClosures.h b/includes/stg/MiscClosures.h
index 42e878f945..9834c4bbf3 100644
--- a/includes/stg/MiscClosures.h
+++ b/includes/stg/MiscClosures.h
@@ -44,6 +44,7 @@
/* Stack frames */
RTS_RET_INFO(stg_upd_frame_info);
+RTS_RET_INFO(stg_bh_upd_frame_info);
RTS_RET_INFO(stg_marked_upd_frame_info);
RTS_RET_INFO(stg_noupd_frame_info);
RTS_RET_INFO(stg_catch_frame_info);
@@ -54,6 +55,7 @@ RTS_RET_INFO(stg_catch_stm_frame_info);
RTS_RET_INFO(stg_unblockAsyncExceptionszh_ret_info);
RTS_ENTRY(stg_upd_frame_ret);
+RTS_ENTRY(stg_bh_upd_frame_ret);
RTS_ENTRY(stg_marked_upd_frame_ret);
// RTS_FUN(stg_interp_constr_entry);
@@ -90,12 +92,12 @@ RTS_INFO(stg_IND_STATIC_info);
RTS_INFO(stg_IND_PERM_info);
RTS_INFO(stg_IND_OLDGEN_info);
RTS_INFO(stg_IND_OLDGEN_PERM_info);
-RTS_INFO(stg_CAF_UNENTERED_info);
-RTS_INFO(stg_CAF_ENTERED_info);
-RTS_INFO(stg_WHITEHOLE_info);
RTS_INFO(stg_BLACKHOLE_info);
-RTS_INFO(__stg_EAGER_BLACKHOLE_info);
RTS_INFO(stg_CAF_BLACKHOLE_info);
+RTS_INFO(__stg_EAGER_BLACKHOLE_info);
+RTS_INFO(stg_WHITEHOLE_info);
+RTS_INFO(stg_BLOCKING_QUEUE_CLEAN_info);
+RTS_INFO(stg_BLOCKING_QUEUE_DIRTY_info);
RTS_FUN_INFO(stg_BCO_info);
RTS_INFO(stg_EVACUATED_info);
@@ -115,7 +117,9 @@ RTS_INFO(stg_MUT_VAR_CLEAN_info);
RTS_INFO(stg_MUT_VAR_DIRTY_info);
RTS_INFO(stg_END_TSO_QUEUE_info);
RTS_INFO(stg_MSG_WAKEUP_info);
+RTS_INFO(stg_MSG_TRY_WAKEUP_info);
RTS_INFO(stg_MSG_THROWTO_info);
+RTS_INFO(stg_MSG_BLACKHOLE_info);
RTS_INFO(stg_MUT_CONS_info);
RTS_INFO(stg_catch_info);
RTS_INFO(stg_PAP_info);
@@ -142,12 +146,10 @@ RTS_ENTRY(stg_IND_STATIC_entry);
RTS_ENTRY(stg_IND_PERM_entry);
RTS_ENTRY(stg_IND_OLDGEN_entry);
RTS_ENTRY(stg_IND_OLDGEN_PERM_entry);
-RTS_ENTRY(stg_CAF_UNENTERED_entry);
-RTS_ENTRY(stg_CAF_ENTERED_entry);
RTS_ENTRY(stg_WHITEHOLE_entry);
RTS_ENTRY(stg_BLACKHOLE_entry);
-RTS_ENTRY(__stg_EAGER_BLACKHOLE_entry);
RTS_ENTRY(stg_CAF_BLACKHOLE_entry);
+RTS_ENTRY(__stg_EAGER_BLACKHOLE_entry);
RTS_ENTRY(stg_BCO_entry);
RTS_ENTRY(stg_EVACUATED_entry);
RTS_ENTRY(stg_WEAK_entry);
@@ -166,7 +168,9 @@ RTS_ENTRY(stg_MUT_VAR_CLEAN_entry);
RTS_ENTRY(stg_MUT_VAR_DIRTY_entry);
RTS_ENTRY(stg_END_TSO_QUEUE_entry);
RTS_ENTRY(stg_MSG_WAKEUP_entry);
+RTS_ENTRY(stg_MSG_TRY_WAKEUP_entry);
RTS_ENTRY(stg_MSG_THROWTO_entry);
+RTS_ENTRY(stg_MSG_BLACKHOLE_entry);
RTS_ENTRY(stg_MUT_CONS_entry);
RTS_ENTRY(stg_catch_entry);
RTS_ENTRY(stg_PAP_entry);
@@ -404,6 +408,8 @@ RTS_FUN(stg_PAP_apply);
RTS_RET_INFO(stg_enter_info);
RTS_ENTRY(stg_enter_ret);
+RTS_RET_INFO(stg_enter_checkbh_info);
+RTS_ENTRY(stg_enter_checkbh_ret);
RTS_RET_INFO(stg_gc_void_info);
RTS_ENTRY(stg_gc_void_ret);
diff --git a/rts/Capability.c b/rts/Capability.c
index 5f54ecae4d..f5e77a900f 100644
--- a/rts/Capability.c
+++ b/rts/Capability.c
@@ -62,9 +62,8 @@ Capability * rts_unsafeGetMyCapability (void)
STATIC_INLINE rtsBool
globalWorkToDo (void)
{
- return blackholes_need_checking
- || sched_state >= SCHED_INTERRUPTING
- ;
+ return sched_state >= SCHED_INTERRUPTING
+ || recent_activity == ACTIVITY_INACTIVE; // need to check for deadlock
}
#endif
@@ -637,43 +636,6 @@ yieldCapability (Capability** pCap, Task *task)
}
/* ----------------------------------------------------------------------------
- * Wake up a thread on a Capability.
- *
- * This is used when the current Task is running on a Capability and
- * wishes to wake up a thread on a different Capability.
- * ------------------------------------------------------------------------- */
-
-void
-wakeupThreadOnCapability (Capability *cap,
- Capability *other_cap,
- StgTSO *tso)
-{
- MessageWakeup *msg;
-
- // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
- if (tso->bound) {
- ASSERT(tso->bound->task->cap == tso->cap);
- tso->bound->task->cap = other_cap;
- }
- tso->cap = other_cap;
-
- ASSERT(tso->why_blocked != BlockedOnMsgWakeup ||
- tso->block_info.closure->header.info == &stg_IND_info);
-
- ASSERT(tso->block_info.closure->header.info != &stg_MSG_WAKEUP_info);
-
- msg = (MessageWakeup*) allocate(cap, sizeofW(MessageWakeup));
- msg->header.info = &stg_MSG_WAKEUP_info;
- msg->tso = tso;
- tso->block_info.closure = (StgClosure *)msg;
- dirty_TSO(cap, tso);
- write_barrier();
- tso->why_blocked = BlockedOnMsgWakeup;
-
- sendMessage(other_cap, (Message*)msg);
-}
-
-/* ----------------------------------------------------------------------------
* prodCapability
*
* If a Capability is currently idle, wake up a Task on it. Used to
@@ -906,24 +868,3 @@ markCapabilities (evac_fn evac, void *user)
Messages
-------------------------------------------------------------------------- */
-#ifdef THREADED_RTS
-
-void sendMessage(Capability *cap, Message *msg)
-{
- ACQUIRE_LOCK(&cap->lock);
-
- msg->link = cap->inbox;
- cap->inbox = msg;
-
- if (cap->running_task == NULL) {
- cap->running_task = myTask();
- // precond for releaseCapability_()
- releaseCapability_(cap,rtsFalse);
- } else {
- contextSwitchCapability(cap);
- }
-
- RELEASE_LOCK(&cap->lock);
-}
-
-#endif // THREADED_RTS
diff --git a/rts/Capability.h b/rts/Capability.h
index 4030b5efd4..e12b8ce6e9 100644
--- a/rts/Capability.h
+++ b/rts/Capability.h
@@ -201,6 +201,8 @@ void waitForReturnCapability (Capability **cap/*in/out*/, Task *task);
INLINE_HEADER void recordMutableCap (StgClosure *p, Capability *cap, nat gen);
+INLINE_HEADER void recordClosureMutated (Capability *cap, StgClosure *p);
+
#if defined(THREADED_RTS)
// Gives up the current capability IFF there is a higher-priority
@@ -222,12 +224,6 @@ void yieldCapability (Capability** pCap, Task *task);
//
void waitForCapability (Task *task, Mutex *mutex, Capability **pCap);
-// Wakes up a thread on a Capability (probably a different Capability
-// from the one held by the current Task).
-//
-void wakeupThreadOnCapability (Capability *my_cap, Capability *other_cap,
- StgTSO *tso);
-
// Wakes up a worker thread on just one Capability, used when we
// need to service some global event.
//
@@ -289,8 +285,6 @@ void traverseSparkQueues (evac_fn evac, void *user);
INLINE_HEADER rtsBool emptyInbox(Capability *cap);;
-void sendMessage (Capability *cap, Message *msg);
-
#endif // THREADED_RTS
/* -----------------------------------------------------------------------------
@@ -316,6 +310,15 @@ recordMutableCap (StgClosure *p, Capability *cap, nat gen)
*bd->free++ = (StgWord)p;
}
+INLINE_HEADER void
+recordClosureMutated (Capability *cap, StgClosure *p)
+{
+ bdescr *bd;
+ bd = Bdescr((StgPtr)p);
+ if (bd->gen_no != 0) recordMutableCap(p,cap,bd->gen_no);
+}
+
+
#if defined(THREADED_RTS)
INLINE_HEADER rtsBool
emptySparkPoolCap (Capability *cap)
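recordClosureMutated is the generic "I just mutated this closure" hook that the new message code relies on: if the closure lives outside generation 0 it is pushed onto the Capability-local mutable list so the next minor GC scans it; young closures are skipped because they are scanned anyway. The usage pattern, sketched with the fields this patch adds:

static void pushInbox(Capability *cap, Message *msg)
{
    msg->link = cap->inbox;                        /* mutate the message */
    cap->inbox = msg;
    recordClosureMutated(cap, (StgClosure *)msg);  /* old gen? -> mut list */
}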
diff --git a/rts/ClosureFlags.c b/rts/ClosureFlags.c
index 358cb40ed3..cebd124dd2 100644
--- a/rts/ClosureFlags.c
+++ b/rts/ClosureFlags.c
@@ -62,8 +62,8 @@ StgWord16 closure_flags[] = {
[UPDATE_FRAME] = ( _BTM ),
[CATCH_FRAME] = ( _BTM ),
[STOP_FRAME] = ( _BTM ),
- [CAF_BLACKHOLE] = ( _BTM|_NS| _UPT ),
[BLACKHOLE] = ( _NS| _UPT ),
+ [BLOCKING_QUEUE] = ( _NS| _MUT|_UPT ),
[MVAR_CLEAN] = (_HNF| _NS| _MUT|_UPT ),
[MVAR_DIRTY] = (_HNF| _NS| _MUT|_UPT ),
[ARR_WORDS] = (_HNF| _NS| _UPT ),
diff --git a/rts/FrontPanel.c b/rts/FrontPanel.c
index ebba4056fb..da42548eb8 100644
--- a/rts/FrontPanel.c
+++ b/rts/FrontPanel.c
@@ -662,8 +662,6 @@ residencyCensus( void )
type = Thunk;
break;
- case CAF_BLACKHOLE:
- case EAGER_BLACKHOLE:
case BLACKHOLE:
/* case BLACKHOLE_BQ: FIXME: case does not exist */
size = sizeW_fromITBL(info);
diff --git a/rts/HeapStackCheck.cmm b/rts/HeapStackCheck.cmm
index 5bdf600a1c..f8bccc091d 100644
--- a/rts/HeapStackCheck.cmm
+++ b/rts/HeapStackCheck.cmm
@@ -159,6 +159,24 @@ __stg_gc_enter_1
}
/* -----------------------------------------------------------------------------
+ stg_enter_checkbh is just like stg_enter, except that we also call
+ checkBlockingQueues(). The point of this is that the GC can
+ replace an stg_marked_upd_frame with an stg_enter_checkbh if it
+ finds that the BLACKHOLE has already been updated by another
+ thread. It would be unsafe to use stg_enter, because there might
+ be an orphaned BLOCKING_QUEUE now.
+ -------------------------------------------------------------------------- */
+
+INFO_TABLE_RET( stg_enter_checkbh, RET_SMALL, P_ unused)
+{
+ R1 = Sp(1);
+ Sp_adj(2);
+ foreign "C" checkBlockingQueues(MyCapability() "ptr",
+ CurrentTSO) [R1];
+ ENTER();
+}
+
+/* -----------------------------------------------------------------------------
Heap checks in Primitive case alternatives
A primitive case alternative is entered with a value either in
@@ -593,11 +611,7 @@ INFO_TABLE_RET( stg_block_putmvar, RET_SMALL, P_ unused1, P_ unused2 )
// code fragment executed just before we return to the scheduler
stg_block_putmvar_finally
{
-#ifdef THREADED_RTS
unlockClosure(R3, stg_MVAR_DIRTY_info);
-#else
- SET_INFO(R3, stg_MVAR_DIRTY_info);
-#endif
jump StgReturn;
}
@@ -611,24 +625,12 @@ stg_block_putmvar
BLOCK_BUT_FIRST(stg_block_putmvar_finally);
}
-// code fragment executed just before we return to the scheduler
-stg_block_blackhole_finally
-{
-#if defined(THREADED_RTS)
- // The last thing we do is release sched_lock, which is
- // preventing other threads from accessing blackhole_queue and
- // picking up this thread before we are finished with it.
- RELEASE_LOCK(sched_mutex "ptr");
-#endif
- jump StgReturn;
-}
-
stg_block_blackhole
{
Sp_adj(-2);
Sp(1) = R1;
Sp(0) = stg_enter_info;
- BLOCK_BUT_FIRST(stg_block_blackhole_finally);
+ BLOCK_GENERIC;
}
INFO_TABLE_RET( stg_block_throwto, RET_SMALL, P_ unused, P_ unused )
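In C terms the new stg_enter_checkbh frame behaves roughly as below (a sketch; ENTER is the Cmm macro for entering a closure and has no direct C equivalent):

static void enter_checkbh_sketch(Capability *cap, StgClosure *obj)
{
    /* wake any threads left on BLOCKING_QUEUEs that were orphaned
       when the GC shorted out an already-updated BLACKHOLE */
    checkBlockingQueues(cap, cap->r.rCurrentTSO);
    /* then enter obj exactly as stg_enter would */
}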
diff --git a/rts/Interpreter.c b/rts/Interpreter.c
index 9071912f2d..16a8e242bd 100644
--- a/rts/Interpreter.c
+++ b/rts/Interpreter.c
@@ -21,6 +21,7 @@
#include "Disassembler.h"
#include "Interpreter.h"
#include "ThreadPaused.h"
+#include "Threads.h"
#include <string.h> /* for memcpy */
#ifdef HAVE_ERRNO_H
@@ -443,7 +444,8 @@ do_return:
// to a PAP by the GC, violating the invariant that PAPs
// always contain a tagged pointer to the function.
INTERP_TICK(it_retto_UPDATE);
- UPD_IND(cap, ((StgUpdateFrame *)Sp)->updatee, tagged_obj);
+ updateThunk(cap, cap->r.rCurrentTSO,
+ ((StgUpdateFrame *)Sp)->updatee, tagged_obj);
Sp += sizeofW(StgUpdateFrame);
goto do_return;
diff --git a/rts/LdvProfile.c b/rts/LdvProfile.c
index 412fd05579..799d418145 100644
--- a/rts/LdvProfile.c
+++ b/rts/LdvProfile.c
@@ -140,7 +140,7 @@ processHeapClosureForDead( StgClosure *c )
case FUN_1_1:
case FUN_0_2:
case BLACKHOLE:
- case CAF_BLACKHOLE:
+ case BLOCKING_QUEUE:
case IND_PERM:
case IND_OLDGEN_PERM:
/*
diff --git a/rts/Linker.c b/rts/Linker.c
index 4e8bafface..06240812c2 100644
--- a/rts/Linker.c
+++ b/rts/Linker.c
@@ -877,7 +877,10 @@ typedef struct _RtsSymbolVal {
SymI_HasProto(stable_ptr_table) \
SymI_HasProto(stackOverflow) \
SymI_HasProto(stg_CAF_BLACKHOLE_info) \
+ SymI_HasProto(stg_BLACKHOLE_info) \
SymI_HasProto(__stg_EAGER_BLACKHOLE_info) \
+ SymI_HasProto(stg_BLOCKING_QUEUE_CLEAN_info) \
+ SymI_HasProto(stg_BLOCKING_QUEUE_DIRTY_info) \
SymI_HasProto(startTimer) \
SymI_HasProto(stg_MVAR_CLEAN_info) \
SymI_HasProto(stg_MVAR_DIRTY_info) \
@@ -941,6 +944,7 @@ typedef struct _RtsSymbolVal {
SymI_HasProto(stg_sel_8_upd_info) \
SymI_HasProto(stg_sel_9_upd_info) \
SymI_HasProto(stg_upd_frame_info) \
+ SymI_HasProto(stg_bh_upd_frame_info) \
SymI_HasProto(suspendThread) \
SymI_HasProto(stg_takeMVarzh) \
SymI_HasProto(stg_threadStatuszh) \
diff --git a/rts/Messages.c b/rts/Messages.c
new file mode 100644
index 0000000000..2b40f76e93
--- /dev/null
+++ b/rts/Messages.c
@@ -0,0 +1,296 @@
+/* ---------------------------------------------------------------------------
+ *
+ * (c) The GHC Team, 2010
+ *
+ * Inter-Capability message passing
+ *
+ * --------------------------------------------------------------------------*/
+
+#include "Rts.h"
+#include "Messages.h"
+#include "Trace.h"
+#include "Capability.h"
+#include "Schedule.h"
+#include "Threads.h"
+#include "RaiseAsync.h"
+#include "sm/Storage.h"
+
+/* ----------------------------------------------------------------------------
+ Send a message to another Capability
+ ------------------------------------------------------------------------- */
+
+#ifdef THREADED_RTS
+
+void sendMessage(Capability *from_cap, Capability *to_cap, Message *msg)
+{
+ ACQUIRE_LOCK(&to_cap->lock);
+
+#ifdef DEBUG
+ {
+ const StgInfoTable *i = msg->header.info;
+ if (i != &stg_MSG_WAKEUP_info &&
+ i != &stg_MSG_THROWTO_info &&
+ i != &stg_MSG_BLACKHOLE_info &&
+ i != &stg_MSG_TRY_WAKEUP_info &&
+ i != &stg_IND_info && // can happen if a MSG_BLACKHOLE is revoked
+ i != &stg_WHITEHOLE_info) {
+ barf("sendMessage: %p", i);
+ }
+ }
+#endif
+
+ msg->link = to_cap->inbox;
+ to_cap->inbox = msg;
+
+ recordClosureMutated(from_cap,(StgClosure*)msg);
+
+ if (to_cap->running_task == NULL) {
+ to_cap->running_task = myTask();
+ // precond for releaseCapability_()
+ releaseCapability_(to_cap,rtsFalse);
+ } else {
+ contextSwitchCapability(to_cap);
+ }
+
+ RELEASE_LOCK(&to_cap->lock);
+}
+
+#endif /* THREADED_RTS */
+
+/* ----------------------------------------------------------------------------
+ Handle a message
+ ------------------------------------------------------------------------- */
+
+#ifdef THREADED_RTS
+
+void
+executeMessage (Capability *cap, Message *m)
+{
+ const StgInfoTable *i;
+
+loop:
+ write_barrier(); // allow m->header to be modified by another thread
+ i = m->header.info;
+ if (i == &stg_MSG_WAKEUP_info)
+ {
+ // the plan is to eventually get rid of these and use
+ // TRY_WAKEUP instead.
+ MessageWakeup *w = (MessageWakeup *)m;
+ StgTSO *tso = w->tso;
+ debugTraceCap(DEBUG_sched, cap, "message: wakeup thread %ld",
+ (lnat)tso->id);
+ ASSERT(tso->cap == cap);
+ ASSERT(tso->why_blocked == BlockedOnMsgWakeup);
+ ASSERT(tso->block_info.closure == (StgClosure *)m);
+ tso->why_blocked = NotBlocked;
+ appendToRunQueue(cap, tso);
+ }
+ else if (i == &stg_MSG_TRY_WAKEUP_info)
+ {
+ StgTSO *tso = ((MessageWakeup *)m)->tso;
+ debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld",
+ (lnat)tso->id);
+ tryWakeupThread(cap, tso);
+ }
+ else if (i == &stg_MSG_THROWTO_info)
+ {
+ MessageThrowTo *t = (MessageThrowTo *)m;
+ nat r;
+ const StgInfoTable *i;
+
+ i = lockClosure((StgClosure*)m);
+ if (i != &stg_MSG_THROWTO_info) {
+ unlockClosure((StgClosure*)m, i);
+ goto loop;
+ }
+
+ debugTraceCap(DEBUG_sched, cap, "message: throwTo %ld -> %ld",
+ (lnat)t->source->id, (lnat)t->target->id);
+
+ ASSERT(t->source->why_blocked == BlockedOnMsgThrowTo);
+ ASSERT(t->source->block_info.closure == (StgClosure *)m);
+
+ r = throwToMsg(cap, t);
+
+ switch (r) {
+ case THROWTO_SUCCESS:
+ ASSERT(t->source->sp[0] == (StgWord)&stg_block_throwto_info);
+ t->source->sp += 3;
+ unblockOne(cap, t->source);
+ // this message is done
+ unlockClosure((StgClosure*)m, &stg_IND_info);
+ break;
+ case THROWTO_BLOCKED:
+ // unlock the message
+ unlockClosure((StgClosure*)m, &stg_MSG_THROWTO_info);
+ break;
+ }
+ }
+ else if (i == &stg_MSG_BLACKHOLE_info)
+ {
+ nat r;
+ MessageBlackHole *b = (MessageBlackHole*)m;
+
+ r = messageBlackHole(cap, b);
+ if (r == 0) {
+ tryWakeupThread(cap, b->tso);
+ }
+ return;
+ }
+ else if (i == &stg_IND_info)
+ {
+ // message was revoked
+ return;
+ }
+ else if (i == &stg_WHITEHOLE_info)
+ {
+ goto loop;
+ }
+ else
+ {
+ barf("executeMessage: %p", i);
+ }
+}
+
+#endif
+
+/* ----------------------------------------------------------------------------
+ Handle a MSG_BLACKHOLE message
+
+ This is called from two places: either we just entered a BLACKHOLE
+ (stg_BLACKHOLE_info), or we received a MSG_BLACKHOLE in our
+ cap->inbox.
+
+ We need to establish whether the BLACKHOLE belongs to
+ this Capability, and
+ - if so, arrange to block the current thread on it
+ - otherwise, forward the message to the right place
+
+ Returns:
+ - 0 if the blocked thread can be woken up by the caller
+ - 1 if the thread is still blocked, and we promise to send a MSG_TRY_WAKEUP
+ at some point in the future.
+
+ ------------------------------------------------------------------------- */
+
+nat messageBlackHole(Capability *cap, MessageBlackHole *msg)
+{
+ const StgInfoTable *info;
+ StgClosure *p;
+ StgBlockingQueue *bq;
+ StgClosure *bh = msg->bh;
+ StgTSO *owner;
+
+ debugTraceCap(DEBUG_sched, cap, "message: thread %d blocking on blackhole %p",
+ (lnat)msg->tso->id, msg->bh);
+
+ info = bh->header.info;
+
+ // If we got this message in our inbox, it might be that the
+ // BLACKHOLE has already been updated, and GC has shorted out the
+ // indirection, so the pointer no longer points to a BLACKHOLE at
+ // all.
+ if (info != &stg_BLACKHOLE_info &&
+ info != &stg_CAF_BLACKHOLE_info &&
+ info != &stg_WHITEHOLE_info) {
+ // if it is a WHITEHOLE, then a thread is in the process of
+ // trying to BLACKHOLE it. But we know that it was once a
+ // BLACKHOLE, so there is at least a valid pointer in the
+ // payload, so we can carry on.
+ return 0;
+ }
+
+ // we know at this point that the closure was once a BLACKHOLE, so
+ // its payload contains a valid pointer and we can read the indirectee:
+loop:
+ p = ((StgInd*)bh)->indirectee;
+ info = p->header.info;
+
+ if (info == &stg_IND_info)
+ {
+ // This could happen, if e.g. we got a BLOCKING_QUEUE that has
+ // just been replaced with an IND by another thread in
+ // updateThunk(). In which case, if we read the indirectee
+ // again we should get the value.
+ goto loop;
+ }
+
+ else if (info == &stg_TSO_info)
+ {
+ owner = deRefTSO((StgTSO *)p);
+
+#ifdef THREADED_RTS
+ if (owner->cap != cap) {
+ sendMessage(cap, owner->cap, (Message*)msg);
+ debugTraceCap(DEBUG_sched, cap, "forwarding message to cap %d", owner->cap->no);
+ return 1;
+ }
+#endif
+ // owner is the owner of the BLACKHOLE, and resides on this
+ // Capability. msg->tso is the first thread to block on this
+ // BLACKHOLE, so we first create a BLOCKING_QUEUE object.
+
+ bq = (StgBlockingQueue*)allocate(cap, sizeofW(StgBlockingQueue));
+
+ // initialise the BLOCKING_QUEUE object
+ SET_HDR(bq, &stg_BLOCKING_QUEUE_DIRTY_info, CCS_SYSTEM);
+ bq->bh = bh;
+ bq->queue = msg;
+ bq->owner = owner;
+
+ msg->link = (MessageBlackHole*)END_TSO_QUEUE;
+
+ // All BLOCKING_QUEUES are linked in a list on owner->bq, so
+ // that we can search through them in the event that there is
+ // a collision to update a BLACKHOLE and a BLOCKING_QUEUE
+ // becomes orphaned (see updateThunk()).
+ bq->link = owner->bq;
+ owner->bq = bq;
+ dirty_TSO(cap, owner); // we modified owner->bq
+
+ // point to the BLOCKING_QUEUE from the BLACKHOLE
+ write_barrier(); // make the BQ visible
+ ((StgInd*)bh)->indirectee = (StgClosure *)bq;
+ recordClosureMutated(cap,bh); // bh was mutated
+
+ debugTraceCap(DEBUG_sched, cap, "thread %d blocked on thread %d",
+ (lnat)msg->tso->id, (lnat)owner->id);
+
+ return 1; // blocked
+ }
+ else if (info == &stg_BLOCKING_QUEUE_CLEAN_info ||
+ info == &stg_BLOCKING_QUEUE_DIRTY_info)
+ {
+ StgBlockingQueue *bq = (StgBlockingQueue *)p;
+
+ ASSERT(bq->bh == bh);
+
+ owner = deRefTSO(bq->owner);
+
+ ASSERT(owner != END_TSO_QUEUE);
+
+#ifdef THREADED_RTS
+ if (owner->cap != cap) {
+ sendMessage(cap, owner->cap, (Message*)msg);
+ debugTraceCap(DEBUG_sched, cap, "forwarding message to cap %d", owner->cap->no);
+ return 1;
+ }
+#endif
+
+ msg->link = bq->queue;
+ bq->queue = msg;
+ recordClosureMutated(cap,(StgClosure*)msg);
+
+ if (info == &stg_BLOCKING_QUEUE_CLEAN_info) {
+ bq->header.info = &stg_BLOCKING_QUEUE_DIRTY_info;
+ recordClosureMutated(cap,bq);
+ }
+
+ debugTraceCap(DEBUG_sched, cap, "thread %d blocked on thread %d",
+ (lnat)msg->tso->id, (lnat)owner->id);
+
+ return 1; // blocked
+ }
+
+ return 0; // not blocked
+}
+
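The return convention documented in the comment block above is what both call sites rely on; in sketch form:

static void blockOnBlackHole(Capability *cap, MessageBlackHole *msg)
{
    if (messageBlackHole(cap, msg) == 0) {
        /* the BLACKHOLE was updated before we could block: wake the
           thread ourselves, since no MSG_TRY_WAKEUP is coming */
        tryWakeupThread(cap, msg->tso);
    }
    /* on 1 the message was queued or forwarded, and the owner will
       send MSG_TRY_WAKEUP when the thunk is updated */
}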
diff --git a/rts/Messages.h b/rts/Messages.h
new file mode 100644
index 0000000000..15c037954b
--- /dev/null
+++ b/rts/Messages.h
@@ -0,0 +1,18 @@
+/* ---------------------------------------------------------------------------
+ *
+ * (c) The GHC Team, 2010
+ *
+ * Inter-Capability message passing
+ *
+ * --------------------------------------------------------------------------*/
+
+BEGIN_RTS_PRIVATE
+
+nat messageBlackHole(Capability *cap, MessageBlackHole *msg);
+
+#ifdef THREADED_RTS
+void executeMessage (Capability *cap, Message *m);
+void sendMessage (Capability *from_cap, Capability *to_cap, Message *msg);
+#endif
+
+END_RTS_PRIVATE
diff --git a/rts/PrimOps.cmm b/rts/PrimOps.cmm
index 4b5e106710..5c575f695b 100644
--- a/rts/PrimOps.cmm
+++ b/rts/PrimOps.cmm
@@ -1204,11 +1204,7 @@ stg_takeMVarzh
StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
}
-#if defined(THREADED_RTS)
unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
- SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
RET_P(val);
}
else
@@ -1216,11 +1212,7 @@ stg_takeMVarzh
/* No further putMVars, MVar is now empty */
StgMVar_value(mvar) = stg_END_TSO_QUEUE_closure;
-#if defined(THREADED_RTS)
unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
- SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
RET_P(val);
}
@@ -1279,21 +1271,13 @@ stg_tryTakeMVarzh
if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
}
-#if defined(THREADED_RTS)
unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
- SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
}
else
{
/* No further putMVars, MVar is now empty */
StgMVar_value(mvar) = stg_END_TSO_QUEUE_closure;
-#if defined(THREADED_RTS)
unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
- SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
}
RET_NP(1, val);
@@ -1360,11 +1344,7 @@ stg_putMVarzh
StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
}
-#if defined(THREADED_RTS)
unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
- SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
jump %ENTRY_CODE(Sp(0));
}
else
@@ -1372,11 +1352,7 @@ stg_putMVarzh
/* No further takes, the MVar is now full. */
StgMVar_value(mvar) = val;
-#if defined(THREADED_RTS)
unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
- SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
jump %ENTRY_CODE(Sp(0));
}
@@ -1429,22 +1405,14 @@ stg_tryPutMVarzh
StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
}
-#if defined(THREADED_RTS)
unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
- SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
}
else
{
/* No further takes, the MVar is now full. */
StgMVar_value(mvar) = R2;
-#if defined(THREADED_RTS)
unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
- SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
}
RET_N(1);
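The four MVar primops above lose their THREADED_RTS conditionals because unlockClosure is now usable on every build. In the non-threaded RTS it presumably reduces to a plain info-table write, along these lines (an assumption about the fallback, for illustration only; not taken from this patch):

#ifndef THREADED_RTS
#define unlockClosure(c, inf)  SET_INFO((StgClosure *)(c), (inf))
#endif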
diff --git a/rts/Printer.c b/rts/Printer.c
index e9813299d8..6eecfabbfb 100644
--- a/rts/Printer.c
+++ b/rts/Printer.c
@@ -257,6 +257,12 @@ printClosure( StgClosure *obj )
debugBelch(")\n");
break;
+ case BLACKHOLE:
+ debugBelch("BLACKHOLE(");
+ printPtr((StgPtr)((StgInd*)obj)->indirectee);
+ debugBelch(")\n");
+ break;
+
/* Cannot happen -- use default case.
case RET_BCO:
case RET_SMALL:
@@ -296,14 +302,6 @@ printClosure( StgClosure *obj )
break;
}
- case CAF_BLACKHOLE:
- debugBelch("CAF_BH");
- break;
-
- case BLACKHOLE:
- debugBelch("BH\n");
- break;
-
case ARR_WORDS:
{
StgWord i;
@@ -1122,8 +1120,8 @@ char *closure_type_names[] = {
[UPDATE_FRAME] = "UPDATE_FRAME",
[CATCH_FRAME] = "CATCH_FRAME",
[STOP_FRAME] = "STOP_FRAME",
- [CAF_BLACKHOLE] = "CAF_BLACKHOLE",
[BLACKHOLE] = "BLACKHOLE",
+ [BLOCKING_QUEUE] = "BLOCKING_QUEUE",
[MVAR_CLEAN] = "MVAR_CLEAN",
[MVAR_DIRTY] = "MVAR_DIRTY",
[ARR_WORDS] = "ARR_WORDS",
diff --git a/rts/ProfHeap.c b/rts/ProfHeap.c
index e90051c5e6..4a2816c29d 100644
--- a/rts/ProfHeap.c
+++ b/rts/ProfHeap.c
@@ -878,8 +878,8 @@ heapCensusChain( Census *census, bdescr *bd )
case IND_PERM:
case IND_OLDGEN:
case IND_OLDGEN_PERM:
- case CAF_BLACKHOLE:
case BLACKHOLE:
+ case BLOCKING_QUEUE:
case FUN_1_0:
case FUN_0_1:
case FUN_1_1:
diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c
index d02a2567ff..d5a4918f34 100644
--- a/rts/RaiseAsync.c
+++ b/rts/RaiseAsync.c
@@ -18,6 +18,7 @@
#include "STM.h"
#include "sm/Sanity.h"
#include "Profiling.h"
+#include "Messages.h"
#if defined(mingw32_HOST_OS)
#include "win32/IOManager.h"
#endif
@@ -66,13 +67,12 @@ void
throwToSingleThreaded_(Capability *cap, StgTSO *tso, StgClosure *exception,
rtsBool stop_at_atomically)
{
+ tso = deRefTSO(tso);
+
// Thread already dead?
if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
return;
}
- while (tso->what_next == ThreadRelocated) {
- tso = tso->_link;
- }
// Remove it from any blocking queues
removeFromQueues(cap,tso);
@@ -83,13 +83,12 @@ throwToSingleThreaded_(Capability *cap, StgTSO *tso, StgClosure *exception,
void
suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here)
{
+ tso = deRefTSO(tso);
+
// Thread already dead?
if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
return;
}
- while (tso->what_next == ThreadRelocated) {
- tso = tso->_link;
- }
// Remove it from any blocking queues
removeFromQueues(cap,tso);
@@ -164,7 +163,7 @@ throwTo (Capability *cap, // the Capability we hold
msg = (MessageThrowTo *) allocate(cap, sizeofW(MessageThrowTo));
// message starts locked; the caller has to unlock it when it is
// ready.
- msg->header.info = &stg_WHITEHOLE_info;
+ SET_HDR(msg, &stg_WHITEHOLE_info, CCS_SYSTEM);
msg->source = source;
msg->target = target;
msg->exception = exception;
@@ -185,14 +184,24 @@ throwToMsg (Capability *cap, MessageThrowTo *msg)
{
StgWord status;
StgTSO *target = msg->target;
+ Capability *target_cap;
+ goto check_target;
+
+retry:
+ write_barrier();
+ debugTrace(DEBUG_sched, "throwTo: retrying...");
+
+check_target:
ASSERT(target != END_TSO_QUEUE);
// follow ThreadRelocated links in the target first
- while (target->what_next == ThreadRelocated) {
- target = target->_link;
- // No, it might be a WHITEHOLE:
- // ASSERT(get_itbl(target)->type == TSO);
+ target = deRefTSO(target);
+
+ // Thread already dead?
+ if (target->what_next == ThreadComplete
+ || target->what_next == ThreadKilled) {
+ return THROWTO_SUCCESS;
}
debugTraceCap(DEBUG_sched, cap,
@@ -204,18 +213,10 @@ throwToMsg (Capability *cap, MessageThrowTo *msg)
traceThreadStatus(DEBUG_sched, target);
#endif
- goto check_target;
-retry:
- write_barrier();
- debugTrace(DEBUG_sched, "throwTo: retrying...");
-
-check_target:
- ASSERT(target != END_TSO_QUEUE);
-
- // Thread already dead?
- if (target->what_next == ThreadComplete
- || target->what_next == ThreadKilled) {
- return THROWTO_SUCCESS;
+ target_cap = target->cap;
+ if (target->cap != cap) {
+ throwToSendMsg(cap, target_cap, msg);
+ return THROWTO_BLOCKED;
}
status = target->why_blocked;
@@ -282,28 +283,19 @@ check_target:
have also seen the write to Q.
*/
{
- Capability *target_cap;
-
write_barrier();
- target_cap = target->cap;
- if (target_cap != cap) {
- throwToSendMsg(cap, target_cap, msg);
- return THROWTO_BLOCKED;
+ if ((target->flags & TSO_BLOCKEX) == 0) {
+ // It's on our run queue and not blocking exceptions
+ raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
+ return THROWTO_SUCCESS;
} else {
- if ((target->flags & TSO_BLOCKEX) == 0) {
- // It's on our run queue and not blocking exceptions
- raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
- return THROWTO_SUCCESS;
- } else {
- blockedThrowTo(cap,target,msg);
- return THROWTO_BLOCKED;
- }
+ blockedThrowTo(cap,target,msg);
+ return THROWTO_BLOCKED;
}
}
case BlockedOnMsgThrowTo:
{
- Capability *target_cap;
const StgInfoTable *i;
MessageThrowTo *m;
@@ -340,13 +332,6 @@ check_target:
goto retry;
}
- target_cap = target->cap;
- if (target_cap != cap) {
- unlockClosure((StgClosure*)m, i);
- throwToSendMsg(cap, target_cap, msg);
- return THROWTO_BLOCKED;
- }
-
if ((target->flags & TSO_BLOCKEX) &&
((target->flags & TSO_INTERRUPTIBLE) == 0)) {
unlockClosure((StgClosure*)m, i);
@@ -358,7 +343,6 @@ check_target:
unlockClosure((StgClosure*)m, &stg_IND_info);
raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
- unblockOne(cap, target);
return THROWTO_SUCCESS;
}
@@ -400,48 +384,30 @@ check_target:
if ((target->flags & TSO_BLOCKEX) &&
((target->flags & TSO_INTERRUPTIBLE) == 0)) {
- Capability *target_cap = target->cap;
- if (target->cap != cap) {
- throwToSendMsg(cap,target_cap,msg);
- } else {
- blockedThrowTo(cap,target,msg);
- }
+ blockedThrowTo(cap,target,msg);
unlockClosure((StgClosure *)mvar, info);
return THROWTO_BLOCKED;
} else {
removeThreadFromMVarQueue(cap, mvar, target);
raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
- unblockOne(cap, target);
- unlockClosure((StgClosure *)mvar, info);
+ if (info == &stg_MVAR_CLEAN_info) {
+ dirty_MVAR(&cap->r,(StgClosure*)mvar);
+ }
+ unlockClosure((StgClosure *)mvar, &stg_MVAR_DIRTY_info);
return THROWTO_SUCCESS;
}
}
case BlockedOnBlackHole:
{
- ACQUIRE_LOCK(&sched_mutex);
- // double checking the status after the memory barrier:
- if (target->why_blocked != BlockedOnBlackHole) {
- RELEASE_LOCK(&sched_mutex);
- goto retry;
- }
-
- if (target->flags & TSO_BLOCKEX) {
- Capability *target_cap = target->cap;
- if (target->cap != cap) {
- throwToSendMsg(cap,target_cap,msg);
- } else {
- blockedThrowTo(cap,target,msg);
- }
- RELEASE_LOCK(&sched_mutex);
- return THROWTO_BLOCKED; // caller releases lock
- } else {
- removeThreadFromQueue(cap, &blackhole_queue, target);
- raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
- unblockOne(cap, target);
- RELEASE_LOCK(&sched_mutex);
- return THROWTO_SUCCESS;
- }
+ // Revoke the message by replacing it with IND. We're not
+ // locking anything here, so we might still get a TRY_WAKEUP
+ // message from the owner of the blackhole some time in the
+ // future, but that doesn't matter.
+ ASSERT(target->block_info.bh->header.info == &stg_MSG_BLACKHOLE_info);
+ OVERWRITE_INFO(target->block_info.bh, &stg_IND_info);
+ raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
+ return THROWTO_SUCCESS;
}
case BlockedOnSTM:
@@ -454,35 +420,19 @@ check_target:
}
if ((target->flags & TSO_BLOCKEX) &&
((target->flags & TSO_INTERRUPTIBLE) == 0)) {
- Capability *target_cap = target->cap;
- if (target->cap != cap) {
- throwToSendMsg(cap,target_cap,msg);
- } else {
- blockedThrowTo(cap,target,msg);
- }
+ blockedThrowTo(cap,target,msg);
unlockTSO(target);
return THROWTO_BLOCKED;
} else {
raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
- unblockOne(cap, target);
unlockTSO(target);
return THROWTO_SUCCESS;
}
case BlockedOnCCall:
case BlockedOnCCall_NoUnblockExc:
- {
- Capability *target_cap;
-
- target_cap = target->cap;
- if (target_cap != cap) {
- throwToSendMsg(cap, target_cap, msg);
- return THROWTO_BLOCKED;
- }
-
blockedThrowTo(cap,target,msg);
return THROWTO_BLOCKED;
- }
#ifndef THREADEDED_RTS
case BlockedOnRead:
@@ -515,9 +465,9 @@ throwToSendMsg (Capability *cap STG_UNUSED,
{
#ifdef THREADED_RTS
- debugTrace(DEBUG_sched, "throwTo: sending a throwto message to cap %lu", (unsigned long)target_cap->no);
+ debugTraceCap(DEBUG_sched, cap, "throwTo: sending a throwto message to cap %lu", (unsigned long)target_cap->no);
- sendMessage(target_cap, (Message*)msg);
+ sendMessage(cap, target_cap, (Message*)msg);
#endif
}
@@ -532,7 +482,7 @@ blockedThrowTo (Capability *cap, StgTSO *target, MessageThrowTo *msg)
ASSERT(target->cap == cap);
- msg->link = (Message*)target->blocked_exceptions;
+ msg->link = target->blocked_exceptions;
target->blocked_exceptions = msg;
dirty_TSO(cap,target); // we modified the blocked_exceptions queue
}
@@ -571,7 +521,7 @@ maybePerformBlockedException (Capability *cap, StgTSO *tso)
if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE &&
(tso->flags & TSO_BLOCKEX) != 0) {
- debugTrace(DEBUG_sched, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id);
+ debugTraceCap(DEBUG_sched, cap, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id);
}
if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE
@@ -664,16 +614,18 @@ removeFromQueues(Capability *cap, StgTSO *tso)
case BlockedOnMVar:
removeThreadFromMVarQueue(cap, (StgMVar *)tso->block_info.closure, tso);
+ // we aren't doing a write barrier here: the MVar is supposed to
+ // be already locked, so replacing the info pointer would unlock it.
goto done;
case BlockedOnBlackHole:
- removeThreadFromQueue(cap, &blackhole_queue, tso);
+ // nothing to do
goto done;
case BlockedOnMsgWakeup:
{
// kill the message, atomically:
- tso->block_info.wakeup->header.info = &stg_IND_info;
+ OVERWRITE_INFO(tso->block_info.wakeup, &stg_IND_info);
break;
}
@@ -725,7 +677,8 @@ removeFromQueues(Capability *cap, StgTSO *tso)
* asynchronous exception in an existing thread.
*
* We first remove the thread from any queue on which it might be
- * blocked. The possible blockages are MVARs and BLACKHOLE_BQs.
+ * blocked. The possible blockages are MVARs, BLOCKING_QUEUESs, and
+ * TSO blocked_exception queues.
*
* We strip the stack down to the innermost CATCH_FRAME, building
* thunks in the heap for all the active computations, so they can
@@ -764,8 +717,8 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
StgClosure *updatee;
nat i;
- debugTrace(DEBUG_sched,
- "raising exception in thread %ld.", (long)tso->id);
+ debugTraceCap(DEBUG_sched, cap,
+ "raising exception in thread %ld.", (long)tso->id);
#if defined(PROFILING)
/*
@@ -784,6 +737,15 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
tso->what_next != ThreadKilled &&
tso->what_next != ThreadRelocated);
+ // only if we own this TSO (except that deleteThread() calls this
+ ASSERT(tso->cap == cap);
+
+ // wake it up
+ if (tso->why_blocked != NotBlocked && tso->why_blocked != BlockedOnMsgWakeup) {
+ tso->why_blocked = NotBlocked;
+ appendToRunQueue(cap,tso);
+ }
+
// mark it dirty; we're about to change its stack.
dirty_TSO(cap, tso);
@@ -871,7 +833,8 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
// Perform the update
// TODO: this may waste some work, if the thunk has
// already been updated by another thread.
- UPD_IND(cap, ((StgUpdateFrame *)frame)->updatee, (StgClosure *)ap);
+ updateThunk(cap, tso,
+ ((StgUpdateFrame *)frame)->updatee, (StgClosure *)ap);
}
sp += sizeofW(StgUpdateFrame) - 1;
@@ -963,8 +926,8 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
{
StgTRecHeader *trec = tso -> trec;
StgTRecHeader *outer = trec -> enclosing_trec;
- debugTrace(DEBUG_stm,
- "found atomically block delivering async exception");
+ debugTraceCap(DEBUG_stm, cap,
+ "found atomically block delivering async exception");
stmAbortTransaction(cap, trec);
stmFreeAbortedTRec(cap, trec);
tso -> trec = outer;
diff --git a/rts/RetainerProfile.c b/rts/RetainerProfile.c
index ba4d1465c8..b5db15a7b6 100644
--- a/rts/RetainerProfile.c
+++ b/rts/RetainerProfile.c
@@ -453,8 +453,6 @@ push( StgClosure *c, retainer c_child_r, StgClosure **first_child )
// no child, no SRT
case CONSTR_0_1:
case CONSTR_0_2:
- case CAF_BLACKHOLE:
- case BLACKHOLE:
case ARR_WORDS:
*first_child = NULL;
return;
@@ -470,6 +468,7 @@ push( StgClosure *c, retainer c_child_r, StgClosure **first_child )
case IND_PERM:
case IND_OLDGEN_PERM:
case IND_OLDGEN:
+ case BLACKHOLE:
*first_child = ((StgInd *)c)->indirectee;
return;
case CONSTR_1_0:
@@ -916,8 +915,6 @@ pop( StgClosure **c, StgClosure **cp, retainer *r )
// no child (fixed), no SRT
case CONSTR_0_1:
case CONSTR_0_2:
- case CAF_BLACKHOLE:
- case BLACKHOLE:
case ARR_WORDS:
// one child (fixed), no SRT
case MUT_VAR_CLEAN:
@@ -1059,13 +1056,11 @@ isRetainer( StgClosure *c )
case FUN_0_2:
// partial applications
case PAP:
- // blackholes
- case CAF_BLACKHOLE:
- case BLACKHOLE:
// indirection
case IND_PERM:
case IND_OLDGEN_PERM:
case IND_OLDGEN:
+ case BLACKHOLE:
// static objects
case CONSTR_STATIC:
case FUN_STATIC:
diff --git a/rts/STM.c b/rts/STM.c
index be61538434..f98d201b07 100644
--- a/rts/STM.c
+++ b/rts/STM.c
@@ -377,7 +377,7 @@ static void unpark_tso(Capability *cap, StgTSO *tso) {
lockTSO(tso);
if (tso -> why_blocked == BlockedOnSTM) {
TRACE("unpark_tso on tso=%p", tso);
- unblockOne(cap,tso);
+ tryWakeupThread(cap,tso);
} else {
TRACE("spurious unpark_tso on tso=%p", tso);
}
diff --git a/rts/Schedule.c b/rts/Schedule.c
index f3982b148b..72f6d44a8c 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -39,6 +39,7 @@
#include "Threads.h"
#include "Timer.h"
#include "ThreadPaused.h"
+#include "Messages.h"
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
@@ -66,17 +67,6 @@ StgTSO *blocked_queue_tl = NULL;
StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
#endif
-/* Threads blocked on blackholes.
- * LOCK: sched_mutex+capability, or all capabilities
- */
-StgTSO *blackhole_queue = NULL;
-
-/* The blackhole_queue should be checked for threads to wake up. See
- * Schedule.h for more thorough comment.
- * LOCK: none (doesn't matter if we miss an update)
- */
-rtsBool blackholes_need_checking = rtsFalse;
-
/* Set to true when the latest garbage collection failed to reclaim
* enough space, and the runtime should proceed to shut itself down in
* an orderly fashion (emitting profiling info etc.)
@@ -140,7 +130,6 @@ static void scheduleYield (Capability **pcap, Task *task, rtsBool);
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleProcessInbox(Capability *cap);
-static void scheduleCheckBlackHoles (Capability *cap);
static void scheduleDetectDeadlock (Capability *cap, Task *task);
static void schedulePushWork(Capability *cap, Task *task);
#if defined(THREADED_RTS)
@@ -159,8 +148,6 @@ static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
static Capability *scheduleDoGC(Capability *cap, Task *task,
rtsBool force_major);
-static rtsBool checkBlackHoles(Capability *cap);
-
static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
static StgTSO *threadStackUnderflow(Capability *cap, Task *task, StgTSO *tso);
@@ -433,9 +420,6 @@ run_thread:
startHeapProfTimer();
- // Check for exceptions blocked on this thread
- maybePerformBlockedException (cap, t);
-
// ----------------------------------------------------------------------
// Run the current thread
@@ -506,13 +490,6 @@ run_thread:
// happened. So find the new location:
t = cap->r.rCurrentTSO;
- // We have run some Haskell code: there might be blackhole-blocked
- // threads to wake up now.
- // Lock-free test here should be ok, we're just setting a flag.
- if ( blackhole_queue != END_TSO_QUEUE ) {
- blackholes_need_checking = rtsTrue;
- }
-
// And save the current errno in this thread.
// XXX: possibly bogus for SMP because this thread might already
// be running again, see code below.
@@ -612,12 +589,6 @@ scheduleFindWork (Capability *cap)
{
scheduleStartSignalHandlers(cap);
- // Only check the black holes here if we've nothing else to do.
- // During normal execution, the black hole list only gets checked
- // at GC time, to avoid repeatedly traversing this possibly long
- // list each time around the scheduler.
- if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
-
scheduleProcessInbox(cap);
scheduleCheckBlockedThreads(cap);
@@ -674,7 +645,6 @@ scheduleYield (Capability **pcap, Task *task, rtsBool force_yield)
!shouldYieldCapability(cap,task) &&
(!emptyRunQueue(cap) ||
!emptyInbox(cap) ||
- blackholes_need_checking ||
sched_state >= SCHED_INTERRUPTING))
return;
@@ -863,125 +833,11 @@ scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
//
if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
{
- awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
+ awaitEvent (emptyRunQueue(cap));
}
#endif
}
-
-/* ----------------------------------------------------------------------------
- * Check for threads woken up by other Capabilities
- * ------------------------------------------------------------------------- */
-
-#if defined(THREADED_RTS)
-static void
-executeMessage (Capability *cap, Message *m)
-{
- const StgInfoTable *i;
-
-loop:
- write_barrier(); // allow m->header to be modified by another thread
- i = m->header.info;
- if (i == &stg_MSG_WAKEUP_info)
- {
- MessageWakeup *w = (MessageWakeup *)m;
- StgTSO *tso = w->tso;
- debugTraceCap(DEBUG_sched, cap, "message: wakeup thread %ld",
- (lnat)tso->id);
- ASSERT(tso->cap == cap);
- ASSERT(tso->why_blocked == BlockedOnMsgWakeup);
- ASSERT(tso->block_info.closure == (StgClosure *)m);
- tso->why_blocked = NotBlocked;
- appendToRunQueue(cap, tso);
- }
- else if (i == &stg_MSG_THROWTO_info)
- {
- MessageThrowTo *t = (MessageThrowTo *)m;
- nat r;
- const StgInfoTable *i;
-
- i = lockClosure((StgClosure*)m);
- if (i != &stg_MSG_THROWTO_info) {
- unlockClosure((StgClosure*)m, i);
- goto loop;
- }
-
- debugTraceCap(DEBUG_sched, cap, "message: throwTo %ld -> %ld",
- (lnat)t->source->id, (lnat)t->target->id);
-
- ASSERT(t->source->why_blocked == BlockedOnMsgThrowTo);
- ASSERT(t->source->block_info.closure == (StgClosure *)m);
-
- r = throwToMsg(cap, t);
-
- switch (r) {
- case THROWTO_SUCCESS:
- ASSERT(t->source->sp[0] == (StgWord)&stg_block_throwto_info);
- t->source->sp += 3;
- unblockOne(cap, t->source);
- // this message is done
- unlockClosure((StgClosure*)m, &stg_IND_info);
- break;
- case THROWTO_BLOCKED:
- // unlock the message
- unlockClosure((StgClosure*)m, &stg_MSG_THROWTO_info);
- break;
- }
- }
- else if (i == &stg_IND_info)
- {
- // message was revoked
- return;
- }
- else if (i == &stg_WHITEHOLE_info)
- {
- goto loop;
- }
- else
- {
- barf("executeMessage: %p", i);
- }
-}
-#endif
-
-static void
-scheduleProcessInbox (Capability *cap USED_IF_THREADS)
-{
-#if defined(THREADED_RTS)
- Message *m;
-
- while (!emptyInbox(cap)) {
- ACQUIRE_LOCK(&cap->lock);
- m = cap->inbox;
- cap->inbox = m->link;
- RELEASE_LOCK(&cap->lock);
- executeMessage(cap, (Message *)m);
- }
-#endif
-}
-
-/* ----------------------------------------------------------------------------
- * Check for threads blocked on BLACKHOLEs that can be woken up
- * ------------------------------------------------------------------------- */
-static void
-scheduleCheckBlackHoles (Capability *cap)
-{
- if ( blackholes_need_checking ) // check without the lock first
- {
- ACQUIRE_LOCK(&sched_mutex);
- if ( blackholes_need_checking ) {
- blackholes_need_checking = rtsFalse;
- // important that we reset the flag *before* checking the
- // blackhole queue, otherwise we could get deadlock. This
- // happens as follows: we wake up a thread that
- // immediately runs on another Capability, blocks on a
- // blackhole, and then we reset the blackholes_need_checking flag.
- checkBlackHoles(cap);
- }
- RELEASE_LOCK(&sched_mutex);
- }
-}
-
/* ----------------------------------------------------------------------------
* Detect deadlock conditions and attempt to resolve them.
* ------------------------------------------------------------------------- */
@@ -1090,6 +946,26 @@ scheduleSendPendingMessages(void)
#endif
/* ----------------------------------------------------------------------------
+ * Process messages in the current Capability's inbox
+ * ------------------------------------------------------------------------- */
+
+static void
+scheduleProcessInbox (Capability *cap USED_IF_THREADS)
+{
+#if defined(THREADED_RTS)
+ Message *m;
+
+ while (!emptyInbox(cap)) {
+ ACQUIRE_LOCK(&cap->lock);
+ m = cap->inbox;
+ cap->inbox = m->link;
+ RELEASE_LOCK(&cap->lock);
+ executeMessage(cap, (Message *)m);
+ }
+#endif
+}
+
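
The inbox is drained one message at a time: cap->lock is held only while
unlinking a message, never while executing it, so senders are never blocked
behind a message handler. A minimal sketch of the list structure this loop
assumes (the real definition lives in the RTS headers, which are not part of
this excerpt):

    /* Each message is a heap closure whose info pointer identifies the
     * message kind; a Capability's inbox chains them through `link`. */
    typedef struct Message_ {
        StgHeader        header;   /* e.g. stg_MSG_TRY_WAKEUP_info */
        struct Message_ *link;     /* next message in the inbox */
    } Message;
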
+/* ----------------------------------------------------------------------------
* Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
* ------------------------------------------------------------------------- */
@@ -1499,9 +1375,6 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
}
- // do this while the other Capabilities stop:
- if (cap) scheduleCheckBlackHoles(cap);
-
if (gc_type == PENDING_GC_SEQ)
{
// single-threaded GC: grab all the capabilities
@@ -1529,11 +1402,6 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
waitForGcThreads(cap);
}
-#else /* !THREADED_RTS */
-
- // do this while the other Capabilities stop:
- if (cap) scheduleCheckBlackHoles(cap);
-
#endif
IF_DEBUG(scheduler, printAllThreads());
@@ -2093,8 +1961,6 @@ initScheduler(void)
sleeping_queue = END_TSO_QUEUE;
#endif
- blackhole_queue = END_TSO_QUEUE;
-
sched_state = SCHED_RUNNING;
recent_activity = ACTIVITY_YES;
@@ -2347,8 +2213,9 @@ threadStackOverflow(Capability *cap, StgTSO *tso)
* of the stack, so we don't attempt to scavenge any part of the
* dead TSO's stack.
*/
- tso->what_next = ThreadRelocated;
setTSOLink(cap,tso,dest);
+ write_barrier(); // other threads seeing ThreadRelocated will look at _link
+ tso->what_next = ThreadRelocated;
tso->sp = (P_)&(tso->stack[tso->stack_size]);
tso->why_blocked = NotBlocked;
@@ -2405,8 +2272,9 @@ threadStackUnderflow (Capability *cap, Task *task, StgTSO *tso)
debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
(long)tso->id, tso_size_w, tso_sizeW(new_tso));
- tso->what_next = ThreadRelocated;
tso->_link = new_tso; // no write barrier reqd: same generation
+ write_barrier(); // other threads seeing ThreadRelocated will look at _link
+ tso->what_next = ThreadRelocated;
// The TSO attached to this Task may have moved, so update the
// pointer to it.
@@ -2458,57 +2326,6 @@ void wakeUpRts(void)
#endif
/* -----------------------------------------------------------------------------
- * checkBlackHoles()
- *
- * Check the blackhole_queue for threads that can be woken up. We do
- * this periodically: before every GC, and whenever the run queue is
- * empty.
- *
- * An elegant solution might be to just wake up all the blocked
- * threads with awakenBlockedQueue occasionally: they'll go back to
- * sleep again if the object is still a BLACKHOLE. Unfortunately this
- * doesn't give us a way to tell whether we've actually managed to
- * wake up any threads, so we would be busy-waiting.
- *
- * -------------------------------------------------------------------------- */
-
-static rtsBool
-checkBlackHoles (Capability *cap)
-{
- StgTSO **prev, *t;
- rtsBool any_woke_up = rtsFalse;
- StgHalfWord type;
-
- // blackhole_queue is global:
- ASSERT_LOCK_HELD(&sched_mutex);
-
- debugTrace(DEBUG_sched, "checking threads blocked on black holes");
-
- // ASSUMES: sched_mutex
- prev = &blackhole_queue;
- t = blackhole_queue;
- while (t != END_TSO_QUEUE) {
- if (t->what_next == ThreadRelocated) {
- t = t->_link;
- continue;
- }
- ASSERT(t->why_blocked == BlockedOnBlackHole);
- type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
- if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
- IF_DEBUG(sanity,checkTSO(t));
- t = unblockOne(cap, t);
- *prev = t;
- any_woke_up = rtsTrue;
- } else {
- prev = &t->_link;
- t = t->_link;
- }
- }
-
- return any_woke_up;
-}
-
-/* -----------------------------------------------------------------------------
Deleting threads
This is used for interruption (^C) and forking, and corresponds to
@@ -2517,7 +2334,7 @@ checkBlackHoles (Capability *cap)
-------------------------------------------------------------------------- */
static void
-deleteThread (Capability *cap, StgTSO *tso)
+deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
{
// NOTE: must only be called on a TSO that we have exclusive
// access to, because we will call throwToSingleThreaded() below.
@@ -2526,7 +2343,7 @@ deleteThread (Capability *cap, StgTSO *tso)
if (tso->why_blocked != BlockedOnCCall &&
tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
- throwToSingleThreaded(cap,tso,NULL);
+ throwToSingleThreaded(tso->cap,tso,NULL);
}
}
@@ -2599,8 +2416,8 @@ raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
SET_HDR(raise_closure, &stg_raise_info, CCCS);
raise_closure->payload[0] = exception;
}
- UPD_IND(cap, ((StgUpdateFrame *)p)->updatee,
- (StgClosure *)raise_closure);
+ updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
+ (StgClosure *)raise_closure);
p = next;
continue;
diff --git a/rts/Schedule.h b/rts/Schedule.h
index 2412285d29..0db2b1ed84 100644
--- a/rts/Schedule.h
+++ b/rts/Schedule.h
@@ -86,15 +86,6 @@ extern StgTSO *blocked_queue_hd, *blocked_queue_tl;
extern StgTSO *sleeping_queue;
#endif
-/* Set to rtsTrue if there are threads on the blackhole_queue, and
- * it is possible that one or more of them may be available to run.
- * This flag is set to rtsFalse after we've checked the queue, and
- * set to rtsTrue just before we run some Haskell code. It is used
- * to decide whether we should yield the Capability or not.
- * Locks required : none (see scheduleCheckBlackHoles()).
- */
-extern rtsBool blackholes_need_checking;
-
extern rtsBool heap_overflow;
#if defined(THREADED_RTS)
diff --git a/rts/StgMiscClosures.cmm b/rts/StgMiscClosures.cmm
index f111875760..830bde5665 100644
--- a/rts/StgMiscClosures.cmm
+++ b/rts/StgMiscClosures.cmm
@@ -283,96 +283,105 @@ INFO_TABLE(stg_IND_OLDGEN_PERM,1,0,IND_OLDGEN_PERM,"IND_OLDGEN_PERM","IND_OLDGEN
waiting for the evaluation of the closure to finish.
------------------------------------------------------------------------- */
-/* Note: a BLACKHOLE must be big enough to be
- * overwritten with an indirection/evacuee/catch. Thus we claim it
- * has 1 non-pointer word of payload.
- */
-INFO_TABLE(stg_BLACKHOLE,0,1,BLACKHOLE,"BLACKHOLE","BLACKHOLE")
+INFO_TABLE(stg_BLACKHOLE,1,0,BLACKHOLE,"BLACKHOLE","BLACKHOLE")
{
- TICK_ENT_BH();
-
-#ifdef THREADED_RTS
- // foreign "C" debugBelch("BLACKHOLE entry\n");
-#endif
-
- /* Actually this is not necessary because R1 is about to be destroyed. */
- LDV_ENTER(R1);
+ W_ r, p, info, bq, msg, owner, bd;
-#if defined(THREADED_RTS)
- ACQUIRE_LOCK(sched_mutex "ptr");
- // released in stg_block_blackhole_finally
-#endif
-
- /* Put ourselves on the blackhole queue */
- StgTSO__link(CurrentTSO) = W_[blackhole_queue];
- W_[blackhole_queue] = CurrentTSO;
-
- /* jot down why and on what closure we are blocked */
- StgTSO_why_blocked(CurrentTSO) = BlockedOnBlackHole::I16;
- StgTSO_block_info(CurrentTSO) = R1;
+ TICK_ENT_DYN_IND(); /* tick */
- jump stg_block_blackhole;
+retry:
+ p = StgInd_indirectee(R1);
+ if (GETTAG(p) != 0) {
+ R1 = p;
+ jump %ENTRY_CODE(Sp(0));
+ }
+
+ info = StgHeader_info(p);
+ if (info == stg_IND_info) {
+        // This can happen if, e.g., we got a BLOCKING_QUEUE that has
+ // just been replaced with an IND by another thread in
+ // wakeBlockingQueue().
+ goto retry;
+ }
+
+ if (info == stg_TSO_info ||
+ info == stg_BLOCKING_QUEUE_CLEAN_info ||
+ info == stg_BLOCKING_QUEUE_DIRTY_info)
+ {
+ ("ptr" msg) = foreign "C" allocate(MyCapability() "ptr",
+ BYTES_TO_WDS(SIZEOF_MessageBlackHole)) [R1];
+
+ StgHeader_info(msg) = stg_MSG_BLACKHOLE_info;
+ MessageBlackHole_tso(msg) = CurrentTSO;
+ MessageBlackHole_bh(msg) = R1;
+
+ (r) = foreign "C" messageBlackHole(MyCapability() "ptr", msg "ptr") [R1];
+
+ if (r == 0) {
+ goto retry;
+ } else {
+ StgTSO_why_blocked(CurrentTSO) = BlockedOnBlackHole::I16;
+ StgTSO_block_info(CurrentTSO) = msg;
+ jump stg_block_blackhole;
+ }
+ }
+ else
+ {
+ R1 = p;
+ ENTER();
+ }
}
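
Taken together, the cases above give the thunk life-cycle under the new
scheme: an untagged indirectee that is a TSO or BLOCKING_QUEUE means the
thunk is still being evaluated, so the entering thread allocates a
MSG_BLACKHOLE and sends it to the owner's Capability; a tagged indirectee is
the final value and is returned directly; and if messageBlackHole() returns 0
the closure was updated while the message was being prepared, so the dispatch
simply retries. A sketch of the message payload the accessors
MessageBlackHole_tso and MessageBlackHole_bh assume (the real definition is
in the RTS headers, not in this excerpt):

    typedef struct MessageBlackHole_ {
        StgHeader                 header; /* stg_MSG_BLACKHOLE_info */
        struct MessageBlackHole_ *link;   /* chains messages on a BLOCKING_QUEUE */
        StgTSO                   *tso;    /* the thread that is blocking */
        StgClosure               *bh;     /* the BLACKHOLE it blocked on */
    } MessageBlackHole;
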
-/* identical to BLACKHOLEs except for the infotag */
-INFO_TABLE(stg_CAF_BLACKHOLE,0,1,CAF_BLACKHOLE,"CAF_BLACKHOLE","CAF_BLACKHOLE")
+INFO_TABLE(__stg_EAGER_BLACKHOLE,1,0,BLACKHOLE,"BLACKHOLE","BLACKHOLE")
{
- TICK_ENT_BH();
- LDV_ENTER(R1);
-
-#if defined(THREADED_RTS)
- // foreign "C" debugBelch("BLACKHOLE entry\n");
-#endif
-
-#if defined(THREADED_RTS)
- ACQUIRE_LOCK(sched_mutex "ptr");
- // released in stg_block_blackhole_finally
-#endif
-
- /* Put ourselves on the blackhole queue */
- StgTSO__link(CurrentTSO) = W_[blackhole_queue];
- W_[blackhole_queue] = CurrentTSO;
-
- /* jot down why and on what closure we are blocked */
- StgTSO_why_blocked(CurrentTSO) = BlockedOnBlackHole::I16;
- StgTSO_block_info(CurrentTSO) = R1;
-
- jump stg_block_blackhole;
+ jump ENTRY_LBL(stg_BLACKHOLE);
}
-INFO_TABLE(__stg_EAGER_BLACKHOLE,0,1,BLACKHOLE,"EAGER_BLACKHOLE","EAGER_BLACKHOLE")
+// CAF_BLACKHOLE is allocated when entering a CAF. The reason it is
+// distinct from BLACKHOLE is so that we can tell the difference
+// between an update frame on the stack that points to a CAF under
+// evaluation, and one that points to a closure that is under
+// evaluation by another thread (a BLACKHOLE). See threadPaused().
+//
+INFO_TABLE(stg_CAF_BLACKHOLE,1,0,BLACKHOLE,"BLACKHOLE","BLACKHOLE")
{
- TICK_ENT_BH();
-
-#ifdef THREADED_RTS
- // foreign "C" debugBelch("BLACKHOLE entry\n");
-#endif
-
- /* Actually this is not necessary because R1 is about to be destroyed. */
- LDV_ENTER(R1);
-
-#if defined(THREADED_RTS)
- ACQUIRE_LOCK(sched_mutex "ptr");
- // released in stg_block_blackhole_finally
-#endif
-
- /* Put ourselves on the blackhole queue */
- StgTSO__link(CurrentTSO) = W_[blackhole_queue];
- W_[blackhole_queue] = CurrentTSO;
+ jump ENTRY_LBL(stg_BLACKHOLE);
+}
- /* jot down why and on what closure we are blocked */
- StgTSO_why_blocked(CurrentTSO) = BlockedOnBlackHole::I16;
- StgTSO_block_info(CurrentTSO) = R1;
+INFO_TABLE(stg_BLOCKING_QUEUE_CLEAN,4,0,BLOCKING_QUEUE,"BLOCKING_QUEUE","BLOCKING_QUEUE")
+{ foreign "C" barf("BLOCKING_QUEUE_CLEAN object entered!") never returns; }
+
- jump stg_block_blackhole;
-}
+INFO_TABLE(stg_BLOCKING_QUEUE_DIRTY,4,0,BLOCKING_QUEUE,"BLOCKING_QUEUE","BLOCKING_QUEUE")
+{ foreign "C" barf("BLOCKING_QUEUE_DIRTY object entered!") never returns; }
+
/* ----------------------------------------------------------------------------
Whiteholes are used for the "locked" state of a closure (see lockClosure())
------------------------------------------------------------------------- */
INFO_TABLE(stg_WHITEHOLE, 0,0, WHITEHOLE, "WHITEHOLE", "WHITEHOLE")
-{ foreign "C" barf("WHITEHOLE object entered!") never returns; }
+{
+#if defined(THREADED_RTS)
+ W_ info, i;
+
+ i = 0;
+loop:
+ // spin until the WHITEHOLE is updated
+ info = StgHeader_info(R1);
+ if (info == stg_WHITEHOLE_info) {
+ i = i + 1;
+ if (i == SPIN_COUNT) {
+ i = 0;
+ foreign "C" yieldThread() [R1];
+ }
+ goto loop;
+ }
+ jump %ENTRY_CODE(info);
+#else
+ foreign "C" barf("WHITEHOLE object entered!") never returns;
+#endif
+}
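
A WHITEHOLE now means "transiently locked" rather than "never entered":
another thread is between claiming a closure and publishing its new info
pointer, so in the threaded RTS we spin until the info pointer changes,
yielding the OS thread every SPIN_COUNT iterations. The C analogue of the
loop above (a sketch; SPIN_COUNT and yieldThread() are taken from the RTS,
which is not shown here):

    /* Spin until a WHITEHOLE's info pointer is republished, then return
     * the new info pointer. */
    static const StgInfoTable *spinPastWhitehole (StgClosure *p)
    {
        const StgInfoTable *info;
        nat i = 0;
        while ((info = p->header.info) == &stg_WHITEHOLE_info) {
            i = i + 1;
            if (i == SPIN_COUNT) { i = 0; yieldThread(); }
        }
        return info;
    }
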
/* ----------------------------------------------------------------------------
Some static info tables for things that don't get entered, and
@@ -485,9 +494,15 @@ CLOSURE(stg_NO_TREC_closure,stg_NO_TREC);
INFO_TABLE_CONSTR(stg_MSG_WAKEUP,2,0,0,PRIM,"MSG_WAKEUP","MSG_WAKEUP")
{ foreign "C" barf("MSG_WAKEUP object entered!") never returns; }
+INFO_TABLE_CONSTR(stg_MSG_TRY_WAKEUP,2,0,0,PRIM,"MSG_TRY_WAKEUP","MSG_TRY_WAKEUP")
+{ foreign "C" barf("MSG_TRY_WAKEUP object entered!") never returns; }
+
INFO_TABLE_CONSTR(stg_MSG_THROWTO,4,0,0,PRIM,"MSG_THROWTO","MSG_THROWTO")
{ foreign "C" barf("MSG_THROWTO object entered!") never returns; }
+INFO_TABLE_CONSTR(stg_MSG_BLACKHOLE,3,0,0,PRIM,"MSG_BLACKHOLE","MSG_BLACKHOLE")
+{ foreign "C" barf("MSG_BLACKHOLE object entered!") never returns; }
+
/* ----------------------------------------------------------------------------
END_TSO_QUEUE
diff --git a/rts/ThreadPaused.c b/rts/ThreadPaused.c
index 75712b04d6..7aee59dcd5 100644
--- a/rts/ThreadPaused.c
+++ b/rts/ThreadPaused.c
@@ -14,6 +14,7 @@
#include "Updates.h"
#include "RaiseAsync.h"
#include "Trace.h"
+#include "Threads.h"
#include <string.h> // for memmove()
@@ -75,7 +76,7 @@ stackSqueeze(Capability *cap, StgTSO *tso, StgPtr bottom)
* screw us up if we don't check.
*/
if (upd->updatee != updatee && !closure_IND(upd->updatee)) {
- UPD_IND(cap, upd->updatee, updatee);
+ updateThunk(cap, tso, upd->updatee, updatee);
}
// now mark this update frame as a stack gap. The gap
@@ -196,7 +197,7 @@ threadPaused(Capability *cap, StgTSO *tso)
maybePerformBlockedException (cap, tso);
if (tso->what_next == ThreadKilled) { return; }
- // NB. Blackholing is *not* optional, we must either do lazy
+    // NB. Blackholing is *compulsory*: we must either do lazy
// blackholing, or eager blackholing consistently. See Note
// [upd-black-hole] in sm/Scav.c.
@@ -229,8 +230,9 @@ threadPaused(Capability *cap, StgTSO *tso)
#ifdef THREADED_RTS
retry:
#endif
- if (closure_flags[INFO_PTR_TO_STRUCT(bh_info)->type] & _IND
- || bh_info == &stg_BLACKHOLE_info) {
+ if (bh_info == &stg_BLACKHOLE_info ||
+ bh_info == &stg_WHITEHOLE_info)
+ {
debugTrace(DEBUG_squeeze,
"suspending duplicate work: %ld words of stack",
(long)((StgPtr)frame - tso->sp));
@@ -245,6 +247,7 @@ threadPaused(Capability *cap, StgTSO *tso)
// the value to the frame underneath:
tso->sp = (StgPtr)frame + sizeofW(StgUpdateFrame) - 2;
tso->sp[1] = (StgWord)bh;
+ ASSERT(bh->header.info != &stg_TSO_info);
tso->sp[0] = (W_)&stg_enter_info;
// And continue with threadPaused; there might be
@@ -254,33 +257,40 @@ threadPaused(Capability *cap, StgTSO *tso)
continue;
}
- if (bh->header.info != &stg_CAF_BLACKHOLE_info) {
- // zero out the slop so that the sanity checker can tell
- // where the next closure is.
- DEBUG_FILL_SLOP(bh);
-#ifdef PROFILING
- // @LDV profiling
- // We pretend that bh is now dead.
- LDV_recordDead_FILL_SLOP_DYNAMIC((StgClosure *)bh);
-#endif
- // an EAGER_BLACKHOLE gets turned into a BLACKHOLE here.
+ // zero out the slop so that the sanity checker can tell
+ // where the next closure is.
+ DEBUG_FILL_SLOP(bh);
+
+ // @LDV profiling
+ // We pretend that bh is now dead.
+ LDV_RECORD_DEAD_FILL_SLOP_DYNAMIC((StgClosure *)bh);
+
+ // an EAGER_BLACKHOLE or CAF_BLACKHOLE gets turned into a
+ // BLACKHOLE here.
#ifdef THREADED_RTS
- cur_bh_info = (const StgInfoTable *)
- cas((StgVolatilePtr)&bh->header.info,
- (StgWord)bh_info,
- (StgWord)&stg_BLACKHOLE_info);
-
- if (cur_bh_info != bh_info) {
- bh_info = cur_bh_info;
- goto retry;
- }
-#else
- SET_INFO(bh,&stg_BLACKHOLE_info);
+ // first we turn it into a WHITEHOLE to claim it, and if
+ // successful we write our TSO and then the BLACKHOLE info pointer.
+ cur_bh_info = (const StgInfoTable *)
+ cas((StgVolatilePtr)&bh->header.info,
+ (StgWord)bh_info,
+ (StgWord)&stg_WHITEHOLE_info);
+
+ if (cur_bh_info != bh_info) {
+ bh_info = cur_bh_info;
+ goto retry;
+ }
#endif
- // We pretend that bh has just been created.
- LDV_RECORD_CREATE(bh);
- }
+ // The payload of the BLACKHOLE points to the TSO
+ ((StgInd *)bh)->indirectee = (StgClosure *)tso;
+ write_barrier();
+ SET_INFO(bh,&stg_BLACKHOLE_info);
+
+ // .. and we need a write barrier, since we just mutated the closure:
+ recordClosureMutated(cap,bh);
+
+ // We pretend that bh has just been created.
+ LDV_RECORD_CREATE(bh);
frame = (StgClosure *) ((StgUpdateFrame *)frame + 1);
if (prev_was_update_frame) {
diff --git a/rts/Threads.c b/rts/Threads.c
index f824d021d4..0c3e591665 100644
--- a/rts/Threads.c
+++ b/rts/Threads.c
@@ -9,11 +9,16 @@
#include "PosixSource.h"
#include "Rts.h"
+#include "Capability.h"
+#include "Updates.h"
#include "Threads.h"
#include "STM.h"
#include "Schedule.h"
#include "Trace.h"
#include "ThreadLabels.h"
+#include "Updates.h"
+#include "Messages.h"
+#include "sm/Storage.h"
/* Next thread ID to allocate.
* LOCK: sched_mutex
@@ -74,7 +79,9 @@ createThread(Capability *cap, nat size)
tso->what_next = ThreadRunGHC;
tso->why_blocked = NotBlocked;
+ tso->block_info.closure = (StgClosure *)END_TSO_QUEUE;
tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
+ tso->bq = (StgBlockingQueue *)END_TSO_QUEUE;
tso->flags = 0;
tso->dirty = 1;
@@ -146,7 +153,7 @@ rts_getThreadId(StgPtr tso)
Fails fatally if the TSO is not on the queue.
-------------------------------------------------------------------------- */
-void
+rtsBool // returns rtsTrue if we modified the head of the queue
removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
{
StgTSO *t, *prev;
@@ -156,28 +163,32 @@ removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
if (t == tso) {
if (prev) {
setTSOLink(cap,prev,t->_link);
+ return rtsFalse;
} else {
*queue = t->_link;
+ return rtsTrue;
}
- return;
}
}
barf("removeThreadFromQueue: not found");
}
-void
+rtsBool // returns rtsTrue if we modified the head or the tail
removeThreadFromDeQueue (Capability *cap,
StgTSO **head, StgTSO **tail, StgTSO *tso)
{
StgTSO *t, *prev;
+ rtsBool flag = rtsFalse;
prev = NULL;
for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
if (t == tso) {
if (prev) {
setTSOLink(cap,prev,t->_link);
+ flag = rtsFalse;
} else {
*head = t->_link;
+ flag = rtsTrue;
}
if (*tail == tso) {
if (prev) {
@@ -185,8 +196,10 @@ removeThreadFromDeQueue (Capability *cap,
} else {
*tail = END_TSO_QUEUE;
}
- }
- return;
+ return rtsTrue;
+ } else {
+ return flag;
+ }
}
}
barf("removeThreadFromMVarQueue: not found");
@@ -195,7 +208,10 @@ removeThreadFromDeQueue (Capability *cap,
void
removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso)
{
+ // caller must do the write barrier, because replacing the info
+ // pointer will unlock the MVar.
removeThreadFromDeQueue (cap, &mvar->head, &mvar->tail, tso);
+ tso->_link = END_TSO_QUEUE;
}
/* ----------------------------------------------------------------------------
@@ -263,6 +279,38 @@ unblockOne_ (Capability *cap, StgTSO *tso,
return next;
}
+void
+tryWakeupThread (Capability *cap, StgTSO *tso)
+{
+#ifdef THREADED_RTS
+ if (tso->cap != cap)
+ {
+ MessageWakeup *msg;
+ msg = (MessageWakeup *)allocate(cap,sizeofW(MessageWakeup));
+ SET_HDR(msg, &stg_MSG_TRY_WAKEUP_info, CCS_SYSTEM);
+ msg->tso = tso;
+ sendMessage(cap, tso->cap, (Message*)msg);
+ return;
+ }
+#endif
+
+ switch (tso->why_blocked)
+ {
+ case BlockedOnBlackHole:
+ case BlockedOnSTM:
+ {
+        // just run the thread now; if the BH is not really available,
+ // we'll block again.
+ tso->why_blocked = NotBlocked;
+ appendToRunQueue(cap,tso);
+ break;
+ }
+ default:
+ // otherwise, do nothing
+ break;
+ }
+}
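
tryWakeupThread is deliberately cheap and idempotent: if the TSO belongs to
another Capability we post a MSG_TRY_WAKEUP and let its owner decide; if the
black hole turns out still to be unavailable, the woken thread just blocks on
it again. A sketch of the wakeup message allocated above, with two pointer
fields as declared by INFO_TABLE_CONSTR(stg_MSG_TRY_WAKEUP,2,0,...) later in
this patch (the real definition is in the RTS headers, not in this excerpt):

    typedef struct MessageWakeup_ {
        StgHeader header;  /* stg_MSG_TRY_WAKEUP_info */
        Message  *link;    /* inbox chaining */
        StgTSO   *tso;     /* the thread to try to wake */
    } MessageWakeup;
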
+
/* ----------------------------------------------------------------------------
awakenBlockedQueue
@@ -270,13 +318,160 @@ unblockOne_ (Capability *cap, StgTSO *tso,
------------------------------------------------------------------------- */
void
-awakenBlockedQueue(Capability *cap, StgTSO *tso)
+wakeBlockingQueue(Capability *cap, StgBlockingQueue *bq)
{
- while (tso != END_TSO_QUEUE) {
- tso = unblockOne(cap,tso);
+ MessageBlackHole *msg;
+ const StgInfoTable *i;
+
+ ASSERT(bq->header.info == &stg_BLOCKING_QUEUE_DIRTY_info ||
+ bq->header.info == &stg_BLOCKING_QUEUE_CLEAN_info );
+
+ for (msg = bq->queue; msg != (MessageBlackHole*)END_TSO_QUEUE;
+ msg = msg->link) {
+ i = msg->header.info;
+ if (i != &stg_IND_info) {
+ ASSERT(i == &stg_MSG_BLACKHOLE_info);
+ tryWakeupThread(cap,msg->tso);
+ }
+ }
+
+ // overwrite the BQ with an indirection so it will be
+ // collected at the next GC.
+#if defined(DEBUG) && !defined(THREADED_RTS)
+ // XXX FILL_SLOP, but not if THREADED_RTS because in that case
+ // another thread might be looking at this BLOCKING_QUEUE and
+ // checking the owner field at the same time.
+ bq->bh = 0; bq->queue = 0; bq->owner = 0;
+#endif
+ OVERWRITE_INFO(bq, &stg_IND_info);
+}
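
wakeBlockingQueue wakes every TSO hanging off the queue's MSG_BLACKHOLE
messages and then retires the BLOCKING_QUEUE by overwriting it with an IND,
which is why its link field must sit where an indirectee would. A sketch of
the layout assumed here and by the sanity checks later in the patch (four
pointer fields, matching INFO_TABLE(stg_BLOCKING_QUEUE_CLEAN,4,0,...); the
real definition is in the RTS headers):

    typedef struct StgBlockingQueue_ {
        StgHeader                  header; /* BLOCKING_QUEUE_{CLEAN,DIRTY} */
        struct StgBlockingQueue_  *link;   /* first, so an IND overwrite works */
        StgClosure                *bh;     /* the BLACKHOLE this queue guards */
        StgTSO                    *owner;  /* the thread evaluating bh */
        struct MessageBlackHole_  *queue;  /* blocked threads, via msg->link */
    } StgBlockingQueue;
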
+
+// If we update a closure that we know we BLACKHOLE'd, and the closure
+// no longer points to the current TSO as its owner, then there may be
+// an orphaned BLOCKING_QUEUE closure with blocked threads attached to
+// it. We therefore traverse the BLOCKING_QUEUEs attached to the
+// current TSO to see if any can now be woken up.
+void
+checkBlockingQueues (Capability *cap, StgTSO *tso)
+{
+ StgBlockingQueue *bq, *next;
+ StgClosure *p;
+
+ debugTraceCap(DEBUG_sched, cap,
+ "collision occurred; checking blocking queues for thread %ld",
+ (lnat)tso->id);
+
+ for (bq = tso->bq; bq != (StgBlockingQueue*)END_TSO_QUEUE; bq = next) {
+ next = bq->link;
+
+ if (bq->header.info == &stg_IND_info) {
+ // ToDo: could short it out right here, to avoid
+ // traversing this IND multiple times.
+ continue;
+ }
+
+ p = bq->bh;
+
+ if (p->header.info != &stg_BLACKHOLE_info ||
+ ((StgInd *)p)->indirectee != (StgClosure*)bq)
+ {
+ wakeBlockingQueue(cap,bq);
+ }
}
}
+/* ----------------------------------------------------------------------------
+ updateThunk
+
+ Update a thunk with a value. In order to do this, we need to know
+ which TSO owns (or is evaluating) the thunk, in case we need to
+ awaken any threads that are blocked on it.
+ ------------------------------------------------------------------------- */
+
+void
+updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val)
+{
+ StgClosure *v;
+ StgTSO *owner;
+ const StgInfoTable *i;
+
+ i = thunk->header.info;
+ if (i != &stg_BLACKHOLE_info &&
+ i != &stg_CAF_BLACKHOLE_info &&
+ i != &stg_WHITEHOLE_info) {
+ updateWithIndirection(cap, thunk, val);
+ return;
+ }
+
+ v = ((StgInd*)thunk)->indirectee;
+
+ updateWithIndirection(cap, thunk, val);
+
+ i = v->header.info;
+ if (i == &stg_TSO_info) {
+ owner = deRefTSO((StgTSO*)v);
+ if (owner != tso) {
+ checkBlockingQueues(cap, tso);
+ }
+ return;
+ }
+
+ if (i != &stg_BLOCKING_QUEUE_CLEAN_info &&
+ i != &stg_BLOCKING_QUEUE_DIRTY_info) {
+ checkBlockingQueues(cap, tso);
+ return;
+ }
+
+ owner = deRefTSO(((StgBlockingQueue*)v)->owner);
+
+ if (owner != tso) {
+ checkBlockingQueues(cap, tso);
+ } else {
+ wakeBlockingQueue(cap, (StgBlockingQueue*)v);
+ }
+}
+
+/* ----------------------------------------------------------------------------
+ * Wake up a thread on a Capability.
+ *
+ * This is used when the current Task is running on a Capability and
+ * wishes to wake up a thread on a different Capability.
+ * ------------------------------------------------------------------------- */
+
+#ifdef THREADED_RTS
+
+void
+wakeupThreadOnCapability (Capability *cap,
+ Capability *other_cap,
+ StgTSO *tso)
+{
+ MessageWakeup *msg;
+
+    // ASSUMES: cap->lock is held
+ if (tso->bound) {
+ ASSERT(tso->bound->task->cap == tso->cap);
+ tso->bound->task->cap = other_cap;
+ }
+ tso->cap = other_cap;
+
+ ASSERT(tso->why_blocked != BlockedOnMsgWakeup ||
+ tso->block_info.closure->header.info == &stg_IND_info);
+
+ ASSERT(tso->block_info.closure->header.info != &stg_MSG_WAKEUP_info);
+
+ msg = (MessageWakeup*) allocate(cap, sizeofW(MessageWakeup));
+ SET_HDR(msg, &stg_MSG_WAKEUP_info, CCS_SYSTEM);
+ msg->tso = tso;
+ tso->block_info.closure = (StgClosure *)msg;
+ dirty_TSO(cap, tso);
+ write_barrier();
+ tso->why_blocked = BlockedOnMsgWakeup;
+
+ sendMessage(cap, other_cap, (Message*)msg);
+}
+
+#endif /* THREADED_RTS */
+
/* ---------------------------------------------------------------------------
* rtsSupportsBoundThreads(): is the RTS built to support bound threads?
* used by Control.Concurrent for error checking.
@@ -332,7 +527,8 @@ printThreadBlockage(StgTSO *tso)
debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
break;
case BlockedOnBlackHole:
- debugBelch("is blocked on a black hole");
+ debugBelch("is blocked on a black hole %p",
+ ((StgBlockingQueue*)tso->block_info.bh->bh));
break;
case BlockedOnMsgWakeup:
debugBelch("is blocked on a wakeup message");
diff --git a/rts/Threads.h b/rts/Threads.h
index dfe879e7bb..000cf1b8e2 100644
--- a/rts/Threads.h
+++ b/rts/Threads.h
@@ -16,11 +16,26 @@ BEGIN_RTS_PRIVATE
StgTSO * unblockOne (Capability *cap, StgTSO *tso);
StgTSO * unblockOne_ (Capability *cap, StgTSO *tso, rtsBool allow_migrate);
-void awakenBlockedQueue (Capability *cap, StgTSO *tso);
+void checkBlockingQueues (Capability *cap, StgTSO *tso);
+void wakeBlockingQueue (Capability *cap, StgBlockingQueue *bq);
+void tryWakeupThread (Capability *cap, StgTSO *tso);
+
+// Wakes up a thread on a Capability (probably a different Capability
+// from the one held by the current Task).
+//
+#ifdef THREADED_RTS
+void wakeupThreadOnCapability (Capability *cap,
+ Capability *other_cap,
+ StgTSO *tso);
+#endif
+
+void updateThunk (Capability *cap, StgTSO *tso,
+ StgClosure *thunk, StgClosure *val);
void removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso);
-void removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso);
-void removeThreadFromDeQueue (Capability *cap, StgTSO **head, StgTSO **tail, StgTSO *tso);
+
+rtsBool removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso);
+rtsBool removeThreadFromDeQueue (Capability *cap, StgTSO **head, StgTSO **tail, StgTSO *tso);
StgBool isThreadBound (StgTSO* tso);
diff --git a/rts/Timer.c b/rts/Timer.c
index a5d42fbc9d..dddc75414d 100644
--- a/rts/Timer.c
+++ b/rts/Timer.c
@@ -76,8 +76,6 @@ handle_tick(int unused STG_UNUSED)
ticks_to_gc = RtsFlags.GcFlags.idleGCDelayTime /
RtsFlags.MiscFlags.tickInterval;
recent_activity = ACTIVITY_INACTIVE;
- blackholes_need_checking = rtsTrue;
- /* hack: re-use the blackholes_need_checking flag */
wakeUpRts();
}
}
diff --git a/rts/Updates.cmm b/rts/Updates.cmm
index e0fd7c30d5..7af59657c1 100644
--- a/rts/Updates.cmm
+++ b/rts/Updates.cmm
@@ -16,10 +16,11 @@
#include "Updates.h"
-/* on entry to the update code
- (1) R1 points to the closure being returned
- (2) Sp points to the update frame
-*/
+#if defined(PROFILING)
+#define UPD_FRAME_PARAMS W_ unused1, W_ unused2, P_ unused3
+#else
+#define UPD_FRAME_PARAMS P_ unused1
+#endif
/* The update fragment has been tuned so as to generate good
code with gcc, which accounts for some of the strangeness in the
@@ -30,38 +31,69 @@
code), since we don't mind duplicating this jump.
*/
-#define UPD_FRAME_ENTRY_TEMPLATE \
- { \
- W_ updatee; \
- \
- updatee = StgUpdateFrame_updatee(Sp); \
- \
- /* remove the update frame from the stack */ \
- Sp = Sp + SIZEOF_StgUpdateFrame; \
- \
- /* ToDo: it might be a PAP, so we should check... */ \
- TICK_UPD_CON_IN_NEW(sizeW_fromITBL(%GET_STD_INFO(updatee))); \
- \
- updateWithIndirection(stg_IND_direct_info, \
- updatee, \
- R1, \
- jump %ENTRY_CODE(Sp(0))); \
- }
-
-#if defined(PROFILING)
-#define UPD_FRAME_PARAMS W_ unused1, W_ unused2, P_ unused3
-#else
-#define UPD_FRAME_PARAMS P_ unused1
-#endif
-
-/* this bitmap indicates that the first word of an update frame is a
- * non-pointer - this is the update frame link. (for profiling,
- * there's a cost-centre-stack in there too).
- */
+/* on entry to the update code
+ (1) R1 points to the closure being returned
+ (2) Sp points to the update frame
+*/
INFO_TABLE_RET( stg_upd_frame, UPDATE_FRAME, UPD_FRAME_PARAMS)
-UPD_FRAME_ENTRY_TEMPLATE
+{
+ W_ updatee;
+
+ updatee = StgUpdateFrame_updatee(Sp);
+
+ /* remove the update frame from the stack */
+ Sp = Sp + SIZEOF_StgUpdateFrame;
+
+ /* ToDo: it might be a PAP, so we should check... */
+ TICK_UPD_CON_IN_NEW(sizeW_fromITBL(%GET_STD_INFO(updatee)));
+
+ updateWithIndirection(updatee,
+ R1,
+ jump %ENTRY_CODE(Sp(0)));
+}
INFO_TABLE_RET( stg_marked_upd_frame, UPDATE_FRAME, UPD_FRAME_PARAMS)
-UPD_FRAME_ENTRY_TEMPLATE
+{
+ W_ updatee, v, i, tso, link;
+
+ // we know the closure is a BLACKHOLE
+ updatee = StgUpdateFrame_updatee(Sp);
+ v = StgInd_indirectee(updatee);
+
+ // remove the update frame from the stack
+ Sp = Sp + SIZEOF_StgUpdateFrame;
+
+ if (GETTAG(v) != 0) {
+ // updated by someone else: discard our value and use the
+ // other one to increase sharing, but check the blocking
+ // queues to see if any threads were waiting on this BLACKHOLE.
+ R1 = v;
+ foreign "C" checkBlockingQueues(MyCapability() "ptr",
+ CurrentTSO "ptr") [R1];
+ jump %ENTRY_CODE(Sp(0));
+ }
+
+ // common case: it is still our BLACKHOLE
+ if (v == CurrentTSO) {
+ updateWithIndirection(updatee,
+ R1,
+ jump %ENTRY_CODE(Sp(0)));
+ }
+
+ // The other cases are all handled by the generic code
+ foreign "C" updateThunk (MyCapability() "ptr", CurrentTSO "ptr",
+ updatee "ptr", R1 "ptr") [R1];
+
+ jump %ENTRY_CODE(Sp(0));
+}
+
+// Special update frame code for CAFs and eager-blackholed thunks: it
+// knows how to update blackholes, but is distinct from
+// stg_marked_upd_frame so that lazy blackholing won't treat it as the
+// high watermark.
+INFO_TABLE_RET (stg_bh_upd_frame, UPDATE_FRAME, UPD_FRAME_PARAMS)
+{
+ jump stg_marked_upd_frame_info;
+}
diff --git a/rts/Updates.h b/rts/Updates.h
index 2b3c35df0c..de9276c909 100644
--- a/rts/Updates.h
+++ b/rts/Updates.h
@@ -48,8 +48,7 @@ BEGIN_RTS_PRIVATE
W_ sz; \
W_ i; \
inf = %GET_STD_INFO(p); \
- if (%INFO_TYPE(inf) != HALF_W_(BLACKHOLE) \
- && %INFO_TYPE(inf) != HALF_W_(CAF_BLACKHOLE)) { \
+ if (%INFO_TYPE(inf) != HALF_W_(BLACKHOLE)) { \
if (%INFO_TYPE(inf) == HALF_W_(THUNK_SELECTOR)) { \
sz = BYTES_TO_WDS(SIZEOF_StgSelector_NoThunkHdr); \
} else { \
@@ -82,7 +81,6 @@ FILL_SLOP(StgClosure *p)
switch (inf->type) {
case BLACKHOLE:
- case CAF_BLACKHOLE:
goto no_slop;
// we already filled in the slop when we overwrote the thunk
// with BLACKHOLE, and also an evacuated BLACKHOLE is only the
@@ -127,23 +125,21 @@ no_slop:
*/
#ifdef CMINUSMINUS
-#define updateWithIndirection(ind_info, p1, p2, and_then) \
+#define updateWithIndirection(p1, p2, and_then) \
W_ bd; \
\
DEBUG_FILL_SLOP(p1); \
LDV_RECORD_DEAD_FILL_SLOP_DYNAMIC(p1); \
StgInd_indirectee(p1) = p2; \
prim %write_barrier() []; \
+ SET_INFO(p1, stg_BLACKHOLE_info); \
+ LDV_RECORD_CREATE(p1); \
bd = Bdescr(p1); \
if (bdescr_gen_no(bd) != 0 :: bits16) { \
recordMutableCap(p1, TO_W_(bdescr_gen_no(bd)), R1); \
- SET_INFO(p1, stg_IND_OLDGEN_info); \
- LDV_RECORD_CREATE(p1); \
TICK_UPD_OLD_IND(); \
and_then; \
} else { \
- SET_INFO(p1, ind_info); \
- LDV_RECORD_CREATE(p1); \
TICK_UPD_NEW_IND(); \
and_then; \
}
@@ -151,7 +147,6 @@ no_slop:
#else /* !CMINUSMINUS */
INLINE_HEADER void updateWithIndirection (Capability *cap,
- const StgInfoTable *ind_info,
StgClosure *p1,
StgClosure *p2)
{
@@ -164,25 +159,19 @@ INLINE_HEADER void updateWithIndirection (Capability *cap,
LDV_RECORD_DEAD_FILL_SLOP_DYNAMIC(p1);
((StgInd *)p1)->indirectee = p2;
write_barrier();
+ SET_INFO(p1, &stg_BLACKHOLE_info);
+ LDV_RECORD_CREATE(p1);
bd = Bdescr((StgPtr)p1);
if (bd->gen_no != 0) {
recordMutableCap(p1, cap, bd->gen_no);
- SET_INFO(p1, &stg_IND_OLDGEN_info);
TICK_UPD_OLD_IND();
} else {
- SET_INFO(p1, ind_info);
- LDV_RECORD_CREATE(p1);
TICK_UPD_NEW_IND();
}
}
#endif /* CMINUSMINUS */
-#define UPD_IND(cap, updclosure, heapptr) \
- updateWithIndirection(cap, &stg_IND_info, \
- updclosure, \
- heapptr)
-
#ifndef CMINUSMINUS
END_RTS_PRIVATE
#endif
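
The ordering inside updateWithIndirection is load-bearing: the indirectee is
written first, then the write barrier, then the BLACKHOLE info pointer, so
any thread that sees the BLACKHOLE header also sees a valid indirectee. A
sketch of the reader-side contract that the stg_BLACKHOLE entry code earlier
in this patch depends on (hypothetical helper, not RTS code):

    /* Relies on writers publishing `indirectee` before the BLACKHOLE
     * info pointer; otherwise the payload read here could be stale. */
    static StgClosure *followBlackhole (StgClosure *p)
    {
        if (p->header.info == &stg_BLACKHOLE_info) {
            return ((StgInd *)p)->indirectee;
        }
        return p;
    }
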
diff --git a/rts/posix/OSMem.c b/rts/posix/OSMem.c
index 79c7fbfffd..608345b309 100644
--- a/rts/posix/OSMem.c
+++ b/rts/posix/OSMem.c
@@ -131,8 +131,9 @@ my_mmap (void *addr, lnat size)
(errno == EINVAL && sizeof(void*)==4 && size >= 0xc0000000)) {
// If we request more than 3Gig, then we get EINVAL
// instead of ENOMEM (at least on Linux).
- errorBelch("out of memory (requested %lu bytes)", size);
- stg_exit(EXIT_FAILURE);
+ barf("out of memory (requested %lu bytes)", size);
+// abort();
+// stg_exit(EXIT_FAILURE);
} else {
barf("getMBlock: mmap: %s", strerror(errno));
}
diff --git a/rts/sm/Compact.c b/rts/sm/Compact.c
index 39284f9112..6de42efae4 100644
--- a/rts/sm/Compact.c
+++ b/rts/sm/Compact.c
@@ -628,8 +628,8 @@ thread_obj (StgInfoTable *info, StgPtr p)
case IND_PERM:
case MUT_VAR_CLEAN:
case MUT_VAR_DIRTY:
- case CAF_BLACKHOLE:
case BLACKHOLE:
+ case BLOCKING_QUEUE:
{
StgPtr end;
@@ -967,9 +967,6 @@ compact(StgClosure *static_objects)
// any threads resurrected during this GC
thread((void *)&resurrected_threads);
- // the blackhole queue
- thread((void *)&blackhole_queue);
-
// the task list
{
Task *task;
diff --git a/rts/sm/Evac.c b/rts/sm/Evac.c
index d5c9b8a4a5..37cbee59e0 100644
--- a/rts/sm/Evac.c
+++ b/rts/sm/Evac.c
@@ -139,7 +139,6 @@ copy_tag_nolock(StgClosure **p, const StgInfoTable *info,
nat i;
to = alloc_for_copy(size,gen);
- *p = TAG_CLOSURE(tag,(StgClosure*)to);
from = (StgPtr)src;
to[0] = (W_)info;
@@ -150,6 +149,7 @@ copy_tag_nolock(StgClosure **p, const StgInfoTable *info,
// if somebody else reads the forwarding pointer, we better make
// sure there's a closure at the end of it.
write_barrier();
+ *p = TAG_CLOSURE(tag,(StgClosure*)to);
src->header.info = (const StgInfoTable *)MK_FORWARDING_PTR(to);
// if (to+size+2 < bd->start + BLOCK_SIZE_W) {
@@ -166,7 +166,7 @@ copy_tag_nolock(StgClosure **p, const StgInfoTable *info,
/* Special version of copy() for when we only want to copy the info
* pointer of an object, but reserve some padding after it. This is
- * used to optimise evacuation of BLACKHOLEs.
+ * used to optimise evacuation of TSOs.
*/
static rtsBool
copyPart(StgClosure **p, StgClosure *src, nat size_to_reserve,
@@ -195,7 +195,6 @@ spin:
#endif
to = alloc_for_copy(size_to_reserve, gen);
- *p = (StgClosure *)to;
from = (StgPtr)src;
to[0] = info;
@@ -203,10 +202,9 @@ spin:
to[i] = from[i];
}
-#if defined(PARALLEL_GC)
write_barrier();
-#endif
src->header.info = (const StgInfoTable*)MK_FORWARDING_PTR(to);
+ *p = (StgClosure *)to;
#ifdef PROFILING
// We store the size of the just evacuated object in the LDV word so that
@@ -366,7 +364,7 @@ loop:
tag = GET_CLOSURE_TAG(q);
q = UNTAG_CLOSURE(q);
- ASSERT(LOOKS_LIKE_CLOSURE_PTR(q));
+ ASSERTM(LOOKS_LIKE_CLOSURE_PTR(q), "invalid closure, info=%p", q->header.info);
if (!HEAP_ALLOCED_GC(q)) {
@@ -629,21 +627,42 @@ loop:
copy_tag_nolock(p,info,q,sizeW_fromITBL(INFO_PTR_TO_STRUCT(info)),gen,tag);
return;
+ case BLACKHOLE:
+ {
+ StgClosure *r;
+ const StgInfoTable *i;
+ r = ((StgInd*)q)->indirectee;
+ if (GET_CLOSURE_TAG(r) == 0) {
+ i = r->header.info;
+ if (IS_FORWARDING_PTR(i)) {
+ r = (StgClosure *)UN_FORWARDING_PTR(i);
+ i = r->header.info;
+ }
+ if (i == &stg_TSO_info
+ || i == &stg_WHITEHOLE_info
+ || i == &stg_BLOCKING_QUEUE_CLEAN_info
+ || i == &stg_BLOCKING_QUEUE_DIRTY_info) {
+ copy(p,info,q,sizeofW(StgInd),gen);
+ return;
+ }
+ ASSERT(i != &stg_IND_info);
+ }
+ q = r;
+ *p = r;
+ goto loop;
+ }
+
+ case BLOCKING_QUEUE:
case WEAK:
case PRIM:
case MUT_PRIM:
- copy_tag(p,info,q,sizeW_fromITBL(INFO_PTR_TO_STRUCT(info)),gen,tag);
+ copy(p,info,q,sizeW_fromITBL(INFO_PTR_TO_STRUCT(info)),gen);
return;
case BCO:
copy(p,info,q,bco_sizeW((StgBCO *)q),gen);
return;
- case CAF_BLACKHOLE:
- case BLACKHOLE:
- copyPart(p,q,BLACKHOLE_sizeW(),sizeofW(StgHeader),gen);
- return;
-
case THUNK_SELECTOR:
eval_thunk_selector(p, (StgSelector *)q, rtsTrue);
return;
@@ -756,11 +775,7 @@ unchain_thunk_selectors(StgSelector *p, StgClosure *val)
prev = NULL;
while (p)
{
-#ifdef THREADED_RTS
ASSERT(p->header.info == &stg_WHITEHOLE_info);
-#else
- ASSERT(p->header.info == &stg_BLACKHOLE_info);
-#endif
// val must be in to-space. Not always: when we recursively
// invoke eval_thunk_selector(), the recursive calls will not
// evacuate the value (because we want to select on the value,
@@ -783,7 +798,13 @@ unchain_thunk_selectors(StgSelector *p, StgClosure *val)
// indirection pointing to itself, and we want the program
// to deadlock if it ever enters this closure, so
// BLACKHOLE is correct.
- SET_INFO(p, &stg_BLACKHOLE_info);
+
+            // XXX the old self-contained BLACKHOLE no longer exists (the
+            // new one requires a valid indirectee), so replace with a
+            // THUNK_SELECTOR again. This will go into a loop if it is
+            // entered, and should result in a NonTermination exception.
+ ((StgThunk *)p)->payload[0] = val;
+ write_barrier();
+ SET_INFO(p, &stg_sel_0_upd_info);
} else {
((StgInd *)p)->indirectee = val;
write_barrier();
@@ -813,7 +834,7 @@ eval_thunk_selector (StgClosure **q, StgSelector * p, rtsBool evac)
prev_thunk_selector = NULL;
// this is a chain of THUNK_SELECTORs that we are going to update
// to point to the value of the current THUNK_SELECTOR. Each
- // closure on the chain is a BLACKHOLE, and points to the next in the
+ // closure on the chain is a WHITEHOLE, and points to the next in the
// chain with payload[0].
selector_chain:
@@ -851,7 +872,7 @@ selector_chain:
}
- // BLACKHOLE the selector thunk, since it is now under evaluation.
+ // WHITEHOLE the selector thunk, since it is now under evaluation.
// This is important to stop us going into an infinite loop if
// this selector thunk eventually refers to itself.
#if defined(THREADED_RTS)
@@ -884,7 +905,7 @@ selector_chain:
#else
// Save the real info pointer (NOTE: not the same as get_itbl()).
info_ptr = (StgWord)p->header.info;
- SET_INFO(p,&stg_BLACKHOLE_info);
+ SET_INFO(p,&stg_WHITEHOLE_info);
#endif
field = INFO_PTR_TO_STRUCT(info_ptr)->layout.selector_offset;
@@ -936,11 +957,7 @@ selector_loop:
// the original selector thunk, p.
SET_INFO(p, (StgInfoTable *)info_ptr);
LDV_RECORD_DEAD_FILL_SLOP_DYNAMIC((StgClosure *)p);
-#if defined(THREADED_RTS)
SET_INFO(p, &stg_WHITEHOLE_info);
-#else
- SET_INFO(p, &stg_BLACKHOLE_info);
-#endif
#endif
// the closure in val is now the "value" of the
@@ -998,6 +1015,33 @@ selector_loop:
selectee = UNTAG_CLOSURE( ((StgInd *)selectee)->indirectee );
goto selector_loop;
+ case BLACKHOLE:
+ {
+ StgClosure *r;
+ const StgInfoTable *i;
+ r = ((StgInd*)selectee)->indirectee;
+
+ // establish whether this BH has been updated, and is now an
+ // indirection, as in evacuate().
+ if (GET_CLOSURE_TAG(r) == 0) {
+ i = r->header.info;
+ if (IS_FORWARDING_PTR(i)) {
+ r = (StgClosure *)UN_FORWARDING_PTR(i);
+ i = r->header.info;
+ }
+ if (i == &stg_TSO_info
+ || i == &stg_WHITEHOLE_info
+ || i == &stg_BLOCKING_QUEUE_CLEAN_info
+ || i == &stg_BLOCKING_QUEUE_DIRTY_info) {
+ goto bale_out;
+ }
+ ASSERT(i != &stg_IND_info);
+ }
+
+ selectee = UNTAG_CLOSURE( ((StgInd *)selectee)->indirectee );
+ goto selector_loop;
+ }
+
case THUNK_SELECTOR:
{
StgClosure *val;
@@ -1032,8 +1076,6 @@ selector_loop:
case THUNK_1_1:
case THUNK_0_2:
case THUNK_STATIC:
- case CAF_BLACKHOLE:
- case BLACKHOLE:
// not evaluated yet
goto bale_out;
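
The BLACKHOLE cases added to evacuate() and eval_thunk_selector() above apply
the same test: a BLACKHOLE is only still "a hole" if its untagged (and
possibly already forwarded) indirectee is a TSO, WHITEHOLE, or
BLOCKING_QUEUE; otherwise it has effectively become an indirection and can be
shorted out. A hypothetical helper making the shared logic explicit (not part
of this patch):

    /* rtsTrue iff a BLACKHOLE whose indirectee is `r` is still under
     * evaluation; mirrors the two switch cases above. */
    static rtsBool bhStillOwned (StgClosure *r)
    {
        const StgInfoTable *i;
        if (GET_CLOSURE_TAG(r) != 0) return rtsFalse; /* tagged => a value */
        i = r->header.info;
        if (IS_FORWARDING_PTR(i)) {
            i = ((StgClosure *)UN_FORWARDING_PTR(i))->header.info;
        }
        return (i == &stg_TSO_info
                || i == &stg_WHITEHOLE_info
                || i == &stg_BLOCKING_QUEUE_CLEAN_info
                || i == &stg_BLOCKING_QUEUE_DIRTY_info);
    }
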
diff --git a/rts/sm/GC.c b/rts/sm/GC.c
index ae6fc998da..4d63724ba0 100644
--- a/rts/sm/GC.c
+++ b/rts/sm/GC.c
@@ -395,13 +395,6 @@ SET_GCT(gc_threads[0]);
// The other threads are now stopped. We might recurse back to
// here, but from now on this is the only thread.
- // if any blackholes are alive, make the threads that wait on
- // them alive too.
- if (traverseBlackholeQueue()) {
- inc_running();
- continue;
- }
-
// must be last... invariant is that everything is fully
// scavenged at this point.
if (traverseWeakPtrList()) { // returns rtsTrue if evaced something
diff --git a/rts/sm/MarkWeak.c b/rts/sm/MarkWeak.c
index 0ac807ff79..e65c176c0a 100644
--- a/rts/sm/MarkWeak.c
+++ b/rts/sm/MarkWeak.c
@@ -210,21 +210,6 @@ traverseWeakPtrList(void)
}
}
- /* Finally, we can update the blackhole_queue. This queue
- * simply strings together TSOs blocked on black holes, it is
- * not intended to keep anything alive. Hence, we do not follow
- * pointers on the blackhole_queue until now, when we have
- * determined which TSOs are otherwise reachable. We know at
- * this point that all TSOs have been evacuated, however.
- */
- {
- StgTSO **pt;
- for (pt = &blackhole_queue; *pt != END_TSO_QUEUE; pt = &((*pt)->_link)) {
- *pt = (StgTSO *)isAlive((StgClosure *)*pt);
- ASSERT(*pt != NULL);
- }
- }
-
weak_stage = WeakDone; // *now* we're done,
return rtsTrue; // but one more round of scavenging, please
}
@@ -310,49 +295,6 @@ static rtsBool tidyThreadList (generation *gen)
}
/* -----------------------------------------------------------------------------
- The blackhole queue
-
- Threads on this list behave like weak pointers during the normal
- phase of garbage collection: if the blackhole is reachable, then
- the thread is reachable too.
- -------------------------------------------------------------------------- */
-rtsBool
-traverseBlackholeQueue (void)
-{
- StgTSO *prev, *t, *tmp;
- rtsBool flag;
- nat type;
-
- flag = rtsFalse;
- prev = NULL;
-
- for (t = blackhole_queue; t != END_TSO_QUEUE; prev=t, t = t->_link) {
- // if the thread is not yet alive...
- if (! (tmp = (StgTSO *)isAlive((StgClosure*)t))) {
- // if the closure it is blocked on is either (a) a
-        // reachable BLACKHOLE or (b) not a BLACKHOLE, then we
- // make the thread alive.
- if (!isAlive(t->block_info.closure)) {
- type = get_itbl(t->block_info.closure)->type;
- if (type == BLACKHOLE || type == CAF_BLACKHOLE) {
- continue;
- }
- }
- evacuate((StgClosure **)&t);
- if (prev) {
- prev->_link = t;
- } else {
- blackhole_queue = t;
- }
- // no write barrier when on the blackhole queue,
- // because we traverse the whole queue on every GC.
- flag = rtsTrue;
- }
- }
- return flag;
-}
-
-/* -----------------------------------------------------------------------------
Evacuate every weak pointer object on the weak_ptr_list, and update
the link fields.
diff --git a/rts/sm/MarkWeak.h b/rts/sm/MarkWeak.h
index 018dd6cd79..5c05ab2499 100644
--- a/rts/sm/MarkWeak.h
+++ b/rts/sm/MarkWeak.h
@@ -23,7 +23,6 @@ extern StgTSO *exception_threads;
void initWeakForGC ( void );
rtsBool traverseWeakPtrList ( void );
void markWeakPtrList ( void );
-rtsBool traverseBlackholeQueue ( void );
END_RTS_PRIVATE
diff --git a/rts/sm/Sanity.c b/rts/sm/Sanity.c
index 11d5424431..14230779d7 100644
--- a/rts/sm/Sanity.c
+++ b/rts/sm/Sanity.c
@@ -306,7 +306,6 @@ checkClosure( StgClosure* p )
case IND_OLDGEN:
case IND_OLDGEN_PERM:
case BLACKHOLE:
- case CAF_BLACKHOLE:
case PRIM:
case MUT_PRIM:
case MUT_VAR_CLEAN:
@@ -323,6 +322,23 @@ checkClosure( StgClosure* p )
return sizeW_fromITBL(info);
}
+ case BLOCKING_QUEUE:
+ {
+ StgBlockingQueue *bq = (StgBlockingQueue *)p;
+
+ // NO: the BH might have been updated now
+ // ASSERT(get_itbl(bq->bh)->type == BLACKHOLE);
+ ASSERT(LOOKS_LIKE_CLOSURE_PTR(bq->bh));
+
+ ASSERT(get_itbl(bq->owner)->type == TSO);
+ ASSERT(bq->queue == END_TSO_QUEUE || get_itbl(bq->queue)->type == TSO);
+ ASSERT(bq->link == (StgBlockingQueue*)END_TSO_QUEUE ||
+ get_itbl(bq->link)->type == IND ||
+ get_itbl(bq->link)->type == BLOCKING_QUEUE);
+
+ return sizeofW(StgBlockingQueue);
+ }
+
case BCO: {
StgBCO *bco = (StgBCO *)p;
ASSERT(LOOKS_LIKE_CLOSURE_PTR(bco->instrs));
@@ -516,6 +532,11 @@ checkTSO(StgTSO *tso)
return;
}
+ ASSERT(tso->_link == END_TSO_QUEUE || get_itbl(tso->_link)->type == TSO);
+ ASSERT(LOOKS_LIKE_CLOSURE_PTR(tso->block_info.closure));
+ ASSERT(LOOKS_LIKE_CLOSURE_PTR(tso->bq));
+ ASSERT(LOOKS_LIKE_CLOSURE_PTR(tso->blocked_exceptions));
+
ASSERT(stack <= sp && sp < stack_end);
checkStackChunk(sp, stack_end);
@@ -539,9 +560,7 @@ checkGlobalTSOList (rtsBool checkTSOs)
if (checkTSOs)
checkTSO(tso);
- while (tso->what_next == ThreadRelocated) {
- tso = tso->_link;
- }
+ tso = deRefTSO(tso);
// If this TSO is dirty and in an old generation, it better
// be on the mutable list.
diff --git a/rts/sm/Scav.c b/rts/sm/Scav.c
index 1b671a097c..75c186c972 100644
--- a/rts/sm/Scav.c
+++ b/rts/sm/Scav.c
@@ -46,17 +46,6 @@ static void scavenge_large_bitmap (StgPtr p,
Scavenge a TSO.
-------------------------------------------------------------------------- */
-STATIC_INLINE void
-scavenge_TSO_link (StgTSO *tso)
-{
- // We don't always chase the link field: TSOs on the blackhole
- // queue are not automatically alive, so the link field is a
- // "weak" pointer in that case.
- if (tso->why_blocked != BlockedOnBlackHole) {
- evacuate((StgClosure **)&tso->_link);
- }
-}
-
static void
scavengeTSO (StgTSO *tso)
{
@@ -87,7 +76,18 @@ scavengeTSO (StgTSO *tso)
) {
evacuate(&tso->block_info.closure);
}
+#ifdef THREADED_RTS
+ // in the THREADED_RTS, block_info.closure must always point to a
+ // valid closure, because we assume this in throwTo(). In the
+    // non-threaded RTS it might be an FD (for
+ // BlockedOnRead/BlockedOnWrite) or a time value (BlockedOnDelay)
+ else {
+ tso->block_info.closure = (StgClosure *)END_TSO_QUEUE;
+ }
+#endif
+
evacuate((StgClosure **)&tso->blocked_exceptions);
+ evacuate((StgClosure **)&tso->bq);
// scavange current transaction record
evacuate((StgClosure **)&tso->trec);
@@ -97,10 +97,10 @@ scavengeTSO (StgTSO *tso)
if (gct->failed_to_evac) {
tso->dirty = 1;
- scavenge_TSO_link(tso);
+ evacuate((StgClosure **)&tso->_link);
} else {
tso->dirty = 0;
- scavenge_TSO_link(tso);
+ evacuate((StgClosure **)&tso->_link);
if (gct->failed_to_evac) {
tso->flags |= TSO_LINK_DIRTY;
} else {
@@ -570,6 +570,7 @@ scavenge_block (bdescr *bd)
}
// fall through
case IND_OLDGEN_PERM:
+ case BLACKHOLE:
evacuate(&((StgInd *)p)->indirectee);
p += sizeofW(StgInd);
break;
@@ -588,10 +589,25 @@ scavenge_block (bdescr *bd)
p += sizeofW(StgMutVar);
break;
- case CAF_BLACKHOLE:
- case BLACKHOLE:
- p += BLACKHOLE_sizeW();
- break;
+ case BLOCKING_QUEUE:
+ {
+ StgBlockingQueue *bq = (StgBlockingQueue *)p;
+
+ gct->eager_promotion = rtsFalse;
+ evacuate(&bq->bh);
+ evacuate((StgClosure**)&bq->owner);
+ evacuate((StgClosure**)&bq->queue);
+ evacuate((StgClosure**)&bq->link);
+ gct->eager_promotion = saved_eager_promotion;
+
+ if (gct->failed_to_evac) {
+ bq->header.info = &stg_BLOCKING_QUEUE_DIRTY_info;
+ } else {
+ bq->header.info = &stg_BLOCKING_QUEUE_CLEAN_info;
+ }
+ p += sizeofW(StgBlockingQueue);
+ break;
+ }
case THUNK_SELECTOR:
{
@@ -884,6 +900,7 @@ scavenge_mark_stack(void)
case IND_OLDGEN:
case IND_OLDGEN_PERM:
+ case BLACKHOLE:
evacuate(&((StgInd *)p)->indirectee);
break;
@@ -901,8 +918,25 @@ scavenge_mark_stack(void)
break;
}
- case CAF_BLACKHOLE:
- case BLACKHOLE:
+ case BLOCKING_QUEUE:
+ {
+ StgBlockingQueue *bq = (StgBlockingQueue *)p;
+
+ gct->eager_promotion = rtsFalse;
+ evacuate(&bq->bh);
+ evacuate((StgClosure**)&bq->owner);
+ evacuate((StgClosure**)&bq->queue);
+ evacuate((StgClosure**)&bq->link);
+ gct->eager_promotion = saved_eager_promotion;
+
+ if (gct->failed_to_evac) {
+ bq->header.info = &stg_BLOCKING_QUEUE_DIRTY_info;
+ } else {
+ bq->header.info = &stg_BLOCKING_QUEUE_CLEAN_info;
+ }
+ break;
+ }
+
case ARR_WORDS:
break;
@@ -1122,10 +1156,25 @@ scavenge_one(StgPtr p)
break;
}
- case CAF_BLACKHOLE:
- case BLACKHOLE:
- break;
-
+ case BLOCKING_QUEUE:
+ {
+ StgBlockingQueue *bq = (StgBlockingQueue *)p;
+
+ gct->eager_promotion = rtsFalse;
+ evacuate(&bq->bh);
+ evacuate((StgClosure**)&bq->owner);
+ evacuate((StgClosure**)&bq->queue);
+ evacuate((StgClosure**)&bq->link);
+ gct->eager_promotion = saved_eager_promotion;
+
+ if (gct->failed_to_evac) {
+ bq->header.info = &stg_BLOCKING_QUEUE_DIRTY_info;
+ } else {
+ bq->header.info = &stg_BLOCKING_QUEUE_CLEAN_info;
+ }
+ break;
+ }
+
case THUNK_SELECTOR:
{
StgSelector *s = (StgSelector *)p;
@@ -1239,6 +1288,7 @@ scavenge_one(StgPtr p)
// on the large-object list and then gets updated. See #3424.
case IND_OLDGEN:
case IND_OLDGEN_PERM:
+ case BLACKHOLE:
case IND_STATIC:
evacuate(&((StgInd *)p)->indirectee);
@@ -1300,7 +1350,7 @@ scavenge_mutable_list(bdescr *bd, generation *gen)
#ifdef DEBUG
switch (get_itbl((StgClosure *)p)->type) {
case MUT_VAR_CLEAN:
- barf("MUT_VAR_CLEAN on mutable list");
+ // can happen due to concurrent writeMutVars
case MUT_VAR_DIRTY:
mutlist_MUTVARS++; break;
case MUT_ARR_PTRS_CLEAN:
@@ -1356,7 +1406,7 @@ scavenge_mutable_list(bdescr *bd, generation *gen)
// this assertion would be invalid:
// ASSERT(tso->flags & TSO_LINK_DIRTY);
- scavenge_TSO_link(tso);
+ evacuate((StgClosure **)&tso->_link);
if (gct->failed_to_evac) {
recordMutableGen_GC((StgClosure *)p,gen->no);
gct->failed_to_evac = rtsFalse;
@@ -1576,10 +1626,12 @@ scavenge_stack(StgPtr p, StgPtr stack_end)
// before GC, but that seems like overkill.
//
// Scavenging this update frame as normal would be disastrous;
- // the updatee would end up pointing to the value. So we turn
- // the indirection into an IND_PERM, so that evacuate will
- // copy the indirection into the old generation instead of
- // discarding it.
+ // the updatee would end up pointing to the value. So we
+ // check whether the value after evacuation is a BLACKHOLE,
+    // and if not, we change the update frame to an stg_enter_checkbh
+ // frame that simply returns the value. Hence, blackholing is
+ // compulsory (otherwise we would have to check for thunks
+ // too).
//
// Note [upd-black-hole]
// One slight hiccup is that the THUNK_SELECTOR machinery can
@@ -1590,22 +1642,17 @@ scavenge_stack(StgPtr p, StgPtr stack_end)
// the updatee is never a THUNK_SELECTOR and we're ok.
// NB. this is a new invariant: blackholing is not optional.
{
- nat type;
- const StgInfoTable *i;
- StgClosure *updatee;
-
- updatee = ((StgUpdateFrame *)p)->updatee;
- i = updatee->header.info;
- if (!IS_FORWARDING_PTR(i)) {
- type = get_itbl(updatee)->type;
- if (type == IND) {
- updatee->header.info = &stg_IND_PERM_info;
- } else if (type == IND_OLDGEN) {
- updatee->header.info = &stg_IND_OLDGEN_PERM_info;
- }
+ StgUpdateFrame *frame = (StgUpdateFrame *)p;
+ StgClosure *v;
+
+ evacuate(&frame->updatee);
+ v = frame->updatee;
+ if (GET_CLOSURE_TAG(v) != 0 ||
+ (get_itbl(v)->type != BLACKHOLE)) {
+ // blackholing is compulsory, see above.
+ frame->header.info = (const StgInfoTable*)&stg_enter_checkbh_info;
}
- evacuate(&((StgUpdateFrame *)p)->updatee);
- ASSERT(GET_CLOSURE_TAG(((StgUpdateFrame *)p)->updatee) == 0);
+ ASSERT(v->header.info != &stg_TSO_info);
p += sizeofW(StgUpdateFrame);
continue;
}
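
The BLOCKING_QUEUE case is spelled out three times above, in
scavenge_block(), scavenge_mark_stack() and scavenge_one(), with an identical
body; written out once as a hypothetical helper (not part of this patch; gct
is the GC thread's state, as elsewhere in this file):

    static void scavenge_blocking_queue (StgBlockingQueue *bq,
                                         rtsBool saved_eager_promotion)
    {
        gct->eager_promotion = rtsFalse;
        evacuate(&bq->bh);
        evacuate((StgClosure **)&bq->owner);
        evacuate((StgClosure **)&bq->queue);
        evacuate((StgClosure **)&bq->link);
        gct->eager_promotion = saved_eager_promotion;

        /* failing to evacuate a field means it still points into a
         * younger generation, so the queue must be marked dirty and
         * remain subject to rescanning */
        if (gct->failed_to_evac) {
            bq->header.info = &stg_BLOCKING_QUEUE_DIRTY_info;
        } else {
            bq->header.info = &stg_BLOCKING_QUEUE_CLEAN_info;
        }
    }
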
diff --git a/rts/sm/Storage.c b/rts/sm/Storage.c
index 6aedb96a59..02344003b8 100644
--- a/rts/sm/Storage.c
+++ b/rts/sm/Storage.c
@@ -107,7 +107,7 @@ initStorage( void )
* doing something reasonable.
*/
/* We use the NOT_NULL variant or gcc warns that the test is always true */
- ASSERT(LOOKS_LIKE_INFO_PTR_NOT_NULL((StgWord)&stg_BLACKHOLE_info));
+ ASSERT(LOOKS_LIKE_INFO_PTR_NOT_NULL((StgWord)&stg_BLOCKING_QUEUE_CLEAN_info));
ASSERT(LOOKS_LIKE_CLOSURE_PTR(&stg_dummy_ret_closure));
ASSERT(!HEAP_ALLOCED(&stg_dummy_ret_closure));
@@ -229,13 +229,13 @@ freeStorage (void)
The entry code for every CAF does the following:
- - builds a CAF_BLACKHOLE in the heap
- - pushes an update frame pointing to the CAF_BLACKHOLE
+ - builds a BLACKHOLE in the heap
+ - pushes an update frame pointing to the BLACKHOLE
- invokes UPD_CAF(), which:
- calls newCaf, below
- - updates the CAF with a static indirection to the CAF_BLACKHOLE
+ - updates the CAF with a static indirection to the BLACKHOLE
- Why do we build a BLACKHOLE in the heap rather than just updating
+  Why do we build a BLACKHOLE in the heap rather than just updating
the thunk directly? It's so that we only need one kind of update
frame - otherwise we'd need a static version of the update frame too.
@@ -699,11 +699,9 @@ void
dirty_MUT_VAR(StgRegTable *reg, StgClosure *p)
{
Capability *cap = regTableToCapability(reg);
- bdescr *bd;
if (p->header.info == &stg_MUT_VAR_CLEAN_info) {
p->header.info = &stg_MUT_VAR_DIRTY_info;
- bd = Bdescr((StgPtr)p);
- if (bd->gen_no > 0) recordMutableCap(p,cap,bd->gen_no);
+ recordClosureMutated(cap,p);
}
}
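
This hunk and the three that follow replace the open-coded block-descriptor
test with recordClosureMutated. Judging by the code removed here, the helper
presumably looks like this (a sketch; the actual definition is in
Capability.h, which is not part of this excerpt):

    INLINE_HEADER void recordClosureMutated (Capability *cap, StgClosure *p)
    {
        bdescr *bd = Bdescr((StgPtr)p);
        /* young-generation objects are scanned anyway; only mutations
         * of old-generation objects need to be remembered */
        if (bd->gen_no > 0) recordMutableCap(p, cap, bd->gen_no);
    }
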
@@ -716,11 +714,9 @@ dirty_MUT_VAR(StgRegTable *reg, StgClosure *p)
void
setTSOLink (Capability *cap, StgTSO *tso, StgTSO *target)
{
- bdescr *bd;
if (tso->dirty == 0 && (tso->flags & TSO_LINK_DIRTY) == 0) {
tso->flags |= TSO_LINK_DIRTY;
- bd = Bdescr((StgPtr)tso);
- if (bd->gen_no > 0) recordMutableCap((StgClosure*)tso,cap,bd->gen_no);
+ recordClosureMutated(cap,(StgClosure*)tso);
}
tso->_link = target;
}
@@ -728,10 +724,8 @@ setTSOLink (Capability *cap, StgTSO *tso, StgTSO *target)
void
dirty_TSO (Capability *cap, StgTSO *tso)
{
- bdescr *bd;
if (tso->dirty == 0 && (tso->flags & TSO_LINK_DIRTY) == 0) {
- bd = Bdescr((StgPtr)tso);
- if (bd->gen_no > 0) recordMutableCap((StgClosure*)tso,cap,bd->gen_no);
+ recordClosureMutated(cap,(StgClosure*)tso);
}
tso->dirty = 1;
}
@@ -747,10 +741,7 @@ dirty_TSO (Capability *cap, StgTSO *tso)
void
dirty_MVAR(StgRegTable *reg, StgClosure *p)
{
- Capability *cap = regTableToCapability(reg);
- bdescr *bd;
- bd = Bdescr((StgPtr)p);
- if (bd->gen_no > 0) recordMutableCap(p,cap,bd->gen_no);
+ recordClosureMutated(regTableToCapability(reg),p);
}
/* -----------------------------------------------------------------------------
diff --git a/utils/genapply/GenApply.hs b/utils/genapply/GenApply.hs
index 765bfb3be6..16d33940fb 100644
--- a/utils/genapply/GenApply.hs
+++ b/utils/genapply/GenApply.hs
@@ -564,8 +564,8 @@ genApply regstatus args =
-- else:
text "case AP,",
text " AP_STACK,",
- text " CAF_BLACKHOLE,",
text " BLACKHOLE,",
+ text " WHITEHOLE,",
text " THUNK,",
text " THUNK_1_0,",
text " THUNK_0_1,",