diff options
24 files changed, 715 insertions, 10 deletions
diff --git a/docs/users_guide/ffi-chap.rst b/docs/users_guide/ffi-chap.rst index 63324d1ff3..f46d902b3d 100644 --- a/docs/users_guide/ffi-chap.rst +++ b/docs/users_guide/ffi-chap.rst @@ -616,6 +616,125 @@ the threads have exited first. (Unofficially, if you want to use this fast and loose version of ``hs_exit()``, then call ``shutdownHaskellAndExit()`` instead). +.. _hs_try_putmvar: + +Waking up Haskell threads from C +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Sometimes we want to be able to wake up a Haskell thread from some C +code. For example, when using a callback-based C API, we register a C +callback and then we need to wait for the callback to run. + +One way to do this is to create a ``foreign export`` that will do +whatever needs to be done to wake up the Haskell thread - perhaps +``putMVar`` - and then call this from our C callback. There are a +couple of problems with this: + +1. Calling a foreign export has a lot of overhead: it creates a + complete new Haskell thread, for example. +2. The call may block for a long time if a GC is in progress. We + can't use this method if the C API we're calling doesn't allow + blocking in the callback. + +For these reasons GHC provides an external API to ``tryPutMVar``, +``hs_try_putmvar``, which you can use to cheaply and asynchronously +wake up a Haskell thread from C/C++. + +.. code-block:: c + + void hs_try_putmvar (int capability, HsStablePtr sp); + +The C call ``hs_try_putmvar(cap, mvar)`` is equivalent to the Haskell +call ``tryPutMVar mvar ()``, except that it is + +* non-blocking: takes a bounded, short, amount of time + +* asynchronous: the actual putMVar may be performed after the call + returns (for example, if the RTS is currently garbage collecting). + That's why ``hs_try_putmvar()`` doesn't return a result to say + whether the put succeeded. It is your responsibility to ensure that + the ``MVar`` is empty; if it is full, ``hs_try_putmvar()`` will have + no effect. + +**Example**. Suppose we have a C/C++ function to call that will return and then +invoke a callback at some point in the future, passing us some data. +We want to wait in Haskell for the callback to be called, and retrieve +the data. We can do it like this: + +.. code-block:: haskell + + import GHC.Conc (newStablePtrPrimMVar, PrimMVar) + + makeExternalCall = mask_ $ do + mvar <- newEmptyMVar + sp <- newStablePtrPrimMVar mvar + fp <- mallocForeignPtr + withForeignPtr fp $ \presult -> do + cap <- threadCapability =<< myThreadId + scheduleCallback sp cap presult + takeMVar mvar `onException` + forkIO (do takeMVar mvar; touchForeignPtr fp) + peek presult + + foreign import ccall "scheduleCallback" + scheduleCallback :: StablePtr PrimMVar + -> Int + -> Ptr Result + -> IO () + +And inside ``scheduleCallback``, we create a callback that will in due +course store the result data in the ``Ptr Result``, and then call +``hs_try_putmvar()``. + +There are a few things to note here. + +* There's a special function to create the ``StablePtr``: + ``newStablePtrPrimMVar``, because the RTS needs a ``StablePtr`` to + the primitive ``MVar#`` object, and we can't create that directly. + Do *not* just use ``newStablePtr`` on the ``MVar``: your program + will crash. + +* The ``StablePtr`` is freed by ``hs_try_putmvar()``. This is because + it would otherwise be difficult to arrange to free the ``StablePtr`` + reliably: we can't free it in Haskell, because if the ``takeMVar`` + is interrupted by an asynchronous exception, then the callback will + fire at a later time. We can't free it in C, because we don't know + when to free it (not when ``hs_try_putmvar()`` returns, because that + is an async call that uses the ``StablePtr`` at some time in the + future). + +* The ``mask_`` is to avoid asynchronous exceptions before the + ``scheduleCallback`` call, which would leak the ``StablePtr``. + +* We find out the current capability number and pass it to C. This is + passed back to ``hs_try_putmvar``, and helps the RTS to know which + capability it should try to perform the ``tryPutMVar`` on. If you + don't care, you can pass ``-1`` for the capability to + ``hs_try_putmvar``, and it will pick an arbitrary one. + + Picking the right capability will help avoid unnecessary context + switches. Ideally you should pass the capability that the thread + that will be woken up last ran on, which you can find by calling + ``threadCapability`` in Haskell. + +* If you want to also pass some data back from the C callback to + Haskell, this is best done by first allocating some memory in + Haskell to receive the data, and passing the address to C, as we did + in the above example. + +* ``takeMVar`` can be interrupted by an asynchronous exception. If + this happens, the callback in C will still run at some point in the + future, will still write the result, and will still call + ``hs_try_putmvar()``. Therefore we have to arrange that the memory + for the result stays alive until the callback has run, so if an + exception is thrown during ``takeMVar`` we fork another thread to + wait for the callback and hold the memory alive using + ``touchForeignPtr``. + +For a fully working example, see +``testsuite/tests/concurrent/should_run/hs_try_putmvar001.hs`` in the +GHC source tree. + .. _ffi-floating-point: Floating point and the FFI diff --git a/includes/HsFFI.h b/includes/HsFFI.h index 4f015b6e25..cdf451037c 100644 --- a/includes/HsFFI.h +++ b/includes/HsFFI.h @@ -113,8 +113,12 @@ extern StgPtr hs_spt_lookup(StgWord64 key[2]); extern int hs_spt_keys(StgPtr keys[], int szKeys); extern int hs_spt_key_count (void); +extern void hs_try_putmvar (int capability, HsStablePtr sp); + /* -------------------------------------------------------------------------- */ + + #ifdef __cplusplus } #endif diff --git a/libraries/base/GHC/Conc.hs b/libraries/base/GHC/Conc.hs index 38fac431b0..afc0a97d30 100644 --- a/libraries/base/GHC/Conc.hs +++ b/libraries/base/GHC/Conc.hs @@ -50,6 +50,8 @@ module GHC.Conc , threadStatus , threadCapability + , newStablePtrPrimMVar, PrimMVar + -- * Waiting , threadDelay , registerDelay diff --git a/libraries/base/GHC/Conc/Sync.hs b/libraries/base/GHC/Conc/Sync.hs index 5476950ec7..5986379cb3 100644 --- a/libraries/base/GHC/Conc/Sync.hs +++ b/libraries/base/GHC/Conc/Sync.hs @@ -59,6 +59,8 @@ module GHC.Conc.Sync , threadStatus , threadCapability + , newStablePtrPrimMVar, PrimMVar + -- * Allocation counter and quota , setAllocationCounter , getAllocationCounter @@ -117,6 +119,7 @@ import GHC.MVar import GHC.Ptr import GHC.Real ( fromIntegral ) import GHC.Show ( Show(..), showString ) +import GHC.Stable ( StablePtr(..) ) import GHC.Weak infixr 0 `par`, `pseq` @@ -615,6 +618,17 @@ mkWeakThreadId t@(ThreadId t#) = IO $ \s -> (# s1, w #) -> (# s1, Weak w #) +data PrimMVar + +-- | Make a StablePtr that can be passed to the C function +-- @hs_try_putmvar()@. The RTS wants a 'StablePtr' to the underlying +-- 'MVar#', but a 'StablePtr#' can only refer to lifted types, so we +-- have to cheat by coercing. +newStablePtrPrimMVar :: MVar () -> IO (StablePtr PrimMVar) +newStablePtrPrimMVar (MVar m) = IO $ \s0 -> + case makeStablePtr# (unsafeCoerce# m :: PrimMVar) s0 of + (# s1, sp #) -> (# s1, StablePtr sp #) + ----------------------------------------------------------------------------- -- Transactional heap operations ----------------------------------------------------------------------------- diff --git a/rts/Capability.c b/rts/Capability.c index 681797b409..6979c637bc 100644 --- a/rts/Capability.c +++ b/rts/Capability.c @@ -266,6 +266,7 @@ initCapability (Capability *cap, uint32_t i) cap->returning_tasks_tl = NULL; cap->n_returning_tasks = 0; cap->inbox = (Message*)END_TSO_QUEUE; + cap->putMVars = NULL; cap->sparks = allocSparkPool(); cap->spark_stats.created = 0; cap->spark_stats.dud = 0; diff --git a/rts/Capability.h b/rts/Capability.h index 8e0288b15e..bbf026279f 100644 --- a/rts/Capability.h +++ b/rts/Capability.h @@ -123,6 +123,7 @@ struct Capability_ { // returning_tasks_{hd,tl} // wakeup_queue // inbox + // putMVars Mutex lock; // Tasks waiting to return from a foreign call, or waiting to make @@ -138,6 +139,10 @@ struct Capability_ { // Locks required: cap->lock Message *inbox; + // putMVars are really messages, but they're allocated with malloc() so they + // can't go on the inbox queue: the GC would get confused. + struct PutMVar_ *putMVars; + SparkPool *sparks; // Stats on spark creation/conversion @@ -378,6 +383,11 @@ extern uint32_t numa_map[MAX_NUMA_NODES]; Messages -------------------------------------------------------------------------- */ +typedef struct PutMVar_ { + StgStablePtr mvar; + struct PutMVar_ *link; +} PutMVar; + #ifdef THREADED_RTS INLINE_HEADER rtsBool emptyInbox(Capability *cap); @@ -459,7 +469,8 @@ contextSwitchCapability (Capability *cap) INLINE_HEADER rtsBool emptyInbox(Capability *cap) { - return (cap->inbox == (Message*)END_TSO_QUEUE); + return (cap->inbox == (Message*)END_TSO_QUEUE && + cap->putMVars == NULL); } #endif diff --git a/rts/Prelude.h b/rts/Prelude.h index ae1e9cb266..58de23013b 100644 --- a/rts/Prelude.h +++ b/rts/Prelude.h @@ -24,6 +24,7 @@ * modules these names are defined in. */ +PRELUDE_CLOSURE(ghczmprim_GHCziTuple_Z0T_closure); PRELUDE_CLOSURE(ghczmprim_GHCziTypes_True_closure); PRELUDE_CLOSURE(ghczmprim_GHCziTypes_False_closure); PRELUDE_CLOSURE(base_GHCziPack_unpackCString_closure); @@ -87,6 +88,7 @@ PRELUDE_INFO(base_GHCziWord_W64zh_con_info); PRELUDE_INFO(base_GHCziStable_StablePtr_static_info); PRELUDE_INFO(base_GHCziStable_StablePtr_con_info); +#define Unit_closure DLL_IMPORT_DATA_REF(ghczmprim_GHCziTuple_Z0T_closure) #define True_closure DLL_IMPORT_DATA_REF(ghczmprim_GHCziTypes_True_closure) #define False_closure DLL_IMPORT_DATA_REF(ghczmprim_GHCziTypes_False_closure) #define unpackCString_closure DLL_IMPORT_DATA_REF(base_GHCziPack_unpackCString_closure) diff --git a/rts/PrimOps.cmm b/rts/PrimOps.cmm index b468c33df6..02a7dafec3 100644 --- a/rts/PrimOps.cmm +++ b/rts/PrimOps.cmm @@ -1739,6 +1739,13 @@ loop: } +// NOTE: there is another implementation of this function in +// Threads.c:performTryPutMVar(). Keep them in sync! It was +// measurably slower to call the C function from here (70% for a +// tight loop doing tryPutMVar#). +// +// TODO: we could kill the duplication by making tryPutMVar# into an +// inline primop that expands into a C call to performTryPutMVar(). stg_tryPutMVarzh ( P_ mvar, /* :: MVar a */ P_ val, /* :: a */ ) { @@ -1812,6 +1819,7 @@ loop: return (1); } + stg_readMVarzh ( P_ mvar, /* :: MVar a */ ) { W_ val, info, tso, q; diff --git a/rts/RtsAPI.c b/rts/RtsAPI.c index 34d68c7e3b..e72430743e 100644 --- a/rts/RtsAPI.c +++ b/rts/RtsAPI.c @@ -16,6 +16,7 @@ #include "Schedule.h" #include "Capability.h" #include "Stable.h" +#include "Threads.h" #include "Weak.h" /* ---------------------------------------------------------------------------- @@ -620,3 +621,77 @@ void rts_done (void) freeMyTask(); } +/* ----------------------------------------------------------------------------- + tryPutMVar from outside Haskell + + The C call + + hs_try_putmvar(cap, mvar) + + is equivalent to the Haskell call + + tryPutMVar mvar () + + but it is + + * non-blocking: takes a bounded, short, amount of time + * asynchronous: the actual putMVar may be performed after the + call returns. That's why hs_try_putmvar() doesn't return a + result to say whether the put succeeded. + + NOTE: this call transfers ownership of the StablePtr to the RTS, which will + free it after the tryPutMVar has taken place. The reason is that otherwise, + it would be very difficult for the caller to arrange to free the StablePtr + in all circumstances. + + For more details, see the section "Waking up Haskell threads from C" in the + User's Guide. + -------------------------------------------------------------------------- */ + +void hs_try_putmvar (/* in */ int capability, + /* in */ HsStablePtr mvar) +{ + Task *task = getTask(); + Capability *cap; + + if (capability < 0) { + capability = task->preferred_capability; + if (capability < 0) { + capability = 0; + } + } + cap = capabilities[capability % enabled_capabilities]; + +#if !defined(THREADED_RTS) + + performTryPutMVar(cap, (StgMVar*)deRefStablePtr(mvar), Unit_closure); + freeStablePtr(mvar); + +#else + + ACQUIRE_LOCK(&cap->lock); + // If the capability is free, we can perform the tryPutMVar immediately + if (cap->running_task == NULL) { + cap->running_task = task; + task->cap = cap; + RELEASE_LOCK(&cap->lock); + + performTryPutMVar(cap, (StgMVar*)deRefStablePtr(mvar), Unit_closure); + + freeStablePtr(mvar); + + // Wake up the capability, which will start running the thread that we + // just awoke (if there was one). + releaseCapability(cap); + } else { + PutMVar *p = stgMallocBytes(sizeof(PutMVar),"hs_try_putmvar"); + // We cannot deref the StablePtr if we don't have a capability, + // so we have to store it and deref it later. + p->mvar = mvar; + p->link = cap->putMVars; + cap->putMVars = p; + RELEASE_LOCK(&cap->lock); + } + +#endif +} diff --git a/rts/Schedule.c b/rts/Schedule.c index 544b9c2115..611d70411f 100644 --- a/rts/Schedule.c +++ b/rts/Schedule.c @@ -723,7 +723,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS, if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) { if (!emptyRunQueue(cap0) || cap0->n_returning_tasks != 0 - || cap0->inbox != (Message*)END_TSO_QUEUE) { + || !emptyInbox(cap0)) { // it already has some work, we just grabbed it at // the wrong moment. Or maybe it's deadlocked! releaseCapability(cap0); @@ -982,6 +982,7 @@ scheduleProcessInbox (Capability **pcap USED_IF_THREADS) { #if defined(THREADED_RTS) Message *m, *next; + PutMVar *p, *pnext; int r; Capability *cap = *pcap; @@ -1006,7 +1007,9 @@ scheduleProcessInbox (Capability **pcap USED_IF_THREADS) if (r != 0) return; m = cap->inbox; + p = cap->putMVars; cap->inbox = (Message*)END_TSO_QUEUE; + cap->putMVars = NULL; RELEASE_LOCK(&cap->lock); @@ -1015,10 +1018,20 @@ scheduleProcessInbox (Capability **pcap USED_IF_THREADS) executeMessage(cap, m); m = next; } + + while (p != NULL) { + pnext = p->link; + performTryPutMVar(cap, (StgMVar*)deRefStablePtr(p->mvar), + Unit_closure); + freeStablePtr(p->mvar); + stgFree(p); + p = pnext; + } } #endif } + /* ---------------------------------------------------------------------------- * Activate spark threads (THREADED_RTS) * ------------------------------------------------------------------------- */ diff --git a/rts/Task.c b/rts/Task.c index 9a658e019c..253520f909 100644 --- a/rts/Task.c +++ b/rts/Task.c @@ -36,7 +36,6 @@ uint32_t peakWorkerCount; static int tasksInitialized = 0; static void freeTask (Task *task); -static Task * allocTask (void); static Task * newTask (rtsBool); #if defined(THREADED_RTS) @@ -117,8 +116,7 @@ freeTaskManager (void) return tasksRunning; } -static Task * -allocTask (void) +Task* getTask (void) { Task *task; @@ -209,7 +207,7 @@ newTask (rtsBool worker) task->cap = NULL; task->worker = worker; - task->stopped = rtsFalse; + task->stopped = rtsTrue; task->running_finalizers = rtsFalse; task->n_spare_incalls = 0; task->spare_incalls = NULL; @@ -304,7 +302,7 @@ newBoundTask (void) stg_exit(EXIT_FAILURE); } - task = allocTask(); + task = getTask(); task->stopped = rtsFalse; @@ -452,6 +450,7 @@ startWorkerTask (Capability *cap) // A worker always gets a fresh Task structure. task = newTask(rtsTrue); + task->stopped = rtsFalse; // The lock here is to synchronise with taskStart(), to make sure // that we have finished setting up the Task structure before the @@ -499,7 +498,7 @@ void rts_setInCallCapability ( int preferred_capability, int affinity USED_IF_THREADS) { - Task *task = allocTask(); + Task *task = getTask(); task->preferred_capability = preferred_capability; #ifdef THREADED_RTS diff --git a/rts/Task.h b/rts/Task.h index 558f543fac..93234591ba 100644 --- a/rts/Task.h +++ b/rts/Task.h @@ -150,7 +150,8 @@ typedef struct Task_ { struct InCall_ *spare_incalls; rtsBool worker; // == rtsTrue if this is a worker Task - rtsBool stopped; // this task has stopped or exited Haskell + rtsBool stopped; // == rtsTrue between newBoundTask and + // boundTaskExiting, or in a worker Task. // So that we can detect when a finalizer illegally calls back into Haskell rtsBool running_finalizers; @@ -205,7 +206,12 @@ uint32_t freeTaskManager (void); // thread-local storage and will remain even after boundTaskExiting() // has been called; to free the memory, see freeMyTask(). // -Task *newBoundTask (void); +Task* newBoundTask (void); + +// Return the current OS thread's Task, which is created if it doesn't already +// exist. After you have finished using RTS APIs, you should call freeMyTask() +// to release this thread's Task. +Task* getTask (void); // The current task is a bound task that is exiting. // diff --git a/rts/Threads.c b/rts/Threads.c index 7317249e11..1782da6114 100644 --- a/rts/Threads.c +++ b/rts/Threads.c @@ -744,6 +744,85 @@ threadStackUnderflow (Capability *cap, StgTSO *tso) } /* ---------------------------------------------------------------------------- + Implementation of tryPutMVar# + + NOTE: this should be kept in sync with stg_tryPutMVarzh in PrimOps.cmm + ------------------------------------------------------------------------- */ + +rtsBool performTryPutMVar(Capability *cap, StgMVar *mvar, StgClosure *value) +{ + const StgInfoTable *info; + StgMVarTSOQueue *q; + StgTSO *tso; + + info = lockClosure((StgClosure*)mvar); + + if (mvar->value != &stg_END_TSO_QUEUE_closure) { +#if defined(THREADED_RTS) + unlockClosure((StgClosure*)mvar, info); +#endif + return rtsFalse; + } + + q = mvar->head; +loop: + if (q == (StgMVarTSOQueue*)&stg_END_TSO_QUEUE_closure) { + /* No further takes, the MVar is now full. */ + if (info == &stg_MVAR_CLEAN_info) { + dirty_MVAR(&cap->r, (StgClosure*)mvar); + } + + mvar->value = value; + unlockClosure((StgClosure*)mvar, &stg_MVAR_DIRTY_info); + return rtsTrue; + } + if (q->header.info == &stg_IND_info || + q->header.info == &stg_MSG_NULL_info) { + q = (StgMVarTSOQueue*)((StgInd*)q)->indirectee; + goto loop; + } + + // There are takeMVar(s) waiting: wake up the first one + tso = q->tso; + mvar->head = q->link; + if (mvar->head == (StgMVarTSOQueue*)&stg_END_TSO_QUEUE_closure) { + mvar->tail = (StgMVarTSOQueue*)&stg_END_TSO_QUEUE_closure; + } + + ASSERT(tso->block_info.closure == (StgClosure*)mvar); + // save why_blocked here, because waking up the thread destroys + // this information + StgWord why_blocked = tso->why_blocked; + + // actually perform the takeMVar + StgStack* stack = tso->stackobj; + stack->sp[1] = (W_)value; + stack->sp[0] = (W_)&stg_ret_p_info; + + // indicate that the MVar operation has now completed. + tso->_link = (StgTSO*)&stg_END_TSO_QUEUE_closure; + + if (stack->dirty == 0) { + dirty_STACK(cap, stack); + } + + tryWakeupThread(cap, tso); + + // If it was an readMVar, then we can still do work, + // so loop back. (XXX: This could take a while) + if (why_blocked == BlockedOnMVarRead) { + q = ((StgMVarTSOQueue*)q)->link; + goto loop; + } + + ASSERT(why_blocked == BlockedOnMVar); + + unlockClosure((StgClosure*)mvar, info); + + return rtsTrue; +} + +/* ---------------------------------------------------------------------------- * Debugging: why is a thread blocked * ------------------------------------------------------------------------- */ diff --git a/rts/Threads.h b/rts/Threads.h index 01c493e53d..4588008e28 100644 --- a/rts/Threads.h +++ b/rts/Threads.h @@ -41,6 +41,8 @@ StgBool isThreadBound (StgTSO* tso); void threadStackOverflow (Capability *cap, StgTSO *tso); W_ threadStackUnderflow (Capability *cap, StgTSO *tso); +rtsBool performTryPutMVar(Capability *cap, StgMVar *mvar, StgClosure *value); + #ifdef DEBUG void printThreadBlockage (StgTSO *tso); void printThreadStatus (StgTSO *t); diff --git a/rts/package.conf.in b/rts/package.conf.in index 65aa5c333b..03848c45e3 100644 --- a/rts/package.conf.in +++ b/rts/package.conf.in @@ -103,6 +103,7 @@ ld-options: , "-Wl,-u,_base_GHCziPtr_Ptr_con_info" , "-Wl,-u,_base_GHCziPtr_FunPtr_con_info" , "-Wl,-u,_base_GHCziStable_StablePtr_con_info" + , "-Wl,-u,_ghczmprim_GHCziTuple_Z0T_closure" , "-Wl,-u,_ghczmprim_GHCziTypes_False_closure" , "-Wl,-u,_ghczmprim_GHCziTypes_True_closure" , "-Wl,-u,_base_GHCziPack_unpackCString_closure" @@ -199,6 +200,7 @@ ld-options: , "-Wl,-u,base_GHCziPtr_Ptr_con_info" , "-Wl,-u,base_GHCziPtr_FunPtr_con_info" , "-Wl,-u,base_GHCziStable_StablePtr_con_info" + , "-Wl,-u,ghczmprim_GHCziTuple_Z0T_closure" , "-Wl,-u,ghczmprim_GHCziTypes_False_closure" , "-Wl,-u,ghczmprim_GHCziTypes_True_closure" , "-Wl,-u,base_GHCziPack_unpackCString_closure" diff --git a/testsuite/tests/concurrent/should_run/Makefile b/testsuite/tests/concurrent/should_run/Makefile index c6bef49619..26f13c5ff6 100644 --- a/testsuite/tests/concurrent/should_run/Makefile +++ b/testsuite/tests/concurrent/should_run/Makefile @@ -4,3 +4,9 @@ include $(TOP)/mk/test.mk conc059_setup : '$(TEST_HC)' $(TEST_HC_OPTS) -c conc059.hs + +hs_try_putmvar002_setup : + '$(TEST_HC)' $(TEST_HC_OPTS) -c hs_try_putmvar002.hs + +hs_try_putmvar003_setup : + '$(TEST_HC)' $(TEST_HC_OPTS) -c hs_try_putmvar003.hs diff --git a/testsuite/tests/concurrent/should_run/all.T b/testsuite/tests/concurrent/should_run/all.T index 9f2ed284f6..6eda0007b0 100644 --- a/testsuite/tests/concurrent/should_run/all.T +++ b/testsuite/tests/concurrent/should_run/all.T @@ -255,3 +255,34 @@ test('setnumcapabilities001', # omit ghci, which can't handle unboxed tuples: test('compareAndSwap', [omit_ways(['ghci','hpc']), reqlib('primitive')], compile_and_run, ['']) + +test('hs_try_putmvar001', + [ + when(opsys('mingw32'),skip), # uses pthread APIs in the C code + only_ways(['threaded1','threaded2']), + extra_clean(['hs_try_putmvar001_c.o'])], + compile_and_run, + ['hs_try_putmvar001_c.c']) + +# A benchmark for hs_try_putmvar() vs. foreign export +# This one should work for both threaded and non-threaded RTS +test('hs_try_putmvar002', + [ + pre_cmd('$MAKE -s --no-print-directory hs_try_putmvar002_setup'), + extra_clean(['hs_try_putmvar002_c.o']), + extra_run_opts('1 8 10000') + ], + compile_and_run, + ['hs_try_putmvar002_c.c']) + +# Another benchmark for hs_try_putmvar() vs. foreign export +test('hs_try_putmvar003', + [ + when(opsys('mingw32'),skip), # uses pthread APIs in the C code + pre_cmd('$MAKE -s --no-print-directory hs_try_putmvar003_setup'), + only_ways(['threaded1','threaded2']), + extra_clean(['hs_try_putmvar003_c.o']), + extra_run_opts('1 16 32 100') + ], + compile_and_run, + ['hs_try_putmvar003_c.c']) diff --git a/testsuite/tests/concurrent/should_run/hs_try_putmvar001.hs b/testsuite/tests/concurrent/should_run/hs_try_putmvar001.hs new file mode 100644 index 0000000000..af4eabb263 --- /dev/null +++ b/testsuite/tests/concurrent/should_run/hs_try_putmvar001.hs @@ -0,0 +1,34 @@ +{-# LANGUAGE MagicHash #-} +module Main where + +import Control.Concurrent +import Control.Exception +import Foreign +import Foreign.C +import GHC.Conc +import GHC.Prim + +-- Sample code demonstrating proper use of hs_try_putmvar() + +main = do + makeExternalCall >>= print + threadDelay 100000 + +makeExternalCall :: IO CInt +makeExternalCall = mask_ $ do + mvar <- newEmptyMVar + sp <- newStablePtrPrimMVar mvar -- freed by hs_try_takemvar() + fp <- mallocForeignPtr + withForeignPtr fp $ \presult -> do + (cap,_) <- threadCapability =<< myThreadId + scheduleCallback sp cap presult + takeMVar mvar `onException` forkIO (do takeMVar mvar; touchForeignPtr fp) + -- the C callback will still run if takeMVar is interrupted, so the + -- exception handler keeps the result memory alive long enough. + peek presult + +foreign import ccall "scheduleCallback" + scheduleCallback :: StablePtr PrimMVar + -> Int + -> Ptr CInt + -> IO () diff --git a/testsuite/tests/concurrent/should_run/hs_try_putmvar001.stdout b/testsuite/tests/concurrent/should_run/hs_try_putmvar001.stdout new file mode 100644 index 0000000000..d81cc0710e --- /dev/null +++ b/testsuite/tests/concurrent/should_run/hs_try_putmvar001.stdout @@ -0,0 +1 @@ +42 diff --git a/testsuite/tests/concurrent/should_run/hs_try_putmvar001_c.c b/testsuite/tests/concurrent/should_run/hs_try_putmvar001_c.c new file mode 100644 index 0000000000..f214c5c4d0 --- /dev/null +++ b/testsuite/tests/concurrent/should_run/hs_try_putmvar001_c.c @@ -0,0 +1,31 @@ +#include "HsFFI.h" +#include "Rts.h" +#include "RtsAPI.h" +#include <unistd.h> +#include <pthread.h> + +struct callback { + HsStablePtr mvar; + int cap; + int *presult; +}; + +void* callback(struct callback *p) +{ + usleep(200); + *p->presult = 42; + hs_try_putmvar(p->cap,p->mvar); + free(p); + hs_thread_done(); + return NULL; +} + +void scheduleCallback(HsStablePtr mvar, HsInt cap, int *presult) +{ + pthread_t t; + struct callback *p = malloc(sizeof(struct callback)); + p->mvar = mvar; + p->cap = cap; + p->presult = presult; + pthread_create(&t, NULL, callback, p); +} diff --git a/testsuite/tests/concurrent/should_run/hs_try_putmvar002.hs b/testsuite/tests/concurrent/should_run/hs_try_putmvar002.hs new file mode 100644 index 0000000000..a8eac42dec --- /dev/null +++ b/testsuite/tests/concurrent/should_run/hs_try_putmvar002.hs @@ -0,0 +1,66 @@ +{-# LANGUAGE MagicHash #-} +module Main where + +import Control.Concurrent +import Control.Exception +import Control.Monad +import Foreign hiding (void) +import Foreign.C +import GHC.Conc +import GHC.Prim +import System.Environment + +-- Measure raw throughput, for M threads that each do N calls to C +-- that call back to hs_try_putmvar() or the foreign export equivalent + +main = do + args <- getArgs + case args of + ["1",n,m] -> experiment2 (read m) (experiment1 (read n)) + ["2",n,m] -> experiment2 (read m) (experiment1FE (read n)) + +-- ----------------------------------------------------------------------------- + +experiment1 :: Int -> IO () +experiment1 n = mask_ $ do + mvar <- newEmptyMVar + (cap,_) <- threadCapability =<< myThreadId + replicateM_ n $ do + sp <- newStablePtrPrimMVar mvar + externalPutMVar sp cap + takeMVar mvar + +foreign import ccall "externalPutMVar" + externalPutMVar :: StablePtr PrimMVar + -> Int + -> IO () + +experiment1FE :: Int -> IO () +experiment1FE n = do + mvar <- newEmptyMVar + (cap,_) <- threadCapability =<< myThreadId + bracket (newStablePtr mvar) freeStablePtr $ \sp -> do + replicateM_ n $ do externalPutMVarFE sp cap; takeMVar mvar + +foreign import ccall "externalPutMVarFE" + externalPutMVarFE :: StablePtr (MVar ()) + -> Int + -> IO () + +callbackPutMVar :: StablePtr (MVar ()) -> IO () +callbackPutMVar sp = do + mvar <- deRefStablePtr sp + void $ tryPutMVar mvar () + +foreign export ccall callbackPutMVar :: StablePtr (MVar ()) -> IO () + +-- ----------------------------------------------------------------------------- +-- Perform M copies of experiment1 concurrently + +experiment2 :: Int -> IO () -> IO () +experiment2 m exp = do + mvars <- replicateM m $ do + m <- newEmptyMVar + forkFinally exp (\_ -> putMVar m ()) + return m + mapM_ takeMVar mvars diff --git a/testsuite/tests/concurrent/should_run/hs_try_putmvar002_c.c b/testsuite/tests/concurrent/should_run/hs_try_putmvar002_c.c new file mode 100644 index 0000000000..34a339da93 --- /dev/null +++ b/testsuite/tests/concurrent/should_run/hs_try_putmvar002_c.c @@ -0,0 +1,28 @@ +#include "HsFFI.h" +#include <unistd.h> +#include <pthread.h> +#include "hs_try_putmvar002_stub.h" + +void externalPutMVar(HsStablePtr mvar, HsInt cap) +{ + hs_try_putmvar(cap,mvar); +} + +void externalPutMVarFE(HsStablePtr mvar, HsInt cap) +{ + callbackPutMVar(mvar); +} + +void externalManyPutMVars(HsStablePtr mvar, HsInt n, HsInt cap) +{ + for (int i = 0; i < n; i++) { + hs_try_putmvar(cap,mvar); + } +} + +void externalManyPutMVarsFE(HsStablePtr mvar, HsInt n, HsInt cap) +{ + for (int i = 0; i < n; i++) { + callbackPutMVar(mvar); + } +} diff --git a/testsuite/tests/concurrent/should_run/hs_try_putmvar003.hs b/testsuite/tests/concurrent/should_run/hs_try_putmvar003.hs new file mode 100644 index 0000000000..d74a9cb126 --- /dev/null +++ b/testsuite/tests/concurrent/should_run/hs_try_putmvar003.hs @@ -0,0 +1,88 @@ +{-# LANGUAGE MagicHash #-} +module Main where + +import Control.Concurrent +import Control.Exception +import Control.Monad +import Foreign hiding (void) +import Foreign.C +import GHC.Conc +import GHC.MVar (MVar(..)) +import GHC.Prim +import System.Environment + +-- Measure C to Haskell callback throughput under a workload with +-- several dimensions: +-- +-- * X callback queues (each managed by an OS thread in C) +-- * each queue has Y Haskell threads, each making Z requests +-- +-- And we can run the whole thing in two ways: +-- * With the callbacks calling into a foreign export +-- * With the callbacks using hs_try_putmvar() +-- +-- Example results (using WAY=threaded2) +-- +-- hs_try_putmvar003 1 64 16 500 +RTS -s -N4 1.10s +-- hs_try_putmvar003 2 64 16 500 +RTS -s -N4 9.88s +-- +-- hs_try_putmvar() is 9x faster with these parameters. + +main = do + args <- getArgs + case args of + ["1",x,y,z] -> experiment False (read x) (read y) (read z) + ["2",x,y,z] -> experiment True (read x) (read y) (read z) + +makeExternalCall :: Ptr CallbackQueue -> IO CInt +makeExternalCall q = mask_ $ do + mvar <- newEmptyMVar + sp <- newStablePtrPrimMVar mvar + fp <- mallocForeignPtr + (cap,_) <- threadCapability =<< myThreadId + withForeignPtr fp $ \presult -> do + scheduleCallback q sp cap presult + takeMVar mvar `onException` forkIO (do takeMVar mvar; touchForeignPtr fp) + peek presult + +data CallbackQueue + +foreign import ccall "mkCallbackQueue" + mkCallbackQueue :: Int -> IO (Ptr CallbackQueue) + +foreign import ccall "destroyCallbackQueue" + destroyCallbackQueue :: Ptr CallbackQueue -> IO () + +foreign import ccall "scheduleCallback" + scheduleCallback :: Ptr CallbackQueue + -> StablePtr PrimMVar + -> Int + -> Ptr CInt + -> IO () + +callbackPutMVar :: StablePtr PrimMVar -> IO () +callbackPutMVar sp = do + mvar <- deRefStablePtr sp + void $ tryPutMVar (MVar (unsafeCoerce# mvar)) () + +foreign export ccall callbackPutMVar :: StablePtr PrimMVar -> IO () + +-- Make +-- * x callback queues, each with +-- * y threads, doing +-- * z requests each +experiment :: Bool -> Int -> Int -> Int -> IO () +experiment use_foreign_export x y z = do + mvars <- replicateM x $ async $ do + bracket (mkCallbackQueue (fromEnum use_foreign_export)) + destroyCallbackQueue $ \q -> do + mvars <- replicateM y $ async $ + replicateM_ z $ void $ makeExternalCall q + mapM_ takeMVar mvars + mapM_ takeMVar mvars + +async :: IO () -> IO (MVar ()) +async io = do + m <- newEmptyMVar + forkFinally io (\_ -> putMVar m ()) + return m diff --git a/testsuite/tests/concurrent/should_run/hs_try_putmvar003_c.c b/testsuite/tests/concurrent/should_run/hs_try_putmvar003_c.c new file mode 100644 index 0000000000..c499b72056 --- /dev/null +++ b/testsuite/tests/concurrent/should_run/hs_try_putmvar003_c.c @@ -0,0 +1,83 @@ +#include "HsFFI.h" +#include "Rts.h" +#include "RtsAPI.h" +#include <unistd.h> +#include <pthread.h> +#include "hs_try_putmvar003_stub.h" + +struct callback_queue { + pthread_mutex_t lock; + pthread_cond_t cond; + int use_foreign_export; + struct callback *pending; +}; + +struct callback { + HsStablePtr mvar; + int cap; + int *presult; + struct callback *next; +}; + +void* callback(struct callback_queue *q) +{ + struct callback *cb; + + pthread_mutex_lock(&q->lock); + do { + if (q->pending == NULL) { + pthread_cond_wait(&q->cond,&q->lock); + } + if (q->pending != NULL) { + cb = q->pending; + q->pending = cb->next; + *cb->presult = 42; + if (q->use_foreign_export) { + callbackPutMVar(cb->mvar); + } else { + hs_try_putmvar(cb->cap,cb->mvar); + } + free(cb); + } + } while (1); + pthread_mutex_unlock(&q->lock); + + hs_thread_done(); + return NULL; +} + +typedef void* threadfunc(void *); + +struct callback_queue* mkCallbackQueue(int use_foreign_export) +{ + struct callback_queue *q = malloc(sizeof(struct callback_queue)); + pthread_t t; + pthread_mutex_init(&q->lock, NULL); + pthread_cond_init(&q->cond, NULL); + pthread_create(&t, NULL, (threadfunc*)callback, q); + q->pending = NULL; + q->use_foreign_export = use_foreign_export; + return q; +} + +void destroyCallbackQueue(struct callback_queue *q) +{ + pthread_mutex_destroy(&q->lock); + pthread_cond_destroy(&q->cond); + free(q); +} + +void scheduleCallback(struct callback_queue *q, + HsStablePtr mvar, + HsInt cap, int *presult) +{ + struct callback *p = malloc(sizeof(struct callback)); + p->mvar = mvar; + p->cap = cap; + p->presult = presult; + pthread_mutex_lock(&q->lock); + p->next = q->pending; + q->pending = p; + pthread_cond_signal(&q->cond); + pthread_mutex_unlock(&q->lock); +} |