diff options
-rw-r--r-- | libraries/base/GHC/Event/Thread.hs | 6 | ||||
-rw-r--r-- | rts/Prelude.h | 2 | ||||
-rw-r--r-- | rts/RtsStartup.c | 1 | ||||
-rw-r--r-- | rts/package.conf.in | 2 | ||||
-rw-r--r-- | rts/posix/Select.c | 179 | ||||
-rw-r--r-- | rts/win32/libHSbase.def | 3 |
6 files changed, 144 insertions, 49 deletions
diff --git a/libraries/base/GHC/Event/Thread.hs b/libraries/base/GHC/Event/Thread.hs index c599047db6..6e991bfb6c 100644 --- a/libraries/base/GHC/Event/Thread.hs +++ b/libraries/base/GHC/Event/Thread.hs @@ -12,9 +12,10 @@ module GHC.Event.Thread , closeFdWith , threadDelay , registerDelay + , blockedOnBadFD -- used by RTS ) where -import Control.Exception (finally) +import Control.Exception (finally, SomeException, toException) import Control.Monad (forM, forM_, sequence_, zipWithM, when) import Data.IORef (IORef, newIORef, readIORef, writeIORef) import Data.List (zipWith3) @@ -115,6 +116,9 @@ threadWait evt fd = mask_ $ do then ioError $ errnoToIOError "threadWait" eBADF Nothing Nothing else return () +-- used at least by RTS in 'select()' IO manager backend +blockedOnBadFD :: SomeException +blockedOnBadFD = toException $ errnoToIOError "awaitEvent" eBADF Nothing Nothing threadWaitSTM :: Event -> Fd -> IO (STM (), IO ()) threadWaitSTM evt fd = mask_ $ do diff --git a/rts/Prelude.h b/rts/Prelude.h index 89e80a0a3d..0c54148ba2 100644 --- a/rts/Prelude.h +++ b/rts/Prelude.h @@ -42,6 +42,7 @@ PRELUDE_CLOSURE(base_GHCziIOziException_blockedIndefinitelyOnMVar_closure); PRELUDE_CLOSURE(base_GHCziIOziException_blockedIndefinitelyOnSTM_closure); PRELUDE_CLOSURE(base_ControlziExceptionziBase_nonTermination_closure); PRELUDE_CLOSURE(base_ControlziExceptionziBase_nestedAtomically_closure); +PRELUDE_CLOSURE(base_GHCziEventziThread_blockedOnBadFD_closure); PRELUDE_CLOSURE(base_GHCziConcziSync_runSparks_closure); PRELUDE_CLOSURE(base_GHCziConcziIO_ensureIOManagerIsRunning_closure); @@ -104,6 +105,7 @@ PRELUDE_INFO(base_GHCziStable_StablePtr_con_info); #define blockedIndefinitelyOnSTM_closure DLL_IMPORT_DATA_REF(base_GHCziIOziException_blockedIndefinitelyOnSTM_closure) #define nonTermination_closure DLL_IMPORT_DATA_REF(base_ControlziExceptionziBase_nonTermination_closure) #define nestedAtomically_closure DLL_IMPORT_DATA_REF(base_ControlziExceptionziBase_nestedAtomically_closure) +#define blockedOnBadFD_closure DLL_IMPORT_DATA_REF(base_GHCziEventziThread_blockedOnBadFD_closure) #define Czh_static_info DLL_IMPORT_DATA_REF(ghczmprim_GHCziTypes_Czh_static_info) #define Fzh_static_info DLL_IMPORT_DATA_REF(ghczmprim_GHCziTypes_Fzh_static_info) diff --git a/rts/RtsStartup.c b/rts/RtsStartup.c index 06e888c1b2..c9f5880774 100644 --- a/rts/RtsStartup.c +++ b/rts/RtsStartup.c @@ -209,6 +209,7 @@ hs_init_ghc(int *argc, char **argv[], RtsConfig rts_config) getStablePtr((StgPtr)nonTermination_closure); getStablePtr((StgPtr)blockedIndefinitelyOnSTM_closure); getStablePtr((StgPtr)nestedAtomically_closure); + getStablePtr((StgPtr)blockedOnBadFD_closure); getStablePtr((StgPtr)runSparks_closure); getStablePtr((StgPtr)ensureIOManagerIsRunning_closure); diff --git a/rts/package.conf.in b/rts/package.conf.in index 4c8686f262..8250bc2bb6 100644 --- a/rts/package.conf.in +++ b/rts/package.conf.in @@ -99,6 +99,7 @@ ld-options: , "-Wl,-u,_base_GHCziIOziException_blockedIndefinitelyOnMVar_closure" , "-Wl,-u,_base_GHCziIOziException_blockedIndefinitelyOnSTM_closure" , "-Wl,-u,_base_ControlziExceptionziBase_nestedAtomically_closure" + , "-Wl,-u,_base_GHCziEventziThread_blockedOnBadFD_closure" , "-Wl,-u,_base_GHCziWeak_runFinalizzerBatch_closure" , "-Wl,-u,_base_GHCziTopHandler_flushStdHandles_closure" , "-Wl,-u,_base_GHCziTopHandler_runIO_closure" @@ -139,6 +140,7 @@ ld-options: , "-Wl,-u,base_GHCziIOziException_blockedIndefinitelyOnMVar_closure" , "-Wl,-u,base_GHCziIOziException_blockedIndefinitelyOnSTM_closure" , "-Wl,-u,base_ControlziExceptionziBase_nestedAtomically_closure" + , "-Wl,-u,base_GHCziEventziThread_blockedOnBadFD_closure" , "-Wl,-u,base_GHCziWeak_runFinalizzerBatch_closure" , "-Wl,-u,base_GHCziTopHandler_flushStdHandles_closure" , "-Wl,-u,base_GHCziTopHandler_runIO_closure" diff --git a/rts/posix/Select.c b/rts/posix/Select.c index 3d92a4666a..a101f03dd5 100644 --- a/rts/posix/Select.c +++ b/rts/posix/Select.c @@ -14,6 +14,8 @@ #include "Signals.h" #include "Schedule.h" +#include "Prelude.h" +#include "RaiseAsync.h" #include "RtsUtils.h" #include "Itimer.h" #include "Capability.h" @@ -120,6 +122,85 @@ fdOutOfRange (int fd) stg_exit(EXIT_FAILURE); } +/* + * State of individual file descriptor after a 'select()' poll. + */ +enum FdState { + RTS_FD_IS_READY = 0, + RTS_FD_IS_BLOCKING, + RTS_FD_IS_INVALID, +}; + +static enum FdState fdPollReadState (int fd) +{ + int r; + fd_set rfd; + struct timeval now; + + FD_ZERO(&rfd); + FD_SET(fd, &rfd); + + /* only poll */ + now.tv_sec = 0; + now.tv_usec = 0; + for (;;) + { + r = select(fd+1, &rfd, NULL, NULL, &now); + /* the descriptor is sane */ + if (r != -1) + break; + + switch (errno) + { + case EBADF: return RTS_FD_IS_INVALID; + case EINTR: continue; + default: + sysErrorBelch("select"); + stg_exit(EXIT_FAILURE); + } + } + + if (r == 0) + return RTS_FD_IS_BLOCKING; + else + return RTS_FD_IS_READY; +} + +static enum FdState fdPollWriteState (int fd) +{ + int r; + fd_set wfd; + struct timeval now; + + FD_ZERO(&wfd); + FD_SET(fd, &wfd); + + /* only poll */ + now.tv_sec = 0; + now.tv_usec = 0; + for (;;) + { + r = select(fd+1, NULL, &wfd, NULL, &now); + /* the descriptor is sane */ + if (r != -1) + break; + + switch (errno) + { + case EBADF: return RTS_FD_IS_INVALID; + case EINTR: continue; + default: + sysErrorBelch("select"); + stg_exit(EXIT_FAILURE); + } + } + + if (r == 0) + return RTS_FD_IS_BLOCKING; + else + return RTS_FD_IS_READY; +} + /* Argument 'wait' says whether to wait for I/O to become available, * or whether to just check and return immediately. If there are * other threads ready to run, we normally do the non-waiting variety, @@ -137,12 +218,10 @@ void awaitEvent(rtsBool wait) { StgTSO *tso, *prev, *next; - rtsBool ready; fd_set rfd,wfd; int numFound; int maxfd = -1; - rtsBool select_succeeded = rtsTrue; - rtsBool unblock_all = rtsFalse; + rtsBool seen_bad_fd = rtsFalse; struct timeval tv, *ptv; LowResTime now; @@ -225,25 +304,8 @@ awaitEvent(rtsBool wait) while ((numFound = select(maxfd+1, &rfd, &wfd, NULL, ptv)) < 0) { if (errno != EINTR) { - /* Handle bad file descriptors by unblocking all the - waiting threads. Why? Because a thread might have been - a bit naughty and closed a file descriptor while another - was blocked waiting. This is less-than-good programming - practice, but having the RTS as a result fall over isn't - acceptable, so we simply unblock all the waiting threads - should we see a bad file descriptor & give the threads - a chance to clean up their act. - - Note: assume here that threads becoming unblocked - will try to read/write the file descriptor before trying - to issue a threadWaitRead/threadWaitWrite again (==> an - IOError will result for the thread that's got the bad - file descriptor.) Hence, there's no danger of a bad - file descriptor being repeatedly select()'ed on, so - the RTS won't loop. - */ if ( errno == EBADF ) { - unblock_all = rtsTrue; + seen_bad_fd = rtsTrue; break; } else { sysErrorBelch("select"); @@ -286,33 +348,58 @@ awaitEvent(rtsBool wait) */ prev = NULL; - if (select_succeeded || unblock_all) { - for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) { - next = tso->_link; + { + for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) { + next = tso->_link; + int fd; + enum FdState fd_state = RTS_FD_IS_BLOCKING; switch (tso->why_blocked) { - case BlockedOnRead: - ready = unblock_all || FD_ISSET(tso->block_info.fd, &rfd); - break; - case BlockedOnWrite: - ready = unblock_all || FD_ISSET(tso->block_info.fd, &wfd); - break; - default: - barf("awaitEvent"); - } - - if (ready) { - IF_DEBUG(scheduler,debugBelch("Waking up blocked thread %lu\n", (unsigned long)tso->id)); - tso->why_blocked = NotBlocked; - tso->_link = END_TSO_QUEUE; - pushOnRunQueue(&MainCapability,tso); - } else { - if (prev == NULL) - blocked_queue_hd = tso; - else - setTSOLink(&MainCapability, prev, tso); - prev = tso; - } + case BlockedOnRead: + fd = tso->block_info.fd; + + if (seen_bad_fd) { + fd_state = fdPollReadState (fd); + } else if (FD_ISSET(fd, &rfd)) { + fd_state = RTS_FD_IS_READY; + } + break; + case BlockedOnWrite: + fd = tso->block_info.fd; + + if (seen_bad_fd) { + fd_state = fdPollWriteState (fd); + } else if (FD_ISSET(fd, &wfd)) { + fd_state = RTS_FD_IS_READY; + } + break; + default: + barf("awaitEvent"); + } + + switch (fd_state) { + case RTS_FD_IS_INVALID: + /* + * Don't let RTS loop on such descriptors, + * pass an IOError to blocked threads (Trac #4934) + */ + IF_DEBUG(scheduler,debugBelch("Killing blocked thread %lu on bad fd=%i\n", (unsigned long)tso->id, fd)); + throwToSingleThreaded(&MainCapability, tso, (StgClosure *)blockedOnBadFD_closure); + break; + case RTS_FD_IS_READY: + IF_DEBUG(scheduler,debugBelch("Waking up blocked thread %lu\n", (unsigned long)tso->id)); + tso->why_blocked = NotBlocked; + tso->_link = END_TSO_QUEUE; + pushOnRunQueue(&MainCapability,tso); + break; + case RTS_FD_IS_BLOCKING: + if (prev == NULL) + blocked_queue_hd = tso; + else + setTSOLink(&MainCapability, prev, tso); + prev = tso; + break; + } } if (prev == NULL) diff --git a/rts/win32/libHSbase.def b/rts/win32/libHSbase.def index 119237b652..8140528c70 100644 --- a/rts/win32/libHSbase.def +++ b/rts/win32/libHSbase.def @@ -40,5 +40,4 @@ EXPORTS base_ControlziExceptionziBase_nonTermination_closure base_ControlziExceptionziBase_nestedAtomically_closure - - + base_GHCziEventziThread_blockedOnBadFD_closure |