diff options
Diffstat (limited to 'rts/posix/Select.c')
-rw-r--r-- | rts/posix/Select.c | 279 |
1 files changed, 279 insertions, 0 deletions
diff --git a/rts/posix/Select.c b/rts/posix/Select.c new file mode 100644 index 0000000000..e21ced03ab --- /dev/null +++ b/rts/posix/Select.c @@ -0,0 +1,279 @@ +/* ----------------------------------------------------------------------------- + * + * (c) The GHC Team 1995-2002 + * + * Support for concurrent non-blocking I/O and thread waiting. + * + * ---------------------------------------------------------------------------*/ + +/* we're outside the realms of POSIX here... */ +/* #include "PosixSource.h" */ + +#include "Rts.h" +#include "Schedule.h" +#include "RtsUtils.h" +#include "RtsFlags.h" +#include "Timer.h" +#include "Itimer.h" +#include "Signals.h" +#include "Capability.h" +#include "posix/Select.h" + +# ifdef HAVE_SYS_TYPES_H +# include <sys/types.h> +# endif + +# ifdef HAVE_SYS_TIME_H +# include <sys/time.h> +# endif + +#include <errno.h> +#include <string.h> + +#ifdef HAVE_UNISTD_H +#include <unistd.h> +#endif + +#if !defined(THREADED_RTS) +/* last timestamp */ +lnat timestamp = 0; + +/* + * The threaded RTS uses an IO-manager thread in Haskell instead (see GHC.Conc) + */ + +/* There's a clever trick here to avoid problems when the time wraps + * around. Since our maximum delay is smaller than 31 bits of ticks + * (it's actually 31 bits of microseconds), we can safely check + * whether a timer has expired even if our timer will wrap around + * before the target is reached, using the following formula: + * + * (int)((uint)current_time - (uint)target_time) < 0 + * + * if this is true, then our time has expired. + * (idea due to Andy Gill). + */ +static rtsBool +wakeUpSleepingThreads(lnat ticks) +{ + StgTSO *tso; + rtsBool flag = rtsFalse; + + while (sleeping_queue != END_TSO_QUEUE && + (int)(ticks - sleeping_queue->block_info.target) > 0) { + tso = sleeping_queue; + sleeping_queue = tso->link; + tso->why_blocked = NotBlocked; + tso->link = END_TSO_QUEUE; + IF_DEBUG(scheduler,debugBelch("Waking up sleeping thread %d\n", tso->id)); + // MainCapability: this code is !THREADED_RTS + pushOnRunQueue(&MainCapability,tso); + flag = rtsTrue; + } + return flag; +} + +/* Argument 'wait' says whether to wait for I/O to become available, + * or whether to just check and return immediately. If there are + * other threads ready to run, we normally do the non-waiting variety, + * otherwise we wait (see Schedule.c). + * + * SMP note: must be called with sched_mutex locked. + * + * Windows: select only works on sockets, so this doesn't really work, + * though it makes things better than before. MsgWaitForMultipleObjects + * should really be used, though it only seems to work for read handles, + * not write handles. + * + */ +void +awaitEvent(rtsBool wait) +{ + StgTSO *tso, *prev, *next; + rtsBool ready; + fd_set rfd,wfd; + int numFound; + int maxfd = -1; + rtsBool select_succeeded = rtsTrue; + rtsBool unblock_all = rtsFalse; + struct timeval tv; + lnat min, ticks; + + tv.tv_sec = 0; + tv.tv_usec = 0; + + IF_DEBUG(scheduler, + debugBelch("scheduler: checking for threads blocked on I/O"); + if (wait) { + debugBelch(" (waiting)"); + } + debugBelch("\n"); + ); + + /* loop until we've woken up some threads. This loop is needed + * because the select timing isn't accurate, we sometimes sleep + * for a while but not long enough to wake up a thread in + * a threadDelay. + */ + do { + + ticks = timestamp = getourtimeofday(); + if (wakeUpSleepingThreads(ticks)) { + return; + } + + if (!wait) { + min = 0; + } else if (sleeping_queue != END_TSO_QUEUE) { + min = (sleeping_queue->block_info.target - ticks) + * TICK_MILLISECS * 1000; + } else { + min = 0x7ffffff; + } + + /* + * Collect all of the fd's that we're interested in + */ + FD_ZERO(&rfd); + FD_ZERO(&wfd); + + for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) { + next = tso->link; + + switch (tso->why_blocked) { + case BlockedOnRead: + { + int fd = tso->block_info.fd; + if (fd >= FD_SETSIZE) { + barf("awaitEvent: descriptor out of range"); + } + maxfd = (fd > maxfd) ? fd : maxfd; + FD_SET(fd, &rfd); + continue; + } + + case BlockedOnWrite: + { + int fd = tso->block_info.fd; + if (fd >= FD_SETSIZE) { + barf("awaitEvent: descriptor out of range"); + } + maxfd = (fd > maxfd) ? fd : maxfd; + FD_SET(fd, &wfd); + continue; + } + + default: + barf("AwaitEvent"); + } + } + + /* Check for any interesting events */ + + tv.tv_sec = min / 1000000; + tv.tv_usec = min % 1000000; + + while ((numFound = select(maxfd+1, &rfd, &wfd, NULL, &tv)) < 0) { + if (errno != EINTR) { + /* Handle bad file descriptors by unblocking all the + waiting threads. Why? Because a thread might have been + a bit naughty and closed a file descriptor while another + was blocked waiting. This is less-than-good programming + practice, but having the RTS as a result fall over isn't + acceptable, so we simply unblock all the waiting threads + should we see a bad file descriptor & give the threads + a chance to clean up their act. + + Note: assume here that threads becoming unblocked + will try to read/write the file descriptor before trying + to issue a threadWaitRead/threadWaitWrite again (==> an + IOError will result for the thread that's got the bad + file descriptor.) Hence, there's no danger of a bad + file descriptor being repeatedly select()'ed on, so + the RTS won't loop. + */ + if ( errno == EBADF ) { + unblock_all = rtsTrue; + break; + } else { + perror("select"); + barf("select failed"); + } + } + + /* We got a signal; could be one of ours. If so, we need + * to start up the signal handler straight away, otherwise + * we could block for a long time before the signal is + * serviced. + */ +#if defined(RTS_USER_SIGNALS) + if (signals_pending()) { + startSignalHandlers(&MainCapability); + return; /* still hold the lock */ + } +#endif + + /* we were interrupted, return to the scheduler immediately. + */ + if (sched_state >= SCHED_INTERRUPTING) { + return; /* still hold the lock */ + } + + /* check for threads that need waking up + */ + wakeUpSleepingThreads(getourtimeofday()); + + /* If new runnable threads have arrived, stop waiting for + * I/O and run them. + */ + if (!emptyRunQueue(&MainCapability)) { + return; /* still hold the lock */ + } + } + + /* Step through the waiting queue, unblocking every thread that now has + * a file descriptor in a ready state. + */ + + prev = NULL; + if (select_succeeded || unblock_all) { + for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) { + next = tso->link; + switch (tso->why_blocked) { + case BlockedOnRead: + ready = unblock_all || FD_ISSET(tso->block_info.fd, &rfd); + break; + case BlockedOnWrite: + ready = unblock_all || FD_ISSET(tso->block_info.fd, &wfd); + break; + default: + barf("awaitEvent"); + } + + if (ready) { + IF_DEBUG(scheduler,debugBelch("Waking up blocked thread %d\n", tso->id)); + tso->why_blocked = NotBlocked; + tso->link = END_TSO_QUEUE; + pushOnRunQueue(&MainCapability,tso); + } else { + if (prev == NULL) + blocked_queue_hd = tso; + else + prev->link = tso; + prev = tso; + } + } + + if (prev == NULL) + blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE; + else { + prev->link = END_TSO_QUEUE; + blocked_queue_tl = prev; + } + } + + } while (wait && sched_state == SCHED_RUNNING + && emptyRunQueue(&MainCapability)); +} + +#endif /* THREADED_RTS */ |