summaryrefslogtreecommitdiff
path: root/rts/PrimOps.cmm
diff options
context:
space:
mode:
authorTamar Christina <tamar@zhox.com>2019-06-16 21:54:23 +0100
committerBen Gamari <ben@smart-cactus.org>2020-07-15 16:41:01 -0400
commit90e69f779b6da755fac472337535a1321cbb7917 (patch)
tree935ccfc0e38bfae2133b926347edb51bafecdfa7 /rts/PrimOps.cmm
parent356dc3feae967b1c361130f1f356ef9ad6a693e4 (diff)
downloadhaskell-90e69f779b6da755fac472337535a1321cbb7917.tar.gz
winio: Add IOPort synchronization primitive
Diffstat (limited to 'rts/PrimOps.cmm')
-rw-r--r--rts/PrimOps.cmm173
1 files changed, 173 insertions, 0 deletions
diff --git a/rts/PrimOps.cmm b/rts/PrimOps.cmm
index e13e89b98c..0b1b1419a1 100644
--- a/rts/PrimOps.cmm
+++ b/rts/PrimOps.cmm
@@ -1998,6 +1998,179 @@ stg_tryReadMVarzh ( P_ mvar, /* :: MVar a */ )
}
/* -----------------------------------------------------------------------------
+ * IOPort primitives
+ *
+ * readIOPort & writeIOPort work as follows. Firstly, an important invariant:
+ *
+ * If the IOPort is full, then any further write request is silently dropped
+ * and that message is lost. If the IOPort is empty then the
+ * blocking queue contains only the single thread blocked on the IOPort. An
+ * IOPort only supports a single read and a single write to it.
+ *
+ * readIOPort:
+ * IOPort empty : then add ourselves to the blocking queue
+ * IOPort full : remove the value from the IOPort, and
+ * blocking queue empty : return
+ * blocking queue non-empty : perform the only blocked
+ * writeIOPort from the queue, and
+ * wake up the thread
+ * (IOPort is now empty)
+ *
+ * writeIOPort is just the dual of the above algorithm.
+ *
+ * How do we "perform a writeIOPort"? Well, by storing the value and the port
+ * on the stack, the same way we do with MVars. Semantically the operations
+ * mutate the stack the same way, so we re-use the logic and data structures
+ * of MVars for IOPort. See stg_block_putmvar and stg_block_takemvar in
+ * HeapStackCheck.cmm for the stack layout, and the PerformPut and PerformTake
+ * macros below. We
+ * also re-use the closure types MVAR_CLEAN/_DIRTY for IOPort.
+ *
+ * The remaining caveats of MVar thus also apply for an IOPort. The main
+ * crucial difference between an MVar and IOPort is that the scheduler will not
+ * be allowed to interrupt a blocked IOPort just because it thinks there's a
+ * deadlock. This is especially crucial for the non-threaded runtime.
+ *
+ * -------------------------------------------------------------------------- */
+
+stg_readIOPortzh ( P_ ioport /* :: IOPort a */ )
+{
+    W_ val, info, tso, q;
+
+    LOCK_CLOSURE(ioport, info);
+
+    /* If the IOPort is empty, put ourselves on its blocked-thread
+     * queue and wait until a writeIOPort wakes us up.
+     */
+    if (StgMVar_value(ioport) == stg_END_TSO_QUEUE_closure) {
+
+        // The port is about to be mutated (a reader gets queued on it),
+        // so it must be recorded on the mutable list if it was clean.
+        if (info == stg_MVAR_CLEAN_info) {
+            ccall dirty_MVAR(BaseReg "ptr", ioport "ptr");
+        }
+
+        // Allocate a queue node; on heap overflow, unlock the port first,
+        // then retry the whole primop after GC.
+        ALLOC_PRIM_WITH_CUSTOM_FAILURE
+            (SIZEOF_StgMVarTSOQueue,
+             unlockClosure(ioport, stg_MVAR_DIRTY_info);
+             GC_PRIM_P(stg_readIOPortzh, ioport));
+
+        q = Hp - SIZEOF_StgMVarTSOQueue + WDS(1);
+
+        // readIOPorts are pushed to the front of the queue, so
+        // they get handled immediately
+        SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM);
+        StgMVarTSOQueue_link(q) = StgMVar_head(ioport);
+        StgMVarTSOQueue_tso(q) = CurrentTSO;
+
+        StgTSO__link(CurrentTSO) = q;
+        StgTSO_block_info(CurrentTSO) = ioport;
+        // BlockedOnIOCompletion (not BlockedOnMVarRead): per the header
+        // comment, the scheduler must not interrupt a thread blocked on an
+        // IOPort on a suspected deadlock.
+        StgTSO_why_blocked(CurrentTSO) = BlockedOnIOCompletion::I16;
+        StgMVar_head(ioport) = q;
+
+        if (StgMVar_tail(ioport) == stg_END_TSO_QUEUE_closure) {
+            StgMVar_tail(ioport) = q;
+        }
+
+        // Re-uses the MVar blocking machinery (see HeapStackCheck);
+        // unlocks the closure and suspends this thread.
+        jump stg_block_readmvar(ioport);
+    }
+
+    // Port is full: return the value.
+    // NOTE(review): the value is left in place here, it is NOT removed —
+    // this differs from the "remove the value" description in the header
+    // comment above; confirm which is intended.
+    val = StgMVar_value(ioport);
+
+    unlockClosure(ioport, info);
+    return (val);
+}
+
+stg_writeIOPortzh ( P_ ioport, /* :: IOPort a */
+                    P_ val,    /* :: a */ )
+{
+    W_ info, tso, q;
+
+    LOCK_CLOSURE(ioport, info);
+
+    /* If there is already a value in the port, then silently ignore the
+       second put. TODO: Correct usages of IOPort should never have a second
+       put, so perhaps raise an error instead, but I have no idea how to do this
+       safely and correctly at this point. */
+    if (StgMVar_value(ioport) != stg_END_TSO_QUEUE_closure) {
+        unlockClosure(ioport, info);
+        return (0);
+    }
+
+    q = StgMVar_head(ioport);
+loop:
+    if (q == stg_END_TSO_QUEUE_closure) {
+        /* No further takes, the IOPort is now full. */
+        if (info == stg_MVAR_CLEAN_info) {
+            ccall dirty_MVAR(BaseReg "ptr", ioport "ptr");
+        }
+        StgMVar_value(ioport) = val;
+        unlockClosure(ioport, stg_MVAR_DIRTY_info);
+        return (1);
+    }
+    // Skip over indirections / deleted queue entries.
+    if (StgHeader_info(q) == stg_IND_info ||
+        StgHeader_info(q) == stg_MSG_NULL_info) {
+        q = StgInd_indirectee(q);
+        goto loop;
+    }
+
+    // There are readIOPort(s) waiting: wake up the first one
+
+    tso = StgMVarTSOQueue_tso(q);
+    StgMVar_head(ioport) = StgMVarTSOQueue_link(q);
+    if (StgMVar_head(ioport) == stg_END_TSO_QUEUE_closure) {
+        StgMVar_tail(ioport) = stg_END_TSO_QUEUE_closure;
+    }
+
+    ASSERT(StgTSO_block_info(tso) == ioport);
+    // save why_blocked here, because waking up the thread destroys
+    // this information
+    W_ why_blocked;
+    why_blocked = TO_W_(StgTSO_why_blocked(tso));
+
+    // actually perform the blocked read, by storing the value directly
+    // onto the reader's stack (same layout as MVar's PerformTake)
+    W_ stack;
+    stack = StgTSO_stackobj(tso);
+    PerformTake(stack, val);
+
+    // indicate that the read has now completed.
+    StgTSO__link(tso) = stg_END_TSO_QUEUE_closure;
+
+    if (TO_W_(StgStack_dirty(stack)) == 0) {
+        ccall dirty_STACK(MyCapability() "ptr", stack "ptr");
+    }
+
+    ccall tryWakeupThread(MyCapability() "ptr", tso);
+
+    // If it was a readIOPort, then we can still do work,
+    // so loop back. (XXX: This could take a while)
+    if (why_blocked == BlockedOnIOCompletion) {
+        q = StgMVarTSOQueue_link(q);
+        goto loop;
+    }
+
+    // Only readIOPort threads (BlockedOnIOCompletion) are ever enqueued on
+    // an IOPort, and the branch above loops for every such thread, so this
+    // point should be unreachable.  (The previous
+    // ASSERT(why_blocked == BlockedOnIOCompletion) here could only ever be
+    // reached when its condition was false, i.e. it always failed.)
+    ASSERT(0);
+
+    // Defensive fall-through for non-debug builds: the queue head was
+    // unlinked above, so record the mutation before unlocking rather than
+    // restoring the possibly-clean old info pointer.
+    if (info == stg_MVAR_CLEAN_info) {
+        ccall dirty_MVAR(BaseReg "ptr", ioport "ptr");
+    }
+    unlockClosure(ioport, stg_MVAR_DIRTY_info);
+    return (1);
+}
+/* -----------------------------------------------------------------------------
+   IOPort allocation
+   -------------------------------------------------------------------------- */
+
+stg_newIOPortzh ( gcptr init )
+{
+    W_ port;
+
+    // Allocate a fresh IOPort; it shares the MVar closure layout.
+    // (NOTE: the `init` argument is accepted but not used here — TODO confirm.)
+    ALLOC_PRIM_ (SIZEOF_StgMVar, stg_newIOPortzh);
+    port = Hp - SIZEOF_StgMVar + WDS(1);
+
+    // New ports start dirty: generation 0 has no mutable list.
+    SET_HDR(port, stg_MVAR_DIRTY_info, CCCS);
+
+    // An empty port: no queued threads and no value.
+    StgMVar_value(port) = stg_END_TSO_QUEUE_closure;
+    StgMVar_head(port) = stg_END_TSO_QUEUE_closure;
+    StgMVar_tail(port) = stg_END_TSO_QUEUE_closure;
+
+    return (port);
+}
+
+/* -----------------------------------------------------------------------------
Stable pointer primitives
------------------------------------------------------------------------- */