summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon Marlow <marlowsd@gmail.com>2016-08-30 20:55:10 +0100
committerSimon Marlow <marlowsd@gmail.com>2016-09-12 08:33:24 +0100
commit454033b54e2f7eef2354cc9d7ae7e7cba4dff09a (patch)
tree3577ed7b0b42e2acff1502673e1ee474fba31319
parent0e7ccf6d233c66b23a60de4e35e039f78ea3e162 (diff)
downloadhaskell-454033b54e2f7eef2354cc9d7ae7e7cba4dff09a.tar.gz
Add hs_try_putmvar()
Summary: This is a fast, non-blocking, asynchronous, interface to tryPutMVar that can be called from C/C++. It's useful for callback-based C/C++ APIs: the idea is that the callback invokes hs_try_putmvar(), and the Haskell code waits for the callback to run by blocking in takeMVar. The callback doesn't block - this is often a requirement of callback-based APIs. The callback wakes up the Haskell thread with minimal overhead and no unnecessary context-switches. There are a couple of benchmarks in testsuite/tests/concurrent/should_run. Some example results comparing hs_try_putmvar() with using a standard foreign export: ./hs_try_putmvar003 1 64 16 100 +RTS -s -N4 0.49s ./hs_try_putmvar003 2 64 16 100 +RTS -s -N4 2.30s hs_try_putmvar() is 4x faster for this workload (see the source for hs_try_putmvar003.hs for details of the workload). An alternative solution is to use the IO Manager for this. We've tried it, but there are problems with that approach: * Need to create a new file descriptor for each callback * The IO Manger thread(s) become a bottleneck * More potential for things to go wrong, e.g. throwing an exception in an IO Manager callback kills the IO Manager thread. Test Plan: validate; new unit tests Reviewers: niteria, erikd, ezyang, bgamari, austin, hvr Subscribers: thomie Differential Revision: https://phabricator.haskell.org/D2501
-rw-r--r--docs/users_guide/ffi-chap.rst119
-rw-r--r--includes/HsFFI.h4
-rw-r--r--libraries/base/GHC/Conc.hs2
-rw-r--r--libraries/base/GHC/Conc/Sync.hs14
-rw-r--r--rts/Capability.c1
-rw-r--r--rts/Capability.h13
-rw-r--r--rts/Prelude.h2
-rw-r--r--rts/PrimOps.cmm8
-rw-r--r--rts/RtsAPI.c75
-rw-r--r--rts/Schedule.c15
-rw-r--r--rts/Task.c11
-rw-r--r--rts/Task.h10
-rw-r--r--rts/Threads.c79
-rw-r--r--rts/Threads.h2
-rw-r--r--rts/package.conf.in2
-rw-r--r--testsuite/tests/concurrent/should_run/Makefile6
-rw-r--r--testsuite/tests/concurrent/should_run/all.T31
-rw-r--r--testsuite/tests/concurrent/should_run/hs_try_putmvar001.hs34
-rw-r--r--testsuite/tests/concurrent/should_run/hs_try_putmvar001.stdout1
-rw-r--r--testsuite/tests/concurrent/should_run/hs_try_putmvar001_c.c31
-rw-r--r--testsuite/tests/concurrent/should_run/hs_try_putmvar002.hs66
-rw-r--r--testsuite/tests/concurrent/should_run/hs_try_putmvar002_c.c28
-rw-r--r--testsuite/tests/concurrent/should_run/hs_try_putmvar003.hs88
-rw-r--r--testsuite/tests/concurrent/should_run/hs_try_putmvar003_c.c83
24 files changed, 715 insertions, 10 deletions
diff --git a/docs/users_guide/ffi-chap.rst b/docs/users_guide/ffi-chap.rst
index 63324d1ff3..f46d902b3d 100644
--- a/docs/users_guide/ffi-chap.rst
+++ b/docs/users_guide/ffi-chap.rst
@@ -616,6 +616,125 @@ the threads have exited first. (Unofficially, if you want to use this
fast and loose version of ``hs_exit()``, then call
``shutdownHaskellAndExit()`` instead).
+.. _hs_try_putmvar:
+
+Waking up Haskell threads from C
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Sometimes we want to be able to wake up a Haskell thread from some C
+code. For example, when using a callback-based C API, we register a C
+callback and then we need to wait for the callback to run.
+
+One way to do this is to create a ``foreign export`` that will do
+whatever needs to be done to wake up the Haskell thread - perhaps
+``putMVar`` - and then call this from our C callback. There are a
+couple of problems with this:
+
+1. Calling a foreign export has a lot of overhead: it creates a
+ complete new Haskell thread, for example.
+2. The call may block for a long time if a GC is in progress. We
+ can't use this method if the C API we're calling doesn't allow
+ blocking in the callback.
+
+For these reasons GHC provides an external API to ``tryPutMVar``,
+``hs_try_putmvar``, which you can use to cheaply and asynchronously
+wake up a Haskell thread from C/C++.
+
+.. code-block:: c
+
+ void hs_try_putmvar (int capability, HsStablePtr sp);
+
+The C call ``hs_try_putmvar(cap, mvar)`` is equivalent to the Haskell
+call ``tryPutMVar mvar ()``, except that it is
+
+* non-blocking: takes a bounded, short, amount of time
+
+* asynchronous: the actual putMVar may be performed after the call
+ returns (for example, if the RTS is currently garbage collecting).
+ That's why ``hs_try_putmvar()`` doesn't return a result to say
+ whether the put succeeded. It is your responsibility to ensure that
+ the ``MVar`` is empty; if it is full, ``hs_try_putmvar()`` will have
+ no effect.
+
+**Example**. Suppose we have a C/C++ function to call that will return and then
+invoke a callback at some point in the future, passing us some data.
+We want to wait in Haskell for the callback to be called, and retrieve
+the data. We can do it like this:
+
+.. code-block:: haskell
+
+ import GHC.Conc (newStablePtrPrimMVar, PrimMVar)
+
+ makeExternalCall = mask_ $ do
+ mvar <- newEmptyMVar
+ sp <- newStablePtrPrimMVar mvar
+ fp <- mallocForeignPtr
+ withForeignPtr fp $ \presult -> do
+ cap <- threadCapability =<< myThreadId
+ scheduleCallback sp cap presult
+ takeMVar mvar `onException`
+ forkIO (do takeMVar mvar; touchForeignPtr fp)
+ peek presult
+
+ foreign import ccall "scheduleCallback"
+ scheduleCallback :: StablePtr PrimMVar
+ -> Int
+ -> Ptr Result
+ -> IO ()
+
+And inside ``scheduleCallback``, we create a callback that will in due
+course store the result data in the ``Ptr Result``, and then call
+``hs_try_putmvar()``.
+
+There are a few things to note here.
+
+* There's a special function to create the ``StablePtr``:
+ ``newStablePtrPrimMVar``, because the RTS needs a ``StablePtr`` to
+ the primitive ``MVar#`` object, and we can't create that directly.
+ Do *not* just use ``newStablePtr`` on the ``MVar``: your program
+ will crash.
+
+* The ``StablePtr`` is freed by ``hs_try_putmvar()``. This is because
+ it would otherwise be difficult to arrange to free the ``StablePtr``
+ reliably: we can't free it in Haskell, because if the ``takeMVar``
+ is interrupted by an asynchronous exception, then the callback will
+ fire at a later time. We can't free it in C, because we don't know
+ when to free it (not when ``hs_try_putmvar()`` returns, because that
+ is an async call that uses the ``StablePtr`` at some time in the
+ future).
+
+* The ``mask_`` is to avoid asynchronous exceptions before the
+ ``scheduleCallback`` call, which would leak the ``StablePtr``.
+
+* We find out the current capability number and pass it to C. This is
+ passed back to ``hs_try_putmvar``, and helps the RTS to know which
+ capability it should try to perform the ``tryPutMVar`` on. If you
+ don't care, you can pass ``-1`` for the capability to
+ ``hs_try_putmvar``, and it will pick an arbitrary one.
+
+ Picking the right capability will help avoid unnecessary context
+ switches. Ideally you should pass the capability that the thread
+ that will be woken up last ran on, which you can find by calling
+ ``threadCapability`` in Haskell.
+
+* If you want to also pass some data back from the C callback to
+ Haskell, this is best done by first allocating some memory in
+ Haskell to receive the data, and passing the address to C, as we did
+ in the above example.
+
+* ``takeMVar`` can be interrupted by an asynchronous exception. If
+ this happens, the callback in C will still run at some point in the
+ future, will still write the result, and will still call
+ ``hs_try_putmvar()``. Therefore we have to arrange that the memory
+ for the result stays alive until the callback has run, so if an
+ exception is thrown during ``takeMVar`` we fork another thread to
+ wait for the callback and hold the memory alive using
+ ``touchForeignPtr``.
+
+For a fully working example, see
+``testsuite/tests/concurrent/should_run/hs_try_putmvar001.hs`` in the
+GHC source tree.
+
.. _ffi-floating-point:
Floating point and the FFI
diff --git a/includes/HsFFI.h b/includes/HsFFI.h
index 4f015b6e25..cdf451037c 100644
--- a/includes/HsFFI.h
+++ b/includes/HsFFI.h
@@ -113,8 +113,12 @@ extern StgPtr hs_spt_lookup(StgWord64 key[2]);
extern int hs_spt_keys(StgPtr keys[], int szKeys);
extern int hs_spt_key_count (void);
+extern void hs_try_putmvar (int capability, HsStablePtr sp);
+
/* -------------------------------------------------------------------------- */
+
+
#ifdef __cplusplus
}
#endif
diff --git a/libraries/base/GHC/Conc.hs b/libraries/base/GHC/Conc.hs
index 38fac431b0..afc0a97d30 100644
--- a/libraries/base/GHC/Conc.hs
+++ b/libraries/base/GHC/Conc.hs
@@ -50,6 +50,8 @@ module GHC.Conc
, threadStatus
, threadCapability
+ , newStablePtrPrimMVar, PrimMVar
+
-- * Waiting
, threadDelay
, registerDelay
diff --git a/libraries/base/GHC/Conc/Sync.hs b/libraries/base/GHC/Conc/Sync.hs
index 5476950ec7..5986379cb3 100644
--- a/libraries/base/GHC/Conc/Sync.hs
+++ b/libraries/base/GHC/Conc/Sync.hs
@@ -59,6 +59,8 @@ module GHC.Conc.Sync
, threadStatus
, threadCapability
+ , newStablePtrPrimMVar, PrimMVar
+
-- * Allocation counter and quota
, setAllocationCounter
, getAllocationCounter
@@ -117,6 +119,7 @@ import GHC.MVar
import GHC.Ptr
import GHC.Real ( fromIntegral )
import GHC.Show ( Show(..), showString )
+import GHC.Stable ( StablePtr(..) )
import GHC.Weak
infixr 0 `par`, `pseq`
@@ -615,6 +618,17 @@ mkWeakThreadId t@(ThreadId t#) = IO $ \s ->
(# s1, w #) -> (# s1, Weak w #)
+data PrimMVar
+
+-- | Make a StablePtr that can be passed to the C function
+-- @hs_try_putmvar()@. The RTS wants a 'StablePtr' to the underlying
+-- 'MVar#', but a 'StablePtr#' can only refer to lifted types, so we
+-- have to cheat by coercing.
+newStablePtrPrimMVar :: MVar () -> IO (StablePtr PrimMVar)
+newStablePtrPrimMVar (MVar m) = IO $ \s0 ->
+ case makeStablePtr# (unsafeCoerce# m :: PrimMVar) s0 of
+ (# s1, sp #) -> (# s1, StablePtr sp #)
+
-----------------------------------------------------------------------------
-- Transactional heap operations
-----------------------------------------------------------------------------
diff --git a/rts/Capability.c b/rts/Capability.c
index 681797b409..6979c637bc 100644
--- a/rts/Capability.c
+++ b/rts/Capability.c
@@ -266,6 +266,7 @@ initCapability (Capability *cap, uint32_t i)
cap->returning_tasks_tl = NULL;
cap->n_returning_tasks = 0;
cap->inbox = (Message*)END_TSO_QUEUE;
+ cap->putMVars = NULL;
cap->sparks = allocSparkPool();
cap->spark_stats.created = 0;
cap->spark_stats.dud = 0;
diff --git a/rts/Capability.h b/rts/Capability.h
index 8e0288b15e..bbf026279f 100644
--- a/rts/Capability.h
+++ b/rts/Capability.h
@@ -123,6 +123,7 @@ struct Capability_ {
// returning_tasks_{hd,tl}
// wakeup_queue
// inbox
+ // putMVars
Mutex lock;
// Tasks waiting to return from a foreign call, or waiting to make
@@ -138,6 +139,10 @@ struct Capability_ {
// Locks required: cap->lock
Message *inbox;
+ // putMVars are really messages, but they're allocated with malloc() so they
+ // can't go on the inbox queue: the GC would get confused.
+ struct PutMVar_ *putMVars;
+
SparkPool *sparks;
// Stats on spark creation/conversion
@@ -378,6 +383,11 @@ extern uint32_t numa_map[MAX_NUMA_NODES];
Messages
-------------------------------------------------------------------------- */
+typedef struct PutMVar_ {
+ StgStablePtr mvar;
+ struct PutMVar_ *link;
+} PutMVar;
+
#ifdef THREADED_RTS
INLINE_HEADER rtsBool emptyInbox(Capability *cap);
@@ -459,7 +469,8 @@ contextSwitchCapability (Capability *cap)
INLINE_HEADER rtsBool emptyInbox(Capability *cap)
{
- return (cap->inbox == (Message*)END_TSO_QUEUE);
+ return (cap->inbox == (Message*)END_TSO_QUEUE &&
+ cap->putMVars == NULL);
}
#endif
diff --git a/rts/Prelude.h b/rts/Prelude.h
index ae1e9cb266..58de23013b 100644
--- a/rts/Prelude.h
+++ b/rts/Prelude.h
@@ -24,6 +24,7 @@
* modules these names are defined in.
*/
+PRELUDE_CLOSURE(ghczmprim_GHCziTuple_Z0T_closure);
PRELUDE_CLOSURE(ghczmprim_GHCziTypes_True_closure);
PRELUDE_CLOSURE(ghczmprim_GHCziTypes_False_closure);
PRELUDE_CLOSURE(base_GHCziPack_unpackCString_closure);
@@ -87,6 +88,7 @@ PRELUDE_INFO(base_GHCziWord_W64zh_con_info);
PRELUDE_INFO(base_GHCziStable_StablePtr_static_info);
PRELUDE_INFO(base_GHCziStable_StablePtr_con_info);
+#define Unit_closure DLL_IMPORT_DATA_REF(ghczmprim_GHCziTuple_Z0T_closure)
#define True_closure DLL_IMPORT_DATA_REF(ghczmprim_GHCziTypes_True_closure)
#define False_closure DLL_IMPORT_DATA_REF(ghczmprim_GHCziTypes_False_closure)
#define unpackCString_closure DLL_IMPORT_DATA_REF(base_GHCziPack_unpackCString_closure)
diff --git a/rts/PrimOps.cmm b/rts/PrimOps.cmm
index b468c33df6..02a7dafec3 100644
--- a/rts/PrimOps.cmm
+++ b/rts/PrimOps.cmm
@@ -1739,6 +1739,13 @@ loop:
}
+// NOTE: there is another implementation of this function in
+// Threads.c:performTryPutMVar(). Keep them in sync! It was
+// measurably slower to call the C function from here (70% for a
+// tight loop doing tryPutMVar#).
+//
+// TODO: we could kill the duplication by making tryPutMVar# into an
+// inline primop that expands into a C call to performTryPutMVar().
stg_tryPutMVarzh ( P_ mvar, /* :: MVar a */
P_ val, /* :: a */ )
{
@@ -1812,6 +1819,7 @@ loop:
return (1);
}
+
stg_readMVarzh ( P_ mvar, /* :: MVar a */ )
{
W_ val, info, tso, q;
diff --git a/rts/RtsAPI.c b/rts/RtsAPI.c
index 34d68c7e3b..e72430743e 100644
--- a/rts/RtsAPI.c
+++ b/rts/RtsAPI.c
@@ -16,6 +16,7 @@
#include "Schedule.h"
#include "Capability.h"
#include "Stable.h"
+#include "Threads.h"
#include "Weak.h"
/* ----------------------------------------------------------------------------
@@ -620,3 +621,77 @@ void rts_done (void)
freeMyTask();
}
+/* -----------------------------------------------------------------------------
+ tryPutMVar from outside Haskell
+
+ The C call
+
+ hs_try_putmvar(cap, mvar)
+
+ is equivalent to the Haskell call
+
+ tryPutMVar mvar ()
+
+ but it is
+
+ * non-blocking: takes a bounded, short, amount of time
+ * asynchronous: the actual putMVar may be performed after the
+ call returns. That's why hs_try_putmvar() doesn't return a
+ result to say whether the put succeeded.
+
+ NOTE: this call transfers ownership of the StablePtr to the RTS, which will
+ free it after the tryPutMVar has taken place. The reason is that otherwise,
+ it would be very difficult for the caller to arrange to free the StablePtr
+ in all circumstances.
+
+ For more details, see the section "Waking up Haskell threads from C" in the
+ User's Guide.
+ -------------------------------------------------------------------------- */
+
+void hs_try_putmvar (/* in */ int capability,
+ /* in */ HsStablePtr mvar)
+{
+ Task *task = getTask();
+ Capability *cap;
+
+ if (capability < 0) {
+ capability = task->preferred_capability;
+ if (capability < 0) {
+ capability = 0;
+ }
+ }
+ cap = capabilities[capability % enabled_capabilities];
+
+#if !defined(THREADED_RTS)
+
+ performTryPutMVar(cap, (StgMVar*)deRefStablePtr(mvar), Unit_closure);
+ freeStablePtr(mvar);
+
+#else
+
+ ACQUIRE_LOCK(&cap->lock);
+ // If the capability is free, we can perform the tryPutMVar immediately
+ if (cap->running_task == NULL) {
+ cap->running_task = task;
+ task->cap = cap;
+ RELEASE_LOCK(&cap->lock);
+
+ performTryPutMVar(cap, (StgMVar*)deRefStablePtr(mvar), Unit_closure);
+
+ freeStablePtr(mvar);
+
+ // Wake up the capability, which will start running the thread that we
+ // just awoke (if there was one).
+ releaseCapability(cap);
+ } else {
+ PutMVar *p = stgMallocBytes(sizeof(PutMVar),"hs_try_putmvar");
+ // We cannot deref the StablePtr if we don't have a capability,
+ // so we have to store it and deref it later.
+ p->mvar = mvar;
+ p->link = cap->putMVars;
+ cap->putMVars = p;
+ RELEASE_LOCK(&cap->lock);
+ }
+
+#endif
+}
diff --git a/rts/Schedule.c b/rts/Schedule.c
index 544b9c2115..611d70411f 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -723,7 +723,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
if (!emptyRunQueue(cap0)
|| cap0->n_returning_tasks != 0
- || cap0->inbox != (Message*)END_TSO_QUEUE) {
+ || !emptyInbox(cap0)) {
// it already has some work, we just grabbed it at
// the wrong moment. Or maybe it's deadlocked!
releaseCapability(cap0);
@@ -982,6 +982,7 @@ scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
{
#if defined(THREADED_RTS)
Message *m, *next;
+ PutMVar *p, *pnext;
int r;
Capability *cap = *pcap;
@@ -1006,7 +1007,9 @@ scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
if (r != 0) return;
m = cap->inbox;
+ p = cap->putMVars;
cap->inbox = (Message*)END_TSO_QUEUE;
+ cap->putMVars = NULL;
RELEASE_LOCK(&cap->lock);
@@ -1015,10 +1018,20 @@ scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
executeMessage(cap, m);
m = next;
}
+
+ while (p != NULL) {
+ pnext = p->link;
+ performTryPutMVar(cap, (StgMVar*)deRefStablePtr(p->mvar),
+ Unit_closure);
+ freeStablePtr(p->mvar);
+ stgFree(p);
+ p = pnext;
+ }
}
#endif
}
+
/* ----------------------------------------------------------------------------
* Activate spark threads (THREADED_RTS)
* ------------------------------------------------------------------------- */
diff --git a/rts/Task.c b/rts/Task.c
index 9a658e019c..253520f909 100644
--- a/rts/Task.c
+++ b/rts/Task.c
@@ -36,7 +36,6 @@ uint32_t peakWorkerCount;
static int tasksInitialized = 0;
static void freeTask (Task *task);
-static Task * allocTask (void);
static Task * newTask (rtsBool);
#if defined(THREADED_RTS)
@@ -117,8 +116,7 @@ freeTaskManager (void)
return tasksRunning;
}
-static Task *
-allocTask (void)
+Task* getTask (void)
{
Task *task;
@@ -209,7 +207,7 @@ newTask (rtsBool worker)
task->cap = NULL;
task->worker = worker;
- task->stopped = rtsFalse;
+ task->stopped = rtsTrue;
task->running_finalizers = rtsFalse;
task->n_spare_incalls = 0;
task->spare_incalls = NULL;
@@ -304,7 +302,7 @@ newBoundTask (void)
stg_exit(EXIT_FAILURE);
}
- task = allocTask();
+ task = getTask();
task->stopped = rtsFalse;
@@ -452,6 +450,7 @@ startWorkerTask (Capability *cap)
// A worker always gets a fresh Task structure.
task = newTask(rtsTrue);
+ task->stopped = rtsFalse;
// The lock here is to synchronise with taskStart(), to make sure
// that we have finished setting up the Task structure before the
@@ -499,7 +498,7 @@ void rts_setInCallCapability (
int preferred_capability,
int affinity USED_IF_THREADS)
{
- Task *task = allocTask();
+ Task *task = getTask();
task->preferred_capability = preferred_capability;
#ifdef THREADED_RTS
diff --git a/rts/Task.h b/rts/Task.h
index 558f543fac..93234591ba 100644
--- a/rts/Task.h
+++ b/rts/Task.h
@@ -150,7 +150,8 @@ typedef struct Task_ {
struct InCall_ *spare_incalls;
rtsBool worker; // == rtsTrue if this is a worker Task
- rtsBool stopped; // this task has stopped or exited Haskell
+ rtsBool stopped; // == rtsTrue between newBoundTask and
+ // boundTaskExiting, or in a worker Task.
// So that we can detect when a finalizer illegally calls back into Haskell
rtsBool running_finalizers;
@@ -205,7 +206,12 @@ uint32_t freeTaskManager (void);
// thread-local storage and will remain even after boundTaskExiting()
// has been called; to free the memory, see freeMyTask().
//
-Task *newBoundTask (void);
+Task* newBoundTask (void);
+
+// Return the current OS thread's Task, which is created if it doesn't already
+// exist. After you have finished using RTS APIs, you should call freeMyTask()
+// to release this thread's Task.
+Task* getTask (void);
// The current task is a bound task that is exiting.
//
diff --git a/rts/Threads.c b/rts/Threads.c
index 7317249e11..1782da6114 100644
--- a/rts/Threads.c
+++ b/rts/Threads.c
@@ -744,6 +744,85 @@ threadStackUnderflow (Capability *cap, StgTSO *tso)
}
/* ----------------------------------------------------------------------------
+ Implementation of tryPutMVar#
+
+ NOTE: this should be kept in sync with stg_tryPutMVarzh in PrimOps.cmm
+ ------------------------------------------------------------------------- */
+
+rtsBool performTryPutMVar(Capability *cap, StgMVar *mvar, StgClosure *value)
+{
+ const StgInfoTable *info;
+ StgMVarTSOQueue *q;
+ StgTSO *tso;
+
+ info = lockClosure((StgClosure*)mvar);
+
+ if (mvar->value != &stg_END_TSO_QUEUE_closure) {
+#if defined(THREADED_RTS)
+ unlockClosure((StgClosure*)mvar, info);
+#endif
+ return rtsFalse;
+ }
+
+ q = mvar->head;
+loop:
+ if (q == (StgMVarTSOQueue*)&stg_END_TSO_QUEUE_closure) {
+ /* No further takes, the MVar is now full. */
+ if (info == &stg_MVAR_CLEAN_info) {
+ dirty_MVAR(&cap->r, (StgClosure*)mvar);
+ }
+
+ mvar->value = value;
+ unlockClosure((StgClosure*)mvar, &stg_MVAR_DIRTY_info);
+ return rtsTrue;
+ }
+ if (q->header.info == &stg_IND_info ||
+ q->header.info == &stg_MSG_NULL_info) {
+ q = (StgMVarTSOQueue*)((StgInd*)q)->indirectee;
+ goto loop;
+ }
+
+ // There are takeMVar(s) waiting: wake up the first one
+ tso = q->tso;
+ mvar->head = q->link;
+ if (mvar->head == (StgMVarTSOQueue*)&stg_END_TSO_QUEUE_closure) {
+ mvar->tail = (StgMVarTSOQueue*)&stg_END_TSO_QUEUE_closure;
+ }
+
+ ASSERT(tso->block_info.closure == (StgClosure*)mvar);
+ // save why_blocked here, because waking up the thread destroys
+ // this information
+ StgWord why_blocked = tso->why_blocked;
+
+ // actually perform the takeMVar
+ StgStack* stack = tso->stackobj;
+ stack->sp[1] = (W_)value;
+ stack->sp[0] = (W_)&stg_ret_p_info;
+
+ // indicate that the MVar operation has now completed.
+ tso->_link = (StgTSO*)&stg_END_TSO_QUEUE_closure;
+
+ if (stack->dirty == 0) {
+ dirty_STACK(cap, stack);
+ }
+
+ tryWakeupThread(cap, tso);
+
+ // If it was an readMVar, then we can still do work,
+ // so loop back. (XXX: This could take a while)
+ if (why_blocked == BlockedOnMVarRead) {
+ q = ((StgMVarTSOQueue*)q)->link;
+ goto loop;
+ }
+
+ ASSERT(why_blocked == BlockedOnMVar);
+
+ unlockClosure((StgClosure*)mvar, info);
+
+ return rtsTrue;
+}
+
+/* ----------------------------------------------------------------------------
* Debugging: why is a thread blocked
* ------------------------------------------------------------------------- */
diff --git a/rts/Threads.h b/rts/Threads.h
index 01c493e53d..4588008e28 100644
--- a/rts/Threads.h
+++ b/rts/Threads.h
@@ -41,6 +41,8 @@ StgBool isThreadBound (StgTSO* tso);
void threadStackOverflow (Capability *cap, StgTSO *tso);
W_ threadStackUnderflow (Capability *cap, StgTSO *tso);
+rtsBool performTryPutMVar(Capability *cap, StgMVar *mvar, StgClosure *value);
+
#ifdef DEBUG
void printThreadBlockage (StgTSO *tso);
void printThreadStatus (StgTSO *t);
diff --git a/rts/package.conf.in b/rts/package.conf.in
index 65aa5c333b..03848c45e3 100644
--- a/rts/package.conf.in
+++ b/rts/package.conf.in
@@ -103,6 +103,7 @@ ld-options:
, "-Wl,-u,_base_GHCziPtr_Ptr_con_info"
, "-Wl,-u,_base_GHCziPtr_FunPtr_con_info"
, "-Wl,-u,_base_GHCziStable_StablePtr_con_info"
+ , "-Wl,-u,_ghczmprim_GHCziTuple_Z0T_closure"
, "-Wl,-u,_ghczmprim_GHCziTypes_False_closure"
, "-Wl,-u,_ghczmprim_GHCziTypes_True_closure"
, "-Wl,-u,_base_GHCziPack_unpackCString_closure"
@@ -199,6 +200,7 @@ ld-options:
, "-Wl,-u,base_GHCziPtr_Ptr_con_info"
, "-Wl,-u,base_GHCziPtr_FunPtr_con_info"
, "-Wl,-u,base_GHCziStable_StablePtr_con_info"
+ , "-Wl,-u,ghczmprim_GHCziTuple_Z0T_closure"
, "-Wl,-u,ghczmprim_GHCziTypes_False_closure"
, "-Wl,-u,ghczmprim_GHCziTypes_True_closure"
, "-Wl,-u,base_GHCziPack_unpackCString_closure"
diff --git a/testsuite/tests/concurrent/should_run/Makefile b/testsuite/tests/concurrent/should_run/Makefile
index c6bef49619..26f13c5ff6 100644
--- a/testsuite/tests/concurrent/should_run/Makefile
+++ b/testsuite/tests/concurrent/should_run/Makefile
@@ -4,3 +4,9 @@ include $(TOP)/mk/test.mk
conc059_setup :
'$(TEST_HC)' $(TEST_HC_OPTS) -c conc059.hs
+
+hs_try_putmvar002_setup :
+ '$(TEST_HC)' $(TEST_HC_OPTS) -c hs_try_putmvar002.hs
+
+hs_try_putmvar003_setup :
+ '$(TEST_HC)' $(TEST_HC_OPTS) -c hs_try_putmvar003.hs
diff --git a/testsuite/tests/concurrent/should_run/all.T b/testsuite/tests/concurrent/should_run/all.T
index 9f2ed284f6..6eda0007b0 100644
--- a/testsuite/tests/concurrent/should_run/all.T
+++ b/testsuite/tests/concurrent/should_run/all.T
@@ -255,3 +255,34 @@ test('setnumcapabilities001',
# omit ghci, which can't handle unboxed tuples:
test('compareAndSwap', [omit_ways(['ghci','hpc']), reqlib('primitive')], compile_and_run, [''])
+
+test('hs_try_putmvar001',
+ [
+ when(opsys('mingw32'),skip), # uses pthread APIs in the C code
+ only_ways(['threaded1','threaded2']),
+ extra_clean(['hs_try_putmvar001_c.o'])],
+ compile_and_run,
+ ['hs_try_putmvar001_c.c'])
+
+# A benchmark for hs_try_putmvar() vs. foreign export
+# This one should work for both threaded and non-threaded RTS
+test('hs_try_putmvar002',
+ [
+ pre_cmd('$MAKE -s --no-print-directory hs_try_putmvar002_setup'),
+ extra_clean(['hs_try_putmvar002_c.o']),
+ extra_run_opts('1 8 10000')
+ ],
+ compile_and_run,
+ ['hs_try_putmvar002_c.c'])
+
+# Another benchmark for hs_try_putmvar() vs. foreign export
+test('hs_try_putmvar003',
+ [
+ when(opsys('mingw32'),skip), # uses pthread APIs in the C code
+ pre_cmd('$MAKE -s --no-print-directory hs_try_putmvar003_setup'),
+ only_ways(['threaded1','threaded2']),
+ extra_clean(['hs_try_putmvar003_c.o']),
+ extra_run_opts('1 16 32 100')
+ ],
+ compile_and_run,
+ ['hs_try_putmvar003_c.c'])
diff --git a/testsuite/tests/concurrent/should_run/hs_try_putmvar001.hs b/testsuite/tests/concurrent/should_run/hs_try_putmvar001.hs
new file mode 100644
index 0000000000..af4eabb263
--- /dev/null
+++ b/testsuite/tests/concurrent/should_run/hs_try_putmvar001.hs
@@ -0,0 +1,34 @@
+{-# LANGUAGE MagicHash #-}
+module Main where
+
+import Control.Concurrent
+import Control.Exception
+import Foreign
+import Foreign.C
+import GHC.Conc
+import GHC.Prim
+
+-- Sample code demonstrating proper use of hs_try_putmvar()
+
+main = do
+ makeExternalCall >>= print
+ threadDelay 100000
+
+makeExternalCall :: IO CInt
+makeExternalCall = mask_ $ do
+ mvar <- newEmptyMVar
+ sp <- newStablePtrPrimMVar mvar -- freed by hs_try_takemvar()
+ fp <- mallocForeignPtr
+ withForeignPtr fp $ \presult -> do
+ (cap,_) <- threadCapability =<< myThreadId
+ scheduleCallback sp cap presult
+ takeMVar mvar `onException` forkIO (do takeMVar mvar; touchForeignPtr fp)
+ -- the C callback will still run if takeMVar is interrupted, so the
+ -- exception handler keeps the result memory alive long enough.
+ peek presult
+
+foreign import ccall "scheduleCallback"
+ scheduleCallback :: StablePtr PrimMVar
+ -> Int
+ -> Ptr CInt
+ -> IO ()
diff --git a/testsuite/tests/concurrent/should_run/hs_try_putmvar001.stdout b/testsuite/tests/concurrent/should_run/hs_try_putmvar001.stdout
new file mode 100644
index 0000000000..d81cc0710e
--- /dev/null
+++ b/testsuite/tests/concurrent/should_run/hs_try_putmvar001.stdout
@@ -0,0 +1 @@
+42
diff --git a/testsuite/tests/concurrent/should_run/hs_try_putmvar001_c.c b/testsuite/tests/concurrent/should_run/hs_try_putmvar001_c.c
new file mode 100644
index 0000000000..f214c5c4d0
--- /dev/null
+++ b/testsuite/tests/concurrent/should_run/hs_try_putmvar001_c.c
@@ -0,0 +1,31 @@
+#include "HsFFI.h"
+#include "Rts.h"
+#include "RtsAPI.h"
+#include <unistd.h>
+#include <pthread.h>
+
+struct callback {
+ HsStablePtr mvar;
+ int cap;
+ int *presult;
+};
+
+void* callback(struct callback *p)
+{
+ usleep(200);
+ *p->presult = 42;
+ hs_try_putmvar(p->cap,p->mvar);
+ free(p);
+ hs_thread_done();
+ return NULL;
+}
+
+void scheduleCallback(HsStablePtr mvar, HsInt cap, int *presult)
+{
+ pthread_t t;
+ struct callback *p = malloc(sizeof(struct callback));
+ p->mvar = mvar;
+ p->cap = cap;
+ p->presult = presult;
+ pthread_create(&t, NULL, callback, p);
+}
diff --git a/testsuite/tests/concurrent/should_run/hs_try_putmvar002.hs b/testsuite/tests/concurrent/should_run/hs_try_putmvar002.hs
new file mode 100644
index 0000000000..a8eac42dec
--- /dev/null
+++ b/testsuite/tests/concurrent/should_run/hs_try_putmvar002.hs
@@ -0,0 +1,66 @@
+{-# LANGUAGE MagicHash #-}
+module Main where
+
+import Control.Concurrent
+import Control.Exception
+import Control.Monad
+import Foreign hiding (void)
+import Foreign.C
+import GHC.Conc
+import GHC.Prim
+import System.Environment
+
+-- Measure raw throughput, for M threads that each do N calls to C
+-- that call back to hs_try_putmvar() or the foreign export equivalent
+
+main = do
+ args <- getArgs
+ case args of
+ ["1",n,m] -> experiment2 (read m) (experiment1 (read n))
+ ["2",n,m] -> experiment2 (read m) (experiment1FE (read n))
+
+-- -----------------------------------------------------------------------------
+
+experiment1 :: Int -> IO ()
+experiment1 n = mask_ $ do
+ mvar <- newEmptyMVar
+ (cap,_) <- threadCapability =<< myThreadId
+ replicateM_ n $ do
+ sp <- newStablePtrPrimMVar mvar
+ externalPutMVar sp cap
+ takeMVar mvar
+
+foreign import ccall "externalPutMVar"
+ externalPutMVar :: StablePtr PrimMVar
+ -> Int
+ -> IO ()
+
+experiment1FE :: Int -> IO ()
+experiment1FE n = do
+ mvar <- newEmptyMVar
+ (cap,_) <- threadCapability =<< myThreadId
+ bracket (newStablePtr mvar) freeStablePtr $ \sp -> do
+ replicateM_ n $ do externalPutMVarFE sp cap; takeMVar mvar
+
+foreign import ccall "externalPutMVarFE"
+ externalPutMVarFE :: StablePtr (MVar ())
+ -> Int
+ -> IO ()
+
+callbackPutMVar :: StablePtr (MVar ()) -> IO ()
+callbackPutMVar sp = do
+ mvar <- deRefStablePtr sp
+ void $ tryPutMVar mvar ()
+
+foreign export ccall callbackPutMVar :: StablePtr (MVar ()) -> IO ()
+
+-- -----------------------------------------------------------------------------
+-- Perform M copies of experiment1 concurrently
+
+experiment2 :: Int -> IO () -> IO ()
+experiment2 m exp = do
+ mvars <- replicateM m $ do
+ m <- newEmptyMVar
+ forkFinally exp (\_ -> putMVar m ())
+ return m
+ mapM_ takeMVar mvars
diff --git a/testsuite/tests/concurrent/should_run/hs_try_putmvar002_c.c b/testsuite/tests/concurrent/should_run/hs_try_putmvar002_c.c
new file mode 100644
index 0000000000..34a339da93
--- /dev/null
+++ b/testsuite/tests/concurrent/should_run/hs_try_putmvar002_c.c
@@ -0,0 +1,28 @@
+#include "HsFFI.h"
+#include <unistd.h>
+#include <pthread.h>
+#include "hs_try_putmvar002_stub.h"
+
+void externalPutMVar(HsStablePtr mvar, HsInt cap)
+{
+ hs_try_putmvar(cap,mvar);
+}
+
+void externalPutMVarFE(HsStablePtr mvar, HsInt cap)
+{
+ callbackPutMVar(mvar);
+}
+
+void externalManyPutMVars(HsStablePtr mvar, HsInt n, HsInt cap)
+{
+ for (int i = 0; i < n; i++) {
+ hs_try_putmvar(cap,mvar);
+ }
+}
+
+void externalManyPutMVarsFE(HsStablePtr mvar, HsInt n, HsInt cap)
+{
+ for (int i = 0; i < n; i++) {
+ callbackPutMVar(mvar);
+ }
+}
diff --git a/testsuite/tests/concurrent/should_run/hs_try_putmvar003.hs b/testsuite/tests/concurrent/should_run/hs_try_putmvar003.hs
new file mode 100644
index 0000000000..d74a9cb126
--- /dev/null
+++ b/testsuite/tests/concurrent/should_run/hs_try_putmvar003.hs
@@ -0,0 +1,88 @@
+{-# LANGUAGE MagicHash #-}
+module Main where
+
+import Control.Concurrent
+import Control.Exception
+import Control.Monad
+import Foreign hiding (void)
+import Foreign.C
+import GHC.Conc
+import GHC.MVar (MVar(..))
+import GHC.Prim
+import System.Environment
+
+-- Measure C to Haskell callback throughput under a workload with
+-- several dimensions:
+--
+-- * X callback queues (each managed by an OS thread in C)
+-- * each queue has Y Haskell threads, each making Z requests
+--
+-- And we can run the whole thing in two ways:
+-- * With the callbacks calling into a foreign export
+-- * With the callbacks using hs_try_putmvar()
+--
+-- Example results (using WAY=threaded2)
+--
+-- hs_try_putmvar003 1 64 16 500 +RTS -s -N4 1.10s
+-- hs_try_putmvar003 2 64 16 500 +RTS -s -N4 9.88s
+--
+-- hs_try_putmvar() is 9x faster with these parameters.
+
+main = do
+ args <- getArgs
+ case args of
+ ["1",x,y,z] -> experiment False (read x) (read y) (read z)
+ ["2",x,y,z] -> experiment True (read x) (read y) (read z)
+
+makeExternalCall :: Ptr CallbackQueue -> IO CInt
+makeExternalCall q = mask_ $ do
+ mvar <- newEmptyMVar
+ sp <- newStablePtrPrimMVar mvar
+ fp <- mallocForeignPtr
+ (cap,_) <- threadCapability =<< myThreadId
+ withForeignPtr fp $ \presult -> do
+ scheduleCallback q sp cap presult
+ takeMVar mvar `onException` forkIO (do takeMVar mvar; touchForeignPtr fp)
+ peek presult
+
+data CallbackQueue
+
+foreign import ccall "mkCallbackQueue"
+ mkCallbackQueue :: Int -> IO (Ptr CallbackQueue)
+
+foreign import ccall "destroyCallbackQueue"
+ destroyCallbackQueue :: Ptr CallbackQueue -> IO ()
+
+foreign import ccall "scheduleCallback"
+ scheduleCallback :: Ptr CallbackQueue
+ -> StablePtr PrimMVar
+ -> Int
+ -> Ptr CInt
+ -> IO ()
+
+callbackPutMVar :: StablePtr PrimMVar -> IO ()
+callbackPutMVar sp = do
+ mvar <- deRefStablePtr sp
+ void $ tryPutMVar (MVar (unsafeCoerce# mvar)) ()
+
+foreign export ccall callbackPutMVar :: StablePtr PrimMVar -> IO ()
+
+-- Make
+-- * x callback queues, each with
+-- * y threads, doing
+-- * z requests each
+experiment :: Bool -> Int -> Int -> Int -> IO ()
+experiment use_foreign_export x y z = do
+ mvars <- replicateM x $ async $ do
+ bracket (mkCallbackQueue (fromEnum use_foreign_export))
+ destroyCallbackQueue $ \q -> do
+ mvars <- replicateM y $ async $
+ replicateM_ z $ void $ makeExternalCall q
+ mapM_ takeMVar mvars
+ mapM_ takeMVar mvars
+
+async :: IO () -> IO (MVar ())
+async io = do
+ m <- newEmptyMVar
+ forkFinally io (\_ -> putMVar m ())
+ return m
diff --git a/testsuite/tests/concurrent/should_run/hs_try_putmvar003_c.c b/testsuite/tests/concurrent/should_run/hs_try_putmvar003_c.c
new file mode 100644
index 0000000000..c499b72056
--- /dev/null
+++ b/testsuite/tests/concurrent/should_run/hs_try_putmvar003_c.c
@@ -0,0 +1,83 @@
+#include "HsFFI.h"
+#include "Rts.h"
+#include "RtsAPI.h"
+#include <unistd.h>
+#include <pthread.h>
+#include "hs_try_putmvar003_stub.h"
+
+struct callback_queue {
+ pthread_mutex_t lock;
+ pthread_cond_t cond;
+ int use_foreign_export;
+ struct callback *pending;
+};
+
+struct callback {
+ HsStablePtr mvar;
+ int cap;
+ int *presult;
+ struct callback *next;
+};
+
+void* callback(struct callback_queue *q)
+{
+ struct callback *cb;
+
+ pthread_mutex_lock(&q->lock);
+ do {
+ if (q->pending == NULL) {
+ pthread_cond_wait(&q->cond,&q->lock);
+ }
+ if (q->pending != NULL) {
+ cb = q->pending;
+ q->pending = cb->next;
+ *cb->presult = 42;
+ if (q->use_foreign_export) {
+ callbackPutMVar(cb->mvar);
+ } else {
+ hs_try_putmvar(cb->cap,cb->mvar);
+ }
+ free(cb);
+ }
+ } while (1);
+ pthread_mutex_unlock(&q->lock);
+
+ hs_thread_done();
+ return NULL;
+}
+
+typedef void* threadfunc(void *);
+
+struct callback_queue* mkCallbackQueue(int use_foreign_export)
+{
+ struct callback_queue *q = malloc(sizeof(struct callback_queue));
+ pthread_t t;
+ pthread_mutex_init(&q->lock, NULL);
+ pthread_cond_init(&q->cond, NULL);
+ pthread_create(&t, NULL, (threadfunc*)callback, q);
+ q->pending = NULL;
+ q->use_foreign_export = use_foreign_export;
+ return q;
+}
+
+void destroyCallbackQueue(struct callback_queue *q)
+{
+ pthread_mutex_destroy(&q->lock);
+ pthread_cond_destroy(&q->cond);
+ free(q);
+}
+
+void scheduleCallback(struct callback_queue *q,
+ HsStablePtr mvar,
+ HsInt cap, int *presult)
+{
+ struct callback *p = malloc(sizeof(struct callback));
+ p->mvar = mvar;
+ p->cap = cap;
+ p->presult = presult;
+ pthread_mutex_lock(&q->lock);
+ p->next = q->pending;
+ q->pending = p;
+ pthread_cond_signal(&q->cond);
+ pthread_mutex_unlock(&q->lock);
+}