diff options
author | normal <normal@b2dd03c8-39d4-4d8f-98ff-823fe69b080e> | 2018-08-06 05:22:00 +0000 |
---|---|---|
committer | normal <normal@b2dd03c8-39d4-4d8f-98ff-823fe69b080e> | 2018-08-06 05:22:00 +0000 |
commit | 194a6a2c68e9c8a3536b24db18ceac87535a6051 (patch) | |
tree | 1d6074813a7c515b81e61e451422e559541ba79d /thread_pthread.c | |
parent | 828158704c2a03f7c780f5dd4f9ffc88b709c4a7 (diff) | |
download | ruby-194a6a2c68e9c8a3536b24db18ceac87535a6051.tar.gz |
thread_pthread.c: restore timer-thread for now :<
[ruby-core:88306]
Revert "process.c: ensure th->interrupt lock is held when migrating"
This reverts commit 5ca416bdf6b6785cb20f139c2c514eda005fe42f (r64201)
Revert "process.c (rb_waitpid): reduce sigwait_fd bouncing"
This reverts commit 217bdd776fbeea3bfd0b9324eefbfcec3b1ccb3e (r64200).
Revert "test/ruby/test_thread.rb (test_thread_timer_and_interrupt): add timeouts"
This reverts commit 9f395f11202fc3c7edbd76f5aa6ce1f8a1e752a9 (r64199).
Revert "thread_pthread.c (native_sleep): reduce ppoll sleeps"
This reverts commit b3aa256c4d43d3d7e9975ec18eb127f45f623c9b (r64193).
Revert "thread.c (consume_communication_pipe): do not retry after short read"
This reverts commit 291a82f748de56e65fac10edefc51ec7a54a82d4 (r64185).
Revert "test/ruby/test_io.rb (test_race_gets_and_close): timeout each thread"
This reverts commit 3dbd8d1f66537f968f0461ed8547460b3b1241b3 (r64184).
Revert "thread_pthread.c (gvl_acquire_common): persist timeout across calls"
This reverts commit 8c2ae6e3ed072b06fc3cbc34fa8a14b2acbb49d5 (r64165).
Revert "test/ruby/test_io.rb (test_race_gets_and_close): use SIGABRT on timeout"
This reverts commit 931cda4db8afd6b544a8d85a6815765a9c417213 (r64135).
Revert "thread_pthread.c (gvl_yield): do ubf wakeups when uncontended"
This reverts commit 508f00314f46c08b6e9b0141c01355d24954260c (r64133).
Revert "thread_pthread.h (native_thread_data): split condvars on some platforms"
This reverts commit a038bf238bd9a24bf1e1622f618a27db261fc91b (r64124).
Revert "process.c (waitpid_nogvl): prevent conflicting use of sleep_cond"
This reverts commit 7018acc946882f21d519af7c42ccf84b22a46b27 (r64117).
Revert "thread_pthread.c (rb_sigwait_sleep): th may be 0 from MJIT"
This reverts commit 56491afc7916fb24f5c4dc2c632fb93fa7063992 (r64116).
Revert "thread*.c: waiting on sigwait_fd performs periodic ubf wakeups"
This reverts commit ab47a57a46e70634d049e4da20a5441c7a14cdec (r64115).
Revert "thread_pthread.c (gvl_destroy): make no-op on GVL bits"
This reverts commit 95cae748171f4754b97f4ba54da2ae62a8d484fd (r64114).
Revert "thread_pthread.c (rb_sigwait_sleep): fix uninitialized poll set in UBF case"
This reverts commit 4514362948fdb914c6138b12d961d92e9c0fee6c (r64113).
Revert "thread_pthread.c (rb_sigwait_sleep): re-fix [Bug #5343] harder"
This reverts commit 26b8a70bb309c7a367b9134045508b5b5a580a77 (r64111).
Revert "thread.c: move ppoll wrapper into thread_pthread.c"
This reverts commit 3dc7727d22fecbc355597edda25d2a245bf55ba1 (r64110).
Revert "thread.c: move ppoll wrapper before thread_pthread.c"
This reverts commit 2fa1e2e3c3c5c4b3ce84730dee4bcbe9d81b8e35 (r64109).
Revert "thread_pthread.c (ubf_select): refix [Bug #5343]"
This reverts commit 4c1ab82f0623eca91a95d2a44053be22bbce48ad (r64108).
Revert "thread_win32.c: suppress warnings by -Wsuggest-attribute"
This reverts commit 6a9b63e39075c53870933fbac5c1065f7d22047c (r64159).
Revert "thread_pthread: remove timer-thread by restructuring GVL"
This reverts commit 708bfd21156828526fe72de2cedecfaca6647dc1 (r64107).
git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@64203 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
Diffstat (limited to 'thread_pthread.c')
-rw-r--r-- | thread_pthread.c | 710 |
1 files changed, 402 insertions, 308 deletions
diff --git a/thread_pthread.c b/thread_pthread.c index 545cc2fa3b..29805ef2df 100644 --- a/thread_pthread.c +++ b/thread_pthread.c @@ -45,21 +45,27 @@ void rb_native_cond_broadcast(rb_nativethread_cond_t *cond); void rb_native_cond_wait(rb_nativethread_cond_t *cond, rb_nativethread_lock_t *mutex); void rb_native_cond_initialize(rb_nativethread_cond_t *cond); void rb_native_cond_destroy(rb_nativethread_cond_t *cond); +static void rb_thread_wakeup_timer_thread_low(void); static void clear_thread_cache_altstack(void); -static void ubf_wakeup_all_threads(void); -static int ubf_threads_empty(void); -static int native_cond_timedwait(rb_nativethread_cond_t *, pthread_mutex_t *, - const struct timespec *); -static const struct timespec *sigwait_timeout(rb_thread_t *, int sigwait_fd, - const struct timespec *, - int *drained_p); -#define TIMER_THREAD_CREATED_P() (timer_thread_pipe.owner_process == getpid()) +#define TIMER_THREAD_MASK (1) +#define TIMER_THREAD_SLEEPY (2|TIMER_THREAD_MASK) +#define TIMER_THREAD_BUSY (4|TIMER_THREAD_MASK) -/* for testing, and in case we come across a platform w/o pipes: */ -#define BUSY_WAIT_SIGNALS (0) -#define THREAD_INVALID ((const rb_thread_t *)-1) -static const rb_thread_t *sigwait_th; +#if defined(HAVE_POLL) && defined(HAVE_FCNTL) && defined(F_GETFL) && \ + defined(F_SETFL) && defined(O_NONBLOCK) && \ + defined(F_GETFD) && defined(F_SETFD) && defined(FD_CLOEXEC) +/* The timer thread sleeps while only one Ruby thread is running. */ +# define TIMER_IMPL TIMER_THREAD_SLEEPY +#else +# define TIMER_IMPL TIMER_THREAD_BUSY +#endif + +static struct { + pthread_t id; + int created; +} timer_thread; +#define TIMER_THREAD_CREATED_P() (timer_thread.created != 0) #ifdef HAVE_SCHED_YIELD #define native_thread_yield() (void)sched_yield() @@ -76,96 +82,49 @@ static pthread_condattr_t *condattr_monotonic = &condattr_mono; static const void *const condattr_monotonic = NULL; #endif -/* 100ms. 10ms is too small for user level thread scheduling - * on recent Linux (tested on 2.6.35) - */ -#define TIME_QUANTUM_USEC (100 * 1000) - -static struct timespec native_cond_timeout(rb_nativethread_cond_t *, - struct timespec rel); - static void -gvl_acquire_common(rb_vm_t *vm, rb_thread_t *th) +gvl_acquire_common(rb_vm_t *vm) { if (vm->gvl.acquired) { - native_thread_data_t *nd = &th->native_thread_data; - - VM_ASSERT(th->unblock.func == 0 && "we reuse ubf_list for GVL waitq"); - - list_add_tail(&vm->gvl.waitq, &nd->ubf_list); - do { - if (!vm->gvl.timer) { - static struct timespec ts; - static int err = ETIMEDOUT; - - /* - * become designated timer thread to kick vm->gvl.acquired - * periodically. Continue on old timeout if it expired: - */ - if (err == ETIMEDOUT) { - ts.tv_sec = 0; - ts.tv_nsec = TIME_QUANTUM_USEC * 1000; - ts = native_cond_timeout(&nd->cond.gvlq, ts); - } - vm->gvl.timer = th; - err = native_cond_timedwait(&nd->cond.gvlq, &vm->gvl.lock, &ts); - vm->gvl.timer = 0; - ubf_wakeup_all_threads(); - - /* - * Timeslice. We can't touch thread_destruct_lock here, - * as the process may fork while this thread is contending - * for GVL: - */ - if (vm->gvl.acquired) timer_thread_function(); - } - else { - rb_native_cond_wait(&nd->cond.gvlq, &vm->gvl.lock); - } - } while (vm->gvl.acquired); - - list_del_init(&nd->ubf_list); - - if (vm->gvl.need_yield) { - vm->gvl.need_yield = 0; + + if (!vm->gvl.waiting++) { + /* + * Wake up timer thread iff timer thread is slept. + * When timer thread is polling mode, we don't want to + * make confusing timer thread interval time. + */ + rb_thread_wakeup_timer_thread_low(); + } + + while (vm->gvl.acquired) { + rb_native_cond_wait(&vm->gvl.cond, &vm->gvl.lock); + } + + --vm->gvl.waiting; + + if (vm->gvl.need_yield) { + vm->gvl.need_yield = 0; rb_native_cond_signal(&vm->gvl.switch_cond); - } + } } - vm->gvl.acquired = th; - /* - * Designate the next gvl.timer thread, favor the last thread in - * the waitq since it will be in waitq longest - */ - if (!vm->gvl.timer) { - native_thread_data_t *last; - last = list_tail(&vm->gvl.waitq, native_thread_data_t, ubf_list); - if (last) { - rb_native_cond_signal(&last->cond.gvlq); - } - else if (!ubf_threads_empty()) { - rb_thread_wakeup_timer_thread(0); - } - } + vm->gvl.acquired = 1; } static void gvl_acquire(rb_vm_t *vm, rb_thread_t *th) { rb_native_mutex_lock(&vm->gvl.lock); - gvl_acquire_common(vm, th); + gvl_acquire_common(vm); rb_native_mutex_unlock(&vm->gvl.lock); } -static native_thread_data_t * +static void gvl_release_common(rb_vm_t *vm) { - native_thread_data_t *next; vm->gvl.acquired = 0; - next = list_top(&vm->gvl.waitq, native_thread_data_t, ubf_list); - if (next) rb_native_cond_signal(&next->cond.gvlq); - - return next; + if (vm->gvl.waiting > 0) + rb_native_cond_signal(&vm->gvl.cond); } static void @@ -179,38 +138,34 @@ gvl_release(rb_vm_t *vm) static void gvl_yield(rb_vm_t *vm, rb_thread_t *th) { - native_thread_data_t *next; - rb_native_mutex_lock(&vm->gvl.lock); - next = gvl_release_common(vm); + + gvl_release_common(vm); /* An another thread is processing GVL yield. */ if (UNLIKELY(vm->gvl.wait_yield)) { - while (vm->gvl.wait_yield) + while (vm->gvl.wait_yield) rb_native_cond_wait(&vm->gvl.switch_wait_cond, &vm->gvl.lock); + goto acquire; } - else if (next) { - /* Wait until another thread task takes GVL. */ - vm->gvl.need_yield = 1; - vm->gvl.wait_yield = 1; - while (vm->gvl.need_yield) + + if (vm->gvl.waiting > 0) { + /* Wait until another thread task take GVL. */ + vm->gvl.need_yield = 1; + vm->gvl.wait_yield = 1; + while (vm->gvl.need_yield) rb_native_cond_wait(&vm->gvl.switch_cond, &vm->gvl.lock); - vm->gvl.wait_yield = 0; - rb_native_cond_broadcast(&vm->gvl.switch_wait_cond); + vm->gvl.wait_yield = 0; } else { - rb_native_mutex_unlock(&vm->gvl.lock); - /* - * GVL was not contended when we released, so we have no potential - * contenders for reacquisition. Perhaps they are stuck in blocking - * region w/o GVL, too, so we kick them: - */ - ubf_wakeup_all_threads(); - native_thread_yield(); + rb_native_mutex_unlock(&vm->gvl.lock); + sched_yield(); rb_native_mutex_lock(&vm->gvl.lock); - rb_native_cond_broadcast(&vm->gvl.switch_wait_cond); } - gvl_acquire_common(vm, th); + + rb_native_cond_broadcast(&vm->gvl.switch_wait_cond); + acquire: + gvl_acquire_common(vm); rb_native_mutex_unlock(&vm->gvl.lock); } @@ -218,11 +173,11 @@ static void gvl_init(rb_vm_t *vm) { rb_native_mutex_initialize(&vm->gvl.lock); + rb_native_cond_initialize(&vm->gvl.cond); rb_native_cond_initialize(&vm->gvl.switch_cond); rb_native_cond_initialize(&vm->gvl.switch_wait_cond); - list_head_init(&vm->gvl.waitq); vm->gvl.acquired = 0; - vm->gvl.timer = 0; + vm->gvl.waiting = 0; vm->gvl.need_yield = 0; vm->gvl.wait_yield = 0; } @@ -230,16 +185,10 @@ gvl_init(rb_vm_t *vm) static void gvl_destroy(rb_vm_t *vm) { - /* - * only called once at VM shutdown (not atfork), another thread - * may still grab vm->gvl.lock when calling gvl_release at - * the end of thread_start_func_2 - */ - if (0) { - rb_native_cond_destroy(&vm->gvl.switch_wait_cond); - rb_native_cond_destroy(&vm->gvl.switch_cond); - rb_native_mutex_destroy(&vm->gvl.lock); - } + rb_native_cond_destroy(&vm->gvl.switch_wait_cond); + rb_native_cond_destroy(&vm->gvl.switch_cond); + rb_native_cond_destroy(&vm->gvl.cond); + rb_native_mutex_destroy(&vm->gvl.lock); clear_thread_cache_altstack(); } @@ -484,9 +433,7 @@ native_thread_init(rb_thread_t *th) #ifdef USE_UBF_LIST list_node_init(&nd->ubf_list); #endif - rb_native_cond_initialize(&nd->cond.gvlq); - if (&nd->cond.gvlq != &nd->cond.intr) - rb_native_cond_initialize(&nd->cond.intr); + rb_native_cond_initialize(&nd->sleep_cond); ruby_thread_set_native(th); } @@ -497,11 +444,7 @@ native_thread_init(rb_thread_t *th) static void native_thread_destroy(rb_thread_t *th) { - native_thread_data_t *nd = &th->native_thread_data; - - rb_native_cond_destroy(&nd->cond.gvlq); - if (&nd->cond.gvlq != &nd->cond.intr) - rb_native_cond_destroy(&nd->cond.intr); + rb_native_cond_destroy(&th->native_thread_data.sleep_cond); /* * prevent false positive from ruby_thread_has_gvl_p if that @@ -1069,6 +1012,17 @@ native_thread_create(rb_thread_t *th) return err; } +#if (TIMER_IMPL & TIMER_THREAD_MASK) +static void +native_thread_join(pthread_t th) +{ + int err = pthread_join(th, 0); + if (err) { + rb_raise(rb_eThreadError, "native_thread_join() failed (%d)", err); + } +} +#endif /* TIMER_THREAD_MASK */ + #if USE_NATIVE_THREAD_PRIORITY static void @@ -1110,15 +1064,15 @@ ubf_pthread_cond_signal(void *ptr) { rb_thread_t *th = (rb_thread_t *)ptr; thread_debug("ubf_pthread_cond_signal (%p)\n", (void *)th); - rb_native_cond_signal(&th->native_thread_data.cond.intr); + rb_native_cond_signal(&th->native_thread_data.sleep_cond); } static void -native_cond_sleep(rb_thread_t *th, struct timespec *timeout_rel) +native_sleep(rb_thread_t *th, struct timespec *timeout_rel) { struct timespec timeout; rb_nativethread_lock_t *lock = &th->interrupt_lock; - rb_nativethread_cond_t *cond = &th->native_thread_data.cond.intr; + rb_nativethread_cond_t *cond = &th->native_thread_data.sleep_cond; if (timeout_rel) { /* Solaris cond_timedwait() return EINVAL if an argument is greater than @@ -1210,30 +1164,17 @@ static void ubf_select(void *ptr) { rb_thread_t *th = (rb_thread_t *)ptr; - rb_vm_t *vm = th->vm; - register_ubf_list(th); /* * ubf_wakeup_thread() doesn't guarantee to wake up a target thread. * Therefore, we repeatedly call ubf_wakeup_thread() until a target thread - * exit from ubf function. We must designate a timer-thread to perform - * this operation. + * exit from ubf function. + * In the other hands, we shouldn't call rb_thread_wakeup_timer_thread() + * if running on timer thread because it may make endless wakeups. */ - rb_native_mutex_lock(&vm->gvl.lock); - if (!vm->gvl.timer) { - native_thread_data_t *last; - - last = list_tail(&vm->gvl.waitq, native_thread_data_t, ubf_list); - if (last) { - rb_native_cond_signal(&last->cond.gvlq); - } - else { - rb_thread_wakeup_timer_thread(0); - } - } - rb_native_mutex_unlock(&vm->gvl.lock); - + if (!pthread_equal(pthread_self(), timer_thread.id)) + rb_thread_wakeup_timer_thread(); ubf_wakeup_thread(th); } @@ -1270,16 +1211,39 @@ static int ubf_threads_empty(void) { return 1; } #define TT_DEBUG 0 #define WRITE_CONST(fd, str) (void)(write((fd),(str),sizeof(str)-1)<0) +/* 100ms. 10ms is too small for user level thread scheduling + * on recent Linux (tested on 2.6.35) + */ +#define TIME_QUANTUM_USEC (100 * 1000) + +#if TIMER_IMPL == TIMER_THREAD_SLEEPY static struct { - /* pipes are closed in forked children when owner_process does not match */ + /* + * Read end of each pipe is closed inside timer thread for shutdown + * Write ends are closed by a normal Ruby thread during shutdown + */ int normal[2]; + int low[2]; /* volatile for signal handler use: */ volatile rb_pid_t owner_process; } timer_thread_pipe = { {-1, -1}, + {-1, -1}, /* low priority */ }; +NORETURN(static void async_bug_fd(const char *mesg, int errno_arg, int fd)); +static void +async_bug_fd(const char *mesg, int errno_arg, int fd) +{ + char buff[64]; + size_t n = strlcpy(buff, mesg, sizeof(buff)); + if (n < sizeof(buff)-3) { + ruby_snprintf(buff+n, sizeof(buff)-n, "(%d)", fd); + } + rb_async_bug_errno(buff, errno_arg); +} + /* only use signal-safe system calls here */ static void rb_thread_wakeup_timer_thread_fd(int fd) @@ -1311,33 +1275,49 @@ rb_thread_wakeup_timer_thread_fd(int fd) } void -rb_thread_wakeup_timer_thread(int sig) +rb_thread_wakeup_timer_thread(void) { /* must be safe inside sighandler, so no mutex */ if (timer_thread_pipe.owner_process == getpid()) { - rb_thread_wakeup_timer_thread_fd(timer_thread_pipe.normal[1]); - - /* - * system_working check is required because vm and main_thread are - * freed during shutdown - */ - if (sig && system_working) { - volatile rb_execution_context_t *ec; - rb_vm_t *vm = GET_VM(); - rb_thread_t *mth; - - /* - * FIXME: root VM and main_thread should be static and not - * on heap for maximum safety (and startup/shutdown speed) - */ - if (!vm) return; - mth = vm->main_thread; - if (!mth || !system_working) return; - - /* this relies on GC for grace period before cont_free */ - ec = ACCESS_ONCE(rb_execution_context_t *, mth->ec); - - if (ec) RUBY_VM_SET_TRAP_INTERRUPT(ec); + rb_thread_wakeup_timer_thread_fd(timer_thread_pipe.normal[1]); + } +} + +static void +rb_thread_wakeup_timer_thread_low(void) +{ + if (timer_thread_pipe.owner_process == getpid()) { + rb_thread_wakeup_timer_thread_fd(timer_thread_pipe.low[1]); + } +} + +/* VM-dependent API is not available for this function */ +static void +consume_communication_pipe(int fd) +{ +#define CCP_READ_BUFF_SIZE 1024 + /* buffer can be shared because no one refers to them. */ + static char buff[CCP_READ_BUFF_SIZE]; + ssize_t result; + + while (1) { + result = read(fd, buff, sizeof(buff)); + if (result == 0) { + return; + } + else if (result < 0) { + int e = errno; + switch (e) { + case EINTR: + continue; /* retry */ + case EAGAIN: +#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN + case EWOULDBLOCK: +#endif + return; + default: + async_bug_fd("consume_communication_pipe: read", e, fd); + } } } } @@ -1370,7 +1350,6 @@ set_nonblock(int fd) rb_sys_fail(0); } -/* communication pipe with timer thread and signal handler */ static int setup_communication_pipe_internal(int pipes[2]) { @@ -1395,6 +1374,108 @@ setup_communication_pipe_internal(int pipes[2]) return 0; } +/* communication pipe with timer thread and signal handler */ +static int +setup_communication_pipe(void) +{ + rb_pid_t owner = timer_thread_pipe.owner_process; + + if (owner && owner != getpid()) { + CLOSE_INVALIDATE(normal[0]); + CLOSE_INVALIDATE(normal[1]); + CLOSE_INVALIDATE(low[0]); + CLOSE_INVALIDATE(low[1]); + } + + if (setup_communication_pipe_internal(timer_thread_pipe.normal) < 0) { + return errno; + } + if (setup_communication_pipe_internal(timer_thread_pipe.low) < 0) { + return errno; + } + + return 0; +} + +/** + * Let the timer thread sleep a while. + * + * The timer thread sleeps until woken up by rb_thread_wakeup_timer_thread() if only one Ruby thread is running. + * @pre the calling context is in the timer thread. + */ +static inline void +timer_thread_sleep(rb_vm_t *vm) +{ + int result; + int need_polling; + struct pollfd pollfds[2]; + + pollfds[0].fd = timer_thread_pipe.normal[0]; + pollfds[0].events = POLLIN; + pollfds[1].fd = timer_thread_pipe.low[0]; + pollfds[1].events = POLLIN; + + need_polling = !ubf_threads_empty(); + + if (SIGCHLD_LOSSY && !need_polling) { + rb_native_mutex_lock(&vm->waitpid_lock); + if (!list_empty(&vm->waiting_pids) || !list_empty(&vm->waiting_grps)) { + need_polling = 1; + } + rb_native_mutex_unlock(&vm->waitpid_lock); + } + + if (vm->gvl.waiting > 0 || need_polling) { + /* polling (TIME_QUANTUM_USEC usec) */ + result = poll(pollfds, 1, TIME_QUANTUM_USEC/1000); + } + else { + /* wait (infinite) */ + result = poll(pollfds, numberof(pollfds), -1); + } + + if (result == 0) { + /* maybe timeout */ + } + else if (result > 0) { + consume_communication_pipe(timer_thread_pipe.normal[0]); + consume_communication_pipe(timer_thread_pipe.low[0]); + } + else { /* result < 0 */ + int e = errno; + switch (e) { + case EBADF: + case EINVAL: + case ENOMEM: /* from Linux man */ + case EFAULT: /* from FreeBSD man */ + rb_async_bug_errno("thread_timer: select", e); + default: + /* ignore */; + } + } +} +#endif /* TIMER_THREAD_SLEEPY */ + +#if TIMER_IMPL == TIMER_THREAD_BUSY +# define PER_NANO 1000000000 +void rb_thread_wakeup_timer_thread(void) {} +static void rb_thread_wakeup_timer_thread_low(void) {} + +static rb_nativethread_lock_t timer_thread_lock; +static rb_nativethread_cond_t timer_thread_cond; + +static inline void +timer_thread_sleep(rb_vm_t *unused) +{ + struct timespec ts; + ts.tv_sec = 0; + ts.tv_nsec = TIME_QUANTUM_USEC * 1000; + ts = native_cond_timeout(&timer_thread_cond, ts); + + native_cond_timedwait(&timer_thread_cond, &timer_thread_lock, &ts); +} +#endif /* TIMER_IMPL == TIMER_THREAD_BUSY */ + #if !defined(SET_CURRENT_THREAD_NAME) && defined(__linux__) && defined(PR_SET_NAME) # define SET_CURRENT_THREAD_NAME(name) prctl(PR_SET_NAME, name) #endif @@ -1445,26 +1526,137 @@ native_set_another_thread_name(rb_nativethread_id_t thread_id, VALUE name) return name; } +static void * +thread_timer(void *p) +{ + rb_vm_t *vm = p; +#ifdef HAVE_PTHREAD_SIGMASK /* mainly to enable SIGCHLD */ + { + sigset_t mask; + sigemptyset(&mask); + pthread_sigmask(SIG_SETMASK, &mask, NULL); + } +#endif + + if (TT_DEBUG) WRITE_CONST(2, "start timer thread\n"); + +#ifdef SET_CURRENT_THREAD_NAME + SET_CURRENT_THREAD_NAME("ruby-timer-thr"); +#endif + +#if TIMER_IMPL == TIMER_THREAD_BUSY + rb_native_mutex_initialize(&timer_thread_lock); + rb_native_cond_initialize(&timer_thread_cond); + rb_native_mutex_lock(&timer_thread_lock); +#endif + while (system_working > 0) { + + /* timer function */ + ubf_wakeup_all_threads(); + timer_thread_function(0); + + if (TT_DEBUG) WRITE_CONST(2, "tick\n"); + + /* wait */ + timer_thread_sleep(vm); + } +#if TIMER_IMPL == TIMER_THREAD_BUSY + rb_native_mutex_unlock(&timer_thread_lock); + rb_native_cond_destroy(&timer_thread_cond); + rb_native_mutex_destroy(&timer_thread_lock); +#endif + + if (TT_DEBUG) WRITE_CONST(2, "finish timer thread\n"); + return NULL; +} + +#if (TIMER_IMPL & TIMER_THREAD_MASK) static void rb_thread_create_timer_thread(void) { - /* we only create the pipe, and lazy-spawn */ - rb_pid_t current = getpid(); - rb_pid_t owner = timer_thread_pipe.owner_process; + if (!timer_thread.created) { + size_t stack_size = 0; + int err; + pthread_attr_t attr; + rb_vm_t *vm = GET_VM(); - if (owner && owner != current) { - CLOSE_INVALIDATE(normal[0]); - CLOSE_INVALIDATE(normal[1]); - } + err = pthread_attr_init(&attr); + if (err != 0) { + rb_warn("pthread_attr_init failed for timer: %s, scheduling broken", + strerror(err)); + return; + } +# ifdef PTHREAD_STACK_MIN + { + size_t stack_min = PTHREAD_STACK_MIN; /* may be dynamic, get only once */ + const size_t min_size = (4096 * 4); + /* Allocate the machine stack for the timer thread + * at least 16KB (4 pages). FreeBSD 8.2 AMD64 causes + * machine stack overflow only with PTHREAD_STACK_MIN. + */ + enum { + needs_more_stack = +#if defined HAVE_VALGRIND_MEMCHECK_H && defined __APPLE__ + 1 +#else + THREAD_DEBUG != 0 +#endif + }; + stack_size = stack_min; + if (stack_size < min_size) stack_size = min_size; + if (needs_more_stack) { + stack_size += +((BUFSIZ - 1) / stack_min + 1) * stack_min; + } + err = pthread_attr_setstacksize(&attr, stack_size); + if (err != 0) { + rb_bug("pthread_attr_setstacksize(.., %"PRIuSIZE") failed: %s", + stack_size, strerror(err)); + } + } +# endif - if (setup_communication_pipe_internal(timer_thread_pipe.normal) < 0) return; +#if TIMER_IMPL == TIMER_THREAD_SLEEPY + err = setup_communication_pipe(); + if (err) return; +#endif /* TIMER_THREAD_SLEEPY */ - if (owner != current) { - /* validate pipe on this process */ - sigwait_th = THREAD_INVALID; - timer_thread_pipe.owner_process = current; + /* create timer thread */ + if (timer_thread.created) { + rb_bug("rb_thread_create_timer_thread: Timer thread was already created\n"); + } + err = pthread_create(&timer_thread.id, &attr, thread_timer, vm); + pthread_attr_destroy(&attr); + + if (err == EINVAL) { + /* + * Even if we are careful with our own stack use in thread_timer(), + * any third-party libraries (eg libkqueue) which rely on __thread + * storage can cause small stack sizes to fail. So lets hope the + * default stack size is enough for them: + */ + stack_size = 0; + err = pthread_create(&timer_thread.id, NULL, thread_timer, vm); + } + if (err != 0) { + rb_warn("pthread_create failed for timer: %s, scheduling broken", + strerror(err)); + if (stack_size) { + rb_warn("timer thread stack size: %"PRIuSIZE, stack_size); + } + else { + rb_warn("timer thread stack size: system default"); + } + VM_ASSERT(err == 0); + return; + } +#if TIMER_IMPL == TIMER_THREAD_SLEEPY + /* validate pipe on this process */ + timer_thread_pipe.owner_process = getpid(); +#endif /* TIMER_THREAD_SLEEPY */ + timer_thread.created = 1; } } +#endif /* TIMER_IMPL & TIMER_THREAD_MASK */ static int native_stop_timer_thread(void) @@ -1473,6 +1665,24 @@ native_stop_timer_thread(void) stopped = --system_working <= 0; if (TT_DEBUG) fprintf(stderr, "stop timer thread\n"); + if (stopped) { +#if TIMER_IMPL == TIMER_THREAD_SLEEPY + /* kick timer thread out of sleep */ + rb_thread_wakeup_timer_thread_fd(timer_thread_pipe.normal[1]); +#endif + + /* timer thread will stop looping when system_working <= 0: */ + native_thread_join(timer_thread.id); + + /* + * don't care if timer_thread_pipe may fill up at this point. + * If we restart timer thread, signals will be processed, if + * we don't, it's because we're in a different child + */ + + if (TT_DEBUG) fprintf(stderr, "joined timer thread\n"); + timer_thread.created = 0; + } return stopped; } @@ -1529,14 +1739,20 @@ ruby_stack_overflowed_p(const rb_thread_t *th, const void *addr) int rb_reserved_fd_p(int fd) { +#if TIMER_IMPL == TIMER_THREAD_SLEEPY if ((fd == timer_thread_pipe.normal[0] || - fd == timer_thread_pipe.normal[1]) && + fd == timer_thread_pipe.normal[1] || + fd == timer_thread_pipe.low[0] || + fd == timer_thread_pipe.low[1]) && timer_thread_pipe.owner_process == getpid()) { /* async-signal-safe */ return 1; } else { return 0; } +#else + return 0; +#endif } rb_nativethread_id_t @@ -1587,7 +1803,7 @@ rb_sleep_cond_get(const rb_execution_context_t *ec) { rb_thread_t *th = rb_ec_thread_ptr(ec); - return &th->native_thread_data.cond.intr; + return &th->native_thread_data.sleep_cond; } void @@ -1597,126 +1813,4 @@ rb_sleep_cond_put(rb_nativethread_cond_t *cond) } #endif /* USE_NATIVE_SLEEP_COND */ -int -rb_sigwait_fd_get(const rb_thread_t *th) -{ - if (timer_thread_pipe.owner_process == getpid() && - timer_thread_pipe.normal[0] >= 0) { - if (ATOMIC_PTR_CAS(sigwait_th, THREAD_INVALID, th) == THREAD_INVALID) { - return timer_thread_pipe.normal[0]; - } - } - return -1; /* avoid thundering herd */ -} - -void -rb_sigwait_fd_put(const rb_thread_t *th, int fd) -{ - const rb_thread_t *old; - - VM_ASSERT(timer_thread_pipe.normal[0] == fd); - old = ATOMIC_PTR_EXCHANGE(sigwait_th, THREAD_INVALID); - if (old != th) assert(old == th); -} - -#ifndef HAVE_PPOLL -/* TODO: don't ignore sigmask */ -static int -ruby_ppoll(struct pollfd *fds, nfds_t nfds, - const struct timespec *ts, const sigset_t *sigmask) -{ - int timeout_ms; - - if (ts) { - int tmp, tmp2; - - if (ts->tv_sec > INT_MAX/1000) - timeout_ms = INT_MAX; - else { - tmp = (int)(ts->tv_sec * 1000); - /* round up 1ns to 1ms to avoid excessive wakeups for <1ms sleep */ - tmp2 = (int)((ts->tv_nsec + 999999L) / (1000L * 1000L)); - if (INT_MAX - tmp < tmp2) - timeout_ms = INT_MAX; - else - timeout_ms = (int)(tmp + tmp2); - } - } - else - timeout_ms = -1; - - return poll(fds, nfds, timeout_ms); -} -# define ppoll(fds,nfds,ts,sigmask) ruby_ppoll((fds),(nfds),(ts),(sigmask)) -#endif - -void -rb_sigwait_sleep(rb_thread_t *th, int sigwait_fd, const struct timespec *ts) -{ - struct pollfd pfd; - - pfd.fd = sigwait_fd; - pfd.events = POLLIN; - - if (!BUSY_WAIT_SIGNALS && ubf_threads_empty()) { - (void)ppoll(&pfd, 1, ts, 0); - check_signals_nogvl(th, sigwait_fd); - } - else { - struct timespec end, diff; - const struct timespec *to; - int n = 0; - - if (ts) { - getclockofday(&end); - timespec_add(&end, ts); - diff = *ts; - ts = &diff; - } - /* - * tricky: this needs to return on spurious wakeup (no auto-retry). - * But we also need to distinguish between periodic quantum - * wakeups, so we care about the result of consume_communication_pipe - */ - for (;;) { - to = sigwait_timeout(th, sigwait_fd, ts, &n); - if (n) return; - n = ppoll(&pfd, 1, to, 0); - if (check_signals_nogvl(th, sigwait_fd)) - return; - if (n || (th && RUBY_VM_INTERRUPTED(th->ec))) - return; - if (ts && timespec_update_expire(&diff, &end)) - return; - } - } -} - -static void -native_sleep(rb_thread_t *th, struct timespec *timeout_rel) -{ - int sigwait_fd = rb_sigwait_fd_get(th); - - if (sigwait_fd >= 0) { - rb_native_mutex_lock(&th->interrupt_lock); - th->unblock.func = ubf_sigwait; - rb_native_mutex_unlock(&th->interrupt_lock); - - GVL_UNLOCK_BEGIN(th); - - if (!RUBY_VM_INTERRUPTED(th->ec)) { - rb_sigwait_sleep(th, sigwait_fd, timeout_rel); - } - else { - check_signals_nogvl(th, sigwait_fd); - } - unblock_function_clear(th); - GVL_UNLOCK_END(th); - rb_sigwait_fd_put(th, sigwait_fd); - rb_sigwait_fd_migrate(th->vm); - } - else { - native_cond_sleep(th, timeout_rel); - } -} #endif /* THREAD_SYSTEM_DEPENDENT_IMPLEMENTATION */ |