diff options
author | Simon Marlow <marlowsd@gmail.com> | 2010-03-11 09:57:44 +0000 |
---|---|---|
committer | Simon Marlow <marlowsd@gmail.com> | 2010-03-11 09:57:44 +0000 |
commit | 7408b39235bccdcde48df2a73337ff976fbc09b7 (patch) | |
tree | cf20c372fdc5787170d53df36fc24ecf8113c89e /rts | |
parent | 12cfec943127f0c81e1ffa1ca5ce46e888e3027c (diff) | |
download | haskell-7408b39235bccdcde48df2a73337ff976fbc09b7.tar.gz |
Use message-passing to implement throwTo in the RTS
This replaces some complicated locking schemes with message-passing
in the implementation of throwTo. The benefits are
- previously it was impossible to guarantee that a throwTo from
a thread running on one CPU to a thread running on another CPU
would be noticed, and we had to rely on the GC to pick up these
forgotten exceptions. This no longer happens.
- the locking regime is simpler (though the code is about the same
size)
- threads can be unblocked from a blocked_exceptions queue without
having to traverse the whole queue now. It's a rare case, but
replaces an O(n) operation with an O(1).
- generally we move in the direction of sharing less between
Capabilities (aka HECs), which will become important with other
changes we have planned.
Also in this patch I replaced several STM-specific closure types with
a generic MUT_PRIM closure type, which allowed a lot of code in the GC
and other places to go away, hence the line-count reduction. The
message-passing changes resulted in about a net zero line-count
difference.
Diffstat (limited to 'rts')
-rw-r--r-- | rts/Capability.c | 67 | ||||
-rw-r--r-- | rts/Capability.h | 28 | ||||
-rw-r--r-- | rts/ClosureFlags.c | 10 | ||||
-rw-r--r-- | rts/Exception.cmm | 43 | ||||
-rw-r--r-- | rts/FrontPanel.c | 2 | ||||
-rw-r--r-- | rts/HeapStackCheck.cmm | 5 | ||||
-rw-r--r-- | rts/LdvProfile.c | 2 | ||||
-rw-r--r-- | rts/PrimOps.cmm | 12 | ||||
-rw-r--r-- | rts/Printer.c | 18 | ||||
-rw-r--r-- | rts/ProfHeap.c | 28 | ||||
-rw-r--r-- | rts/RaiseAsync.c | 512 | ||||
-rw-r--r-- | rts/RaiseAsync.h | 17 | ||||
-rw-r--r-- | rts/RetainerProfile.c | 6 | ||||
-rw-r--r-- | rts/RtsMessages.c | 6 | ||||
-rw-r--r-- | rts/STM.c | 3 | ||||
-rw-r--r-- | rts/Schedule.c | 185 | ||||
-rw-r--r-- | rts/Schedule.h | 28 | ||||
-rw-r--r-- | rts/StgMiscClosures.cmm | 35 | ||||
-rw-r--r-- | rts/Threads.c | 18 | ||||
-rw-r--r-- | rts/Threads.h | 2 | ||||
-rw-r--r-- | rts/Trace.h | 10 | ||||
-rw-r--r-- | rts/eventlog/EventLog.h | 2 | ||||
-rw-r--r-- | rts/sm/Compact.c | 49 | ||||
-rw-r--r-- | rts/sm/Evac.c | 23 | ||||
-rw-r--r-- | rts/sm/GC.c | 1 | ||||
-rw-r--r-- | rts/sm/MarkWeak.c | 39 | ||||
-rw-r--r-- | rts/sm/Sanity.c | 44 | ||||
-rw-r--r-- | rts/sm/Scav.c | 260 |
28 files changed, 604 insertions, 851 deletions
diff --git a/rts/Capability.c b/rts/Capability.c index ce6ecebd72..5f54ecae4d 100644 --- a/rts/Capability.c +++ b/rts/Capability.c @@ -223,8 +223,7 @@ initCapability( Capability *cap, nat i ) cap->suspended_ccalls = NULL; cap->returning_tasks_hd = NULL; cap->returning_tasks_tl = NULL; - cap->wakeup_queue_hd = END_TSO_QUEUE; - cap->wakeup_queue_tl = END_TSO_QUEUE; + cap->inbox = (Message*)END_TSO_QUEUE; cap->sparks_created = 0; cap->sparks_converted = 0; cap->sparks_pruned = 0; @@ -419,7 +418,7 @@ releaseCapability_ (Capability* cap, // If we have an unbound thread on the run queue, or if there's // anything else to do, give the Capability to a worker thread. if (always_wakeup || - !emptyRunQueue(cap) || !emptyWakeupQueue(cap) || + !emptyRunQueue(cap) || !emptyInbox(cap) || !emptySparkPoolCap(cap) || globalWorkToDo()) { if (cap->spare_workers) { giveCapabilityToTask(cap,cap->spare_workers); @@ -645,11 +644,11 @@ yieldCapability (Capability** pCap, Task *task) * ------------------------------------------------------------------------- */ void -wakeupThreadOnCapability (Capability *my_cap, +wakeupThreadOnCapability (Capability *cap, Capability *other_cap, StgTSO *tso) { - ACQUIRE_LOCK(&other_cap->lock); + MessageWakeup *msg; // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability) if (tso->bound) { @@ -658,27 +657,20 @@ wakeupThreadOnCapability (Capability *my_cap, } tso->cap = other_cap; - ASSERT(tso->bound ? tso->bound->task->cap == other_cap : 1); + ASSERT(tso->why_blocked != BlockedOnMsgWakeup || + tso->block_info.closure->header.info == &stg_IND_info); - if (other_cap->running_task == NULL) { - // nobody is running this Capability, we can add our thread - // directly onto the run queue and start up a Task to run it. + ASSERT(tso->block_info.closure->header.info != &stg_MSG_WAKEUP_info); - other_cap->running_task = myTask(); - // precond for releaseCapability_() and appendToRunQueue() + msg = (MessageWakeup*) allocate(cap, sizeofW(MessageWakeup)); + msg->header.info = &stg_MSG_WAKEUP_info; + msg->tso = tso; + tso->block_info.closure = (StgClosure *)msg; + dirty_TSO(cap, tso); + write_barrier(); + tso->why_blocked = BlockedOnMsgWakeup; - appendToRunQueue(other_cap,tso); - - releaseCapability_(other_cap,rtsFalse); - } else { - appendToWakeupQueue(my_cap,other_cap,tso); - other_cap->context_switch = 1; - // someone is running on this Capability, so it cannot be - // freed without first checking the wakeup queue (see - // releaseCapability_). - } - - RELEASE_LOCK(&other_cap->lock); + sendMessage(other_cap, (Message*)msg); } /* ---------------------------------------------------------------------------- @@ -881,8 +873,7 @@ markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta, evac(user, (StgClosure **)(void *)&cap->run_queue_hd); evac(user, (StgClosure **)(void *)&cap->run_queue_tl); #if defined(THREADED_RTS) - evac(user, (StgClosure **)(void *)&cap->wakeup_queue_hd); - evac(user, (StgClosure **)(void *)&cap->wakeup_queue_tl); + evac(user, (StgClosure **)(void *)&cap->inbox); #endif for (incall = cap->suspended_ccalls; incall != NULL; incall=incall->next) { @@ -910,3 +901,29 @@ markCapabilities (evac_fn evac, void *user) { markSomeCapabilities(evac, user, 0, 1, rtsFalse); } + +/* ----------------------------------------------------------------------------- + Messages + -------------------------------------------------------------------------- */ + +#ifdef THREADED_RTS + +void sendMessage(Capability *cap, Message *msg) +{ + ACQUIRE_LOCK(&cap->lock); + + msg->link = cap->inbox; + cap->inbox = msg; + + if (cap->running_task == NULL) { + cap->running_task = myTask(); + // precond for releaseCapability_() + releaseCapability_(cap,rtsFalse); + } else { + contextSwitchCapability(cap); + } + + RELEASE_LOCK(&cap->lock); +} + +#endif // THREADED_RTS diff --git a/rts/Capability.h b/rts/Capability.h index 41974dc710..4030b5efd4 100644 --- a/rts/Capability.h +++ b/rts/Capability.h @@ -88,11 +88,8 @@ struct Capability_ { Task *returning_tasks_hd; // Singly-linked, with head/tail Task *returning_tasks_tl; - // A list of threads to append to this Capability's run queue at - // the earliest opportunity. These are threads that have been - // woken up by another Capability. - StgTSO *wakeup_queue_hd; - StgTSO *wakeup_queue_tl; + // Messages, or END_TSO_QUEUE. + Message *inbox; SparkPool *sparks; @@ -285,6 +282,18 @@ void markCapabilities (evac_fn evac, void *user); void traverseSparkQueues (evac_fn evac, void *user); /* ----------------------------------------------------------------------------- + Messages + -------------------------------------------------------------------------- */ + +#ifdef THREADED_RTS + +INLINE_HEADER rtsBool emptyInbox(Capability *cap);; + +void sendMessage (Capability *cap, Message *msg); + +#endif // THREADED_RTS + +/* ----------------------------------------------------------------------------- * INLINE functions... private below here * -------------------------------------------------------------------------- */ @@ -333,6 +342,15 @@ contextSwitchCapability (Capability *cap) cap->context_switch = 1; } +#ifdef THREADED_RTS + +INLINE_HEADER rtsBool emptyInbox(Capability *cap) +{ + return (cap->inbox == (Message*)END_TSO_QUEUE); +} + +#endif + END_RTS_PRIVATE #endif /* CAPABILITY_H */ diff --git a/rts/ClosureFlags.c b/rts/ClosureFlags.c index 477a892594..358cb40ed3 100644 --- a/rts/ClosureFlags.c +++ b/rts/ClosureFlags.c @@ -74,20 +74,16 @@ StgWord16 closure_flags[] = { [MUT_VAR_CLEAN] = (_HNF| _NS| _MUT|_UPT ), [MUT_VAR_DIRTY] = (_HNF| _NS| _MUT|_UPT ), [WEAK] = (_HNF| _NS| _UPT ), - [STABLE_NAME] = (_HNF| _NS| _UPT ), + [PRIM] = (_HNF| _NS| _UPT ), + [MUT_PRIM] = (_HNF| _NS| _MUT|_UPT ), [TSO] = (_HNF| _NS| _MUT|_UPT ), - [TVAR_WATCH_QUEUE] = ( _NS| _MUT|_UPT ), - [INVARIANT_CHECK_QUEUE]= ( _NS| _MUT|_UPT ), - [ATOMIC_INVARIANT] = ( _NS| _MUT|_UPT ), - [TVAR] = (_HNF| _NS| _MUT|_UPT ), [TREC_CHUNK] = ( _NS| _MUT|_UPT ), - [TREC_HEADER] = ( _NS| _MUT|_UPT ), [ATOMICALLY_FRAME] = ( _BTM ), [CATCH_RETRY_FRAME] = ( _BTM ), [CATCH_STM_FRAME] = ( _BTM ), [WHITEHOLE] = ( 0 ) }; -#if N_CLOSURE_TYPES != 65 +#if N_CLOSURE_TYPES != 61 #error Closure types changed: update ClosureFlags.c! #endif diff --git a/rts/Exception.cmm b/rts/Exception.cmm index 6c887c22dc..55c79cede7 100644 --- a/rts/Exception.cmm +++ b/rts/Exception.cmm @@ -56,7 +56,7 @@ INFO_TABLE_RET( stg_unblockAsyncExceptionszh_ret, RET_SMALL ) CInt r; StgTSO_flags(CurrentTSO) = StgTSO_flags(CurrentTSO) & - ~(TSO_BLOCKEX::I32|TSO_INTERRUPTIBLE::I32); + %lobits32(~(TSO_BLOCKEX|TSO_INTERRUPTIBLE)); /* Eagerly raise a blocked exception, if there is one */ if (StgTSO_blocked_exceptions(CurrentTSO) != END_TSO_QUEUE) { @@ -99,8 +99,8 @@ INFO_TABLE_RET( stg_unblockAsyncExceptionszh_ret, RET_SMALL ) INFO_TABLE_RET( stg_blockAsyncExceptionszh_ret, RET_SMALL ) { - StgTSO_flags(CurrentTSO) = - StgTSO_flags(CurrentTSO) | TSO_BLOCKEX::I32 | TSO_INTERRUPTIBLE::I32; + StgTSO_flags(CurrentTSO) = %lobits32( + TO_W_(StgTSO_flags(CurrentTSO)) | TSO_BLOCKEX | TSO_INTERRUPTIBLE); Sp_adj(1); jump %ENTRY_CODE(Sp(0)); @@ -113,8 +113,8 @@ stg_blockAsyncExceptionszh if ((TO_W_(StgTSO_flags(CurrentTSO)) & TSO_BLOCKEX) == 0) { - StgTSO_flags(CurrentTSO) = - StgTSO_flags(CurrentTSO) | TSO_BLOCKEX::I32 | TSO_INTERRUPTIBLE::I32; + StgTSO_flags(CurrentTSO) = %lobits32( + TO_W_(StgTSO_flags(CurrentTSO)) | TSO_BLOCKEX | TSO_INTERRUPTIBLE); /* avoid growing the stack unnecessarily */ if (Sp(0) == stg_blockAsyncExceptionszh_ret_info) { @@ -142,8 +142,8 @@ stg_unblockAsyncExceptionszh /* If exceptions are already unblocked, there's nothing to do */ if ((TO_W_(StgTSO_flags(CurrentTSO)) & TSO_BLOCKEX) != 0) { - StgTSO_flags(CurrentTSO) = StgTSO_flags(CurrentTSO) & - ~(TSO_BLOCKEX::I32|TSO_INTERRUPTIBLE::I32); + StgTSO_flags(CurrentTSO) = %lobits32( + TO_W_(StgTSO_flags(CurrentTSO)) & ~(TSO_BLOCKEX|TSO_INTERRUPTIBLE)); /* avoid growing the stack unnecessarily */ if (Sp(0) == stg_unblockAsyncExceptionszh_ret_info) { @@ -252,27 +252,22 @@ stg_killThreadzh } } else { W_ out; - W_ retcode; + W_ msg; out = Sp - WDS(1); /* ok to re-use stack space here */ - (retcode) = foreign "C" throwTo(MyCapability() "ptr", - CurrentTSO "ptr", - target "ptr", - exception "ptr", - out "ptr") [R1,R2]; + (msg) = foreign "C" throwTo(MyCapability() "ptr", + CurrentTSO "ptr", + target "ptr", + exception "ptr") [R1,R2]; - switch [THROWTO_SUCCESS .. THROWTO_BLOCKED] (retcode) { - - case THROWTO_SUCCESS: { + if (msg == NULL) { jump %ENTRY_CODE(Sp(0)); - } - - case THROWTO_BLOCKED: { - R3 = W_[out]; - // we must block, and call throwToReleaseTarget() before returning + } else { + StgTSO_why_blocked(CurrentTSO) = BlockedOnMsgThrowTo; + StgTSO_block_info(CurrentTSO) = msg; + // we must block, and unlock the message before returning jump stg_block_throwto; } - } } } @@ -507,8 +502,8 @@ retry_pop_stack: /* Ensure that async excpetions are blocked when running the handler. */ - StgTSO_flags(CurrentTSO) = - StgTSO_flags(CurrentTSO) | TSO_BLOCKEX::I32 | TSO_INTERRUPTIBLE::I32; + StgTSO_flags(CurrentTSO) = %lobits32( + TO_W_(StgTSO_flags(CurrentTSO)) | TSO_BLOCKEX | TSO_INTERRUPTIBLE); /* Call the handler, passing the exception value and a realworld * token as arguments. diff --git a/rts/FrontPanel.c b/rts/FrontPanel.c index 163a7c08ca..ebba4056fb 100644 --- a/rts/FrontPanel.c +++ b/rts/FrontPanel.c @@ -697,7 +697,7 @@ residencyCensus( void ) break; case WEAK: - case STABLE_NAME: + case PRIM: case MVAR: case MUT_VAR: /* case MUT_CONS: FIXME: case does not exist */ diff --git a/rts/HeapStackCheck.cmm b/rts/HeapStackCheck.cmm index b516ef2c09..a528a3f22e 100644 --- a/rts/HeapStackCheck.cmm +++ b/rts/HeapStackCheck.cmm @@ -631,9 +631,8 @@ INFO_TABLE_RET( stg_block_throwto, RET_SMALL, P_ unused, P_ unused ) stg_block_throwto_finally { -#ifdef THREADED_RTS - foreign "C" throwToReleaseTarget (R3 "ptr"); -#endif + // unlock the throwto message + unlockClosure(StgTSO_block_info(CurrentTSO), stg_MSG_THROWTO_info); jump StgReturn; } diff --git a/rts/LdvProfile.c b/rts/LdvProfile.c index c2e7d7ec5a..ccaf10c684 100644 --- a/rts/LdvProfile.c +++ b/rts/LdvProfile.c @@ -109,7 +109,7 @@ processHeapClosureForDead( StgClosure *c ) case MUT_VAR_CLEAN: case MUT_VAR_DIRTY: case BCO: - case STABLE_NAME: + case PRIM: case TVAR_WATCH_QUEUE: case TVAR: case TREC_HEADER: diff --git a/rts/PrimOps.cmm b/rts/PrimOps.cmm index 5325c85d1f..bf81eeece1 100644 --- a/rts/PrimOps.cmm +++ b/rts/PrimOps.cmm @@ -542,9 +542,9 @@ stg_forkzh closure "ptr") []; /* start blocked if the current thread is blocked */ - StgTSO_flags(threadid) = - StgTSO_flags(threadid) | (StgTSO_flags(CurrentTSO) & - (TSO_BLOCKEX::I32 | TSO_INTERRUPTIBLE::I32)); + StgTSO_flags(threadid) = %lobits16( + TO_W_(StgTSO_flags(threadid)) | + TO_W_(StgTSO_flags(CurrentTSO)) & (TSO_BLOCKEX | TSO_INTERRUPTIBLE)); foreign "C" scheduleThread(MyCapability() "ptr", threadid "ptr") []; @@ -572,9 +572,9 @@ stg_forkOnzh closure "ptr") []; /* start blocked if the current thread is blocked */ - StgTSO_flags(threadid) = - StgTSO_flags(threadid) | (StgTSO_flags(CurrentTSO) & - (TSO_BLOCKEX::I32 | TSO_INTERRUPTIBLE::I32)); + StgTSO_flags(threadid) = %lobits16( + TO_W_(StgTSO_flags(threadid)) | + TO_W_(StgTSO_flags(CurrentTSO)) & (TSO_BLOCKEX | TSO_INTERRUPTIBLE)); foreign "C" scheduleThreadOn(MyCapability() "ptr", cpu, threadid "ptr") []; diff --git a/rts/Printer.c b/rts/Printer.c index 1b8a6dd2c6..e9813299d8 100644 --- a/rts/Printer.c +++ b/rts/Printer.c @@ -160,6 +160,12 @@ printClosure( StgClosure *obj ) printStdObjPayload(obj); break; + case PRIM: + debugBelch("PRIM("); + printPtr((StgPtr)obj->header.info); + printStdObjPayload(obj); + break; + case THUNK: case THUNK_1_0: case THUNK_0_1: case THUNK_1_1: case THUNK_0_2: case THUNK_2_0: @@ -356,10 +362,6 @@ printClosure( StgClosure *obj ) /* ToDo: chase 'link' ? */ break; - case STABLE_NAME: - debugBelch("STABLE_NAME(%lu)\n", (lnat)((StgStableName*)obj)->sn); - break; - case TSO: debugBelch("TSO("); debugBelch("%lu (%p)",(unsigned long)(((StgTSO*)obj)->id), (StgTSO*)obj); @@ -1132,14 +1134,10 @@ char *closure_type_names[] = { [MUT_VAR_CLEAN] = "MUT_VAR_CLEAN", [MUT_VAR_DIRTY] = "MUT_VAR_DIRTY", [WEAK] = "WEAK", - [STABLE_NAME] = "STABLE_NAME", + [PRIM] = "PRIM", + [MUT_PRIM] = "MUT_PRIM", [TSO] = "TSO", - [TVAR_WATCH_QUEUE] = "TVAR_WATCH_QUEUE", - [INVARIANT_CHECK_QUEUE] = "INVARIANT_CHECK_QUEUE", - [ATOMIC_INVARIANT] = "ATOMIC_INVARIANT", - [TVAR] = "TVAR", [TREC_CHUNK] = "TREC_CHUNK", - [TREC_HEADER] = "TREC_HEADER", [ATOMICALLY_FRAME] = "ATOMICALLY_FRAME", [CATCH_RETRY_FRAME] = "CATCH_RETRY_FRAME", [CATCH_STM_FRAME] = "CATCH_STM_FRAME", diff --git a/rts/ProfHeap.c b/rts/ProfHeap.c index 15337d4585..e90051c5e6 100644 --- a/rts/ProfHeap.c +++ b/rts/ProfHeap.c @@ -912,7 +912,8 @@ heapCensusChain( Census *census, bdescr *bd ) case MVAR_CLEAN: case MVAR_DIRTY: case WEAK: - case STABLE_NAME: + case PRIM: + case MUT_PRIM: case MUT_VAR_CLEAN: case MUT_VAR_DIRTY: prim = rtsTrue; @@ -960,31 +961,6 @@ heapCensusChain( Census *census, bdescr *bd ) break; #endif - case TREC_HEADER: - prim = rtsTrue; - size = sizeofW(StgTRecHeader); - break; - - case TVAR_WATCH_QUEUE: - prim = rtsTrue; - size = sizeofW(StgTVarWatchQueue); - break; - - case INVARIANT_CHECK_QUEUE: - prim = rtsTrue; - size = sizeofW(StgInvariantCheckQueue); - break; - - case ATOMIC_INVARIANT: - prim = rtsTrue; - size = sizeofW(StgAtomicInvariant); - break; - - case TVAR: - prim = rtsTrue; - size = sizeofW(StgTVar); - break; - case TREC_CHUNK: prim = rtsTrue; size = sizeofW(StgTRecChunk); diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c index ca5e5ea4c1..d54f823d6e 100644 --- a/rts/RaiseAsync.c +++ b/rts/RaiseAsync.c @@ -30,10 +30,14 @@ static void raiseAsync (Capability *cap, static void removeFromQueues(Capability *cap, StgTSO *tso); -static void blockedThrowTo (Capability *cap, StgTSO *source, StgTSO *target); +static void blockedThrowTo (Capability *cap, + StgTSO *target, MessageThrowTo *msg); -static void performBlockedException (Capability *cap, - StgTSO *source, StgTSO *target); +static void throwToSendMsg (Capability *cap USED_IF_THREADS, + Capability *target_cap USED_IF_THREADS, + MessageThrowTo *msg USED_IF_THREADS); + +static void performBlockedException (Capability *cap, MessageThrowTo *msg); /* ----------------------------------------------------------------------------- throwToSingleThreaded @@ -96,59 +100,85 @@ suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here) may be blocked and could be woken up at any point by another CPU. We have some delicate synchronisation to do. - There is a completely safe fallback scheme: it is always possible - to just block the source TSO on the target TSO's blocked_exceptions - queue. This queue is locked using lockTSO()/unlockTSO(). It is - checked at regular intervals: before and after running a thread - (schedule() and threadPaused() respectively), and just before GC - (scheduleDoGC()). Activating a thread on this queue should be done - using maybePerformBlockedException(): this is done in the context - of the target thread, so the exception can be raised eagerly. - - This fallback scheme works even if the target thread is complete or - killed: scheduleDoGC() will discover the blocked thread before the - target is GC'd. - - Blocking the source thread on the target thread's blocked_exception - queue is also employed when the target thread is currently blocking - exceptions (ie. inside Control.Exception.block). - - We could use the safe fallback scheme exclusively, but that - wouldn't be ideal: most calls to throwTo would block immediately, - possibly until the next GC, which might require the deadlock - detection mechanism to kick in. So we try to provide promptness - wherever possible. - - We can promptly deliver the exception if the target thread is: - - - runnable, on the same Capability as the source thread (because - we own the run queue and therefore the target thread). - - - blocked, and we can obtain exclusive access to it. Obtaining - exclusive access to the thread depends on how it is blocked. - - We must also be careful to not trip over threadStackOverflow(), - which might be moving the TSO to enlarge its stack. - lockTSO()/unlockTSO() are used here too. - + The underlying scheme when multiple Capabilities are in use is + message passing: when the target of a throwTo is on another + Capability, we send a message (a MessageThrowTo closure) to that + Capability. + + If the throwTo needs to block because the target TSO is masking + exceptions (the TSO_BLOCKEX flag), then the message is placed on + the blocked_exceptions queue attached to the target TSO. When the + target TSO enters the unmasked state again, it must check the + queue. The blocked_exceptions queue is not locked; only the + Capability owning the TSO may modify it. + + To make things simpler for throwTo, we always create the message + first before deciding what to do. The message may get sent, or it + may get attached to a TSO's blocked_exceptions queue, or the + exception may get thrown immediately and the message dropped, + depending on the current state of the target. + + Currently we send a message if the target belongs to another + Capability, and it is + + - NotBlocked, BlockedOnMsgWakeup, BlockedOnMsgThrowTo, + BlockedOnCCall + + - or it is masking exceptions (TSO_BLOCKEX) + + Currently, if the target is BlockedOnMVar, BlockedOnSTM, or + BlockedOnBlackHole then we acquire ownership of the TSO by locking + its parent container (e.g. the MVar) and then raise the exception. + We might change these cases to be more message-passing-like in the + future. + Returns: - THROWTO_SUCCESS exception was raised, ok to continue + NULL exception was raised, ok to continue - THROWTO_BLOCKED exception was not raised; block the source - thread then call throwToReleaseTarget() when - the source thread is properly tidied away. + MessageThrowTo * exception was not raised; the source TSO + should now put itself in the state + BlockedOnMsgThrowTo, and when it is ready + it should unlock the mssage using + unlockClosure(msg, &stg_MSG_THROWTO_info); + If it decides not to raise the exception after + all, it can revoke it safely with + unlockClosure(msg, &stg_IND_info); -------------------------------------------------------------------------- */ -nat +MessageThrowTo * throwTo (Capability *cap, // the Capability we hold StgTSO *source, // the TSO sending the exception (or NULL) StgTSO *target, // the TSO receiving the exception - StgClosure *exception, // the exception closure - /*[out]*/ void **out USED_IF_THREADS) + StgClosure *exception) // the exception closure +{ + MessageThrowTo *msg; + + msg = (MessageThrowTo *) allocate(cap, sizeofW(MessageThrowTo)); + // message starts locked; the caller has to unlock it when it is + // ready. + msg->header.info = &stg_WHITEHOLE_info; + msg->source = source; + msg->target = target; + msg->exception = exception; + + switch (throwToMsg(cap, msg)) + { + case THROWTO_SUCCESS: + return NULL; + case THROWTO_BLOCKED: + default: + return msg; + } +} + + +nat +throwToMsg (Capability *cap, MessageThrowTo *msg) { StgWord status; + StgTSO *target = msg->target; ASSERT(target != END_TSO_QUEUE); @@ -159,13 +189,10 @@ throwTo (Capability *cap, // the Capability we hold // ASSERT(get_itbl(target)->type == TSO); } - if (source != NULL) { - debugTrace(DEBUG_sched, "throwTo: from thread %lu to thread %lu", - (unsigned long)source->id, (unsigned long)target->id); - } else { - debugTrace(DEBUG_sched, "throwTo: from RTS to thread %lu", - (unsigned long)target->id); - } + debugTraceCap(DEBUG_sched, cap, + "throwTo: from thread %lu to thread %lu", + (unsigned long)msg->source->id, + (unsigned long)msg->target->id); #ifdef DEBUG traceThreadStatus(DEBUG_sched, target); @@ -173,6 +200,7 @@ throwTo (Capability *cap, // the Capability we hold goto check_target; retry: + write_barrier(); debugTrace(DEBUG_sched, "throwTo: retrying..."); check_target: @@ -188,6 +216,7 @@ check_target: switch (status) { case NotBlocked: + case BlockedOnMsgWakeup: /* if status==NotBlocked, and target->cap == cap, then we own this TSO and can raise the exception. @@ -251,37 +280,80 @@ check_target: write_barrier(); target_cap = target->cap; - if (target_cap == cap && (target->flags & TSO_BLOCKEX) == 0) { - // It's on our run queue and not blocking exceptions - raiseAsync(cap, target, exception, rtsFalse, NULL); - return THROWTO_SUCCESS; - } else { - // Otherwise, just block on the blocked_exceptions queue - // of the target thread. The queue will get looked at - // soon enough: it is checked before and after running a - // thread, and during GC. - lockTSO(target); - - // Avoid race with threadStackOverflow, which may have - // just moved this TSO. - if (target->what_next == ThreadRelocated) { - unlockTSO(target); - target = target->_link; - goto retry; - } - // check again for ThreadComplete and ThreadKilled. This - // cooperates with scheduleHandleThreadFinished to ensure - // that we never miss any threads that are throwing an - // exception to a thread in the process of terminating. - if (target->what_next == ThreadComplete - || target->what_next == ThreadKilled) { - unlockTSO(target); + if (target_cap != cap) { + throwToSendMsg(cap, target_cap, msg); + return THROWTO_BLOCKED; + } else { + if ((target->flags & TSO_BLOCKEX) == 0) { + // It's on our run queue and not blocking exceptions + raiseAsync(cap, target, msg->exception, rtsFalse, NULL); return THROWTO_SUCCESS; + } else { + blockedThrowTo(cap,target,msg); + return THROWTO_BLOCKED; } - blockedThrowTo(cap,source,target); - *out = target; - return THROWTO_BLOCKED; - } + } + } + + case BlockedOnMsgThrowTo: + { + Capability *target_cap; + const StgInfoTable *i; + MessageThrowTo *m; + + m = target->block_info.throwto; + + // target is local to this cap, but has sent a throwto + // message to another cap. + // + // The source message is locked. We need to revoke the + // target's message so that we can raise the exception, so + // we attempt to lock it. + + // There's a possibility of a deadlock if two threads are both + // trying to throwTo each other (or more generally, a cycle of + // threads). To break the symmetry we compare the addresses + // of the MessageThrowTo objects, and the one for which m < + // msg gets to spin, while the other can only try to lock + // once, but must then back off and unlock both before trying + // again. + if (m < msg) { + i = lockClosure((StgClosure *)m); + } else { + i = tryLockClosure((StgClosure *)m); + if (i == NULL) { +// debugBelch("collision\n"); + throwToSendMsg(cap, target->cap, msg); + return THROWTO_BLOCKED; + } + } + + if (i != &stg_MSG_THROWTO_info) { + // if it's an IND, this TSO has been woken up by another Cap + unlockClosure((StgClosure*)m, i); + goto retry; + } + + target_cap = target->cap; + if (target_cap != cap) { + unlockClosure((StgClosure*)m, i); + throwToSendMsg(cap, target_cap, msg); + return THROWTO_BLOCKED; + } + + if ((target->flags & TSO_BLOCKEX) && + ((target->flags & TSO_INTERRUPTIBLE) == 0)) { + unlockClosure((StgClosure*)m, i); + blockedThrowTo(cap,target,msg); + return THROWTO_BLOCKED; + } + + // nobody else can wake up this TSO after we claim the message + unlockClosure((StgClosure*)m, &stg_IND_info); + + raiseAsync(cap, target, msg->exception, rtsFalse, NULL); + unblockOne(cap, target); + return THROWTO_SUCCESS; } case BlockedOnMVar: @@ -322,14 +394,17 @@ check_target: if ((target->flags & TSO_BLOCKEX) && ((target->flags & TSO_INTERRUPTIBLE) == 0)) { - lockClosure((StgClosure *)target); - blockedThrowTo(cap,source,target); + Capability *target_cap = target->cap; + if (target->cap != cap) { + throwToSendMsg(cap,target_cap,msg); + } else { + blockedThrowTo(cap,target,msg); + } unlockClosure((StgClosure *)mvar, info); - *out = target; - return THROWTO_BLOCKED; // caller releases TSO + return THROWTO_BLOCKED; } else { removeThreadFromMVarQueue(cap, mvar, target); - raiseAsync(cap, target, exception, rtsFalse, NULL); + raiseAsync(cap, target, msg->exception, rtsFalse, NULL); unblockOne(cap, target); unlockClosure((StgClosure *)mvar, info); return THROWTO_SUCCESS; @@ -346,84 +421,23 @@ check_target: } if (target->flags & TSO_BLOCKEX) { - lockTSO(target); - blockedThrowTo(cap,source,target); + Capability *target_cap = target->cap; + if (target->cap != cap) { + throwToSendMsg(cap,target_cap,msg); + } else { + blockedThrowTo(cap,target,msg); + } RELEASE_LOCK(&sched_mutex); - *out = target; - return THROWTO_BLOCKED; // caller releases TSO + return THROWTO_BLOCKED; // caller releases lock } else { removeThreadFromQueue(cap, &blackhole_queue, target); - raiseAsync(cap, target, exception, rtsFalse, NULL); + raiseAsync(cap, target, msg->exception, rtsFalse, NULL); unblockOne(cap, target); RELEASE_LOCK(&sched_mutex); return THROWTO_SUCCESS; } } - case BlockedOnException: - { - StgTSO *target2; - StgInfoTable *info; - - /* - To obtain exclusive access to a BlockedOnException thread, - we must call lockClosure() on the TSO on which it is blocked. - Since the TSO might change underneath our feet, after we - call lockClosure() we must check that - - (a) the closure we locked is actually a TSO - (b) the original thread is still BlockedOnException, - (c) the original thread is still blocked on the TSO we locked - and (d) the target thread has not been relocated. - - We synchronise with threadStackOverflow() (which relocates - threads) using lockClosure()/unlockClosure(). - */ - target2 = target->block_info.tso; - - info = lockClosure((StgClosure *)target2); - if (info != &stg_TSO_info) { - unlockClosure((StgClosure *)target2, info); - goto retry; - } - if (target->what_next == ThreadRelocated) { - target = target->_link; - unlockTSO(target2); - goto retry; - } - if (target2->what_next == ThreadRelocated) { - target->block_info.tso = target2->_link; - unlockTSO(target2); - goto retry; - } - if (target->why_blocked != BlockedOnException - || target->block_info.tso != target2) { - unlockTSO(target2); - goto retry; - } - - /* - Now we have exclusive rights to the target TSO... - - If it is blocking exceptions, add the source TSO to its - blocked_exceptions queue. Otherwise, raise the exception. - */ - if ((target->flags & TSO_BLOCKEX) && - ((target->flags & TSO_INTERRUPTIBLE) == 0)) { - lockTSO(target); - blockedThrowTo(cap,source,target); - unlockTSO(target2); - *out = target; - return THROWTO_BLOCKED; - } else { - removeThreadFromQueue(cap, &target2->blocked_exceptions, target); - raiseAsync(cap, target, exception, rtsFalse, NULL); - unblockOne(cap, target); - unlockTSO(target2); - return THROWTO_SUCCESS; - } - } - case BlockedOnSTM: lockTSO(target); // Unblocking BlockedOnSTM threads requires the TSO to be @@ -434,11 +448,16 @@ check_target: } if ((target->flags & TSO_BLOCKEX) && ((target->flags & TSO_INTERRUPTIBLE) == 0)) { - blockedThrowTo(cap,source,target); - *out = target; + Capability *target_cap = target->cap; + if (target->cap != cap) { + throwToSendMsg(cap,target_cap,msg); + } else { + blockedThrowTo(cap,target,msg); + } + unlockTSO(target); return THROWTO_BLOCKED; } else { - raiseAsync(cap, target, exception, rtsFalse, NULL); + raiseAsync(cap, target, msg->exception, rtsFalse, NULL); unblockOne(cap, target); unlockTSO(target); return THROWTO_SUCCESS; @@ -446,19 +465,18 @@ check_target: case BlockedOnCCall: case BlockedOnCCall_NoUnblockExc: - // I don't think it's possible to acquire ownership of a - // BlockedOnCCall thread. We just assume that the target - // thread is blocking exceptions, and block on its - // blocked_exception queue. - lockTSO(target); - if (target->why_blocked != BlockedOnCCall && - target->why_blocked != BlockedOnCCall_NoUnblockExc) { - unlockTSO(target); - goto retry; - } - blockedThrowTo(cap,source,target); - *out = target; + { + Capability *target_cap; + + target_cap = target->cap; + if (target_cap != cap) { + throwToSendMsg(cap, target_cap, msg); + return THROWTO_BLOCKED; + } + + blockedThrowTo(cap,target,msg); return THROWTO_BLOCKED; + } #ifndef THREADEDED_RTS case BlockedOnRead: @@ -469,11 +487,11 @@ check_target: #endif if ((target->flags & TSO_BLOCKEX) && ((target->flags & TSO_INTERRUPTIBLE) == 0)) { - blockedThrowTo(cap,source,target); + blockedThrowTo(cap,target,msg); return THROWTO_BLOCKED; } else { removeFromQueues(cap,target); - raiseAsync(cap, target, exception, rtsFalse, NULL); + raiseAsync(cap, target, msg->exception, rtsFalse, NULL); return THROWTO_SUCCESS; } #endif @@ -484,33 +502,34 @@ check_target: barf("throwTo"); } -// Block a TSO on another TSO's blocked_exceptions queue. -// Precondition: we hold an exclusive lock on the target TSO (this is -// complex to achieve as there's no single lock on a TSO; see -// throwTo()). static void -blockedThrowTo (Capability *cap, StgTSO *source, StgTSO *target) +throwToSendMsg (Capability *cap STG_UNUSED, + Capability *target_cap USED_IF_THREADS, + MessageThrowTo *msg USED_IF_THREADS) + { - if (source != NULL) { - debugTrace(DEBUG_sched, "throwTo: blocking on thread %lu", (unsigned long)target->id); - setTSOLink(cap, source, target->blocked_exceptions); - target->blocked_exceptions = source; - dirty_TSO(cap,target); // we modified the blocked_exceptions queue - - source->block_info.tso = target; - write_barrier(); // throwTo_exception *must* be visible if BlockedOnException is. - source->why_blocked = BlockedOnException; - } -} +#ifdef THREADED_RTS + debugTrace(DEBUG_sched, "throwTo: sending a throwto message to cap %lu", (unsigned long)target_cap->no); + sendMessage(target_cap, (Message*)msg); +#endif +} -#ifdef THREADED_RTS -void -throwToReleaseTarget (void *tso) +// Block a throwTo message on the target TSO's blocked_exceptions +// queue. The current Capability must own the target TSO in order to +// modify the blocked_exceptions queue. +static void +blockedThrowTo (Capability *cap, StgTSO *target, MessageThrowTo *msg) { - unlockTSO((StgTSO *)tso); + debugTraceCap(DEBUG_sched, cap, "throwTo: blocking on thread %lu", + (unsigned long)target->id); + + ASSERT(target->cap == cap); + + msg->link = (Message*)target->blocked_exceptions; + target->blocked_exceptions = msg; + dirty_TSO(cap,target); // we modified the blocked_exceptions queue } -#endif /* ----------------------------------------------------------------------------- Waking up threads blocked in throwTo @@ -532,10 +551,11 @@ throwToReleaseTarget (void *tso) int maybePerformBlockedException (Capability *cap, StgTSO *tso) { - StgTSO *source; + MessageThrowTo *msg; + const StgInfoTable *i; if (tso->what_next == ThreadComplete || tso->what_next == ThreadFinished) { - if (tso->blocked_exceptions != END_TSO_QUEUE) { + if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) { awakenBlockedExceptionQueue(cap,tso); return 1; } else { @@ -543,32 +563,30 @@ maybePerformBlockedException (Capability *cap, StgTSO *tso) } } - if (tso->blocked_exceptions != END_TSO_QUEUE && + if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE && (tso->flags & TSO_BLOCKEX) != 0) { debugTrace(DEBUG_sched, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id); } - if (tso->blocked_exceptions != END_TSO_QUEUE + if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE && ((tso->flags & TSO_BLOCKEX) == 0 || ((tso->flags & TSO_INTERRUPTIBLE) && interruptible(tso)))) { - // Lock the TSO, this gives us exclusive access to the queue - lockTSO(tso); - - // Check the queue again; it might have changed before we - // locked it. - if (tso->blocked_exceptions == END_TSO_QUEUE) { - unlockTSO(tso); - return 0; - } - // We unblock just the first thread on the queue, and perform // its throw immediately. - source = tso->blocked_exceptions; - performBlockedException(cap, source, tso); - tso->blocked_exceptions = unblockOne_(cap, source, - rtsFalse/*no migrate*/); - unlockTSO(tso); + loop: + msg = tso->blocked_exceptions; + if (msg == END_BLOCKED_EXCEPTIONS_QUEUE) return 0; + i = lockClosure((StgClosure*)msg); + tso->blocked_exceptions = (MessageThrowTo*)msg->link; + if (i == &stg_IND_info) { + unlockClosure((StgClosure*)msg,i); + goto loop; + } + + performBlockedException(cap, msg); + unblockOne_(cap, msg->source, rtsFalse/*no migrate*/); + unlockClosure((StgClosure*)msg,&stg_IND_info); return 1; } return 0; @@ -580,25 +598,34 @@ maybePerformBlockedException (Capability *cap, StgTSO *tso) void awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso) { - lockTSO(tso); - awakenBlockedQueue(cap, tso->blocked_exceptions); - tso->blocked_exceptions = END_TSO_QUEUE; - unlockTSO(tso); + MessageThrowTo *msg; + const StgInfoTable *i; + + for (msg = tso->blocked_exceptions; msg != END_BLOCKED_EXCEPTIONS_QUEUE; + msg = (MessageThrowTo*)msg->link) { + i = lockClosure((StgClosure *)msg); + if (i != &stg_IND_info) { + unblockOne_(cap, msg->source, rtsFalse/*no migrate*/); + } + unlockClosure((StgClosure *)msg,i); + } + tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE; } static void -performBlockedException (Capability *cap, StgTSO *source, StgTSO *target) +performBlockedException (Capability *cap, MessageThrowTo *msg) { - StgClosure *exception; + StgTSO *source; + + source = msg->source; - ASSERT(source->why_blocked == BlockedOnException); - ASSERT(source->block_info.tso->id == target->id); + ASSERT(source->why_blocked == BlockedOnMsgThrowTo); + ASSERT(source->block_info.closure == (StgClosure *)msg); ASSERT(source->sp[0] == (StgWord)&stg_block_throwto_info); - ASSERT(((StgTSO *)source->sp[1])->id == target->id); + ASSERT(((StgTSO *)source->sp[1])->id == msg->target->id); // check ids not pointers, because the thread might be relocated - exception = (StgClosure *)source->sp[2]; - throwToSingleThreaded(cap, target, exception); + throwToSingleThreaded(cap, msg->target, msg->exception); source->sp += 3; } @@ -637,22 +664,25 @@ removeFromQueues(Capability *cap, StgTSO *tso) removeThreadFromQueue(cap, &blackhole_queue, tso); goto done; - case BlockedOnException: - { - StgTSO *target = tso->block_info.tso; - - // NO: when called by threadPaused(), we probably have this - // TSO already locked (WHITEHOLEd) because we just placed - // ourselves on its queue. - // ASSERT(get_itbl(target)->type == TSO); - - while (target->what_next == ThreadRelocated) { - target = target->_link; - } - - removeThreadFromQueue(cap, &target->blocked_exceptions, tso); - goto done; - } + case BlockedOnMsgWakeup: + { + // kill the message, atomically: + tso->block_info.wakeup->header.info = &stg_IND_info; + break; + } + + case BlockedOnMsgThrowTo: + { + MessageThrowTo *m = tso->block_info.throwto; + // The message is locked by us, unless we got here via + // deleteAllThreads(), in which case we own all the + // capabilities. + // ASSERT(m->header.info == &stg_WHITEHOLE_info); + + // unlock and revoke it at the same time + unlockClosure((StgClosure*)m,&stg_IND_info); + break; + } #if !defined(THREADED_RTS) case BlockedOnRead: @@ -743,6 +773,10 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, } #endif + while (tso->what_next == ThreadRelocated) { + tso = tso->_link; + } + // mark it dirty; we're about to change its stack. dirty_TSO(cap, tso); diff --git a/rts/RaiseAsync.h b/rts/RaiseAsync.h index 96eb96e10b..5137d41f5f 100644 --- a/rts/RaiseAsync.h +++ b/rts/RaiseAsync.h @@ -29,16 +29,13 @@ void suspendComputation (Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here); -nat throwTo (Capability *cap, // the Capability we hold - StgTSO *source, // the TSO sending the exception - StgTSO *target, // the TSO receiving the exception - StgClosure *exception, // the exception closure - /*[out]*/ void **out // pass to throwToReleaseTarget() - ); +MessageThrowTo *throwTo (Capability *cap, // the Capability we hold + StgTSO *source, + StgTSO *target, + StgClosure *exception); // the exception closure -#ifdef THREADED_RTS -void throwToReleaseTarget (void *tso); -#endif +nat throwToMsg (Capability *cap, + MessageThrowTo *msg); int maybePerformBlockedException (Capability *cap, StgTSO *tso); void awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso); @@ -52,7 +49,7 @@ interruptible(StgTSO *t) { switch (t->why_blocked) { case BlockedOnMVar: - case BlockedOnException: + case BlockedOnMsgThrowTo: case BlockedOnRead: case BlockedOnWrite: #if defined(mingw32_HOST_OS) diff --git a/rts/RetainerProfile.c b/rts/RetainerProfile.c index 4fca19cf2f..b7bc909f63 100644 --- a/rts/RetainerProfile.c +++ b/rts/RetainerProfile.c @@ -509,7 +509,7 @@ push( StgClosure *c, retainer c_child_r, StgClosure **first_child ) // layout.payload.ptrs, no SRT case CONSTR: - case STABLE_NAME: + case PRIM: case BCO: case CONSTR_STATIC: init_ptrs(&se.info, get_itbl(c)->layout.payload.ptrs, @@ -883,7 +883,7 @@ pop( StgClosure **c, StgClosure **cp, retainer *r ) } case CONSTR: - case STABLE_NAME: + case PRIM: case BCO: case CONSTR_STATIC: // StgMutArrPtr.ptrs, no SRT @@ -1108,7 +1108,7 @@ isRetainer( StgClosure *c ) case CONSTR_STATIC: case FUN_STATIC: // misc - case STABLE_NAME: + case PRIM: case BCO: case ARR_WORDS: // STM diff --git a/rts/RtsMessages.c b/rts/RtsMessages.c index e2a30a613f..6e75abc8a5 100644 --- a/rts/RtsMessages.c +++ b/rts/RtsMessages.c @@ -9,6 +9,8 @@ #include "PosixSource.h" #include "Rts.h" +#include "eventlog/EventLog.h" + #include <stdio.h> #include <string.h> #include <errno.h> @@ -161,6 +163,10 @@ rtsFatalInternalErrorFn(const char *s, va_list ap) fflush(stderr); } +#ifdef TRACING + if (RtsFlags.TraceFlags.tracing == TRACE_EVENTLOG) endEventLogging(); +#endif + abort(); // stg_exit(EXIT_INTERNAL_ERROR); } @@ -352,8 +352,7 @@ static StgBool watcher_is_tso(StgTVarWatchQueue *q) { static StgBool watcher_is_invariant(StgTVarWatchQueue *q) { StgClosure *c = q -> closure; - StgInfoTable *info = get_itbl(c); - return (info -> type) == ATOMIC_INVARIANT; + return (c->header.info == &stg_ATOMIC_INVARIANT_info); } /*......................................................................*/ diff --git a/rts/Schedule.c b/rts/Schedule.c index 4cca469869..70e0246fbe 100644 --- a/rts/Schedule.c +++ b/rts/Schedule.c @@ -139,7 +139,7 @@ static void scheduleYield (Capability **pcap, Task *task, rtsBool); #endif static void scheduleStartSignalHandlers (Capability *cap); static void scheduleCheckBlockedThreads (Capability *cap); -static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS); +static void scheduleProcessInbox(Capability *cap); static void scheduleCheckBlackHoles (Capability *cap); static void scheduleDetectDeadlock (Capability *cap, Task *task); static void schedulePushWork(Capability *cap, Task *task); @@ -618,7 +618,7 @@ scheduleFindWork (Capability *cap) // list each time around the scheduler. if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); } - scheduleCheckWakeupThreads(cap); + scheduleProcessInbox(cap); scheduleCheckBlockedThreads(cap); @@ -673,7 +673,7 @@ scheduleYield (Capability **pcap, Task *task, rtsBool force_yield) if (!force_yield && !shouldYieldCapability(cap,task) && (!emptyRunQueue(cap) || - !emptyWakeupQueue(cap) || + !emptyInbox(cap) || blackholes_need_checking || sched_state >= SCHED_INTERRUPTING)) return; @@ -725,7 +725,9 @@ schedulePushWork(Capability *cap USED_IF_THREADS, for (i=0, n_free_caps=0; i < n_capabilities; i++) { cap0 = &capabilities[i]; if (cap != cap0 && tryGrabCapability(cap0,task)) { - if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) { + if (!emptyRunQueue(cap0) + || cap->returning_tasks_hd != NULL + || cap->inbox != (Message*)END_TSO_QUEUE) { // it already has some work, we just grabbed it at // the wrong moment. Or maybe it's deadlocked! releaseCapability(cap0); @@ -871,23 +873,89 @@ scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS) * Check for threads woken up by other Capabilities * ------------------------------------------------------------------------- */ +#if defined(THREADED_RTS) +static void +executeMessage (Capability *cap, Message *m) +{ + const StgInfoTable *i; + +loop: + write_barrier(); // allow m->header to be modified by another thread + i = m->header.info; + if (i == &stg_MSG_WAKEUP_info) + { + MessageWakeup *w = (MessageWakeup *)m; + StgTSO *tso = w->tso; + debugTraceCap(DEBUG_sched, cap, "message: wakeup thread %ld", + (lnat)tso->id); + ASSERT(tso->cap == cap); + ASSERT(tso->why_blocked == BlockedOnMsgWakeup); + ASSERT(tso->block_info.closure == (StgClosure *)m); + tso->why_blocked = NotBlocked; + appendToRunQueue(cap, tso); + } + else if (i == &stg_MSG_THROWTO_info) + { + MessageThrowTo *t = (MessageThrowTo *)m; + nat r; + const StgInfoTable *i; + + i = lockClosure((StgClosure*)m); + if (i != &stg_MSG_THROWTO_info) { + unlockClosure((StgClosure*)m, i); + goto loop; + } + + debugTraceCap(DEBUG_sched, cap, "message: throwTo %ld -> %ld", + (lnat)t->source->id, (lnat)t->target->id); + + ASSERT(t->source->why_blocked == BlockedOnMsgThrowTo); + ASSERT(t->source->block_info.closure == (StgClosure *)m); + + r = throwToMsg(cap, t); + + switch (r) { + case THROWTO_SUCCESS: + ASSERT(t->source->sp[0] == (StgWord)&stg_block_throwto_info); + t->source->sp += 3; + unblockOne(cap, t->source); + // this message is done + unlockClosure((StgClosure*)m, &stg_IND_info); + break; + case THROWTO_BLOCKED: + // unlock the message + unlockClosure((StgClosure*)m, &stg_MSG_THROWTO_info); + break; + } + } + else if (i == &stg_IND_info) + { + // message was revoked + return; + } + else if (i == &stg_WHITEHOLE_info) + { + goto loop; + } + else + { + barf("executeMessage: %p", i); + } +} +#endif + static void -scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS) +scheduleProcessInbox (Capability *cap USED_IF_THREADS) { #if defined(THREADED_RTS) - // Any threads that were woken up by other Capabilities get - // appended to our run queue. - if (!emptyWakeupQueue(cap)) { - ACQUIRE_LOCK(&cap->lock); - if (emptyRunQueue(cap)) { - cap->run_queue_hd = cap->wakeup_queue_hd; - cap->run_queue_tl = cap->wakeup_queue_tl; - } else { - setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd); - cap->run_queue_tl = cap->wakeup_queue_tl; - } - cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE; - RELEASE_LOCK(&cap->lock); + Message *m; + + while (!emptyInbox(cap)) { + ACQUIRE_LOCK(&cap->lock); + m = cap->inbox; + cap->inbox = m->link; + RELEASE_LOCK(&cap->lock); + executeMessage(cap, (Message *)m); } #endif } @@ -983,7 +1051,7 @@ scheduleDetectDeadlock (Capability *cap, Task *task) switch (task->incall->tso->why_blocked) { case BlockedOnSTM: case BlockedOnBlackHole: - case BlockedOnException: + case BlockedOnMsgThrowTo: case BlockedOnMVar: throwToSingleThreaded(cap, task->incall->tso, (StgClosure *)nonTermination_closure); @@ -1268,9 +1336,7 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t) */ // blocked exceptions can now complete, even if the thread was in - // blocked mode (see #2910). This unconditionally calls - // lockTSO(), which ensures that we don't miss any threads that - // are engaged in throwTo() with this thread as a target. + // blocked mode (see #2910). awakenBlockedExceptionQueue (cap, t); // @@ -1884,7 +1950,7 @@ resumeThread (void *task_) if (tso->why_blocked == BlockedOnCCall) { // avoid locking the TSO if we don't have to - if (tso->blocked_exceptions != END_TSO_QUEUE) { + if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) { awakenBlockedExceptionQueue(cap,tso); } tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE); @@ -2187,10 +2253,6 @@ threadStackOverflow(Capability *cap, StgTSO *tso) IF_DEBUG(sanity,checkTSO(tso)); - // don't allow throwTo() to modify the blocked_exceptions queue - // while we are moving the TSO: - lockClosure((StgClosure *)tso); - if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) { // NB. never raise a StackOverflow exception if the thread is @@ -2201,7 +2263,6 @@ threadStackOverflow(Capability *cap, StgTSO *tso) // if (tso->flags & TSO_SQUEEZED) { - unlockTSO(tso); return tso; } // #3677: In a stack overflow situation, stack squeezing may @@ -2223,7 +2284,6 @@ threadStackOverflow(Capability *cap, StgTSO *tso) tso->sp+64))); // Send this thread the StackOverflow exception - unlockTSO(tso); throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure); return tso; } @@ -2239,7 +2299,6 @@ threadStackOverflow(Capability *cap, StgTSO *tso) // the stack anyway. if ((tso->flags & TSO_SQUEEZED) && ((W_)(tso->sp - tso->stack) >= BLOCK_SIZE_W)) { - unlockTSO(tso); return tso; } @@ -2289,9 +2348,6 @@ threadStackOverflow(Capability *cap, StgTSO *tso) tso->sp = (P_)&(tso->stack[tso->stack_size]); tso->why_blocked = NotBlocked; - unlockTSO(dest); - unlockTSO(tso); - IF_DEBUG(sanity,checkTSO(dest)); #if 0 IF_DEBUG(scheduler,printTSO(dest)); @@ -2324,10 +2380,6 @@ threadStackUnderflow (Capability *cap, Task *task, StgTSO *tso) return tso; } - // don't allow throwTo() to modify the blocked_exceptions queue - // while we are moving the TSO: - lockClosure((StgClosure *)tso); - // this is the number of words we'll free free_w = round_to_mblocks(tso_size_w/2); @@ -2358,9 +2410,6 @@ threadStackUnderflow (Capability *cap, Task *task, StgTSO *tso) task->incall->tso = new_tso; } - unlockTSO(new_tso); - unlockTSO(tso); - IF_DEBUG(sanity,checkTSO(new_tso)); return new_tso; @@ -2691,61 +2740,9 @@ resurrectThreads (StgTSO *threads) * can wake up threads, remember...). */ continue; - case BlockedOnException: - // throwTo should never block indefinitely: if the target - // thread dies or completes, throwTo returns. - barf("resurrectThreads: thread BlockedOnException"); - break; default: - barf("resurrectThreads: thread blocked in a strange way"); + barf("resurrectThreads: thread blocked in a strange way: %d", + tso->why_blocked); } } } - -/* ----------------------------------------------------------------------------- - performPendingThrowTos is called after garbage collection, and - passed a list of threads that were found to have pending throwTos - (tso->blocked_exceptions was not empty), and were blocked. - Normally this doesn't happen, because we would deliver the - exception directly if the target thread is blocked, but there are - small windows where it might occur on a multiprocessor (see - throwTo()). - - NB. we must be holding all the capabilities at this point, just - like resurrectThreads(). - -------------------------------------------------------------------------- */ - -void -performPendingThrowTos (StgTSO *threads) -{ - StgTSO *tso, *next; - Capability *cap; - Task *task, *saved_task;; - generation *gen; - - task = myTask(); - cap = task->cap; - - for (tso = threads; tso != END_TSO_QUEUE; tso = next) { - next = tso->global_link; - - gen = Bdescr((P_)tso)->gen; - tso->global_link = gen->threads; - gen->threads = tso; - - debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id); - - // We must pretend this Capability belongs to the current Task - // for the time being, as invariants will be broken otherwise. - // In fact the current Task has exclusive access to the systme - // at this point, so this is just bookkeeping: - task->cap = tso->cap; - saved_task = tso->cap->running_task; - tso->cap->running_task = task; - maybePerformBlockedException(tso->cap, tso); - tso->cap->running_task = saved_task; - } - - // Restore our original Capability: - task->cap = cap; -} diff --git a/rts/Schedule.h b/rts/Schedule.h index af322d804f..76138b68f3 100644 --- a/rts/Schedule.h +++ b/rts/Schedule.h @@ -105,7 +105,6 @@ extern Mutex sched_mutex; void interruptStgRts (void); void resurrectThreads (StgTSO *); -void performPendingThrowTos (StgTSO *); /* ----------------------------------------------------------------------------- * Some convenient macros/inline functions... @@ -179,25 +178,6 @@ appendToBlockedQueue(StgTSO *tso) } #endif -#if defined(THREADED_RTS) -// Assumes: my_cap is owned by the current Task. We hold -// other_cap->lock, but we do not necessarily own other_cap; another -// Task may be running on it. -INLINE_HEADER void -appendToWakeupQueue (Capability *my_cap, Capability *other_cap, StgTSO *tso) -{ - ASSERT(tso->_link == END_TSO_QUEUE); - if (other_cap->wakeup_queue_hd == END_TSO_QUEUE) { - other_cap->wakeup_queue_hd = tso; - } else { - // my_cap is passed to setTSOLink() because it may need to - // write to the mutable list. - setTSOLink(my_cap, other_cap->wakeup_queue_tl, tso); - } - other_cap->wakeup_queue_tl = tso; -} -#endif - /* Check whether various thread queues are empty */ INLINE_HEADER rtsBool @@ -212,14 +192,6 @@ emptyRunQueue(Capability *cap) return emptyQueue(cap->run_queue_hd); } -#if defined(THREADED_RTS) -INLINE_HEADER rtsBool -emptyWakeupQueue(Capability *cap) -{ - return emptyQueue(cap->wakeup_queue_hd); -} -#endif - #if !defined(THREADED_RTS) #define EMPTY_BLOCKED_QUEUE() (emptyQueue(blocked_queue_hd)) #define EMPTY_SLEEPING_QUEUE() (emptyQueue(sleeping_queue)) diff --git a/rts/StgMiscClosures.cmm b/rts/StgMiscClosures.cmm index 8fd96c16fc..f111875760 100644 --- a/rts/StgMiscClosures.cmm +++ b/rts/StgMiscClosures.cmm @@ -419,7 +419,7 @@ CLOSURE(stg_NO_FINALIZER_closure,stg_NO_FINALIZER); Stable Names are unlifted too. ------------------------------------------------------------------------- */ -INFO_TABLE(stg_STABLE_NAME,0,1,STABLE_NAME,"STABLE_NAME","STABLE_NAME") +INFO_TABLE(stg_STABLE_NAME,0,1,PRIM,"STABLE_NAME","STABLE_NAME") { foreign "C" barf("STABLE_NAME object entered!") never returns; } /* ---------------------------------------------------------------------------- @@ -439,22 +439,22 @@ INFO_TABLE(stg_MVAR_DIRTY,3,0,MVAR_DIRTY,"MVAR","MVAR") STM -------------------------------------------------------------------------- */ -INFO_TABLE(stg_TVAR, 0, 0, TVAR, "TVAR", "TVAR") +INFO_TABLE(stg_TVAR, 2, 1, MUT_PRIM, "TVAR", "TVAR") { foreign "C" barf("TVAR object entered!") never returns; } -INFO_TABLE(stg_TVAR_WATCH_QUEUE, 0, 0, TVAR_WATCH_QUEUE, "TVAR_WATCH_QUEUE", "TVAR_WATCH_QUEUE") +INFO_TABLE(stg_TVAR_WATCH_QUEUE, 3, 0, MUT_PRIM, "TVAR_WATCH_QUEUE", "TVAR_WATCH_QUEUE") { foreign "C" barf("TVAR_WATCH_QUEUE object entered!") never returns; } -INFO_TABLE(stg_ATOMIC_INVARIANT, 0, 0, ATOMIC_INVARIANT, "ATOMIC_INVARIANT", "ATOMIC_INVARIANT") +INFO_TABLE(stg_ATOMIC_INVARIANT, 2, 1, MUT_PRIM, "ATOMIC_INVARIANT", "ATOMIC_INVARIANT") { foreign "C" barf("ATOMIC_INVARIANT object entered!") never returns; } -INFO_TABLE(stg_INVARIANT_CHECK_QUEUE, 0, 0, INVARIANT_CHECK_QUEUE, "INVARIANT_CHECK_QUEUE", "INVARIANT_CHECK_QUEUE") +INFO_TABLE(stg_INVARIANT_CHECK_QUEUE, 3, 0, MUT_PRIM, "INVARIANT_CHECK_QUEUE", "INVARIANT_CHECK_QUEUE") { foreign "C" barf("INVARIANT_CHECK_QUEUE object entered!") never returns; } INFO_TABLE(stg_TREC_CHUNK, 0, 0, TREC_CHUNK, "TREC_CHUNK", "TREC_CHUNK") { foreign "C" barf("TREC_CHUNK object entered!") never returns; } -INFO_TABLE(stg_TREC_HEADER, 0, 0, TREC_HEADER, "TREC_HEADER", "TREC_HEADER") +INFO_TABLE(stg_TREC_HEADER, 3, 1, MUT_PRIM, "TREC_HEADER", "TREC_HEADER") { foreign "C" barf("TREC_HEADER object entered!") never returns; } INFO_TABLE_CONSTR(stg_END_STM_WATCH_QUEUE,0,0,0,CONSTR_NOCAF_STATIC,"END_STM_WATCH_QUEUE","END_STM_WATCH_QUEUE") @@ -478,6 +478,17 @@ CLOSURE(stg_END_STM_CHUNK_LIST_closure,stg_END_STM_CHUNK_LIST); CLOSURE(stg_NO_TREC_closure,stg_NO_TREC); /* ---------------------------------------------------------------------------- + Messages + ------------------------------------------------------------------------- */ + +// PRIM rather than CONSTR, because PRIM objects cannot be duplicated by the GC. +INFO_TABLE_CONSTR(stg_MSG_WAKEUP,2,0,0,PRIM,"MSG_WAKEUP","MSG_WAKEUP") +{ foreign "C" barf("MSG_WAKEUP object entered!") never returns; } + +INFO_TABLE_CONSTR(stg_MSG_THROWTO,4,0,0,PRIM,"MSG_THROWTO","MSG_THROWTO") +{ foreign "C" barf("MSG_THROWTO object entered!") never returns; } + +/* ---------------------------------------------------------------------------- END_TSO_QUEUE This is a static nullary constructor (like []) that we use to mark the @@ -490,18 +501,6 @@ INFO_TABLE_CONSTR(stg_END_TSO_QUEUE,0,0,0,CONSTR_NOCAF_STATIC,"END_TSO_QUEUE","E CLOSURE(stg_END_TSO_QUEUE_closure,stg_END_TSO_QUEUE); /* ---------------------------------------------------------------------------- - Exception lists - ------------------------------------------------------------------------- */ - -INFO_TABLE_CONSTR(stg_END_EXCEPTION_LIST,0,0,0,CONSTR_NOCAF_STATIC,"END_EXCEPTION_LIST","END_EXCEPTION_LIST") -{ foreign "C" barf("END_EXCEPTION_LIST object entered!") never returns; } - -CLOSURE(stg_END_EXCEPTION_LIST_closure,stg_END_EXCEPTION_LIST); - -INFO_TABLE(stg_EXCEPTION_CONS,1,1,CONSTR,"EXCEPTION_CONS","EXCEPTION_CONS") -{ foreign "C" barf("EXCEPTION_CONS object entered!") never returns; } - -/* ---------------------------------------------------------------------------- Arrays These come in two basic flavours: arrays of data (StgArrWords) and arrays of diff --git a/rts/Threads.c b/rts/Threads.c index 08b7aab66e..f824d021d4 100644 --- a/rts/Threads.c +++ b/rts/Threads.c @@ -74,7 +74,7 @@ createThread(Capability *cap, nat size) tso->what_next = ThreadRunGHC; tso->why_blocked = NotBlocked; - tso->blocked_exceptions = END_TSO_QUEUE; + tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE; tso->flags = 0; tso->dirty = 1; @@ -218,8 +218,9 @@ unblockOne_ (Capability *cap, StgTSO *tso, // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO); ASSERT(tso->why_blocked != NotBlocked); + ASSERT(tso->why_blocked != BlockedOnMsgWakeup || + tso->block_info.closure->header.info == &stg_IND_info); - tso->why_blocked = NotBlocked; next = tso->_link; tso->_link = END_TSO_QUEUE; @@ -235,6 +236,8 @@ unblockOne_ (Capability *cap, StgTSO *tso, } tso->cap = cap; + write_barrier(); + tso->why_blocked = NotBlocked; appendToRunQueue(cap,tso); // context-switch soonish so we can migrate the new thread if @@ -246,6 +249,7 @@ unblockOne_ (Capability *cap, StgTSO *tso, wakeupThreadOnCapability(cap, tso->cap, tso); } #else + tso->why_blocked = NotBlocked; appendToRunQueue(cap,tso); // context-switch soonish so we can migrate the new thread if @@ -327,13 +331,15 @@ printThreadBlockage(StgTSO *tso) case BlockedOnMVar: debugBelch("is blocked on an MVar @ %p", tso->block_info.closure); break; - case BlockedOnException: - debugBelch("is blocked on delivering an exception to thread %lu", - (unsigned long)tso->block_info.tso->id); - break; case BlockedOnBlackHole: debugBelch("is blocked on a black hole"); break; + case BlockedOnMsgWakeup: + debugBelch("is blocked on a wakeup message"); + break; + case BlockedOnMsgThrowTo: + debugBelch("is blocked on a throwto message"); + break; case NotBlocked: debugBelch("is not blocked"); break; diff --git a/rts/Threads.h b/rts/Threads.h index 8e0ee264f4..dfe879e7bb 100644 --- a/rts/Threads.h +++ b/rts/Threads.h @@ -11,6 +11,8 @@ BEGIN_RTS_PRIVATE +#define END_BLOCKED_EXCEPTIONS_QUEUE ((MessageThrowTo*)END_TSO_QUEUE) + StgTSO * unblockOne (Capability *cap, StgTSO *tso); StgTSO * unblockOne_ (Capability *cap, StgTSO *tso, rtsBool allow_migrate); diff --git a/rts/Trace.h b/rts/Trace.h index f8b6ad497d..69ea3d3d32 100644 --- a/rts/Trace.h +++ b/rts/Trace.h @@ -135,6 +135,15 @@ void traceUserMsg(Capability *cap, char *msg); #define debugTrace(class, str, ...) /* nothing */ #endif +#ifdef DEBUG +#define debugTraceCap(class, cap, msg, ...) \ + if (RTS_UNLIKELY(class)) { \ + traceCap_(cap, msg, ##__VA_ARGS__); \ + } +#else +#define debugTraceCap(class, cap, str, ...) /* nothing */ +#endif + /* * Emit a message/event describing the state of a thread */ @@ -152,6 +161,7 @@ void traceThreadStatus_ (StgTSO *tso); #define traceCap(class, cap, msg, ...) /* nothing */ #define trace(class, msg, ...) /* nothing */ #define debugTrace(class, str, ...) /* nothing */ +#define debugTraceCap(class, cap, str, ...) /* nothing */ #define traceThreadStatus(class, tso) /* nothing */ #endif /* TRACING */ diff --git a/rts/eventlog/EventLog.h b/rts/eventlog/EventLog.h index fd87820e08..6ebf33ddc1 100644 --- a/rts/eventlog/EventLog.h +++ b/rts/eventlog/EventLog.h @@ -59,7 +59,7 @@ INLINE_HEADER void postMsg (char *msg STG_UNUSED, va_list ap STG_UNUSED) { /* nothing */ } -INLINE_HEADER void postCapMsg (Capability *cap, +INLINE_HEADER void postCapMsg (Capability *cap STG_UNUSED, char *msg STG_UNUSED, va_list ap STG_UNUSED) { /* nothing */ } diff --git a/rts/sm/Compact.c b/rts/sm/Compact.c index e55ae2b7c2..39284f9112 100644 --- a/rts/sm/Compact.c +++ b/rts/sm/Compact.c @@ -471,7 +471,8 @@ thread_TSO (StgTSO *tso) if ( tso->why_blocked == BlockedOnMVar || tso->why_blocked == BlockedOnBlackHole - || tso->why_blocked == BlockedOnException + || tso->why_blocked == BlockedOnMsgThrowTo + || tso->why_blocked == BlockedOnMsgWakeup ) { thread_(&tso->block_info.closure); } @@ -622,7 +623,8 @@ thread_obj (StgInfoTable *info, StgPtr p) case FUN: case CONSTR: - case STABLE_NAME: + case PRIM: + case MUT_PRIM: case IND_PERM: case MUT_VAR_CLEAN: case MUT_VAR_DIRTY: @@ -705,32 +707,6 @@ thread_obj (StgInfoTable *info, StgPtr p) case TSO: return thread_TSO((StgTSO *)p); - case TVAR_WATCH_QUEUE: - { - StgTVarWatchQueue *wq = (StgTVarWatchQueue *)p; - thread_(&wq->closure); - thread_(&wq->next_queue_entry); - thread_(&wq->prev_queue_entry); - return p + sizeofW(StgTVarWatchQueue); - } - - case TVAR: - { - StgTVar *tvar = (StgTVar *)p; - thread((void *)&tvar->current_value); - thread((void *)&tvar->first_watch_queue_entry); - return p + sizeofW(StgTVar); - } - - case TREC_HEADER: - { - StgTRecHeader *trec = (StgTRecHeader *)p; - thread_(&trec->enclosing_trec); - thread_(&trec->current_chunk); - thread_(&trec->invariants_to_check); - return p + sizeofW(StgTRecHeader); - } - case TREC_CHUNK: { StgWord i; @@ -745,23 +721,6 @@ thread_obj (StgInfoTable *info, StgPtr p) return p + sizeofW(StgTRecChunk); } - case ATOMIC_INVARIANT: - { - StgAtomicInvariant *invariant = (StgAtomicInvariant *)p; - thread_(&invariant->code); - thread_(&invariant->last_execution); - return p + sizeofW(StgAtomicInvariant); - } - - case INVARIANT_CHECK_QUEUE: - { - StgInvariantCheckQueue *queue = (StgInvariantCheckQueue *)p; - thread_(&queue->invariant); - thread_(&queue->my_execution); - thread_(&queue->next_queue_entry); - return p + sizeofW(StgInvariantCheckQueue); - } - default: barf("update_fwd: unknown/strange object %d", (int)(info->type)); return NULL; diff --git a/rts/sm/Evac.c b/rts/sm/Evac.c index 76026b0bac..21017a63a0 100644 --- a/rts/sm/Evac.c +++ b/rts/sm/Evac.c @@ -626,7 +626,8 @@ loop: return; case WEAK: - case STABLE_NAME: + case PRIM: + case MUT_PRIM: copy_tag(p,info,q,sizeW_fromITBL(INFO_PTR_TO_STRUCT(info)),gen,tag); return; @@ -721,30 +722,10 @@ loop: } } - case TREC_HEADER: - copy(p,info,q,sizeofW(StgTRecHeader),gen); - return; - - case TVAR_WATCH_QUEUE: - copy(p,info,q,sizeofW(StgTVarWatchQueue),gen); - return; - - case TVAR: - copy(p,info,q,sizeofW(StgTVar),gen); - return; - case TREC_CHUNK: copy(p,info,q,sizeofW(StgTRecChunk),gen); return; - case ATOMIC_INVARIANT: - copy(p,info,q,sizeofW(StgAtomicInvariant),gen); - return; - - case INVARIANT_CHECK_QUEUE: - copy(p,info,q,sizeofW(StgInvariantCheckQueue),gen); - return; - default: barf("evacuate: strange closure type %d", (int)(INFO_PTR_TO_STRUCT(info)->type)); } diff --git a/rts/sm/GC.c b/rts/sm/GC.c index 2eabdabee3..ae6fc998da 100644 --- a/rts/sm/GC.c +++ b/rts/sm/GC.c @@ -732,7 +732,6 @@ SET_GCT(gc_threads[0]); // send exceptions to any threads which were about to die RELEASE_SM_LOCK; resurrectThreads(resurrected_threads); - performPendingThrowTos(exception_threads); ACQUIRE_SM_LOCK; // Update the stable pointer hash table. diff --git a/rts/sm/MarkWeak.c b/rts/sm/MarkWeak.c index 7b7187c94d..9df39b9a08 100644 --- a/rts/sm/MarkWeak.c +++ b/rts/sm/MarkWeak.c @@ -22,6 +22,7 @@ #include "Schedule.h" #include "Weak.h" #include "Storage.h" +#include "Threads.h" /* ----------------------------------------------------------------------------- Weak Pointers @@ -80,9 +81,6 @@ StgWeak *old_weak_ptr_list; // also pending finaliser list // List of threads found to be unreachable StgTSO *resurrected_threads; -// List of blocked threads found to have pending throwTos -StgTSO *exception_threads; - static void resurrectUnreachableThreads (generation *gen); static rtsBool tidyThreadList (generation *gen); @@ -93,7 +91,6 @@ initWeakForGC(void) weak_ptr_list = NULL; weak_stage = WeakPtrs; resurrected_threads = END_TSO_QUEUE; - exception_threads = END_TSO_QUEUE; } rtsBool @@ -286,35 +283,11 @@ static rtsBool tidyThreadList (generation *gen) next = t->global_link; - // This is a good place to check for blocked - // exceptions. It might be the case that a thread is - // blocked on delivering an exception to a thread that - // is also blocked - we try to ensure that this - // doesn't happen in throwTo(), but it's too hard (or - // impossible) to close all the race holes, so we - // accept that some might get through and deal with - // them here. A GC will always happen at some point, - // even if the system is otherwise deadlocked. - // - // If an unreachable thread has blocked - // exceptions, we really want to perform the - // blocked exceptions rather than throwing - // BlockedIndefinitely exceptions. This is the - // only place we can discover such threads. - // The target thread might even be - // ThreadFinished or ThreadKilled. Bugs here - // will only be seen when running on a - // multiprocessor. - if (t->blocked_exceptions != END_TSO_QUEUE) { - if (tmp == NULL) { - evacuate((StgClosure **)&t); - flag = rtsTrue; - } - t->global_link = exception_threads; - exception_threads = t; - *prev = next; - continue; - } + // if the thread is not masking exceptions but there are + // pending exceptions on its queue, then something has gone + // wrong: + ASSERT(t->blocked_exceptions == END_BLOCKED_EXCEPTIONS_QUEUE + || (t->flags & TSO_BLOCKEX)); if (tmp == NULL) { // not alive (yet): leave this thread on the diff --git a/rts/sm/Sanity.c b/rts/sm/Sanity.c index 442fee1f7c..11d5424431 100644 --- a/rts/sm/Sanity.c +++ b/rts/sm/Sanity.c @@ -307,7 +307,8 @@ checkClosure( StgClosure* p ) case IND_OLDGEN_PERM: case BLACKHOLE: case CAF_BLACKHOLE: - case STABLE_NAME: + case PRIM: + case MUT_PRIM: case MUT_VAR_CLEAN: case MUT_VAR_DIRTY: case CONSTR_STATIC: @@ -416,39 +417,6 @@ checkClosure( StgClosure* p ) checkTSO((StgTSO *)p); return tso_sizeW((StgTSO *)p); - case TVAR_WATCH_QUEUE: - { - StgTVarWatchQueue *wq = (StgTVarWatchQueue *)p; - ASSERT(LOOKS_LIKE_CLOSURE_PTR(wq->next_queue_entry)); - ASSERT(LOOKS_LIKE_CLOSURE_PTR(wq->prev_queue_entry)); - return sizeofW(StgTVarWatchQueue); - } - - case INVARIANT_CHECK_QUEUE: - { - StgInvariantCheckQueue *q = (StgInvariantCheckQueue *)p; - ASSERT(LOOKS_LIKE_CLOSURE_PTR(q->invariant)); - ASSERT(LOOKS_LIKE_CLOSURE_PTR(q->my_execution)); - ASSERT(LOOKS_LIKE_CLOSURE_PTR(q->next_queue_entry)); - return sizeofW(StgInvariantCheckQueue); - } - - case ATOMIC_INVARIANT: - { - StgAtomicInvariant *invariant = (StgAtomicInvariant *)p; - ASSERT(LOOKS_LIKE_CLOSURE_PTR(invariant->code)); - ASSERT(LOOKS_LIKE_CLOSURE_PTR(invariant->last_execution)); - return sizeofW(StgAtomicInvariant); - } - - case TVAR: - { - StgTVar *tv = (StgTVar *)p; - ASSERT(LOOKS_LIKE_CLOSURE_PTR(tv->current_value)); - ASSERT(LOOKS_LIKE_CLOSURE_PTR(tv->first_watch_queue_entry)); - return sizeofW(StgTVar); - } - case TREC_CHUNK: { nat i; @@ -461,14 +429,6 @@ checkClosure( StgClosure* p ) } return sizeofW(StgTRecChunk); } - - case TREC_HEADER: - { - StgTRecHeader *trec = (StgTRecHeader *)p; - ASSERT(LOOKS_LIKE_CLOSURE_PTR(trec -> enclosing_trec)); - ASSERT(LOOKS_LIKE_CLOSURE_PTR(trec -> current_chunk)); - return sizeofW(StgTRecHeader); - } default: barf("checkClosure (closure type %d)", info->type); diff --git a/rts/sm/Scav.c b/rts/sm/Scav.c index 466b9b44f7..1b671a097c 100644 --- a/rts/sm/Scav.c +++ b/rts/sm/Scav.c @@ -82,7 +82,8 @@ scavengeTSO (StgTSO *tso) if ( tso->why_blocked == BlockedOnMVar || tso->why_blocked == BlockedOnBlackHole - || tso->why_blocked == BlockedOnException + || tso->why_blocked == BlockedOnMsgWakeup + || tso->why_blocked == BlockedOnMsgThrowTo ) { evacuate(&tso->block_info.closure); } @@ -394,7 +395,6 @@ scavenge_block (bdescr *bd) { StgPtr p, q; StgInfoTable *info; - generation *saved_evac_gen; rtsBool saved_eager_promotion; gen_workspace *ws; @@ -403,7 +403,6 @@ scavenge_block (bdescr *bd) gct->scan_bd = bd; gct->evac_gen = bd->gen; - saved_evac_gen = gct->evac_gen; saved_eager_promotion = gct->eager_promotion; gct->failed_to_evac = rtsFalse; @@ -532,7 +531,7 @@ scavenge_block (bdescr *bd) gen_obj: case CONSTR: case WEAK: - case STABLE_NAME: + case PRIM: { StgPtr end; @@ -672,42 +671,21 @@ scavenge_block (bdescr *bd) break; } - case TVAR_WATCH_QUEUE: + case MUT_PRIM: { - StgTVarWatchQueue *wq = ((StgTVarWatchQueue *) p); - gct->evac_gen = 0; - evacuate((StgClosure **)&wq->closure); - evacuate((StgClosure **)&wq->next_queue_entry); - evacuate((StgClosure **)&wq->prev_queue_entry); - gct->evac_gen = saved_evac_gen; - gct->failed_to_evac = rtsTrue; // mutable - p += sizeofW(StgTVarWatchQueue); - break; - } + StgPtr end; - case TVAR: - { - StgTVar *tvar = ((StgTVar *) p); - gct->evac_gen = 0; - evacuate((StgClosure **)&tvar->current_value); - evacuate((StgClosure **)&tvar->first_watch_queue_entry); - gct->evac_gen = saved_evac_gen; - gct->failed_to_evac = rtsTrue; // mutable - p += sizeofW(StgTVar); - break; - } + gct->eager_promotion = rtsFalse; - case TREC_HEADER: - { - StgTRecHeader *trec = ((StgTRecHeader *) p); - gct->evac_gen = 0; - evacuate((StgClosure **)&trec->enclosing_trec); - evacuate((StgClosure **)&trec->current_chunk); - evacuate((StgClosure **)&trec->invariants_to_check); - gct->evac_gen = saved_evac_gen; + end = (P_)((StgClosure *)p)->payload + info->layout.payload.ptrs; + for (p = (P_)((StgClosure *)p)->payload; p < end; p++) { + evacuate((StgClosure **)p); + } + p += info->layout.payload.nptrs; + + gct->eager_promotion = saved_eager_promotion; gct->failed_to_evac = rtsTrue; // mutable - p += sizeofW(StgTRecHeader); - break; + break; } case TREC_CHUNK: @@ -715,44 +693,19 @@ scavenge_block (bdescr *bd) StgWord i; StgTRecChunk *tc = ((StgTRecChunk *) p); TRecEntry *e = &(tc -> entries[0]); - gct->evac_gen = 0; + gct->eager_promotion = rtsFalse; evacuate((StgClosure **)&tc->prev_chunk); for (i = 0; i < tc -> next_entry_idx; i ++, e++ ) { evacuate((StgClosure **)&e->tvar); evacuate((StgClosure **)&e->expected_value); evacuate((StgClosure **)&e->new_value); } - gct->evac_gen = saved_evac_gen; + gct->eager_promotion = saved_eager_promotion; gct->failed_to_evac = rtsTrue; // mutable p += sizeofW(StgTRecChunk); break; } - case ATOMIC_INVARIANT: - { - StgAtomicInvariant *invariant = ((StgAtomicInvariant *) p); - gct->evac_gen = 0; - evacuate(&invariant->code); - evacuate((StgClosure **)&invariant->last_execution); - gct->evac_gen = saved_evac_gen; - gct->failed_to_evac = rtsTrue; // mutable - p += sizeofW(StgAtomicInvariant); - break; - } - - case INVARIANT_CHECK_QUEUE: - { - StgInvariantCheckQueue *queue = ((StgInvariantCheckQueue *) p); - gct->evac_gen = 0; - evacuate((StgClosure **)&queue->invariant); - evacuate((StgClosure **)&queue->my_execution); - evacuate((StgClosure **)&queue->next_queue_entry); - gct->evac_gen = saved_evac_gen; - gct->failed_to_evac = rtsTrue; // mutable - p += sizeofW(StgInvariantCheckQueue); - break; - } - default: barf("scavenge: unimplemented/strange closure type %d @ %p", info->type, p); @@ -806,10 +759,10 @@ scavenge_mark_stack(void) { StgPtr p, q; StgInfoTable *info; - generation *saved_evac_gen; + rtsBool saved_eager_promotion; gct->evac_gen = oldest_gen; - saved_evac_gen = gct->evac_gen; + saved_eager_promotion = gct->eager_promotion; while ((p = pop_mark_stack())) { @@ -822,8 +775,6 @@ scavenge_mark_stack(void) case MVAR_CLEAN: case MVAR_DIRTY: { - rtsBool saved_eager_promotion = gct->eager_promotion; - StgMVar *mvar = ((StgMVar *)p); gct->eager_promotion = rtsFalse; evacuate((StgClosure **)&mvar->head); @@ -906,7 +857,7 @@ scavenge_mark_stack(void) gen_obj: case CONSTR: case WEAK: - case STABLE_NAME: + case PRIM: { StgPtr end; @@ -938,8 +889,6 @@ scavenge_mark_stack(void) case MUT_VAR_CLEAN: case MUT_VAR_DIRTY: { - rtsBool saved_eager_promotion = gct->eager_promotion; - gct->eager_promotion = rtsFalse; evacuate(&((StgMutVar *)p)->var); gct->eager_promotion = saved_eager_promotion; @@ -986,13 +935,10 @@ scavenge_mark_stack(void) case MUT_ARR_PTRS_DIRTY: // follow everything { - rtsBool saved_eager; - // We don't eagerly promote objects pointed to by a mutable // array, but if we find the array only points to objects in // the same or an older generation, we mark it "clean" and // avoid traversing it during minor GCs. - saved_eager = gct->eager_promotion; gct->eager_promotion = rtsFalse; scavenge_mut_arr_ptrs((StgMutArrPtrs *)p); @@ -1003,7 +949,7 @@ scavenge_mark_stack(void) ((StgClosure *)q)->header.info = &stg_MUT_ARR_PTRS_CLEAN_info; } - gct->eager_promotion = saved_eager; + gct->eager_promotion = saved_eager_promotion; gct->failed_to_evac = rtsTrue; // mutable anyhow. break; } @@ -1032,81 +978,39 @@ scavenge_mark_stack(void) break; } - case TVAR_WATCH_QUEUE: - { - StgTVarWatchQueue *wq = ((StgTVarWatchQueue *) p); - gct->evac_gen = 0; - evacuate((StgClosure **)&wq->closure); - evacuate((StgClosure **)&wq->next_queue_entry); - evacuate((StgClosure **)&wq->prev_queue_entry); - gct->evac_gen = saved_evac_gen; - gct->failed_to_evac = rtsTrue; // mutable - break; - } - - case TVAR: - { - StgTVar *tvar = ((StgTVar *) p); - gct->evac_gen = 0; - evacuate((StgClosure **)&tvar->current_value); - evacuate((StgClosure **)&tvar->first_watch_queue_entry); - gct->evac_gen = saved_evac_gen; - gct->failed_to_evac = rtsTrue; // mutable - break; - } - + case MUT_PRIM: + { + StgPtr end; + + gct->eager_promotion = rtsFalse; + + end = (P_)((StgClosure *)p)->payload + info->layout.payload.ptrs; + for (p = (P_)((StgClosure *)p)->payload; p < end; p++) { + evacuate((StgClosure **)p); + } + + gct->eager_promotion = saved_eager_promotion; + gct->failed_to_evac = rtsTrue; // mutable + break; + } + case TREC_CHUNK: { StgWord i; StgTRecChunk *tc = ((StgTRecChunk *) p); TRecEntry *e = &(tc -> entries[0]); - gct->evac_gen = 0; + gct->eager_promotion = rtsFalse; evacuate((StgClosure **)&tc->prev_chunk); for (i = 0; i < tc -> next_entry_idx; i ++, e++ ) { evacuate((StgClosure **)&e->tvar); evacuate((StgClosure **)&e->expected_value); evacuate((StgClosure **)&e->new_value); } - gct->evac_gen = saved_evac_gen; - gct->failed_to_evac = rtsTrue; // mutable - break; - } - - case TREC_HEADER: - { - StgTRecHeader *trec = ((StgTRecHeader *) p); - gct->evac_gen = 0; - evacuate((StgClosure **)&trec->enclosing_trec); - evacuate((StgClosure **)&trec->current_chunk); - evacuate((StgClosure **)&trec->invariants_to_check); - gct->evac_gen = saved_evac_gen; + gct->eager_promotion = saved_eager_promotion; gct->failed_to_evac = rtsTrue; // mutable break; } - case ATOMIC_INVARIANT: - { - StgAtomicInvariant *invariant = ((StgAtomicInvariant *) p); - gct->evac_gen = 0; - evacuate(&invariant->code); - evacuate((StgClosure **)&invariant->last_execution); - gct->evac_gen = saved_evac_gen; - gct->failed_to_evac = rtsTrue; // mutable - break; - } - - case INVARIANT_CHECK_QUEUE: - { - StgInvariantCheckQueue *queue = ((StgInvariantCheckQueue *) p); - gct->evac_gen = 0; - evacuate((StgClosure **)&queue->invariant); - evacuate((StgClosure **)&queue->my_execution); - evacuate((StgClosure **)&queue->next_queue_entry); - gct->evac_gen = saved_evac_gen; - gct->failed_to_evac = rtsTrue; // mutable - break; - } - default: barf("scavenge_mark_stack: unimplemented/strange closure type %d @ %p", info->type, p); @@ -1133,9 +1037,11 @@ static rtsBool scavenge_one(StgPtr p) { const StgInfoTable *info; - generation *saved_evac_gen = gct->evac_gen; rtsBool no_luck; + rtsBool saved_eager_promotion; + saved_eager_promotion = gct->eager_promotion; + ASSERT(LOOKS_LIKE_CLOSURE_PTR(p)); info = get_itbl((StgClosure *)p); @@ -1144,8 +1050,6 @@ scavenge_one(StgPtr p) case MVAR_CLEAN: case MVAR_DIRTY: { - rtsBool saved_eager_promotion = gct->eager_promotion; - StgMVar *mvar = ((StgMVar *)p); gct->eager_promotion = rtsFalse; evacuate((StgClosure **)&mvar->head); @@ -1190,6 +1094,7 @@ scavenge_one(StgPtr p) case CONSTR_0_2: case CONSTR_2_0: case WEAK: + case PRIM: case IND_PERM: { StgPtr q, end; @@ -1204,7 +1109,6 @@ scavenge_one(StgPtr p) case MUT_VAR_CLEAN: case MUT_VAR_DIRTY: { StgPtr q = p; - rtsBool saved_eager_promotion = gct->eager_promotion; gct->eager_promotion = rtsFalse; evacuate(&((StgMutVar *)p)->var); @@ -1254,13 +1158,10 @@ scavenge_one(StgPtr p) case MUT_ARR_PTRS_CLEAN: case MUT_ARR_PTRS_DIRTY: { - rtsBool saved_eager; - // We don't eagerly promote objects pointed to by a mutable // array, but if we find the array only points to objects in // the same or an older generation, we mark it "clean" and // avoid traversing it during minor GCs. - saved_eager = gct->eager_promotion; gct->eager_promotion = rtsFalse; scavenge_mut_arr_ptrs((StgMutArrPtrs *)p); @@ -1271,7 +1172,7 @@ scavenge_one(StgPtr p) ((StgClosure *)p)->header.info = &stg_MUT_ARR_PTRS_CLEAN_info; } - gct->eager_promotion = saved_eager; + gct->eager_promotion = saved_eager_promotion; gct->failed_to_evac = rtsTrue; break; } @@ -1298,81 +1199,40 @@ scavenge_one(StgPtr p) break; } - case TVAR_WATCH_QUEUE: - { - StgTVarWatchQueue *wq = ((StgTVarWatchQueue *) p); - gct->evac_gen = 0; - evacuate((StgClosure **)&wq->closure); - evacuate((StgClosure **)&wq->next_queue_entry); - evacuate((StgClosure **)&wq->prev_queue_entry); - gct->evac_gen = saved_evac_gen; - gct->failed_to_evac = rtsTrue; // mutable - break; - } + case MUT_PRIM: + { + StgPtr end; + + gct->eager_promotion = rtsFalse; + + end = (P_)((StgClosure *)p)->payload + info->layout.payload.ptrs; + for (p = (P_)((StgClosure *)p)->payload; p < end; p++) { + evacuate((StgClosure **)p); + } - case TVAR: - { - StgTVar *tvar = ((StgTVar *) p); - gct->evac_gen = 0; - evacuate((StgClosure **)&tvar->current_value); - evacuate((StgClosure **)&tvar->first_watch_queue_entry); - gct->evac_gen = saved_evac_gen; + gct->eager_promotion = saved_eager_promotion; gct->failed_to_evac = rtsTrue; // mutable break; - } - case TREC_HEADER: - { - StgTRecHeader *trec = ((StgTRecHeader *) p); - gct->evac_gen = 0; - evacuate((StgClosure **)&trec->enclosing_trec); - evacuate((StgClosure **)&trec->current_chunk); - evacuate((StgClosure **)&trec->invariants_to_check); - gct->evac_gen = saved_evac_gen; - gct->failed_to_evac = rtsTrue; // mutable - break; - } + } case TREC_CHUNK: { StgWord i; StgTRecChunk *tc = ((StgTRecChunk *) p); TRecEntry *e = &(tc -> entries[0]); - gct->evac_gen = 0; + gct->eager_promotion = rtsFalse; evacuate((StgClosure **)&tc->prev_chunk); for (i = 0; i < tc -> next_entry_idx; i ++, e++ ) { evacuate((StgClosure **)&e->tvar); evacuate((StgClosure **)&e->expected_value); evacuate((StgClosure **)&e->new_value); } - gct->evac_gen = saved_evac_gen; + gct->eager_promotion = saved_eager_promotion; gct->failed_to_evac = rtsTrue; // mutable break; } - case ATOMIC_INVARIANT: - { - StgAtomicInvariant *invariant = ((StgAtomicInvariant *) p); - gct->evac_gen = 0; - evacuate(&invariant->code); - evacuate((StgClosure **)&invariant->last_execution); - gct->evac_gen = saved_evac_gen; - gct->failed_to_evac = rtsTrue; // mutable - break; - } - - case INVARIANT_CHECK_QUEUE: - { - StgInvariantCheckQueue *queue = ((StgInvariantCheckQueue *) p); - gct->evac_gen = 0; - evacuate((StgClosure **)&queue->invariant); - evacuate((StgClosure **)&queue->my_execution); - evacuate((StgClosure **)&queue->next_queue_entry); - gct->evac_gen = saved_evac_gen; - gct->failed_to_evac = rtsTrue; // mutable - break; - } - case IND: // IND can happen, for example, when the interpreter allocates // a gigantic AP closure (more than one block), which ends up @@ -1470,8 +1330,8 @@ scavenge_mutable_list(bdescr *bd, generation *gen) continue; case MUT_ARR_PTRS_DIRTY: { - rtsBool saved_eager; - saved_eager = gct->eager_promotion; + rtsBool saved_eager_promotion; + saved_eager_promotion = gct->eager_promotion; gct->eager_promotion = rtsFalse; scavenge_mut_arr_ptrs_marked((StgMutArrPtrs *)p); @@ -1482,7 +1342,7 @@ scavenge_mutable_list(bdescr *bd, generation *gen) ((StgClosure *)p)->header.info = &stg_MUT_ARR_PTRS_CLEAN_info; } - gct->eager_promotion = saved_eager; + gct->eager_promotion = saved_eager_promotion; gct->failed_to_evac = rtsFalse; recordMutableGen_GC((StgClosure *)p,gen->no); continue; |