diff options
Diffstat (limited to 'cont.c')
-rw-r--r-- | cont.c | 92 |
1 files changed, 86 insertions, 6 deletions
@@ -241,12 +241,17 @@ struct rb_fiber_struct { */ unsigned int transferred : 1; + /* Whether the fiber is allowed to implicitly yield. */ + unsigned int blocking : 1; + struct coroutine_context context; struct fiber_pool_stack stack; }; static struct fiber_pool shared_fiber_pool = {NULL, NULL, 0, 0, 0, 0}; +static ID fiber_initialize_keywords[2] = {0}; + /* * FreeBSD require a first (i.e. addr) argument of mmap(2) is not NULL * if MAP_STACK is passed. @@ -1733,7 +1738,7 @@ fiber_alloc(VALUE klass) } static rb_fiber_t* -fiber_t_alloc(VALUE fiber_value) +fiber_t_alloc(VALUE fiber_value, unsigned int blocking) { rb_fiber_t *fiber; rb_thread_t *th = GET_THREAD(); @@ -1746,6 +1751,7 @@ fiber_t_alloc(VALUE fiber_value) fiber = ZALLOC(rb_fiber_t); fiber->cont.self = fiber_value; fiber->cont.type = FIBER_CONTEXT; + fiber->blocking = blocking; cont_init(&fiber->cont, th); fiber->cont.saved_ec.fiber_ptr = fiber; @@ -1763,9 +1769,9 @@ fiber_t_alloc(VALUE fiber_value) } static VALUE -fiber_initialize(VALUE self, VALUE proc, struct fiber_pool * fiber_pool) +fiber_initialize(VALUE self, VALUE proc, struct fiber_pool * fiber_pool, unsigned int blocking) { - rb_fiber_t *fiber = fiber_t_alloc(self); + rb_fiber_t *fiber = fiber_t_alloc(self, blocking); fiber->first_proc = proc; fiber->stack.base = NULL; @@ -1793,17 +1799,66 @@ fiber_prepare_stack(rb_fiber_t *fiber) sec->local_storage_recursive_hash_for_trace = Qnil; } +static struct fiber_pool * +rb_fiber_pool_default(VALUE pool) +{ + return &shared_fiber_pool; +} + +/* :nodoc: */ +static VALUE +rb_fiber_initialize_kw(int argc, VALUE* argv, VALUE self, int kw_splat) +{ + VALUE pool = Qnil; + VALUE blocking = Qtrue; + + if (kw_splat != RB_NO_KEYWORDS) { + VALUE options = Qnil; + VALUE arguments[2] = {Qundef}; + + argc = rb_scan_args_kw(kw_splat, argc, argv, ":", &options); + rb_get_kwargs(options, fiber_initialize_keywords, 0, 2, arguments); + + blocking = arguments[0]; + pool = arguments[1]; + } + + return fiber_initialize(self, rb_block_proc(), rb_fiber_pool_default(pool), RTEST(blocking)); +} + /* :nodoc: */ static VALUE rb_fiber_initialize(int argc, VALUE* argv, VALUE self) { - return fiber_initialize(self, rb_block_proc(), &shared_fiber_pool); + return rb_fiber_initialize_kw(argc, argv, self, rb_keyword_given_p()); } VALUE rb_fiber_new(rb_block_call_func_t func, VALUE obj) { - return fiber_initialize(fiber_alloc(rb_cFiber), rb_proc_new(func, obj), &shared_fiber_pool); + return fiber_initialize(fiber_alloc(rb_cFiber), rb_proc_new(func, obj), rb_fiber_pool_default(Qnil), 1); +} + +static VALUE +rb_f_fiber_kw(int argc, VALUE* argv, int kw_splat) +{ + rb_thread_t * th = GET_THREAD(); + VALUE scheduler = th->scheduler; + VALUE fiber = Qnil; + + if (scheduler != Qnil) { + fiber = rb_funcall_passing_block_kw(scheduler, rb_intern("fiber"), argc, argv, kw_splat); + } else { + rb_raise(rb_eRuntimeError, "No scheduler is available!"); + } + + return fiber; +} + +static VALUE +rb_f_fiber(int argc, VALUE *argv, VALUE obj) +{ + return rb_f_fiber_kw(argc, argv, rb_keyword_given_p()); } static void rb_fiber_terminate(rb_fiber_t *fiber, int need_interrupt); @@ -1820,6 +1875,10 @@ rb_fiber_start(void) VM_ASSERT(th->ec == ruby_current_execution_context_ptr); VM_ASSERT(FIBER_RESUMED_P(fiber)); + if (fiber->blocking) { + th->blocking += 1; + } + EC_PUSH_TAG(th->ec); if ((state = EC_EXEC_TAG()) == TAG_NONE) { rb_context_t *cont = &VAR_FROM_MEMORY(fiber)->cont; @@ -1892,6 +1951,7 @@ rb_threadptr_root_fiber_setup(rb_thread_t *th) fiber->cont.type = FIBER_CONTEXT; fiber->cont.saved_ec.fiber_ptr = fiber; fiber->cont.saved_ec.thread_ptr = th; + fiber->blocking = 1; fiber_status_set(fiber, FIBER_RESUMED); /* skip CREATED */ th->ec = &fiber->cont.saved_ec; } @@ -2044,11 +2104,15 @@ fiber_switch(rb_fiber_t *fiber, int argc, const VALUE *argv, int is_resume, int } } + VM_ASSERT(FIBER_RUNNABLE_P(fiber)); + if (is_resume) { fiber->prev = fiber_current(); } - VM_ASSERT(FIBER_RUNNABLE_P(fiber)); + if (fiber_current()->blocking) { + th->blocking -= 1; + } cont->argc = argc; cont->kw_splat = kw_splat; @@ -2060,6 +2124,10 @@ fiber_switch(rb_fiber_t *fiber, int argc, const VALUE *argv, int is_resume, int fiber_stack_release(fiber); } + if (fiber_current()->blocking) { + th->blocking += 1; + } + RUBY_VM_CHECK_INTS(th->ec); EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_FIBER_SWITCH, th->self, 0, 0, 0, Qnil); @@ -2073,6 +2141,12 @@ rb_fiber_transfer(VALUE fiber_value, int argc, const VALUE *argv) return fiber_switch(fiber_ptr(fiber_value), argc, argv, 0, RB_NO_KEYWORDS); } +VALUE +rb_fiber_blocking_p(VALUE fiber) +{ + return (fiber_ptr(fiber)->blocking == 0) ? Qfalse : Qtrue; +} + void rb_fiber_close(rb_fiber_t *fiber) { @@ -2442,6 +2516,9 @@ Init_Cont(void) fiber_pool_initialize(&shared_fiber_pool, stack_size, FIBER_POOL_INITIAL_SIZE, vm_stack_size); + fiber_initialize_keywords[0] = rb_intern_const("blocking"); + fiber_initialize_keywords[1] = rb_intern_const("pool"); + char * fiber_shared_fiber_pool_free_stacks = getenv("RUBY_SHARED_FIBER_POOL_FREE_STACKS"); if (fiber_shared_fiber_pool_free_stacks) { shared_fiber_pool.free_stacks = atoi(fiber_shared_fiber_pool_free_stacks); @@ -2452,11 +2529,14 @@ Init_Cont(void) rb_eFiberError = rb_define_class("FiberError", rb_eStandardError); rb_define_singleton_method(rb_cFiber, "yield", rb_fiber_s_yield, -1); rb_define_method(rb_cFiber, "initialize", rb_fiber_initialize, -1); + rb_define_method(rb_cFiber, "blocking?", rb_fiber_blocking_p, 0); rb_define_method(rb_cFiber, "resume", rb_fiber_m_resume, -1); rb_define_method(rb_cFiber, "raise", rb_fiber_raise, -1); rb_define_method(rb_cFiber, "to_s", fiber_to_s, 0); rb_define_alias(rb_cFiber, "inspect", "to_s"); + rb_define_global_function("Fiber", rb_f_fiber, -1); + #ifdef RB_EXPERIMENTAL_FIBER_POOL rb_cFiberPool = rb_define_class("Pool", rb_cFiber); rb_define_alloc_func(rb_cFiberPool, fiber_pool_alloc); |