summaryrefslogtreecommitdiff
path: root/rts
diff options
context:
space:
mode:
authorTamar Christina <tamar@zhox.com>2019-06-16 21:54:23 +0100
committerBen Gamari <ben@smart-cactus.org>2020-07-15 16:41:01 -0400
commit90e69f779b6da755fac472337535a1321cbb7917 (patch)
tree935ccfc0e38bfae2133b926347edb51bafecdfa7 /rts
parent356dc3feae967b1c361130f1f356ef9ad6a693e4 (diff)
downloadhaskell-90e69f779b6da755fac472337535a1321cbb7917.tar.gz
winio: Add IOPort synchronization primitive
Diffstat (limited to 'rts')
-rw-r--r--rts/PrimOps.cmm173
-rw-r--r--rts/RaiseAsync.c16
-rw-r--r--rts/RtsSymbols.c3
-rw-r--r--rts/Schedule.c37
-rw-r--r--rts/Schedule.h2
-rw-r--r--rts/Threads.c5
-rw-r--r--rts/Trace.c1
-rw-r--r--rts/TraverseHeap.c1
-rw-r--r--rts/sm/Compact.c1
-rw-r--r--rts/sm/Sanity.c1
-rw-r--r--rts/sm/Scav.c1
11 files changed, 231 insertions, 10 deletions
diff --git a/rts/PrimOps.cmm b/rts/PrimOps.cmm
index e13e89b98c..0b1b1419a1 100644
--- a/rts/PrimOps.cmm
+++ b/rts/PrimOps.cmm
@@ -1998,6 +1998,179 @@ stg_tryReadMVarzh ( P_ mvar, /* :: MVar a */ )
}
/* -----------------------------------------------------------------------------
+ * IOPort primitives
+ *
+ * readIOPort & writeIOPort work as follows. Firstly, an important invariant:
+ *
+ * If the IOPort is full, then the request is silently dropped and the
+ * message is lost. If the IOPort is empty then the
+ * blocking queue contains only the thread blocked on IOPort. An IOPort only
+ * supports a single read and a single write to it.
+ *
+ * readIOPort:
+ * IOPort empty : then add ourselves to the blocking queue
+ * IOPort full : remove the value from the IOPort, and
+ * blocking queue empty : return
+ * blocking queue non-empty : perform the only blocked
+ * writeIOPort from the queue, and
+ * wake up the thread
+ * (IOPort is now empty)
+ *
+ * writeIOPort is just the dual of the above algorithm.
+ *
+ * How do we "perform a writeIOPort"? Well, By storing the value and prt on the
+ * stack, same way we do with MVars. Semantically the operations mutate the
+ * stack the same way so we will re-use the logic and datastructures for MVars
+ * for IOPort. See stg_block_putmvar and stg_block_takemvar in HeapStackCheck.c
+ * for the stack layout, and the PerformPut and PerformTake macros below. We
+ * also re-use the closure types MVAR_CLEAN/_DIRTY for IOPort.
+ *
+ * The remaining caveats of MVar thus also apply for an IOPort. The main
+ * crucial difference between an MVar and IOPort is that the scheduler will not
+ * be allowed to interrupt a blocked IOPort just because it thinks there's a
+ * deadlock. This is especially crucial for the non-threaded runtime.
+ *
+ * -------------------------------------------------------------------------- */
+
+stg_readIOPortzh ( P_ ioport /* :: IOPort a */ )
+{
+ W_ val, info, tso, q;
+
+ LOCK_CLOSURE(ioport, info);
+
+ /* If the MVar is empty, put ourselves on the blocked readers
+ * list and wait until we're woken up.
+ */
+ if (StgMVar_value(ioport) == stg_END_TSO_QUEUE_closure) {
+
+ if (info == stg_MVAR_CLEAN_info) {
+ ccall dirty_MVAR(BaseReg "ptr", ioport "ptr");
+ }
+
+ ALLOC_PRIM_WITH_CUSTOM_FAILURE
+ (SIZEOF_StgMVarTSOQueue,
+ unlockClosure(ioport, stg_MVAR_DIRTY_info);
+ GC_PRIM_P(stg_readIOPortzh, ioport));
+
+ q = Hp - SIZEOF_StgMVarTSOQueue + WDS(1);
+
+ // readIOPorts are pushed to the front of the queue, so
+ // they get handled immediately
+ SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM);
+ StgMVarTSOQueue_link(q) = StgMVar_head(ioport);
+ StgMVarTSOQueue_tso(q) = CurrentTSO;
+
+ StgTSO__link(CurrentTSO) = q;
+ StgTSO_block_info(CurrentTSO) = ioport;
+ StgTSO_why_blocked(CurrentTSO) = BlockedOnIOCompletion::I16;
+ StgMVar_head(ioport) = q;
+
+ if (StgMVar_tail(ioport) == stg_END_TSO_QUEUE_closure) {
+ StgMVar_tail(ioport) = q;
+ }
+
+ jump stg_block_readmvar(ioport);
+ }
+
+ val = StgMVar_value(ioport);
+
+ unlockClosure(ioport, info);
+ return (val);
+}
+
+stg_writeIOPortzh ( P_ ioport, /* :: IOPort a */
+ P_ val, /* :: a */ )
+{
+ W_ info, tso, q;
+
+ LOCK_CLOSURE(ioport, info);
+
+ /* If there is already a value in the queue, then silently ignore the
+ second put. TODO: Correct usages of IOPort should never have a second
+ put, so perhaps raise an error instead, but I have no idea how to do this
+ safely and correctly at this point. */
+ if (StgMVar_value(ioport) != stg_END_TSO_QUEUE_closure) {
+ unlockClosure(ioport, info);
+ return (0);
+ }
+
+ q = StgMVar_head(ioport);
+loop:
+ if (q == stg_END_TSO_QUEUE_closure) {
+ /* No further takes, the IOPort is now full. */
+ if (info == stg_MVAR_CLEAN_info) {
+ ccall dirty_MVAR(BaseReg "ptr", ioport "ptr");
+ }
+ StgMVar_value(ioport) = val;
+ unlockClosure(ioport, stg_MVAR_DIRTY_info);
+ return (1);
+ }
+ if (StgHeader_info(q) == stg_IND_info ||
+ StgHeader_info(q) == stg_MSG_NULL_info) {
+ q = StgInd_indirectee(q);
+ goto loop;
+ }
+
+ // There are readIOPort(s) waiting: wake up the first one
+
+ tso = StgMVarTSOQueue_tso(q);
+ StgMVar_head(ioport) = StgMVarTSOQueue_link(q);
+ if (StgMVar_head(ioport) == stg_END_TSO_QUEUE_closure) {
+ StgMVar_tail(ioport) = stg_END_TSO_QUEUE_closure;
+ }
+
+ ASSERT(StgTSO_block_info(tso) == ioport);
+ // save why_blocked here, because waking up the thread destroys
+ // this information
+ W_ why_blocked;
+ why_blocked = TO_W_(StgTSO_why_blocked(tso));
+
+ // actually perform the takeMVar
+ W_ stack;
+ stack = StgTSO_stackobj(tso);
+ PerformTake(stack, val);
+
+ // indicate that the MVar operation has now completed.
+ StgTSO__link(tso) = stg_END_TSO_QUEUE_closure;
+
+ if (TO_W_(StgStack_dirty(stack)) == 0) {
+ ccall dirty_STACK(MyCapability() "ptr", stack "ptr");
+ }
+
+ ccall tryWakeupThread(MyCapability() "ptr", tso);
+
+ // If it was a readIOPort, then we can still do work,
+ // so loop back. (XXX: This could take a while)
+ if (why_blocked == BlockedOnIOCompletion) {
+ q = StgMVarTSOQueue_link(q);
+ goto loop;
+ }
+
+ ASSERT(why_blocked == BlockedOnIOCompletion);
+
+ unlockClosure(ioport, info);
+ return (1);
+}
+/* -----------------------------------------------------------------------------
+ IOPort primitives
+ -------------------------------------------------------------------------- */
+
+stg_newIOPortzh ( gcptr init )
+{
+ W_ ioport;
+
+ ALLOC_PRIM_ (SIZEOF_StgMVar, stg_newIOPortzh);
+
+ ioport = Hp - SIZEOF_StgMVar + WDS(1);
+ SET_HDR(ioport, stg_MVAR_DIRTY_info,CCCS);
+ // MVARs start dirty: generation 0 has no mutable list
+ StgMVar_head(ioport) = stg_END_TSO_QUEUE_closure;
+ StgMVar_tail(ioport) = stg_END_TSO_QUEUE_closure;
+ StgMVar_value(ioport) = stg_END_TSO_QUEUE_closure;
+ return (ioport);
+}
+
+/* -----------------------------------------------------------------------------
Stable pointer primitives
------------------------------------------------------------------------- */
diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c
index e8a6a81747..719c05435d 100644
--- a/rts/RaiseAsync.c
+++ b/rts/RaiseAsync.c
@@ -174,11 +174,11 @@ throwToSelf (Capability *cap, StgTSO *tso, StgClosure *exception)
- or it is masking exceptions (TSO_BLOCKEX)
- Currently, if the target is BlockedOnMVar, BlockedOnSTM, or
- BlockedOnBlackHole then we acquire ownership of the TSO by locking
- its parent container (e.g. the MVar) and then raise the exception.
- We might change these cases to be more message-passing-like in the
- future.
+ Currently, if the target is BlockedOnMVar, BlockedOnSTM,
+ BlockedOnIOCompletion or BlockedOnBlackHole then we acquire ownership of the
+ TSO by locking its parent container (e.g. the MVar) and then raise the
+ exception. We might change these cases to be more message-passing-like in
+ the future.
Returns:
@@ -343,6 +343,7 @@ check_target:
case BlockedOnMVar:
case BlockedOnMVarRead:
+ case BlockedOnIOCompletion:
{
/*
To establish ownership of this TSO, we need to acquire a
@@ -367,7 +368,9 @@ check_target:
// we have the MVar, let's check whether the thread
// is still blocked on the same MVar.
- if ((target->why_blocked != BlockedOnMVar && target->why_blocked != BlockedOnMVarRead)
+ if ((target->why_blocked != BlockedOnMVar
+ && target->why_blocked != BlockedOnMVarRead
+ && target->why_blocked != BlockedOnIOCompletion)
|| (StgMVar *)target->block_info.closure != mvar) {
unlockClosure((StgClosure *)mvar, info);
goto retry;
@@ -679,6 +682,7 @@ removeFromQueues(Capability *cap, StgTSO *tso)
case BlockedOnMVar:
case BlockedOnMVarRead:
+ case BlockedOnIOCompletion:
removeFromMVarBlockedQueue(tso);
goto done;
diff --git a/rts/RtsSymbols.c b/rts/RtsSymbols.c
index ff32932ea9..0f0bd56c82 100644
--- a/rts/RtsSymbols.c
+++ b/rts/RtsSymbols.c
@@ -706,6 +706,9 @@
SymI_HasProto(stg_newMVarzh) \
SymI_HasProto(stg_newMutVarzh) \
SymI_HasProto(stg_newTVarzh) \
+ SymI_HasProto(stg_readIOPortzh) \
+ SymI_HasProto(stg_writeIOPortzh) \
+ SymI_HasProto(stg_newIOPortzh) \
SymI_HasProto(stg_noDuplicatezh) \
SymI_HasProto(stg_atomicModifyMutVar2zh) \
SymI_HasProto(stg_atomicModifyMutVarzuzh) \
diff --git a/rts/Schedule.c b/rts/Schedule.c
index ce1a1fc060..fab357aa06 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -198,6 +198,7 @@ schedule (Capability *initialCapability, Task *task)
bool ready_to_gc;
cap = initialCapability;
+ t = NULL;
// Pre-condition: this task owns initialCapability.
// The sched_mutex is *NOT* held
@@ -301,8 +302,13 @@ schedule (Capability *initialCapability, Task *task)
// Additionally, it is not fatal for the
// threaded RTS to reach here with no threads to run.
//
+ // Since IOPorts have no deadlock avoidance guarantees you may also reach
+ // this point when blocked on an IO Port. If this is the case the only
+ // thing that could unblock it is an I/O event.
+ //
// win32: might be here due to awaitEvent() being abandoned
- // as a result of a console event having been delivered.
+ // as a result of a console event having been delivered or as a result of
+ // waiting on an async I/O to complete with WinIO.
#if defined(THREADED_RTS)
scheduleYield(&cap,task);
@@ -310,9 +316,23 @@ schedule (Capability *initialCapability, Task *task)
if (emptyRunQueue(cap)) continue; // look for work again
#endif
-#if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
+#if !defined(THREADED_RTS)
if ( emptyRunQueue(cap) ) {
+ /* On the non-threaded RTS if the queue is empty and the last action was
+ blocked on an I/O completion port, then just wait till we're woken
+ up by the RTS with more work. */
+ if (t && t->why_blocked == BlockedOnIOCompletion)
+ {
+ fprintf (stderr, "waiting: %d.\n", t->why_blocked);
+ awaitEvent (emptyRunQueue(cap));
+ fprintf (stderr, "running: %d.\n", t->why_blocked);
+ continue;
+ }
+ continue;
+
+#if !defined(mingw32_HOST_OS)
ASSERT(sched_state >= SCHED_INTERRUPTING);
+#endif
}
#endif
@@ -928,6 +948,7 @@ scheduleDetectDeadlock (Capability **pcap, Task *task)
*/
if (recent_activity != ACTIVITY_INACTIVE) return;
#endif
+ if (task->incall->tso->why_blocked == BlockedOnIOCompletion) return;
debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
@@ -980,6 +1001,10 @@ scheduleDetectDeadlock (Capability **pcap, Task *task)
throwToSingleThreaded(cap, task->incall->tso,
(StgClosure *)nonTermination_closure);
return;
+ case BlockedOnIOCompletion:
+ /* We're blocked waiting for an external I/O call, let's just
+ chill for a bit. */
+ return;
default:
barf("deadlock: main thread blocked in a strange way");
}
@@ -2676,9 +2701,10 @@ initScheduler(void)
sched_state = SCHED_RUNNING;
recent_activity = ACTIVITY_YES;
-#if defined(THREADED_RTS)
+
/* Initialise the mutex and condition variables used by
* the scheduler. */
+#if defined(THREADED_RTS)
initMutex(&sched_mutex);
initMutex(&sync_finished_mutex);
initCondition(&sync_finished_cond);
@@ -3164,6 +3190,11 @@ resurrectThreads (StgTSO *threads)
throwToSingleThreaded(cap, tso,
(StgClosure *)blockedIndefinitelyOnSTM_closure);
break;
+ case BlockedOnIOCompletion:
+ /* I/O Ports may not be reachable by the GC as they may be getting
+ * notified by the RTS. As such this call should be treated as if
+ * it is masking the exception. */
+ continue;
case NotBlocked:
/* This might happen if the thread was blocked on a black hole
* belonging to a thread that we've just woken up (raiseAsync
diff --git a/rts/Schedule.h b/rts/Schedule.h
index 2d8d813464..89ab6e0b4c 100644
--- a/rts/Schedule.h
+++ b/rts/Schedule.h
@@ -176,7 +176,7 @@ pushOnRunQueue (Capability *cap, StgTSO *tso)
INLINE_HEADER StgTSO *
popRunQueue (Capability *cap)
{
- ASSERT(cap->n_run_queue != 0);
+ ASSERT(cap->n_run_queue > 0);
StgTSO *t = cap->run_queue_hd;
ASSERT(t != END_TSO_QUEUE);
cap->run_queue_hd = t->_link;
diff --git a/rts/Threads.c b/rts/Threads.c
index 22d58bb48b..54c703963e 100644
--- a/rts/Threads.c
+++ b/rts/Threads.c
@@ -288,6 +288,7 @@ tryWakeupThread (Capability *cap, StgTSO *tso)
switch (tso->why_blocked)
{
+ case BlockedOnIOCompletion:
case BlockedOnMVar:
case BlockedOnMVarRead:
{
@@ -868,12 +869,16 @@ printThreadBlockage(StgTSO *tso)
debugBelch("is blocked until %ld", (long)(tso->block_info.target));
break;
#endif
+ break;
case BlockedOnMVar:
debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
break;
case BlockedOnMVarRead:
debugBelch("is blocked on atomic MVar read @ %p", tso->block_info.closure);
break;
+ case BlockedOnIOCompletion:
+ debugBelch("is blocked on I/O Completion port @ %p", tso->block_info.closure);
+ break;
case BlockedOnBlackHole:
debugBelch("is blocked on a black hole %p",
((StgBlockingQueue*)tso->block_info.bh->bh));
diff --git a/rts/Trace.c b/rts/Trace.c
index b35be3c1e7..7a1f0df768 100644
--- a/rts/Trace.c
+++ b/rts/Trace.c
@@ -164,6 +164,7 @@ static char *thread_stop_reasons[] = {
[6 + BlockedOnSTM] = "blocked on STM",
[6 + BlockedOnDoProc] = "blocked on asyncDoProc",
[6 + BlockedOnCCall] = "blocked on a foreign call",
+ [6 + BlockedOnIOCompletion] = "blocked on I/O Completion port",
[6 + BlockedOnCCall_Interruptible] = "blocked on a foreign call (interruptible)",
[6 + BlockedOnMsgThrowTo] = "blocked on throwTo",
[6 + ThreadMigrating] = "migrating"
diff --git a/rts/TraverseHeap.c b/rts/TraverseHeap.c
index 8bf58c11ee..636737aa0f 100644
--- a/rts/TraverseHeap.c
+++ b/rts/TraverseHeap.c
@@ -1250,6 +1250,7 @@ inner_loop:
traversePushClosure(ts, (StgClosure *) tso->trec, c, child_data);
if ( tso->why_blocked == BlockedOnMVar
|| tso->why_blocked == BlockedOnMVarRead
+ || tso->why_blocked == BlockedOnIOCompletion
|| tso->why_blocked == BlockedOnBlackHole
|| tso->why_blocked == BlockedOnMsgThrowTo
) {
diff --git a/rts/sm/Compact.c b/rts/sm/Compact.c
index 5031c535a1..b1250b77e0 100644
--- a/rts/sm/Compact.c
+++ b/rts/sm/Compact.c
@@ -461,6 +461,7 @@ thread_TSO (StgTSO *tso)
|| tso->why_blocked == BlockedOnMVarRead
|| tso->why_blocked == BlockedOnBlackHole
|| tso->why_blocked == BlockedOnMsgThrowTo
+ || tso->why_blocked == BlockedOnIOCompletion
|| tso->why_blocked == NotBlocked
) {
thread_(&tso->block_info.closure);
diff --git a/rts/sm/Sanity.c b/rts/sm/Sanity.c
index ea64483418..2329b02016 100644
--- a/rts/sm/Sanity.c
+++ b/rts/sm/Sanity.c
@@ -618,6 +618,7 @@ checkTSO(StgTSO *tso)
|| tso->why_blocked == BlockedOnMVarRead
|| tso->why_blocked == BlockedOnBlackHole
|| tso->why_blocked == BlockedOnMsgThrowTo
+ || tso->why_blocked == BlockedOnIOCompletion
|| tso->why_blocked == NotBlocked
) {
ASSERT(LOOKS_LIKE_CLOSURE_PTR(tso->block_info.closure));
diff --git a/rts/sm/Scav.c b/rts/sm/Scav.c
index 501d958aae..dd9a96adf8 100644
--- a/rts/sm/Scav.c
+++ b/rts/sm/Scav.c
@@ -129,6 +129,7 @@ scavengeTSO (StgTSO *tso)
|| tso->why_blocked == BlockedOnMVarRead
|| tso->why_blocked == BlockedOnBlackHole
|| tso->why_blocked == BlockedOnMsgThrowTo
+ || tso->why_blocked == BlockedOnIOCompletion
|| tso->why_blocked == NotBlocked
) {
evacuate(&tso->block_info.closure);