diff options
author | Tamar Christina <tamar@zhox.com> | 2019-06-16 21:54:23 +0100 |
---|---|---|
committer | Ben Gamari <ben@smart-cactus.org> | 2020-07-15 16:41:01 -0400 |
commit | 90e69f779b6da755fac472337535a1321cbb7917 (patch) | |
tree | 935ccfc0e38bfae2133b926347edb51bafecdfa7 /rts/PrimOps.cmm | |
parent | 356dc3feae967b1c361130f1f356ef9ad6a693e4 (diff) | |
download | haskell-90e69f779b6da755fac472337535a1321cbb7917.tar.gz |
winio: Add IOPort synchronization primitive
Diffstat (limited to 'rts/PrimOps.cmm')
-rw-r--r-- | rts/PrimOps.cmm | 173 |
1 files changed, 173 insertions, 0 deletions
diff --git a/rts/PrimOps.cmm b/rts/PrimOps.cmm index e13e89b98c..0b1b1419a1 100644 --- a/rts/PrimOps.cmm +++ b/rts/PrimOps.cmm @@ -1998,6 +1998,179 @@ stg_tryReadMVarzh ( P_ mvar, /* :: MVar a */ ) } /* ----------------------------------------------------------------------------- + * IOPort primitives + * + * readIOPort & writeIOPort work as follows. Firstly, an important invariant: + * + * If the IOPort is full, then the request is silently dropped and the + * message is lost. If the IOPort is empty then the + * blocking queue contains only the thread blocked on IOPort. An IOPort only + * supports a single read and a single write to it. + * + * readIOPort: + * IOPort empty : then add ourselves to the blocking queue + * IOPort full : remove the value from the IOPort, and + * blocking queue empty : return + * blocking queue non-empty : perform the only blocked + * writeIOPort from the queue, and + * wake up the thread + * (IOPort is now empty) + * + * writeIOPort is just the dual of the above algorithm. + * + * How do we "perform a writeIOPort"? Well, By storing the value and prt on the + * stack, same way we do with MVars. Semantically the operations mutate the + * stack the same way so we will re-use the logic and datastructures for MVars + * for IOPort. See stg_block_putmvar and stg_block_takemvar in HeapStackCheck.c + * for the stack layout, and the PerformPut and PerformTake macros below. We + * also re-use the closure types MVAR_CLEAN/_DIRTY for IOPort. + * + * The remaining caveats of MVar thus also apply for an IOPort. The main + * crucial difference between an MVar and IOPort is that the scheduler will not + * be allowed to interrupt a blocked IOPort just because it thinks there's a + * deadlock. This is especially crucial for the non-threaded runtime. + * + * -------------------------------------------------------------------------- */ + +stg_readIOPortzh ( P_ ioport /* :: IOPort a */ ) +{ + W_ val, info, tso, q; + + LOCK_CLOSURE(ioport, info); + + /* If the MVar is empty, put ourselves on the blocked readers + * list and wait until we're woken up. + */ + if (StgMVar_value(ioport) == stg_END_TSO_QUEUE_closure) { + + if (info == stg_MVAR_CLEAN_info) { + ccall dirty_MVAR(BaseReg "ptr", ioport "ptr"); + } + + ALLOC_PRIM_WITH_CUSTOM_FAILURE + (SIZEOF_StgMVarTSOQueue, + unlockClosure(ioport, stg_MVAR_DIRTY_info); + GC_PRIM_P(stg_readIOPortzh, ioport)); + + q = Hp - SIZEOF_StgMVarTSOQueue + WDS(1); + + // readIOPorts are pushed to the front of the queue, so + // they get handled immediately + SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM); + StgMVarTSOQueue_link(q) = StgMVar_head(ioport); + StgMVarTSOQueue_tso(q) = CurrentTSO; + + StgTSO__link(CurrentTSO) = q; + StgTSO_block_info(CurrentTSO) = ioport; + StgTSO_why_blocked(CurrentTSO) = BlockedOnIOCompletion::I16; + StgMVar_head(ioport) = q; + + if (StgMVar_tail(ioport) == stg_END_TSO_QUEUE_closure) { + StgMVar_tail(ioport) = q; + } + + jump stg_block_readmvar(ioport); + } + + val = StgMVar_value(ioport); + + unlockClosure(ioport, info); + return (val); +} + +stg_writeIOPortzh ( P_ ioport, /* :: IOPort a */ + P_ val, /* :: a */ ) +{ + W_ info, tso, q; + + LOCK_CLOSURE(ioport, info); + + /* If there is already a value in the queue, then silently ignore the + second put. TODO: Correct usages of IOPort should never have a second + put, so perhaps raise an error instead, but I have no idea how to do this + safely and correctly at this point. */ + if (StgMVar_value(ioport) != stg_END_TSO_QUEUE_closure) { + unlockClosure(ioport, info); + return (0); + } + + q = StgMVar_head(ioport); +loop: + if (q == stg_END_TSO_QUEUE_closure) { + /* No further takes, the IOPort is now full. */ + if (info == stg_MVAR_CLEAN_info) { + ccall dirty_MVAR(BaseReg "ptr", ioport "ptr"); + } + StgMVar_value(ioport) = val; + unlockClosure(ioport, stg_MVAR_DIRTY_info); + return (1); + } + if (StgHeader_info(q) == stg_IND_info || + StgHeader_info(q) == stg_MSG_NULL_info) { + q = StgInd_indirectee(q); + goto loop; + } + + // There are readIOPort(s) waiting: wake up the first one + + tso = StgMVarTSOQueue_tso(q); + StgMVar_head(ioport) = StgMVarTSOQueue_link(q); + if (StgMVar_head(ioport) == stg_END_TSO_QUEUE_closure) { + StgMVar_tail(ioport) = stg_END_TSO_QUEUE_closure; + } + + ASSERT(StgTSO_block_info(tso) == ioport); + // save why_blocked here, because waking up the thread destroys + // this information + W_ why_blocked; + why_blocked = TO_W_(StgTSO_why_blocked(tso)); + + // actually perform the takeMVar + W_ stack; + stack = StgTSO_stackobj(tso); + PerformTake(stack, val); + + // indicate that the MVar operation has now completed. + StgTSO__link(tso) = stg_END_TSO_QUEUE_closure; + + if (TO_W_(StgStack_dirty(stack)) == 0) { + ccall dirty_STACK(MyCapability() "ptr", stack "ptr"); + } + + ccall tryWakeupThread(MyCapability() "ptr", tso); + + // If it was a readIOPort, then we can still do work, + // so loop back. (XXX: This could take a while) + if (why_blocked == BlockedOnIOCompletion) { + q = StgMVarTSOQueue_link(q); + goto loop; + } + + ASSERT(why_blocked == BlockedOnIOCompletion); + + unlockClosure(ioport, info); + return (1); +} +/* ----------------------------------------------------------------------------- + IOPort primitives + -------------------------------------------------------------------------- */ + +stg_newIOPortzh ( gcptr init ) +{ + W_ ioport; + + ALLOC_PRIM_ (SIZEOF_StgMVar, stg_newIOPortzh); + + ioport = Hp - SIZEOF_StgMVar + WDS(1); + SET_HDR(ioport, stg_MVAR_DIRTY_info,CCCS); + // MVARs start dirty: generation 0 has no mutable list + StgMVar_head(ioport) = stg_END_TSO_QUEUE_closure; + StgMVar_tail(ioport) = stg_END_TSO_QUEUE_closure; + StgMVar_value(ioport) = stg_END_TSO_QUEUE_closure; + return (ioport); +} + +/* ----------------------------------------------------------------------------- Stable pointer primitives ------------------------------------------------------------------------- */ |