diff options
Diffstat (limited to 'rts')
-rw-r--r-- | rts/Capability.c | 1 | ||||
-rw-r--r-- | rts/Capability.h | 13 | ||||
-rw-r--r-- | rts/Prelude.h | 2 | ||||
-rw-r--r-- | rts/PrimOps.cmm | 8 | ||||
-rw-r--r-- | rts/RtsAPI.c | 75 | ||||
-rw-r--r-- | rts/Schedule.c | 15 | ||||
-rw-r--r-- | rts/Task.c | 11 | ||||
-rw-r--r-- | rts/Task.h | 10 | ||||
-rw-r--r-- | rts/Threads.c | 79 | ||||
-rw-r--r-- | rts/Threads.h | 2 | ||||
-rw-r--r-- | rts/package.conf.in | 2 |
11 files changed, 208 insertions, 10 deletions
diff --git a/rts/Capability.c b/rts/Capability.c index 681797b409..6979c637bc 100644 --- a/rts/Capability.c +++ b/rts/Capability.c @@ -266,6 +266,7 @@ initCapability (Capability *cap, uint32_t i) cap->returning_tasks_tl = NULL; cap->n_returning_tasks = 0; cap->inbox = (Message*)END_TSO_QUEUE; + cap->putMVars = NULL; cap->sparks = allocSparkPool(); cap->spark_stats.created = 0; cap->spark_stats.dud = 0; diff --git a/rts/Capability.h b/rts/Capability.h index 8e0288b15e..bbf026279f 100644 --- a/rts/Capability.h +++ b/rts/Capability.h @@ -123,6 +123,7 @@ struct Capability_ { // returning_tasks_{hd,tl} // wakeup_queue // inbox + // putMVars Mutex lock; // Tasks waiting to return from a foreign call, or waiting to make @@ -138,6 +139,10 @@ struct Capability_ { // Locks required: cap->lock Message *inbox; + // putMVars are really messages, but they're allocated with malloc() so they + // can't go on the inbox queue: the GC would get confused. + struct PutMVar_ *putMVars; + SparkPool *sparks; // Stats on spark creation/conversion @@ -378,6 +383,11 @@ extern uint32_t numa_map[MAX_NUMA_NODES]; Messages -------------------------------------------------------------------------- */ +typedef struct PutMVar_ { + StgStablePtr mvar; + struct PutMVar_ *link; +} PutMVar; + #ifdef THREADED_RTS INLINE_HEADER rtsBool emptyInbox(Capability *cap); @@ -459,7 +469,8 @@ contextSwitchCapability (Capability *cap) INLINE_HEADER rtsBool emptyInbox(Capability *cap) { - return (cap->inbox == (Message*)END_TSO_QUEUE); + return (cap->inbox == (Message*)END_TSO_QUEUE && + cap->putMVars == NULL); } #endif diff --git a/rts/Prelude.h b/rts/Prelude.h index ae1e9cb266..58de23013b 100644 --- a/rts/Prelude.h +++ b/rts/Prelude.h @@ -24,6 +24,7 @@ * modules these names are defined in. 
*/ +PRELUDE_CLOSURE(ghczmprim_GHCziTuple_Z0T_closure); PRELUDE_CLOSURE(ghczmprim_GHCziTypes_True_closure); PRELUDE_CLOSURE(ghczmprim_GHCziTypes_False_closure); PRELUDE_CLOSURE(base_GHCziPack_unpackCString_closure); @@ -87,6 +88,7 @@ PRELUDE_INFO(base_GHCziWord_W64zh_con_info); PRELUDE_INFO(base_GHCziStable_StablePtr_static_info); PRELUDE_INFO(base_GHCziStable_StablePtr_con_info); +#define Unit_closure DLL_IMPORT_DATA_REF(ghczmprim_GHCziTuple_Z0T_closure) #define True_closure DLL_IMPORT_DATA_REF(ghczmprim_GHCziTypes_True_closure) #define False_closure DLL_IMPORT_DATA_REF(ghczmprim_GHCziTypes_False_closure) #define unpackCString_closure DLL_IMPORT_DATA_REF(base_GHCziPack_unpackCString_closure) diff --git a/rts/PrimOps.cmm b/rts/PrimOps.cmm index b468c33df6..02a7dafec3 100644 --- a/rts/PrimOps.cmm +++ b/rts/PrimOps.cmm @@ -1739,6 +1739,13 @@ loop: } +// NOTE: there is another implementation of this function in +// Threads.c:performTryPutMVar(). Keep them in sync! It was +// measurably slower to call the C function from here (70% for a +// tight loop doing tryPutMVar#). +// +// TODO: we could kill the duplication by making tryPutMVar# into an +// inline primop that expands into a C call to performTryPutMVar(). 
stg_tryPutMVarzh ( P_ mvar, /* :: MVar a */ P_ val, /* :: a */ ) { @@ -1812,6 +1819,7 @@ loop: return (1); } + stg_readMVarzh ( P_ mvar, /* :: MVar a */ ) { W_ val, info, tso, q; diff --git a/rts/RtsAPI.c b/rts/RtsAPI.c index 34d68c7e3b..e72430743e 100644 --- a/rts/RtsAPI.c +++ b/rts/RtsAPI.c @@ -16,6 +16,7 @@ #include "Schedule.h" #include "Capability.h" #include "Stable.h" +#include "Threads.h" #include "Weak.h" /* ---------------------------------------------------------------------------- @@ -620,3 +621,77 @@ void rts_done (void) freeMyTask(); } +/* ----------------------------------------------------------------------------- + tryPutMVar from outside Haskell + + The C call + + hs_try_putmvar(cap, mvar) + + is equivalent to the Haskell call + + tryPutMVar mvar () + + but it is + + * non-blocking: takes a bounded, short, amount of time + * asynchronous: the actual putMVar may be performed after the + call returns. That's why hs_try_putmvar() doesn't return a + result to say whether the put succeeded. + + NOTE: this call transfers ownership of the StablePtr to the RTS, which will + free it after the tryPutMVar has taken place. The reason is that otherwise, + it would be very difficult for the caller to arrange to free the StablePtr + in all circumstances. + + For more details, see the section "Waking up Haskell threads from C" in the + User's Guide. 
+ -------------------------------------------------------------------------- */ + +void hs_try_putmvar (/* in */ int capability, + /* in */ HsStablePtr mvar) +{ + Task *task = getTask(); + Capability *cap; + + if (capability < 0) { + capability = task->preferred_capability; + if (capability < 0) { + capability = 0; + } + } + cap = capabilities[capability % enabled_capabilities]; + +#if !defined(THREADED_RTS) + + performTryPutMVar(cap, (StgMVar*)deRefStablePtr(mvar), Unit_closure); + freeStablePtr(mvar); + +#else + + ACQUIRE_LOCK(&cap->lock); + // If the capability is free, we can perform the tryPutMVar immediately + if (cap->running_task == NULL) { + cap->running_task = task; + task->cap = cap; + RELEASE_LOCK(&cap->lock); + + performTryPutMVar(cap, (StgMVar*)deRefStablePtr(mvar), Unit_closure); + + freeStablePtr(mvar); + + // Wake up the capability, which will start running the thread that we + // just awoke (if there was one). + releaseCapability(cap); + } else { + PutMVar *p = stgMallocBytes(sizeof(PutMVar),"hs_try_putmvar"); + // We cannot deref the StablePtr if we don't have a capability, + // so we have to store it and deref it later. + p->mvar = mvar; + p->link = cap->putMVars; + cap->putMVars = p; + RELEASE_LOCK(&cap->lock); + } + +#endif +} diff --git a/rts/Schedule.c b/rts/Schedule.c index 544b9c2115..611d70411f 100644 --- a/rts/Schedule.c +++ b/rts/Schedule.c @@ -723,7 +723,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS, if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) { if (!emptyRunQueue(cap0) || cap0->n_returning_tasks != 0 - || cap0->inbox != (Message*)END_TSO_QUEUE) { + || !emptyInbox(cap0)) { // it already has some work, we just grabbed it at // the wrong moment. Or maybe it's deadlocked! 
releaseCapability(cap0); @@ -982,6 +982,7 @@ scheduleProcessInbox (Capability **pcap USED_IF_THREADS) { #if defined(THREADED_RTS) Message *m, *next; + PutMVar *p, *pnext; int r; Capability *cap = *pcap; @@ -1006,7 +1007,9 @@ scheduleProcessInbox (Capability **pcap USED_IF_THREADS) if (r != 0) return; m = cap->inbox; + p = cap->putMVars; cap->inbox = (Message*)END_TSO_QUEUE; + cap->putMVars = NULL; RELEASE_LOCK(&cap->lock); @@ -1015,10 +1018,20 @@ scheduleProcessInbox (Capability **pcap USED_IF_THREADS) executeMessage(cap, m); m = next; } + + while (p != NULL) { + pnext = p->link; + performTryPutMVar(cap, (StgMVar*)deRefStablePtr(p->mvar), + Unit_closure); + freeStablePtr(p->mvar); + stgFree(p); + p = pnext; + } } #endif } + /* ---------------------------------------------------------------------------- * Activate spark threads (THREADED_RTS) * ------------------------------------------------------------------------- */ diff --git a/rts/Task.c b/rts/Task.c index 9a658e019c..253520f909 100644 --- a/rts/Task.c +++ b/rts/Task.c @@ -36,7 +36,6 @@ uint32_t peakWorkerCount; static int tasksInitialized = 0; static void freeTask (Task *task); -static Task * allocTask (void); static Task * newTask (rtsBool); #if defined(THREADED_RTS) @@ -117,8 +116,7 @@ freeTaskManager (void) return tasksRunning; } -static Task * -allocTask (void) +Task* getTask (void) { Task *task; @@ -209,7 +207,7 @@ newTask (rtsBool worker) task->cap = NULL; task->worker = worker; - task->stopped = rtsFalse; + task->stopped = rtsTrue; task->running_finalizers = rtsFalse; task->n_spare_incalls = 0; task->spare_incalls = NULL; @@ -304,7 +302,7 @@ newBoundTask (void) stg_exit(EXIT_FAILURE); } - task = allocTask(); + task = getTask(); task->stopped = rtsFalse; @@ -452,6 +450,7 @@ startWorkerTask (Capability *cap) // A worker always gets a fresh Task structure. 
task = newTask(rtsTrue); + task->stopped = rtsFalse; // The lock here is to synchronise with taskStart(), to make sure // that we have finished setting up the Task structure before the @@ -499,7 +498,7 @@ void rts_setInCallCapability ( int preferred_capability, int affinity USED_IF_THREADS) { - Task *task = allocTask(); + Task *task = getTask(); task->preferred_capability = preferred_capability; #ifdef THREADED_RTS diff --git a/rts/Task.h b/rts/Task.h index 558f543fac..93234591ba 100644 --- a/rts/Task.h +++ b/rts/Task.h @@ -150,7 +150,8 @@ typedef struct Task_ { struct InCall_ *spare_incalls; rtsBool worker; // == rtsTrue if this is a worker Task - rtsBool stopped; // this task has stopped or exited Haskell + rtsBool stopped; // == rtsFalse between newBoundTask and + // boundTaskExiting, or in a worker Task. // So that we can detect when a finalizer illegally calls back into Haskell rtsBool running_finalizers; @@ -205,7 +206,12 @@ uint32_t freeTaskManager (void); // thread-local storage and will remain even after boundTaskExiting() // has been called; to free the memory, see freeMyTask(). // -Task *newBoundTask (void); +Task* newBoundTask (void); + +// Return the current OS thread's Task, which is created if it doesn't already +// exist. After you have finished using RTS APIs, you should call freeMyTask() +// to release this thread's Task. +Task* getTask (void); // The current task is a bound task that is exiting. 
// diff --git a/rts/Threads.c b/rts/Threads.c index 7317249e11..1782da6114 100644 --- a/rts/Threads.c +++ b/rts/Threads.c @@ -744,6 +744,85 @@ threadStackUnderflow (Capability *cap, StgTSO *tso) } /* ---------------------------------------------------------------------------- + Implementation of tryPutMVar# + + NOTE: this should be kept in sync with stg_tryPutMVarzh in PrimOps.cmm + ------------------------------------------------------------------------- */ + +rtsBool performTryPutMVar(Capability *cap, StgMVar *mvar, StgClosure *value) +{ + const StgInfoTable *info; + StgMVarTSOQueue *q; + StgTSO *tso; + + info = lockClosure((StgClosure*)mvar); + + if (mvar->value != &stg_END_TSO_QUEUE_closure) { +#if defined(THREADED_RTS) + unlockClosure((StgClosure*)mvar, info); +#endif + return rtsFalse; + } + + q = mvar->head; +loop: + if (q == (StgMVarTSOQueue*)&stg_END_TSO_QUEUE_closure) { + /* No further takes, the MVar is now full. */ + if (info == &stg_MVAR_CLEAN_info) { + dirty_MVAR(&cap->r, (StgClosure*)mvar); + } + + mvar->value = value; + unlockClosure((StgClosure*)mvar, &stg_MVAR_DIRTY_info); + return rtsTrue; + } + if (q->header.info == &stg_IND_info || + q->header.info == &stg_MSG_NULL_info) { + q = (StgMVarTSOQueue*)((StgInd*)q)->indirectee; + goto loop; + } + + // There are takeMVar(s) waiting: wake up the first one + tso = q->tso; + mvar->head = q->link; + if (mvar->head == (StgMVarTSOQueue*)&stg_END_TSO_QUEUE_closure) { + mvar->tail = (StgMVarTSOQueue*)&stg_END_TSO_QUEUE_closure; + } + + ASSERT(tso->block_info.closure == (StgClosure*)mvar); + // save why_blocked here, because waking up the thread destroys + // this information + StgWord why_blocked = tso->why_blocked; + + // actually perform the takeMVar + StgStack* stack = tso->stackobj; + stack->sp[1] = (W_)value; + stack->sp[0] = (W_)&stg_ret_p_info; + + // indicate that the MVar operation has now completed. 
+ tso->_link = (StgTSO*)&stg_END_TSO_QUEUE_closure; + + if (stack->dirty == 0) { + dirty_STACK(cap, stack); + } + + tryWakeupThread(cap, tso); + + // If it was a readMVar, then we can still do work, + // so loop back. (XXX: This could take a while) + if (why_blocked == BlockedOnMVarRead) { + q = ((StgMVarTSOQueue*)q)->link; + goto loop; + } + + ASSERT(why_blocked == BlockedOnMVar); + + unlockClosure((StgClosure*)mvar, info); + + return rtsTrue; +} + +/* ---------------------------------------------------------------------------- * Debugging: why is a thread blocked * ------------------------------------------------------------------------- */ diff --git a/rts/Threads.h b/rts/Threads.h index 01c493e53d..4588008e28 100644 --- a/rts/Threads.h +++ b/rts/Threads.h @@ -41,6 +41,8 @@ StgBool isThreadBound (StgTSO* tso); void threadStackOverflow (Capability *cap, StgTSO *tso); W_ threadStackUnderflow (Capability *cap, StgTSO *tso); +rtsBool performTryPutMVar(Capability *cap, StgMVar *mvar, StgClosure *value); + #ifdef DEBUG void printThreadBlockage (StgTSO *tso); void printThreadStatus (StgTSO *t); diff --git a/rts/package.conf.in b/rts/package.conf.in index 65aa5c333b..03848c45e3 100644 --- a/rts/package.conf.in +++ b/rts/package.conf.in @@ -103,6 +103,7 @@ ld-options: , "-Wl,-u,_base_GHCziPtr_Ptr_con_info" , "-Wl,-u,_base_GHCziPtr_FunPtr_con_info" , "-Wl,-u,_base_GHCziStable_StablePtr_con_info" + , "-Wl,-u,_ghczmprim_GHCziTuple_Z0T_closure" , "-Wl,-u,_ghczmprim_GHCziTypes_False_closure" , "-Wl,-u,_ghczmprim_GHCziTypes_True_closure" , "-Wl,-u,_base_GHCziPack_unpackCString_closure" @@ -199,6 +200,7 @@ ld-options: , "-Wl,-u,base_GHCziPtr_Ptr_con_info" , "-Wl,-u,base_GHCziPtr_FunPtr_con_info" , "-Wl,-u,base_GHCziStable_StablePtr_con_info" + , "-Wl,-u,ghczmprim_GHCziTuple_Z0T_closure" , "-Wl,-u,ghczmprim_GHCziTypes_False_closure" , "-Wl,-u,ghczmprim_GHCziTypes_True_closure" , "-Wl,-u,base_GHCziPack_unpackCString_closure" |