diff options
author | Simon Marlow <marlowsd@gmail.com> | 2016-08-30 20:55:10 +0100 |
---|---|---|
committer | Simon Marlow <marlowsd@gmail.com> | 2016-09-12 08:33:24 +0100 |
commit | 454033b54e2f7eef2354cc9d7ae7e7cba4dff09a (patch) | |
tree | 3577ed7b0b42e2acff1502673e1ee474fba31319 /rts/Threads.c | |
parent | 0e7ccf6d233c66b23a60de4e35e039f78ea3e162 (diff) | |
download | haskell-454033b54e2f7eef2354cc9d7ae7e7cba4dff09a.tar.gz |
Add hs_try_putmvar()
Summary:
This is a fast, non-blocking, asynchronous, interface to tryPutMVar that
can be called from C/C++.
It's useful for callback-based C/C++ APIs: the idea is that the callback
invokes hs_try_putmvar(), and the Haskell code waits for the callback to
run by blocking in takeMVar.
The callback doesn't block - this is often a requirement of
callback-based APIs. The callback wakes up the Haskell thread with
minimal overhead and no unnecessary context-switches.
There are a couple of benchmarks in
testsuite/tests/concurrent/should_run. Some example results comparing
hs_try_putmvar() with using a standard foreign export:
./hs_try_putmvar003 1 64 16 100 +RTS -s -N4 0.49s
./hs_try_putmvar003 2 64 16 100 +RTS -s -N4 2.30s
hs_try_putmvar() is 4x faster for this workload (see the source for
hs_try_putmvar003.hs for details of the workload).
An alternative solution is to use the IO Manager for this. We've tried
it, but there are problems with that approach:
* Need to create a new file descriptor for each callback
* The IO Manger thread(s) become a bottleneck
* More potential for things to go wrong, e.g. throwing an exception in
an IO Manager callback kills the IO Manager thread.
Test Plan: validate; new unit tests
Reviewers: niteria, erikd, ezyang, bgamari, austin, hvr
Subscribers: thomie
Differential Revision: https://phabricator.haskell.org/D2501
Diffstat (limited to 'rts/Threads.c')
-rw-r--r-- | rts/Threads.c | 79 |
1 files changed, 79 insertions, 0 deletions
diff --git a/rts/Threads.c b/rts/Threads.c index 7317249e11..1782da6114 100644 --- a/rts/Threads.c +++ b/rts/Threads.c @@ -744,6 +744,85 @@ threadStackUnderflow (Capability *cap, StgTSO *tso) } /* ---------------------------------------------------------------------------- + Implementation of tryPutMVar# + + NOTE: this should be kept in sync with stg_tryPutMVarzh in PrimOps.cmm + ------------------------------------------------------------------------- */ + +rtsBool performTryPutMVar(Capability *cap, StgMVar *mvar, StgClosure *value) +{ + const StgInfoTable *info; + StgMVarTSOQueue *q; + StgTSO *tso; + + info = lockClosure((StgClosure*)mvar); + + if (mvar->value != &stg_END_TSO_QUEUE_closure) { +#if defined(THREADED_RTS) + unlockClosure((StgClosure*)mvar, info); +#endif + return rtsFalse; + } + + q = mvar->head; +loop: + if (q == (StgMVarTSOQueue*)&stg_END_TSO_QUEUE_closure) { + /* No further takes, the MVar is now full. */ + if (info == &stg_MVAR_CLEAN_info) { + dirty_MVAR(&cap->r, (StgClosure*)mvar); + } + + mvar->value = value; + unlockClosure((StgClosure*)mvar, &stg_MVAR_DIRTY_info); + return rtsTrue; + } + if (q->header.info == &stg_IND_info || + q->header.info == &stg_MSG_NULL_info) { + q = (StgMVarTSOQueue*)((StgInd*)q)->indirectee; + goto loop; + } + + // There are takeMVar(s) waiting: wake up the first one + tso = q->tso; + mvar->head = q->link; + if (mvar->head == (StgMVarTSOQueue*)&stg_END_TSO_QUEUE_closure) { + mvar->tail = (StgMVarTSOQueue*)&stg_END_TSO_QUEUE_closure; + } + + ASSERT(tso->block_info.closure == (StgClosure*)mvar); + // save why_blocked here, because waking up the thread destroys + // this information + StgWord why_blocked = tso->why_blocked; + + // actually perform the takeMVar + StgStack* stack = tso->stackobj; + stack->sp[1] = (W_)value; + stack->sp[0] = (W_)&stg_ret_p_info; + + // indicate that the MVar operation has now completed. + tso->_link = (StgTSO*)&stg_END_TSO_QUEUE_closure; + + if (stack->dirty == 0) { + dirty_STACK(cap, stack); + } + + tryWakeupThread(cap, tso); + + // If it was an readMVar, then we can still do work, + // so loop back. (XXX: This could take a while) + if (why_blocked == BlockedOnMVarRead) { + q = ((StgMVarTSOQueue*)q)->link; + goto loop; + } + + ASSERT(why_blocked == BlockedOnMVar); + + unlockClosure((StgClosure*)mvar, info); + + return rtsTrue; +} + +/* ---------------------------------------------------------------------------- * Debugging: why is a thread blocked * ------------------------------------------------------------------------- */ |