summaryrefslogtreecommitdiff
path: root/rts/Schedule.c
diff options
context:
space:
mode:
authorSimon Marlow <marlowsd@gmail.com>2016-08-30 20:55:10 +0100
committerSimon Marlow <marlowsd@gmail.com>2016-09-12 08:33:24 +0100
commit454033b54e2f7eef2354cc9d7ae7e7cba4dff09a (patch)
tree3577ed7b0b42e2acff1502673e1ee474fba31319 /rts/Schedule.c
parent0e7ccf6d233c66b23a60de4e35e039f78ea3e162 (diff)
downloadhaskell-454033b54e2f7eef2354cc9d7ae7e7cba4dff09a.tar.gz
Add hs_try_putmvar()
Summary: This is a fast, non-blocking, asynchronous, interface to tryPutMVar that can be called from C/C++. It's useful for callback-based C/C++ APIs: the idea is that the callback invokes hs_try_putmvar(), and the Haskell code waits for the callback to run by blocking in takeMVar. The callback doesn't block - this is often a requirement of callback-based APIs. The callback wakes up the Haskell thread with minimal overhead and no unnecessary context-switches. There are a couple of benchmarks in testsuite/tests/concurrent/should_run. Some example results comparing hs_try_putmvar() with using a standard foreign export: ./hs_try_putmvar003 1 64 16 100 +RTS -s -N4 0.49s ./hs_try_putmvar003 2 64 16 100 +RTS -s -N4 2.30s hs_try_putmvar() is 4x faster for this workload (see the source for hs_try_putmvar003.hs for details of the workload). An alternative solution is to use the IO Manager for this. We've tried it, but there are problems with that approach: * Need to create a new file descriptor for each callback * The IO Manger thread(s) become a bottleneck * More potential for things to go wrong, e.g. throwing an exception in an IO Manager callback kills the IO Manager thread. Test Plan: validate; new unit tests Reviewers: niteria, erikd, ezyang, bgamari, austin, hvr Subscribers: thomie Differential Revision: https://phabricator.haskell.org/D2501
Diffstat (limited to 'rts/Schedule.c')
-rw-r--r--rts/Schedule.c15
1 files changed, 14 insertions, 1 deletions
diff --git a/rts/Schedule.c b/rts/Schedule.c
index 544b9c2115..611d70411f 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -723,7 +723,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
if (!emptyRunQueue(cap0)
|| cap0->n_returning_tasks != 0
- || cap0->inbox != (Message*)END_TSO_QUEUE) {
+ || !emptyInbox(cap0)) {
// it already has some work, we just grabbed it at
// the wrong moment. Or maybe it's deadlocked!
releaseCapability(cap0);
@@ -982,6 +982,7 @@ scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
{
#if defined(THREADED_RTS)
Message *m, *next;
+ PutMVar *p, *pnext;
int r;
Capability *cap = *pcap;
@@ -1006,7 +1007,9 @@ scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
if (r != 0) return;
m = cap->inbox;
+ p = cap->putMVars;
cap->inbox = (Message*)END_TSO_QUEUE;
+ cap->putMVars = NULL;
RELEASE_LOCK(&cap->lock);
@@ -1015,10 +1018,20 @@ scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
executeMessage(cap, m);
m = next;
}
+
+ while (p != NULL) {
+ pnext = p->link;
+ performTryPutMVar(cap, (StgMVar*)deRefStablePtr(p->mvar),
+ Unit_closure);
+ freeStablePtr(p->mvar);
+ stgFree(p);
+ p = pnext;
+ }
}
#endif
}
+
/* ----------------------------------------------------------------------------
* Activate spark threads (THREADED_RTS)
* ------------------------------------------------------------------------- */