diff options
author | Tamar Christina <tamar@zhox.com> | 2019-06-16 21:54:23 +0100 |
---|---|---|
committer | Ben Gamari <ben@smart-cactus.org> | 2020-07-15 16:41:01 -0400 |
commit | 90e69f779b6da755fac472337535a1321cbb7917 (patch) | |
tree | 935ccfc0e38bfae2133b926347edb51bafecdfa7 /rts | |
parent | 356dc3feae967b1c361130f1f356ef9ad6a693e4 (diff) | |
download | haskell-90e69f779b6da755fac472337535a1321cbb7917.tar.gz |
winio: Add IOPort synchronization primitive
Diffstat (limited to 'rts')
-rw-r--r-- | rts/PrimOps.cmm | 173 | ||||
-rw-r--r-- | rts/RaiseAsync.c | 16 | ||||
-rw-r--r-- | rts/RtsSymbols.c | 3 | ||||
-rw-r--r-- | rts/Schedule.c | 37 | ||||
-rw-r--r-- | rts/Schedule.h | 2 | ||||
-rw-r--r-- | rts/Threads.c | 5 | ||||
-rw-r--r-- | rts/Trace.c | 1 | ||||
-rw-r--r-- | rts/TraverseHeap.c | 1 | ||||
-rw-r--r-- | rts/sm/Compact.c | 1 | ||||
-rw-r--r-- | rts/sm/Sanity.c | 1 | ||||
-rw-r--r-- | rts/sm/Scav.c | 1 |
11 files changed, 231 insertions, 10 deletions
diff --git a/rts/PrimOps.cmm b/rts/PrimOps.cmm index e13e89b98c..0b1b1419a1 100644 --- a/rts/PrimOps.cmm +++ b/rts/PrimOps.cmm @@ -1998,6 +1998,179 @@ stg_tryReadMVarzh ( P_ mvar, /* :: MVar a */ ) } /* ----------------------------------------------------------------------------- + * IOPort primitives + * + * readIOPort & writeIOPort work as follows. Firstly, an important invariant: + * + * If the IOPort is full, then the request is silently dropped and the + * message is lost. If the IOPort is empty then the + * blocking queue contains only the thread blocked on the IOPort. An IOPort + * only supports a single read and a single write to it. + * + * readIOPort: + * IOPort empty : then add ourselves to the blocking queue + * IOPort full : remove the value from the IOPort, and + * blocking queue empty : return + * blocking queue non-empty : perform the only blocked + * writeIOPort from the queue, and + * wake up the thread + * (IOPort is now empty) + * + * writeIOPort is just the dual of the above algorithm. + * + * How do we "perform a writeIOPort"? Well, by storing the value and port on the + * stack, same way we do with MVars. Semantically the operations mutate the + * stack the same way so we will re-use the logic and data structures for MVars + * for IOPort. See stg_block_putmvar and stg_block_takemvar in HeapStackCheck.c + * for the stack layout, and the PerformPut and PerformTake macros below. We + * also re-use the closure types MVAR_CLEAN/_DIRTY for IOPort. + * + * The remaining caveats of MVar thus also apply for an IOPort. The main + * crucial difference between an MVar and IOPort is that the scheduler will not + * be allowed to interrupt a blocked IOPort just because it thinks there's a + * deadlock. This is especially crucial for the non-threaded runtime.
+ * + * -------------------------------------------------------------------------- */ + +stg_readIOPortzh ( P_ ioport /* :: IOPort a */ ) +{ + W_ val, info, tso, q; + + LOCK_CLOSURE(ioport, info); + + /* If the MVar is empty, put ourselves on the blocked readers + * list and wait until we're woken up. + */ + if (StgMVar_value(ioport) == stg_END_TSO_QUEUE_closure) { + + if (info == stg_MVAR_CLEAN_info) { + ccall dirty_MVAR(BaseReg "ptr", ioport "ptr"); + } + + ALLOC_PRIM_WITH_CUSTOM_FAILURE + (SIZEOF_StgMVarTSOQueue, + unlockClosure(ioport, stg_MVAR_DIRTY_info); + GC_PRIM_P(stg_readIOPortzh, ioport)); + + q = Hp - SIZEOF_StgMVarTSOQueue + WDS(1); + + // readIOPorts are pushed to the front of the queue, so + // they get handled immediately + SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM); + StgMVarTSOQueue_link(q) = StgMVar_head(ioport); + StgMVarTSOQueue_tso(q) = CurrentTSO; + + StgTSO__link(CurrentTSO) = q; + StgTSO_block_info(CurrentTSO) = ioport; + StgTSO_why_blocked(CurrentTSO) = BlockedOnIOCompletion::I16; + StgMVar_head(ioport) = q; + + if (StgMVar_tail(ioport) == stg_END_TSO_QUEUE_closure) { + StgMVar_tail(ioport) = q; + } + + jump stg_block_readmvar(ioport); + } + + val = StgMVar_value(ioport); + + unlockClosure(ioport, info); + return (val); +} + +stg_writeIOPortzh ( P_ ioport, /* :: IOPort a */ + P_ val, /* :: a */ ) +{ + W_ info, tso, q; + + LOCK_CLOSURE(ioport, info); + + /* If there is already a value in the queue, then silently ignore the + second put. TODO: Correct usages of IOPort should never have a second + put, so perhaps raise an error instead, but I have no idea how to do this + safely and correctly at this point. */ + if (StgMVar_value(ioport) != stg_END_TSO_QUEUE_closure) { + unlockClosure(ioport, info); + return (0); + } + + q = StgMVar_head(ioport); +loop: + if (q == stg_END_TSO_QUEUE_closure) { + /* No further takes, the IOPort is now full. 
*/ + if (info == stg_MVAR_CLEAN_info) { + ccall dirty_MVAR(BaseReg "ptr", ioport "ptr"); + } + StgMVar_value(ioport) = val; + unlockClosure(ioport, stg_MVAR_DIRTY_info); + return (1); + } + if (StgHeader_info(q) == stg_IND_info || + StgHeader_info(q) == stg_MSG_NULL_info) { + q = StgInd_indirectee(q); + goto loop; + } + + // There are readIOPort(s) waiting: wake up the first one + + tso = StgMVarTSOQueue_tso(q); + StgMVar_head(ioport) = StgMVarTSOQueue_link(q); + if (StgMVar_head(ioport) == stg_END_TSO_QUEUE_closure) { + StgMVar_tail(ioport) = stg_END_TSO_QUEUE_closure; + } + + ASSERT(StgTSO_block_info(tso) == ioport); + // save why_blocked here, because waking up the thread destroys + // this information + W_ why_blocked; + why_blocked = TO_W_(StgTSO_why_blocked(tso)); + + // actually perform the takeMVar + W_ stack; + stack = StgTSO_stackobj(tso); + PerformTake(stack, val); + + // indicate that the MVar operation has now completed. + StgTSO__link(tso) = stg_END_TSO_QUEUE_closure; + + if (TO_W_(StgStack_dirty(stack)) == 0) { + ccall dirty_STACK(MyCapability() "ptr", stack "ptr"); + } + + ccall tryWakeupThread(MyCapability() "ptr", tso); + + // If it was a readIOPort, then we can still do work, + // so loop back. 
(XXX: This could take a while) + if (why_blocked == BlockedOnIOCompletion) { + q = StgMVarTSOQueue_link(q); + goto loop; + } + + ASSERT(why_blocked == BlockedOnIOCompletion); + + unlockClosure(ioport, info); + return (1); +} +/* ----------------------------------------------------------------------------- + IOPort primitives + -------------------------------------------------------------------------- */ + +stg_newIOPortzh ( gcptr init ) +{ + W_ ioport; + + ALLOC_PRIM_ (SIZEOF_StgMVar, stg_newIOPortzh); + + ioport = Hp - SIZEOF_StgMVar + WDS(1); + SET_HDR(ioport, stg_MVAR_DIRTY_info,CCCS); + // MVARs start dirty: generation 0 has no mutable list + StgMVar_head(ioport) = stg_END_TSO_QUEUE_closure; + StgMVar_tail(ioport) = stg_END_TSO_QUEUE_closure; + StgMVar_value(ioport) = stg_END_TSO_QUEUE_closure; + return (ioport); +} + +/* ----------------------------------------------------------------------------- Stable pointer primitives ------------------------------------------------------------------------- */ diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c index e8a6a81747..719c05435d 100644 --- a/rts/RaiseAsync.c +++ b/rts/RaiseAsync.c @@ -174,11 +174,11 @@ throwToSelf (Capability *cap, StgTSO *tso, StgClosure *exception) - or it is masking exceptions (TSO_BLOCKEX) - Currently, if the target is BlockedOnMVar, BlockedOnSTM, or - BlockedOnBlackHole then we acquire ownership of the TSO by locking - its parent container (e.g. the MVar) and then raise the exception. - We might change these cases to be more message-passing-like in the - future. + Currently, if the target is BlockedOnMVar, BlockedOnSTM, + BlockedOnIOCompletion or BlockedOnBlackHole then we acquire ownership of the + TSO by locking its parent container (e.g. the MVar) and then raise the + exception. We might change these cases to be more message-passing-like in + the future. 
Returns: @@ -343,6 +343,7 @@ check_target: case BlockedOnMVar: case BlockedOnMVarRead: + case BlockedOnIOCompletion: { /* To establish ownership of this TSO, we need to acquire a @@ -367,7 +368,9 @@ check_target: // we have the MVar, let's check whether the thread // is still blocked on the same MVar. - if ((target->why_blocked != BlockedOnMVar && target->why_blocked != BlockedOnMVarRead) + if ((target->why_blocked != BlockedOnMVar + && target->why_blocked != BlockedOnMVarRead + && target->why_blocked != BlockedOnIOCompletion) || (StgMVar *)target->block_info.closure != mvar) { unlockClosure((StgClosure *)mvar, info); goto retry; @@ -679,6 +682,7 @@ removeFromQueues(Capability *cap, StgTSO *tso) case BlockedOnMVar: case BlockedOnMVarRead: + case BlockedOnIOCompletion: removeFromMVarBlockedQueue(tso); goto done; diff --git a/rts/RtsSymbols.c b/rts/RtsSymbols.c index ff32932ea9..0f0bd56c82 100644 --- a/rts/RtsSymbols.c +++ b/rts/RtsSymbols.c @@ -706,6 +706,9 @@ SymI_HasProto(stg_newMVarzh) \ SymI_HasProto(stg_newMutVarzh) \ SymI_HasProto(stg_newTVarzh) \ + SymI_HasProto(stg_readIOPortzh) \ + SymI_HasProto(stg_writeIOPortzh) \ + SymI_HasProto(stg_newIOPortzh) \ SymI_HasProto(stg_noDuplicatezh) \ SymI_HasProto(stg_atomicModifyMutVar2zh) \ SymI_HasProto(stg_atomicModifyMutVarzuzh) \ diff --git a/rts/Schedule.c b/rts/Schedule.c index ce1a1fc060..fab357aa06 100644 --- a/rts/Schedule.c +++ b/rts/Schedule.c @@ -198,6 +198,7 @@ schedule (Capability *initialCapability, Task *task) bool ready_to_gc; cap = initialCapability; + t = NULL; // Pre-condition: this task owns initialCapability. // The sched_mutex is *NOT* held @@ -301,8 +302,13 @@ schedule (Capability *initialCapability, Task *task) // Additionally, it is not fatal for the // threaded RTS to reach here with no threads to run. // + // Since IOPorts have no deadlock avoidance guarantees you may also reach + // this point when blocked on an IO Port. 
If this is the case the only + // thing that could unblock it is an I/O event. + // // win32: might be here due to awaitEvent() being abandoned - // as a result of a console event having been delivered. + // as a result of a console event having been delivered or as a result of + // waiting on an async I/O to complete with WinIO. #if defined(THREADED_RTS) scheduleYield(&cap,task); @@ -310,9 +316,23 @@ schedule (Capability *initialCapability, Task *task) if (emptyRunQueue(cap)) continue; // look for work again #endif -#if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS) +#if !defined(THREADED_RTS) if ( emptyRunQueue(cap) ) { + /* On the non-threaded RTS if the queue is empty and the last action was + blocked on an I/O completion port, then just wait till we're woken + up by the RTS with more work. */ + if (t && t->why_blocked == BlockedOnIOCompletion) + { + fprintf (stderr, "waiting: %d.\n", t->why_blocked); + awaitEvent (emptyRunQueue(cap)); + fprintf (stderr, "running: %d.\n", t->why_blocked); + continue; + } + continue; + +#if !defined(mingw32_HOST_OS) ASSERT(sched_state >= SCHED_INTERRUPTING); +#endif } #endif @@ -928,6 +948,7 @@ scheduleDetectDeadlock (Capability **pcap, Task *task) */ if (recent_activity != ACTIVITY_INACTIVE) return; #endif + if (task->incall->tso->why_blocked == BlockedOnIOCompletion) return; debugTrace(DEBUG_sched, "deadlocked, forcing major GC..."); @@ -980,6 +1001,10 @@ scheduleDetectDeadlock (Capability **pcap, Task *task) throwToSingleThreaded(cap, task->incall->tso, (StgClosure *)nonTermination_closure); return; + case BlockedOnIOCompletion: + /* We're blocked waiting for an external I/O call, let's just + chill for a bit. */ + return; default: barf("deadlock: main thread blocked in a strange way"); } @@ -2676,9 +2701,10 @@ initScheduler(void) sched_state = SCHED_RUNNING; recent_activity = ACTIVITY_YES; -#if defined(THREADED_RTS) + /* Initialise the mutex and condition variables used by * the scheduler. 
*/ +#if defined(THREADED_RTS) initMutex(&sched_mutex); initMutex(&sync_finished_mutex); initCondition(&sync_finished_cond); @@ -3164,6 +3190,11 @@ resurrectThreads (StgTSO *threads) throwToSingleThreaded(cap, tso, (StgClosure *)blockedIndefinitelyOnSTM_closure); break; + case BlockedOnIOCompletion: + /* I/O Ports may not be reachable by the GC as they may be getting + * notified by the RTS. As such this call should be treated as if + * it is masking the exception. */ + continue; case NotBlocked: /* This might happen if the thread was blocked on a black hole * belonging to a thread that we've just woken up (raiseAsync diff --git a/rts/Schedule.h b/rts/Schedule.h index 2d8d813464..89ab6e0b4c 100644 --- a/rts/Schedule.h +++ b/rts/Schedule.h @@ -176,7 +176,7 @@ pushOnRunQueue (Capability *cap, StgTSO *tso) INLINE_HEADER StgTSO * popRunQueue (Capability *cap) { - ASSERT(cap->n_run_queue != 0); + ASSERT(cap->n_run_queue > 0); StgTSO *t = cap->run_queue_hd; ASSERT(t != END_TSO_QUEUE); cap->run_queue_hd = t->_link; diff --git a/rts/Threads.c b/rts/Threads.c index 22d58bb48b..54c703963e 100644 --- a/rts/Threads.c +++ b/rts/Threads.c @@ -288,6 +288,7 @@ tryWakeupThread (Capability *cap, StgTSO *tso) switch (tso->why_blocked) { + case BlockedOnIOCompletion: case BlockedOnMVar: case BlockedOnMVarRead: { @@ -868,12 +869,16 @@ printThreadBlockage(StgTSO *tso) debugBelch("is blocked until %ld", (long)(tso->block_info.target)); break; #endif + break; case BlockedOnMVar: debugBelch("is blocked on an MVar @ %p", tso->block_info.closure); break; case BlockedOnMVarRead: debugBelch("is blocked on atomic MVar read @ %p", tso->block_info.closure); break; + case BlockedOnIOCompletion: + debugBelch("is blocked on I/O Completion port @ %p", tso->block_info.closure); + break; case BlockedOnBlackHole: debugBelch("is blocked on a black hole %p", ((StgBlockingQueue*)tso->block_info.bh->bh)); diff --git a/rts/Trace.c b/rts/Trace.c index b35be3c1e7..7a1f0df768 100644 --- a/rts/Trace.c +++ 
b/rts/Trace.c @@ -164,6 +164,7 @@ static char *thread_stop_reasons[] = { [6 + BlockedOnSTM] = "blocked on STM", [6 + BlockedOnDoProc] = "blocked on asyncDoProc", [6 + BlockedOnCCall] = "blocked on a foreign call", + [6 + BlockedOnIOCompletion] = "blocked on I/O Completion port", [6 + BlockedOnCCall_Interruptible] = "blocked on a foreign call (interruptible)", [6 + BlockedOnMsgThrowTo] = "blocked on throwTo", [6 + ThreadMigrating] = "migrating" diff --git a/rts/TraverseHeap.c b/rts/TraverseHeap.c index 8bf58c11ee..636737aa0f 100644 --- a/rts/TraverseHeap.c +++ b/rts/TraverseHeap.c @@ -1250,6 +1250,7 @@ inner_loop: traversePushClosure(ts, (StgClosure *) tso->trec, c, child_data); if ( tso->why_blocked == BlockedOnMVar || tso->why_blocked == BlockedOnMVarRead + || tso->why_blocked == BlockedOnIOCompletion || tso->why_blocked == BlockedOnBlackHole || tso->why_blocked == BlockedOnMsgThrowTo ) { diff --git a/rts/sm/Compact.c b/rts/sm/Compact.c index 5031c535a1..b1250b77e0 100644 --- a/rts/sm/Compact.c +++ b/rts/sm/Compact.c @@ -461,6 +461,7 @@ thread_TSO (StgTSO *tso) || tso->why_blocked == BlockedOnMVarRead || tso->why_blocked == BlockedOnBlackHole || tso->why_blocked == BlockedOnMsgThrowTo + || tso->why_blocked == BlockedOnIOCompletion || tso->why_blocked == NotBlocked ) { thread_(&tso->block_info.closure); diff --git a/rts/sm/Sanity.c b/rts/sm/Sanity.c index ea64483418..2329b02016 100644 --- a/rts/sm/Sanity.c +++ b/rts/sm/Sanity.c @@ -618,6 +618,7 @@ checkTSO(StgTSO *tso) || tso->why_blocked == BlockedOnMVarRead || tso->why_blocked == BlockedOnBlackHole || tso->why_blocked == BlockedOnMsgThrowTo + || tso->why_blocked == BlockedOnIOCompletion || tso->why_blocked == NotBlocked ) { ASSERT(LOOKS_LIKE_CLOSURE_PTR(tso->block_info.closure)); diff --git a/rts/sm/Scav.c b/rts/sm/Scav.c index 501d958aae..dd9a96adf8 100644 --- a/rts/sm/Scav.c +++ b/rts/sm/Scav.c @@ -129,6 +129,7 @@ scavengeTSO (StgTSO *tso) || tso->why_blocked == BlockedOnMVarRead || tso->why_blocked == 
BlockedOnBlackHole || tso->why_blocked == BlockedOnMsgThrowTo + || tso->why_blocked == BlockedOnIOCompletion || tso->why_blocked == NotBlocked ) { evacuate(&tso->block_info.closure); |