summaryrefslogtreecommitdiff
path: root/rts/Threads.c
diff options
context:
space:
mode:
authorSimon Marlow <marlowsd@gmail.com>2016-08-30 20:55:10 +0100
committerSimon Marlow <marlowsd@gmail.com>2016-09-12 08:33:24 +0100
commit454033b54e2f7eef2354cc9d7ae7e7cba4dff09a (patch)
tree3577ed7b0b42e2acff1502673e1ee474fba31319 /rts/Threads.c
parent0e7ccf6d233c66b23a60de4e35e039f78ea3e162 (diff)
downloadhaskell-454033b54e2f7eef2354cc9d7ae7e7cba4dff09a.tar.gz
Add hs_try_putmvar()
Summary: This is a fast, non-blocking, asynchronous, interface to tryPutMVar that can be called from C/C++. It's useful for callback-based C/C++ APIs: the idea is that the callback invokes hs_try_putmvar(), and the Haskell code waits for the callback to run by blocking in takeMVar. The callback doesn't block - this is often a requirement of callback-based APIs. The callback wakes up the Haskell thread with minimal overhead and no unnecessary context-switches. There are a couple of benchmarks in testsuite/tests/concurrent/should_run. Some example results comparing hs_try_putmvar() with using a standard foreign export: ./hs_try_putmvar003 1 64 16 100 +RTS -s -N4 0.49s ./hs_try_putmvar003 2 64 16 100 +RTS -s -N4 2.30s hs_try_putmvar() is 4x faster for this workload (see the source for hs_try_putmvar003.hs for details of the workload). An alternative solution is to use the IO Manager for this. We've tried it, but there are problems with that approach: * Need to create a new file descriptor for each callback * The IO Manger thread(s) become a bottleneck * More potential for things to go wrong, e.g. throwing an exception in an IO Manager callback kills the IO Manager thread. Test Plan: validate; new unit tests Reviewers: niteria, erikd, ezyang, bgamari, austin, hvr Subscribers: thomie Differential Revision: https://phabricator.haskell.org/D2501
Diffstat (limited to 'rts/Threads.c')
-rw-r--r--rts/Threads.c79
1 files changed, 79 insertions, 0 deletions
diff --git a/rts/Threads.c b/rts/Threads.c
index 7317249e11..1782da6114 100644
--- a/rts/Threads.c
+++ b/rts/Threads.c
@@ -744,6 +744,85 @@ threadStackUnderflow (Capability *cap, StgTSO *tso)
}
/* ----------------------------------------------------------------------------
+ Implementation of tryPutMVar#
+
+ NOTE: this should be kept in sync with stg_tryPutMVarzh in PrimOps.cmm
+ ------------------------------------------------------------------------- */
+
+rtsBool performTryPutMVar(Capability *cap, StgMVar *mvar, StgClosure *value)
+{
+ const StgInfoTable *info;
+ StgMVarTSOQueue *q;
+ StgTSO *tso;
+
+ info = lockClosure((StgClosure*)mvar);
+
+ if (mvar->value != &stg_END_TSO_QUEUE_closure) {
+#if defined(THREADED_RTS)
+ unlockClosure((StgClosure*)mvar, info);
+#endif
+ return rtsFalse;
+ }
+
+ q = mvar->head;
+loop:
+ if (q == (StgMVarTSOQueue*)&stg_END_TSO_QUEUE_closure) {
+ /* No further takes, the MVar is now full. */
+ if (info == &stg_MVAR_CLEAN_info) {
+ dirty_MVAR(&cap->r, (StgClosure*)mvar);
+ }
+
+ mvar->value = value;
+ unlockClosure((StgClosure*)mvar, &stg_MVAR_DIRTY_info);
+ return rtsTrue;
+ }
+ if (q->header.info == &stg_IND_info ||
+ q->header.info == &stg_MSG_NULL_info) {
+ q = (StgMVarTSOQueue*)((StgInd*)q)->indirectee;
+ goto loop;
+ }
+
+ // There are takeMVar(s) waiting: wake up the first one
+ tso = q->tso;
+ mvar->head = q->link;
+ if (mvar->head == (StgMVarTSOQueue*)&stg_END_TSO_QUEUE_closure) {
+ mvar->tail = (StgMVarTSOQueue*)&stg_END_TSO_QUEUE_closure;
+ }
+
+ ASSERT(tso->block_info.closure == (StgClosure*)mvar);
+ // save why_blocked here, because waking up the thread destroys
+ // this information
+ StgWord why_blocked = tso->why_blocked;
+
+ // actually perform the takeMVar
+ StgStack* stack = tso->stackobj;
+ stack->sp[1] = (W_)value;
+ stack->sp[0] = (W_)&stg_ret_p_info;
+
+ // indicate that the MVar operation has now completed.
+ tso->_link = (StgTSO*)&stg_END_TSO_QUEUE_closure;
+
+ if (stack->dirty == 0) {
+ dirty_STACK(cap, stack);
+ }
+
+ tryWakeupThread(cap, tso);
+
+ // If it was an readMVar, then we can still do work,
+ // so loop back. (XXX: This could take a while)
+ if (why_blocked == BlockedOnMVarRead) {
+ q = ((StgMVarTSOQueue*)q)->link;
+ goto loop;
+ }
+
+ ASSERT(why_blocked == BlockedOnMVar);
+
+ unlockClosure((StgClosure*)mvar, info);
+
+ return rtsTrue;
+}
+
+/* ----------------------------------------------------------------------------
* Debugging: why is a thread blocked
* ------------------------------------------------------------------------- */