diff options
author | Dylan Yudaken <dylany@fb.com> | 2020-10-06 13:42:22 +0100 |
---|---|---|
committer | Marge Bot <ben+marge-bot@smart-cactus.org> | 2020-10-17 22:02:50 -0400 |
commit | 50e9df49b7cd637c4552ab34bf629a01af4767c0 (patch) | |
tree | 15576b62a40dbb4bbf6ca58bc6afab6e4ff9e732 | |
parent | 451455fd008500259f5d2207bdfdccf6dddb52c5 (diff) | |
download | haskell-50e9df49b7cd637c4552ab34bf629a01af4767c0.tar.gz |
When using rts_setInCallCapability, lock incall threads
This diff makes sure that incall threads, when using `rts_setInCallCapability`, will be created as locked.
If the thread is not locked, the thread might end up being scheduled to a different capability.
While this is mentioned in the docs for `rts_setInCallCapability,`, it makes the method significantly less useful as there is no guarantees on the capability being used.
This commit also adds a test to make sure things stay on the correct capability.
-rw-r--r-- | compiler/GHC/HsToCore/Foreign/Decl.hs | 8 | ||||
-rw-r--r-- | includes/RtsAPI.h | 8 | ||||
-rw-r--r-- | rts/RtsAPI.c | 20 | ||||
-rw-r--r-- | rts/RtsSymbols.c | 1 | ||||
-rw-r--r-- | testsuite/tests/ffi/should_run/IncallAffinity.hs | 36 | ||||
-rw-r--r-- | testsuite/tests/ffi/should_run/IncallAffinity_c.c | 78 | ||||
-rw-r--r-- | testsuite/tests/ffi/should_run/all.T | 2 |
7 files changed, 145 insertions, 8 deletions
diff --git a/compiler/GHC/HsToCore/Foreign/Decl.hs b/compiler/GHC/HsToCore/Foreign/Decl.hs index f97f38d458..e707b75e1b 100644 --- a/compiler/GHC/HsToCore/Foreign/Decl.hs +++ b/compiler/GHC/HsToCore/Foreign/Decl.hs @@ -415,7 +415,7 @@ f_helper(StablePtr s, HsBool b, HsInt i) { Capability *cap; cap = rts_lock(); - rts_evalIO(&cap, + rts_inCall(&cap, rts_apply(rts_apply(deRefStablePtr(s), rts_mkBool(b)), rts_mkInt(i))); rts_unlock(cap); @@ -630,7 +630,7 @@ mkFExportCBits dflags c_nm maybe_target arg_htys res_hty is_IO_res_ty cc | otherwise = cResType <+> pprCconv <+> ftext c_nm <> parens fun_args - -- the target which will form the root of what we ask rts_evalIO to run + -- the target which will form the root of what we ask rts_inCall to run the_cfun = case maybe_target of Nothing -> text "(StgClosure*)deRefStablePtr(the_stableptr)" @@ -638,7 +638,7 @@ mkFExportCBits dflags c_nm maybe_target arg_htys res_hty is_IO_res_ty cc cap = text "cap" <> comma - -- the expression we give to rts_evalIO + -- the expression we give to rts_inCall expr_to_run = foldl' appArg the_cfun arg_info -- NOT aug_arg_info where @@ -674,7 +674,7 @@ mkFExportCBits dflags c_nm maybe_target arg_htys res_hty is_IO_res_ty cc , declareCResult , text "cap = rts_lock();" -- create the application + perform it. - , text "rts_evalIO" <> parens ( + , text "rts_inCall" <> parens ( char '&' <> cap <> text "rts_apply" <> parens ( cap <> diff --git a/includes/RtsAPI.h b/includes/RtsAPI.h index 3cf02c0d8a..055b17004d 100644 --- a/includes/RtsAPI.h +++ b/includes/RtsAPI.h @@ -374,10 +374,6 @@ Capability *rts_unsafeGetMyCapability (void); // into Haskell. The actual capability will be calculated as the supplied // value modulo the number of enabled Capabilities. // -// Note that the thread may still be migrated by the RTS scheduler, but that -// will only happen if there are multiple threads running on one Capability and -// another Capability is free. -// // If affinity is non-zero, the current thread will be bound to // specific CPUs according to the prevailing affinity policy for the // specified capability, set by either +RTS -qa or +RTS --numa. @@ -479,6 +475,10 @@ void rts_evalLazyIO_ (/* inout */ Capability **, /* in */ unsigned int stack_size, /* out */ HaskellObj *ret); +void rts_inCall (/* inout */ Capability **, + /* in */ HaskellObj p, + /* out */ HaskellObj *ret); + void rts_checkSchedStatus (char* site, Capability *); SchedulerStatus rts_getSchedStatus (Capability *cap); diff --git a/rts/RtsAPI.c b/rts/RtsAPI.c index 51a1f2b7cf..1d8e0bc1c8 100644 --- a/rts/RtsAPI.c +++ b/rts/RtsAPI.c @@ -461,6 +461,26 @@ void rts_evalIO (/* inout */ Capability **cap, } /* + * rts_inCall() is similar to rts_evalIO, but expects to be called as an incall, + * and is not expected to be called by user code directly. + */ +void rts_inCall (/* inout */ Capability **cap, + /* in */ HaskellObj p, + /* out */ HaskellObj *ret) +{ + StgTSO* tso; + + tso = createStrictIOThread(*cap, RtsFlags.GcFlags.initialStkSize, p); + if ((*cap)->running_task->preferred_capability != -1) { + // enabled_capabilities should not change between here and waitCapability() + ASSERT((*cap)->no == ((*cap)->running_task->preferred_capability % enabled_capabilities)); + // we requested explicit affinity; don't move this thread from now on. + tso->flags |= TSO_LOCKED; + } + scheduleWaitThread(tso,ret,cap); +} + +/* * rts_evalStableIOMain() is suitable for calling main Haskell thread * stored in (StablePtr (IO a)) it calls rts_evalStableIO but wraps * function in GHC.TopHandler.runMainIO that installs top_handlers. diff --git a/rts/RtsSymbols.c b/rts/RtsSymbols.c index 9b8a4ae16d..e433d9d369 100644 --- a/rts/RtsSymbols.c +++ b/rts/RtsSymbols.c @@ -763,6 +763,7 @@ SymI_HasProto(rts_evalStableIOMain) \ SymI_HasProto(rts_evalStableIO) \ SymI_HasProto(rts_eval_) \ + SymI_HasProto(rts_inCall) \ SymI_HasProto(rts_getBool) \ SymI_HasProto(rts_getChar) \ SymI_HasProto(rts_getDouble) \ diff --git a/testsuite/tests/ffi/should_run/IncallAffinity.hs b/testsuite/tests/ffi/should_run/IncallAffinity.hs new file mode 100644 index 0000000000..386e9950e8 --- /dev/null +++ b/testsuite/tests/ffi/should_run/IncallAffinity.hs @@ -0,0 +1,36 @@ +module Lib (capTest) where + +import Control.Concurrent +import Control.Exception +import Control.Concurrent.MVar +import Control.Monad (when) +import System.Exit + +foreign export ccall "capTest" capTest :: IO Int + +capTest :: IO Int +capTest = catch go handle + where + handle :: SomeException -> IO Int + handle e = do + putStrLn $ "Failed " ++ (show e) + return (-1) + getCap = fmap fst $ threadCapability =<< myThreadId + go = do + when (not rtsSupportsBoundThreads) $ + die "This test requires -threaded" + mvar <- newEmptyMVar + mvar2 <- newEmptyMVar + (cap, locked) <- threadCapability =<< myThreadId + forkOn cap $ do + putMVar mvar =<< getCap + takeMVar mvar2 + -- if cap is locked, then this would get scheduled on a different + -- capacity. + fCap <- takeMVar mvar + putMVar mvar2 () + cap2 <- getCap + when (fCap /= cap) (fail "expected cap to be the same") + when (cap2 /= cap) (fail "expected cap to be the same when returning") + when (not locked) (fail "expected to be locked") + return cap diff --git a/testsuite/tests/ffi/should_run/IncallAffinity_c.c b/testsuite/tests/ffi/should_run/IncallAffinity_c.c new file mode 100644 index 0000000000..bd719dff99 --- /dev/null +++ b/testsuite/tests/ffi/should_run/IncallAffinity_c.c @@ -0,0 +1,78 @@ +#include "HsFFI.h" + +#include <stdio.h> +#include "Rts.h" +#include <pthread.h> + +#define THREADS 6 +#define OK 9999 +static OSThreadId ids[THREADS]; +static int results[THREADS]; +static int waiters = 0; +static int done = 0; +static Condition cond; +static Mutex mutex; + +HsInt capTest(); + +void* OSThreadProcAttr go(void *info) +{ + int cap; + int res; + int threadNum = *(int*)(info); + + // divide everything onto two caps (if there are two) + cap = (threadNum % 2) % enabled_capabilities; + + OS_ACQUIRE_LOCK(&mutex); + waiters++; + if (waiters == THREADS) { + broadcastCondition(&cond); + } else { + while(waiters != THREADS) { + waitCondition(&cond, &mutex); + } + } + OS_RELEASE_LOCK(&mutex); + + rts_setInCallCapability(cap, 0); + res = capTest(); + *(int*)info = res == cap ? OK : res; + OS_ACQUIRE_LOCK(&mutex); + done++; + broadcastCondition(&cond); + OS_RELEASE_LOCK(&mutex); + return 0; +} + +int main(int argc, char *argv[]) +{ + int n; + bool ok; + hs_init(&argc, &argv); + initCondition(&cond); + initMutex(&mutex); + waiters = 0; + done = 0; + ok = true; + for (n=0; n < THREADS; n++) { + results[n] = n; + if (createOSThread(&ids[n], "test", go, (void*)&results[n])) { + printf("unable to create thread %d\n", n); + exit(1); + } + } + OS_ACQUIRE_LOCK(&mutex); + while(done != THREADS) { + waitCondition(&cond, &mutex); + } + OS_RELEASE_LOCK(&mutex); + for (n = 0; n < THREADS; n++) { + if (results[n] != OK) { + printf("%d: unexpected result was %d\n", n, results[n]); + ok = false; + } + } + hs_exit(); + return ok ? 0 : 1; +} diff --git a/testsuite/tests/ffi/should_run/all.T b/testsuite/tests/ffi/should_run/all.T index fb840861e6..bde21c7c26 100644 --- a/testsuite/tests/ffi/should_run/all.T +++ b/testsuite/tests/ffi/should_run/all.T @@ -218,3 +218,5 @@ test('UnliftedNewtypesByteArrayOffset', [omit_ways(['ghci'])], compile_and_run, test('T17471', [omit_ways(['ghci'])], compile_and_run, ['T17471_c.c -optc-D -optcFOO']) + +test('IncallAffinity', [req_smp, only_ways(['threaded1', 'threaded2'])], compile_and_run, ['IncallAffinity_c.c -no-hs-main']) |