summaryrefslogtreecommitdiff
path: root/rts/PrimOps.cmm
diff options
context:
space:
mode:
Diffstat (limited to 'rts/PrimOps.cmm')
-rw-r--r--rts/PrimOps.cmm244
1 files changed, 242 insertions, 2 deletions
diff --git a/rts/PrimOps.cmm b/rts/PrimOps.cmm
index 1fd746edf6..1aa001c953 100644
--- a/rts/PrimOps.cmm
+++ b/rts/PrimOps.cmm
@@ -31,8 +31,10 @@ import pthread_mutex_unlock;
#endif
import CLOSURE base_ControlziExceptionziBase_nestedAtomically_closure;
import CLOSURE base_GHCziIOziException_heapOverflow_closure;
-import EnterCriticalSection;
-import LeaveCriticalSection;
+import CLOSURE base_GHCziIOziException_blockedIndefinitelyOnMVar_closure;
+import CLOSURE base_GHCziIOPort_doubleReadException_closure;
+import AcquireSRWLockExclusive;
+import ReleaseSRWLockExclusive;
import CLOSURE ghczmprim_GHCziTypes_False_closure;
#if defined(PROFILING)
import CLOSURE CCS_MAIN;
@@ -1593,6 +1595,7 @@ stg_takeMVarzh ( P_ mvar /* :: MVar a */ )
SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM);
// Write barrier before we make the new MVAR_TSO_QUEUE
// visible to other cores.
+ // See Note [Heap memory barriers]
prim_write_barrier;
if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
@@ -1761,6 +1764,7 @@ stg_putMVarzh ( P_ mvar, /* :: MVar a */
StgMVarTSOQueue_tso(q) = CurrentTSO;
SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM);
+ //See Note [Heap memory barriers]
prim_write_barrier;
if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
@@ -1943,6 +1947,7 @@ stg_readMVarzh ( P_ mvar, /* :: MVar a */ )
*/
if (StgMVar_value(mvar) == stg_END_TSO_QUEUE_closure) {
+ // Add MVar to mutable list
if (info == stg_MVAR_CLEAN_info) {
ccall dirty_MVAR(BaseReg "ptr", mvar "ptr", StgMVar_value(mvar));
}
@@ -1960,6 +1965,7 @@ stg_readMVarzh ( P_ mvar, /* :: MVar a */ )
StgMVarTSOQueue_tso(q) = CurrentTSO;
SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM);
+ //See Note [Heap memory barriers]
prim_write_barrier;
StgTSO__link(CurrentTSO) = q;
@@ -1998,6 +2004,240 @@ stg_tryReadMVarzh ( P_ mvar, /* :: MVar a */ )
}
/* -----------------------------------------------------------------------------
+ * IOPort primitives
+ *
+ * readIOPort & writeIOPort work as follows. Firstly, an important invariant:
+ *
+ * Only one read and one write is allowed for an IOPort.
+ * Reading or writing to the same port twice will throw an exception.
+ *
+ * readIOPort:
+ * IOPort empty : then add ourselves to the blocking queue
+ * IOPort full : remove the value from the IOPort, and
+ * blocking queue empty : return
+ * blocking queue non-empty : perform the only blocked
+ * writeIOPort from the queue, and
+ * wake up the thread
+ * (IOPort is now empty)
+ *
+ * writeIOPort is just the dual of the above algorithm.
+ *
+ * How do we "perform a writeIOPort"? Well, By storing the value and prt on the
+ * stack, same way we do with MVars. Semantically the operations mutate the
+ * stack the same way so we will re-use the logic and datastructures for MVars
+ * for IOPort. See stg_block_putmvar and stg_block_takemvar in HeapStackCheck.c
+ * for the stack layout, and the PerformPut and PerformTake macros below. We
+ * also re-use the closure types MVAR_CLEAN/_DIRTY for IOPort.
+ *
+ * The remaining caveats of MVar thus also apply for an IOPort. The main
+ * crucial difference between an MVar and IOPort is that the scheduler will not
+ * be allowed to interrupt a blocked IOPort just because it thinks there's a
+ * deadlock. This is especially crucial for the non-threaded runtime.
+ *
+ * To avoid double reads/writes we set only the head to a MVarTSOQueue when
+ * a reader queues up on a port.
+ * We set the tail to the port itself upon reading. We can do this
+ * since there can only be one reader/writer for the port. In contrast to MVars
+ * which do need to keep a list of blocked threads.
+ *
+ * This means IOPorts have these valid states and transitions:
+ *
+ ┌─────────┐
+ │ Empty │ head == tail == value == END_TSO_QUEUE
+ ├─────────┤
+ │ │
+ write │ │ read
+ v v
+ value != END_TSO_QUEUE ┌─────────┐ ┌─────────┐ value == END_TSO_QUEUE
+ head == END_TSO_QUEUE │ full │ │ reading │ head == queue with single reader
+ tail == END_TSO_QUEUE └─────────┘ └─────────┘ tail == END_TSO_QUEUE
+ │ │
+ read │ │ write
+ │ │
+ v v
+ ┌──────────┐ value != END_TSO_QUEUE
+ │ Used │ head == END_TSO_QUEUE
+ └──────────┘ tail == ioport
+
+ *
+ * -------------------------------------------------------------------------- */
+
+
+stg_readIOPortzh ( P_ ioport /* :: IOPort a */ )
+{
+ W_ val, info, tso, q;
+
+ LOCK_CLOSURE(ioport, info);
+
+ /* If the Port is empty, put ourselves on the blocked readers
+ * list and wait until we're woken up.
+ */
+ if (StgMVar_value(ioport) == stg_END_TSO_QUEUE_closure) {
+
+ // There is or was already another reader, throw exception.
+ if (StgMVar_head(ioport) != stg_END_TSO_QUEUE_closure ||
+ StgMVar_tail(ioport) != stg_END_TSO_QUEUE_closure) {
+ unlockClosure(ioport, info);
+ jump stg_raiseIOzh(base_GHCziIOPort_doubleReadException_closure);
+ }
+
+ if (info == stg_MVAR_CLEAN_info) {
+ ccall dirty_MVAR(BaseReg "ptr", ioport "ptr", StgMVar_value(ioport));
+ }
+
+ ALLOC_PRIM_WITH_CUSTOM_FAILURE
+ (SIZEOF_StgMVarTSOQueue,
+ unlockClosure(ioport, stg_MVAR_DIRTY_info);
+ GC_PRIM_P(stg_readIOPortzh, ioport));
+
+ q = Hp - SIZEOF_StgMVarTSOQueue + WDS(1);
+
+ // link = stg_END_TSO_QUEUE_closure since we check that
+ // there is no other reader above.
+ StgMVarTSOQueue_link(q) = stg_END_TSO_QUEUE_closure;
+ StgMVarTSOQueue_tso(q) = CurrentTSO;
+
+ SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM);
+ //See Note [Heap memory barriers]
+ prim_write_barrier;
+
+ StgMVar_head(ioport) = q;
+ StgTSO__link(CurrentTSO) = q;
+ StgTSO_block_info(CurrentTSO) = ioport;
+ StgTSO_why_blocked(CurrentTSO) = BlockedOnIOCompletion::I16;
+
+ //Unlocks the closure as well
+ jump stg_block_readmvar(ioport);
+
+ }
+
+ //This way we can check of there has been a read already.
+ //Upon reading we set tail to indicate the port is now closed.
+ if (StgMVar_tail(ioport) == stg_END_TSO_QUEUE_closure) {
+ StgMVar_tail(ioport) = ioport;
+ StgMVar_head(ioport) = stg_END_TSO_QUEUE_closure;
+ } else {
+ //Or another thread has read already: Throw an exception.
+ unlockClosure(ioport, info);
+ jump stg_raiseIOzh(base_GHCziIOPort_doubleReadException_closure);
+ }
+
+ val = StgMVar_value(ioport);
+
+ unlockClosure(ioport, info);
+ return (val);
+}
+
+stg_writeIOPortzh ( P_ ioport, /* :: IOPort a */
+ P_ val, /* :: a */ )
+{
+ W_ info, tso, q;
+
+ LOCK_CLOSURE(ioport, info);
+
+ /* If there is already a value in the port, then raise an exception
+ as it's the second write.
+ Correct usages of IOPort should never have a second
+ write. */
+ if (StgMVar_value(ioport) != stg_END_TSO_QUEUE_closure) {
+ unlockClosure(ioport, info);
+ jump stg_raiseIOzh(base_GHCziIOPort_doubleReadException_closure);
+ return (0);
+ }
+
+ // We are going to mutate the closure, make sure its current pointers
+ // are marked.
+ if (info == stg_MVAR_CLEAN_info) {
+ ccall update_MVAR(BaseReg "ptr", ioport "ptr", StgMVar_value(ioport) "ptr");
+ }
+
+ q = StgMVar_head(ioport);
+loop:
+ if (q == stg_END_TSO_QUEUE_closure) {
+ /* No takes, the IOPort is now full. */
+ if (info == stg_MVAR_CLEAN_info) {
+ ccall dirty_MVAR(BaseReg "ptr", ioport "ptr");
+ }
+ StgMVar_value(ioport) = val;
+
+ unlockClosure(ioport, stg_MVAR_DIRTY_info);
+ return (1);
+ }
+ //Possibly IND added by removeFromMVarBlockedQueue
+ if (StgHeader_info(q) == stg_IND_info ||
+ StgHeader_info(q) == stg_MSG_NULL_info) {
+ q = StgInd_indirectee(q);
+ goto loop;
+ }
+
+ // There is a readIOPort waiting: wake it up
+ tso = StgMVarTSOQueue_tso(q);
+
+ // Assert no read has happened yet.
+ ASSERT(StgMVar_tail(ioport) == stg_END_TSO_QUEUE_closure);
+ // And there is only one reader queued up.
+ ASSERT(StgMVarTSOQueue_link(q) == stg_END_TSO_QUEUE_closure);
+
+ // We perform the read here, so set tail/head accordingly.
+ StgMVar_head(ioport) = stg_END_TSO_QUEUE_closure;
+ StgMVar_tail(ioport) = ioport;
+
+ // In contrast to MVars we do not need to move on to the
+ // next element in the waiting list here, as there can only ever
+ // be one thread blocked on a port.
+
+ ASSERT(StgTSO_block_info(tso) == ioport);
+ // save why_blocked here, because waking up the thread destroys
+ // this information
+ W_ why_blocked;
+ why_blocked = TO_W_(StgTSO_why_blocked(tso));
+
+ // actually perform the takeMVar
+ W_ stack;
+ stack = StgTSO_stackobj(tso);
+ PerformTake(stack, val);
+
+ // indicate that the operation has now completed.
+ StgTSO__link(tso) = stg_END_TSO_QUEUE_closure;
+
+ if (TO_W_(StgStack_dirty(stack)) == 0) {
+ ccall dirty_STACK(MyCapability() "ptr", stack "ptr");
+ }
+
+ ccall tryWakeupThread(MyCapability() "ptr", tso);
+
+ // For MVars we loop here, waking up all readers.
+ // IOPorts however can only have on reader. So we are done
+ // at this point.
+
+ //Either there was no reader queued, or he must have been
+ //blocked on BlockedOnIOCompletion
+ ASSERT(why_blocked == BlockedOnIOCompletion);
+
+ unlockClosure(ioport, info);
+ return (1);
+}
+/* -----------------------------------------------------------------------------
+ IOPort primitives
+ -------------------------------------------------------------------------- */
+
+stg_newIOPortzh ( gcptr init )
+{
+ W_ ioport;
+
+ ALLOC_PRIM_ (SIZEOF_StgMVar, stg_newIOPortzh);
+
+ ioport = Hp - SIZEOF_StgMVar + WDS(1);
+ SET_HDR(ioport, stg_MVAR_DIRTY_info,CCCS);
+ // MVARs start dirty: generation 0 has no mutable list
+ StgMVar_head(ioport) = stg_END_TSO_QUEUE_closure;
+ StgMVar_tail(ioport) = stg_END_TSO_QUEUE_closure;
+ StgMVar_value(ioport) = stg_END_TSO_QUEUE_closure;
+
+ return (ioport);
+}
+
+/* -----------------------------------------------------------------------------
Stable pointer primitives
------------------------------------------------------------------------- */