summary refs log tree commit diff
path: root/rts
diff options
context:
space:
mode:
Diffstat (limited to 'rts')
-rw-r--r-- rts/Capability.c 1
-rw-r--r-- rts/Capability.h 13
-rw-r--r-- rts/Prelude.h 2
-rw-r--r-- rts/PrimOps.cmm 8
-rw-r--r-- rts/RtsAPI.c 75
-rw-r--r-- rts/Schedule.c 15
-rw-r--r-- rts/Task.c 11
-rw-r--r-- rts/Task.h 10
-rw-r--r-- rts/Threads.c 79
-rw-r--r-- rts/Threads.h 2
-rw-r--r-- rts/package.conf.in 2
11 files changed, 208 insertions, 10 deletions
diff --git a/rts/Capability.c b/rts/Capability.c
index 681797b409..6979c637bc 100644
--- a/rts/Capability.c
+++ b/rts/Capability.c
@@ -266,6 +266,7 @@ initCapability (Capability *cap, uint32_t i)
cap->returning_tasks_tl = NULL;
cap->n_returning_tasks = 0;
cap->inbox = (Message*)END_TSO_QUEUE;
+ cap->putMVars = NULL;
cap->sparks = allocSparkPool();
cap->spark_stats.created = 0;
cap->spark_stats.dud = 0;
diff --git a/rts/Capability.h b/rts/Capability.h
index 8e0288b15e..bbf026279f 100644
--- a/rts/Capability.h
+++ b/rts/Capability.h
@@ -123,6 +123,7 @@ struct Capability_ {
// returning_tasks_{hd,tl}
// wakeup_queue
// inbox
+ // putMVars
Mutex lock;
// Tasks waiting to return from a foreign call, or waiting to make
@@ -138,6 +139,10 @@ struct Capability_ {
// Locks required: cap->lock
Message *inbox;
+ // putMVars are really messages, but they're allocated with malloc() so they
+ // can't go on the inbox queue: the GC would get confused.
+ struct PutMVar_ *putMVars;
+
SparkPool *sparks;
// Stats on spark creation/conversion
@@ -378,6 +383,11 @@ extern uint32_t numa_map[MAX_NUMA_NODES];
Messages
-------------------------------------------------------------------------- */
+typedef struct PutMVar_ {
+ StgStablePtr mvar;
+ struct PutMVar_ *link;
+} PutMVar;
+
#ifdef THREADED_RTS
INLINE_HEADER rtsBool emptyInbox(Capability *cap);
@@ -459,7 +469,8 @@ contextSwitchCapability (Capability *cap)
INLINE_HEADER rtsBool emptyInbox(Capability *cap)
{
- return (cap->inbox == (Message*)END_TSO_QUEUE);
+ return (cap->inbox == (Message*)END_TSO_QUEUE &&
+ cap->putMVars == NULL);
}
#endif
diff --git a/rts/Prelude.h b/rts/Prelude.h
index ae1e9cb266..58de23013b 100644
--- a/rts/Prelude.h
+++ b/rts/Prelude.h
@@ -24,6 +24,7 @@
* modules these names are defined in.
*/
+PRELUDE_CLOSURE(ghczmprim_GHCziTuple_Z0T_closure);
PRELUDE_CLOSURE(ghczmprim_GHCziTypes_True_closure);
PRELUDE_CLOSURE(ghczmprim_GHCziTypes_False_closure);
PRELUDE_CLOSURE(base_GHCziPack_unpackCString_closure);
@@ -87,6 +88,7 @@ PRELUDE_INFO(base_GHCziWord_W64zh_con_info);
PRELUDE_INFO(base_GHCziStable_StablePtr_static_info);
PRELUDE_INFO(base_GHCziStable_StablePtr_con_info);
+#define Unit_closure DLL_IMPORT_DATA_REF(ghczmprim_GHCziTuple_Z0T_closure)
#define True_closure DLL_IMPORT_DATA_REF(ghczmprim_GHCziTypes_True_closure)
#define False_closure DLL_IMPORT_DATA_REF(ghczmprim_GHCziTypes_False_closure)
#define unpackCString_closure DLL_IMPORT_DATA_REF(base_GHCziPack_unpackCString_closure)
diff --git a/rts/PrimOps.cmm b/rts/PrimOps.cmm
index b468c33df6..02a7dafec3 100644
--- a/rts/PrimOps.cmm
+++ b/rts/PrimOps.cmm
@@ -1739,6 +1739,13 @@ loop:
}
+// NOTE: there is another implementation of this function in
+// Threads.c:performTryPutMVar(). Keep them in sync! It was
+// measurably slower to call the C function from here (70% for a
+// tight loop doing tryPutMVar#).
+//
+// TODO: we could kill the duplication by making tryPutMVar# into an
+// inline primop that expands into a C call to performTryPutMVar().
stg_tryPutMVarzh ( P_ mvar, /* :: MVar a */
P_ val, /* :: a */ )
{
@@ -1812,6 +1819,7 @@ loop:
return (1);
}
+
stg_readMVarzh ( P_ mvar, /* :: MVar a */ )
{
W_ val, info, tso, q;
diff --git a/rts/RtsAPI.c b/rts/RtsAPI.c
index 34d68c7e3b..e72430743e 100644
--- a/rts/RtsAPI.c
+++ b/rts/RtsAPI.c
@@ -16,6 +16,7 @@
#include "Schedule.h"
#include "Capability.h"
#include "Stable.h"
+#include "Threads.h"
#include "Weak.h"
/* ----------------------------------------------------------------------------
@@ -620,3 +621,77 @@ void rts_done (void)
freeMyTask();
}
+/* -----------------------------------------------------------------------------
+ tryPutMVar from outside Haskell
+
+ The C call
+
+ hs_try_putmvar(cap, mvar)
+
+ is equivalent to the Haskell call
+
+ tryPutMVar mvar ()
+
+ but it is
+
+ * non-blocking: takes a bounded, short amount of time
+ * asynchronous: the actual putMVar may be performed after the
+ call returns. That's why hs_try_putmvar() doesn't return a
+ result to say whether the put succeeded.
+
+ NOTE: this call transfers ownership of the StablePtr to the RTS, which will
+ free it after the tryPutMVar has taken place. The reason is that otherwise,
+ it would be very difficult for the caller to arrange to free the StablePtr
+ in all circumstances.
+
+ For more details, see the section "Waking up Haskell threads from C" in the
+ User's Guide.
+ -------------------------------------------------------------------------- */
+
+void hs_try_putmvar (/* in */ int capability,
+ /* in */ HsStablePtr mvar)
+{
+ Task *task = getTask();
+ Capability *cap;
+
+ if (capability < 0) {
+ capability = task->preferred_capability;
+ if (capability < 0) {
+ capability = 0;
+ }
+ }
+ cap = capabilities[capability % enabled_capabilities];
+
+#if !defined(THREADED_RTS)
+
+ performTryPutMVar(cap, (StgMVar*)deRefStablePtr(mvar), Unit_closure);
+ freeStablePtr(mvar);
+
+#else
+
+ ACQUIRE_LOCK(&cap->lock);
+ // If the capability is free, we can perform the tryPutMVar immediately
+ if (cap->running_task == NULL) {
+ cap->running_task = task;
+ task->cap = cap;
+ RELEASE_LOCK(&cap->lock);
+
+ performTryPutMVar(cap, (StgMVar*)deRefStablePtr(mvar), Unit_closure);
+
+ freeStablePtr(mvar);
+
+ // Wake up the capability, which will start running the thread that we
+ // just awoke (if there was one).
+ releaseCapability(cap);
+ } else {
+ PutMVar *p = stgMallocBytes(sizeof(PutMVar),"hs_try_putmvar");
+ // We cannot deref the StablePtr if we don't have a capability,
+ // so we have to store it and deref it later.
+ p->mvar = mvar;
+ p->link = cap->putMVars;
+ cap->putMVars = p;
+ RELEASE_LOCK(&cap->lock);
+ }
+
+#endif
+}
diff --git a/rts/Schedule.c b/rts/Schedule.c
index 544b9c2115..611d70411f 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -723,7 +723,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
if (!emptyRunQueue(cap0)
|| cap0->n_returning_tasks != 0
- || cap0->inbox != (Message*)END_TSO_QUEUE) {
+ || !emptyInbox(cap0)) {
// it already has some work, we just grabbed it at
// the wrong moment. Or maybe it's deadlocked!
releaseCapability(cap0);
@@ -982,6 +982,7 @@ scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
{
#if defined(THREADED_RTS)
Message *m, *next;
+ PutMVar *p, *pnext;
int r;
Capability *cap = *pcap;
@@ -1006,7 +1007,9 @@ scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
if (r != 0) return;
m = cap->inbox;
+ p = cap->putMVars;
cap->inbox = (Message*)END_TSO_QUEUE;
+ cap->putMVars = NULL;
RELEASE_LOCK(&cap->lock);
@@ -1015,10 +1018,20 @@ scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
executeMessage(cap, m);
m = next;
}
+
+ while (p != NULL) {
+ pnext = p->link;
+ performTryPutMVar(cap, (StgMVar*)deRefStablePtr(p->mvar),
+ Unit_closure);
+ freeStablePtr(p->mvar);
+ stgFree(p);
+ p = pnext;
+ }
}
#endif
}
+
/* ----------------------------------------------------------------------------
* Activate spark threads (THREADED_RTS)
* ------------------------------------------------------------------------- */
diff --git a/rts/Task.c b/rts/Task.c
index 9a658e019c..253520f909 100644
--- a/rts/Task.c
+++ b/rts/Task.c
@@ -36,7 +36,6 @@ uint32_t peakWorkerCount;
static int tasksInitialized = 0;
static void freeTask (Task *task);
-static Task * allocTask (void);
static Task * newTask (rtsBool);
#if defined(THREADED_RTS)
@@ -117,8 +116,7 @@ freeTaskManager (void)
return tasksRunning;
}
-static Task *
-allocTask (void)
+Task* getTask (void)
{
Task *task;
@@ -209,7 +207,7 @@ newTask (rtsBool worker)
task->cap = NULL;
task->worker = worker;
- task->stopped = rtsFalse;
+ task->stopped = rtsTrue;
task->running_finalizers = rtsFalse;
task->n_spare_incalls = 0;
task->spare_incalls = NULL;
@@ -304,7 +302,7 @@ newBoundTask (void)
stg_exit(EXIT_FAILURE);
}
- task = allocTask();
+ task = getTask();
task->stopped = rtsFalse;
@@ -452,6 +450,7 @@ startWorkerTask (Capability *cap)
// A worker always gets a fresh Task structure.
task = newTask(rtsTrue);
+ task->stopped = rtsFalse;
// The lock here is to synchronise with taskStart(), to make sure
// that we have finished setting up the Task structure before the
@@ -499,7 +498,7 @@ void rts_setInCallCapability (
int preferred_capability,
int affinity USED_IF_THREADS)
{
- Task *task = allocTask();
+ Task *task = getTask();
task->preferred_capability = preferred_capability;
#ifdef THREADED_RTS
diff --git a/rts/Task.h b/rts/Task.h
index 558f543fac..93234591ba 100644
--- a/rts/Task.h
+++ b/rts/Task.h
@@ -150,7 +150,8 @@ typedef struct Task_ {
struct InCall_ *spare_incalls;
rtsBool worker; // == rtsTrue if this is a worker Task
- rtsBool stopped; // this task has stopped or exited Haskell
+ rtsBool stopped; // == rtsFalse between newBoundTask and
+ // boundTaskExiting, or in a worker Task.
// So that we can detect when a finalizer illegally calls back into Haskell
rtsBool running_finalizers;
@@ -205,7 +206,12 @@ uint32_t freeTaskManager (void);
// thread-local storage and will remain even after boundTaskExiting()
// has been called; to free the memory, see freeMyTask().
//
-Task *newBoundTask (void);
+Task* newBoundTask (void);
+
+// Return the current OS thread's Task, which is created if it doesn't already
+// exist. After you have finished using RTS APIs, you should call freeMyTask()
+// to release this thread's Task.
+Task* getTask (void);
// The current task is a bound task that is exiting.
//
diff --git a/rts/Threads.c b/rts/Threads.c
index 7317249e11..1782da6114 100644
--- a/rts/Threads.c
+++ b/rts/Threads.c
@@ -744,6 +744,85 @@ threadStackUnderflow (Capability *cap, StgTSO *tso)
}
/* ----------------------------------------------------------------------------
+ Implementation of tryPutMVar#
+
+ NOTE: this should be kept in sync with stg_tryPutMVarzh in PrimOps.cmm
+ ------------------------------------------------------------------------- */
+
+rtsBool performTryPutMVar(Capability *cap, StgMVar *mvar, StgClosure *value)
+{
+ const StgInfoTable *info;
+ StgMVarTSOQueue *q;
+ StgTSO *tso;
+
+ info = lockClosure((StgClosure*)mvar);
+
+ if (mvar->value != &stg_END_TSO_QUEUE_closure) {
+#if defined(THREADED_RTS)
+ unlockClosure((StgClosure*)mvar, info);
+#endif
+ return rtsFalse;
+ }
+
+ q = mvar->head;
+loop:
+ if (q == (StgMVarTSOQueue*)&stg_END_TSO_QUEUE_closure) {
+ /* No further takes, the MVar is now full. */
+ if (info == &stg_MVAR_CLEAN_info) {
+ dirty_MVAR(&cap->r, (StgClosure*)mvar);
+ }
+
+ mvar->value = value;
+ unlockClosure((StgClosure*)mvar, &stg_MVAR_DIRTY_info);
+ return rtsTrue;
+ }
+ if (q->header.info == &stg_IND_info ||
+ q->header.info == &stg_MSG_NULL_info) {
+ q = (StgMVarTSOQueue*)((StgInd*)q)->indirectee;
+ goto loop;
+ }
+
+ // There are takeMVar(s) waiting: wake up the first one
+ tso = q->tso;
+ mvar->head = q->link;
+ if (mvar->head == (StgMVarTSOQueue*)&stg_END_TSO_QUEUE_closure) {
+ mvar->tail = (StgMVarTSOQueue*)&stg_END_TSO_QUEUE_closure;
+ }
+
+ ASSERT(tso->block_info.closure == (StgClosure*)mvar);
+ // save why_blocked here, because waking up the thread destroys
+ // this information
+ StgWord why_blocked = tso->why_blocked;
+
+ // actually perform the takeMVar
+ StgStack* stack = tso->stackobj;
+ stack->sp[1] = (W_)value;
+ stack->sp[0] = (W_)&stg_ret_p_info;
+
+ // indicate that the MVar operation has now completed.
+ tso->_link = (StgTSO*)&stg_END_TSO_QUEUE_closure;
+
+ if (stack->dirty == 0) {
+ dirty_STACK(cap, stack);
+ }
+
+ tryWakeupThread(cap, tso);
+
+ // If it was a readMVar, then we can still do work,
+ // so loop back. (XXX: This could take a while)
+ if (why_blocked == BlockedOnMVarRead) {
+ q = ((StgMVarTSOQueue*)q)->link;
+ goto loop;
+ }
+
+ ASSERT(why_blocked == BlockedOnMVar);
+
+ unlockClosure((StgClosure*)mvar, info);
+
+ return rtsTrue;
+}
+
+/* ----------------------------------------------------------------------------
* Debugging: why is a thread blocked
* ------------------------------------------------------------------------- */
diff --git a/rts/Threads.h b/rts/Threads.h
index 01c493e53d..4588008e28 100644
--- a/rts/Threads.h
+++ b/rts/Threads.h
@@ -41,6 +41,8 @@ StgBool isThreadBound (StgTSO* tso);
void threadStackOverflow (Capability *cap, StgTSO *tso);
W_ threadStackUnderflow (Capability *cap, StgTSO *tso);
+rtsBool performTryPutMVar(Capability *cap, StgMVar *mvar, StgClosure *value);
+
#ifdef DEBUG
void printThreadBlockage (StgTSO *tso);
void printThreadStatus (StgTSO *t);
diff --git a/rts/package.conf.in b/rts/package.conf.in
index 65aa5c333b..03848c45e3 100644
--- a/rts/package.conf.in
+++ b/rts/package.conf.in
@@ -103,6 +103,7 @@ ld-options:
, "-Wl,-u,_base_GHCziPtr_Ptr_con_info"
, "-Wl,-u,_base_GHCziPtr_FunPtr_con_info"
, "-Wl,-u,_base_GHCziStable_StablePtr_con_info"
+ , "-Wl,-u,_ghczmprim_GHCziTuple_Z0T_closure"
, "-Wl,-u,_ghczmprim_GHCziTypes_False_closure"
, "-Wl,-u,_ghczmprim_GHCziTypes_True_closure"
, "-Wl,-u,_base_GHCziPack_unpackCString_closure"
@@ -199,6 +200,7 @@ ld-options:
, "-Wl,-u,base_GHCziPtr_Ptr_con_info"
, "-Wl,-u,base_GHCziPtr_FunPtr_con_info"
, "-Wl,-u,base_GHCziStable_StablePtr_con_info"
+ , "-Wl,-u,ghczmprim_GHCziTuple_Z0T_closure"
, "-Wl,-u,ghczmprim_GHCziTypes_False_closure"
, "-Wl,-u,ghczmprim_GHCziTypes_True_closure"
, "-Wl,-u,base_GHCziPack_unpackCString_closure"