/* -*- tab-width: 8 -*- */ /* ----------------------------------------------------------------------------- * * (c) The GHC Team, 1998-2012 * * Out-of-line primitive operations * * This file contains the implementations of all the primitive * operations ("primops") which are not expanded inline. See * ghc/compiler/GHC/Builtin/primops.txt.pp for a list of all the primops; * this file contains code for most of those with the attribute * out_of_line=True. * * Entry convention: the entry convention for a primop is the * NativeNodeCall convention, and the return convention is * NativeReturn. (see compiler/GHC/Cmm/CallConv.hs) * * This file is written in a subset of C--, extended with various * features specific to GHC. It is compiled by GHC directly. For the * syntax of .cmm files, see the parser in ghc/compiler/GHC/Cmm/Parser.y. * * ---------------------------------------------------------------------------*/ #include "Cmm.h" #include "MachDeps.h" #include "SMPClosureOps.h" #if defined(__PIC__) import pthread_mutex_lock; import pthread_mutex_unlock; #endif import CLOSURE base_ControlziExceptionziBase_nestedAtomically_closure; import CLOSURE base_GHCziIOziException_heapOverflow_closure; import CLOSURE base_GHCziIOziException_blockedIndefinitelyOnMVar_closure; import CLOSURE base_GHCziIOPort_doubleReadException_closure; import AcquireSRWLockExclusive; import ReleaseSRWLockExclusive; import CLOSURE ghczmprim_GHCziTypes_False_closure; #if defined(PROFILING) import CLOSURE CCS_MAIN; #endif /*----------------------------------------------------------------------------- Array Primitives Basically just new*Array - the others are all inline macros. The slow entry point is for returning from a heap check, the saved size argument must be re-loaded from the stack. -------------------------------------------------------------------------- */ /* for objects that are *less* than the size of a word, make sure we * round up to the nearest word for the size of the array. 
*/

/* Allocate a byte array with room for n payload bytes.
 *
 * n is a byte count; the payload is rounded up to a whole number of words.
 * Returns the new array. If allocateMightFail returns NULL, control is
 * transferred to stg_raisezh with the heapOverflow closure instead.
 */
stg_newByteArrayzh ( W_ n )
{
    W_ words, payload_words;
    gcptr p;

    MAYBE_GC_N(stg_newByteArrayzh, n);

    payload_words = ROUNDUP_BYTES_TO_WDS(n);
    // total request = header words + payload words
    words = BYTES_TO_WDS(SIZEOF_StgArrBytes) + payload_words;
    ("ptr" p) = ccall allocateMightFail(MyCapability() "ptr", words);
    if (p == NULL) {
        jump stg_raisezh(base_GHCziIOziException_heapOverflow_closure);
    }
    TICK_ALLOC_PRIM(SIZEOF_StgArrBytes,WDS(payload_words),0);
    SET_HDR(p, stg_ARR_WORDS_info, CCCS);
    // the header stores the requested byte count, not the rounded-up size
    StgArrBytes_bytes(p) = n;
    return (p);
}

// Alignment that stg_newPinnedByteArrayzh passes to allocatePinned
// (presumably in bytes, like the `alignment` argument below -- confirm).
#define BA_ALIGN 16
#define BA_MASK (BA_ALIGN-1)

/* Allocate a byte array of n payload bytes through allocatePinned, asking
 * for BA_ALIGN alignment at offset SIZEOF_StgArrBytes (i.e. for the payload
 * that follows the header).
 *
 * Returns the new array, or jumps to stg_raisezh with the heapOverflow
 * closure if allocatePinned returns NULL.
 */
stg_newPinnedByteArrayzh ( W_ n )
{
    W_ words, bytes, payload_words;
    gcptr p;

    MAYBE_GC_N(stg_newPinnedByteArrayzh, n);

    bytes = n;
    /* payload_words is what we will tell the profiler we had to allocate */
    payload_words = ROUNDUP_BYTES_TO_WDS(bytes);
    /* When we actually allocate memory, we need to allow space for the
       header: */
    bytes = bytes + SIZEOF_StgArrBytes;
    /* Now we convert to a number of words: */
    words = ROUNDUP_BYTES_TO_WDS(bytes);

    ("ptr" p) = ccall allocatePinned(MyCapability() "ptr", words, BA_ALIGN, SIZEOF_StgArrBytes);
    if (p == NULL) {
        jump stg_raisezh(base_GHCziIOziException_heapOverflow_closure);
    }
    TICK_ALLOC_PRIM(SIZEOF_StgArrBytes,WDS(payload_words),0);

    /* No write barrier needed since this is a new allocation. */
    SET_HDR(p, stg_ARR_WORDS_info, CCCS);
    StgArrBytes_bytes(p) = n;
    return (p);
}

/* Same as stg_newPinnedByteArrayzh, but with a caller-supplied alignment.
 *
 * Any alignment at or below SIZEOF_W (including 0) is replaced by SIZEOF_W
 * before it is handed to allocatePinned.
 *
 * Returns the new array, or jumps to stg_raisezh with the heapOverflow
 * closure if allocatePinned returns NULL.
 */
stg_newAlignedPinnedByteArrayzh ( W_ n, W_ alignment )
{
    W_ words, bytes, payload_words;
    gcptr p;

    again: MAYBE_GC(again);

    /* we always supply at least word-aligned memory, so there's no
       need to allow extra space for alignment if the requirement is less
       than a word.  This also prevents mischief with alignment == 0. */
    if (alignment <= SIZEOF_W) { alignment = SIZEOF_W; }

    bytes = n;

    /* payload_words is what we will tell the profiler we had to allocate */
    payload_words = ROUNDUP_BYTES_TO_WDS(bytes);

    /* When we actually allocate memory, we need to allow space for the
       header: */
    bytes = bytes + SIZEOF_StgArrBytes;
    /* Now we convert to a number of words: */
    words = ROUNDUP_BYTES_TO_WDS(bytes);

    ("ptr" p) = ccall allocatePinned(MyCapability() "ptr", words, alignment, SIZEOF_StgArrBytes);
    if (p == NULL) {
        jump stg_raisezh(base_GHCziIOziException_heapOverflow_closure);
    }
    TICK_ALLOC_PRIM(SIZEOF_StgArrBytes,WDS(payload_words),0);

    /* No write barrier needed since this is a new allocation. */
    SET_HDR(p, stg_ARR_WORDS_info, CCCS);
    StgArrBytes_bytes(p) = n;
    return (p);
}

/* Report whether the block holding `ba` has any of the BF_PINNED, BF_LARGE
 * or BF_COMPACT flags set in its block descriptor.
 */
stg_isByteArrayPinnedzh ( gcptr ba )
// ByteArray# s -> Int#
{
    W_ bd, flags;
    bd = Bdescr(ba);
    // Pinned byte arrays live in blocks with the BF_PINNED flag set.
    // We also consider BF_LARGE objects to be immovable. See #13894.
    // See the comment in Storage.c:allocatePinned.
    // We also consider BF_COMPACT objects to be immovable. See #14900.
    flags = TO_W_(bdescr_flags(bd));
    // NOTE(review): this reads as (flags & mask) != 0 only if C-- binds '&'
    // tighter than '!=' (the opposite of C) -- confirm against the Cmm parser.
    return (flags & (BF_PINNED | BF_LARGE | BF_COMPACT) != 0);
}

/* Mutable variant: tail-calls stg_isByteArrayPinnedzh with the same array. */
stg_isMutableByteArrayPinnedzh ( gcptr mba )
// MutableByteArray# s -> Int#
{
    jump stg_isByteArrayPinnedzh(mba);
}

/* Note [LDV profiling and resizing arrays]
 * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 *
 * As far as the LDV profiler is concerned arrays are "inherently used" which
 * means we don't track their time of use and eventual destruction. We just
 * assume they get used.
 *
 * Thus it is not necessary to call LDV_RECORD_CREATE when resizing them as we
 * used to as the LDV profiler will essentially ignore arrays anyways.
*/

// shrink size of MutableByteArray in-place
//
// new_size is in bytes and must not exceed the current size (ASSERTed).
stg_shrinkMutableByteArrayzh ( gcptr mba, W_ new_size )
// MutableByteArray# s -> Int# -> State# s -> State# s
{
   ASSERT(new_size <= StgArrBytes_bytes(mba));

   OVERWRITING_CLOSURE_MUTABLE(mba, (BYTES_TO_WDS(SIZEOF_StgArrBytes) +
                                 ROUNDUP_BYTES_TO_WDS(new_size)));
   StgArrBytes_bytes(mba) = new_size;
   // No need to call LDV_RECORD_CREATE. See Note [LDV profiling and resizing arrays]

   return ();
}

// resize MutableByteArray
//
// The returned MutableByteArray is either the original
// MutableByteArray resized in-place or, if not possible, a newly
// allocated (unpinned) MutableByteArray (with the original content
// copied over)
stg_resizzeMutableByteArrayzh ( gcptr mba, W_ new_size )
// MutableByteArray# s -> Int# -> State# s -> (# State# s,MutableByteArray# s #)
{
   W_ new_size_wds;

   ASSERT(new_size >= 0);

   new_size_wds = ROUNDUP_BYTES_TO_WDS(new_size);

   // In-place is possible whenever the new size fits in the words the
   // array already occupies.
   if (new_size_wds <= BYTE_ARR_WDS(mba)) {
      OVERWRITING_CLOSURE_MUTABLE(mba, (BYTES_TO_WDS(SIZEOF_StgArrBytes) +
                                    new_size_wds));
      StgArrBytes_bytes(mba) = new_size;
      // No need to call LDV_RECORD_CREATE. See Note [LDV profiling and resizing arrays]

      return (mba);
   } else {
      (P_ new_mba) = call stg_newByteArrayzh(new_size);

      // maybe at some point in the future we may be able to grow the
      // MBA in-place w/o copying if we know the space after the
      // current MBA is still available, as often we want to grow the
      // MBA shortly after we allocated the original MBA. So maybe no
      // further allocations have occurred by then.

      // copy over old content
      prim %memcpy(BYTE_ARR_CTS(new_mba), BYTE_ARR_CTS(mba),
                   StgArrBytes_bytes(mba), SIZEOF_W);

      return (new_mba);
   }
}

// shrink size of SmallMutableArray in-place
//
// new_size is in elements and must not exceed the current element count
// (ASSERTed).
stg_shrinkSmallMutableArrayzh ( gcptr mba, W_ new_size )
// SmallMutableArray# s -> Int# -> State# s -> State# s
{
   ASSERT(new_size <= StgSmallMutArrPtrs_ptrs(mba));

   IF_NONMOVING_WRITE_BARRIER_ENABLED {
     // Ensure that the elements we are about to shrink out of existence
     // remain visible to the non-moving collector.
     W_ p, end;
     p = mba + SIZEOF_StgSmallMutArrPtrs + WDS(new_size);
     end = mba + SIZEOF_StgSmallMutArrPtrs + WDS(StgSmallMutArrPtrs_ptrs(mba));
again:
     // Test before pushing: only the dropped elements, i.e. indices
     // [new_size, old size), may be read. The word at `end` lies past the
     // last element and is not part of this array.
     if (p < end) {
       ccall updateRemembSetPushClosure_(BaseReg "ptr",
                                         W_[p] "ptr");
       p = p + SIZEOF_W;
       goto again;
     }
   }

   OVERWRITING_CLOSURE_MUTABLE(mba, (BYTES_TO_WDS(SIZEOF_StgSmallMutArrPtrs) +
                                 new_size));
   StgSmallMutArrPtrs_ptrs(mba) = new_size;
   // No need to call LDV_RECORD_CREATE. See Note [LDV profiling and resizing arrays]

   return ();
}

// RRN: This one does not use the "ticketing" approach because it
// deals in unboxed scalars, not heap pointers.
//
// Word-sized compare-and-swap on element `ind` (a word index) of a byte
// array; returns whatever %cmpxchgW reports as the previous contents.
stg_casIntArrayzh( gcptr arr, W_ ind, W_ old, W_ new )
/* MutableByteArray# s -> Int# -> Int# -> Int# -> State# s -> (# State# s, Int# #) */
{
    W_ p, h;

    p = arr + SIZEOF_StgArrBytes + WDS(ind);
    (h) = prim %cmpxchgW(p, old, new);

    return(h);
}

/* Allocate a MUT_ARR_PTRS of n elements, every element set to `init`.
 *
 * Returns the new array, or jumps to stg_raisezh with the heapOverflow
 * closure if allocateMightFail returns NULL.
 */
stg_newArrayzh ( W_ n /* words */, gcptr init )
{
    W_ words, size, p;
    gcptr arr;

    again: MAYBE_GC(again);

    // the mark area contains one byte for each 2^MUT_ARR_PTRS_CARD_BITS words
    // in the array, making sure we round up, and then rounding up to a whole
    // number of words.
    size = n + mutArrPtrsCardWords(n);
    words = BYTES_TO_WDS(SIZEOF_StgMutArrPtrs) + size;
    ("ptr" arr) = ccall allocateMightFail(MyCapability() "ptr",words);
    if (arr == NULL) {
        jump stg_raisezh(base_GHCziIOziException_heapOverflow_closure);
    }
    TICK_ALLOC_PRIM(SIZEOF_StgMutArrPtrs, WDS(size), 0);

    /* No write barrier needed since this is a new allocation. */
    SET_HDR(arr, stg_MUT_ARR_PTRS_DIRTY_info, CCCS);
    StgMutArrPtrs_ptrs(arr) = n;
    StgMutArrPtrs_size(arr) = size;

    // Initialise all elements of the array with the value in R2
    p = arr + SIZEOF_StgMutArrPtrs;
  for:
    if (p < arr + SIZEOF_StgMutArrPtrs + WDS(n)) (likely: True) {
        W_[p] = init;
        p = p + WDS(1);
        goto for;
    }

    return (arr);
}

/* Turn a frozen array back into a mutable one in place and return it. */
stg_unsafeThawArrayzh ( gcptr arr )
{
    // A MUT_ARR_PTRS always lives on a mut_list, but a MUT_ARR_PTRS_FROZEN
    // doesn't. To decide whether to add the thawed array to a mut_list we check
    // the info table. MUT_ARR_PTRS_FROZEN_DIRTY means it's already on a
    // mut_list so no need to add it again. MUT_ARR_PTRS_FROZEN_CLEAN means it's
    // not and we should add it to a mut_list.
    if (StgHeader_info(arr) != stg_MUT_ARR_PTRS_FROZEN_DIRTY_info) {
        SET_INFO(arr,stg_MUT_ARR_PTRS_DIRTY_info);
        // must be done after SET_INFO, because it ASSERTs closure_MUTABLE():
        recordMutable(arr);
        return (arr);
    } else {
        SET_INFO(arr,stg_MUT_ARR_PTRS_DIRTY_info);
        return (arr);
    }
}

// The four copy primops below only forward their arguments to the
// copyArray / copyMutableArray macros.

stg_copyArrayzh ( gcptr src, W_ src_off, gcptr dst, W_ dst_off, W_ n )
{
    copyArray(src, src_off, dst, dst_off, n)
}

stg_copyMutableArrayzh ( gcptr src, W_ src_off, gcptr dst, W_ dst_off, W_ n )
{
    copyMutableArray(src, src_off, dst, dst_off, n)
}

stg_copyArrayArrayzh ( gcptr src, W_ src_off, gcptr dst, W_ dst_off, W_ n )
{
    copyArray(src, src_off, dst, dst_off, n)
}

stg_copyMutableArrayArrayzh ( gcptr src, W_ src_off, gcptr dst, W_ dst_off, W_ n )
{
    copyMutableArray(src, src_off, dst, dst_off, n)
}

// The clone primops differ only in the info pointer handed to cloneArray:
// FROZEN_CLEAN for the immutable result, DIRTY for the mutable one.

stg_cloneArrayzh ( gcptr src, W_ offset, W_ n )
{
    cloneArray(stg_MUT_ARR_PTRS_FROZEN_CLEAN_info, src, offset, n)
}

stg_cloneMutableArrayzh ( gcptr src, W_ offset, W_ n )
{
    cloneArray(stg_MUT_ARR_PTRS_DIRTY_info, src, offset, n)
}

// We have to escape the "z" in the name.
stg_freezzeArrayzh ( gcptr src, W_ offset, W_ n )
{
    cloneArray(stg_MUT_ARR_PTRS_FROZEN_CLEAN_info, src, offset, n)
}

stg_thawArrayzh ( gcptr src, W_ offset, W_ n )
{
    cloneArray(stg_MUT_ARR_PTRS_DIRTY_info, src, offset, n)
}

// RRN: Uses the ticketed approach; see casMutVar
//
// Compare-and-swap on element `ind` of a MutableArray#.
// Returns (1, current value) on failure and (0, new) on success.
stg_casArrayzh ( gcptr arr, W_ ind, gcptr old, gcptr new )
/* MutableArray# s a -> Int# -> a -> a -> State# s -> (# State# s, Int#, Any a #) */
{
    gcptr h;
    W_ p, len;

    p = arr + SIZEOF_StgMutArrPtrs + WDS(ind);
    (h) = prim %cmpxchgW(p, old, new);

    if (h != old) {
        // Failure, return what was there instead of 'old':
        return (1,h);
    } else {
        // Compare and Swap Succeeded:
        SET_HDR(arr, stg_MUT_ARR_PTRS_DIRTY_info, CCCS);
        len = StgMutArrPtrs_ptrs(arr);
        // The write barrier.  We must write a byte into the mark table:
        I8[arr + SIZEOF_StgMutArrPtrs + WDS(len) + (ind >> MUT_ARR_PTRS_CARD_BITS )] = 1;

        // Concurrent GC write barrier
        updateRemembSetPushPtr(old);

        return (0,new);
    }
}

/* Allocate a MUT_ARR_PTRS of n elements in which every element initially
 * points at the new array itself.
 *
 * Returns the new array, or jumps to stg_raisezh with the heapOverflow
 * closure if allocateMightFail returns NULL.
 */
stg_newArrayArrayzh ( W_ n /* words */ )
{
    W_ words, size, p;
    gcptr arr;

    MAYBE_GC_N(stg_newArrayArrayzh, n);

    // the mark area contains one byte for each 2^MUT_ARR_PTRS_CARD_BITS words
    // in the array, making sure we round up, and then rounding up to a whole
    // number of words.
    size = n + mutArrPtrsCardWords(n);
    words = BYTES_TO_WDS(SIZEOF_StgMutArrPtrs) + size;
    ("ptr" arr) = ccall allocateMightFail(MyCapability() "ptr",words);
    if (arr == NULL) {
        jump stg_raisezh(base_GHCziIOziException_heapOverflow_closure);
    }
    TICK_ALLOC_PRIM(SIZEOF_StgMutArrPtrs, WDS(size), 0);

    // Pass CCCS itself, as every other SET_HDR in this file does;
    // W_[CCCS] would store the word CCCS points at instead.
    SET_HDR(arr, stg_MUT_ARR_PTRS_DIRTY_info, CCCS);
    StgMutArrPtrs_ptrs(arr) = n;
    StgMutArrPtrs_size(arr) = size;

    // Initialize card table to all-clean.
    setCardsValue(arr, 0, n, 0);

    // Initialise all elements of the array with a pointer to the new array
    p = arr + SIZEOF_StgMutArrPtrs;
  for:
    if (p < arr + SIZEOF_StgMutArrPtrs + WDS(n)) (likely: True) {
        W_[p] = arr;
        p = p + WDS(1);
        goto for;
    }

    return (arr);
}


/* -----------------------------------------------------------------------------
   SmallArray primitives
   -------------------------------------------------------------------------- */

/* Allocate a SMALL_MUT_ARR_PTRS of n elements, every element set to `init`.
 *
 * Returns the new array, or jumps to stg_raisezh with the heapOverflow
 * closure if allocateMightFail returns NULL.
 */
stg_newSmallArrayzh ( W_ n /* words */, gcptr init )
{
    W_ words, size, p;
    gcptr arr;

    again: MAYBE_GC(again);

    words = BYTES_TO_WDS(SIZEOF_StgSmallMutArrPtrs) + n;
    ("ptr" arr) = ccall allocateMightFail(MyCapability() "ptr",words);
    if (arr == NULL) (likely: False) {
        jump stg_raisezh(base_GHCziIOziException_heapOverflow_closure);
    }
    TICK_ALLOC_PRIM(SIZEOF_StgSmallMutArrPtrs, WDS(n), 0);

    /* No write barrier needed since this is a new allocation. */
    SET_HDR(arr, stg_SMALL_MUT_ARR_PTRS_DIRTY_info, CCCS);
    StgSmallMutArrPtrs_ptrs(arr) = n;

    // Initialise all elements of the array with the value in R2
    p = arr + SIZEOF_StgSmallMutArrPtrs;
    // Avoid the shift for `WDS(n)` in the inner loop
    W_ limit;
    limit = arr + SIZEOF_StgSmallMutArrPtrs + WDS(n);
  for:
    if (p < limit) (likely: True) {
        W_[p] = init;
        p = p + WDS(1);
        goto for;
    }

    return (arr);
}

/* Turn a frozen small array back into a mutable one in place and return it. */
stg_unsafeThawSmallArrayzh ( gcptr arr )
{
    // See stg_unsafeThawArrayzh
    if (StgHeader_info(arr) != stg_SMALL_MUT_ARR_PTRS_FROZEN_DIRTY_info) {
        SET_INFO(arr, stg_SMALL_MUT_ARR_PTRS_DIRTY_info);
        recordMutable(arr);
        // must be done after SET_INFO, because it ASSERTs closure_MUTABLE()
        return (arr);
    } else {
        SET_INFO(arr, stg_SMALL_MUT_ARR_PTRS_DIRTY_info);
        return (arr);
    }
}

stg_cloneSmallArrayzh ( gcptr src, W_ offset, W_ n )
{
    cloneSmallArray(stg_SMALL_MUT_ARR_PTRS_FROZEN_CLEAN_info, src, offset, n)
}

stg_cloneSmallMutableArrayzh ( gcptr src, W_ offset, W_ n )
{
    cloneSmallArray(stg_SMALL_MUT_ARR_PTRS_DIRTY_info, src, offset, n)
}

// We have to escape the "z" in the name.
stg_freezzeSmallArrayzh ( gcptr src, W_ offset, W_ n )
{
    cloneSmallArray(stg_SMALL_MUT_ARR_PTRS_FROZEN_CLEAN_info, src, offset, n)
}

stg_thawSmallArrayzh ( gcptr src, W_ offset, W_ n )
{
    cloneSmallArray(stg_SMALL_MUT_ARR_PTRS_DIRTY_info, src, offset, n)
}

// Concurrent GC write barrier for pointer array copies
//
// hdr_size in bytes. dst_off in words, n in words.
//
// Pushes the current contents of dst[dst_off .. dst_off+n) (the values
// about to be overwritten) when the non-moving write barrier is enabled.
stg_copyArray_barrier ( W_ hdr_size, gcptr dst, W_ dst_off, W_ n)
{
    W_ end, p;
    ASSERT(n > 0);  // Assumes n==0 is handled by caller
    p = dst + hdr_size + WDS(dst_off);
    end = p + WDS(n);

again:
    // The body runs before the bound check, hence the n > 0 requirement.
    IF_NONMOVING_WRITE_BARRIER_ENABLED {
        ccall updateRemembSetPushClosure_(BaseReg "ptr", W_[p] "ptr");
    }
    p = p + WDS(1);
    if (p < end) {
        goto again;
    }

    return ();
}

/* Copy n elements from src[src_off..] to dst[dst_off..] with memcpy.
 * Does nothing when n == 0; otherwise marks dst dirty first.
 */
stg_copySmallArrayzh ( gcptr src, W_ src_off, gcptr dst, W_ dst_off, W_ n)
{
    W_ dst_p, src_p, bytes;

    if (n > 0) {
        IF_NONMOVING_WRITE_BARRIER_ENABLED {
            call stg_copyArray_barrier(SIZEOF_StgSmallMutArrPtrs,
                                      dst, dst_off, n);
        }

        SET_INFO(dst, stg_SMALL_MUT_ARR_PTRS_DIRTY_info);

        dst_p = dst + SIZEOF_StgSmallMutArrPtrs + WDS(dst_off);
        src_p = src + SIZEOF_StgSmallMutArrPtrs + WDS(src_off);
        bytes = WDS(n);
        prim %memcpy(dst_p, src_p, bytes, SIZEOF_W);
    }

    return ();
}

/* As stg_copySmallArrayzh, but src and dst may be the same array, in which
 * case memmove is used so that overlapping ranges copy correctly.
 */
stg_copySmallMutableArrayzh ( gcptr src, W_ src_off, gcptr dst, W_ dst_off, W_ n)
{
    W_ dst_p, src_p, bytes;

    if (n > 0) {
        IF_NONMOVING_WRITE_BARRIER_ENABLED {
            call stg_copyArray_barrier(SIZEOF_StgSmallMutArrPtrs,
                                      dst, dst_off, n);
        }

        SET_INFO(dst, stg_SMALL_MUT_ARR_PTRS_DIRTY_info);

        dst_p = dst + SIZEOF_StgSmallMutArrPtrs + WDS(dst_off);
        src_p = src + SIZEOF_StgSmallMutArrPtrs + WDS(src_off);
        bytes = WDS(n);
        if (src == dst) {
            prim %memmove(dst_p, src_p, bytes, SIZEOF_W);
        } else {
            prim %memcpy(dst_p, src_p, bytes, SIZEOF_W);
        }
    }

    return ();
}

// RRN: Uses the ticketed approach; see casMutVar
//
// Compare-and-swap on element `ind` of a SmallMutableArray#.
// Returns (1, current value) on failure and (0, new) on success.
stg_casSmallArrayzh ( gcptr arr, W_ ind, gcptr old, gcptr new )
/* SmallMutableArray# s a -> Int# -> a -> a -> State# s -> (# State# s, Int#, Any a #) */
{
    gcptr h;
    W_ p, len;

    p = arr + SIZEOF_StgSmallMutArrPtrs + WDS(ind);
    (h) = prim %cmpxchgW(p, old, new);

    if (h != old) {
        // Failure, return what was there instead of 'old':
        return (1,h);
    } else {
        // Compare and Swap Succeeded:
        SET_HDR(arr, stg_SMALL_MUT_ARR_PTRS_DIRTY_info, CCCS);

        // Concurrent GC write barrier
        updateRemembSetPushPtr(old);

        return (0,new);
    }
}


/* -----------------------------------------------------------------------------
   MutVar primitives
   -------------------------------------------------------------------------- */

/* Allocate a MUT_VAR (created dirty) holding `init` and return it. */
stg_newMutVarzh ( gcptr init )
{
    W_ mv;

    ALLOC_PRIM_P (SIZEOF_StgMutVar, stg_newMutVarzh, init);

    mv = Hp - SIZEOF_StgMutVar + WDS(1);
    /* No write barrier needed since this is a new allocation. */
    SET_HDR(mv,stg_MUT_VAR_DIRTY_info,CCCS);
    StgMutVar_var(mv) = init;

    return (mv);
}

// RRN: To support the "ticketed" approach, we return the NEW rather
// than old value if the CAS is successful.  This is received in an
// opaque form in the Haskell code, preventing the compiler from
// changing its pointer identity.  The ticket can then be safely used
// in future CAS operations.
stg_casMutVarzh ( gcptr mv, gcptr old, gcptr new )
 /* MutVar# s a -> a -> a -> State# s -> (# State#, Int#, Any a #) */
{
#if defined(THREADED_RTS)
    gcptr h;

    (h) = prim %cmpxchgW(mv + SIZEOF_StgHeader + OFFSET_StgMutVar_var, old, new);
    if (h != old) {
        return (1,h);
    } else {
        // dirty_MUT_VAR is given the value that was just replaced
        if (GET_INFO(mv) == stg_MUT_VAR_CLEAN_info) {
            ccall dirty_MUT_VAR(BaseReg "ptr", mv "ptr", old);
        }
        return (0,new);
    }
#else
    gcptr prev_val;

    prev_val = StgMutVar_var(mv);
    if (prev_val != old) {
        return (1,prev_val);
    } else {
        StgMutVar_var(mv) = new;
        if (GET_INFO(mv) == stg_MUT_VAR_CLEAN_info) {
            ccall dirty_MUT_VAR(BaseReg "ptr", mv "ptr", old);
        }
        return (0,new);
    }
#endif
}

/* Atomically replace the contents x of `mv` with a thunk for sel_0 (f x).
 * Returns (x, z) where z is the shared thunk for (f x).
 */
stg_atomicModifyMutVar2zh ( gcptr mv, gcptr f )
{
    W_ z, x, y, h;

    /* If x is the current contents of the MutVar#, then
       We want to make the new contents point to

         (sel_0 (f x))

       and the return value is

         (# x, (f x) #)

        obviously we can share (f x).

         z = [stg_ap_2 f x]  (max (HS + 2) MIN_UPD_SIZE)
         y = [stg_sel_0 z]   (max (HS + 1) MIN_UPD_SIZE)
    */

#if defined(MIN_UPD_SIZE) && MIN_UPD_SIZE > 1
#define THUNK_1_SIZE (SIZEOF_StgThunkHeader + WDS(MIN_UPD_SIZE))
#define TICK_ALLOC_THUNK_1() TICK_ALLOC_UP_THK(WDS(1),WDS(MIN_UPD_SIZE-1))
#else
#define THUNK_1_SIZE (SIZEOF_StgThunkHeader + WDS(1))
#define TICK_ALLOC_THUNK_1() TICK_ALLOC_UP_THK(WDS(1),0)
#endif

#if defined(MIN_UPD_SIZE) && MIN_UPD_SIZE > 2
#define THUNK_2_SIZE (SIZEOF_StgThunkHeader + WDS(MIN_UPD_SIZE))
#define TICK_ALLOC_THUNK_2() TICK_ALLOC_UP_THK(WDS(2),WDS(MIN_UPD_SIZE-2))
#else
#define THUNK_2_SIZE (SIZEOF_StgThunkHeader + WDS(2))
#define TICK_ALLOC_THUNK_2() TICK_ALLOC_UP_THK(WDS(2),0)
#endif

#define SIZE (THUNK_2_SIZE + THUNK_1_SIZE)

    HP_CHK_GEN_TICKY(SIZE);

    // z sits at the top of the freshly claimed heap, y directly below it
    TICK_ALLOC_THUNK_2();
    CCCS_ALLOC(THUNK_2_SIZE);
    z = Hp - THUNK_2_SIZE + WDS(1);
    SET_HDR(z, stg_ap_2_upd_info, CCCS);
    LDV_RECORD_CREATE(z);
    StgThunk_payload(z,0) = f;

    TICK_ALLOC_THUNK_1();
    CCCS_ALLOC(THUNK_1_SIZE);
    y = z - THUNK_1_SIZE;
    SET_HDR(y, stg_sel_0_upd_info, CCCS);
    LDV_RECORD_CREATE(y);
    StgThunk_payload(y,0) = z;

  retry:
    // z's second payload word is only known once the MutVar has been read,
    // so it is rewritten on every attempt
    x = StgMutVar_var(mv);
    StgThunk_payload(z,1) = x;
#if defined(THREADED_RTS)
    (h) = prim %cmpxchgW(mv + SIZEOF_StgHeader + OFFSET_StgMutVar_var, x, y);
    if (h != x) { goto retry; }
#else
    h = StgMutVar_var(mv);
    StgMutVar_var(mv) = y;
#endif

    if (GET_INFO(mv) == stg_MUT_VAR_CLEAN_info) {
        ccall dirty_MUT_VAR(BaseReg "ptr", mv "ptr", h);
    }

    return (x,z);
}

/* Atomically replace the contents x of `mv` with a thunk for (f x).
 * Returns (x, z) where z is that thunk.
 */
stg_atomicModifyMutVarzuzh ( gcptr mv, gcptr f )
{
    W_ z, x, h;

    /* If x is the current contents of the MutVar#, then
       We want to make the new contents point to

         (f x)

       and the return value is

         (# x, (f x) #)

        obviously we can share (f x).

         z = [stg_ap_2 f x]  (max (HS + 2) MIN_UPD_SIZE)
    */

#if defined(MIN_UPD_SIZE) && MIN_UPD_SIZE > 2
#define THUNK_SIZE (SIZEOF_StgThunkHeader + WDS(MIN_UPD_SIZE))
#define TICK_ALLOC_THUNK() TICK_ALLOC_UP_THK(WDS(2),WDS(MIN_UPD_SIZE-2))
#else
#define THUNK_SIZE (SIZEOF_StgThunkHeader + WDS(2))
#define TICK_ALLOC_THUNK() TICK_ALLOC_UP_THK(WDS(2),0)
#endif

    HP_CHK_GEN_TICKY(THUNK_SIZE);

    TICK_ALLOC_THUNK();
    CCCS_ALLOC(THUNK_SIZE);
    z = Hp - THUNK_SIZE + WDS(1);
    SET_HDR(z, stg_ap_2_upd_info, CCCS);
    LDV_RECORD_CREATE(z);
    StgThunk_payload(z,0) = f;

  retry:
    x = StgMutVar_var(mv);
    StgThunk_payload(z,1) = x;
#if defined(THREADED_RTS)
    (h) = prim %cmpxchgW(mv + SIZEOF_StgHeader + OFFSET_StgMutVar_var, x, z);
    if (h != x) { goto retry; }
#else
    StgMutVar_var(mv) = z;
#endif

    // Hand dirty_MUT_VAR the value that was just replaced, as the other
    // MutVar primops in this file do. x is that value on both paths: the
    // CAS only succeeds when the old contents equal x, and the
    // non-threaded path overwrites exactly what it read into x.
    if (GET_INFO(mv) == stg_MUT_VAR_CLEAN_info) {
        ccall dirty_MUT_VAR(BaseReg "ptr", mv "ptr", x "ptr");
    }

    return (x,z);
}

/* -----------------------------------------------------------------------------
   Weak Pointer Primitives
   -------------------------------------------------------------------------- */

/* Allocate a weak pointer for (key, value, finalizer) with no C finalizers
 * and push it on the front of the current capability's weak pointer list.
 * Returns the new weak pointer.
 */
stg_mkWeakzh ( gcptr key,
               gcptr value,
               gcptr finalizer /* or stg_NO_FINALIZER_closure */ )
{
    gcptr w;

    ALLOC_PRIM (SIZEOF_StgWeak)

    w = Hp - SIZEOF_StgWeak + WDS(1);
    // No memory barrier needed as this is a new allocation.
    SET_HDR(w, stg_WEAK_info, CCCS);

    StgWeak_key(w)         = key;
    StgWeak_value(w)       = value;
    StgWeak_finalizer(w)   = finalizer;
    StgWeak_cfinalizers(w) = stg_NO_FINALIZER_closure;

    StgWeak_link(w) = Capability_weak_ptr_list_hd(MyCapability());
    Capability_weak_ptr_list_hd(MyCapability()) = w;
    // first entry on an empty list is also the tail
    if (Capability_weak_ptr_list_tl(MyCapability()) == NULL) {
        Capability_weak_ptr_list_tl(MyCapability()) = w;
    }

    IF_DEBUG(weak, ccall debugBelch("New weak pointer at %p\n",w));

    return (w);
}

/* stg_mkWeakzh with stg_NO_FINALIZER_closure as the finalizer. */
stg_mkWeakNoFinalizzerzh ( gcptr key, gcptr value )
{
    jump stg_mkWeakzh (key, value, stg_NO_FINALIZER_closure);
}

/* Prepend a C finalizer record to the weak pointer `w`.
 * Returns 1 on success, 0 if `w` is already dead.
 */
stg_addCFinalizzerToWeakzh ( W_ fptr,   // finalizer
                             W_ ptr,
                             W_ flag,   // has environment (0 or 1)
                             W_ eptr,
                             gcptr w )
{
    W_ c, info;

    ALLOC_PRIM (SIZEOF_StgCFinalizerList)

    c = Hp - SIZEOF_StgCFinalizerList + WDS(1);
    SET_HDR(c, stg_C_FINALIZER_LIST_info, CCCS);

    StgCFinalizerList_fptr(c) = fptr;
    StgCFinalizerList_ptr(c) = ptr;
    StgCFinalizerList_eptr(c) = eptr;
    StgCFinalizerList_flag(c) = flag;

    LOCK_CLOSURE(w, info);

    if (info == stg_DEAD_WEAK_info) {
        // Already dead.
        unlockClosure(w, info);
        return (0);
    }

    // Write barrier for concurrent non-moving collector
    updateRemembSetPushPtr(StgWeak_cfinalizers(w))

    StgCFinalizerList_link(c) = StgWeak_cfinalizers(w);
    StgWeak_cfinalizers(w) = c;

    unlockClosure(w, info);

    recordMutable(w);

    IF_DEBUG(weak, ccall debugBelch("Adding a finalizer to %p\n",w));

    return (1);
}

/* Kill the weak pointer `w` and run its C finalizers.
 * Returns (1, finalizer) if it had a Haskell finalizer, otherwise
 * (0, stg_NO_FINALIZER_closure); the latter is also returned when `w` was
 * already dead.
 */
stg_finalizzeWeakzh ( gcptr w )
{
    gcptr f, list;
    W_ info;

    LOCK_CLOSURE(w, info);

    // already dead?
    if (info == stg_DEAD_WEAK_info) {
        unlockClosure(w, info);
        return (0,stg_NO_FINALIZER_closure);
    }

    f    = StgWeak_finalizer(w);
    list = StgWeak_cfinalizers(w);

    // kill it
#if defined(PROFILING)
    // @LDV profiling
    // A weak pointer is inherently used, so we do not need to call
    // LDV_recordDead_FILL_SLOP_DYNAMIC():
    //    LDV_recordDead_FILL_SLOP_DYNAMIC((StgClosure *)w);
    // or, LDV_recordDead():
    //    LDV_recordDead((StgClosure *)w, sizeofW(StgWeak) - sizeofW(StgProfHeader));
    // Furthermore, when PROFILING is turned on, dead weak pointers are exactly as
    // large as weak pointers, so there is no need to fill the slop, either.
    // See stg_DEAD_WEAK_info in StgMiscClosures.cmm.
#endif

    //
    // Todo: maybe use SET_HDR() and remove LDV_recordCreate()?
    //
    // unlocking with the DEAD_WEAK info pointer is what marks it dead
    unlockClosure(w, stg_DEAD_WEAK_info);

    LDV_RECORD_CREATE(w);

    if (list != stg_NO_FINALIZER_closure) {
      ccall runCFinalizers(list);
    }

    /* return the finalizer */
    if (f == stg_NO_FINALIZER_closure) {
        return (0,stg_NO_FINALIZER_closure);
    } else {
        return (1,f);
    }
}

/* Dereference the weak pointer `w`.
 * Returns (1, value) while it is alive and (0, w) otherwise.
 */
stg_deRefWeakzh ( gcptr w )
{
    W_ code, info;
    gcptr val;

    info = GET_INFO(w);
    prim_read_barrier;
    if (info == stg_WHITEHOLE_info) {
        // w is locked by another thread. Now it's not immediately clear if w is
        // alive or not. We use lockClosure to wait for the info pointer to become
        // something other than stg_WHITEHOLE_info.
        LOCK_CLOSURE(w, info);
        unlockClosure(w, info);
    }

    if (info == stg_WEAK_info) {
        code = 1;
        val = StgWeak_value(w);
        // See Note [Concurrent read barrier on deRefWeak#] in NonMovingMark.c
        updateRemembSetPushPtr(val);
    } else {
        code = 0;
        val = w;
    }
    return (code,val);
}

/* -----------------------------------------------------------------------------
   Floating point operations.
-------------------------------------------------------------------------- */ stg_decodeFloatzuIntzh ( F_ arg ) { W_ p; W_ tmp, mp_tmp1, mp_tmp_w, r1, r2; STK_CHK_GEN_N (WDS(2)); reserve 2 = tmp { mp_tmp1 = tmp + WDS(1); mp_tmp_w = tmp; /* Perform the operation */ ccall __decodeFloat_Int(mp_tmp1 "ptr", mp_tmp_w "ptr", arg); r1 = W_[mp_tmp1]; r2 = W_[mp_tmp_w]; } /* returns: (Int# (mantissa), Int# (exponent)) */ return (r1, r2); } stg_decodeDoublezu2Intzh ( D_ arg ) { W_ p, tmp; W_ mp_tmp1, mp_tmp2, mp_result1, mp_result2; W_ r1, r2, r3, r4; STK_CHK_GEN_N (WDS(4)); reserve 4 = tmp { mp_tmp1 = tmp + WDS(3); mp_tmp2 = tmp + WDS(2); mp_result1 = tmp + WDS(1); mp_result2 = tmp; /* Perform the operation */ ccall __decodeDouble_2Int(mp_tmp1 "ptr", mp_tmp2 "ptr", mp_result1 "ptr", mp_result2 "ptr", arg); r1 = W_[mp_tmp1]; r2 = W_[mp_tmp2]; r3 = W_[mp_result1]; r4 = W_[mp_result2]; } /* returns: (Int# (mant sign), Word# (mant high), Word# (mant low), Int# (expn)) */ return (r1, r2, r3, r4); } /* Double# -> (# Int64#, Int# #) */ stg_decodeDoublezuInt64zh ( D_ arg ) { CInt exp; I64 mant; W_ mant_ptr; STK_CHK_GEN_N (SIZEOF_INT64); reserve BYTES_TO_WDS(SIZEOF_INT64) = mant_ptr { (exp) = ccall __decodeDouble_Int64(mant_ptr "ptr", arg); mant = I64[mant_ptr]; } return (mant, TO_W_(exp)); } /* ----------------------------------------------------------------------------- * Concurrency primitives * -------------------------------------------------------------------------- */ stg_forkzh ( gcptr closure ) { MAYBE_GC_P(stg_forkzh, closure); gcptr threadid; ("ptr" threadid) = ccall createIOThread( MyCapability() "ptr", TO_W_(RtsFlags_GcFlags_initialStkSize(RtsFlags)), closure "ptr"); /* start blocked if the current thread is blocked */ StgTSO_flags(threadid) = %lobits16( TO_W_(StgTSO_flags(threadid)) | TO_W_(StgTSO_flags(CurrentTSO)) & (TSO_BLOCKEX | TSO_INTERRUPTIBLE)); ccall scheduleThread(MyCapability() "ptr", threadid "ptr"); // context switch soon, but not immediately: we don't want 
every // forkIO to force a context-switch. Capability_context_switch(MyCapability()) = 1 :: CInt; return (threadid); } stg_forkOnzh ( W_ cpu, gcptr closure ) { again: MAYBE_GC(again); gcptr threadid; ("ptr" threadid) = ccall createIOThread( MyCapability() "ptr", TO_W_(RtsFlags_GcFlags_initialStkSize(RtsFlags)), closure "ptr"); /* start blocked if the current thread is blocked */ StgTSO_flags(threadid) = %lobits16( TO_W_(StgTSO_flags(threadid)) | TO_W_(StgTSO_flags(CurrentTSO)) & (TSO_BLOCKEX | TSO_INTERRUPTIBLE)); ccall scheduleThreadOn(MyCapability() "ptr", cpu, threadid "ptr"); // context switch soon, but not immediately: we don't want every // forkIO to force a context-switch. Capability_context_switch(MyCapability()) = 1 :: CInt; return (threadid); } stg_yieldzh () { // when we yield to the scheduler, we have to tell it to put the // current thread to the back of the queue by setting the // context_switch flag. If we don't do this, it will run the same // thread again. Capability_context_switch(MyCapability()) = 1 :: CInt; jump stg_yield_noregs(); } stg_labelThreadzh ( gcptr threadid, W_ addr ) { ccall labelThread(MyCapability() "ptr", threadid "ptr", addr "ptr"); return (); } stg_isCurrentThreadBoundzh (/* no args */) { W_ r; (r) = ccall isThreadBound(CurrentTSO); return (r); } stg_threadStatuszh ( gcptr tso ) { W_ why_blocked; W_ what_next; W_ ret, cap, locked; what_next = TO_W_(StgTSO_what_next(tso)); why_blocked = TO_W_(StgTSO_why_blocked(tso)); // Note: these two reads are not atomic, so they might end up // being inconsistent. It doesn't matter, since we // only return one or the other. If we wanted to return the // contents of block_info too, then we'd have to do some synchronisation. if (what_next == ThreadComplete) { ret = 16; // NB. 
magic, matches up with GHC.Conc.threadStatus } else { if (what_next == ThreadKilled) { ret = 17; } else { ret = why_blocked; } } cap = TO_W_(Capability_no(StgTSO_cap(tso))); if ((TO_W_(StgTSO_flags(tso)) & TSO_LOCKED) != 0) { locked = 1; } else { locked = 0; } return (ret,cap,locked); } /* ----------------------------------------------------------------------------- * TVar primitives * -------------------------------------------------------------------------- */ stg_abort /* no arg list: explicit stack layout */ { W_ frame_type; W_ frame; W_ trec; W_ outer; W_ r; // STM operations may allocate MAYBE_GC_ (stg_abort); // NB. not MAYBE_GC(), we cannot make a // function call in an explicit-stack proc // Find the enclosing ATOMICALLY_FRAME SAVE_THREAD_STATE(); (frame_type) = ccall findAtomicallyFrameHelper(MyCapability(), CurrentTSO "ptr"); LOAD_THREAD_STATE(); frame = Sp; trec = StgTSO_trec(CurrentTSO); outer = StgTRecHeader_enclosing_trec(trec); // We've reached the ATOMICALLY_FRAME ASSERT(frame_type == ATOMICALLY_FRAME); ASSERT(outer == NO_TREC); // Restart the transaction. 
("ptr" trec) = ccall stmStartTransaction(MyCapability() "ptr", outer "ptr"); StgTSO_trec(CurrentTSO) = trec; Sp = frame; R1 = StgAtomicallyFrame_code(frame); jump stg_ap_v_fast [R1]; } // Catch retry frame ----------------------------------------------------------- #define CATCH_RETRY_FRAME_FIELDS(w_,p_,info_ptr, \ p1, p2, \ running_alt_code, \ first_code, \ alt_code) \ w_ info_ptr, \ PROF_HDR_FIELDS(w_,p1,p2) \ w_ running_alt_code, \ p_ first_code, \ p_ alt_code INFO_TABLE_RET(stg_catch_retry_frame, CATCH_RETRY_FRAME, CATCH_RETRY_FRAME_FIELDS(W_,P_, info_ptr, p1, p2, running_alt_code, first_code, alt_code)) return (P_ ret) { unwind Sp = Sp + SIZEOF_StgCatchRetryFrame; W_ r; gcptr trec, outer, arg; trec = StgTSO_trec(CurrentTSO); outer = StgTRecHeader_enclosing_trec(trec); (r) = ccall stmCommitNestedTransaction(MyCapability() "ptr", trec "ptr"); if (r != 0) { // Succeeded (either first branch or second branch) StgTSO_trec(CurrentTSO) = outer; return (ret); } else { // Did not commit: abort and restart. 
StgTSO_trec(CurrentTSO) = outer; jump stg_abort(); } }

// Atomically frame ------------------------------------------------------------

// This must match StgAtomicallyFrame in Closures.h
#define ATOMICALLY_FRAME_FIELDS(w_,p_,info_ptr,p1,p2,code,result)       \
    w_ info_ptr,                                                        \
    PROF_HDR_FIELDS(w_,p1,p2)                                           \
    p_ code,                                                            \
    p_ result


// Return point for the code running above an atomically frame.
// Records the returned value in the frame and tries to commit the thread's
// current transaction; on success the value is returned past the frame,
// otherwise a fresh top-level transaction is started and `code` is re-run
// with the same frame pushed again.
INFO_TABLE_RET(stg_atomically_frame, ATOMICALLY_FRAME,
               // layout of the frame, and bind the field names
               ATOMICALLY_FRAME_FIELDS(W_,P_,
                                       info_ptr, p1, p2,
                                       code,
                                       frame_result))
    return (P_ result) // value returned to the frame
{
    W_ valid;
    gcptr trec;

    trec = StgTSO_trec(CurrentTSO);

    /* Back at the atomically frame */
    frame_result = result;

    /* try to commit */
    (valid) = ccall stmCommitTransaction(MyCapability() "ptr", trec "ptr");
    if (valid != 0) {
        /* Transaction was valid: commit succeeded */
        StgTSO_trec(CurrentTSO) = NO_TREC;
        return (frame_result);
    } else {
        /* Transaction was not valid: try again */
        ("ptr" trec) = ccall stmStartTransaction(MyCapability() "ptr",
                                                 NO_TREC "ptr");
        StgTSO_trec(CurrentTSO) = trec;

        jump stg_ap_v_fast
            // push the StgAtomicallyFrame again: the code generator is
            // clever enough to only assign the fields that have changed.
            (ATOMICALLY_FRAME_FIELDS(,,info_ptr,p1,p2,
                                     code,frame_result))
            (code);
    }
}


// Frame installed by stg_retryzh while the thread is blocked waiting.
// When entered it asks stmReWait whether to keep waiting: if so it blocks
// again with the same frame, otherwise it starts a new transaction and re-runs
// `code` under a frame whose header is switched back to
// stg_atomically_frame_info.
INFO_TABLE_RET(stg_atomically_waiting_frame, ATOMICALLY_FRAME,
               // layout of the frame, and bind the field names
               ATOMICALLY_FRAME_FIELDS(W_,P_,
                                       info_ptr, p1, p2,
                                       code,
                                       frame_result))
    return (/* no return values */)
{
    W_ trec, valid;

    /* The TSO is currently waiting: should we stop waiting? */
    (valid) = ccall stmReWait(MyCapability() "ptr", CurrentTSO "ptr");
    if (valid != 0) {
        /* Previous attempt is still valid: no point trying again yet */
        jump stg_block_noregs
            (ATOMICALLY_FRAME_FIELDS(,,info_ptr, p1, p2,
                                     code,frame_result))
            ();
    } else {
        /* Previous attempt is no longer valid: try again */
        ("ptr" trec) = ccall stmStartTransaction(MyCapability() "ptr", NO_TREC "ptr");
        StgTSO_trec(CurrentTSO) = trec;

        // change the frame header to stg_atomically_frame_info
        jump stg_ap_v_fast
            (ATOMICALLY_FRAME_FIELDS(,,stg_atomically_frame_info, p1, p2,
                                     code,frame_result))
            (code);
    }
}

// STM catch frame -------------------------------------------------------------

/* Catch frames are very similar to update frames, but when entering
 * one we just pop the frame off the stack and perform the correct
 * kind of return to the activation record underneath us on the stack.
 */

#define CATCH_STM_FRAME_FIELDS(w_,p_,info_ptr,p1,p2,code,handler) \
    w_ info_ptr,                                                  \
    PROF_HDR_FIELDS(w_,p1,p2)                                     \
    p_ code,                                                      \
    p_ handler

// Return point for the body of a catchSTM#.
// Tries to commit the nested transaction into its enclosing one; if the
// commit fails, a new nested transaction is started under the same enclosing
// transaction and `code` is run again with the frame re-pushed.
INFO_TABLE_RET(stg_catch_stm_frame, CATCH_STM_FRAME,
               // layout of the frame, and bind the field names
               CATCH_STM_FRAME_FIELDS(W_,P_,info_ptr,p1,p2,code,handler))
    return (P_ ret)
{
    W_ r, trec, outer;

    trec = StgTSO_trec(CurrentTSO);
    outer  = StgTRecHeader_enclosing_trec(trec);
    (r) = ccall stmCommitNestedTransaction(MyCapability() "ptr", trec "ptr");
    if (r != 0) {
        /* Commit succeeded */
        StgTSO_trec(CurrentTSO) = outer;
        return (ret);
    } else {
        /* Commit failed */
        W_ new_trec;
        ("ptr" new_trec) = ccall stmStartTransaction(MyCapability() "ptr", outer "ptr");
        StgTSO_trec(CurrentTSO) = new_trec;

        jump stg_ap_v_fast
            (CATCH_STM_FRAME_FIELDS(,,info_ptr,p1,p2,code,handler))
            (code);
    }
}


// Primop definition -----------------------------------------------------------

// atomically#: run `stm` inside a new top-level transaction.
// Raises nestedAtomically if the thread already has a transaction record;
// otherwise starts a transaction, pushes an atomically frame and applies
// `stm`.
stg_atomicallyzh (P_ stm)
{
    P_ old_trec;
    P_ new_trec;
    P_ code, frame_result;

    // stmStartTransaction may allocate
    MAYBE_GC_P(stg_atomicallyzh, stm);

    STK_CHK_GEN();

    old_trec = StgTSO_trec(CurrentTSO);

    /* Nested transactions are not allowed; raise an exception */
    if (old_trec != NO_TREC) {
        jump stg_raisezh(base_ControlziExceptionziBase_nestedAtomically_closure);
    }

    code = stm;
    frame_result = NO_TREC;

    /* Start the memory transaction */
    ("ptr" new_trec) = ccall stmStartTransaction(MyCapability() "ptr", old_trec "ptr");
    StgTSO_trec(CurrentTSO) = new_trec;

    jump stg_ap_v_fast
        (ATOMICALLY_FRAME_FIELDS(,,stg_atomically_frame_info, CCCS, 0,
                                 code,frame_result))
        (stm);
}

// A closure representing "atomically x".  This is used when a thread
// inside a transaction receives an asynchronous exception; see #5866.
// It is somewhat similar to the stg_raise closure.
//
INFO_TABLE(stg_atomically,1,0,THUNK_1_0,"atomically","atomically")
    (P_ thunk)
{
    jump stg_atomicallyzh(StgThunk_payload(thunk,0));
}


// catchSTM#: run `code` in a nested transaction under a catch-STM frame that
// remembers `handler`.
stg_catchSTMzh (P_ code    /* :: STM a */,
                P_ handler /* :: Exception -> STM a */)
{
    STK_CHK_GEN();

    /* Start a nested transaction to run the body of the try block in */
    W_ cur_trec;
    W_ new_trec;
    cur_trec = StgTSO_trec(CurrentTSO);
    ("ptr" new_trec) = ccall stmStartTransaction(MyCapability() "ptr",
                                                 cur_trec "ptr");
    StgTSO_trec(CurrentTSO) = new_trec;

    jump stg_ap_v_fast
        (CATCH_STM_FRAME_FIELDS(,,stg_catch_stm_frame_info, CCCS, 0,
                                code, handler))
        (code);
}


// catchRetry#: run `first_code` in a nested transaction under a catch-retry
// frame that also records `alt_code`; stg_retryzh switches to `alt_code` when
// the first branch retries.
stg_catchRetryzh (P_ first_code, /* :: STM a */
                  P_ alt_code    /* :: STM a */)
{
    W_ new_trec;

    // stmStartTransaction may allocate
    MAYBE_GC_PP (stg_catchRetryzh, first_code, alt_code);

    STK_CHK_GEN();

    /* Start a nested transaction within which to run the first code */
    ("ptr" new_trec) = ccall stmStartTransaction(MyCapability() "ptr",
                                                 StgTSO_trec(CurrentTSO) "ptr");
    StgTSO_trec(CurrentTSO) = new_trec;

    // push the CATCH_RETRY stack frame, and apply first_code to realWorld#
    jump stg_ap_v_fast
        (CATCH_RETRY_FRAME_FIELDS(,, stg_catch_retry_frame_info, CCCS, 0,
                                  0, /* not running_alt_code */
                                  first_code,
                                  alt_code))
        (first_code);
}

// retry#: unwind to the nearest CATCH_RETRY_FRAME or ATOMICALLY_FRAME.
// At a catch-retry frame the current nested transaction is aborted and either
// the alternative is started (first branch) or the retry is propagated
// outwards (alternative branch). At the atomically frame the thread either
// blocks via stmWait or, if the transaction was invalid, restarts at once.
stg_retryzh /* no arg list: explicit stack layout */
{
    W_ frame_type;
    W_ frame;
    W_ trec;
    W_ outer;
    W_ r;

    // STM operations may allocate
    MAYBE_GC_ (stg_retryzh); // NB. not MAYBE_GC(), we cannot make a
                             // function call in an explicit-stack proc

    // Find the enclosing ATOMICALLY_FRAME or CATCH_RETRY_FRAME
retry_pop_stack:
    SAVE_THREAD_STATE();
    (frame_type) = ccall findRetryFrameHelper(MyCapability(), CurrentTSO "ptr");
    LOAD_THREAD_STATE();
    frame = Sp;
    trec = StgTSO_trec(CurrentTSO);
    outer  = StgTRecHeader_enclosing_trec(trec);

    if (frame_type == CATCH_RETRY_FRAME) {
        // The retry reaches a CATCH_RETRY_FRAME before the atomic frame
        ASSERT(outer != NO_TREC);
        // Abort the transaction attempting the current branch
        ccall stmAbortTransaction(MyCapability() "ptr", trec "ptr");
        ccall stmFreeAbortedTRec(MyCapability() "ptr", trec "ptr");
        if (!StgCatchRetryFrame_running_alt_code(frame) != 0) {
            // Retry in the first branch: try the alternative
            ("ptr" trec) = ccall stmStartTransaction(MyCapability() "ptr", outer "ptr");
            StgTSO_trec(CurrentTSO) = trec;
            StgCatchRetryFrame_running_alt_code(frame) = 1 :: CInt; // true;
            R1 = StgCatchRetryFrame_alt_code(frame);
            jump stg_ap_v_fast [R1];
        } else {
            // Retry in the alternative code: propagate the retry
            StgTSO_trec(CurrentTSO) = outer;
            Sp = Sp + SIZEOF_StgCatchRetryFrame;
            goto retry_pop_stack;
        }
    }

    // We've reached the ATOMICALLY_FRAME: attempt to wait
    ASSERT(frame_type == ATOMICALLY_FRAME);
    ASSERT(outer == NO_TREC);

    (r) = ccall stmWait(MyCapability() "ptr", CurrentTSO "ptr", trec "ptr");
    if (r != 0) {
        // Transaction was valid: stmWait put us on the TVars' queues, we now block
        StgHeader_info(frame) = stg_atomically_waiting_frame_info;
        Sp = frame;
        R3 = trec; // passing to stmWaitUnblock()
        jump stg_block_stmwait [R3];
    } else {
        // Transaction was not valid: retry immediately
        ("ptr" trec) = ccall stmStartTransaction(MyCapability() "ptr", outer "ptr");
        StgTSO_trec(CurrentTSO) = trec;
        Sp = frame;
        R1 = StgAtomicallyFrame_code(frame);
        jump stg_ap_v_fast [R1];
    }
}

// newTVar#: allocate a TVar holding `init`, with an empty watch queue and
// an update count of zero. The header is set to stg_TVAR_DIRTY_info.
stg_newTVarzh (P_ init)
{
    W_ tv;

    ALLOC_PRIM_P (SIZEOF_StgTVar, stg_newTVarzh, init);

    tv = Hp - SIZEOF_StgTVar + WDS(1);
    SET_HDR (tv, stg_TVAR_DIRTY_info, CCCS);

    StgTVar_current_value(tv) = init;
    StgTVar_first_watch_queue_entry(tv) = stg_END_STM_WATCH_QUEUE_closure;
    StgTVar_num_updates(tv) = 0;

    return (tv);
}


// readTVar#: read `tvar` through the thread's current transaction record.
stg_readTVarzh (P_ tvar)
{
    P_ trec;
    P_ result;

    // Call to stmReadTVar may allocate
    MAYBE_GC_P (stg_readTVarzh, tvar);

    trec = StgTSO_trec(CurrentTSO);
    ("ptr" result) = ccall stmReadTVar(MyCapability() "ptr", trec "ptr",
                                       tvar "ptr");
    return (result);
}

// readTVarIO#: read `tvar` without a transaction. Re-reads for as long as the
// value found has the stg_TREC_HEADER_info header.
stg_readTVarIOzh ( P_ tvar /* :: TVar a */ )
{
    W_ result, resultinfo;

again:
    result = StgTVar_current_value(tvar);
    resultinfo = %INFO_PTR(result);
    prim_read_barrier;
    if (resultinfo == stg_TREC_HEADER_info) {
        goto again;
    }
    return (result);
}

// writeTVar#: write `new_value` to `tvar` through the thread's current
// transaction record.
stg_writeTVarzh (P_ tvar,     /* :: TVar a */
                 P_ new_value /* :: a      */)
{
    W_ trec;

    // Call to stmWriteTVar may allocate
    MAYBE_GC_PP (stg_writeTVarzh, tvar, new_value);

    trec = StgTSO_trec(CurrentTSO);
    ccall stmWriteTVar(MyCapability() "ptr", trec "ptr", tvar "ptr",
                       new_value "ptr");
    return ();
}


/* -----------------------------------------------------------------------------
 * MVar primitives
 *
 * take & putMVar work as follows.  Firstly, an important invariant:
 *
 *    If the MVar is full, then the blocking queue contains only
 *    threads blocked on putMVar, and if the MVar is empty then the
 *    blocking queue contains only threads blocked on takeMVar.
 *
 * takeMVar:
 *    MVar empty : then add ourselves to the blocking queue
 *    MVar full  : remove the value from the MVar, and
 *                 blocking queue empty     : return
 *                 blocking queue non-empty : perform the first blocked putMVar
 *                                            from the queue, and wake up the
 *                                            thread (MVar is now full again)
 *
 * putMVar is just the dual of the above algorithm.
 *
 * How do we "perform a putMVar"?  Well, we have to fiddle around with
 * the stack of the thread waiting to do the putMVar.  See
 * stg_block_putmvar and stg_block_takemvar in HeapStackCheck.c for
 * the stack layout, and the PerformPut and PerformTake macros below.
* It is important that a blocked take or put is woken up with the
 * take/put already performed, because otherwise there would be a
 * small window of vulnerability where the thread could receive an
 * exception and never perform its take or put, and we'd end up with a
 * deadlock.
 *
 * -------------------------------------------------------------------------- */

// isEmptyMVar#: returns 1 if the MVar currently holds no value, else 0.
// Reads the value field without taking the closure lock.
stg_isEmptyMVarzh ( P_ mvar /* :: MVar a */ )
{
    if (StgMVar_value(mvar) == stg_END_TSO_QUEUE_closure) {
        return (1);
    } else {
        return (0);
    }
}

// newMVar#: allocate an empty MVar (head, tail and value all END_TSO_QUEUE).
stg_newMVarzh ()
{
    W_ mvar;

    ALLOC_PRIM_ (SIZEOF_StgMVar, stg_newMVarzh);

    mvar = Hp - SIZEOF_StgMVar + WDS(1);
    // No memory barrier needed as this is a new allocation.
    SET_HDR(mvar,stg_MVAR_DIRTY_info,CCCS);
        // MVARs start dirty: generation 0 has no mutable list
    StgMVar_head(mvar)  = stg_END_TSO_QUEUE_closure;
    StgMVar_tail(mvar)  = stg_END_TSO_QUEUE_closure;
    StgMVar_value(mvar) = stg_END_TSO_QUEUE_closure;
    return (mvar);
}


// Complete a blocked take on behalf of another thread: write `value` and a
// stg_ret_p_info return address into the top two words of its stack.
#define PerformTake(stack, value)               \
    W_ sp;                                      \
    sp = StgStack_sp(stack);                    \
    W_[sp + WDS(1)] = value;                    \
    W_[sp + WDS(0)] = stg_ret_p_info;

// Complete a blocked put on behalf of another thread: pop three words off its
// stack and store the word just below the new stack pointer into `lval`.
#define PerformPut(stack,lval)                  \
    W_ sp;                                      \
    sp = StgStack_sp(stack) + WDS(3);           \
    StgStack_sp(stack) = sp;                    \
    lval = W_[sp - WDS(1)];


// takeMVar#: take the value out of `mvar`, blocking while it is empty.
// If a putMVar is queued, that put is performed immediately and its thread
// woken, so the MVar stays full.
stg_takeMVarzh ( P_ mvar /* :: MVar a */ )
{
    W_ val, info, tso, q, qinfo;

    LOCK_CLOSURE(mvar, info);

    /* If the MVar is empty, put ourselves on its blocking queue,
     * and wait until we're woken up.
     */
    if (StgMVar_value(mvar) == stg_END_TSO_QUEUE_closure) {
        if (info == stg_MVAR_CLEAN_info) {
            ccall dirty_MVAR(BaseReg "ptr", mvar "ptr", StgMVar_value(mvar) "ptr");
        }

        // We want to put the heap check down here in the slow path,
        // but be careful to unlock the closure before returning to
        // the RTS if the check fails.
        ALLOC_PRIM_WITH_CUSTOM_FAILURE
            (SIZEOF_StgMVarTSOQueue,
             unlockClosure(mvar, stg_MVAR_DIRTY_info);
             GC_PRIM_P(stg_takeMVarzh, mvar));

        q = Hp - SIZEOF_StgMVarTSOQueue + WDS(1);

        StgMVarTSOQueue_link(q) = END_TSO_QUEUE;
        StgMVarTSOQueue_tso(q)  = CurrentTSO;
        SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM);
        // Write barrier before we make the new MVAR_TSO_QUEUE
        // visible to other cores.
        // See Note [Heap memory barriers]
        prim_write_barrier;

        if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
            StgMVar_head(mvar) = q;
        } else {
            StgMVarTSOQueue_link(StgMVar_tail(mvar)) = q;
            ccall recordClosureMutated(MyCapability() "ptr",
                                             StgMVar_tail(mvar));
        }
        StgTSO__link(CurrentTSO)       = q;
        StgTSO_block_info(CurrentTSO)  = mvar;
        StgTSO_why_blocked(CurrentTSO) = BlockedOnMVar::I16;
        StgMVar_tail(mvar)             = q;

        jump stg_block_takemvar(mvar);
    }

    /* we got the value... */
    val = StgMVar_value(mvar);

    q = StgMVar_head(mvar);
loop:
    if (q == stg_END_TSO_QUEUE_closure) {
        /* No further putMVars, MVar is now empty */
        StgMVar_value(mvar) = stg_END_TSO_QUEUE_closure;
        // If the MVar is not already dirty, then we don't need to make
        // it dirty, as it is empty with nothing blocking on it.
        unlockClosure(mvar, info);
        // However, we do need to ensure that the nonmoving collector
        // knows about the reference to the value that we just removed...
        updateRemembSetPushPtr(val);
        return (val);
    }
    qinfo = StgHeader_info(q);
    prim_read_barrier;
    if (qinfo == stg_IND_info ||
        qinfo == stg_MSG_NULL_info) {
        // skip over queue entries that have been turned into indirections
        q = StgInd_indirectee(q);
        goto loop;
    }

    // There are putMVar(s) waiting... wake up the first thread on the queue

    if (info == stg_MVAR_CLEAN_info) {
        ccall dirty_MVAR(BaseReg "ptr", mvar "ptr", val "ptr");
    }

    tso = StgMVarTSOQueue_tso(q);
    StgMVar_head(mvar) = StgMVarTSOQueue_link(q);
    if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
        StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
    }

    ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16);
    ASSERT(StgTSO_block_info(tso) == mvar);

    // actually perform the putMVar for the thread that we just woke up
    W_ stack;
    stack = StgTSO_stackobj(tso);
    PerformPut(stack, StgMVar_value(mvar));

    // indicate that the MVar operation has now completed.
    StgTSO__link(tso) = stg_END_TSO_QUEUE_closure;

    // no need to mark the TSO dirty, we have only written END_TSO_QUEUE.

    ccall tryWakeupThread(MyCapability() "ptr", tso);

    unlockClosure(mvar, stg_MVAR_DIRTY_info);
    return (val);
}

// tryTakeMVar#: non-blocking takeMVar#.
// Returns (0, <dummy pointer>) if the MVar is empty, otherwise (1, value).
// As in stg_takeMVarzh, a queued putMVar is performed and its thread woken.
stg_tryTakeMVarzh ( P_ mvar /* :: MVar a */ )
{
    W_ val, info, tso, q, qinfo;

    LOCK_CLOSURE(mvar, info);

    /* If the MVar is empty, return 0. */
    if (StgMVar_value(mvar) == stg_END_TSO_QUEUE_closure) {
#if defined(THREADED_RTS)
        unlockClosure(mvar, info);
#endif
        /* HACK: we need a pointer to pass back,
         * so we abuse NO_FINALIZER_closure
         */
        return (0, stg_NO_FINALIZER_closure);
    }

    /* we got the value... */
    val = StgMVar_value(mvar);

    q = StgMVar_head(mvar);
loop:
    if (q == stg_END_TSO_QUEUE_closure) {
        /* No further putMVars, MVar is now empty */
        StgMVar_value(mvar) = stg_END_TSO_QUEUE_closure;
        unlockClosure(mvar, info);
        return (1, val);
    }

    qinfo = StgHeader_info(q);
    prim_read_barrier;
    if (qinfo == stg_IND_info ||
        qinfo == stg_MSG_NULL_info) {
        q = StgInd_indirectee(q);
        goto loop;
    }

    // There are putMVar(s) waiting... wake up the first thread on the queue

    if (info == stg_MVAR_CLEAN_info) {
        ccall dirty_MVAR(BaseReg "ptr", mvar "ptr", val "ptr");
    }

    tso = StgMVarTSOQueue_tso(q);
    StgMVar_head(mvar) = StgMVarTSOQueue_link(q);
    if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
        StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
    }

    ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16);
    ASSERT(StgTSO_block_info(tso) == mvar);

    // actually perform the putMVar for the thread that we just woke up
    W_ stack;
    stack = StgTSO_stackobj(tso);
    PerformPut(stack, StgMVar_value(mvar));

    // indicate that the MVar operation has now completed.
    StgTSO__link(tso) = stg_END_TSO_QUEUE_closure;

    // no need to mark the TSO dirty, we have only written END_TSO_QUEUE.

    ccall tryWakeupThread(MyCapability() "ptr", tso);

    unlockClosure(mvar, stg_MVAR_DIRTY_info);
    return (1,val);
}

// putMVar#: store `val` in `mvar`, blocking while it is full.
// If readers/takers are queued on the empty MVar, the value is handed
// directly to them: every queued readMVar is served, and the first queued
// takeMVar (if any) consumes the value, leaving the MVar empty.
stg_putMVarzh ( P_ mvar, /* :: MVar a */
                P_ val,  /* :: a */ )
{
    W_ info, tso, q, qinfo;

    LOCK_CLOSURE(mvar, info);

    if (StgMVar_value(mvar) != stg_END_TSO_QUEUE_closure) {

        if (info == stg_MVAR_CLEAN_info) {
            ccall dirty_MVAR(BaseReg "ptr", mvar "ptr", StgMVar_value(mvar) "ptr");
        }

        // We want to put the heap check down here in the slow path,
        // but be careful to unlock the closure before returning to
        // the RTS if the check fails.
        ALLOC_PRIM_WITH_CUSTOM_FAILURE
            (SIZEOF_StgMVarTSOQueue,
             unlockClosure(mvar, stg_MVAR_DIRTY_info);
             GC_PRIM_PP(stg_putMVarzh, mvar, val));

        q = Hp - SIZEOF_StgMVarTSOQueue + WDS(1);

        StgMVarTSOQueue_link(q) = END_TSO_QUEUE;
        StgMVarTSOQueue_tso(q)  = CurrentTSO;

        SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM);
        //See Note [Heap memory barriers]
        prim_write_barrier;

        if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
            StgMVar_head(mvar) = q;
        } else {
            StgMVarTSOQueue_link(StgMVar_tail(mvar)) = q;
            ccall recordClosureMutated(MyCapability() "ptr",
                                             StgMVar_tail(mvar));
        }
        StgTSO__link(CurrentTSO)       = q;
        StgTSO_block_info(CurrentTSO)  = mvar;
        StgTSO_why_blocked(CurrentTSO) = BlockedOnMVar::I16;
        StgMVar_tail(mvar)             = q;

        jump stg_block_putmvar(mvar,val);
    }

    // We are going to mutate the closure, make sure its current pointers
    // are marked.
    if (info == stg_MVAR_CLEAN_info) {
        ccall update_MVAR(BaseReg "ptr", mvar "ptr", StgMVar_value(mvar) "ptr");
    }

    q = StgMVar_head(mvar);
loop:
    if (q == stg_END_TSO_QUEUE_closure) {
        /* No further takes, the MVar is now full. */
        // The barrier must run before the store: it is handed the value
        // currently held by the MVar (the one about to be overwritten), the
        // same order stg_tryPutMVarzh uses.
        if (info == stg_MVAR_CLEAN_info) {
            ccall dirty_MVAR(BaseReg "ptr", mvar "ptr", StgMVar_value(mvar) "ptr");
        }
        StgMVar_value(mvar) = val;
        unlockClosure(mvar, stg_MVAR_DIRTY_info);
        return ();
    }
    qinfo = StgHeader_info(q);
    prim_read_barrier;
    if (qinfo == stg_IND_info ||
        qinfo == stg_MSG_NULL_info) {
        q = StgInd_indirectee(q);
        goto loop;
    }

    // There are readMVar/takeMVar(s) waiting: wake up the first one

    tso = StgMVarTSOQueue_tso(q);
    q = StgMVarTSOQueue_link(q);
    StgMVar_head(mvar) = q;
    if (q == stg_END_TSO_QUEUE_closure) {
        StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
    } else {
        if (info == stg_MVAR_CLEAN_info) {
            // Resolve #18919.
            ccall dirty_MVAR(BaseReg "ptr", mvar "ptr",
                             StgMVar_value(mvar) "ptr");
            info = stg_MVAR_DIRTY_info;
        }
    }

    ASSERT(StgTSO_block_info(tso) == mvar);
    // save why_blocked here, because waking up the thread destroys
    // this information
    W_ why_blocked;
    why_blocked = TO_W_(StgTSO_why_blocked(tso));

    // actually perform the takeMVar
    W_ stack;
    stack = StgTSO_stackobj(tso);
    PerformTake(stack, val);

    // indicate that the MVar operation has now completed.
    StgTSO__link(tso) = stg_END_TSO_QUEUE_closure;

    if ((TO_W_(StgStack_dirty(stack)) & STACK_DIRTY) == 0) {
        ccall dirty_STACK(MyCapability() "ptr", stack "ptr");
    }

    ccall tryWakeupThread(MyCapability() "ptr", tso);

    // If it was a readMVar, then we can still do work,
    // so loop back. (XXX: This could take a while)
    if (why_blocked == BlockedOnMVarRead)
        goto loop;

    ASSERT(why_blocked == BlockedOnMVar);

    unlockClosure(mvar, info);
    return ();
}


// NOTE: there is another implementation of this function in
// Threads.c:performTryPutMVar().  Keep them in sync!  It was
// measurably slower to call the C function from here (70% for a
// tight loop doing tryPutMVar#).
//
// TODO: we could kill the duplication by making tryPutMVar# into an
// inline primop that expands into a C call to performTryPutMVar().
//
// tryPutMVar#: non-blocking putMVar#. Returns 0 if the MVar is already full,
// 1 once the value has been stored or handed to a waiting thread.
stg_tryPutMVarzh ( P_ mvar, /* :: MVar a */
                   P_ val,  /* :: a */ )
{
    W_ info, tso, q, qinfo;

    LOCK_CLOSURE(mvar, info);

    if (StgMVar_value(mvar) != stg_END_TSO_QUEUE_closure) {
#if defined(THREADED_RTS)
        unlockClosure(mvar, info);
#endif
        return (0);
    }

    q = StgMVar_head(mvar);
loop:
    if (q == stg_END_TSO_QUEUE_closure) {
        /* No further takes, the MVar is now full. */
        if (info == stg_MVAR_CLEAN_info) {
            ccall dirty_MVAR(BaseReg "ptr", mvar "ptr", StgMVar_value(mvar) "ptr");
        }

        StgMVar_value(mvar) = val;
        unlockClosure(mvar, stg_MVAR_DIRTY_info);
        return (1);
    }

    qinfo = StgHeader_info(q);
    prim_read_barrier;
    if (qinfo == stg_IND_info ||
        qinfo == stg_MSG_NULL_info) {
        q = StgInd_indirectee(q);
        goto loop;
    }

    // There are takeMVar(s) waiting: wake up the first one

    tso = StgMVarTSOQueue_tso(q);
    q = StgMVarTSOQueue_link(q);
    StgMVar_head(mvar) = q;
    if (q == stg_END_TSO_QUEUE_closure) {
        StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
    } else {
        if (info == stg_MVAR_CLEAN_info) {
            // Resolve #18919.
            ccall dirty_MVAR(BaseReg "ptr", mvar "ptr",
                             StgMVar_value(mvar) "ptr");
            info = stg_MVAR_DIRTY_info;
        }
    }

    ASSERT(StgTSO_block_info(tso) == mvar);
    // save why_blocked here, because waking up the thread destroys
    // this information
    W_ why_blocked;
    why_blocked = TO_W_(StgTSO_why_blocked(tso));

    // actually perform the takeMVar
    W_ stack;
    stack = StgTSO_stackobj(tso);
    PerformTake(stack, val);

    // indicate that the MVar operation has now completed.
    StgTSO__link(tso) = stg_END_TSO_QUEUE_closure;

    if ((TO_W_(StgStack_dirty(stack)) & STACK_DIRTY) == 0) {
        ccall dirty_STACK(MyCapability() "ptr", stack "ptr");
    }

    ccall tryWakeupThread(MyCapability() "ptr", tso);

    // If it was a readMVar, then we can still do work,
    // so loop back. (XXX: This could take a while)
    if (why_blocked == BlockedOnMVarRead)
        goto loop;

    ASSERT(why_blocked == BlockedOnMVar);

    unlockClosure(mvar, info);
    return (1);
}


// readMVar#: return the MVar's value without removing it, blocking while the
// MVar is empty. Blocked readers are queued at the *front* of the queue.
stg_readMVarzh ( P_ mvar, /* :: MVar a */ )
{
    W_ val, info, tso, q;

    LOCK_CLOSURE(mvar, info);

    /* If the MVar is empty, put ourselves on the blocked readers
     * list and wait until we're woken up.
     */
    if (StgMVar_value(mvar) == stg_END_TSO_QUEUE_closure) {

        // Add MVar to mutable list
        if (info == stg_MVAR_CLEAN_info) {
            // third argument carries the "ptr" hint like every other
            // dirty_MVAR call in this file
            ccall dirty_MVAR(BaseReg "ptr",
                             mvar "ptr", StgMVar_value(mvar) "ptr");
        }

        ALLOC_PRIM_WITH_CUSTOM_FAILURE
            (SIZEOF_StgMVarTSOQueue,
             unlockClosure(mvar, stg_MVAR_DIRTY_info);
             GC_PRIM_P(stg_readMVarzh, mvar));

        q = Hp - SIZEOF_StgMVarTSOQueue + WDS(1);

        // readMVars are pushed to the front of the queue, so
        // they get handled immediately
        StgMVarTSOQueue_link(q) = StgMVar_head(mvar);
        StgMVarTSOQueue_tso(q)  = CurrentTSO;

        SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM);
        //See Note [Heap memory barriers]
        prim_write_barrier;

        StgTSO__link(CurrentTSO)       = q;
        StgTSO_block_info(CurrentTSO)  = mvar;
        StgTSO_why_blocked(CurrentTSO) = BlockedOnMVarRead::I16;
        StgMVar_head(mvar) = q;

        if (StgMVar_tail(mvar) == stg_END_TSO_QUEUE_closure) {
            StgMVar_tail(mvar) = q;
        }

        jump stg_block_readmvar(mvar);
    }

    val = StgMVar_value(mvar);

    unlockClosure(mvar, info);
    return (val);
}

// tryReadMVar#: non-blocking readMVar#.
// Returns (0, <dummy pointer>) if the MVar is empty, otherwise (1, value).
stg_tryReadMVarzh ( P_ mvar, /* :: MVar a */ )
{
    W_ val, info, tso, q;

    LOCK_CLOSURE(mvar, info);

    if (StgMVar_value(mvar) == stg_END_TSO_QUEUE_closure) {
        unlockClosure(mvar, info);
        return (0, stg_NO_FINALIZER_closure);
    }

    val = StgMVar_value(mvar);

    unlockClosure(mvar, info);
    return (1, val);
}

/* -----------------------------------------------------------------------------
 * IOPort primitives
 *
 * readIOPort & writeIOPort work as follows.  Firstly, an important invariant:
 *
 *    Only one read and one write is allowed for an IOPort.
 *    Reading or writing to the same port twice will throw an exception.
 *
 * readIOPort:
 *    IOPort empty : then add ourselves to the blocking queue
 *    IOPort full  : remove the value from the IOPort, and
 *                 blocking queue empty     : return
 *                 blocking queue non-empty : perform the only blocked
 *                                            writeIOPort from the queue, and
 *                                            wake up the thread
 *                                            (IOPort is now empty)
 *
 * writeIOPort is just the dual of the above algorithm.
 *
 * How do we "perform a writeIOPort"?
Well, by storing the value and ptr on the
 * stack, same way we do with MVars. Semantically the operations mutate the
 * stack the same way so we will re-use the logic and datastructures for MVars
 * for IOPort. See stg_block_putmvar and stg_block_takemvar in HeapStackCheck.c
 * for the stack layout, and the PerformPut and PerformTake macros below.  We
 * also re-use the closure types MVAR_CLEAN/_DIRTY for IOPort.
 *
 * The remaining caveats of MVar thus also apply for an IOPort.  The main
 * crucial difference between an MVar and IOPort is that the scheduler will not
 * be allowed to interrupt a blocked IOPort just because it thinks there's a
 * deadlock.  This is especially crucial for the non-threaded runtime.
 *
 * To avoid double reads/writes we set only the head to a MVarTSOQueue when
 * a reader queues up on a port.
 * We set the tail to the port itself upon reading. We can do this
 * since there can only be one reader/writer for the port. In contrast to MVars
 * which do need to keep a list of blocked threads.
 *
 * This means IOPorts have these valid states and transitions:
 *
                              ┌─────────┐
                              │  Empty  │  head == tail == value == END_TSO_QUEUE
                              ├─────────┤
                              │         │
                       write  │         │  read
                              v         v
 value != END_TSO_QUEUE  ┌─────────┐ ┌─────────┐  value == END_TSO_QUEUE
 head == END_TSO_QUEUE   │  full   │ │ reading │  head == queue with single reader
 tail == END_TSO_QUEUE   └─────────┘ └─────────┘  tail == END_TSO_QUEUE
                              │         │
                        read  │         │  write
                              │         │
                              v         v
                             ┌──────────┐  value != END_TSO_QUEUE
                             │   Used   │  head == END_TSO_QUEUE
                             └──────────┘  tail == ioport
 *
 * -------------------------------------------------------------------------- */


// readIOPort#: read the single value of `ioport`, blocking until it has been
// written. A second read (or a read while another reader is queued) raises
// doubleReadException.
stg_readIOPortzh ( P_ ioport /* :: IOPort a */ )
{
    W_ val, info, tso, q;

    LOCK_CLOSURE(ioport, info);

    /* If the Port is empty, put ourselves on the blocked readers
     * list and wait until we're woken up.
     */
    if (StgMVar_value(ioport) == stg_END_TSO_QUEUE_closure) {

        // There is or was already another reader, throw exception.
        if (StgMVar_head(ioport) != stg_END_TSO_QUEUE_closure ||
            StgMVar_tail(ioport) != stg_END_TSO_QUEUE_closure) {
            unlockClosure(ioport, info);
            jump stg_raiseIOzh(base_GHCziIOPort_doubleReadException_closure);
        }

        if (info == stg_MVAR_CLEAN_info) {
            // third argument carries the "ptr" hint like the other
            // three-argument dirty_MVAR calls in this file
            ccall dirty_MVAR(BaseReg "ptr", ioport "ptr", StgMVar_value(ioport) "ptr");
        }

        ALLOC_PRIM_WITH_CUSTOM_FAILURE
            (SIZEOF_StgMVarTSOQueue,
             unlockClosure(ioport, stg_MVAR_DIRTY_info);
             GC_PRIM_P(stg_readIOPortzh, ioport));

        q = Hp - SIZEOF_StgMVarTSOQueue + WDS(1);

        // link = stg_END_TSO_QUEUE_closure since we check that
        // there is no other reader above.
        StgMVarTSOQueue_link(q) = stg_END_TSO_QUEUE_closure;
        StgMVarTSOQueue_tso(q)  = CurrentTSO;

        SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM);
        //See Note [Heap memory barriers]
        prim_write_barrier;

        StgMVar_head(ioport) = q;
        StgTSO__link(CurrentTSO)       = q;
        StgTSO_block_info(CurrentTSO)  = ioport;
        StgTSO_why_blocked(CurrentTSO) = BlockedOnIOCompletion::I16;

        //Unlocks the closure as well
        jump stg_block_readmvar(ioport);

    }

    //This way we can check if there has been a read already.
    //Upon reading we set tail to indicate the port is now closed.
    if (StgMVar_tail(ioport) == stg_END_TSO_QUEUE_closure) {
        StgMVar_tail(ioport) = ioport;
        StgMVar_head(ioport) = stg_END_TSO_QUEUE_closure;
    } else {
        //Or another thread has read already: Throw an exception.
        unlockClosure(ioport, info);
        jump stg_raiseIOzh(base_GHCziIOPort_doubleReadException_closure);
    }

    val = StgMVar_value(ioport);

    unlockClosure(ioport, info);
    return (val);
}

// writeIOPort#: write `val` to `ioport`. Returns 1 on success.
// If a reader is already blocked on the port the value is delivered straight
// to that thread and the port is marked as used (tail == ioport). A second
// write raises an exception.
stg_writeIOPortzh ( P_ ioport, /* :: IOPort a */
                   P_ val,  /* :: a */ )
{
    W_ info, tso, q;

    LOCK_CLOSURE(ioport, info);

    /* If there is already a value in the port, then raise an exception
       as it's the second write.
       Correct usages of IOPort should never have a second
       write. */
    if (StgMVar_value(ioport) != stg_END_TSO_QUEUE_closure) {
        unlockClosure(ioport, info);
        // The jump does not come back, so nothing follows it.
        jump stg_raiseIOzh(base_GHCziIOPort_doubleReadException_closure);
    }

    // We are going to mutate the closure, make sure its current pointers
    // are marked.
    if (info == stg_MVAR_CLEAN_info) {
        ccall update_MVAR(BaseReg "ptr", ioport "ptr", StgMVar_value(ioport) "ptr");
    }

    q = StgMVar_head(ioport);
loop:
    if (q == stg_END_TSO_QUEUE_closure) {
        /* No takes, the IOPort is now full. */
        if (info == stg_MVAR_CLEAN_info) {
            // dirty_MVAR takes three arguments everywhere else in this file;
            // pass the value about to be overwritten as the third one.
            ccall dirty_MVAR(BaseReg "ptr", ioport "ptr", StgMVar_value(ioport) "ptr");
        }
        StgMVar_value(ioport) = val;

        unlockClosure(ioport, stg_MVAR_DIRTY_info);
        return (1);
    }
    //Possibly IND added by removeFromMVarBlockedQueue
    if (StgHeader_info(q) == stg_IND_info ||
        StgHeader_info(q) == stg_MSG_NULL_info) {
        q = StgInd_indirectee(q);
        goto loop;
    }

    // There is a readIOPort waiting: wake it up

    tso = StgMVarTSOQueue_tso(q);

    // Assert no read has happened yet.
    ASSERT(StgMVar_tail(ioport) == stg_END_TSO_QUEUE_closure);
    // And there is only one reader queued up.
    ASSERT(StgMVarTSOQueue_link(q) == stg_END_TSO_QUEUE_closure);

    // We perform the read here, so set tail/head accordingly.
    StgMVar_head(ioport) = stg_END_TSO_QUEUE_closure;
    StgMVar_tail(ioport) = ioport;

    // In contrast to MVars we do not need to move on to the
    // next element in the waiting list here, as there can only ever
    // be one thread blocked on a port.

    ASSERT(StgTSO_block_info(tso) == ioport);
    // save why_blocked here, because waking up the thread destroys
    // this information
    W_ why_blocked;
    why_blocked = TO_W_(StgTSO_why_blocked(tso));

    // actually perform the takeMVar
    W_ stack;
    stack = StgTSO_stackobj(tso);
    PerformTake(stack, val);

    // indicate that the operation has now completed.
    StgTSO__link(tso) = stg_END_TSO_QUEUE_closure;

    // Test only the STACK_DIRTY bit, as stg_putMVarzh/stg_tryPutMVarzh do:
    // comparing the whole field against 0 would skip dirty_STACK whenever any
    // other bit of the field is set.
    if ((TO_W_(StgStack_dirty(stack)) & STACK_DIRTY) == 0) {
        ccall dirty_STACK(MyCapability() "ptr", stack "ptr");
    }

    ccall tryWakeupThread(MyCapability() "ptr", tso);

    // For MVars we loop here, waking up all readers.
    // IOPorts however can only have one reader. So we are done
    // at this point.

    //Either there was no reader queued, or he must have been
    //blocked on BlockedOnIOCompletion
    ASSERT(why_blocked == BlockedOnIOCompletion);

    unlockClosure(ioport, info);
    return (1);
}
/* -----------------------------------------------------------------------------
   IOPort primitives
   -------------------------------------------------------------------------- */

// newIOPort#: allocate an empty IOPort. It uses the StgMVar layout with
// head, tail and value all set to END_TSO_QUEUE.
stg_newIOPortzh ()
{
    W_ ioport;

    ALLOC_PRIM_ (SIZEOF_StgMVar, stg_newIOPortzh);

    ioport = Hp - SIZEOF_StgMVar + WDS(1);
    SET_HDR(ioport, stg_MVAR_DIRTY_info,CCCS);
        // MVARs start dirty: generation 0 has no mutable list
    StgMVar_head(ioport)  = stg_END_TSO_QUEUE_closure;
    StgMVar_tail(ioport)  = stg_END_TSO_QUEUE_closure;
    StgMVar_value(ioport) = stg_END_TSO_QUEUE_closure;
    return (ioport);
}

/* -----------------------------------------------------------------------------
   Stable pointer primitives
   -------------------------------------------------------------------------  */

// makeStableName#: return the StableName object for `obj`, allocating one
// (and recording it in the stable name table entry) if the entry has none.
stg_makeStableNamezh ( P_ obj )
{
    W_ index, sn_obj;

    MAYBE_GC_P(stg_makeStableNamezh, obj);

    (index) = ccall lookupStableName(obj "ptr");

    /* Is there already a StableName for this heap object?
     *  stable_name_table is a pointer to an array of snEntry structs.
     */
    if ( snEntry_sn_obj(W_[stable_name_table] + index*SIZEOF_snEntry) == NULL ) {
        // At this point we have a snEntry, but it doesn't look as used to the
        // GC yet because we don't have a StableName object for the sn_obj field
        // (remember that sn_obj == NULL means the entry is free). So if we call
        // GC here we end up skipping the snEntry in gcStableNameTable(). This
        // caused #15906. Solution: use allocate(), which does not call GC.
        //
        // (Alternatively we could use a special value for the sn_obj field to
        // indicate that the entry is being allocated and not free, but that's
        // too complicated and doesn't buy us much. See D5342?id=18700.)
        ("ptr" sn_obj) = ccall allocate(MyCapability() "ptr",
                                        BYTES_TO_WDS(SIZEOF_StgStableName));
        SET_HDR(sn_obj, stg_STABLE_NAME_info, CCCS);
        StgStableName_sn(sn_obj) = index;
        // This will make the StableName# object visible to other threads;
        // be sure that its completely visible to other cores.
        // See Note [Heap memory barriers] in SMP.h.
        prim_write_barrier;
        snEntry_sn_obj(W_[stable_name_table] + index*SIZEOF_snEntry) = sn_obj;
    } else {
        sn_obj = snEntry_sn_obj(W_[stable_name_table] + index*SIZEOF_snEntry);
    }

    return (sn_obj);
}

// makeStablePtr#: obtain a stable pointer for `obj` via getStablePtr.
stg_makeStablePtrzh ( P_ obj )
{
    W_ sp;

    ("ptr" sp) = ccall getStablePtr(obj "ptr");
    return (sp);
}

// deRefStablePtr#: look up the address stored in the stable pointer table
// entry indexed by `sp`.
stg_deRefStablePtrzh ( P_ sp )
{
    W_ r;
    r = spEntry_addr(W_[stable_ptr_table] + sp*SIZEOF_spEntry);
    return (r);
}

/* -----------------------------------------------------------------------------
   Bytecode object primitives
   -------------------------------------------------------------------------  */

// newBCO#: allocate a BCO with the given instructions, literals, pointers and
// arity, and copy the words of `bitmap_arr` into its bitmap.
stg_newBCOzh ( P_ instrs,
               P_ literals,
               P_ ptrs,
               W_ arity,
               P_ bitmap_arr )
{
    W_ bco, bytes, words;

    words = BYTES_TO_WDS(SIZEOF_StgBCO) + BYTE_ARR_WDS(bitmap_arr);
    bytes = WDS(words);

    ALLOC_PRIM (bytes);

    bco = Hp - bytes + WDS(1);
    // No memory barrier necessary as this is a new allocation.
    SET_HDR(bco, stg_BCO_info, CCS_MAIN);

    StgBCO_instrs(bco)     = instrs;
    StgBCO_literals(bco)   = literals;
    StgBCO_ptrs(bco)       = ptrs;
    StgBCO_arity(bco)      = HALF_W_(arity);
    StgBCO_size(bco)       = HALF_W_(words);

    // Copy the arity/bitmap info into the BCO
    W_ i;
    i = 0;
for:
    if (i < BYTE_ARR_WDS(bitmap_arr)) {
        StgBCO_bitmap(bco,i) = StgArrBytes_payload(bitmap_arr,i);
        i = i + 1;
        goto for;
    }

    return (bco);
}

// mkApUpd0#: wrap a zero-arity BCO in an AP closure with no arguments.
stg_mkApUpd0zh ( P_ bco )
{
    W_ ap;

    // This function is *only* used to wrap zero-arity BCOs in an
    // updatable wrapper (see GHC.ByteCode.Linker).  An AP thunk is always
    // saturated and always points directly to a FUN or BCO.
    ASSERT(%INFO_TYPE(%GET_STD_INFO(bco)) == HALF_W_(BCO) &&
           StgBCO_arity(bco) == HALF_W_(0));

    HP_CHK_P(SIZEOF_StgAP, stg_mkApUpd0zh, bco);
    TICK_ALLOC_UP_THK(0, 0);
    CCCS_ALLOC(SIZEOF_StgAP);

    ap = Hp - SIZEOF_StgAP + WDS(1);
    // No memory barrier necessary as this is a new allocation.
    SET_HDR(ap, stg_AP_info, CCS_MAIN);

    StgAP_n_args(ap) = HALF_W_(0);
    StgAP_fun(ap) = bco;

    return (ap);
}

// unpackClosure#: return (info pointer, byte array holding a word-for-word
// copy of the closure, array of the closure's pointers) for `closure`.
// Raises heapOverflow if the byte array cannot be allocated.
stg_unpackClosurezh ( P_ closure )
{
    W_ info, ptrs, nptrs, p, ptrs_arr, dat_arr;
    MAYBE_GC_P(stg_unpackClosurezh, closure);
    info  = %GET_STD_INFO(UNTAG(closure));
    prim_read_barrier;

    ptrs  = TO_W_(%INFO_PTRS(info));
    nptrs = TO_W_(%INFO_NPTRS(info));

    W_ clos;
    clos = UNTAG(closure);

    W_ len;
    // The array returned, dat_arr, is the raw data for the entire closure.
    // The length is variable based upon the closure type, ptrs, and non-ptrs
    (len) = foreign "C" heap_view_closureSize(clos "ptr");

    W_ dat_arr_sz;
    dat_arr_sz = SIZEOF_StgArrBytes + WDS(len);

    ("ptr" dat_arr) = ccall allocateMightFail(MyCapability() "ptr", BYTES_TO_WDS(dat_arr_sz));
    if (dat_arr == NULL) (likely: False) {
        jump stg_raisezh(base_GHCziIOziException_heapOverflow_closure);
    }
    TICK_ALLOC_PRIM(SIZEOF_StgArrBytes, WDS(len), 0);

    SET_HDR(dat_arr, stg_ARR_WORDS_info, CCCS);
    StgArrBytes_bytes(dat_arr) = WDS(len);
    p = 0;
for:
    if(p < len) {
         W_[BYTE_ARR_CTS(dat_arr) + WDS(p)] = W_[clos + WDS(p)];
         p = p + 1;
         goto for;
    }

    W_ ptrArray;

    // Collect pointers.
    ("ptr" ptrArray) = foreign "C" heap_view_closurePtrs(MyCapability() "ptr", clos "ptr");

    return (info, dat_arr, ptrArray);
}

// closureSize#: size of the (untagged) closure as reported by
// heap_view_closureSize.
stg_closureSizzezh (P_ clos)
{
    W_ len;
    (len) = foreign "C" heap_view_closureSize(UNTAG(clos) "ptr");
    return (len);
}

/* -----------------------------------------------------------------------------
   Thread I/O blocking primitives
   -------------------------------------------------------------------------- */

/* Add a thread to the end of the blocked queue. (C-- version of the C
 * macro in Schedule.h).
*/
#define APPEND_TO_BLOCKED_QUEUE(tso)                    \
    ASSERT(StgTSO__link(tso) == END_TSO_QUEUE);         \
    if (W_[blocked_queue_hd] == END_TSO_QUEUE) {        \
        W_[blocked_queue_hd] = tso;                     \
    } else {                                            \
        ccall setTSOLink(MyCapability() "ptr", W_[blocked_queue_tl] "ptr", tso); \
    }                                                   \
    W_[blocked_queue_tl] = tso;

// waitRead#: block the current thread until `fd` is readable.
// Non-threaded RTS only; on the threaded RTS this aborts via barf().
stg_waitReadzh ( W_ fd )
{
#if defined(THREADED_RTS)
    ccall barf("waitRead# on threaded RTS") never returns;
#else

    ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
    StgTSO_why_blocked(CurrentTSO) = BlockedOnRead::I16;
    StgTSO_block_info(CurrentTSO) = fd;
    // No locking - we're not going to use this interface in the
    // threaded RTS anyway.
    APPEND_TO_BLOCKED_QUEUE(CurrentTSO);
    jump stg_block_noregs();
#endif
}

// waitWrite#: block the current thread until `fd` is writable.
// Non-threaded RTS only; on the threaded RTS this aborts via barf().
stg_waitWritezh ( W_ fd )
{
#if defined(THREADED_RTS)
    ccall barf("waitWrite# on threaded RTS") never returns;
#else

    ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
    StgTSO_why_blocked(CurrentTSO) = BlockedOnWrite::I16;
    StgTSO_block_info(CurrentTSO) = fd;
    // No locking - we're not going to use this interface in the
    // threaded RTS anyway.
    APPEND_TO_BLOCKED_QUEUE(CurrentTSO);
    jump stg_block_noregs();
#endif
}

// delay#: block the current thread for `us_delay`.
// Non-threaded RTS only. On mingw32 a delay request is registered and the
// thread is put on the blocked queue; elsewhere the thread is inserted into
// sleeping_queue, which is kept ordered by wake-up target.
stg_delayzh ( W_ us_delay )
{
#if defined(mingw32_HOST_OS)
    W_ ares;
    CInt reqID;
#else
    W_ t, prev, target;
#endif

#if defined(THREADED_RTS)
    ccall barf("delay# on threaded RTS") never returns;
#else

    ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
    StgTSO_why_blocked(CurrentTSO) = BlockedOnDelay::I16;

#if defined(mingw32_HOST_OS)

    /* could probably allocate this on the heap instead */
    ("ptr" ares) = ccall stgMallocBytes(SIZEOF_StgAsyncIOResult,
                                        "stg_delayzh");
    (reqID) = ccall addDelayRequest(us_delay);
    StgAsyncIOResult_reqID(ares)   = reqID;
    StgAsyncIOResult_len(ares)     = 0;
    StgAsyncIOResult_errCode(ares) = 0;
    StgTSO_block_info(CurrentTSO)  = ares;

    /* Having all async-blocked threads reside on the blocked_queue
     * simplifies matters, so change the status to OnDoProc put the
     * delayed thread on the blocked_queue.
     */
    StgTSO_why_blocked(CurrentTSO) = BlockedOnDoProc::I16;
    APPEND_TO_BLOCKED_QUEUE(CurrentTSO);
    jump stg_block_async_void();

#else


    (target) = ccall getDelayTarget(us_delay);

    StgTSO_block_info(CurrentTSO) = target;

    /* Insert the new thread in the sleeping queue. */
    prev = NULL;
    t = W_[sleeping_queue];
while:
    // walk past every queued thread whose target is earlier than ours
    if (t != END_TSO_QUEUE && StgTSO_block_info(t) < target) {
        prev = t;
        t = StgTSO__link(t);
        goto while;
    }

    StgTSO__link(CurrentTSO) = t;
    if (prev == NULL) {
        W_[sleeping_queue] = CurrentTSO;
    } else {
        ccall setTSOLink(MyCapability() "ptr", prev "ptr", CurrentTSO);
    }
    jump stg_block_noregs();
#endif
#endif /* !THREADED_RTS */
}


#if defined(mingw32_HOST_OS)
// asyncRead#: queue an asynchronous read request for `fd` and block the
// current thread on it. Non-threaded RTS only.
stg_asyncReadzh ( W_ fd, W_ is_sock, W_ len, W_ buf )
{
    W_ ares;
    CInt reqID;

#if defined(THREADED_RTS)
    ccall barf("asyncRead# on threaded RTS") never returns;
#else

    ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
    StgTSO_why_blocked(CurrentTSO) = BlockedOnRead::I16;

    /* could probably allocate this on the heap instead */
    ("ptr" ares) = ccall stgMallocBytes(SIZEOF_StgAsyncIOResult,
                                        "stg_asyncReadzh");
    (reqID) = ccall addIORequest(fd, 0/*FALSE*/,is_sock,len,buf "ptr");
    StgAsyncIOResult_reqID(ares)   = reqID;
    StgAsyncIOResult_len(ares)     = 0;
    StgAsyncIOResult_errCode(ares) = 0;
    StgTSO_block_info(CurrentTSO)  = ares;
    APPEND_TO_BLOCKED_QUEUE(CurrentTSO);
    jump stg_block_async();
#endif
}

// asyncWrite#: queue an asynchronous write request for `fd` and block the
// current thread on it. Non-threaded RTS only.
stg_asyncWritezh ( W_ fd, W_ is_sock, W_ len, W_ buf )
{
    W_ ares;
    CInt reqID;

#if defined(THREADED_RTS)
    ccall barf("asyncWrite# on threaded RTS") never returns;
#else

    ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
    StgTSO_why_blocked(CurrentTSO) = BlockedOnWrite::I16;

    ("ptr" ares) = ccall stgMallocBytes(SIZEOF_StgAsyncIOResult,
                                        "stg_asyncWritezh");
    (reqID) = ccall addIORequest(fd, 1/*TRUE*/,is_sock,len,buf "ptr");

    StgAsyncIOResult_reqID(ares)   = reqID;
    StgAsyncIOResult_len(ares)     = 0;
    StgAsyncIOResult_errCode(ares) = 0;
    StgTSO_block_info(CurrentTSO)  = ares;
    APPEND_TO_BLOCKED_QUEUE(CurrentTSO);
    jump stg_block_async();
#endif
}
/* asyncDoProc#: register a request via addDoProcRequest(proc, param),
 * attach a malloc'd StgAsyncIOResult carrying the request id to the
 * current thread, mark the thread BlockedOnDoProc, append it to the
 * blocked queue and leave via stg_block_async.
 *
 * Non-threaded RTS only; the threaded RTS aborts with barf().
 */
stg_asyncDoProczh ( W_ proc, W_ param )
{
    W_ ares;
    CInt reqID;

#if defined(THREADED_RTS)
    ccall barf("asyncDoProc# on threaded RTS") never returns;
#else

    ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
    StgTSO_why_blocked(CurrentTSO) = BlockedOnDoProc::I16;

    /* could probably allocate this on the heap instead */
    ("ptr" ares) = ccall stgMallocBytes(SIZEOF_StgAsyncIOResult,
                                        "stg_asyncDoProczh");
    (reqID) = ccall addDoProcRequest(proc "ptr",param "ptr");
    /* Start from an empty result record tagged with the request id. */
    StgAsyncIOResult_reqID(ares)   = reqID;
    StgAsyncIOResult_len(ares)     = 0;
    StgAsyncIOResult_errCode(ares) = 0;
    StgTSO_block_info(CurrentTSO) = ares;
    APPEND_TO_BLOCKED_QUEUE(CurrentTSO);
    jump stg_block_async();
#endif
}
#endif /* mingw32_HOST_OS */

/* -----------------------------------------------------------------------------
 * noDuplicate#
 *
 * noDuplicate# tries to ensure that none of the thunks under
 * evaluation by the current thread are also under evaluation by
 * another thread.  It relies on *both* threads doing noDuplicate#;
 * the second one will get blocked if they are duplicating some work.
 *
 * The idea is that noDuplicate# is used within unsafePerformIO to
 * ensure that the IO operation is performed at most once.
 * noDuplicate# calls threadPaused which acquires an exclusive lock on
 * all the thunks currently under evaluation by the current thread.
 *
 * Consider the following scenario.  There is a thunk A, whose
 * evaluation requires evaluating thunk B, where thunk B is an
 * unsafePerformIO.  Two threads, 1 and 2, both enter A.  Thread 2
 * is pre-empted before it enters B, and claims A by blackholing it
 * (in threadPaused).  Thread 1 now enters B, and calls noDuplicate#.
 *
 *    thread 1                      thread 2
 *   +-----------+                 +---------------+
 *   |    -------+-----> A <-------+-------        |
 *   |  update   |   BLACKHOLE     | marked_update |
 *   +-----------+                 +---------------+
 *   |           |                 |               |
 *        ...                             ...
 *   |           |                 +---------------+
 *   +-----------+
 *   |     ------+-----> B
 *   |  update   |   BLACKHOLE
 *   +-----------+
 *
 * At this point: A is a blackhole, owned by thread 2.  noDuplicate#
 * calls threadPaused, which walks up the stack and
 *  - claims B on behalf of thread 1
 *  - then it reaches the update frame for A, which it sees is already
 *    a BLACKHOLE and is therefore owned by another thread.  Since
 *    thread 1 is duplicating work, the computation up to the update
 *    frame for A is suspended, including thunk B.
 *  - thunk B, which is an unsafePerformIO, has now been reverted to
 *    an AP_STACK which could be duplicated - BAD!
 *  - The solution is as follows: before calling threadPaused, we
 *    leave a frame on the stack (stg_noDuplicate_info) that will call
 *    noDuplicate# again if the current computation is suspended and
 *    restarted.
 *
 * See the test program in concurrent/prog003 for a way to demonstrate
 * this.  It needs to be run with +RTS -N3 or greater, and the bug
 * only manifests occasionally (once every 10 runs or so).
 * -------------------------------------------------------------------------- */

/* Return frame pushed by stg_noDuplicatezh.  When it is returned to, it
 * simply runs stg_noDuplicatezh again (see the discussion above).
 */
INFO_TABLE_RET(stg_noDuplicate, RET_SMALL, W_ info_ptr)
    return (/* no return values */)
{
    jump stg_noDuplicatezh();
}

/* noDuplicate#: call threadPaused on the current thread with a
 * stg_noDuplicate frame on top of the stack, then return to the caller's
 * continuation.  Takes no arguments and returns no values; the stack is
 * manipulated explicitly.
 */
stg_noDuplicatezh /* no arg list: explicit stack layout */
{
    // With a single capability there's no chance of work duplication.
    if (CInt[n_capabilities] == 1 :: CInt) {
        jump %ENTRY_CODE(Sp(0)) [];
    }

    // One word of stack is needed for the frame pushed below.
    // NOTE(review): presumably STK_CHK_LL re-enters stg_noDuplicatezh after
    // the stack has been grown - confirm against Cmm.h.
    STK_CHK_LL (WDS(1), stg_noDuplicatezh);

    // leave noDuplicate frame in case the current
    // computation is suspended and restarted (see above).
    Sp_adj(-1);
    Sp(0) = stg_noDuplicate_info;

    // NOTE(review): threadPaused is a C function working on the TSO, so
    // presumably the STG registers must be written back first - confirm.
    SAVE_THREAD_STATE();
    ASSERT(StgTSO_what_next(CurrentTSO) == ThreadRunGHC::I16);
    ccall threadPaused (MyCapability() "ptr", CurrentTSO "ptr");

    // threadPaused may have changed what_next; if the thread is now marked
    // ThreadKilled there is nothing to return to.
    if (StgTSO_what_next(CurrentTSO) == ThreadKilled::I16) {
        jump stg_threadFinished [];
    } else {
        LOAD_THREAD_STATE();
        ASSERT(StgTSO_what_next(CurrentTSO) == ThreadRunGHC::I16);
        // remove the stg_noDuplicate frame if it is still there.
        if (Sp(0) == stg_noDuplicate_info) {
            Sp_adj(1);
        }
        // Return to whatever continuation is now on top of the stack.
        jump %ENTRY_CODE(Sp(0)) [];
    }
}

/* -----------------------------------------------------------------------------
   Misc.
primitives
   -------------------------------------------------------------------------- */

/* getApStackVal#: fetch one payload slot out of an AP_STACK closure.
 *
 * Returns (1, payload[offset]) when the closure's info pointer is
 * stg_AP_STACK_info, and (0, ap_stack) otherwise.
 */
stg_getApStackValzh ( P_ ap_stack, W_ offset )
{
    W_ hdr;

    hdr = %INFO_PTR(UNTAG(ap_stack));
    prim_read_barrier;

    if (hdr != stg_AP_STACK_info) {
        // Not an AP_STACK: hand the closure back unchanged.
        return (0,ap_stack);
    } else {
        return (1,StgAP_STACK_payload(ap_stack,offset));
    }
}

// Write the cost center stack of the first argument on stderr; return
// the second.  Possibly only makes sense for already evaluated
// things?
//
// Without PROFILING nothing is printed and `ret` is simply entered.
stg_traceCcszh ( P_ obj, P_ ret )
{
#if defined(PROFILING)
    W_ obj_ccs;

    obj_ccs = StgHeader_ccs(UNTAG(obj));
    ccall fprintCCS_stderr(obj_ccs "ptr");
#endif

    jump stg_ap_0_fast(ret);
}

/* getSpark#: ask findSpark for a spark on the current capability.
 *
 * Returns (1, spark) when findSpark yields a non-zero result; otherwise,
 * and always in the non-threaded RTS, returns (0, False).
 */
stg_getSparkzh ()
{
#if defined(THREADED_RTS)
    W_ candidate;

    ("ptr" candidate) = ccall findSpark(MyCapability() "ptr");
    if (candidate == 0) {
        return (0,ghczmprim_GHCziTypes_False_closure);
    } else {
        return (1,candidate);
    }
#else
    return (0,ghczmprim_GHCziTypes_False_closure);
#endif
}

/* clearCCS#: apply `arg` via stg_ap_v_fast; when PROFILING is enabled the
 * current cost-centre stack is first reset to CCS_MAIN.
 */
stg_clearCCSzh (P_ arg)
{
#if defined(PROFILING)
    CCCS = CCS_MAIN;
#endif

    jump stg_ap_v_fast(arg);
}

/* numSparks#: number of elements in the current capability's spark deque
 * (as reported by dequeElements); always 0 in the non-threaded RTS.
 */
stg_numSparkszh ()
{
    W_ count;

#if !defined(THREADED_RTS)
    count = 0;
#else
    (count) = ccall dequeElements(Capability_sparks(MyCapability()));
#endif

    return (count);
}

/* traceEvent#: emit the user message `msg`.
 *
 *  - TRACING or DEBUG builds: via traceUserMsg.
 *  - otherwise, DTRACE builds: via dtraceUserMsgWrapper, if the probe is
 *    enabled.
 *  - any other build: no-op.
 */
stg_traceEventzh ( W_ msg )
{
#if defined(TRACING) || defined(DEBUG)

    ccall traceUserMsg(MyCapability() "ptr", msg "ptr");

#elif defined(DTRACE)

    W_ probe_on;

#if defined(solaris2_HOST_OS)
    // Solaris' DTrace can't handle the
    //     __dtrace_isenabled$HaskellEvent$user__msg$v1
    // call used on other platforms.  That call only tests whether the
    // user__msg probe is enabled, and exists purely as a performance
    // optimization.  Since preparation for the probe is not that complex,
    // the test is skipped on Solaris and the probe is fired
    // unconditionally.  Please note that this does not mean that the
    // probe will be used during the runtime!  You still need to enable it
    // by consumption in your dtrace script as you do with any other probe.
    probe_on = 1;
#else
    // We should go through the macro HASKELLEVENT_USER_MSG_ENABLED from
    // RtsProbes.h, but that header file includes unistd.h, which doesn't
    // work in Cmm
    (probe_on) = ccall __dtrace_isenabled$HaskellEvent$user__msg$v1();
#endif

    if (probe_on != 0) {
      ccall dtraceUserMsgWrapper(MyCapability() "ptr", msg "ptr");
    }

#endif
    return ();
}

/* traceBinaryEvent#: emit `len` bytes at `msg` via traceUserBinaryMsg in
 * TRACING or DEBUG builds; a no-op otherwise.
 */
stg_traceBinaryEventzh ( W_ msg, W_ len )
{
#if defined(TRACING) || defined(DEBUG)
    ccall traceUserBinaryMsg(MyCapability() "ptr", msg "ptr", len);
#endif

    return ();
}

// Same code as stg_traceEventzh above but a different kind of event
// Before changing this code, read the comments in the impl above
stg_traceMarkerzh ( W_ msg )
{
#if defined(TRACING) || defined(DEBUG)

    ccall traceUserMarker(MyCapability() "ptr", msg "ptr");

#elif defined(DTRACE)

    W_ probe_on;

#if defined(solaris2_HOST_OS)
    probe_on = 1;
#else
    (probe_on) = ccall __dtrace_isenabled$HaskellEvent$user__marker$v1();
#endif

    if (probe_on != 0) {
        ccall dtraceUserMarkerWrapper(MyCapability() "ptr", msg "ptr");
    }

#endif
    return ();
}

/* getThreadAllocationCounter#: the current thread's alloc_limit minus the
 * bytes already allocated in the current nursery block.
 */
stg_getThreadAllocationCounterzh ()
{
    W_ used;

    // Account for the allocation in the current block
    used = Hp - bdescr_start(CurrentNursery);

    return (StgTSO_alloc_limit(CurrentTSO) - TO_I64(used));
}

/* setThreadAllocationCounter#: set the current thread's allocation counter
 * to `counter`.
 */
stg_setThreadAllocationCounterzh ( I64 counter )
{
    W_ used;

    // Allocation in the current block will be subtracted by
    // getThreadAllocationCounter#, so we have to offset any existing
    // allocation here.  See also openNursery/closeNursery in
    // GHC.StgToCmm.Foreign.
    used = Hp - bdescr_start(CurrentNursery);
    StgTSO_alloc_limit(CurrentTSO) = counter + TO_I64(used);

    return ();
}