diff options
author | Simon Marlow <marlowsd@gmail.com> | 2016-08-30 20:55:10 +0100 |
---|---|---|
committer | Simon Marlow <marlowsd@gmail.com> | 2016-09-12 08:33:24 +0100 |
commit | 454033b54e2f7eef2354cc9d7ae7e7cba4dff09a (patch) | |
tree | 3577ed7b0b42e2acff1502673e1ee474fba31319 /rts/Schedule.c | |
parent | 0e7ccf6d233c66b23a60de4e35e039f78ea3e162 (diff) | |
download | haskell-454033b54e2f7eef2354cc9d7ae7e7cba4dff09a.tar.gz |
Add hs_try_putmvar()
Summary:
This is a fast, non-blocking, asynchronous, interface to tryPutMVar that
can be called from C/C++.
It's useful for callback-based C/C++ APIs: the idea is that the callback
invokes hs_try_putmvar(), and the Haskell code waits for the callback to
run by blocking in takeMVar.
The callback doesn't block - this is often a requirement of
callback-based APIs. The callback wakes up the Haskell thread with
minimal overhead and no unnecessary context-switches.
There are a couple of benchmarks in
testsuite/tests/concurrent/should_run. Some example results comparing
hs_try_putmvar() with using a standard foreign export:
./hs_try_putmvar003 1 64 16 100 +RTS -s -N4 0.49s
./hs_try_putmvar003 2 64 16 100 +RTS -s -N4 2.30s
hs_try_putmvar() is 4x faster for this workload (see the source for
hs_try_putmvar003.hs for details of the workload).
An alternative solution is to use the IO Manager for this. We've tried
it, but there are problems with that approach:
* Need to create a new file descriptor for each callback
* The IO Manger thread(s) become a bottleneck
* More potential for things to go wrong, e.g. throwing an exception in
an IO Manager callback kills the IO Manager thread.
Test Plan: validate; new unit tests
Reviewers: niteria, erikd, ezyang, bgamari, austin, hvr
Subscribers: thomie
Differential Revision: https://phabricator.haskell.org/D2501
Diffstat (limited to 'rts/Schedule.c')
-rw-r--r-- | rts/Schedule.c | 15 |
1 files changed, 14 insertions, 1 deletions
diff --git a/rts/Schedule.c b/rts/Schedule.c index 544b9c2115..611d70411f 100644 --- a/rts/Schedule.c +++ b/rts/Schedule.c @@ -723,7 +723,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS, if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) { if (!emptyRunQueue(cap0) || cap0->n_returning_tasks != 0 - || cap0->inbox != (Message*)END_TSO_QUEUE) { + || !emptyInbox(cap0)) { // it already has some work, we just grabbed it at // the wrong moment. Or maybe it's deadlocked! releaseCapability(cap0); @@ -982,6 +982,7 @@ scheduleProcessInbox (Capability **pcap USED_IF_THREADS) { #if defined(THREADED_RTS) Message *m, *next; + PutMVar *p, *pnext; int r; Capability *cap = *pcap; @@ -1006,7 +1007,9 @@ scheduleProcessInbox (Capability **pcap USED_IF_THREADS) if (r != 0) return; m = cap->inbox; + p = cap->putMVars; cap->inbox = (Message*)END_TSO_QUEUE; + cap->putMVars = NULL; RELEASE_LOCK(&cap->lock); @@ -1015,10 +1018,20 @@ scheduleProcessInbox (Capability **pcap USED_IF_THREADS) executeMessage(cap, m); m = next; } + + while (p != NULL) { + pnext = p->link; + performTryPutMVar(cap, (StgMVar*)deRefStablePtr(p->mvar), + Unit_closure); + freeStablePtr(p->mvar); + stgFree(p); + p = pnext; + } } #endif } + /* ---------------------------------------------------------------------------- * Activate spark threads (THREADED_RTS) * ------------------------------------------------------------------------- */ |