From 79df14c04b452411b9d17e26a398e491bca1a811 Mon Sep 17 00:00:00 2001
From: Koichi Sasada
Date: Tue, 10 Mar 2020 02:22:11 +0900
Subject: Introduce Ractor mechanism for parallel execution

This commit introduces the Ractor mechanism to run Ruby programs in
parallel. See doc/ractor.md for more details about Ractor, and see
ticket [Feature #17100] for the implementation details and discussions.

[Feature #17100]

This commit does not complete the implementation. You may find many
bugs when using Ractor. The specification is also subject to change,
so this feature is experimental: you will see a warning when you
create the first Ractor with `Ractor.new`.

I hope this feature can help protect programmers from thread-safety
issues.
---
 thread_pthread.c | 226 ++++++++++++++++++++++++++++---------------------------
 1 file changed, 117 insertions(+), 109 deletions(-)

(limited to 'thread_pthread.c')

diff --git a/thread_pthread.c b/thread_pthread.c
index ee2f7bc909..427897cfd8 100644
--- a/thread_pthread.c
+++ b/thread_pthread.c
@@ -122,26 +122,14 @@ static struct {
 };
 #endif
 
-void rb_native_mutex_lock(rb_nativethread_lock_t *lock);
-void rb_native_mutex_unlock(rb_nativethread_lock_t *lock);
-static int native_mutex_trylock(rb_nativethread_lock_t *lock);
-void rb_native_mutex_initialize(rb_nativethread_lock_t *lock);
-void rb_native_mutex_destroy(rb_nativethread_lock_t *lock);
-void rb_native_cond_signal(rb_nativethread_cond_t *cond);
-void rb_native_cond_broadcast(rb_nativethread_cond_t *cond);
-void rb_native_cond_wait(rb_nativethread_cond_t *cond, rb_nativethread_lock_t *mutex);
-void rb_native_cond_initialize(rb_nativethread_cond_t *cond);
-void rb_native_cond_destroy(rb_nativethread_cond_t *cond);
-static void clear_thread_cache_altstack(void);
-static void ubf_wakeup_all_threads(void);
-static int ubf_threads_empty(void);
-static int native_cond_timedwait(rb_nativethread_cond_t *, pthread_mutex_t *,
-                                 const rb_hrtime_t *abs);
 static const rb_hrtime_t *sigwait_timeout(rb_thread_t *, int sigwait_fd,
                                           const rb_hrtime_t *, int *drained_p);
 static void ubf_timer_disarm(void);
 static void threadptr_trap_interrupt(rb_thread_t *);
+static void clear_thread_cache_altstack(void);
+static void ubf_wakeup_all_threads(void);
+static int ubf_threads_empty(void);
 
 #define TIMER_THREAD_CREATED_P() (signal_self_pipe.owner_process == getpid())
 
@@ -180,17 +168,18 @@ static const void *const condattr_monotonic = NULL;
 #define TIME_QUANTUM_NSEC (TIME_QUANTUM_USEC * 1000)
 
 static rb_hrtime_t native_cond_timeout(rb_nativethread_cond_t *, rb_hrtime_t);
+static int native_cond_timedwait(rb_nativethread_cond_t *cond, pthread_mutex_t *mutex, const rb_hrtime_t *abs);
 
 /*
  * Designate the next gvl.timer thread, favor the last thread in
  * the waitq since it will be in waitq longest
  */
 static int
-designate_timer_thread(rb_vm_t *vm)
+designate_timer_thread(rb_global_vm_lock_t *gvl)
 {
     native_thread_data_t *last;
 
-    last = list_tail(&vm->gvl.waitq, native_thread_data_t, node.ubf);
+    last = list_tail(&gvl->waitq, native_thread_data_t, node.ubf);
     if (last) {
         rb_native_cond_signal(&last->cond.gvlq);
         return TRUE;
     }
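Every GVL primitive in the hunks above and below undergoes the same mechanical change: functions that took the whole `rb_vm_t *vm` and reached into the embedded `vm->gvl` now take the `rb_global_vm_lock_t` directly. Decoupling the lock from the VM is the prerequisite for giving each Ractor its own GVL. A minimal sketch of that calling-convention shift, using hypothetical simplified types rather than the real structs from vm_core.h:

    /* Hypothetical, simplified types -- not the real Ruby VM structs. */
    #include <pthread.h>

    typedef struct { pthread_mutex_t lock; void *owner; } lock_sketch_t;
    typedef struct { lock_sketch_t gvl; } vm_sketch_t;

    /* Before: hard-wires "one VM == one GVL". */
    static void acquire_old(vm_sketch_t *vm) { pthread_mutex_lock(&vm->gvl.lock); }

    /* After: the caller decides which lock applies, so a per-ractor
     * GVL can be passed without the function itself changing. */
    static void acquire_new(lock_sketch_t *gvl) { pthread_mutex_lock(&gvl->lock); }

None of the locking logic changes in this transformation; only the path by which the lock is named does.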
@@ -203,29 +192,30 @@ designate_timer_thread(rb_vm_t *vm)
  * periodically. Continue on old timeout if it expired.
  */
 static void
-do_gvl_timer(rb_vm_t *vm, rb_thread_t *th)
+do_gvl_timer(rb_global_vm_lock_t *gvl, rb_thread_t *th)
 {
     static rb_hrtime_t abs;
     native_thread_data_t *nd = &th->native_thread_data;
 
-    vm->gvl.timer = th;
+    gvl->timer = th;
 
     /* take over wakeups from UBF_TIMER */
     ubf_timer_disarm();
 
-    if (vm->gvl.timer_err == ETIMEDOUT) {
+    if (gvl->timer_err == ETIMEDOUT) {
         abs = native_cond_timeout(&nd->cond.gvlq, TIME_QUANTUM_NSEC);
     }
-    vm->gvl.timer_err = native_cond_timedwait(&nd->cond.gvlq, &vm->gvl.lock, &abs);
+    gvl->timer_err = native_cond_timedwait(&nd->cond.gvlq, &gvl->lock, &abs);
 
     ubf_wakeup_all_threads();
-    ruby_sigchld_handler(vm);
+    ruby_sigchld_handler(GET_VM());
+
     if (UNLIKELY(rb_signal_buff_size())) {
-        if (th == vm->main_thread) {
+        if (th == GET_VM()->ractor.main_thread) {
             RUBY_VM_SET_TRAP_INTERRUPT(th->ec);
         }
         else {
-            threadptr_trap_interrupt(vm->main_thread);
+            threadptr_trap_interrupt(GET_VM()->ractor.main_thread);
         }
     }
 
@@ -233,77 +223,77 @@ do_gvl_timer(rb_vm_t *vm, rb_thread_t *th)
      * Timeslice. Warning: the process may fork while this
      * thread is contending for GVL:
      */
-    if (vm->gvl.owner) timer_thread_function();
-    vm->gvl.timer = 0;
+    if (gvl->owner) timer_thread_function(gvl->owner->ec);
+    gvl->timer = 0;
 }
 
 static void
-gvl_acquire_common(rb_vm_t *vm, rb_thread_t *th)
+gvl_acquire_common(rb_global_vm_lock_t *gvl, rb_thread_t *th)
 {
-    if (vm->gvl.owner) {
+    if (gvl->owner) {
         native_thread_data_t *nd = &th->native_thread_data;
 
         VM_ASSERT(th->unblock.func == 0 &&
                   "we must not be in ubf_list and GVL waitq at the same time");
 
-        list_add_tail(&vm->gvl.waitq, &nd->node.gvl);
+        list_add_tail(&gvl->waitq, &nd->node.gvl);
 
         do {
-            if (!vm->gvl.timer) {
-                do_gvl_timer(vm, th);
+            if (!gvl->timer) {
+                do_gvl_timer(gvl, th);
             }
             else {
-                rb_native_cond_wait(&nd->cond.gvlq, &vm->gvl.lock);
+                rb_native_cond_wait(&nd->cond.gvlq, &gvl->lock);
             }
-        } while (vm->gvl.owner);
+        } while (gvl->owner);
 
         list_del_init(&nd->node.gvl);
 
-        if (vm->gvl.need_yield) {
-            vm->gvl.need_yield = 0;
-            rb_native_cond_signal(&vm->gvl.switch_cond);
+        if (gvl->need_yield) {
+            gvl->need_yield = 0;
+            rb_native_cond_signal(&gvl->switch_cond);
         }
     }
     else { /* reset timer if uncontended */
-        vm->gvl.timer_err = ETIMEDOUT;
+        gvl->timer_err = ETIMEDOUT;
    }
 
-    vm->gvl.owner = th;
-    if (!vm->gvl.timer) {
-        if (!designate_timer_thread(vm) && !ubf_threads_empty()) {
+    gvl->owner = th;
+    if (!gvl->timer) {
+        if (!designate_timer_thread(gvl) && !ubf_threads_empty()) {
             rb_thread_wakeup_timer_thread(-1);
         }
     }
 }
 
 static void
-gvl_acquire(rb_vm_t *vm, rb_thread_t *th)
+gvl_acquire(rb_global_vm_lock_t *gvl, rb_thread_t *th)
 {
-    rb_native_mutex_lock(&vm->gvl.lock);
-    gvl_acquire_common(vm, th);
-    rb_native_mutex_unlock(&vm->gvl.lock);
+    rb_native_mutex_lock(&gvl->lock);
+    gvl_acquire_common(gvl, th);
+    rb_native_mutex_unlock(&gvl->lock);
 }
 
 static const native_thread_data_t *
-gvl_release_common(rb_vm_t *vm)
+gvl_release_common(rb_global_vm_lock_t *gvl)
 {
     native_thread_data_t *next;
-    vm->gvl.owner = 0;
-    next = list_top(&vm->gvl.waitq, native_thread_data_t, node.ubf);
+    gvl->owner = 0;
+    next = list_top(&gvl->waitq, native_thread_data_t, node.ubf);
     if (next) rb_native_cond_signal(&next->cond.gvlq);
 
     return next;
 }
 
 static void
-gvl_release(rb_vm_t *vm)
+gvl_release(rb_global_vm_lock_t *gvl)
 {
-    rb_native_mutex_lock(&vm->gvl.lock);
-    gvl_release_common(vm);
-    rb_native_mutex_unlock(&vm->gvl.lock);
+    rb_native_mutex_lock(&gvl->lock);
+    gvl_release_common(gvl);
+    rb_native_mutex_unlock(&gvl->lock);
 }
 
 static void
-gvl_yield(rb_vm_t *vm, rb_thread_t *th)
+gvl_yield(rb_global_vm_lock_t *gvl, rb_thread_t *th)
 {
     const native_thread_data_t *next;
 
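Condensed, the contended path in gvl_acquire_common/gvl_release_common above behaves like a FIFO lock: a thread that finds the GVL held appends itself to the waitq and sleeps on its own condition variable, and the releaser signals the queue head. A compressed sketch of that handshake under hypothetical simplified types (the real code uses native_thread_data_t and the ccan/list macros, and additionally multiplexes the timer role onto one waiter, which is omitted here; condvar/mutex initialization is also elided):

    #include <pthread.h>
    #include <stddef.h>

    typedef struct waiter {
        pthread_cond_t cond;        /* each waiter sleeps on its own condvar */
        struct waiter *next;
    } waiter_t;

    typedef struct {
        pthread_mutex_t lock;
        void *owner;                /* current GVL holder, NULL when free */
        waiter_t *waitq;            /* FIFO: the head is woken first */
    } gvl_sketch_t;

    static void
    waitq_push(waiter_t **q, waiter_t *w)   /* append at the tail */
    {
        while (*q) q = &(*q)->next;
        w->next = NULL;
        *q = w;
    }

    static void
    waitq_remove(waiter_t **q, waiter_t *w) /* unlink wherever it sits */
    {
        while (*q && *q != w) q = &(*q)->next;
        if (*q) *q = w->next;
    }

    static void
    gvl_sketch_acquire(gvl_sketch_t *gvl, void *th, waiter_t *me)
    {
        pthread_mutex_lock(&gvl->lock);
        if (gvl->owner) {
            waitq_push(&gvl->waitq, me);    /* cf. list_add_tail() above */
            do {
                pthread_cond_wait(&me->cond, &gvl->lock);
            } while (gvl->owner);           /* tolerate spurious wakeups */
            waitq_remove(&gvl->waitq, me);  /* cf. list_del_init() */
        }
        gvl->owner = th;
        pthread_mutex_unlock(&gvl->lock);
    }

    static void
    gvl_sketch_release(gvl_sketch_t *gvl)
    {
        pthread_mutex_lock(&gvl->lock);
        gvl->owner = NULL;
        if (gvl->waitq)                     /* wake the head, cf. list_top() */
            pthread_cond_signal(&gvl->waitq->cond);
        pthread_mutex_unlock(&gvl->lock);
    }

Giving each waiter its own condvar (rather than one shared condvar for the lock) is what lets the releaser wake exactly one chosen successor instead of broadcasting to the whole queue.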
@@ -312,49 +302,49 @@ gvl_yield(rb_vm_t *vm, rb_thread_t *th)
      * (perhaps looping in io_close_fptr) so we kick them:
      */
     ubf_wakeup_all_threads();
-    rb_native_mutex_lock(&vm->gvl.lock);
-    next = gvl_release_common(vm);
+    rb_native_mutex_lock(&gvl->lock);
+    next = gvl_release_common(gvl);
 
     /* An another thread is processing GVL yield. */
-    if (UNLIKELY(vm->gvl.wait_yield)) {
-        while (vm->gvl.wait_yield)
-            rb_native_cond_wait(&vm->gvl.switch_wait_cond, &vm->gvl.lock);
+    if (UNLIKELY(gvl->wait_yield)) {
+        while (gvl->wait_yield)
+            rb_native_cond_wait(&gvl->switch_wait_cond, &gvl->lock);
     }
     else if (next) {
         /* Wait until another thread task takes GVL. */
-        vm->gvl.need_yield = 1;
-        vm->gvl.wait_yield = 1;
-        while (vm->gvl.need_yield)
-            rb_native_cond_wait(&vm->gvl.switch_cond, &vm->gvl.lock);
-        vm->gvl.wait_yield = 0;
-        rb_native_cond_broadcast(&vm->gvl.switch_wait_cond);
+        gvl->need_yield = 1;
+        gvl->wait_yield = 1;
+        while (gvl->need_yield)
+            rb_native_cond_wait(&gvl->switch_cond, &gvl->lock);
+        gvl->wait_yield = 0;
+        rb_native_cond_broadcast(&gvl->switch_wait_cond);
     }
     else {
-        rb_native_mutex_unlock(&vm->gvl.lock);
+        rb_native_mutex_unlock(&gvl->lock);
         native_thread_yield();
-        rb_native_mutex_lock(&vm->gvl.lock);
-        rb_native_cond_broadcast(&vm->gvl.switch_wait_cond);
+        rb_native_mutex_lock(&gvl->lock);
+        rb_native_cond_broadcast(&gvl->switch_wait_cond);
     }
-    gvl_acquire_common(vm, th);
-    rb_native_mutex_unlock(&vm->gvl.lock);
+    gvl_acquire_common(gvl, th);
+    rb_native_mutex_unlock(&gvl->lock);
 }
 
-static void
-gvl_init(rb_vm_t *vm)
+void
+rb_gvl_init(rb_global_vm_lock_t *gvl)
 {
-    rb_native_mutex_initialize(&vm->gvl.lock);
-    rb_native_cond_initialize(&vm->gvl.switch_cond);
-    rb_native_cond_initialize(&vm->gvl.switch_wait_cond);
-    list_head_init(&vm->gvl.waitq);
-    vm->gvl.owner = 0;
-    vm->gvl.timer = 0;
-    vm->gvl.timer_err = ETIMEDOUT;
-    vm->gvl.need_yield = 0;
-    vm->gvl.wait_yield = 0;
+    rb_native_mutex_initialize(&gvl->lock);
+    rb_native_cond_initialize(&gvl->switch_cond);
+    rb_native_cond_initialize(&gvl->switch_wait_cond);
+    list_head_init(&gvl->waitq);
+    gvl->owner = 0;
+    gvl->timer = 0;
+    gvl->timer_err = ETIMEDOUT;
+    gvl->need_yield = 0;
+    gvl->wait_yield = 0;
 }
 
 static void
-gvl_destroy(rb_vm_t *vm)
+gvl_destroy(rb_global_vm_lock_t *gvl)
 {
     /*
      * only called once at VM shutdown (not atfork), another thread
@@ -362,9 +352,9 @@ gvl_destroy(rb_vm_t *vm)
      * the end of thread_start_func_2
      */
     if (0) {
-        rb_native_cond_destroy(&vm->gvl.switch_wait_cond);
-        rb_native_cond_destroy(&vm->gvl.switch_cond);
-        rb_native_mutex_destroy(&vm->gvl.lock);
+        rb_native_cond_destroy(&gvl->switch_wait_cond);
+        rb_native_cond_destroy(&gvl->switch_cond);
+        rb_native_mutex_destroy(&gvl->lock);
     }
     clear_thread_cache_altstack();
 }
@@ -372,11 +362,11 @@ gvl_destroy(rb_vm_t *vm)
 #if defined(HAVE_WORKING_FORK)
 static void thread_cache_reset(void);
 static void
-gvl_atfork(rb_vm_t *vm)
+gvl_atfork(rb_global_vm_lock_t *gvl)
 {
     thread_cache_reset();
-    gvl_init(vm);
-    gvl_acquire(vm, GET_THREAD());
+    rb_gvl_init(gvl);
+    gvl_acquire(gvl, GET_THREAD());
 }
 #endif
 
@@ -415,8 +405,8 @@ rb_native_mutex_unlock(pthread_mutex_t *lock)
     }
 }
 
-static inline int
-native_mutex_trylock(pthread_mutex_t *lock)
+int
+rb_native_mutex_trylock(pthread_mutex_t *lock)
 {
     int r;
     mutex_debug("trylock", lock);
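The static native_mutex_trylock() is promoted to the exported rb_native_mutex_trylock() above, presumably so the new Ractor machinery outside this file can attempt a lock without blocking. It keeps the pthread convention: 0 on success, EBUSY when the mutex is already held. A small standalone demonstration of that convention (plain pthreads, nothing from the patch):

    #include <errno.h>
    #include <pthread.h>
    #include <stdio.h>

    int
    main(void)
    {
        pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;

        if (pthread_mutex_trylock(&m) == 0) {   /* acquired: returns 0 */
            /* A second attempt on a held, non-recursive mutex never
             * blocks; it reports EBUSY immediately. */
            int r = pthread_mutex_trylock(&m);
            printf("second trylock: %s\n", r == EBUSY ? "EBUSY" : "unexpected");
            pthread_mutex_unlock(&m);
        }
        return 0;
    }

That never-blocks property is exactly what the ubf_select hunk further below relies on to avoid deadlocking against the GVL timer.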
@@ -513,8 +503,7 @@ rb_native_cond_wait(rb_nativethread_cond_t *cond, pthread_mutex_t *mutex)
 }
 
 static int
-native_cond_timedwait(rb_nativethread_cond_t *cond, pthread_mutex_t *mutex,
-                      const rb_hrtime_t *abs)
+native_cond_timedwait(rb_nativethread_cond_t *cond, pthread_mutex_t *mutex, const rb_hrtime_t *abs)
 {
     int r;
     struct timespec ts;
@@ -526,16 +515,24 @@ native_cond_timedwait(rb_nativethread_cond_t *cond, pthread_mutex_t *mutex,
      * Let's hide it from arch generic code.
      */
     do {
-        r = pthread_cond_timedwait(cond, mutex, rb_hrtime2timespec(&ts, abs));
+        rb_hrtime2timespec(&ts, abs);
+        r = pthread_cond_timedwait(cond, mutex, &ts);
     } while (r == EINTR);
 
     if (r != 0 && r != ETIMEDOUT) {
-        rb_bug_errno("pthread_cond_timedwait", r);
+        rb_bug_errno("pthread_cond_timedwait", r);
     }
 
     return r;
 }
 
+void
+rb_native_cond_timedwait(rb_nativethread_cond_t *cond, pthread_mutex_t *mutex, unsigned long msec)
+{
+    rb_hrtime_t hrmsec = native_cond_timeout(cond, RB_HRTIME_PER_MSEC * msec);
+    native_cond_timedwait(cond, mutex, &hrmsec);
+}
+
 static rb_hrtime_t
 native_cond_timeout(rb_nativethread_cond_t *cond, const rb_hrtime_t rel)
 {
@@ -570,6 +567,9 @@ ruby_thread_from_native(void)
 static int
 ruby_thread_set_native(rb_thread_t *th)
 {
+    if (th && th->ec) {
+        rb_ractor_set_current_ec(th->ractor, th->ec);
+    }
     return pthread_setspecific(ruby_native_thread_key, th) == 0;
 }
 
@@ -587,8 +587,14 @@ Init_native_thread(rb_thread_t *th)
         if (r) condattr_monotonic = NULL;
     }
 #endif
-    pthread_key_create(&ruby_native_thread_key, 0);
+    if (pthread_key_create(&ruby_native_thread_key, 0) == EAGAIN) {
+        rb_bug("pthread_key_create failed (ruby_native_thread_key)");
+    }
+    if (pthread_key_create(&ruby_current_ec_key, 0) == EAGAIN) {
+        rb_bug("pthread_key_create failed (ruby_current_ec_key)");
+    }
     th->thread_id = pthread_self();
+    ruby_thread_set_native(th);
     fill_thread_id_str(th);
     native_thread_init(th);
     posix_signal(SIGVTALRM, null_func);
@@ -605,7 +611,6 @@ native_thread_init(rb_thread_t *th)
     rb_native_cond_initialize(&nd->cond.gvlq);
     if (&nd->cond.gvlq != &nd->cond.intr)
         rb_native_cond_initialize(&nd->cond.intr);
-    ruby_thread_set_native(th);
 }
 
 #ifndef USE_THREAD_CACHE
@@ -1116,7 +1121,7 @@ native_thread_create(rb_thread_t *th)
 # endif
         CHECK_ERR(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED));
 
-        err = pthread_create(&th->thread_id, &attr, thread_start_func_1, th);
+        err = pthread_create(&th->thread_id, &attr, thread_start_func_1, th);
         thread_debug("create: %p (%d)\n", (void *)th, err);
 
         /* should be done in the created thread */
         fill_thread_id_str(th);
@@ -1207,7 +1212,7 @@ native_cond_sleep(rb_thread_t *th, rb_hrtime_t *rel)
             }
             end = native_cond_timeout(cond, *rel);
-            native_cond_timedwait(cond, lock, &end);
+            native_cond_timedwait(cond, lock, &end);
         }
     }
 
     th->unblock.func = 0;
@@ -1277,7 +1282,7 @@ static void
 ubf_select(void *ptr)
 {
     rb_thread_t *th = (rb_thread_t *)ptr;
-    rb_vm_t *vm = th->vm;
+    rb_global_vm_lock_t *gvl = rb_ractor_gvl(th->ractor);
     const rb_thread_t *cur = ruby_thread_from_native(); /* may be 0 */
 
     register_ubf_list(th);
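In ubf_select above, the GVL is no longer dug out of th->vm; the thread asks its ractor for it via rb_ractor_gvl(th->ractor). The relationships implied by the calls in this diff, sketched with hypothetical layouts (the real definitions live outside thread_pthread.c and are not part of this patch):

    /* Hypothetical layouts -- only the relationships are real:
     * thread -> ractor -> GVL, instead of thread -> vm -> the one GVL. */
    typedef struct sketch_gvl sketch_gvl_t;

    typedef struct sketch_ractor {
        sketch_gvl_t *gvl;                  /* one GVL per ractor */
        struct sketch_thread *main_thread;  /* cf. vm->ractor.main_thread */
    } sketch_ractor_t;

    typedef struct sketch_thread {
        sketch_ractor_t *ractor;            /* every thread belongs to a ractor */
    } sketch_thread_t;

    static sketch_gvl_t *
    sketch_ractor_gvl(sketch_ractor_t *r)
    {
        return r->gvl;      /* replaces &th->vm->gvl, the process-wide lock */
    }

With this indirection, threads in different Ractors contend on different locks, which is what makes parallel execution possible while keeping per-Ractor code serialized.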
@@ -1292,17 +1297,17 @@ ubf_select(void *ptr)
      * sigwait_th thread, otherwise we can deadlock with a thread
      * in unblock_function_clear.
      */
-    if (cur != vm->gvl.timer && cur != sigwait_th) {
+    if (cur != gvl->timer && cur != sigwait_th) {
         /*
          * Double-checked locking above was to prevent nested locking
         * by the SAME thread.  We use trylock here to prevent deadlocks
         * between DIFFERENT threads
         */
-        if (native_mutex_trylock(&vm->gvl.lock) == 0) {
-            if (!vm->gvl.timer) {
-                rb_thread_wakeup_timer_thread(-1);
-            }
-            rb_native_mutex_unlock(&vm->gvl.lock);
+        if (rb_native_mutex_trylock(&gvl->lock) == 0) {
+            if (!gvl->timer) {
+                rb_thread_wakeup_timer_thread(-1);
+            }
+            rb_native_mutex_unlock(&gvl->lock);
         }
     }
 
@@ -1471,7 +1476,7 @@ rb_thread_wakeup_timer_thread(int sig)
      * on heap for maximum safety (and startup/shutdown speed)
      */
     if (!vm) return;
-    mth = vm->main_thread;
+    mth = vm->ractor.main_thread;
     if (!mth || system_working <= 0) return;
 
     /* this relies on GC for grace period before cont_free */
@@ -2063,12 +2068,12 @@ ubf_ppoll_sleep(void *ignore)
  */
 #define GVL_UNLOCK_BEGIN_YIELD(th) do { \
     const native_thread_data_t *next; \
-    rb_vm_t *vm = th->vm; \
+    rb_global_vm_lock_t *gvl = rb_ractor_gvl(th->ractor); \
     RB_GC_SAVE_MACHINE_CONTEXT(th); \
-    rb_native_mutex_lock(&vm->gvl.lock); \
-    next = gvl_release_common(vm); \
-    rb_native_mutex_unlock(&vm->gvl.lock); \
+    rb_native_mutex_lock(&gvl->lock); \
+    next = gvl_release_common(gvl); \
+    rb_native_mutex_unlock(&gvl->lock); \
-    if (!next && vm_living_thread_num(vm) > 1) { \
+    if (!next && rb_ractor_living_thread_num(th->ractor) > 1) { \
        native_thread_yield(); \
    }
 
@@ -2117,6 +2122,7 @@ static void
 native_sleep(rb_thread_t *th, rb_hrtime_t *rel)
 {
     int sigwait_fd = rb_sigwait_fd_get(th);
+    rb_ractor_blocking_threads_inc(th->ractor, __FILE__, __LINE__);
 
     if (sigwait_fd >= 0) {
         rb_native_mutex_lock(&th->interrupt_lock);
@@ -2136,12 +2142,14 @@ native_sleep(rb_thread_t *th, rb_hrtime_t *rel)
         rb_sigwait_fd_put(th, sigwait_fd);
         rb_sigwait_fd_migrate(th->vm);
     }
-    else if (th == th->vm->main_thread) { /* always able to handle signals */
+    else if (th == th->vm->ractor.main_thread) { /* always able to handle signals */
         native_ppoll_sleep(th, rel);
     }
     else {
         native_cond_sleep(th, rel);
     }
+
+    rb_ractor_blocking_threads_dec(th->ractor, __FILE__, __LINE__);
 }
 
 #if UBF_TIMER == UBF_TIMER_PTHREAD
@@ -2149,7 +2157,7 @@ static void *
 timer_pthread_fn(void *p)
 {
     rb_vm_t *vm = p;
-    pthread_t main_thread_id = vm->main_thread->thread_id;
+    pthread_t main_thread_id = vm->ractor.main_thread->thread_id;
     struct pollfd pfd;
     int timeout = -1;
     int ccp;
--
cgit v1.2.1
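Finally, native_sleep above now brackets every native wait with rb_ractor_blocking_threads_inc/dec, so a ractor can track how many of its threads are blocked outside the GVL. A sketch of that bracket pattern against an assumed, hypothetical counter (the real bookkeeping lives in the ractor code, not in this file):

    #include <pthread.h>

    /* Hypothetical stand-in for the ractor's thread bookkeeping. */
    typedef struct {
        pthread_mutex_t lock;
        int blocking_cnt;           /* threads parked in native waits */
    } ractor_sketch_t;

    static void
    blocking_inc(ractor_sketch_t *r, const char *file, int line)
    {
        (void)file; (void)line;     /* the real calls pass __FILE__/__LINE__,
                                     * apparently for debugging only */
        pthread_mutex_lock(&r->lock);
        r->blocking_cnt++;
        pthread_mutex_unlock(&r->lock);
    }

    static void
    blocking_dec(ractor_sketch_t *r, const char *file, int line)
    {
        (void)file; (void)line;
        pthread_mutex_lock(&r->lock);
        r->blocking_cnt--;
        pthread_mutex_unlock(&r->lock);
    }

    /* Usage mirrors native_sleep() above: bracket the actual wait. */
    static void
    sleep_sketch(ractor_sketch_t *r, void (*do_wait)(void))
    {
        blocking_inc(r, __FILE__, __LINE__);
        do_wait();                  /* native_ppoll_sleep / native_cond_sleep */
        blocking_dec(r, __FILE__, __LINE__);
    }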