diff options
author | Tamar Christina <tamar@zhox.com> | 2019-06-16 21:54:23 +0100 |
---|---|---|
committer | Ben Gamari <ben@smart-cactus.org> | 2020-07-15 16:41:01 -0400 |
commit | 90e69f779b6da755fac472337535a1321cbb7917 (patch) | |
tree | 935ccfc0e38bfae2133b926347edb51bafecdfa7 | |
parent | 356dc3feae967b1c361130f1f356ef9ad6a693e4 (diff) | |
download | haskell-90e69f779b6da755fac472337535a1321cbb7917.tar.gz |
winio: Add IOPort synchronization primitive
-rw-r--r-- | compiler/GHC/Builtin/Names.hs | 13 | ||||
-rw-r--r-- | compiler/GHC/Builtin/Types/Prim.hs | 20 | ||||
-rw-r--r-- | compiler/GHC/Builtin/primops.txt.pp | 39 | ||||
-rw-r--r-- | compiler/GHC/StgToCmm/Prim.hs | 4 | ||||
-rw-r--r-- | includes/rts/Constants.h | 6 | ||||
-rw-r--r-- | includes/rts/storage/TSO.h | 1 | ||||
-rw-r--r-- | includes/stg/MiscClosures.h | 4 | ||||
-rw-r--r-- | libraries/base/GHC/Conc/Sync.hs | 3 | ||||
-rw-r--r-- | libraries/base/GHC/IOPort.hs | 100 | ||||
-rw-r--r-- | libraries/base/base.cabal | 4 | ||||
-rw-r--r-- | libraries/ghc-heap/GHC/Exts/Heap/Closures.hs | 11 | ||||
-rw-r--r-- | rts/PrimOps.cmm | 173 | ||||
-rw-r--r-- | rts/RaiseAsync.c | 16 | ||||
-rw-r--r-- | rts/RtsSymbols.c | 3 | ||||
-rw-r--r-- | rts/Schedule.c | 37 | ||||
-rw-r--r-- | rts/Schedule.h | 2 | ||||
-rw-r--r-- | rts/Threads.c | 5 | ||||
-rw-r--r-- | rts/Trace.c | 1 | ||||
-rw-r--r-- | rts/TraverseHeap.c | 1 | ||||
-rw-r--r-- | rts/sm/Compact.c | 1 | ||||
-rw-r--r-- | rts/sm/Sanity.c | 1 | ||||
-rw-r--r-- | rts/sm/Scav.c | 1 | ||||
-rw-r--r-- | utils/genprimopcode/Main.hs | 2 |
23 files changed, 429 insertions, 19 deletions
diff --git a/compiler/GHC/Builtin/Names.hs b/compiler/GHC/Builtin/Names.hs index 02a10d4b35..b9ef184923 100644 --- a/compiler/GHC/Builtin/Names.hs +++ b/compiler/GHC/Builtin/Names.hs @@ -1747,7 +1747,7 @@ addrPrimTyConKey, arrayPrimTyConKey, arrayArrayPrimTyConKey, boolTyConKey, weakPrimTyConKey, mutableArrayPrimTyConKey, mutableArrayArrayPrimTyConKey, mutableByteArrayPrimTyConKey, orderingTyConKey, mVarPrimTyConKey, ratioTyConKey, rationalTyConKey, realWorldTyConKey, stablePtrPrimTyConKey, - stablePtrTyConKey, eqTyConKey, heqTyConKey, + stablePtrTyConKey, eqTyConKey, heqTyConKey, ioPortPrimTyConKey, smallArrayPrimTyConKey, smallMutableArrayPrimTyConKey, stringTyConKey :: Unique addrPrimTyConKey = mkPreludeTyConUnique 1 @@ -1783,11 +1783,12 @@ mutableArrayPrimTyConKey = mkPreludeTyConUnique 30 mutableByteArrayPrimTyConKey = mkPreludeTyConUnique 31 orderingTyConKey = mkPreludeTyConUnique 32 mVarPrimTyConKey = mkPreludeTyConUnique 33 -ratioTyConKey = mkPreludeTyConUnique 34 -rationalTyConKey = mkPreludeTyConUnique 35 -realWorldTyConKey = mkPreludeTyConUnique 36 -stablePtrPrimTyConKey = mkPreludeTyConUnique 37 -stablePtrTyConKey = mkPreludeTyConUnique 38 +ioPortPrimTyConKey = mkPreludeTyConUnique 34 +ratioTyConKey = mkPreludeTyConUnique 35 +rationalTyConKey = mkPreludeTyConUnique 36 +realWorldTyConKey = mkPreludeTyConUnique 37 +stablePtrPrimTyConKey = mkPreludeTyConUnique 38 +stablePtrTyConKey = mkPreludeTyConUnique 39 eqTyConKey = mkPreludeTyConUnique 40 heqTyConKey = mkPreludeTyConUnique 41 arrayArrayPrimTyConKey = mkPreludeTyConUnique 42 diff --git a/compiler/GHC/Builtin/Types/Prim.hs b/compiler/GHC/Builtin/Types/Prim.hs index 88ef943a64..13f08739d0 100644 --- a/compiler/GHC/Builtin/Types/Prim.hs +++ b/compiler/GHC/Builtin/Types/Prim.hs @@ -62,6 +62,7 @@ module GHC.Builtin.Types.Prim( mutVarPrimTyCon, mkMutVarPrimTy, mVarPrimTyCon, mkMVarPrimTy, + ioPortPrimTyCon, mkIOPortPrimTy, tVarPrimTyCon, mkTVarPrimTy, stablePtrPrimTyCon, mkStablePtrPrimTy, stableNamePrimTyCon, mkStableNamePrimTy, @@ -171,6 +172,7 @@ exposedPrimTyCons , mutableArrayArrayPrimTyCon , smallMutableArrayPrimTyCon , mVarPrimTyCon + , ioPortPrimTyCon , tVarPrimTyCon , mutVarPrimTyCon , realWorldTyCon @@ -207,7 +209,7 @@ mkBuiltInPrimTc fs unique tycon BuiltInSyntax -charPrimTyConName, intPrimTyConName, int8PrimTyConName, int16PrimTyConName, int32PrimTyConName, int64PrimTyConName, wordPrimTyConName, word32PrimTyConName, word8PrimTyConName, word16PrimTyConName, word64PrimTyConName, addrPrimTyConName, floatPrimTyConName, doublePrimTyConName, statePrimTyConName, proxyPrimTyConName, realWorldTyConName, arrayPrimTyConName, arrayArrayPrimTyConName, smallArrayPrimTyConName, byteArrayPrimTyConName, mutableArrayPrimTyConName, mutableByteArrayPrimTyConName, mutableArrayArrayPrimTyConName, smallMutableArrayPrimTyConName, mutVarPrimTyConName, mVarPrimTyConName, tVarPrimTyConName, stablePtrPrimTyConName, stableNamePrimTyConName, compactPrimTyConName, bcoPrimTyConName, weakPrimTyConName, threadIdPrimTyConName, eqPrimTyConName, eqReprPrimTyConName, eqPhantPrimTyConName, voidPrimTyConName :: Name +charPrimTyConName, intPrimTyConName, int8PrimTyConName, int16PrimTyConName, int32PrimTyConName, int64PrimTyConName, wordPrimTyConName, word32PrimTyConName, word8PrimTyConName, word16PrimTyConName, word64PrimTyConName, addrPrimTyConName, floatPrimTyConName, doublePrimTyConName, statePrimTyConName, proxyPrimTyConName, realWorldTyConName, arrayPrimTyConName, arrayArrayPrimTyConName, smallArrayPrimTyConName, byteArrayPrimTyConName, mutableArrayPrimTyConName, mutableByteArrayPrimTyConName, mutableArrayArrayPrimTyConName, smallMutableArrayPrimTyConName, mutVarPrimTyConName, mVarPrimTyConName, ioPortPrimTyConName, tVarPrimTyConName, stablePtrPrimTyConName, stableNamePrimTyConName, compactPrimTyConName, bcoPrimTyConName, weakPrimTyConName, threadIdPrimTyConName, eqPrimTyConName, eqReprPrimTyConName, eqPhantPrimTyConName, voidPrimTyConName :: Name charPrimTyConName = mkPrimTc (fsLit "Char#") charPrimTyConKey charPrimTyCon intPrimTyConName = mkPrimTc (fsLit "Int#") intPrimTyConKey intPrimTyCon int8PrimTyConName = mkPrimTc (fsLit "Int8#") int8PrimTyConKey int8PrimTyCon @@ -238,6 +240,7 @@ mutableByteArrayPrimTyConName = mkPrimTc (fsLit "MutableByteArray#") mutableByte mutableArrayArrayPrimTyConName= mkPrimTc (fsLit "MutableArrayArray#") mutableArrayArrayPrimTyConKey mutableArrayArrayPrimTyCon smallMutableArrayPrimTyConName= mkPrimTc (fsLit "SmallMutableArray#") smallMutableArrayPrimTyConKey smallMutableArrayPrimTyCon mutVarPrimTyConName = mkPrimTc (fsLit "MutVar#") mutVarPrimTyConKey mutVarPrimTyCon +ioPortPrimTyConName = mkPrimTc (fsLit "IOPort#") ioPortPrimTyConKey ioPortPrimTyCon mVarPrimTyConName = mkPrimTc (fsLit "MVar#") mVarPrimTyConKey mVarPrimTyCon tVarPrimTyConName = mkPrimTc (fsLit "TVar#") tVarPrimTyConKey tVarPrimTyCon stablePtrPrimTyConName = mkPrimTc (fsLit "StablePtr#") stablePtrPrimTyConKey stablePtrPrimTyCon @@ -1006,7 +1009,22 @@ mkMutVarPrimTy s elt = TyConApp mutVarPrimTyCon [s, elt] {- ************************************************************************ * * +\subsection[TysPrim-io-port-var]{The synchronizing I/O Port type} +* * +************************************************************************ +-} + +ioPortPrimTyCon :: TyCon +ioPortPrimTyCon = pcPrimTyCon ioPortPrimTyConName [Nominal, Representational] UnliftedRep + +mkIOPortPrimTy :: Type -> Type -> Type +mkIOPortPrimTy s elt = TyConApp ioPortPrimTyCon [s, elt] + +{- +************************************************************************ +* * The synchronizing variable type +\subsection[TysPrim-synch-var]{The synchronizing variable type} * * ************************************************************************ -} diff --git a/compiler/GHC/Builtin/primops.txt.pp b/compiler/GHC/Builtin/primops.txt.pp index a12ac1f29c..261d02aa67 100644 --- a/compiler/GHC/Builtin/primops.txt.pp +++ b/compiler/GHC/Builtin/primops.txt.pp @@ -2827,6 +2827,45 @@ primop IsEmptyMVarOp "isEmptyMVar#" GenPrimOp out_of_line = True has_side_effects = True + +------------------------------------------------------------------------ +section "Synchronized I/O Ports" + {Operations on {\tt IOPort\#}s. } +------------------------------------------------------------------------ + +primtype IOPort# s a + { A shared I/O port is almost the same as a {\tt MVar\#}!). + The main difference is that IOPort has no deadlock detection or + deadlock breaking code that forcibly releases the lock. } + +primop NewIOPortrOp "newIOPort#" GenPrimOp + State# s -> (# State# s, IOPort# s a #) + {Create new {\tt IOPort\#}; initially empty.} + with + out_of_line = True + has_side_effects = True + +primop ReadIOPortOp "readIOPort#" GenPrimOp + IOPort# s a -> State# s -> (# State# s, a #) + {If {\tt IOPort\#} is empty, block until it becomes full. + Then remove and return its contents, and set it empty.} + with + out_of_line = True + has_side_effects = True + +primop WriteIOPortOp "writeIOPort#" GenPrimOp + IOPort# s a -> a -> State# s -> (# State# s, Int# #) + {If {\tt IOPort\#} is full, immediately return with integer 0. + Otherwise, store value arg as {\tt IOPort\#}'s new contents, + and return with integer 1. } + with + out_of_line = True + has_side_effects = True + +primop SameIOPortOp "sameIOPort#" GenPrimOp + IOPort# s a -> IOPort# s a -> Int# + + ------------------------------------------------------------------------ section "Delay/wait operations" ------------------------------------------------------------------------ diff --git a/compiler/GHC/StgToCmm/Prim.hs b/compiler/GHC/StgToCmm/Prim.hs index ef5e376be8..afbcc34836 100644 --- a/compiler/GHC/StgToCmm/Prim.hs +++ b/compiler/GHC/StgToCmm/Prim.hs @@ -1320,6 +1320,7 @@ emitPrimOp dflags primop = case primop of SameMutVarOp -> \args -> opTranslate args (mo_wordEq platform) SameMVarOp -> \args -> opTranslate args (mo_wordEq platform) + SameIOPortOp -> \args -> opTranslate args (mo_wordEq platform) SameMutableArrayOp -> \args -> opTranslate args (mo_wordEq platform) SameMutableByteArrayOp -> \args -> opTranslate args (mo_wordEq platform) SameMutableArrayArrayOp -> \args -> opTranslate args (mo_wordEq platform) @@ -1467,6 +1468,9 @@ emitPrimOp dflags primop = case primop of ReadMVarOp -> alwaysExternal TryReadMVarOp -> alwaysExternal IsEmptyMVarOp -> alwaysExternal + NewIOPortrOp -> alwaysExternal + ReadIOPortOp -> alwaysExternal + WriteIOPortOp -> alwaysExternal DelayOp -> alwaysExternal WaitReadOp -> alwaysExternal WaitWriteOp -> alwaysExternal diff --git a/includes/rts/Constants.h b/includes/rts/Constants.h index c2cad8fc80..f1ca25a6f3 100644 --- a/includes/rts/Constants.h +++ b/includes/rts/Constants.h @@ -256,7 +256,11 @@ by tryWakeupThread() */ #define ThreadMigrating 13 -/* WARNING WARNING top number is BlockedOnMVarRead 14, not 13!! */ +/* Lightweight non-deadlock checked version of MVar. Used for the why_blocked + field of a TSO. */ +#define BlockedOnIOCompletion 15 + +/* Next number is 16. */ /* * These constants are returned to the scheduler by a thread that has diff --git a/includes/rts/storage/TSO.h b/includes/rts/storage/TSO.h index 3a488d97b5..33eebffc7c 100644 --- a/includes/rts/storage/TSO.h +++ b/includes/rts/storage/TSO.h @@ -288,6 +288,7 @@ void dirty_STACK (Capability *cap, StgStack *stack); BlockedOnBlackHole MessageBlackHole * TSO->bq BlockedOnMVar the MVAR the MVAR's queue + BlockedOnIOCompletion the PortEVent the IOCP's queue BlockedOnSTM END_TSO_QUEUE STM wait queue(s) BlockedOnSTM STM_AWOKEN run queue diff --git a/includes/stg/MiscClosures.h b/includes/stg/MiscClosures.h index dc2b0715ca..5ffdd5cd7b 100644 --- a/includes/stg/MiscClosures.h +++ b/includes/stg/MiscClosures.h @@ -337,6 +337,10 @@ RTS_FUN_DECL(stg_block_stmwait); RTS_FUN_DECL(stg_block_throwto); RTS_RET(stg_block_throwto); +RTS_FUN_DECL(stg_readIOPortzh); +RTS_FUN_DECL(stg_writeIOPortzh); +RTS_FUN_DECL(stg_newIOPortzh); + /* Entry/exit points from StgStartup.cmm */ RTS_RET(stg_stop_thread); diff --git a/libraries/base/GHC/Conc/Sync.hs b/libraries/base/GHC/Conc/Sync.hs index d6ffbc2de9..80287c56c4 100644 --- a/libraries/base/GHC/Conc/Sync.hs +++ b/libraries/base/GHC/Conc/Sync.hs @@ -538,6 +538,8 @@ data BlockReason -- ^blocked in 'retry' in an STM transaction | BlockedOnForeignCall -- ^currently in a foreign call + | BlockedOnIOCompletion + -- ^currently blocked on an I/O Completion port | BlockedOnOther -- ^blocked on some other resource. Without @-threaded@, -- I\/O and 'Control.Concurrent.threadDelay' show up as @@ -576,6 +578,7 @@ threadStatus (ThreadId t) = IO $ \s -> mk_stat 11 = ThreadBlocked BlockedOnForeignCall mk_stat 12 = ThreadBlocked BlockedOnException mk_stat 14 = ThreadBlocked BlockedOnMVar -- possibly: BlockedOnMVarRead + mk_stat 15 = ThreadBlocked BlockedOnIOCompletion -- NB. these are hardcoded in rts/PrimOps.cmm mk_stat 16 = ThreadFinished mk_stat 17 = ThreadDied diff --git a/libraries/base/GHC/IOPort.hs b/libraries/base/GHC/IOPort.hs new file mode 100644 index 0000000000..e4890d0989 --- /dev/null +++ b/libraries/base/GHC/IOPort.hs @@ -0,0 +1,100 @@ +{-# LANGUAGE Unsafe #-} +{-# LANGUAGE NoImplicitPrelude, MagicHash, UnboxedTuples #-} +{-# OPTIONS_GHC -funbox-strict-fields #-} +{-# OPTIONS_HADDOCK hide #-} + +----------------------------------------------------------------------------- +-- | +-- Module : GHC.IOPort +-- Copyright : (c) Tamar Christina 2019 +-- License : see libraries/base/LICENSE +-- +-- Maintainer : cvs-ghc@haskell.org +-- Stability : internal +-- Portability : non-portable (GHC Extensions) +-- +-- The IOPort type. This is a synchronization primitive similar to IOVar but +-- without any of the deadlock guarantees that IOVar provides. The ports are +-- single write/multiple wait. Writing to an already full Port will not queue +-- the value but instead will discard it. +-- +-- +----------------------------------------------------------------------------- + +module GHC.IOPort ( + -- * IOPorts + IOPort(..) + , newIOPort + , newEmptyIOPort + , readIOPort + , writeIOPort + ) where + +import GHC.Base + +data IOPort a = IOPort (IOPort# RealWorld a) +{- ^ +An 'IOPort' is a synchronising variable, used +for communication between concurrent threads, where it one of the threads is +controlled by an external state. e.g. by an I/O action that is serviced by the +runtime. It can be thought of as a box, which may be empty or full. + +It is mostly similar to the behavior of MVar except writeIOPort doesn't block if +the variable is full and the GC won't forcibly release the lock if it thinks +there's a deadlock. +-} + +-- | @since 4.1.0.0 +instance Eq (IOPort a) where + (IOPort ioport1#) == (IOPort ioport2#) = + isTrue# (sameIOPort# ioport1# ioport2#) + +{- +M-Vars are rendezvous points for concurrent threads. They begin +empty, and any attempt to read an empty M-Var blocks. When an M-Var +is written, a single blocked thread may be freed. Reading an M-Var +toggles its state from full back to empty. Therefore, any value +written to an M-Var may only be read once. Multiple reads and writes +are allowed, but there must be at least one read between any two +writes. +-} + +-- |Create an 'IOPort' which is initially empty. +newEmptyIOPort :: IO (IOPort a) +newEmptyIOPort = IO $ \ s# -> + case newIOPort# s# of + (# s2#, svar# #) -> (# s2#, IOPort svar# #) + +-- |Create an 'IOPort' which contains the supplied value. +newIOPort :: a -> IO (IOPort a) +newIOPort value = + newEmptyIOPort >>= \ ioport -> + writeIOPort ioport value >> + return ioport + +-- |Atomically read the the contents of the 'IOPort'. If the 'IOPort' is +-- currently empty, 'readIOPort' will wait until it is full. After a +-- 'readIOPort', the 'IOPort' is left empty. +-- TODO: Figure out how to make this an exception for better debugging. +-- +-- There is one important property of 'readIOPort': +-- +-- * Only a single threads can be blocked on an 'IOPort', The second thread +-- attempting to block will be silently ignored. +-- +readIOPort :: IOPort a -> IO a +readIOPort (IOPort ioport#) = IO $ \ s# -> readIOPort# ioport# s# + +-- |Put a value into an 'IOPort'. If the 'IOPort' is currently full, +-- 'writeIOPort' will return False and not block. +-- +-- There is one important property of 'writeIOPort': +-- +-- * Only a single thread can be blocked on an 'IOPort'. +-- +writeIOPort :: IOPort a -> a -> IO Bool +writeIOPort (IOPort ioport#) x = IO $ \ s# -> + case writeIOPort# ioport# x s# of + (# s, 0# #) -> (# s, False #) + (# s, _ #) -> (# s, True #) + diff --git a/libraries/base/base.cabal b/libraries/base/base.cabal index aee0c20d29..1d4178a2bf 100644 --- a/libraries/base/base.cabal +++ b/libraries/base/base.cabal @@ -308,6 +308,8 @@ Library Type.Reflection Type.Reflection.Unsafe Unsafe.Coerce + -- TODO: remove + GHC.IOPort reexported-modules: GHC.Num.Integer @@ -328,6 +330,8 @@ Library GHC.IO.Handle.Lock.NoOp GHC.IO.Handle.Lock.Windows GHC.StaticPtr.Internal + GHC.Event.Internal.Types + -- GHC.IOPort -- TODO: hide again after debug System.Environment.ExecutablePath System.CPUTime.Utils diff --git a/libraries/ghc-heap/GHC/Exts/Heap/Closures.hs b/libraries/ghc-heap/GHC/Exts/Heap/Closures.hs index bb9d440b37..8a959fc2a0 100644 --- a/libraries/ghc-heap/GHC/Exts/Heap/Closures.hs +++ b/libraries/ghc-heap/GHC/Exts/Heap/Closures.hs @@ -229,7 +229,15 @@ data GenClosure b } -- | An @MVar#@, with a queue of thread state objects blocking on them - | MVarClosure + | MVarClosure + { info :: !StgInfoTable + , queueHead :: !b -- ^ Pointer to head of queue + , queueTail :: !b -- ^ Pointer to tail of queue + , value :: !b -- ^ Pointer to closure + } + + -- | An @IOPort#@, with a queue of thread state objects blocking on them + | IOPortClosure { info :: !StgInfoTable , queueHead :: !b -- ^ Pointer to head of queue , queueTail :: !b -- ^ Pointer to tail of queue @@ -340,6 +348,7 @@ allClosures (MutArrClosure {..}) = mccPayload allClosures (SmallMutArrClosure {..}) = mccPayload allClosures (MutVarClosure {..}) = [var] allClosures (MVarClosure {..}) = [queueHead,queueTail,value] +allClosures (IOPortClosure {..}) = [queueHead,queueTail,value] allClosures (FunClosure {..}) = ptrArgs allClosures (BlockingQueueClosure {..}) = [link, blackHole, owner, queue] allClosures (WeakClosure {..}) = [cfinalizers, key, value, finalizer, link] diff --git a/rts/PrimOps.cmm b/rts/PrimOps.cmm index e13e89b98c..0b1b1419a1 100644 --- a/rts/PrimOps.cmm +++ b/rts/PrimOps.cmm @@ -1998,6 +1998,179 @@ stg_tryReadMVarzh ( P_ mvar, /* :: MVar a */ ) } /* ----------------------------------------------------------------------------- + * IOPort primitives + * + * readIOPort & writeIOPort work as follows. Firstly, an important invariant: + * + * If the IOPort is full, then the request is silently dropped and the + * message is lost. If the IOPort is empty then the + * blocking queue contains only the thread blocked on IOPort. An IOPort only + * supports a single read and a single write to it. + * + * readIOPort: + * IOPort empty : then add ourselves to the blocking queue + * IOPort full : remove the value from the IOPort, and + * blocking queue empty : return + * blocking queue non-empty : perform the only blocked + * writeIOPort from the queue, and + * wake up the thread + * (IOPort is now empty) + * + * writeIOPort is just the dual of the above algorithm. + * + * How do we "perform a writeIOPort"? Well, By storing the value and prt on the + * stack, same way we do with MVars. Semantically the operations mutate the + * stack the same way so we will re-use the logic and datastructures for MVars + * for IOPort. See stg_block_putmvar and stg_block_takemvar in HeapStackCheck.c + * for the stack layout, and the PerformPut and PerformTake macros below. We + * also re-use the closure types MVAR_CLEAN/_DIRTY for IOPort. + * + * The remaining caveats of MVar thus also apply for an IOPort. The main + * crucial difference between an MVar and IOPort is that the scheduler will not + * be allowed to interrupt a blocked IOPort just because it thinks there's a + * deadlock. This is especially crucial for the non-threaded runtime. + * + * -------------------------------------------------------------------------- */ + +stg_readIOPortzh ( P_ ioport /* :: IOPort a */ ) +{ + W_ val, info, tso, q; + + LOCK_CLOSURE(ioport, info); + + /* If the MVar is empty, put ourselves on the blocked readers + * list and wait until we're woken up. + */ + if (StgMVar_value(ioport) == stg_END_TSO_QUEUE_closure) { + + if (info == stg_MVAR_CLEAN_info) { + ccall dirty_MVAR(BaseReg "ptr", ioport "ptr"); + } + + ALLOC_PRIM_WITH_CUSTOM_FAILURE + (SIZEOF_StgMVarTSOQueue, + unlockClosure(ioport, stg_MVAR_DIRTY_info); + GC_PRIM_P(stg_readIOPortzh, ioport)); + + q = Hp - SIZEOF_StgMVarTSOQueue + WDS(1); + + // readIOPorts are pushed to the front of the queue, so + // they get handled immediately + SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM); + StgMVarTSOQueue_link(q) = StgMVar_head(ioport); + StgMVarTSOQueue_tso(q) = CurrentTSO; + + StgTSO__link(CurrentTSO) = q; + StgTSO_block_info(CurrentTSO) = ioport; + StgTSO_why_blocked(CurrentTSO) = BlockedOnIOCompletion::I16; + StgMVar_head(ioport) = q; + + if (StgMVar_tail(ioport) == stg_END_TSO_QUEUE_closure) { + StgMVar_tail(ioport) = q; + } + + jump stg_block_readmvar(ioport); + } + + val = StgMVar_value(ioport); + + unlockClosure(ioport, info); + return (val); +} + +stg_writeIOPortzh ( P_ ioport, /* :: IOPort a */ + P_ val, /* :: a */ ) +{ + W_ info, tso, q; + + LOCK_CLOSURE(ioport, info); + + /* If there is already a value in the queue, then silently ignore the + second put. TODO: Correct usages of IOPort should never have a second + put, so perhaps raise an error instead, but I have no idea how to do this + safely and correctly at this point. */ + if (StgMVar_value(ioport) != stg_END_TSO_QUEUE_closure) { + unlockClosure(ioport, info); + return (0); + } + + q = StgMVar_head(ioport); +loop: + if (q == stg_END_TSO_QUEUE_closure) { + /* No further takes, the IOPort is now full. */ + if (info == stg_MVAR_CLEAN_info) { + ccall dirty_MVAR(BaseReg "ptr", ioport "ptr"); + } + StgMVar_value(ioport) = val; + unlockClosure(ioport, stg_MVAR_DIRTY_info); + return (1); + } + if (StgHeader_info(q) == stg_IND_info || + StgHeader_info(q) == stg_MSG_NULL_info) { + q = StgInd_indirectee(q); + goto loop; + } + + // There are readIOPort(s) waiting: wake up the first one + + tso = StgMVarTSOQueue_tso(q); + StgMVar_head(ioport) = StgMVarTSOQueue_link(q); + if (StgMVar_head(ioport) == stg_END_TSO_QUEUE_closure) { + StgMVar_tail(ioport) = stg_END_TSO_QUEUE_closure; + } + + ASSERT(StgTSO_block_info(tso) == ioport); + // save why_blocked here, because waking up the thread destroys + // this information + W_ why_blocked; + why_blocked = TO_W_(StgTSO_why_blocked(tso)); + + // actually perform the takeMVar + W_ stack; + stack = StgTSO_stackobj(tso); + PerformTake(stack, val); + + // indicate that the MVar operation has now completed. + StgTSO__link(tso) = stg_END_TSO_QUEUE_closure; + + if (TO_W_(StgStack_dirty(stack)) == 0) { + ccall dirty_STACK(MyCapability() "ptr", stack "ptr"); + } + + ccall tryWakeupThread(MyCapability() "ptr", tso); + + // If it was a readIOPort, then we can still do work, + // so loop back. (XXX: This could take a while) + if (why_blocked == BlockedOnIOCompletion) { + q = StgMVarTSOQueue_link(q); + goto loop; + } + + ASSERT(why_blocked == BlockedOnIOCompletion); + + unlockClosure(ioport, info); + return (1); +} +/* ----------------------------------------------------------------------------- + IOPort primitives + -------------------------------------------------------------------------- */ + +stg_newIOPortzh ( gcptr init ) +{ + W_ ioport; + + ALLOC_PRIM_ (SIZEOF_StgMVar, stg_newIOPortzh); + + ioport = Hp - SIZEOF_StgMVar + WDS(1); + SET_HDR(ioport, stg_MVAR_DIRTY_info,CCCS); + // MVARs start dirty: generation 0 has no mutable list + StgMVar_head(ioport) = stg_END_TSO_QUEUE_closure; + StgMVar_tail(ioport) = stg_END_TSO_QUEUE_closure; + StgMVar_value(ioport) = stg_END_TSO_QUEUE_closure; + return (ioport); +} + +/* ----------------------------------------------------------------------------- Stable pointer primitives ------------------------------------------------------------------------- */ diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c index e8a6a81747..719c05435d 100644 --- a/rts/RaiseAsync.c +++ b/rts/RaiseAsync.c @@ -174,11 +174,11 @@ throwToSelf (Capability *cap, StgTSO *tso, StgClosure *exception) - or it is masking exceptions (TSO_BLOCKEX) - Currently, if the target is BlockedOnMVar, BlockedOnSTM, or - BlockedOnBlackHole then we acquire ownership of the TSO by locking - its parent container (e.g. the MVar) and then raise the exception. - We might change these cases to be more message-passing-like in the - future. + Currently, if the target is BlockedOnMVar, BlockedOnSTM, + BlockedOnIOCompletion or BlockedOnBlackHole then we acquire ownership of the + TSO by locking its parent container (e.g. the MVar) and then raise the + exception. We might change these cases to be more message-passing-like in + the future. Returns: @@ -343,6 +343,7 @@ check_target: case BlockedOnMVar: case BlockedOnMVarRead: + case BlockedOnIOCompletion: { /* To establish ownership of this TSO, we need to acquire a @@ -367,7 +368,9 @@ check_target: // we have the MVar, let's check whether the thread // is still blocked on the same MVar. - if ((target->why_blocked != BlockedOnMVar && target->why_blocked != BlockedOnMVarRead) + if ((target->why_blocked != BlockedOnMVar + && target->why_blocked != BlockedOnMVarRead + && target->why_blocked != BlockedOnIOCompletion) || (StgMVar *)target->block_info.closure != mvar) { unlockClosure((StgClosure *)mvar, info); goto retry; @@ -679,6 +682,7 @@ removeFromQueues(Capability *cap, StgTSO *tso) case BlockedOnMVar: case BlockedOnMVarRead: + case BlockedOnIOCompletion: removeFromMVarBlockedQueue(tso); goto done; diff --git a/rts/RtsSymbols.c b/rts/RtsSymbols.c index ff32932ea9..0f0bd56c82 100644 --- a/rts/RtsSymbols.c +++ b/rts/RtsSymbols.c @@ -706,6 +706,9 @@ SymI_HasProto(stg_newMVarzh) \ SymI_HasProto(stg_newMutVarzh) \ SymI_HasProto(stg_newTVarzh) \ + SymI_HasProto(stg_readIOPortzh) \ + SymI_HasProto(stg_writeIOPortzh) \ + SymI_HasProto(stg_newIOPortzh) \ SymI_HasProto(stg_noDuplicatezh) \ SymI_HasProto(stg_atomicModifyMutVar2zh) \ SymI_HasProto(stg_atomicModifyMutVarzuzh) \ diff --git a/rts/Schedule.c b/rts/Schedule.c index ce1a1fc060..fab357aa06 100644 --- a/rts/Schedule.c +++ b/rts/Schedule.c @@ -198,6 +198,7 @@ schedule (Capability *initialCapability, Task *task) bool ready_to_gc; cap = initialCapability; + t = NULL; // Pre-condition: this task owns initialCapability. // The sched_mutex is *NOT* held @@ -301,8 +302,13 @@ schedule (Capability *initialCapability, Task *task) // Additionally, it is not fatal for the // threaded RTS to reach here with no threads to run. // + // Since IOPorts have no deadlock avoidance guarantees you may also reach + // this point when blocked on an IO Port. If this is the case the only + // thing that could unblock it is an I/O event. + // // win32: might be here due to awaitEvent() being abandoned - // as a result of a console event having been delivered. + // as a result of a console event having been delivered or as a result of + // waiting on an async I/O to complete with WinIO. #if defined(THREADED_RTS) scheduleYield(&cap,task); @@ -310,9 +316,23 @@ schedule (Capability *initialCapability, Task *task) if (emptyRunQueue(cap)) continue; // look for work again #endif -#if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS) +#if !defined(THREADED_RTS) if ( emptyRunQueue(cap) ) { + /* On the non-threaded RTS if the queue is empty and the last action was + blocked on an I/O completion port, then just wait till we're woken + up by the RTS with more work. */ + if (t && t->why_blocked == BlockedOnIOCompletion) + { + fprintf (stderr, "waiting: %d.\n", t->why_blocked); + awaitEvent (emptyRunQueue(cap)); + fprintf (stderr, "running: %d.\n", t->why_blocked); + continue; + } + continue; + +#if !defined(mingw32_HOST_OS) ASSERT(sched_state >= SCHED_INTERRUPTING); +#endif } #endif @@ -928,6 +948,7 @@ scheduleDetectDeadlock (Capability **pcap, Task *task) */ if (recent_activity != ACTIVITY_INACTIVE) return; #endif + if (task->incall->tso->why_blocked == BlockedOnIOCompletion) return; debugTrace(DEBUG_sched, "deadlocked, forcing major GC..."); @@ -980,6 +1001,10 @@ scheduleDetectDeadlock (Capability **pcap, Task *task) throwToSingleThreaded(cap, task->incall->tso, (StgClosure *)nonTermination_closure); return; + case BlockedOnIOCompletion: + /* We're blocked waiting for an external I/O call, let's just + chill for a bit. */ + return; default: barf("deadlock: main thread blocked in a strange way"); } @@ -2676,9 +2701,10 @@ initScheduler(void) sched_state = SCHED_RUNNING; recent_activity = ACTIVITY_YES; -#if defined(THREADED_RTS) + /* Initialise the mutex and condition variables used by * the scheduler. */ +#if defined(THREADED_RTS) initMutex(&sched_mutex); initMutex(&sync_finished_mutex); initCondition(&sync_finished_cond); @@ -3164,6 +3190,11 @@ resurrectThreads (StgTSO *threads) throwToSingleThreaded(cap, tso, (StgClosure *)blockedIndefinitelyOnSTM_closure); break; + case BlockedOnIOCompletion: + /* I/O Ports may not be reachable by the GC as they may be getting + * notified by the RTS. As such this call should be treated as if + * it is masking the exception. */ + continue; case NotBlocked: /* This might happen if the thread was blocked on a black hole * belonging to a thread that we've just woken up (raiseAsync diff --git a/rts/Schedule.h b/rts/Schedule.h index 2d8d813464..89ab6e0b4c 100644 --- a/rts/Schedule.h +++ b/rts/Schedule.h @@ -176,7 +176,7 @@ pushOnRunQueue (Capability *cap, StgTSO *tso) INLINE_HEADER StgTSO * popRunQueue (Capability *cap) { - ASSERT(cap->n_run_queue != 0); + ASSERT(cap->n_run_queue > 0); StgTSO *t = cap->run_queue_hd; ASSERT(t != END_TSO_QUEUE); cap->run_queue_hd = t->_link; diff --git a/rts/Threads.c b/rts/Threads.c index 22d58bb48b..54c703963e 100644 --- a/rts/Threads.c +++ b/rts/Threads.c @@ -288,6 +288,7 @@ tryWakeupThread (Capability *cap, StgTSO *tso) switch (tso->why_blocked) { + case BlockedOnIOCompletion: case BlockedOnMVar: case BlockedOnMVarRead: { @@ -868,12 +869,16 @@ printThreadBlockage(StgTSO *tso) debugBelch("is blocked until %ld", (long)(tso->block_info.target)); break; #endif + break; case BlockedOnMVar: debugBelch("is blocked on an MVar @ %p", tso->block_info.closure); break; case BlockedOnMVarRead: debugBelch("is blocked on atomic MVar read @ %p", tso->block_info.closure); break; + case BlockedOnIOCompletion: + debugBelch("is blocked on I/O Completion port @ %p", tso->block_info.closure); + break; case BlockedOnBlackHole: debugBelch("is blocked on a black hole %p", ((StgBlockingQueue*)tso->block_info.bh->bh)); diff --git a/rts/Trace.c b/rts/Trace.c index b35be3c1e7..7a1f0df768 100644 --- a/rts/Trace.c +++ b/rts/Trace.c @@ -164,6 +164,7 @@ static char *thread_stop_reasons[] = { [6 + BlockedOnSTM] = "blocked on STM", [6 + BlockedOnDoProc] = "blocked on asyncDoProc", [6 + BlockedOnCCall] = "blocked on a foreign call", + [6 + BlockedOnIOCompletion] = "blocked on I/O Completion port", [6 + BlockedOnCCall_Interruptible] = "blocked on a foreign call (interruptible)", [6 + BlockedOnMsgThrowTo] = "blocked on throwTo", [6 + ThreadMigrating] = "migrating" diff --git a/rts/TraverseHeap.c b/rts/TraverseHeap.c index 8bf58c11ee..636737aa0f 100644 --- a/rts/TraverseHeap.c +++ b/rts/TraverseHeap.c @@ -1250,6 +1250,7 @@ inner_loop: traversePushClosure(ts, (StgClosure *) tso->trec, c, child_data); if ( tso->why_blocked == BlockedOnMVar || tso->why_blocked == BlockedOnMVarRead + || tso->why_blocked == BlockedOnIOCompletion || tso->why_blocked == BlockedOnBlackHole || tso->why_blocked == BlockedOnMsgThrowTo ) { diff --git a/rts/sm/Compact.c b/rts/sm/Compact.c index 5031c535a1..b1250b77e0 100644 --- a/rts/sm/Compact.c +++ b/rts/sm/Compact.c @@ -461,6 +461,7 @@ thread_TSO (StgTSO *tso) || tso->why_blocked == BlockedOnMVarRead || tso->why_blocked == BlockedOnBlackHole || tso->why_blocked == BlockedOnMsgThrowTo + || tso->why_blocked == BlockedOnIOCompletion || tso->why_blocked == NotBlocked ) { thread_(&tso->block_info.closure); diff --git a/rts/sm/Sanity.c b/rts/sm/Sanity.c index ea64483418..2329b02016 100644 --- a/rts/sm/Sanity.c +++ b/rts/sm/Sanity.c @@ -618,6 +618,7 @@ checkTSO(StgTSO *tso) || tso->why_blocked == BlockedOnMVarRead || tso->why_blocked == BlockedOnBlackHole || tso->why_blocked == BlockedOnMsgThrowTo + || tso->why_blocked == BlockedOnIOCompletion || tso->why_blocked == NotBlocked ) { ASSERT(LOOKS_LIKE_CLOSURE_PTR(tso->block_info.closure)); diff --git a/rts/sm/Scav.c b/rts/sm/Scav.c index 501d958aae..dd9a96adf8 100644 --- a/rts/sm/Scav.c +++ b/rts/sm/Scav.c @@ -129,6 +129,7 @@ scavengeTSO (StgTSO *tso) || tso->why_blocked == BlockedOnMVarRead || tso->why_blocked == BlockedOnBlackHole || tso->why_blocked == BlockedOnMsgThrowTo + || tso->why_blocked == BlockedOnIOCompletion || tso->why_blocked == NotBlocked ) { evacuate(&tso->block_info.closure); diff --git a/utils/genprimopcode/Main.hs b/utils/genprimopcode/Main.hs index 31d363c0fa..3fe744fec3 100644 --- a/utils/genprimopcode/Main.hs +++ b/utils/genprimopcode/Main.hs @@ -919,6 +919,8 @@ ppType (TyApp (TyCon "StableName#") [x]) = "mkStableNamePrimTy " ++ ppType x ppType (TyApp (TyCon "MVar#") [x,y]) = "mkMVarPrimTy " ++ ppType x ++ " " ++ ppType y +ppType (TyApp (TyCon "IOPort#") [x,y]) = "mkIOPortPrimTy " ++ ppType x + ++ " " ++ ppType y ppType (TyApp (TyCon "TVar#") [x,y]) = "mkTVarPrimTy " ++ ppType x ++ " " ++ ppType y ppType (TyApp (TyCon "Void#") []) = "voidPrimTy" |