diff options
Diffstat (limited to 'rts/PrimOps.cmm')
-rw-r--r-- | rts/PrimOps.cmm | 244 |
1 files changed, 242 insertions, 2 deletions
diff --git a/rts/PrimOps.cmm b/rts/PrimOps.cmm index 1fd746edf6..1aa001c953 100644 --- a/rts/PrimOps.cmm +++ b/rts/PrimOps.cmm @@ -31,8 +31,10 @@ import pthread_mutex_unlock; #endif import CLOSURE base_ControlziExceptionziBase_nestedAtomically_closure; import CLOSURE base_GHCziIOziException_heapOverflow_closure; -import EnterCriticalSection; -import LeaveCriticalSection; +import CLOSURE base_GHCziIOziException_blockedIndefinitelyOnMVar_closure; +import CLOSURE base_GHCziIOPort_doubleReadException_closure; +import AcquireSRWLockExclusive; +import ReleaseSRWLockExclusive; import CLOSURE ghczmprim_GHCziTypes_False_closure; #if defined(PROFILING) import CLOSURE CCS_MAIN; @@ -1593,6 +1595,7 @@ stg_takeMVarzh ( P_ mvar /* :: MVar a */ ) SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM); // Write barrier before we make the new MVAR_TSO_QUEUE // visible to other cores. + // See Note [Heap memory barriers] prim_write_barrier; if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) { @@ -1761,6 +1764,7 @@ stg_putMVarzh ( P_ mvar, /* :: MVar a */ StgMVarTSOQueue_tso(q) = CurrentTSO; SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM); + //See Note [Heap memory barriers] prim_write_barrier; if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) { @@ -1943,6 +1947,7 @@ stg_readMVarzh ( P_ mvar, /* :: MVar a */ ) */ if (StgMVar_value(mvar) == stg_END_TSO_QUEUE_closure) { + // Add MVar to mutable list if (info == stg_MVAR_CLEAN_info) { ccall dirty_MVAR(BaseReg "ptr", mvar "ptr", StgMVar_value(mvar)); } @@ -1960,6 +1965,7 @@ stg_readMVarzh ( P_ mvar, /* :: MVar a */ ) StgMVarTSOQueue_tso(q) = CurrentTSO; SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM); + //See Note [Heap memory barriers] prim_write_barrier; StgTSO__link(CurrentTSO) = q; @@ -1998,6 +2004,240 @@ stg_tryReadMVarzh ( P_ mvar, /* :: MVar a */ ) } /* ----------------------------------------------------------------------------- + * IOPort primitives + * + * readIOPort & writeIOPort work as follows. 
Firstly, an important invariant: + * + * Only one read and one write is allowed for an IOPort. + * Reading or writing to the same port twice will throw an exception. + * + * readIOPort: + * IOPort empty : then add ourselves to the blocking queue + * IOPort full : remove the value from the IOPort, and + * blocking queue empty : return + * blocking queue non-empty : perform the only blocked + * writeIOPort from the queue, and + * wake up the thread + * (IOPort is now empty) + * + * writeIOPort is just the dual of the above algorithm. + * + * How do we "perform a writeIOPort"? Well, by storing the value and port on the + * stack, same way we do with MVars. Semantically the operations mutate the + * stack the same way so we will re-use the logic and data structures for MVars + * for IOPort. See stg_block_putmvar and stg_block_takemvar in HeapStackCheck.c + * for the stack layout, and the PerformPut and PerformTake macros below. We + * also re-use the closure types MVAR_CLEAN/_DIRTY for IOPort. + * + * The remaining caveats of MVar thus also apply for an IOPort. The main + * crucial difference between an MVar and IOPort is that the scheduler will not + * be allowed to interrupt a blocked IOPort just because it thinks there's a + * deadlock. This is especially crucial for the non-threaded runtime. + * + * To avoid double reads/writes we set only the head to a MVarTSOQueue when + * a reader queues up on a port. + * We set the tail to the port itself upon reading. We can do this + * since there can only be one reader/writer for the port. In contrast to MVars + * which do need to keep a list of blocked threads. 
+ * + * This means IOPorts have these valid states and transitions: + * + ┌─────────┐ + │ Empty │ head == tail == value == END_TSO_QUEUE + ├─────────┤ + │ │ + write │ │ read + v v + value != END_TSO_QUEUE ┌─────────┐ ┌─────────┐ value == END_TSO_QUEUE + head == END_TSO_QUEUE │ full │ │ reading │ head == queue with single reader + tail == END_TSO_QUEUE └─────────┘ └─────────┘ tail == END_TSO_QUEUE + │ │ + read │ │ write + │ │ + v v + ┌──────────┐ value != END_TSO_QUEUE + │ Used │ head == END_TSO_QUEUE + └──────────┘ tail == ioport + + * + * -------------------------------------------------------------------------- */ + + +stg_readIOPortzh ( P_ ioport /* :: IOPort a */ ) +{ + W_ val, info, tso, q; + + LOCK_CLOSURE(ioport, info); + + /* If the Port is empty, put ourselves on the blocked readers + * list and wait until we're woken up. + */ + if (StgMVar_value(ioport) == stg_END_TSO_QUEUE_closure) { + + // There is or was already another reader, throw exception. + if (StgMVar_head(ioport) != stg_END_TSO_QUEUE_closure || + StgMVar_tail(ioport) != stg_END_TSO_QUEUE_closure) { + unlockClosure(ioport, info); + jump stg_raiseIOzh(base_GHCziIOPort_doubleReadException_closure); + } + + if (info == stg_MVAR_CLEAN_info) { + ccall dirty_MVAR(BaseReg "ptr", ioport "ptr", StgMVar_value(ioport)); + } + + ALLOC_PRIM_WITH_CUSTOM_FAILURE + (SIZEOF_StgMVarTSOQueue, + unlockClosure(ioport, stg_MVAR_DIRTY_info); + GC_PRIM_P(stg_readIOPortzh, ioport)); + + q = Hp - SIZEOF_StgMVarTSOQueue + WDS(1); + + // link = stg_END_TSO_QUEUE_closure since we check that + // there is no other reader above. 
+ StgMVarTSOQueue_link(q) = stg_END_TSO_QUEUE_closure; + StgMVarTSOQueue_tso(q) = CurrentTSO; + + SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM); + //See Note [Heap memory barriers] + prim_write_barrier; + + StgMVar_head(ioport) = q; + StgTSO__link(CurrentTSO) = q; + StgTSO_block_info(CurrentTSO) = ioport; + StgTSO_why_blocked(CurrentTSO) = BlockedOnIOCompletion::I16; + + //Unlocks the closure as well + jump stg_block_readmvar(ioport); + + } + + //This way we can check if there has been a read already. + //Upon reading we set tail to indicate the port is now closed. + if (StgMVar_tail(ioport) == stg_END_TSO_QUEUE_closure) { + StgMVar_tail(ioport) = ioport; + StgMVar_head(ioport) = stg_END_TSO_QUEUE_closure; + } else { + //Or another thread has read already: Throw an exception. + unlockClosure(ioport, info); + jump stg_raiseIOzh(base_GHCziIOPort_doubleReadException_closure); + } + + val = StgMVar_value(ioport); + + unlockClosure(ioport, info); + return (val); +} + +stg_writeIOPortzh ( P_ ioport, /* :: IOPort a */ + P_ val, /* :: a */ ) +{ + W_ info, tso, q; + + LOCK_CLOSURE(ioport, info); + + /* If there is already a value in the port, then raise an exception + as it's the second write. + Correct usages of IOPort should never have a second + write. */ + if (StgMVar_value(ioport) != stg_END_TSO_QUEUE_closure) { + unlockClosure(ioport, info); + jump stg_raiseIOzh(base_GHCziIOPort_doubleReadException_closure); + return (0); + } + + // We are going to mutate the closure, make sure its current pointers + // are marked. + if (info == stg_MVAR_CLEAN_info) { + ccall update_MVAR(BaseReg "ptr", ioport "ptr", StgMVar_value(ioport) "ptr"); + } + + q = StgMVar_head(ioport); +loop: + if (q == stg_END_TSO_QUEUE_closure) { + /* No takes, the IOPort is now full. 
*/ + if (info == stg_MVAR_CLEAN_info) { + ccall dirty_MVAR(BaseReg "ptr", ioport "ptr"); + } + StgMVar_value(ioport) = val; + + unlockClosure(ioport, stg_MVAR_DIRTY_info); + return (1); + } + //Possibly IND added by removeFromMVarBlockedQueue + if (StgHeader_info(q) == stg_IND_info || + StgHeader_info(q) == stg_MSG_NULL_info) { + q = StgInd_indirectee(q); + goto loop; + } + + // There is a readIOPort waiting: wake it up + tso = StgMVarTSOQueue_tso(q); + + // Assert no read has happened yet. + ASSERT(StgMVar_tail(ioport) == stg_END_TSO_QUEUE_closure); + // And there is only one reader queued up. + ASSERT(StgMVarTSOQueue_link(q) == stg_END_TSO_QUEUE_closure); + + // We perform the read here, so set tail/head accordingly. + StgMVar_head(ioport) = stg_END_TSO_QUEUE_closure; + StgMVar_tail(ioport) = ioport; + + // In contrast to MVars we do not need to move on to the + // next element in the waiting list here, as there can only ever + // be one thread blocked on a port. + + ASSERT(StgTSO_block_info(tso) == ioport); + // save why_blocked here, because waking up the thread destroys + // this information + W_ why_blocked; + why_blocked = TO_W_(StgTSO_why_blocked(tso)); + + // actually perform the takeMVar + W_ stack; + stack = StgTSO_stackobj(tso); + PerformTake(stack, val); + + // indicate that the operation has now completed. + StgTSO__link(tso) = stg_END_TSO_QUEUE_closure; + + if (TO_W_(StgStack_dirty(stack)) == 0) { + ccall dirty_STACK(MyCapability() "ptr", stack "ptr"); + } + + ccall tryWakeupThread(MyCapability() "ptr", tso); + + // For MVars we loop here, waking up all readers. + // IOPorts however can only have one reader. So we are done + // at this point. 
+ + //Either there was no reader queued, or it must have been + //blocked on BlockedOnIOCompletion + ASSERT(why_blocked == BlockedOnIOCompletion); + + unlockClosure(ioport, info); + return (1); +} +/* ----------------------------------------------------------------------------- + IOPort primitives + -------------------------------------------------------------------------- */ + +stg_newIOPortzh ( gcptr init ) +{ + W_ ioport; + + ALLOC_PRIM_ (SIZEOF_StgMVar, stg_newIOPortzh); + + ioport = Hp - SIZEOF_StgMVar + WDS(1); + SET_HDR(ioport, stg_MVAR_DIRTY_info,CCCS); + // MVARs start dirty: generation 0 has no mutable list + StgMVar_head(ioport) = stg_END_TSO_QUEUE_closure; + StgMVar_tail(ioport) = stg_END_TSO_QUEUE_closure; + StgMVar_value(ioport) = stg_END_TSO_QUEUE_closure; + + return (ioport); +} + +/* ----------------------------------------------------------------------------- Stable pointer primitives ------------------------------------------------------------------------- */ |