diff options
author | Andreas Klebinger <klebinger.andreas@gmx.at> | 2020-03-05 17:55:43 +0100 |
---|---|---|
committer | Ben Gamari <ben@smart-cactus.org> | 2020-07-15 16:41:02 -0400 |
commit | 750ebaeec06d7ee118abfbb29142c12fb31730cc (patch) | |
tree | 880d05d448908fb5cced1d8f2d417982119d643e /rts/PrimOps.cmm | |
parent | 6aefdf62b767b7828698c3ec5bf6a15e6e20eddb (diff) | |
download | haskell-750ebaeec06d7ee118abfbb29142c12fb31730cc.tar.gz |
winio: Clean up code surrounding IOPort primitives.
According to phyx these should only be read and written once per
object. Not neccesarily in that order.
To strengthen that guarantee the primitives will now throw an
exception if we violate this invariant.
As a consequence we can eliminate some code from their primops.
In particular code dealing with multiple queued readers/writers
now simply checks the invariant and throws an exception if it
was violated. That is in contrast to mvars which will do things
like wake up all readers, queue multi writers etc.
Diffstat (limited to 'rts/PrimOps.cmm')
-rw-r--r-- | rts/PrimOps.cmm | 97 |
1 files changed, 67 insertions, 30 deletions
diff --git a/rts/PrimOps.cmm b/rts/PrimOps.cmm index 0b1b1419a1..e1f4365963 100644 --- a/rts/PrimOps.cmm +++ b/rts/PrimOps.cmm @@ -31,6 +31,8 @@ import pthread_mutex_unlock; #endif import CLOSURE base_ControlziExceptionziBase_nestedAtomically_closure; import CLOSURE base_GHCziIOziException_heapOverflow_closure; +import CLOSURE base_GHCziIOziException_blockedIndefinitelyOnMVar_closure; +import CLOSURE base_GHCziIOPort_doubleReadException_closure; import AcquireSRWLockExclusive; import ReleaseSRWLockExclusive; import CLOSURE ghczmprim_GHCziTypes_False_closure; @@ -1593,6 +1595,7 @@ stg_takeMVarzh ( P_ mvar /* :: MVar a */ ) SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM); // Write barrier before we make the new MVAR_TSO_QUEUE // visible to other cores. + // See Note [Heap memory barriers] prim_write_barrier; if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) { @@ -1761,6 +1764,7 @@ stg_putMVarzh ( P_ mvar, /* :: MVar a */ StgMVarTSOQueue_tso(q) = CurrentTSO; SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM); + //See Note [Heap memory barriers] prim_write_barrier; if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) { @@ -1943,6 +1947,7 @@ stg_readMVarzh ( P_ mvar, /* :: MVar a */ ) */ if (StgMVar_value(mvar) == stg_END_TSO_QUEUE_closure) { + // Add MVar to mutable list if (info == stg_MVAR_CLEAN_info) { ccall dirty_MVAR(BaseReg "ptr", mvar "ptr", StgMVar_value(mvar)); } @@ -1960,6 +1965,7 @@ stg_readMVarzh ( P_ mvar, /* :: MVar a */ ) StgMVarTSOQueue_tso(q) = CurrentTSO; SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM); + //See Note [Heap memory barriers] prim_write_barrier; StgTSO__link(CurrentTSO) = q; @@ -2002,10 +2008,8 @@ stg_tryReadMVarzh ( P_ mvar, /* :: MVar a */ ) * * readIOPort & writeIOPort work as follows. Firstly, an important invariant: * - * If the IOPort is full, then the request is silently dropped and the - * message is lost. If the IOPort is empty then the - * blocking queue contains only the thread blocked on IOPort. An IOPort only - * supports a single read and a single write to it. + * Only one read and one write is allowed for an IOPort. + * Reading or writing to the same port twice will throw an exception. * * readIOPort: * IOPort empty : then add ourselves to the blocking queue @@ -2030,8 +2034,17 @@ stg_tryReadMVarzh ( P_ mvar, /* :: MVar a */ ) * be allowed to interrupt a blocked IOPort just because it thinks there's a * deadlock. This is especially crucial for the non-threaded runtime. * + * To avoid double reads/writes we set only the head when a reader queues up + * on a port. We set the tail to the port itself upon reading. We can do this + * since there can only be one reader/writer for the port. In contrast to MVars + * which do need to keep a list of blocked threads. + * + * This allows us to detect any double uses of IOPorts and throw an exception + * in that case for easier debugging. + * * -------------------------------------------------------------------------- */ + stg_readIOPortzh ( P_ ioport /* :: IOPort a */ ) { W_ val, info, tso, q; @@ -2043,8 +2056,14 @@ stg_readIOPortzh ( P_ ioport /* :: IOPort a */ ) */ if (StgMVar_value(ioport) == stg_END_TSO_QUEUE_closure) { + // There is already another reader, throw exception. + if (StgMVar_head(ioport) != stg_END_TSO_QUEUE_closure) { + unlockClosure(ioport, info); + jump stg_raiseIOzh(base_GHCziIOPort_doubleReadException_closure); + } + if (info == stg_MVAR_CLEAN_info) { - ccall dirty_MVAR(BaseReg "ptr", ioport "ptr"); + ccall dirty_MVAR(BaseReg "ptr", ioport "ptr", StgMVar_value(ioport)); } ALLOC_PRIM_WITH_CUSTOM_FAILURE @@ -2056,20 +2075,31 @@ stg_readIOPortzh ( P_ ioport /* :: IOPort a */ ) // readIOPorts are pushed to the front of the queue, so // they get handled immediately - SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM); StgMVarTSOQueue_link(q) = StgMVar_head(ioport); StgMVarTSOQueue_tso(q) = CurrentTSO; + SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM); + //See Note [Heap memory barriers] + prim_write_barrier; + + StgMVar_head(ioport) = q; StgTSO__link(CurrentTSO) = q; StgTSO_block_info(CurrentTSO) = ioport; StgTSO_why_blocked(CurrentTSO) = BlockedOnIOCompletion::I16; - StgMVar_head(ioport) = q; - - if (StgMVar_tail(ioport) == stg_END_TSO_QUEUE_closure) { - StgMVar_tail(ioport) = q; - } + //Unlocks the closure as well jump stg_block_readmvar(ioport); + + } + + //Upon reading we set tail = ioport. + //This way we can check of there has been a read already. + if (StgMVar_tail(ioport) == stg_END_TSO_QUEUE_closure) { + StgMVar_tail(ioport) = ioport; + } else { + //Or another thread has read already: Throw an exception. + unlockClosure(ioport, info); + jump stg_raiseIOzh(base_GHCziIOPort_doubleReadException_closure); } val = StgMVar_value(ioport); @@ -2085,23 +2115,31 @@ stg_writeIOPortzh ( P_ ioport, /* :: IOPort a */ LOCK_CLOSURE(ioport, info); - /* If there is already a value in the queue, then silently ignore the - second put. TODO: Correct usages of IOPort should never have a second - put, so perhaps raise an error instead, but I have no idea how to do this - safely and correctly at this point. */ + /* If there is already a value in the port, then raise an exception + as it's the second write. + Correct usages of IOPort should never have a second + write. */ if (StgMVar_value(ioport) != stg_END_TSO_QUEUE_closure) { unlockClosure(ioport, info); + jump stg_raiseIOzh(base_GHCziIOPort_doubleReadException_closure); return (0); } + // We are going to mutate the closure, make sure its current pointers + // are marked. + if (info == stg_MVAR_CLEAN_info) { + ccall update_MVAR(BaseReg "ptr", ioport "ptr", StgMVar_value(ioport) "ptr"); + } + q = StgMVar_head(ioport); loop: if (q == stg_END_TSO_QUEUE_closure) { - /* No further takes, the IOPort is now full. */ + /* No takes, the IOPort is now full. */ if (info == stg_MVAR_CLEAN_info) { ccall dirty_MVAR(BaseReg "ptr", ioport "ptr"); } StgMVar_value(ioport) = val; + unlockClosure(ioport, stg_MVAR_DIRTY_info); return (1); } @@ -2111,13 +2149,12 @@ loop: goto loop; } - // There are readIOPort(s) waiting: wake up the first one - + // There is a readIOPort waiting: wake it up tso = StgMVarTSOQueue_tso(q); - StgMVar_head(ioport) = StgMVarTSOQueue_link(q); - if (StgMVar_head(ioport) == stg_END_TSO_QUEUE_closure) { - StgMVar_tail(ioport) = stg_END_TSO_QUEUE_closure; - } + + // In contrast to MVars we do not need to move on to the + // next element in the waiting list here, as there can only ever + // be one thread blocked on a port. ASSERT(StgTSO_block_info(tso) == ioport); // save why_blocked here, because waking up the thread destroys @@ -2130,7 +2167,7 @@ loop: stack = StgTSO_stackobj(tso); PerformTake(stack, val); - // indicate that the MVar operation has now completed. + // indicate that the operation has now completed. StgTSO__link(tso) = stg_END_TSO_QUEUE_closure; if (TO_W_(StgStack_dirty(stack)) == 0) { @@ -2139,13 +2176,12 @@ loop: ccall tryWakeupThread(MyCapability() "ptr", tso); - // If it was a readIOPort, then we can still do work, - // so loop back. (XXX: This could take a while) - if (why_blocked == BlockedOnIOCompletion) { - q = StgMVarTSOQueue_link(q); - goto loop; - } + // For MVars we loop here, waking up all readers. + // IOPorts however can only have on reader. So we are done + // at this point. + //Either there was no reader queued, or he must have been + //blocked on BlockedOnIOCompletion ASSERT(why_blocked == BlockedOnIOCompletion); unlockClosure(ioport, info); @@ -2163,10 +2199,11 @@ stg_newIOPortzh ( gcptr init ) ioport = Hp - SIZEOF_StgMVar + WDS(1); SET_HDR(ioport, stg_MVAR_DIRTY_info,CCCS); - // MVARs start dirty: generation 0 has no mutable list + // MVARs start dirty: generation 0 has no mutable list StgMVar_head(ioport) = stg_END_TSO_QUEUE_closure; StgMVar_tail(ioport) = stg_END_TSO_QUEUE_closure; StgMVar_value(ioport) = stg_END_TSO_QUEUE_closure; + return (ioport); } |