summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon Marlow <marlowsd@gmail.com>2010-03-11 09:57:44 +0000
committerSimon Marlow <marlowsd@gmail.com>2010-03-11 09:57:44 +0000
commit7408b39235bccdcde48df2a73337ff976fbc09b7 (patch)
treecf20c372fdc5787170d53df36fc24ecf8113c89e
parent12cfec943127f0c81e1ffa1ca5ce46e888e3027c (diff)
downloadhaskell-7408b39235bccdcde48df2a73337ff976fbc09b7.tar.gz
Use message-passing to implement throwTo in the RTS
This replaces some complicated locking schemes with message-passing in the implementation of throwTo. The benefits are - previously it was impossible to guarantee that a throwTo from a thread running on one CPU to a thread running on another CPU would be noticed, and we had to rely on the GC to pick up these forgotten exceptions. This no longer happens. - the locking regime is simpler (though the code is about the same size) - threads can be unblocked from a blocked_exceptions queue without having to traverse the whole queue now. It's a rare case, but replaces an O(n) operation with an O(1). - generally we move in the direction of sharing less between Capabilities (aka HECs), which will become important with other changes we have planned. Also in this patch I replaced several STM-specific closure types with a generic MUT_PRIM closure type, which allowed a lot of code in the GC and other places to go away, hence the line-count reduction. The message-passing changes resulted in about a net zero line-count difference.
-rw-r--r--includes/rts/Constants.h22
-rw-r--r--includes/rts/storage/ClosureMacros.h10
-rw-r--r--includes/rts/storage/ClosureTypes.h22
-rw-r--r--includes/rts/storage/Closures.h25
-rw-r--r--includes/rts/storage/SMPClosureOps.h26
-rw-r--r--includes/rts/storage/TSO.h19
-rw-r--r--includes/stg/MiscClosures.h6
-rw-r--r--rts/Capability.c67
-rw-r--r--rts/Capability.h28
-rw-r--r--rts/ClosureFlags.c10
-rw-r--r--rts/Exception.cmm43
-rw-r--r--rts/FrontPanel.c2
-rw-r--r--rts/HeapStackCheck.cmm5
-rw-r--r--rts/LdvProfile.c2
-rw-r--r--rts/PrimOps.cmm12
-rw-r--r--rts/Printer.c18
-rw-r--r--rts/ProfHeap.c28
-rw-r--r--rts/RaiseAsync.c512
-rw-r--r--rts/RaiseAsync.h17
-rw-r--r--rts/RetainerProfile.c6
-rw-r--r--rts/RtsMessages.c6
-rw-r--r--rts/STM.c3
-rw-r--r--rts/Schedule.c185
-rw-r--r--rts/Schedule.h28
-rw-r--r--rts/StgMiscClosures.cmm35
-rw-r--r--rts/Threads.c18
-rw-r--r--rts/Threads.h2
-rw-r--r--rts/Trace.h10
-rw-r--r--rts/eventlog/EventLog.h2
-rw-r--r--rts/sm/Compact.c49
-rw-r--r--rts/sm/Evac.c23
-rw-r--r--rts/sm/GC.c1
-rw-r--r--rts/sm/MarkWeak.c39
-rw-r--r--rts/sm/Sanity.c44
-rw-r--r--rts/sm/Scav.c260
35 files changed, 682 insertions, 903 deletions
diff --git a/includes/rts/Constants.h b/includes/rts/Constants.h
index 54a1ca71ca..bfc77fa361 100644
--- a/includes/rts/Constants.h
+++ b/includes/rts/Constants.h
@@ -208,25 +208,27 @@
#define NotBlocked 0
#define BlockedOnMVar 1
#define BlockedOnBlackHole 2
-#define BlockedOnException 3
-#define BlockedOnRead 4
-#define BlockedOnWrite 5
-#define BlockedOnDelay 6
-#define BlockedOnSTM 7
+#define BlockedOnRead 3
+#define BlockedOnWrite 4
+#define BlockedOnDelay 5
+#define BlockedOnSTM 6
/* Win32 only: */
-#define BlockedOnDoProc 8
+#define BlockedOnDoProc 7
/* Only relevant for PAR: */
/* blocked on a remote closure represented by a Global Address: */
-#define BlockedOnGA 9
+#define BlockedOnGA 8
/* same as above but without sending a Fetch message */
-#define BlockedOnGA_NoSend 10
+#define BlockedOnGA_NoSend 9
/* Only relevant for THREADED_RTS: */
-#define BlockedOnCCall 11
-#define BlockedOnCCall_NoUnblockExc 12
+#define BlockedOnCCall 10
+#define BlockedOnCCall_NoUnblockExc 11
/* same as above but don't unblock async exceptions in resumeThread() */
+/* Involved in a message sent to tso->msg_cap */
+#define BlockedOnMsgWakeup 12
+#define BlockedOnMsgThrowTo 13
/*
* These constants are returned to the scheduler by a thread that has
* stopped for one reason or another. See typedef StgThreadReturnCode
diff --git a/includes/rts/storage/ClosureMacros.h b/includes/rts/storage/ClosureMacros.h
index f73d2c5ccd..a115f6f38f 100644
--- a/includes/rts/storage/ClosureMacros.h
+++ b/includes/rts/storage/ClosureMacros.h
@@ -335,18 +335,8 @@ closure_sizeW_ (StgClosure *p, StgInfoTable *info)
return tso_sizeW((StgTSO *)p);
case BCO:
return bco_sizeW((StgBCO *)p);
- case TVAR_WATCH_QUEUE:
- return sizeofW(StgTVarWatchQueue);
- case TVAR:
- return sizeofW(StgTVar);
case TREC_CHUNK:
return sizeofW(StgTRecChunk);
- case TREC_HEADER:
- return sizeofW(StgTRecHeader);
- case ATOMIC_INVARIANT:
- return sizeofW(StgAtomicInvariant);
- case INVARIANT_CHECK_QUEUE:
- return sizeofW(StgInvariantCheckQueue);
default:
return sizeW_fromITBL(info);
}
diff --git a/includes/rts/storage/ClosureTypes.h b/includes/rts/storage/ClosureTypes.h
index 508dce21bc..6a76772d61 100644
--- a/includes/rts/storage/ClosureTypes.h
+++ b/includes/rts/storage/ClosureTypes.h
@@ -74,18 +74,14 @@
#define MUT_VAR_CLEAN 50
#define MUT_VAR_DIRTY 51
#define WEAK 52
-#define STABLE_NAME 53
-#define TSO 54
-#define TVAR_WATCH_QUEUE 55
-#define INVARIANT_CHECK_QUEUE 56
-#define ATOMIC_INVARIANT 57
-#define TVAR 58
-#define TREC_CHUNK 59
-#define TREC_HEADER 60
-#define ATOMICALLY_FRAME 61
-#define CATCH_RETRY_FRAME 62
-#define CATCH_STM_FRAME 63
-#define WHITEHOLE 64
-#define N_CLOSURE_TYPES 65
+#define PRIM 53
+#define MUT_PRIM 54
+#define TSO 55
+#define TREC_CHUNK 56
+#define ATOMICALLY_FRAME 57
+#define CATCH_RETRY_FRAME 58
+#define CATCH_STM_FRAME 59
+#define WHITEHOLE 60
+#define N_CLOSURE_TYPES 61
#endif /* RTS_STORAGE_CLOSURETYPES_H */
diff --git a/includes/rts/storage/Closures.h b/includes/rts/storage/Closures.h
index 892826842e..d7498e2882 100644
--- a/includes/rts/storage/Closures.h
+++ b/includes/rts/storage/Closures.h
@@ -390,10 +390,10 @@ typedef struct StgInvariantCheckQueue_ {
struct StgTRecHeader_ {
StgHeader header;
- TRecState state;
struct StgTRecHeader_ *enclosing_trec;
StgTRecChunk *current_chunk;
StgInvariantCheckQueue *invariants_to_check;
+ TRecState state;
};
typedef struct {
@@ -416,4 +416,27 @@ typedef struct {
StgClosure *alt_code;
} StgCatchRetryFrame;
+/* ----------------------------------------------------------------------------
+ Messages
+ ------------------------------------------------------------------------- */
+
+typedef struct Message_ {
+ StgHeader header;
+ struct Message_ *link;
+} Message;
+
+typedef struct MessageWakeup_ {
+ StgHeader header;
+ Message *link;
+ StgTSO *tso;
+} MessageWakeup;
+
+typedef struct MessageThrowTo_ {
+ StgHeader header;
+ Message *link;
+ StgTSO *source;
+ StgTSO *target;
+ StgClosure *exception;
+} MessageThrowTo;
+
#endif /* RTS_STORAGE_CLOSURES_H */
diff --git a/includes/rts/storage/SMPClosureOps.h b/includes/rts/storage/SMPClosureOps.h
index 582ec0e959..8dee7cbcf9 100644
--- a/includes/rts/storage/SMPClosureOps.h
+++ b/includes/rts/storage/SMPClosureOps.h
@@ -18,6 +18,7 @@
#else
EXTERN_INLINE StgInfoTable *lockClosure(StgClosure *p);
+EXTERN_INLINE StgInfoTable *tryLockClosure(StgClosure *p);
EXTERN_INLINE void unlockClosure(StgClosure *p, const StgInfoTable *info);
#if defined(THREADED_RTS)
@@ -43,11 +44,15 @@ EXTERN_INLINE StgInfoTable *lockClosure(StgClosure *p)
} while (1);
}
-EXTERN_INLINE void unlockClosure(StgClosure *p, const StgInfoTable *info)
+EXTERN_INLINE StgInfoTable *tryLockClosure(StgClosure *p)
{
- // This is a strictly ordered write, so we need a write_barrier():
- write_barrier();
- p->header.info = info;
+ StgWord info;
+ info = xchg((P_)(void *)&p->header.info, (W_)&stg_WHITEHOLE_info);
+ if (info != (W_)&stg_WHITEHOLE_info) {
+ return (StgInfoTable *)info;
+ } else {
+ return NULL;
+ }
}
#else /* !THREADED_RTS */
@@ -56,12 +61,19 @@ EXTERN_INLINE StgInfoTable *
lockClosure(StgClosure *p)
{ return (StgInfoTable *)p->header.info; }
-EXTERN_INLINE void
-unlockClosure(StgClosure *p STG_UNUSED, const StgInfoTable *info STG_UNUSED)
-{ /* nothing */ }
+EXTERN_INLINE StgInfoTable *
+tryLockClosure(StgClosure *p)
+{ return (StgInfoTable *)p->header.info; }
#endif /* THREADED_RTS */
+EXTERN_INLINE void unlockClosure(StgClosure *p, const StgInfoTable *info)
+{
+ // This is a strictly ordered write, so we need a write_barrier():
+ write_barrier();
+ p->header.info = info;
+}
+
// Handy specialised versions of lockClosure()/unlockClosure()
EXTERN_INLINE void lockTSO(StgTSO *tso);
EXTERN_INLINE void lockTSO(StgTSO *tso)
diff --git a/includes/rts/storage/TSO.h b/includes/rts/storage/TSO.h
index e8d97c5ce0..e2015f28ac 100644
--- a/includes/rts/storage/TSO.h
+++ b/includes/rts/storage/TSO.h
@@ -46,7 +46,8 @@ typedef struct {
/* Reason for thread being blocked. See comment above struct StgTso_. */
typedef union {
StgClosure *closure;
- struct StgTSO_ *tso;
+ struct MessageThrowTo_ *throwto;
+ struct MessageWakeup_ *wakeup;
StgInt fd; /* StgInt instead of int, so that it's the same size as the ptrs */
#if defined(mingw32_HOST_OS)
StgAsyncIOResult *async_result;
@@ -87,7 +88,8 @@ typedef struct StgTSO_ {
will already be dirty.
*/
- struct StgTSO_* global_link; /* Links all threads together */
+ struct StgTSO_* global_link; // Links threads on the
+ // generation->threads lists
StgWord dirty; /* non-zero => dirty */
/*
@@ -108,9 +110,9 @@ typedef struct StgTSO_ {
* setTSOLink().
*/
- StgWord16 what_next; /* Values defined in Constants.h */
- StgWord16 why_blocked; /* Values defined in Constants.h */
- StgWord32 flags;
+ StgWord16 what_next; // Values defined in Constants.h
+ StgWord16 why_blocked; // Values defined in Constants.h
+ StgWord32 flags; // Values defined in Constants.h
StgTSOBlockInfo block_info;
StgThreadID id;
int saved_errno;
@@ -123,7 +125,7 @@ typedef struct StgTSO_ {
exceptions. In order to access this field, the TSO must be
locked using lockClosure/unlockClosure (see SMP.h).
*/
- struct StgTSO_ * blocked_exceptions;
+ struct MessageThrowTo_ * blocked_exceptions;
#ifdef TICKY_TICKY
/* TICKY-specific stuff would go here. */
@@ -167,7 +169,7 @@ void setTSOLink (Capability *cap, StgTSO *tso, StgTSO *target);
tso->why_blocked tso->block_info location
----------------------------------------------------------------------
- NotBlocked NULL runnable_queue, or running
+ NotBlocked END_TSO_QUEUE runnable_queue, or running
BlockedOnBlackHole the BLACKHOLE blackhole_queue
@@ -175,7 +177,7 @@ void setTSOLink (Capability *cap, StgTSO *tso, StgTSO *target);
BlockedOnSTM END_TSO_QUEUE STM wait queue(s)
- BlockedOnException the TSO TSO->blocked_exception
+ BlockedOnMsgThrowTo MessageThrowTo * TSO->blocked_exception
BlockedOnRead NULL blocked_queue
BlockedOnWrite NULL blocked_queue
@@ -189,7 +191,6 @@ void setTSOLink (Capability *cap, StgTSO *tso, StgTSO *target);
tso->what_next == ThreadComplete or ThreadKilled
tso->link == (could be on some queue somewhere)
- tso->su == tso->stack + tso->stack_size
tso->sp == tso->stack + tso->stack_size - 1 (i.e. top stack word)
tso->sp[0] == return value of thread, if what_next == ThreadComplete,
exception , if what_next == ThreadKilled
diff --git a/includes/stg/MiscClosures.h b/includes/stg/MiscClosures.h
index e68282ebc0..42e878f945 100644
--- a/includes/stg/MiscClosures.h
+++ b/includes/stg/MiscClosures.h
@@ -114,6 +114,8 @@ RTS_INFO(stg_MUT_ARR_PTRS_FROZEN0_info);
RTS_INFO(stg_MUT_VAR_CLEAN_info);
RTS_INFO(stg_MUT_VAR_DIRTY_info);
RTS_INFO(stg_END_TSO_QUEUE_info);
+RTS_INFO(stg_MSG_WAKEUP_info);
+RTS_INFO(stg_MSG_THROWTO_info);
RTS_INFO(stg_MUT_CONS_info);
RTS_INFO(stg_catch_info);
RTS_INFO(stg_PAP_info);
@@ -163,6 +165,8 @@ RTS_ENTRY(stg_MUT_ARR_PTRS_FROZEN0_entry);
RTS_ENTRY(stg_MUT_VAR_CLEAN_entry);
RTS_ENTRY(stg_MUT_VAR_DIRTY_entry);
RTS_ENTRY(stg_END_TSO_QUEUE_entry);
+RTS_ENTRY(stg_MSG_WAKEUP_entry);
+RTS_ENTRY(stg_MSG_THROWTO_entry);
RTS_ENTRY(stg_MUT_CONS_entry);
RTS_ENTRY(stg_catch_entry);
RTS_ENTRY(stg_PAP_entry);
@@ -205,8 +209,6 @@ RTS_CLOSURE(stg_END_STM_CHUNK_LIST_closure);
RTS_CLOSURE(stg_NO_TREC_closure);
RTS_ENTRY(stg_NO_FINALIZER_entry);
-RTS_ENTRY(stg_END_EXCEPTION_LIST_entry);
-RTS_ENTRY(stg_EXCEPTION_CONS_entry);
#if IN_STG_CODE
extern DLL_IMPORT_RTS StgWordArray stg_CHARLIKE_closure;
diff --git a/rts/Capability.c b/rts/Capability.c
index ce6ecebd72..5f54ecae4d 100644
--- a/rts/Capability.c
+++ b/rts/Capability.c
@@ -223,8 +223,7 @@ initCapability( Capability *cap, nat i )
cap->suspended_ccalls = NULL;
cap->returning_tasks_hd = NULL;
cap->returning_tasks_tl = NULL;
- cap->wakeup_queue_hd = END_TSO_QUEUE;
- cap->wakeup_queue_tl = END_TSO_QUEUE;
+ cap->inbox = (Message*)END_TSO_QUEUE;
cap->sparks_created = 0;
cap->sparks_converted = 0;
cap->sparks_pruned = 0;
@@ -419,7 +418,7 @@ releaseCapability_ (Capability* cap,
// If we have an unbound thread on the run queue, or if there's
// anything else to do, give the Capability to a worker thread.
if (always_wakeup ||
- !emptyRunQueue(cap) || !emptyWakeupQueue(cap) ||
+ !emptyRunQueue(cap) || !emptyInbox(cap) ||
!emptySparkPoolCap(cap) || globalWorkToDo()) {
if (cap->spare_workers) {
giveCapabilityToTask(cap,cap->spare_workers);
@@ -645,11 +644,11 @@ yieldCapability (Capability** pCap, Task *task)
* ------------------------------------------------------------------------- */
void
-wakeupThreadOnCapability (Capability *my_cap,
+wakeupThreadOnCapability (Capability *cap,
Capability *other_cap,
StgTSO *tso)
{
- ACQUIRE_LOCK(&other_cap->lock);
+ MessageWakeup *msg;
// ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
if (tso->bound) {
@@ -658,27 +657,20 @@ wakeupThreadOnCapability (Capability *my_cap,
}
tso->cap = other_cap;
- ASSERT(tso->bound ? tso->bound->task->cap == other_cap : 1);
+ ASSERT(tso->why_blocked != BlockedOnMsgWakeup ||
+ tso->block_info.closure->header.info == &stg_IND_info);
- if (other_cap->running_task == NULL) {
- // nobody is running this Capability, we can add our thread
- // directly onto the run queue and start up a Task to run it.
+ ASSERT(tso->block_info.closure->header.info != &stg_MSG_WAKEUP_info);
- other_cap->running_task = myTask();
- // precond for releaseCapability_() and appendToRunQueue()
+ msg = (MessageWakeup*) allocate(cap, sizeofW(MessageWakeup));
+ msg->header.info = &stg_MSG_WAKEUP_info;
+ msg->tso = tso;
+ tso->block_info.closure = (StgClosure *)msg;
+ dirty_TSO(cap, tso);
+ write_barrier();
+ tso->why_blocked = BlockedOnMsgWakeup;
- appendToRunQueue(other_cap,tso);
-
- releaseCapability_(other_cap,rtsFalse);
- } else {
- appendToWakeupQueue(my_cap,other_cap,tso);
- other_cap->context_switch = 1;
- // someone is running on this Capability, so it cannot be
- // freed without first checking the wakeup queue (see
- // releaseCapability_).
- }
-
- RELEASE_LOCK(&other_cap->lock);
+ sendMessage(other_cap, (Message*)msg);
}
/* ----------------------------------------------------------------------------
@@ -881,8 +873,7 @@ markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta,
evac(user, (StgClosure **)(void *)&cap->run_queue_hd);
evac(user, (StgClosure **)(void *)&cap->run_queue_tl);
#if defined(THREADED_RTS)
- evac(user, (StgClosure **)(void *)&cap->wakeup_queue_hd);
- evac(user, (StgClosure **)(void *)&cap->wakeup_queue_tl);
+ evac(user, (StgClosure **)(void *)&cap->inbox);
#endif
for (incall = cap->suspended_ccalls; incall != NULL;
incall=incall->next) {
@@ -910,3 +901,29 @@ markCapabilities (evac_fn evac, void *user)
{
markSomeCapabilities(evac, user, 0, 1, rtsFalse);
}
+
+/* -----------------------------------------------------------------------------
+ Messages
+ -------------------------------------------------------------------------- */
+
+#ifdef THREADED_RTS
+
+void sendMessage(Capability *cap, Message *msg)
+{
+ ACQUIRE_LOCK(&cap->lock);
+
+ msg->link = cap->inbox;
+ cap->inbox = msg;
+
+ if (cap->running_task == NULL) {
+ cap->running_task = myTask();
+ // precond for releaseCapability_()
+ releaseCapability_(cap,rtsFalse);
+ } else {
+ contextSwitchCapability(cap);
+ }
+
+ RELEASE_LOCK(&cap->lock);
+}
+
+#endif // THREADED_RTS
diff --git a/rts/Capability.h b/rts/Capability.h
index 41974dc710..4030b5efd4 100644
--- a/rts/Capability.h
+++ b/rts/Capability.h
@@ -88,11 +88,8 @@ struct Capability_ {
Task *returning_tasks_hd; // Singly-linked, with head/tail
Task *returning_tasks_tl;
- // A list of threads to append to this Capability's run queue at
- // the earliest opportunity. These are threads that have been
- // woken up by another Capability.
- StgTSO *wakeup_queue_hd;
- StgTSO *wakeup_queue_tl;
+ // Messages, or END_TSO_QUEUE.
+ Message *inbox;
SparkPool *sparks;
@@ -285,6 +282,18 @@ void markCapabilities (evac_fn evac, void *user);
void traverseSparkQueues (evac_fn evac, void *user);
/* -----------------------------------------------------------------------------
+ Messages
+ -------------------------------------------------------------------------- */
+
+#ifdef THREADED_RTS
+
+INLINE_HEADER rtsBool emptyInbox(Capability *cap);;
+
+void sendMessage (Capability *cap, Message *msg);
+
+#endif // THREADED_RTS
+
+/* -----------------------------------------------------------------------------
* INLINE functions... private below here
* -------------------------------------------------------------------------- */
@@ -333,6 +342,15 @@ contextSwitchCapability (Capability *cap)
cap->context_switch = 1;
}
+#ifdef THREADED_RTS
+
+INLINE_HEADER rtsBool emptyInbox(Capability *cap)
+{
+ return (cap->inbox == (Message*)END_TSO_QUEUE);
+}
+
+#endif
+
END_RTS_PRIVATE
#endif /* CAPABILITY_H */
diff --git a/rts/ClosureFlags.c b/rts/ClosureFlags.c
index 477a892594..358cb40ed3 100644
--- a/rts/ClosureFlags.c
+++ b/rts/ClosureFlags.c
@@ -74,20 +74,16 @@ StgWord16 closure_flags[] = {
[MUT_VAR_CLEAN] = (_HNF| _NS| _MUT|_UPT ),
[MUT_VAR_DIRTY] = (_HNF| _NS| _MUT|_UPT ),
[WEAK] = (_HNF| _NS| _UPT ),
- [STABLE_NAME] = (_HNF| _NS| _UPT ),
+ [PRIM] = (_HNF| _NS| _UPT ),
+ [MUT_PRIM] = (_HNF| _NS| _MUT|_UPT ),
[TSO] = (_HNF| _NS| _MUT|_UPT ),
- [TVAR_WATCH_QUEUE] = ( _NS| _MUT|_UPT ),
- [INVARIANT_CHECK_QUEUE]= ( _NS| _MUT|_UPT ),
- [ATOMIC_INVARIANT] = ( _NS| _MUT|_UPT ),
- [TVAR] = (_HNF| _NS| _MUT|_UPT ),
[TREC_CHUNK] = ( _NS| _MUT|_UPT ),
- [TREC_HEADER] = ( _NS| _MUT|_UPT ),
[ATOMICALLY_FRAME] = ( _BTM ),
[CATCH_RETRY_FRAME] = ( _BTM ),
[CATCH_STM_FRAME] = ( _BTM ),
[WHITEHOLE] = ( 0 )
};
-#if N_CLOSURE_TYPES != 65
+#if N_CLOSURE_TYPES != 61
#error Closure types changed: update ClosureFlags.c!
#endif
diff --git a/rts/Exception.cmm b/rts/Exception.cmm
index 6c887c22dc..55c79cede7 100644
--- a/rts/Exception.cmm
+++ b/rts/Exception.cmm
@@ -56,7 +56,7 @@ INFO_TABLE_RET( stg_unblockAsyncExceptionszh_ret, RET_SMALL )
CInt r;
StgTSO_flags(CurrentTSO) = StgTSO_flags(CurrentTSO) &
- ~(TSO_BLOCKEX::I32|TSO_INTERRUPTIBLE::I32);
+ %lobits32(~(TSO_BLOCKEX|TSO_INTERRUPTIBLE));
/* Eagerly raise a blocked exception, if there is one */
if (StgTSO_blocked_exceptions(CurrentTSO) != END_TSO_QUEUE) {
@@ -99,8 +99,8 @@ INFO_TABLE_RET( stg_unblockAsyncExceptionszh_ret, RET_SMALL )
INFO_TABLE_RET( stg_blockAsyncExceptionszh_ret, RET_SMALL )
{
- StgTSO_flags(CurrentTSO) =
- StgTSO_flags(CurrentTSO) | TSO_BLOCKEX::I32 | TSO_INTERRUPTIBLE::I32;
+ StgTSO_flags(CurrentTSO) = %lobits32(
+ TO_W_(StgTSO_flags(CurrentTSO)) | TSO_BLOCKEX | TSO_INTERRUPTIBLE);
Sp_adj(1);
jump %ENTRY_CODE(Sp(0));
@@ -113,8 +113,8 @@ stg_blockAsyncExceptionszh
if ((TO_W_(StgTSO_flags(CurrentTSO)) & TSO_BLOCKEX) == 0) {
- StgTSO_flags(CurrentTSO) =
- StgTSO_flags(CurrentTSO) | TSO_BLOCKEX::I32 | TSO_INTERRUPTIBLE::I32;
+ StgTSO_flags(CurrentTSO) = %lobits32(
+ TO_W_(StgTSO_flags(CurrentTSO)) | TSO_BLOCKEX | TSO_INTERRUPTIBLE);
/* avoid growing the stack unnecessarily */
if (Sp(0) == stg_blockAsyncExceptionszh_ret_info) {
@@ -142,8 +142,8 @@ stg_unblockAsyncExceptionszh
/* If exceptions are already unblocked, there's nothing to do */
if ((TO_W_(StgTSO_flags(CurrentTSO)) & TSO_BLOCKEX) != 0) {
- StgTSO_flags(CurrentTSO) = StgTSO_flags(CurrentTSO) &
- ~(TSO_BLOCKEX::I32|TSO_INTERRUPTIBLE::I32);
+ StgTSO_flags(CurrentTSO) = %lobits32(
+ TO_W_(StgTSO_flags(CurrentTSO)) & ~(TSO_BLOCKEX|TSO_INTERRUPTIBLE));
/* avoid growing the stack unnecessarily */
if (Sp(0) == stg_unblockAsyncExceptionszh_ret_info) {
@@ -252,27 +252,22 @@ stg_killThreadzh
}
} else {
W_ out;
- W_ retcode;
+ W_ msg;
out = Sp - WDS(1); /* ok to re-use stack space here */
- (retcode) = foreign "C" throwTo(MyCapability() "ptr",
- CurrentTSO "ptr",
- target "ptr",
- exception "ptr",
- out "ptr") [R1,R2];
+ (msg) = foreign "C" throwTo(MyCapability() "ptr",
+ CurrentTSO "ptr",
+ target "ptr",
+ exception "ptr") [R1,R2];
- switch [THROWTO_SUCCESS .. THROWTO_BLOCKED] (retcode) {
-
- case THROWTO_SUCCESS: {
+ if (msg == NULL) {
jump %ENTRY_CODE(Sp(0));
- }
-
- case THROWTO_BLOCKED: {
- R3 = W_[out];
- // we must block, and call throwToReleaseTarget() before returning
+ } else {
+ StgTSO_why_blocked(CurrentTSO) = BlockedOnMsgThrowTo;
+ StgTSO_block_info(CurrentTSO) = msg;
+ // we must block, and unlock the message before returning
jump stg_block_throwto;
}
- }
}
}
@@ -507,8 +502,8 @@ retry_pop_stack:
/* Ensure that async excpetions are blocked when running the handler.
*/
- StgTSO_flags(CurrentTSO) =
- StgTSO_flags(CurrentTSO) | TSO_BLOCKEX::I32 | TSO_INTERRUPTIBLE::I32;
+ StgTSO_flags(CurrentTSO) = %lobits32(
+ TO_W_(StgTSO_flags(CurrentTSO)) | TSO_BLOCKEX | TSO_INTERRUPTIBLE);
/* Call the handler, passing the exception value and a realworld
* token as arguments.
diff --git a/rts/FrontPanel.c b/rts/FrontPanel.c
index 163a7c08ca..ebba4056fb 100644
--- a/rts/FrontPanel.c
+++ b/rts/FrontPanel.c
@@ -697,7 +697,7 @@ residencyCensus( void )
break;
case WEAK:
- case STABLE_NAME:
+ case PRIM:
case MVAR:
case MUT_VAR:
/* case MUT_CONS: FIXME: case does not exist */
diff --git a/rts/HeapStackCheck.cmm b/rts/HeapStackCheck.cmm
index b516ef2c09..a528a3f22e 100644
--- a/rts/HeapStackCheck.cmm
+++ b/rts/HeapStackCheck.cmm
@@ -631,9 +631,8 @@ INFO_TABLE_RET( stg_block_throwto, RET_SMALL, P_ unused, P_ unused )
stg_block_throwto_finally
{
-#ifdef THREADED_RTS
- foreign "C" throwToReleaseTarget (R3 "ptr");
-#endif
+ // unlock the throwto message
+ unlockClosure(StgTSO_block_info(CurrentTSO), stg_MSG_THROWTO_info);
jump StgReturn;
}
diff --git a/rts/LdvProfile.c b/rts/LdvProfile.c
index c2e7d7ec5a..ccaf10c684 100644
--- a/rts/LdvProfile.c
+++ b/rts/LdvProfile.c
@@ -109,7 +109,7 @@ processHeapClosureForDead( StgClosure *c )
case MUT_VAR_CLEAN:
case MUT_VAR_DIRTY:
case BCO:
- case STABLE_NAME:
+ case PRIM:
case TVAR_WATCH_QUEUE:
case TVAR:
case TREC_HEADER:
diff --git a/rts/PrimOps.cmm b/rts/PrimOps.cmm
index 5325c85d1f..bf81eeece1 100644
--- a/rts/PrimOps.cmm
+++ b/rts/PrimOps.cmm
@@ -542,9 +542,9 @@ stg_forkzh
closure "ptr") [];
/* start blocked if the current thread is blocked */
- StgTSO_flags(threadid) =
- StgTSO_flags(threadid) | (StgTSO_flags(CurrentTSO) &
- (TSO_BLOCKEX::I32 | TSO_INTERRUPTIBLE::I32));
+ StgTSO_flags(threadid) = %lobits16(
+ TO_W_(StgTSO_flags(threadid)) |
+ TO_W_(StgTSO_flags(CurrentTSO)) & (TSO_BLOCKEX | TSO_INTERRUPTIBLE));
foreign "C" scheduleThread(MyCapability() "ptr", threadid "ptr") [];
@@ -572,9 +572,9 @@ stg_forkOnzh
closure "ptr") [];
/* start blocked if the current thread is blocked */
- StgTSO_flags(threadid) =
- StgTSO_flags(threadid) | (StgTSO_flags(CurrentTSO) &
- (TSO_BLOCKEX::I32 | TSO_INTERRUPTIBLE::I32));
+ StgTSO_flags(threadid) = %lobits16(
+ TO_W_(StgTSO_flags(threadid)) |
+ TO_W_(StgTSO_flags(CurrentTSO)) & (TSO_BLOCKEX | TSO_INTERRUPTIBLE));
foreign "C" scheduleThreadOn(MyCapability() "ptr", cpu, threadid "ptr") [];
diff --git a/rts/Printer.c b/rts/Printer.c
index 1b8a6dd2c6..e9813299d8 100644
--- a/rts/Printer.c
+++ b/rts/Printer.c
@@ -160,6 +160,12 @@ printClosure( StgClosure *obj )
printStdObjPayload(obj);
break;
+ case PRIM:
+ debugBelch("PRIM(");
+ printPtr((StgPtr)obj->header.info);
+ printStdObjPayload(obj);
+ break;
+
case THUNK:
case THUNK_1_0: case THUNK_0_1:
case THUNK_1_1: case THUNK_0_2: case THUNK_2_0:
@@ -356,10 +362,6 @@ printClosure( StgClosure *obj )
/* ToDo: chase 'link' ? */
break;
- case STABLE_NAME:
- debugBelch("STABLE_NAME(%lu)\n", (lnat)((StgStableName*)obj)->sn);
- break;
-
case TSO:
debugBelch("TSO(");
debugBelch("%lu (%p)",(unsigned long)(((StgTSO*)obj)->id), (StgTSO*)obj);
@@ -1132,14 +1134,10 @@ char *closure_type_names[] = {
[MUT_VAR_CLEAN] = "MUT_VAR_CLEAN",
[MUT_VAR_DIRTY] = "MUT_VAR_DIRTY",
[WEAK] = "WEAK",
- [STABLE_NAME] = "STABLE_NAME",
+ [PRIM] = "PRIM",
+ [MUT_PRIM] = "MUT_PRIM",
[TSO] = "TSO",
- [TVAR_WATCH_QUEUE] = "TVAR_WATCH_QUEUE",
- [INVARIANT_CHECK_QUEUE] = "INVARIANT_CHECK_QUEUE",
- [ATOMIC_INVARIANT] = "ATOMIC_INVARIANT",
- [TVAR] = "TVAR",
[TREC_CHUNK] = "TREC_CHUNK",
- [TREC_HEADER] = "TREC_HEADER",
[ATOMICALLY_FRAME] = "ATOMICALLY_FRAME",
[CATCH_RETRY_FRAME] = "CATCH_RETRY_FRAME",
[CATCH_STM_FRAME] = "CATCH_STM_FRAME",
diff --git a/rts/ProfHeap.c b/rts/ProfHeap.c
index 15337d4585..e90051c5e6 100644
--- a/rts/ProfHeap.c
+++ b/rts/ProfHeap.c
@@ -912,7 +912,8 @@ heapCensusChain( Census *census, bdescr *bd )
case MVAR_CLEAN:
case MVAR_DIRTY:
case WEAK:
- case STABLE_NAME:
+ case PRIM:
+ case MUT_PRIM:
case MUT_VAR_CLEAN:
case MUT_VAR_DIRTY:
prim = rtsTrue;
@@ -960,31 +961,6 @@ heapCensusChain( Census *census, bdescr *bd )
break;
#endif
- case TREC_HEADER:
- prim = rtsTrue;
- size = sizeofW(StgTRecHeader);
- break;
-
- case TVAR_WATCH_QUEUE:
- prim = rtsTrue;
- size = sizeofW(StgTVarWatchQueue);
- break;
-
- case INVARIANT_CHECK_QUEUE:
- prim = rtsTrue;
- size = sizeofW(StgInvariantCheckQueue);
- break;
-
- case ATOMIC_INVARIANT:
- prim = rtsTrue;
- size = sizeofW(StgAtomicInvariant);
- break;
-
- case TVAR:
- prim = rtsTrue;
- size = sizeofW(StgTVar);
- break;
-
case TREC_CHUNK:
prim = rtsTrue;
size = sizeofW(StgTRecChunk);
diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c
index ca5e5ea4c1..d54f823d6e 100644
--- a/rts/RaiseAsync.c
+++ b/rts/RaiseAsync.c
@@ -30,10 +30,14 @@ static void raiseAsync (Capability *cap,
static void removeFromQueues(Capability *cap, StgTSO *tso);
-static void blockedThrowTo (Capability *cap, StgTSO *source, StgTSO *target);
+static void blockedThrowTo (Capability *cap,
+ StgTSO *target, MessageThrowTo *msg);
-static void performBlockedException (Capability *cap,
- StgTSO *source, StgTSO *target);
+static void throwToSendMsg (Capability *cap USED_IF_THREADS,
+ Capability *target_cap USED_IF_THREADS,
+ MessageThrowTo *msg USED_IF_THREADS);
+
+static void performBlockedException (Capability *cap, MessageThrowTo *msg);
/* -----------------------------------------------------------------------------
throwToSingleThreaded
@@ -96,59 +100,85 @@ suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here)
may be blocked and could be woken up at any point by another CPU.
We have some delicate synchronisation to do.
- There is a completely safe fallback scheme: it is always possible
- to just block the source TSO on the target TSO's blocked_exceptions
- queue. This queue is locked using lockTSO()/unlockTSO(). It is
- checked at regular intervals: before and after running a thread
- (schedule() and threadPaused() respectively), and just before GC
- (scheduleDoGC()). Activating a thread on this queue should be done
- using maybePerformBlockedException(): this is done in the context
- of the target thread, so the exception can be raised eagerly.
-
- This fallback scheme works even if the target thread is complete or
- killed: scheduleDoGC() will discover the blocked thread before the
- target is GC'd.
-
- Blocking the source thread on the target thread's blocked_exception
- queue is also employed when the target thread is currently blocking
- exceptions (ie. inside Control.Exception.block).
-
- We could use the safe fallback scheme exclusively, but that
- wouldn't be ideal: most calls to throwTo would block immediately,
- possibly until the next GC, which might require the deadlock
- detection mechanism to kick in. So we try to provide promptness
- wherever possible.
-
- We can promptly deliver the exception if the target thread is:
-
- - runnable, on the same Capability as the source thread (because
- we own the run queue and therefore the target thread).
-
- - blocked, and we can obtain exclusive access to it. Obtaining
- exclusive access to the thread depends on how it is blocked.
-
- We must also be careful to not trip over threadStackOverflow(),
- which might be moving the TSO to enlarge its stack.
- lockTSO()/unlockTSO() are used here too.
-
+ The underlying scheme when multiple Capabilities are in use is
+ message passing: when the target of a throwTo is on another
+ Capability, we send a message (a MessageThrowTo closure) to that
+ Capability.
+
+ If the throwTo needs to block because the target TSO is masking
+ exceptions (the TSO_BLOCKEX flag), then the message is placed on
+ the blocked_exceptions queue attached to the target TSO. When the
+ target TSO enters the unmasked state again, it must check the
+ queue. The blocked_exceptions queue is not locked; only the
+ Capability owning the TSO may modify it.
+
+ To make things simpler for throwTo, we always create the message
+ first before deciding what to do. The message may get sent, or it
+ may get attached to a TSO's blocked_exceptions queue, or the
+ exception may get thrown immediately and the message dropped,
+ depending on the current state of the target.
+
+ Currently we send a message if the target belongs to another
+ Capability, and it is
+
+ - NotBlocked, BlockedOnMsgWakeup, BlockedOnMsgThrowTo,
+ BlockedOnCCall
+
+ - or it is masking exceptions (TSO_BLOCKEX)
+
+ Currently, if the target is BlockedOnMVar, BlockedOnSTM, or
+ BlockedOnBlackHole then we acquire ownership of the TSO by locking
+ its parent container (e.g. the MVar) and then raise the exception.
+ We might change these cases to be more message-passing-like in the
+ future.
+
Returns:
- THROWTO_SUCCESS exception was raised, ok to continue
+ NULL exception was raised, ok to continue
- THROWTO_BLOCKED exception was not raised; block the source
- thread then call throwToReleaseTarget() when
- the source thread is properly tidied away.
+ MessageThrowTo * exception was not raised; the source TSO
+ should now put itself in the state
+ BlockedOnMsgThrowTo, and when it is ready
+ it should unlock the mssage using
+ unlockClosure(msg, &stg_MSG_THROWTO_info);
+ If it decides not to raise the exception after
+ all, it can revoke it safely with
+ unlockClosure(msg, &stg_IND_info);
-------------------------------------------------------------------------- */
-nat
+MessageThrowTo *
throwTo (Capability *cap, // the Capability we hold
StgTSO *source, // the TSO sending the exception (or NULL)
StgTSO *target, // the TSO receiving the exception
- StgClosure *exception, // the exception closure
- /*[out]*/ void **out USED_IF_THREADS)
+ StgClosure *exception) // the exception closure
+{
+ MessageThrowTo *msg;
+
+ msg = (MessageThrowTo *) allocate(cap, sizeofW(MessageThrowTo));
+ // message starts locked; the caller has to unlock it when it is
+ // ready.
+ msg->header.info = &stg_WHITEHOLE_info;
+ msg->source = source;
+ msg->target = target;
+ msg->exception = exception;
+
+ switch (throwToMsg(cap, msg))
+ {
+ case THROWTO_SUCCESS:
+ return NULL;
+ case THROWTO_BLOCKED:
+ default:
+ return msg;
+ }
+}
+
+
+nat
+throwToMsg (Capability *cap, MessageThrowTo *msg)
{
StgWord status;
+ StgTSO *target = msg->target;
ASSERT(target != END_TSO_QUEUE);
@@ -159,13 +189,10 @@ throwTo (Capability *cap, // the Capability we hold
// ASSERT(get_itbl(target)->type == TSO);
}
- if (source != NULL) {
- debugTrace(DEBUG_sched, "throwTo: from thread %lu to thread %lu",
- (unsigned long)source->id, (unsigned long)target->id);
- } else {
- debugTrace(DEBUG_sched, "throwTo: from RTS to thread %lu",
- (unsigned long)target->id);
- }
+ debugTraceCap(DEBUG_sched, cap,
+ "throwTo: from thread %lu to thread %lu",
+ (unsigned long)msg->source->id,
+ (unsigned long)msg->target->id);
#ifdef DEBUG
traceThreadStatus(DEBUG_sched, target);
@@ -173,6 +200,7 @@ throwTo (Capability *cap, // the Capability we hold
goto check_target;
retry:
+ write_barrier();
debugTrace(DEBUG_sched, "throwTo: retrying...");
check_target:
@@ -188,6 +216,7 @@ check_target:
switch (status) {
case NotBlocked:
+ case BlockedOnMsgWakeup:
/* if status==NotBlocked, and target->cap == cap, then
we own this TSO and can raise the exception.
@@ -251,37 +280,80 @@ check_target:
write_barrier();
target_cap = target->cap;
- if (target_cap == cap && (target->flags & TSO_BLOCKEX) == 0) {
- // It's on our run queue and not blocking exceptions
- raiseAsync(cap, target, exception, rtsFalse, NULL);
- return THROWTO_SUCCESS;
- } else {
- // Otherwise, just block on the blocked_exceptions queue
- // of the target thread. The queue will get looked at
- // soon enough: it is checked before and after running a
- // thread, and during GC.
- lockTSO(target);
-
- // Avoid race with threadStackOverflow, which may have
- // just moved this TSO.
- if (target->what_next == ThreadRelocated) {
- unlockTSO(target);
- target = target->_link;
- goto retry;
- }
- // check again for ThreadComplete and ThreadKilled. This
- // cooperates with scheduleHandleThreadFinished to ensure
- // that we never miss any threads that are throwing an
- // exception to a thread in the process of terminating.
- if (target->what_next == ThreadComplete
- || target->what_next == ThreadKilled) {
- unlockTSO(target);
+ if (target_cap != cap) {
+ throwToSendMsg(cap, target_cap, msg);
+ return THROWTO_BLOCKED;
+ } else {
+ if ((target->flags & TSO_BLOCKEX) == 0) {
+ // It's on our run queue and not blocking exceptions
+ raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
return THROWTO_SUCCESS;
+ } else {
+ blockedThrowTo(cap,target,msg);
+ return THROWTO_BLOCKED;
}
- blockedThrowTo(cap,source,target);
- *out = target;
- return THROWTO_BLOCKED;
- }
+ }
+ }
+
+ case BlockedOnMsgThrowTo:
+ {
+ Capability *target_cap;
+ const StgInfoTable *i;
+ MessageThrowTo *m;
+
+ m = target->block_info.throwto;
+
+ // target is local to this cap, but has sent a throwto
+ // message to another cap.
+ //
+ // The source message is locked. We need to revoke the
+ // target's message so that we can raise the exception, so
+ // we attempt to lock it.
+
+ // There's a possibility of a deadlock if two threads are both
+ // trying to throwTo each other (or more generally, a cycle of
+ // threads). To break the symmetry we compare the addresses
+ // of the MessageThrowTo objects, and the one for which m <
+ // msg gets to spin, while the other can only try to lock
+ // once, but must then back off and unlock both before trying
+ // again.
+ if (m < msg) {
+ i = lockClosure((StgClosure *)m);
+ } else {
+ i = tryLockClosure((StgClosure *)m);
+ if (i == NULL) {
+// debugBelch("collision\n");
+ throwToSendMsg(cap, target->cap, msg);
+ return THROWTO_BLOCKED;
+ }
+ }
+
+ if (i != &stg_MSG_THROWTO_info) {
+ // if it's an IND, this TSO has been woken up by another Cap
+ unlockClosure((StgClosure*)m, i);
+ goto retry;
+ }
+
+ target_cap = target->cap;
+ if (target_cap != cap) {
+ unlockClosure((StgClosure*)m, i);
+ throwToSendMsg(cap, target_cap, msg);
+ return THROWTO_BLOCKED;
+ }
+
+ if ((target->flags & TSO_BLOCKEX) &&
+ ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
+ unlockClosure((StgClosure*)m, i);
+ blockedThrowTo(cap,target,msg);
+ return THROWTO_BLOCKED;
+ }
+
+ // nobody else can wake up this TSO after we claim the message
+ unlockClosure((StgClosure*)m, &stg_IND_info);
+
+ raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
+ unblockOne(cap, target);
+ return THROWTO_SUCCESS;
}
case BlockedOnMVar:
@@ -322,14 +394,17 @@ check_target:
if ((target->flags & TSO_BLOCKEX) &&
((target->flags & TSO_INTERRUPTIBLE) == 0)) {
- lockClosure((StgClosure *)target);
- blockedThrowTo(cap,source,target);
+ Capability *target_cap = target->cap;
+ if (target->cap != cap) {
+ throwToSendMsg(cap,target_cap,msg);
+ } else {
+ blockedThrowTo(cap,target,msg);
+ }
unlockClosure((StgClosure *)mvar, info);
- *out = target;
- return THROWTO_BLOCKED; // caller releases TSO
+ return THROWTO_BLOCKED;
} else {
removeThreadFromMVarQueue(cap, mvar, target);
- raiseAsync(cap, target, exception, rtsFalse, NULL);
+ raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
unblockOne(cap, target);
unlockClosure((StgClosure *)mvar, info);
return THROWTO_SUCCESS;
@@ -346,84 +421,23 @@ check_target:
}
if (target->flags & TSO_BLOCKEX) {
- lockTSO(target);
- blockedThrowTo(cap,source,target);
+ Capability *target_cap = target->cap;
+ if (target->cap != cap) {
+ throwToSendMsg(cap,target_cap,msg);
+ } else {
+ blockedThrowTo(cap,target,msg);
+ }
RELEASE_LOCK(&sched_mutex);
- *out = target;
- return THROWTO_BLOCKED; // caller releases TSO
+ return THROWTO_BLOCKED; // caller releases lock
} else {
removeThreadFromQueue(cap, &blackhole_queue, target);
- raiseAsync(cap, target, exception, rtsFalse, NULL);
+ raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
unblockOne(cap, target);
RELEASE_LOCK(&sched_mutex);
return THROWTO_SUCCESS;
}
}
- case BlockedOnException:
- {
- StgTSO *target2;
- StgInfoTable *info;
-
- /*
- To obtain exclusive access to a BlockedOnException thread,
- we must call lockClosure() on the TSO on which it is blocked.
- Since the TSO might change underneath our feet, after we
- call lockClosure() we must check that
-
- (a) the closure we locked is actually a TSO
- (b) the original thread is still BlockedOnException,
- (c) the original thread is still blocked on the TSO we locked
- and (d) the target thread has not been relocated.
-
- We synchronise with threadStackOverflow() (which relocates
- threads) using lockClosure()/unlockClosure().
- */
- target2 = target->block_info.tso;
-
- info = lockClosure((StgClosure *)target2);
- if (info != &stg_TSO_info) {
- unlockClosure((StgClosure *)target2, info);
- goto retry;
- }
- if (target->what_next == ThreadRelocated) {
- target = target->_link;
- unlockTSO(target2);
- goto retry;
- }
- if (target2->what_next == ThreadRelocated) {
- target->block_info.tso = target2->_link;
- unlockTSO(target2);
- goto retry;
- }
- if (target->why_blocked != BlockedOnException
- || target->block_info.tso != target2) {
- unlockTSO(target2);
- goto retry;
- }
-
- /*
- Now we have exclusive rights to the target TSO...
-
- If it is blocking exceptions, add the source TSO to its
- blocked_exceptions queue. Otherwise, raise the exception.
- */
- if ((target->flags & TSO_BLOCKEX) &&
- ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
- lockTSO(target);
- blockedThrowTo(cap,source,target);
- unlockTSO(target2);
- *out = target;
- return THROWTO_BLOCKED;
- } else {
- removeThreadFromQueue(cap, &target2->blocked_exceptions, target);
- raiseAsync(cap, target, exception, rtsFalse, NULL);
- unblockOne(cap, target);
- unlockTSO(target2);
- return THROWTO_SUCCESS;
- }
- }
-
case BlockedOnSTM:
lockTSO(target);
// Unblocking BlockedOnSTM threads requires the TSO to be
@@ -434,11 +448,16 @@ check_target:
}
if ((target->flags & TSO_BLOCKEX) &&
((target->flags & TSO_INTERRUPTIBLE) == 0)) {
- blockedThrowTo(cap,source,target);
- *out = target;
+ Capability *target_cap = target->cap;
+ if (target->cap != cap) {
+ throwToSendMsg(cap,target_cap,msg);
+ } else {
+ blockedThrowTo(cap,target,msg);
+ }
+ unlockTSO(target);
return THROWTO_BLOCKED;
} else {
- raiseAsync(cap, target, exception, rtsFalse, NULL);
+ raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
unblockOne(cap, target);
unlockTSO(target);
return THROWTO_SUCCESS;
@@ -446,19 +465,18 @@ check_target:
case BlockedOnCCall:
case BlockedOnCCall_NoUnblockExc:
- // I don't think it's possible to acquire ownership of a
- // BlockedOnCCall thread. We just assume that the target
- // thread is blocking exceptions, and block on its
- // blocked_exception queue.
- lockTSO(target);
- if (target->why_blocked != BlockedOnCCall &&
- target->why_blocked != BlockedOnCCall_NoUnblockExc) {
- unlockTSO(target);
- goto retry;
- }
- blockedThrowTo(cap,source,target);
- *out = target;
+ {
+ Capability *target_cap;
+
+ target_cap = target->cap;
+ if (target_cap != cap) {
+ throwToSendMsg(cap, target_cap, msg);
+ return THROWTO_BLOCKED;
+ }
+
+ blockedThrowTo(cap,target,msg);
return THROWTO_BLOCKED;
+ }
#ifndef THREADEDED_RTS
case BlockedOnRead:
@@ -469,11 +487,11 @@ check_target:
#endif
if ((target->flags & TSO_BLOCKEX) &&
((target->flags & TSO_INTERRUPTIBLE) == 0)) {
- blockedThrowTo(cap,source,target);
+ blockedThrowTo(cap,target,msg);
return THROWTO_BLOCKED;
} else {
removeFromQueues(cap,target);
- raiseAsync(cap, target, exception, rtsFalse, NULL);
+ raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
return THROWTO_SUCCESS;
}
#endif
@@ -484,33 +502,34 @@ check_target:
barf("throwTo");
}
-// Block a TSO on another TSO's blocked_exceptions queue.
-// Precondition: we hold an exclusive lock on the target TSO (this is
-// complex to achieve as there's no single lock on a TSO; see
-// throwTo()).
static void
-blockedThrowTo (Capability *cap, StgTSO *source, StgTSO *target)
+throwToSendMsg (Capability *cap STG_UNUSED,
+ Capability *target_cap USED_IF_THREADS,
+ MessageThrowTo *msg USED_IF_THREADS)
+
{
- if (source != NULL) {
- debugTrace(DEBUG_sched, "throwTo: blocking on thread %lu", (unsigned long)target->id);
- setTSOLink(cap, source, target->blocked_exceptions);
- target->blocked_exceptions = source;
- dirty_TSO(cap,target); // we modified the blocked_exceptions queue
-
- source->block_info.tso = target;
- write_barrier(); // throwTo_exception *must* be visible if BlockedOnException is.
- source->why_blocked = BlockedOnException;
- }
-}
+#ifdef THREADED_RTS
+ debugTrace(DEBUG_sched, "throwTo: sending a throwto message to cap %lu", (unsigned long)target_cap->no);
+ sendMessage(target_cap, (Message*)msg);
+#endif
+}
-#ifdef THREADED_RTS
-void
-throwToReleaseTarget (void *tso)
+// Block a throwTo message on the target TSO's blocked_exceptions
+// queue. The current Capability must own the target TSO in order to
+// modify the blocked_exceptions queue.
+static void
+blockedThrowTo (Capability *cap, StgTSO *target, MessageThrowTo *msg)
{
- unlockTSO((StgTSO *)tso);
+ debugTraceCap(DEBUG_sched, cap, "throwTo: blocking on thread %lu",
+ (unsigned long)target->id);
+
+ ASSERT(target->cap == cap);
+
+ msg->link = (Message*)target->blocked_exceptions;
+ target->blocked_exceptions = msg;
+ dirty_TSO(cap,target); // we modified the blocked_exceptions queue
}
-#endif
/* -----------------------------------------------------------------------------
Waking up threads blocked in throwTo
@@ -532,10 +551,11 @@ throwToReleaseTarget (void *tso)
int
maybePerformBlockedException (Capability *cap, StgTSO *tso)
{
- StgTSO *source;
+ MessageThrowTo *msg;
+ const StgInfoTable *i;
if (tso->what_next == ThreadComplete || tso->what_next == ThreadFinished) {
- if (tso->blocked_exceptions != END_TSO_QUEUE) {
+ if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
awakenBlockedExceptionQueue(cap,tso);
return 1;
} else {
@@ -543,32 +563,30 @@ maybePerformBlockedException (Capability *cap, StgTSO *tso)
}
}
- if (tso->blocked_exceptions != END_TSO_QUEUE &&
+ if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE &&
(tso->flags & TSO_BLOCKEX) != 0) {
debugTrace(DEBUG_sched, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id);
}
- if (tso->blocked_exceptions != END_TSO_QUEUE
+ if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE
&& ((tso->flags & TSO_BLOCKEX) == 0
|| ((tso->flags & TSO_INTERRUPTIBLE) && interruptible(tso)))) {
- // Lock the TSO, this gives us exclusive access to the queue
- lockTSO(tso);
-
- // Check the queue again; it might have changed before we
- // locked it.
- if (tso->blocked_exceptions == END_TSO_QUEUE) {
- unlockTSO(tso);
- return 0;
- }
-
// We unblock just the first thread on the queue, and perform
// its throw immediately.
- source = tso->blocked_exceptions;
- performBlockedException(cap, source, tso);
- tso->blocked_exceptions = unblockOne_(cap, source,
- rtsFalse/*no migrate*/);
- unlockTSO(tso);
+ loop:
+ msg = tso->blocked_exceptions;
+ if (msg == END_BLOCKED_EXCEPTIONS_QUEUE) return 0;
+ i = lockClosure((StgClosure*)msg);
+ tso->blocked_exceptions = (MessageThrowTo*)msg->link;
+ if (i == &stg_IND_info) {
+ unlockClosure((StgClosure*)msg,i);
+ goto loop;
+ }
+
+ performBlockedException(cap, msg);
+ unblockOne_(cap, msg->source, rtsFalse/*no migrate*/);
+ unlockClosure((StgClosure*)msg,&stg_IND_info);
return 1;
}
return 0;
@@ -580,25 +598,34 @@ maybePerformBlockedException (Capability *cap, StgTSO *tso)
void
awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso)
{
- lockTSO(tso);
- awakenBlockedQueue(cap, tso->blocked_exceptions);
- tso->blocked_exceptions = END_TSO_QUEUE;
- unlockTSO(tso);
+ MessageThrowTo *msg;
+ const StgInfoTable *i;
+
+ for (msg = tso->blocked_exceptions; msg != END_BLOCKED_EXCEPTIONS_QUEUE;
+ msg = (MessageThrowTo*)msg->link) {
+ i = lockClosure((StgClosure *)msg);
+ if (i != &stg_IND_info) {
+ unblockOne_(cap, msg->source, rtsFalse/*no migrate*/);
+ }
+ unlockClosure((StgClosure *)msg,i);
+ }
+ tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
}
static void
-performBlockedException (Capability *cap, StgTSO *source, StgTSO *target)
+performBlockedException (Capability *cap, MessageThrowTo *msg)
{
- StgClosure *exception;
+ StgTSO *source;
+
+ source = msg->source;
- ASSERT(source->why_blocked == BlockedOnException);
- ASSERT(source->block_info.tso->id == target->id);
+ ASSERT(source->why_blocked == BlockedOnMsgThrowTo);
+ ASSERT(source->block_info.closure == (StgClosure *)msg);
ASSERT(source->sp[0] == (StgWord)&stg_block_throwto_info);
- ASSERT(((StgTSO *)source->sp[1])->id == target->id);
+ ASSERT(((StgTSO *)source->sp[1])->id == msg->target->id);
// check ids not pointers, because the thread might be relocated
- exception = (StgClosure *)source->sp[2];
- throwToSingleThreaded(cap, target, exception);
+ throwToSingleThreaded(cap, msg->target, msg->exception);
source->sp += 3;
}
@@ -637,22 +664,25 @@ removeFromQueues(Capability *cap, StgTSO *tso)
removeThreadFromQueue(cap, &blackhole_queue, tso);
goto done;
- case BlockedOnException:
- {
- StgTSO *target = tso->block_info.tso;
-
- // NO: when called by threadPaused(), we probably have this
- // TSO already locked (WHITEHOLEd) because we just placed
- // ourselves on its queue.
- // ASSERT(get_itbl(target)->type == TSO);
-
- while (target->what_next == ThreadRelocated) {
- target = target->_link;
- }
-
- removeThreadFromQueue(cap, &target->blocked_exceptions, tso);
- goto done;
- }
+ case BlockedOnMsgWakeup:
+ {
+ // kill the message, atomically:
+ tso->block_info.wakeup->header.info = &stg_IND_info;
+ break;
+ }
+
+ case BlockedOnMsgThrowTo:
+ {
+ MessageThrowTo *m = tso->block_info.throwto;
+ // The message is locked by us, unless we got here via
+ // deleteAllThreads(), in which case we own all the
+ // capabilities.
+ // ASSERT(m->header.info == &stg_WHITEHOLE_info);
+
+ // unlock and revoke it at the same time
+ unlockClosure((StgClosure*)m,&stg_IND_info);
+ break;
+ }
#if !defined(THREADED_RTS)
case BlockedOnRead:
@@ -743,6 +773,10 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
}
#endif
+ while (tso->what_next == ThreadRelocated) {
+ tso = tso->_link;
+ }
+
// mark it dirty; we're about to change its stack.
dirty_TSO(cap, tso);
diff --git a/rts/RaiseAsync.h b/rts/RaiseAsync.h
index 96eb96e10b..5137d41f5f 100644
--- a/rts/RaiseAsync.h
+++ b/rts/RaiseAsync.h
@@ -29,16 +29,13 @@ void suspendComputation (Capability *cap,
StgTSO *tso,
StgUpdateFrame *stop_here);
-nat throwTo (Capability *cap, // the Capability we hold
- StgTSO *source, // the TSO sending the exception
- StgTSO *target, // the TSO receiving the exception
- StgClosure *exception, // the exception closure
- /*[out]*/ void **out // pass to throwToReleaseTarget()
- );
+MessageThrowTo *throwTo (Capability *cap, // the Capability we hold
+ StgTSO *source,
+ StgTSO *target,
+ StgClosure *exception); // the exception closure
-#ifdef THREADED_RTS
-void throwToReleaseTarget (void *tso);
-#endif
+nat throwToMsg (Capability *cap,
+ MessageThrowTo *msg);
int maybePerformBlockedException (Capability *cap, StgTSO *tso);
void awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso);
@@ -52,7 +49,7 @@ interruptible(StgTSO *t)
{
switch (t->why_blocked) {
case BlockedOnMVar:
- case BlockedOnException:
+ case BlockedOnMsgThrowTo:
case BlockedOnRead:
case BlockedOnWrite:
#if defined(mingw32_HOST_OS)
diff --git a/rts/RetainerProfile.c b/rts/RetainerProfile.c
index 4fca19cf2f..b7bc909f63 100644
--- a/rts/RetainerProfile.c
+++ b/rts/RetainerProfile.c
@@ -509,7 +509,7 @@ push( StgClosure *c, retainer c_child_r, StgClosure **first_child )
// layout.payload.ptrs, no SRT
case CONSTR:
- case STABLE_NAME:
+ case PRIM:
case BCO:
case CONSTR_STATIC:
init_ptrs(&se.info, get_itbl(c)->layout.payload.ptrs,
@@ -883,7 +883,7 @@ pop( StgClosure **c, StgClosure **cp, retainer *r )
}
case CONSTR:
- case STABLE_NAME:
+ case PRIM:
case BCO:
case CONSTR_STATIC:
// StgMutArrPtr.ptrs, no SRT
@@ -1108,7 +1108,7 @@ isRetainer( StgClosure *c )
case CONSTR_STATIC:
case FUN_STATIC:
// misc
- case STABLE_NAME:
+ case PRIM:
case BCO:
case ARR_WORDS:
// STM
diff --git a/rts/RtsMessages.c b/rts/RtsMessages.c
index e2a30a613f..6e75abc8a5 100644
--- a/rts/RtsMessages.c
+++ b/rts/RtsMessages.c
@@ -9,6 +9,8 @@
#include "PosixSource.h"
#include "Rts.h"
+#include "eventlog/EventLog.h"
+
#include <stdio.h>
#include <string.h>
#include <errno.h>
@@ -161,6 +163,10 @@ rtsFatalInternalErrorFn(const char *s, va_list ap)
fflush(stderr);
}
+#ifdef TRACING
+ if (RtsFlags.TraceFlags.tracing == TRACE_EVENTLOG) endEventLogging();
+#endif
+
abort();
// stg_exit(EXIT_INTERNAL_ERROR);
}
diff --git a/rts/STM.c b/rts/STM.c
index ed5a7224ef..be61538434 100644
--- a/rts/STM.c
+++ b/rts/STM.c
@@ -352,8 +352,7 @@ static StgBool watcher_is_tso(StgTVarWatchQueue *q) {
static StgBool watcher_is_invariant(StgTVarWatchQueue *q) {
StgClosure *c = q -> closure;
- StgInfoTable *info = get_itbl(c);
- return (info -> type) == ATOMIC_INVARIANT;
+ return (c->header.info == &stg_ATOMIC_INVARIANT_info);
}
/*......................................................................*/
diff --git a/rts/Schedule.c b/rts/Schedule.c
index 4cca469869..70e0246fbe 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -139,7 +139,7 @@ static void scheduleYield (Capability **pcap, Task *task, rtsBool);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
-static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
+static void scheduleProcessInbox(Capability *cap);
static void scheduleCheckBlackHoles (Capability *cap);
static void scheduleDetectDeadlock (Capability *cap, Task *task);
static void schedulePushWork(Capability *cap, Task *task);
@@ -618,7 +618,7 @@ scheduleFindWork (Capability *cap)
// list each time around the scheduler.
if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
- scheduleCheckWakeupThreads(cap);
+ scheduleProcessInbox(cap);
scheduleCheckBlockedThreads(cap);
@@ -673,7 +673,7 @@ scheduleYield (Capability **pcap, Task *task, rtsBool force_yield)
if (!force_yield &&
!shouldYieldCapability(cap,task) &&
(!emptyRunQueue(cap) ||
- !emptyWakeupQueue(cap) ||
+ !emptyInbox(cap) ||
blackholes_need_checking ||
sched_state >= SCHED_INTERRUPTING))
return;
@@ -725,7 +725,9 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
for (i=0, n_free_caps=0; i < n_capabilities; i++) {
cap0 = &capabilities[i];
if (cap != cap0 && tryGrabCapability(cap0,task)) {
- if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
+ if (!emptyRunQueue(cap0)
+ || cap->returning_tasks_hd != NULL
+ || cap->inbox != (Message*)END_TSO_QUEUE) {
// it already has some work, we just grabbed it at
// the wrong moment. Or maybe it's deadlocked!
releaseCapability(cap0);
@@ -871,23 +873,89 @@ scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
* Check for threads woken up by other Capabilities
* ------------------------------------------------------------------------- */
+#if defined(THREADED_RTS)
+static void
+executeMessage (Capability *cap, Message *m)
+{
+ const StgInfoTable *i;
+
+loop:
+ write_barrier(); // allow m->header to be modified by another thread
+ i = m->header.info;
+ if (i == &stg_MSG_WAKEUP_info)
+ {
+ MessageWakeup *w = (MessageWakeup *)m;
+ StgTSO *tso = w->tso;
+ debugTraceCap(DEBUG_sched, cap, "message: wakeup thread %ld",
+ (lnat)tso->id);
+ ASSERT(tso->cap == cap);
+ ASSERT(tso->why_blocked == BlockedOnMsgWakeup);
+ ASSERT(tso->block_info.closure == (StgClosure *)m);
+ tso->why_blocked = NotBlocked;
+ appendToRunQueue(cap, tso);
+ }
+ else if (i == &stg_MSG_THROWTO_info)
+ {
+ MessageThrowTo *t = (MessageThrowTo *)m;
+ nat r;
+ const StgInfoTable *i;
+
+ i = lockClosure((StgClosure*)m);
+ if (i != &stg_MSG_THROWTO_info) {
+ unlockClosure((StgClosure*)m, i);
+ goto loop;
+ }
+
+ debugTraceCap(DEBUG_sched, cap, "message: throwTo %ld -> %ld",
+ (lnat)t->source->id, (lnat)t->target->id);
+
+ ASSERT(t->source->why_blocked == BlockedOnMsgThrowTo);
+ ASSERT(t->source->block_info.closure == (StgClosure *)m);
+
+ r = throwToMsg(cap, t);
+
+ switch (r) {
+ case THROWTO_SUCCESS:
+ ASSERT(t->source->sp[0] == (StgWord)&stg_block_throwto_info);
+ t->source->sp += 3;
+ unblockOne(cap, t->source);
+ // this message is done
+ unlockClosure((StgClosure*)m, &stg_IND_info);
+ break;
+ case THROWTO_BLOCKED:
+ // unlock the message
+ unlockClosure((StgClosure*)m, &stg_MSG_THROWTO_info);
+ break;
+ }
+ }
+ else if (i == &stg_IND_info)
+ {
+ // message was revoked
+ return;
+ }
+ else if (i == &stg_WHITEHOLE_info)
+ {
+ goto loop;
+ }
+ else
+ {
+ barf("executeMessage: %p", i);
+ }
+}
+#endif
+
static void
-scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
+scheduleProcessInbox (Capability *cap USED_IF_THREADS)
{
#if defined(THREADED_RTS)
- // Any threads that were woken up by other Capabilities get
- // appended to our run queue.
- if (!emptyWakeupQueue(cap)) {
- ACQUIRE_LOCK(&cap->lock);
- if (emptyRunQueue(cap)) {
- cap->run_queue_hd = cap->wakeup_queue_hd;
- cap->run_queue_tl = cap->wakeup_queue_tl;
- } else {
- setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd);
- cap->run_queue_tl = cap->wakeup_queue_tl;
- }
- cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
- RELEASE_LOCK(&cap->lock);
+ Message *m;
+
+ while (!emptyInbox(cap)) {
+ ACQUIRE_LOCK(&cap->lock);
+ m = cap->inbox;
+ cap->inbox = m->link;
+ RELEASE_LOCK(&cap->lock);
+ executeMessage(cap, (Message *)m);
}
#endif
}
@@ -983,7 +1051,7 @@ scheduleDetectDeadlock (Capability *cap, Task *task)
switch (task->incall->tso->why_blocked) {
case BlockedOnSTM:
case BlockedOnBlackHole:
- case BlockedOnException:
+ case BlockedOnMsgThrowTo:
case BlockedOnMVar:
throwToSingleThreaded(cap, task->incall->tso,
(StgClosure *)nonTermination_closure);
@@ -1268,9 +1336,7 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
*/
// blocked exceptions can now complete, even if the thread was in
- // blocked mode (see #2910). This unconditionally calls
- // lockTSO(), which ensures that we don't miss any threads that
- // are engaged in throwTo() with this thread as a target.
+ // blocked mode (see #2910).
awakenBlockedExceptionQueue (cap, t);
//
@@ -1884,7 +1950,7 @@ resumeThread (void *task_)
if (tso->why_blocked == BlockedOnCCall) {
// avoid locking the TSO if we don't have to
- if (tso->blocked_exceptions != END_TSO_QUEUE) {
+ if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
awakenBlockedExceptionQueue(cap,tso);
}
tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
@@ -2187,10 +2253,6 @@ threadStackOverflow(Capability *cap, StgTSO *tso)
IF_DEBUG(sanity,checkTSO(tso));
- // don't allow throwTo() to modify the blocked_exceptions queue
- // while we are moving the TSO:
- lockClosure((StgClosure *)tso);
-
if (tso->stack_size >= tso->max_stack_size
&& !(tso->flags & TSO_BLOCKEX)) {
// NB. never raise a StackOverflow exception if the thread is
@@ -2201,7 +2263,6 @@ threadStackOverflow(Capability *cap, StgTSO *tso)
//
if (tso->flags & TSO_SQUEEZED) {
- unlockTSO(tso);
return tso;
}
// #3677: In a stack overflow situation, stack squeezing may
@@ -2223,7 +2284,6 @@ threadStackOverflow(Capability *cap, StgTSO *tso)
tso->sp+64)));
// Send this thread the StackOverflow exception
- unlockTSO(tso);
throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
return tso;
}
@@ -2239,7 +2299,6 @@ threadStackOverflow(Capability *cap, StgTSO *tso)
// the stack anyway.
if ((tso->flags & TSO_SQUEEZED) &&
((W_)(tso->sp - tso->stack) >= BLOCK_SIZE_W)) {
- unlockTSO(tso);
return tso;
}
@@ -2289,9 +2348,6 @@ threadStackOverflow(Capability *cap, StgTSO *tso)
tso->sp = (P_)&(tso->stack[tso->stack_size]);
tso->why_blocked = NotBlocked;
- unlockTSO(dest);
- unlockTSO(tso);
-
IF_DEBUG(sanity,checkTSO(dest));
#if 0
IF_DEBUG(scheduler,printTSO(dest));
@@ -2324,10 +2380,6 @@ threadStackUnderflow (Capability *cap, Task *task, StgTSO *tso)
return tso;
}
- // don't allow throwTo() to modify the blocked_exceptions queue
- // while we are moving the TSO:
- lockClosure((StgClosure *)tso);
-
// this is the number of words we'll free
free_w = round_to_mblocks(tso_size_w/2);
@@ -2358,9 +2410,6 @@ threadStackUnderflow (Capability *cap, Task *task, StgTSO *tso)
task->incall->tso = new_tso;
}
- unlockTSO(new_tso);
- unlockTSO(tso);
-
IF_DEBUG(sanity,checkTSO(new_tso));
return new_tso;
@@ -2691,61 +2740,9 @@ resurrectThreads (StgTSO *threads)
* can wake up threads, remember...).
*/
continue;
- case BlockedOnException:
- // throwTo should never block indefinitely: if the target
- // thread dies or completes, throwTo returns.
- barf("resurrectThreads: thread BlockedOnException");
- break;
default:
- barf("resurrectThreads: thread blocked in a strange way");
+ barf("resurrectThreads: thread blocked in a strange way: %d",
+ tso->why_blocked);
}
}
}
-
-/* -----------------------------------------------------------------------------
- performPendingThrowTos is called after garbage collection, and
- passed a list of threads that were found to have pending throwTos
- (tso->blocked_exceptions was not empty), and were blocked.
- Normally this doesn't happen, because we would deliver the
- exception directly if the target thread is blocked, but there are
- small windows where it might occur on a multiprocessor (see
- throwTo()).
-
- NB. we must be holding all the capabilities at this point, just
- like resurrectThreads().
- -------------------------------------------------------------------------- */
-
-void
-performPendingThrowTos (StgTSO *threads)
-{
- StgTSO *tso, *next;
- Capability *cap;
- Task *task, *saved_task;;
- generation *gen;
-
- task = myTask();
- cap = task->cap;
-
- for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
- next = tso->global_link;
-
- gen = Bdescr((P_)tso)->gen;
- tso->global_link = gen->threads;
- gen->threads = tso;
-
- debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id);
-
- // We must pretend this Capability belongs to the current Task
- // for the time being, as invariants will be broken otherwise.
- // In fact the current Task has exclusive access to the systme
- // at this point, so this is just bookkeeping:
- task->cap = tso->cap;
- saved_task = tso->cap->running_task;
- tso->cap->running_task = task;
- maybePerformBlockedException(tso->cap, tso);
- tso->cap->running_task = saved_task;
- }
-
- // Restore our original Capability:
- task->cap = cap;
-}
diff --git a/rts/Schedule.h b/rts/Schedule.h
index af322d804f..76138b68f3 100644
--- a/rts/Schedule.h
+++ b/rts/Schedule.h
@@ -105,7 +105,6 @@ extern Mutex sched_mutex;
void interruptStgRts (void);
void resurrectThreads (StgTSO *);
-void performPendingThrowTos (StgTSO *);
/* -----------------------------------------------------------------------------
* Some convenient macros/inline functions...
@@ -179,25 +178,6 @@ appendToBlockedQueue(StgTSO *tso)
}
#endif
-#if defined(THREADED_RTS)
-// Assumes: my_cap is owned by the current Task. We hold
-// other_cap->lock, but we do not necessarily own other_cap; another
-// Task may be running on it.
-INLINE_HEADER void
-appendToWakeupQueue (Capability *my_cap, Capability *other_cap, StgTSO *tso)
-{
- ASSERT(tso->_link == END_TSO_QUEUE);
- if (other_cap->wakeup_queue_hd == END_TSO_QUEUE) {
- other_cap->wakeup_queue_hd = tso;
- } else {
- // my_cap is passed to setTSOLink() because it may need to
- // write to the mutable list.
- setTSOLink(my_cap, other_cap->wakeup_queue_tl, tso);
- }
- other_cap->wakeup_queue_tl = tso;
-}
-#endif
-
/* Check whether various thread queues are empty
*/
INLINE_HEADER rtsBool
@@ -212,14 +192,6 @@ emptyRunQueue(Capability *cap)
return emptyQueue(cap->run_queue_hd);
}
-#if defined(THREADED_RTS)
-INLINE_HEADER rtsBool
-emptyWakeupQueue(Capability *cap)
-{
- return emptyQueue(cap->wakeup_queue_hd);
-}
-#endif
-
#if !defined(THREADED_RTS)
#define EMPTY_BLOCKED_QUEUE() (emptyQueue(blocked_queue_hd))
#define EMPTY_SLEEPING_QUEUE() (emptyQueue(sleeping_queue))
diff --git a/rts/StgMiscClosures.cmm b/rts/StgMiscClosures.cmm
index 8fd96c16fc..f111875760 100644
--- a/rts/StgMiscClosures.cmm
+++ b/rts/StgMiscClosures.cmm
@@ -419,7 +419,7 @@ CLOSURE(stg_NO_FINALIZER_closure,stg_NO_FINALIZER);
Stable Names are unlifted too.
------------------------------------------------------------------------- */
-INFO_TABLE(stg_STABLE_NAME,0,1,STABLE_NAME,"STABLE_NAME","STABLE_NAME")
+INFO_TABLE(stg_STABLE_NAME,0,1,PRIM,"STABLE_NAME","STABLE_NAME")
{ foreign "C" barf("STABLE_NAME object entered!") never returns; }
/* ----------------------------------------------------------------------------
@@ -439,22 +439,22 @@ INFO_TABLE(stg_MVAR_DIRTY,3,0,MVAR_DIRTY,"MVAR","MVAR")
STM
-------------------------------------------------------------------------- */
-INFO_TABLE(stg_TVAR, 0, 0, TVAR, "TVAR", "TVAR")
+INFO_TABLE(stg_TVAR, 2, 1, MUT_PRIM, "TVAR", "TVAR")
{ foreign "C" barf("TVAR object entered!") never returns; }
-INFO_TABLE(stg_TVAR_WATCH_QUEUE, 0, 0, TVAR_WATCH_QUEUE, "TVAR_WATCH_QUEUE", "TVAR_WATCH_QUEUE")
+INFO_TABLE(stg_TVAR_WATCH_QUEUE, 3, 0, MUT_PRIM, "TVAR_WATCH_QUEUE", "TVAR_WATCH_QUEUE")
{ foreign "C" barf("TVAR_WATCH_QUEUE object entered!") never returns; }
-INFO_TABLE(stg_ATOMIC_INVARIANT, 0, 0, ATOMIC_INVARIANT, "ATOMIC_INVARIANT", "ATOMIC_INVARIANT")
+INFO_TABLE(stg_ATOMIC_INVARIANT, 2, 1, MUT_PRIM, "ATOMIC_INVARIANT", "ATOMIC_INVARIANT")
{ foreign "C" barf("ATOMIC_INVARIANT object entered!") never returns; }
-INFO_TABLE(stg_INVARIANT_CHECK_QUEUE, 0, 0, INVARIANT_CHECK_QUEUE, "INVARIANT_CHECK_QUEUE", "INVARIANT_CHECK_QUEUE")
+INFO_TABLE(stg_INVARIANT_CHECK_QUEUE, 3, 0, MUT_PRIM, "INVARIANT_CHECK_QUEUE", "INVARIANT_CHECK_QUEUE")
{ foreign "C" barf("INVARIANT_CHECK_QUEUE object entered!") never returns; }
INFO_TABLE(stg_TREC_CHUNK, 0, 0, TREC_CHUNK, "TREC_CHUNK", "TREC_CHUNK")
{ foreign "C" barf("TREC_CHUNK object entered!") never returns; }
-INFO_TABLE(stg_TREC_HEADER, 0, 0, TREC_HEADER, "TREC_HEADER", "TREC_HEADER")
+INFO_TABLE(stg_TREC_HEADER, 3, 1, MUT_PRIM, "TREC_HEADER", "TREC_HEADER")
{ foreign "C" barf("TREC_HEADER object entered!") never returns; }
INFO_TABLE_CONSTR(stg_END_STM_WATCH_QUEUE,0,0,0,CONSTR_NOCAF_STATIC,"END_STM_WATCH_QUEUE","END_STM_WATCH_QUEUE")
@@ -478,6 +478,17 @@ CLOSURE(stg_END_STM_CHUNK_LIST_closure,stg_END_STM_CHUNK_LIST);
CLOSURE(stg_NO_TREC_closure,stg_NO_TREC);
/* ----------------------------------------------------------------------------
+ Messages
+ ------------------------------------------------------------------------- */
+
+// PRIM rather than CONSTR, because PRIM objects cannot be duplicated by the GC.
+INFO_TABLE_CONSTR(stg_MSG_WAKEUP,2,0,0,PRIM,"MSG_WAKEUP","MSG_WAKEUP")
+{ foreign "C" barf("MSG_WAKEUP object entered!") never returns; }
+
+INFO_TABLE_CONSTR(stg_MSG_THROWTO,4,0,0,PRIM,"MSG_THROWTO","MSG_THROWTO")
+{ foreign "C" barf("MSG_THROWTO object entered!") never returns; }
+
+/* ----------------------------------------------------------------------------
END_TSO_QUEUE
This is a static nullary constructor (like []) that we use to mark the
@@ -490,18 +501,6 @@ INFO_TABLE_CONSTR(stg_END_TSO_QUEUE,0,0,0,CONSTR_NOCAF_STATIC,"END_TSO_QUEUE","E
CLOSURE(stg_END_TSO_QUEUE_closure,stg_END_TSO_QUEUE);
/* ----------------------------------------------------------------------------
- Exception lists
- ------------------------------------------------------------------------- */
-
-INFO_TABLE_CONSTR(stg_END_EXCEPTION_LIST,0,0,0,CONSTR_NOCAF_STATIC,"END_EXCEPTION_LIST","END_EXCEPTION_LIST")
-{ foreign "C" barf("END_EXCEPTION_LIST object entered!") never returns; }
-
-CLOSURE(stg_END_EXCEPTION_LIST_closure,stg_END_EXCEPTION_LIST);
-
-INFO_TABLE(stg_EXCEPTION_CONS,1,1,CONSTR,"EXCEPTION_CONS","EXCEPTION_CONS")
-{ foreign "C" barf("EXCEPTION_CONS object entered!") never returns; }
-
-/* ----------------------------------------------------------------------------
Arrays
These come in two basic flavours: arrays of data (StgArrWords) and arrays of
diff --git a/rts/Threads.c b/rts/Threads.c
index 08b7aab66e..f824d021d4 100644
--- a/rts/Threads.c
+++ b/rts/Threads.c
@@ -74,7 +74,7 @@ createThread(Capability *cap, nat size)
tso->what_next = ThreadRunGHC;
tso->why_blocked = NotBlocked;
- tso->blocked_exceptions = END_TSO_QUEUE;
+ tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
tso->flags = 0;
tso->dirty = 1;
@@ -218,8 +218,9 @@ unblockOne_ (Capability *cap, StgTSO *tso,
// NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO);
ASSERT(tso->why_blocked != NotBlocked);
+ ASSERT(tso->why_blocked != BlockedOnMsgWakeup ||
+ tso->block_info.closure->header.info == &stg_IND_info);
- tso->why_blocked = NotBlocked;
next = tso->_link;
tso->_link = END_TSO_QUEUE;
@@ -235,6 +236,8 @@ unblockOne_ (Capability *cap, StgTSO *tso,
}
tso->cap = cap;
+ write_barrier();
+ tso->why_blocked = NotBlocked;
appendToRunQueue(cap,tso);
// context-switch soonish so we can migrate the new thread if
@@ -246,6 +249,7 @@ unblockOne_ (Capability *cap, StgTSO *tso,
wakeupThreadOnCapability(cap, tso->cap, tso);
}
#else
+ tso->why_blocked = NotBlocked;
appendToRunQueue(cap,tso);
// context-switch soonish so we can migrate the new thread if
@@ -327,13 +331,15 @@ printThreadBlockage(StgTSO *tso)
case BlockedOnMVar:
debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
break;
- case BlockedOnException:
- debugBelch("is blocked on delivering an exception to thread %lu",
- (unsigned long)tso->block_info.tso->id);
- break;
case BlockedOnBlackHole:
debugBelch("is blocked on a black hole");
break;
+ case BlockedOnMsgWakeup:
+ debugBelch("is blocked on a wakeup message");
+ break;
+ case BlockedOnMsgThrowTo:
+ debugBelch("is blocked on a throwto message");
+ break;
case NotBlocked:
debugBelch("is not blocked");
break;
diff --git a/rts/Threads.h b/rts/Threads.h
index 8e0ee264f4..dfe879e7bb 100644
--- a/rts/Threads.h
+++ b/rts/Threads.h
@@ -11,6 +11,8 @@
BEGIN_RTS_PRIVATE
+#define END_BLOCKED_EXCEPTIONS_QUEUE ((MessageThrowTo*)END_TSO_QUEUE)
+
StgTSO * unblockOne (Capability *cap, StgTSO *tso);
StgTSO * unblockOne_ (Capability *cap, StgTSO *tso, rtsBool allow_migrate);
diff --git a/rts/Trace.h b/rts/Trace.h
index f8b6ad497d..69ea3d3d32 100644
--- a/rts/Trace.h
+++ b/rts/Trace.h
@@ -135,6 +135,15 @@ void traceUserMsg(Capability *cap, char *msg);
#define debugTrace(class, str, ...) /* nothing */
#endif
+#ifdef DEBUG
+#define debugTraceCap(class, cap, msg, ...) \
+ if (RTS_UNLIKELY(class)) { \
+ traceCap_(cap, msg, ##__VA_ARGS__); \
+ }
+#else
+#define debugTraceCap(class, cap, str, ...) /* nothing */
+#endif
+
/*
* Emit a message/event describing the state of a thread
*/
@@ -152,6 +161,7 @@ void traceThreadStatus_ (StgTSO *tso);
#define traceCap(class, cap, msg, ...) /* nothing */
#define trace(class, msg, ...) /* nothing */
#define debugTrace(class, str, ...) /* nothing */
+#define debugTraceCap(class, cap, str, ...) /* nothing */
#define traceThreadStatus(class, tso) /* nothing */
#endif /* TRACING */
diff --git a/rts/eventlog/EventLog.h b/rts/eventlog/EventLog.h
index fd87820e08..6ebf33ddc1 100644
--- a/rts/eventlog/EventLog.h
+++ b/rts/eventlog/EventLog.h
@@ -59,7 +59,7 @@ INLINE_HEADER void postMsg (char *msg STG_UNUSED,
va_list ap STG_UNUSED)
{ /* nothing */ }
-INLINE_HEADER void postCapMsg (Capability *cap,
+INLINE_HEADER void postCapMsg (Capability *cap STG_UNUSED,
char *msg STG_UNUSED,
va_list ap STG_UNUSED)
{ /* nothing */ }
diff --git a/rts/sm/Compact.c b/rts/sm/Compact.c
index e55ae2b7c2..39284f9112 100644
--- a/rts/sm/Compact.c
+++ b/rts/sm/Compact.c
@@ -471,7 +471,8 @@ thread_TSO (StgTSO *tso)
if ( tso->why_blocked == BlockedOnMVar
|| tso->why_blocked == BlockedOnBlackHole
- || tso->why_blocked == BlockedOnException
+ || tso->why_blocked == BlockedOnMsgThrowTo
+ || tso->why_blocked == BlockedOnMsgWakeup
) {
thread_(&tso->block_info.closure);
}
@@ -622,7 +623,8 @@ thread_obj (StgInfoTable *info, StgPtr p)
case FUN:
case CONSTR:
- case STABLE_NAME:
+ case PRIM:
+ case MUT_PRIM:
case IND_PERM:
case MUT_VAR_CLEAN:
case MUT_VAR_DIRTY:
@@ -705,32 +707,6 @@ thread_obj (StgInfoTable *info, StgPtr p)
case TSO:
return thread_TSO((StgTSO *)p);
- case TVAR_WATCH_QUEUE:
- {
- StgTVarWatchQueue *wq = (StgTVarWatchQueue *)p;
- thread_(&wq->closure);
- thread_(&wq->next_queue_entry);
- thread_(&wq->prev_queue_entry);
- return p + sizeofW(StgTVarWatchQueue);
- }
-
- case TVAR:
- {
- StgTVar *tvar = (StgTVar *)p;
- thread((void *)&tvar->current_value);
- thread((void *)&tvar->first_watch_queue_entry);
- return p + sizeofW(StgTVar);
- }
-
- case TREC_HEADER:
- {
- StgTRecHeader *trec = (StgTRecHeader *)p;
- thread_(&trec->enclosing_trec);
- thread_(&trec->current_chunk);
- thread_(&trec->invariants_to_check);
- return p + sizeofW(StgTRecHeader);
- }
-
case TREC_CHUNK:
{
StgWord i;
@@ -745,23 +721,6 @@ thread_obj (StgInfoTable *info, StgPtr p)
return p + sizeofW(StgTRecChunk);
}
- case ATOMIC_INVARIANT:
- {
- StgAtomicInvariant *invariant = (StgAtomicInvariant *)p;
- thread_(&invariant->code);
- thread_(&invariant->last_execution);
- return p + sizeofW(StgAtomicInvariant);
- }
-
- case INVARIANT_CHECK_QUEUE:
- {
- StgInvariantCheckQueue *queue = (StgInvariantCheckQueue *)p;
- thread_(&queue->invariant);
- thread_(&queue->my_execution);
- thread_(&queue->next_queue_entry);
- return p + sizeofW(StgInvariantCheckQueue);
- }
-
default:
barf("update_fwd: unknown/strange object %d", (int)(info->type));
return NULL;
diff --git a/rts/sm/Evac.c b/rts/sm/Evac.c
index 76026b0bac..21017a63a0 100644
--- a/rts/sm/Evac.c
+++ b/rts/sm/Evac.c
@@ -626,7 +626,8 @@ loop:
return;
case WEAK:
- case STABLE_NAME:
+ case PRIM:
+ case MUT_PRIM:
copy_tag(p,info,q,sizeW_fromITBL(INFO_PTR_TO_STRUCT(info)),gen,tag);
return;
@@ -721,30 +722,10 @@ loop:
}
}
- case TREC_HEADER:
- copy(p,info,q,sizeofW(StgTRecHeader),gen);
- return;
-
- case TVAR_WATCH_QUEUE:
- copy(p,info,q,sizeofW(StgTVarWatchQueue),gen);
- return;
-
- case TVAR:
- copy(p,info,q,sizeofW(StgTVar),gen);
- return;
-
case TREC_CHUNK:
copy(p,info,q,sizeofW(StgTRecChunk),gen);
return;
- case ATOMIC_INVARIANT:
- copy(p,info,q,sizeofW(StgAtomicInvariant),gen);
- return;
-
- case INVARIANT_CHECK_QUEUE:
- copy(p,info,q,sizeofW(StgInvariantCheckQueue),gen);
- return;
-
default:
barf("evacuate: strange closure type %d", (int)(INFO_PTR_TO_STRUCT(info)->type));
}
diff --git a/rts/sm/GC.c b/rts/sm/GC.c
index 2eabdabee3..ae6fc998da 100644
--- a/rts/sm/GC.c
+++ b/rts/sm/GC.c
@@ -732,7 +732,6 @@ SET_GCT(gc_threads[0]);
// send exceptions to any threads which were about to die
RELEASE_SM_LOCK;
resurrectThreads(resurrected_threads);
- performPendingThrowTos(exception_threads);
ACQUIRE_SM_LOCK;
// Update the stable pointer hash table.
diff --git a/rts/sm/MarkWeak.c b/rts/sm/MarkWeak.c
index 7b7187c94d..9df39b9a08 100644
--- a/rts/sm/MarkWeak.c
+++ b/rts/sm/MarkWeak.c
@@ -22,6 +22,7 @@
#include "Schedule.h"
#include "Weak.h"
#include "Storage.h"
+#include "Threads.h"
/* -----------------------------------------------------------------------------
Weak Pointers
@@ -80,9 +81,6 @@ StgWeak *old_weak_ptr_list; // also pending finaliser list
// List of threads found to be unreachable
StgTSO *resurrected_threads;
-// List of blocked threads found to have pending throwTos
-StgTSO *exception_threads;
-
static void resurrectUnreachableThreads (generation *gen);
static rtsBool tidyThreadList (generation *gen);
@@ -93,7 +91,6 @@ initWeakForGC(void)
weak_ptr_list = NULL;
weak_stage = WeakPtrs;
resurrected_threads = END_TSO_QUEUE;
- exception_threads = END_TSO_QUEUE;
}
rtsBool
@@ -286,35 +283,11 @@ static rtsBool tidyThreadList (generation *gen)
next = t->global_link;
- // This is a good place to check for blocked
- // exceptions. It might be the case that a thread is
- // blocked on delivering an exception to a thread that
- // is also blocked - we try to ensure that this
- // doesn't happen in throwTo(), but it's too hard (or
- // impossible) to close all the race holes, so we
- // accept that some might get through and deal with
- // them here. A GC will always happen at some point,
- // even if the system is otherwise deadlocked.
- //
- // If an unreachable thread has blocked
- // exceptions, we really want to perform the
- // blocked exceptions rather than throwing
- // BlockedIndefinitely exceptions. This is the
- // only place we can discover such threads.
- // The target thread might even be
- // ThreadFinished or ThreadKilled. Bugs here
- // will only be seen when running on a
- // multiprocessor.
- if (t->blocked_exceptions != END_TSO_QUEUE) {
- if (tmp == NULL) {
- evacuate((StgClosure **)&t);
- flag = rtsTrue;
- }
- t->global_link = exception_threads;
- exception_threads = t;
- *prev = next;
- continue;
- }
+ // if the thread is not masking exceptions but there are
+ // pending exceptions on its queue, then something has gone
+ // wrong:
+ ASSERT(t->blocked_exceptions == END_BLOCKED_EXCEPTIONS_QUEUE
+ || (t->flags & TSO_BLOCKEX));
if (tmp == NULL) {
// not alive (yet): leave this thread on the
diff --git a/rts/sm/Sanity.c b/rts/sm/Sanity.c
index 442fee1f7c..11d5424431 100644
--- a/rts/sm/Sanity.c
+++ b/rts/sm/Sanity.c
@@ -307,7 +307,8 @@ checkClosure( StgClosure* p )
case IND_OLDGEN_PERM:
case BLACKHOLE:
case CAF_BLACKHOLE:
- case STABLE_NAME:
+ case PRIM:
+ case MUT_PRIM:
case MUT_VAR_CLEAN:
case MUT_VAR_DIRTY:
case CONSTR_STATIC:
@@ -416,39 +417,6 @@ checkClosure( StgClosure* p )
checkTSO((StgTSO *)p);
return tso_sizeW((StgTSO *)p);
- case TVAR_WATCH_QUEUE:
- {
- StgTVarWatchQueue *wq = (StgTVarWatchQueue *)p;
- ASSERT(LOOKS_LIKE_CLOSURE_PTR(wq->next_queue_entry));
- ASSERT(LOOKS_LIKE_CLOSURE_PTR(wq->prev_queue_entry));
- return sizeofW(StgTVarWatchQueue);
- }
-
- case INVARIANT_CHECK_QUEUE:
- {
- StgInvariantCheckQueue *q = (StgInvariantCheckQueue *)p;
- ASSERT(LOOKS_LIKE_CLOSURE_PTR(q->invariant));
- ASSERT(LOOKS_LIKE_CLOSURE_PTR(q->my_execution));
- ASSERT(LOOKS_LIKE_CLOSURE_PTR(q->next_queue_entry));
- return sizeofW(StgInvariantCheckQueue);
- }
-
- case ATOMIC_INVARIANT:
- {
- StgAtomicInvariant *invariant = (StgAtomicInvariant *)p;
- ASSERT(LOOKS_LIKE_CLOSURE_PTR(invariant->code));
- ASSERT(LOOKS_LIKE_CLOSURE_PTR(invariant->last_execution));
- return sizeofW(StgAtomicInvariant);
- }
-
- case TVAR:
- {
- StgTVar *tv = (StgTVar *)p;
- ASSERT(LOOKS_LIKE_CLOSURE_PTR(tv->current_value));
- ASSERT(LOOKS_LIKE_CLOSURE_PTR(tv->first_watch_queue_entry));
- return sizeofW(StgTVar);
- }
-
case TREC_CHUNK:
{
nat i;
@@ -461,14 +429,6 @@ checkClosure( StgClosure* p )
}
return sizeofW(StgTRecChunk);
}
-
- case TREC_HEADER:
- {
- StgTRecHeader *trec = (StgTRecHeader *)p;
- ASSERT(LOOKS_LIKE_CLOSURE_PTR(trec -> enclosing_trec));
- ASSERT(LOOKS_LIKE_CLOSURE_PTR(trec -> current_chunk));
- return sizeofW(StgTRecHeader);
- }
default:
barf("checkClosure (closure type %d)", info->type);
diff --git a/rts/sm/Scav.c b/rts/sm/Scav.c
index 466b9b44f7..1b671a097c 100644
--- a/rts/sm/Scav.c
+++ b/rts/sm/Scav.c
@@ -82,7 +82,8 @@ scavengeTSO (StgTSO *tso)
if ( tso->why_blocked == BlockedOnMVar
|| tso->why_blocked == BlockedOnBlackHole
- || tso->why_blocked == BlockedOnException
+ || tso->why_blocked == BlockedOnMsgWakeup
+ || tso->why_blocked == BlockedOnMsgThrowTo
) {
evacuate(&tso->block_info.closure);
}
@@ -394,7 +395,6 @@ scavenge_block (bdescr *bd)
{
StgPtr p, q;
StgInfoTable *info;
- generation *saved_evac_gen;
rtsBool saved_eager_promotion;
gen_workspace *ws;
@@ -403,7 +403,6 @@ scavenge_block (bdescr *bd)
gct->scan_bd = bd;
gct->evac_gen = bd->gen;
- saved_evac_gen = gct->evac_gen;
saved_eager_promotion = gct->eager_promotion;
gct->failed_to_evac = rtsFalse;
@@ -532,7 +531,7 @@ scavenge_block (bdescr *bd)
gen_obj:
case CONSTR:
case WEAK:
- case STABLE_NAME:
+ case PRIM:
{
StgPtr end;
@@ -672,42 +671,21 @@ scavenge_block (bdescr *bd)
break;
}
- case TVAR_WATCH_QUEUE:
+ case MUT_PRIM:
{
- StgTVarWatchQueue *wq = ((StgTVarWatchQueue *) p);
- gct->evac_gen = 0;
- evacuate((StgClosure **)&wq->closure);
- evacuate((StgClosure **)&wq->next_queue_entry);
- evacuate((StgClosure **)&wq->prev_queue_entry);
- gct->evac_gen = saved_evac_gen;
- gct->failed_to_evac = rtsTrue; // mutable
- p += sizeofW(StgTVarWatchQueue);
- break;
- }
+ StgPtr end;
- case TVAR:
- {
- StgTVar *tvar = ((StgTVar *) p);
- gct->evac_gen = 0;
- evacuate((StgClosure **)&tvar->current_value);
- evacuate((StgClosure **)&tvar->first_watch_queue_entry);
- gct->evac_gen = saved_evac_gen;
- gct->failed_to_evac = rtsTrue; // mutable
- p += sizeofW(StgTVar);
- break;
- }
+ gct->eager_promotion = rtsFalse;
- case TREC_HEADER:
- {
- StgTRecHeader *trec = ((StgTRecHeader *) p);
- gct->evac_gen = 0;
- evacuate((StgClosure **)&trec->enclosing_trec);
- evacuate((StgClosure **)&trec->current_chunk);
- evacuate((StgClosure **)&trec->invariants_to_check);
- gct->evac_gen = saved_evac_gen;
+ end = (P_)((StgClosure *)p)->payload + info->layout.payload.ptrs;
+ for (p = (P_)((StgClosure *)p)->payload; p < end; p++) {
+ evacuate((StgClosure **)p);
+ }
+ p += info->layout.payload.nptrs;
+
+ gct->eager_promotion = saved_eager_promotion;
gct->failed_to_evac = rtsTrue; // mutable
- p += sizeofW(StgTRecHeader);
- break;
+ break;
}
case TREC_CHUNK:
@@ -715,44 +693,19 @@ scavenge_block (bdescr *bd)
StgWord i;
StgTRecChunk *tc = ((StgTRecChunk *) p);
TRecEntry *e = &(tc -> entries[0]);
- gct->evac_gen = 0;
+ gct->eager_promotion = rtsFalse;
evacuate((StgClosure **)&tc->prev_chunk);
for (i = 0; i < tc -> next_entry_idx; i ++, e++ ) {
evacuate((StgClosure **)&e->tvar);
evacuate((StgClosure **)&e->expected_value);
evacuate((StgClosure **)&e->new_value);
}
- gct->evac_gen = saved_evac_gen;
+ gct->eager_promotion = saved_eager_promotion;
gct->failed_to_evac = rtsTrue; // mutable
p += sizeofW(StgTRecChunk);
break;
}
- case ATOMIC_INVARIANT:
- {
- StgAtomicInvariant *invariant = ((StgAtomicInvariant *) p);
- gct->evac_gen = 0;
- evacuate(&invariant->code);
- evacuate((StgClosure **)&invariant->last_execution);
- gct->evac_gen = saved_evac_gen;
- gct->failed_to_evac = rtsTrue; // mutable
- p += sizeofW(StgAtomicInvariant);
- break;
- }
-
- case INVARIANT_CHECK_QUEUE:
- {
- StgInvariantCheckQueue *queue = ((StgInvariantCheckQueue *) p);
- gct->evac_gen = 0;
- evacuate((StgClosure **)&queue->invariant);
- evacuate((StgClosure **)&queue->my_execution);
- evacuate((StgClosure **)&queue->next_queue_entry);
- gct->evac_gen = saved_evac_gen;
- gct->failed_to_evac = rtsTrue; // mutable
- p += sizeofW(StgInvariantCheckQueue);
- break;
- }
-
default:
barf("scavenge: unimplemented/strange closure type %d @ %p",
info->type, p);
@@ -806,10 +759,10 @@ scavenge_mark_stack(void)
{
StgPtr p, q;
StgInfoTable *info;
- generation *saved_evac_gen;
+ rtsBool saved_eager_promotion;
gct->evac_gen = oldest_gen;
- saved_evac_gen = gct->evac_gen;
+ saved_eager_promotion = gct->eager_promotion;
while ((p = pop_mark_stack())) {
@@ -822,8 +775,6 @@ scavenge_mark_stack(void)
case MVAR_CLEAN:
case MVAR_DIRTY:
{
- rtsBool saved_eager_promotion = gct->eager_promotion;
-
StgMVar *mvar = ((StgMVar *)p);
gct->eager_promotion = rtsFalse;
evacuate((StgClosure **)&mvar->head);
@@ -906,7 +857,7 @@ scavenge_mark_stack(void)
gen_obj:
case CONSTR:
case WEAK:
- case STABLE_NAME:
+ case PRIM:
{
StgPtr end;
@@ -938,8 +889,6 @@ scavenge_mark_stack(void)
case MUT_VAR_CLEAN:
case MUT_VAR_DIRTY: {
- rtsBool saved_eager_promotion = gct->eager_promotion;
-
gct->eager_promotion = rtsFalse;
evacuate(&((StgMutVar *)p)->var);
gct->eager_promotion = saved_eager_promotion;
@@ -986,13 +935,10 @@ scavenge_mark_stack(void)
case MUT_ARR_PTRS_DIRTY:
// follow everything
{
- rtsBool saved_eager;
-
// We don't eagerly promote objects pointed to by a mutable
// array, but if we find the array only points to objects in
// the same or an older generation, we mark it "clean" and
// avoid traversing it during minor GCs.
- saved_eager = gct->eager_promotion;
gct->eager_promotion = rtsFalse;
scavenge_mut_arr_ptrs((StgMutArrPtrs *)p);
@@ -1003,7 +949,7 @@ scavenge_mark_stack(void)
((StgClosure *)q)->header.info = &stg_MUT_ARR_PTRS_CLEAN_info;
}
- gct->eager_promotion = saved_eager;
+ gct->eager_promotion = saved_eager_promotion;
gct->failed_to_evac = rtsTrue; // mutable anyhow.
break;
}
@@ -1032,81 +978,39 @@ scavenge_mark_stack(void)
break;
}
- case TVAR_WATCH_QUEUE:
- {
- StgTVarWatchQueue *wq = ((StgTVarWatchQueue *) p);
- gct->evac_gen = 0;
- evacuate((StgClosure **)&wq->closure);
- evacuate((StgClosure **)&wq->next_queue_entry);
- evacuate((StgClosure **)&wq->prev_queue_entry);
- gct->evac_gen = saved_evac_gen;
- gct->failed_to_evac = rtsTrue; // mutable
- break;
- }
-
- case TVAR:
- {
- StgTVar *tvar = ((StgTVar *) p);
- gct->evac_gen = 0;
- evacuate((StgClosure **)&tvar->current_value);
- evacuate((StgClosure **)&tvar->first_watch_queue_entry);
- gct->evac_gen = saved_evac_gen;
- gct->failed_to_evac = rtsTrue; // mutable
- break;
- }
-
+ case MUT_PRIM:
+ {
+ StgPtr end;
+
+ gct->eager_promotion = rtsFalse;
+
+ end = (P_)((StgClosure *)p)->payload + info->layout.payload.ptrs;
+ for (p = (P_)((StgClosure *)p)->payload; p < end; p++) {
+ evacuate((StgClosure **)p);
+ }
+
+ gct->eager_promotion = saved_eager_promotion;
+ gct->failed_to_evac = rtsTrue; // mutable
+ break;
+ }
+
case TREC_CHUNK:
{
StgWord i;
StgTRecChunk *tc = ((StgTRecChunk *) p);
TRecEntry *e = &(tc -> entries[0]);
- gct->evac_gen = 0;
+ gct->eager_promotion = rtsFalse;
evacuate((StgClosure **)&tc->prev_chunk);
for (i = 0; i < tc -> next_entry_idx; i ++, e++ ) {
evacuate((StgClosure **)&e->tvar);
evacuate((StgClosure **)&e->expected_value);
evacuate((StgClosure **)&e->new_value);
}
- gct->evac_gen = saved_evac_gen;
- gct->failed_to_evac = rtsTrue; // mutable
- break;
- }
-
- case TREC_HEADER:
- {
- StgTRecHeader *trec = ((StgTRecHeader *) p);
- gct->evac_gen = 0;
- evacuate((StgClosure **)&trec->enclosing_trec);
- evacuate((StgClosure **)&trec->current_chunk);
- evacuate((StgClosure **)&trec->invariants_to_check);
- gct->evac_gen = saved_evac_gen;
+ gct->eager_promotion = saved_eager_promotion;
gct->failed_to_evac = rtsTrue; // mutable
break;
}
- case ATOMIC_INVARIANT:
- {
- StgAtomicInvariant *invariant = ((StgAtomicInvariant *) p);
- gct->evac_gen = 0;
- evacuate(&invariant->code);
- evacuate((StgClosure **)&invariant->last_execution);
- gct->evac_gen = saved_evac_gen;
- gct->failed_to_evac = rtsTrue; // mutable
- break;
- }
-
- case INVARIANT_CHECK_QUEUE:
- {
- StgInvariantCheckQueue *queue = ((StgInvariantCheckQueue *) p);
- gct->evac_gen = 0;
- evacuate((StgClosure **)&queue->invariant);
- evacuate((StgClosure **)&queue->my_execution);
- evacuate((StgClosure **)&queue->next_queue_entry);
- gct->evac_gen = saved_evac_gen;
- gct->failed_to_evac = rtsTrue; // mutable
- break;
- }
-
default:
barf("scavenge_mark_stack: unimplemented/strange closure type %d @ %p",
info->type, p);
@@ -1133,9 +1037,11 @@ static rtsBool
scavenge_one(StgPtr p)
{
const StgInfoTable *info;
- generation *saved_evac_gen = gct->evac_gen;
rtsBool no_luck;
+ rtsBool saved_eager_promotion;
+ saved_eager_promotion = gct->eager_promotion;
+
ASSERT(LOOKS_LIKE_CLOSURE_PTR(p));
info = get_itbl((StgClosure *)p);
@@ -1144,8 +1050,6 @@ scavenge_one(StgPtr p)
case MVAR_CLEAN:
case MVAR_DIRTY:
{
- rtsBool saved_eager_promotion = gct->eager_promotion;
-
StgMVar *mvar = ((StgMVar *)p);
gct->eager_promotion = rtsFalse;
evacuate((StgClosure **)&mvar->head);
@@ -1190,6 +1094,7 @@ scavenge_one(StgPtr p)
case CONSTR_0_2:
case CONSTR_2_0:
case WEAK:
+ case PRIM:
case IND_PERM:
{
StgPtr q, end;
@@ -1204,7 +1109,6 @@ scavenge_one(StgPtr p)
case MUT_VAR_CLEAN:
case MUT_VAR_DIRTY: {
StgPtr q = p;
- rtsBool saved_eager_promotion = gct->eager_promotion;
gct->eager_promotion = rtsFalse;
evacuate(&((StgMutVar *)p)->var);
@@ -1254,13 +1158,10 @@ scavenge_one(StgPtr p)
case MUT_ARR_PTRS_CLEAN:
case MUT_ARR_PTRS_DIRTY:
{
- rtsBool saved_eager;
-
// We don't eagerly promote objects pointed to by a mutable
// array, but if we find the array only points to objects in
// the same or an older generation, we mark it "clean" and
// avoid traversing it during minor GCs.
- saved_eager = gct->eager_promotion;
gct->eager_promotion = rtsFalse;
scavenge_mut_arr_ptrs((StgMutArrPtrs *)p);
@@ -1271,7 +1172,7 @@ scavenge_one(StgPtr p)
((StgClosure *)p)->header.info = &stg_MUT_ARR_PTRS_CLEAN_info;
}
- gct->eager_promotion = saved_eager;
+ gct->eager_promotion = saved_eager_promotion;
gct->failed_to_evac = rtsTrue;
break;
}
@@ -1298,81 +1199,40 @@ scavenge_one(StgPtr p)
break;
}
- case TVAR_WATCH_QUEUE:
- {
- StgTVarWatchQueue *wq = ((StgTVarWatchQueue *) p);
- gct->evac_gen = 0;
- evacuate((StgClosure **)&wq->closure);
- evacuate((StgClosure **)&wq->next_queue_entry);
- evacuate((StgClosure **)&wq->prev_queue_entry);
- gct->evac_gen = saved_evac_gen;
- gct->failed_to_evac = rtsTrue; // mutable
- break;
- }
+ case MUT_PRIM:
+ {
+ StgPtr end;
+
+ gct->eager_promotion = rtsFalse;
+
+ end = (P_)((StgClosure *)p)->payload + info->layout.payload.ptrs;
+ for (p = (P_)((StgClosure *)p)->payload; p < end; p++) {
+ evacuate((StgClosure **)p);
+ }
- case TVAR:
- {
- StgTVar *tvar = ((StgTVar *) p);
- gct->evac_gen = 0;
- evacuate((StgClosure **)&tvar->current_value);
- evacuate((StgClosure **)&tvar->first_watch_queue_entry);
- gct->evac_gen = saved_evac_gen;
+ gct->eager_promotion = saved_eager_promotion;
gct->failed_to_evac = rtsTrue; // mutable
break;
- }
- case TREC_HEADER:
- {
- StgTRecHeader *trec = ((StgTRecHeader *) p);
- gct->evac_gen = 0;
- evacuate((StgClosure **)&trec->enclosing_trec);
- evacuate((StgClosure **)&trec->current_chunk);
- evacuate((StgClosure **)&trec->invariants_to_check);
- gct->evac_gen = saved_evac_gen;
- gct->failed_to_evac = rtsTrue; // mutable
- break;
- }
+ }
case TREC_CHUNK:
{
StgWord i;
StgTRecChunk *tc = ((StgTRecChunk *) p);
TRecEntry *e = &(tc -> entries[0]);
- gct->evac_gen = 0;
+ gct->eager_promotion = rtsFalse;
evacuate((StgClosure **)&tc->prev_chunk);
for (i = 0; i < tc -> next_entry_idx; i ++, e++ ) {
evacuate((StgClosure **)&e->tvar);
evacuate((StgClosure **)&e->expected_value);
evacuate((StgClosure **)&e->new_value);
}
- gct->evac_gen = saved_evac_gen;
+ gct->eager_promotion = saved_eager_promotion;
gct->failed_to_evac = rtsTrue; // mutable
break;
}
- case ATOMIC_INVARIANT:
- {
- StgAtomicInvariant *invariant = ((StgAtomicInvariant *) p);
- gct->evac_gen = 0;
- evacuate(&invariant->code);
- evacuate((StgClosure **)&invariant->last_execution);
- gct->evac_gen = saved_evac_gen;
- gct->failed_to_evac = rtsTrue; // mutable
- break;
- }
-
- case INVARIANT_CHECK_QUEUE:
- {
- StgInvariantCheckQueue *queue = ((StgInvariantCheckQueue *) p);
- gct->evac_gen = 0;
- evacuate((StgClosure **)&queue->invariant);
- evacuate((StgClosure **)&queue->my_execution);
- evacuate((StgClosure **)&queue->next_queue_entry);
- gct->evac_gen = saved_evac_gen;
- gct->failed_to_evac = rtsTrue; // mutable
- break;
- }
-
case IND:
// IND can happen, for example, when the interpreter allocates
// a gigantic AP closure (more than one block), which ends up
@@ -1470,8 +1330,8 @@ scavenge_mutable_list(bdescr *bd, generation *gen)
continue;
case MUT_ARR_PTRS_DIRTY:
{
- rtsBool saved_eager;
- saved_eager = gct->eager_promotion;
+ rtsBool saved_eager_promotion;
+ saved_eager_promotion = gct->eager_promotion;
gct->eager_promotion = rtsFalse;
scavenge_mut_arr_ptrs_marked((StgMutArrPtrs *)p);
@@ -1482,7 +1342,7 @@ scavenge_mutable_list(bdescr *bd, generation *gen)
((StgClosure *)p)->header.info = &stg_MUT_ARR_PTRS_CLEAN_info;
}
- gct->eager_promotion = saved_eager;
+ gct->eager_promotion = saved_eager_promotion;
gct->failed_to_evac = rtsFalse;
recordMutableGen_GC((StgClosure *)p,gen->no);
continue;