From 0436f1e15a8e79ffef5ea412ac1312cbf9f063e6 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Thu, 1 Dec 2022 23:00:33 +1300 Subject: Introduce `Fiber#storage` for inheritable fiber-scoped variables. (#6612) --- cont.c | 314 ++++++++++++++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 261 insertions(+), 53 deletions(-) (limited to 'cont.c') diff --git a/cont.c b/cont.c index c967e57325..e050deefdc 100644 --- a/cont.c +++ b/cont.c @@ -271,7 +271,7 @@ struct rb_fiber_struct { static struct fiber_pool shared_fiber_pool = {NULL, NULL, 0, 0, 0, 0}; -static ID fiber_initialize_keywords[2] = {0}; +static ID fiber_initialize_keywords[3] = {0}; /* * FreeBSD require a first (i.e. addr) argument of mmap(2) is not NULL @@ -1156,7 +1156,9 @@ fiber_memsize(const void *ptr) */ if (saved_ec->local_storage && fiber != th->root_fiber) { size += rb_id_table_memsize(saved_ec->local_storage); + size += rb_obj_memsize_of(saved_ec->storage); } + size += cont_memsize(&fiber->cont); return size; } @@ -2007,11 +2009,186 @@ fiber_t_alloc(VALUE fiber_value, unsigned int blocking) return fiber; } +static rb_fiber_t * +root_fiber_alloc(rb_thread_t *th) +{ + VALUE fiber_value = fiber_alloc(rb_cFiber); + rb_fiber_t *fiber = th->ec->fiber_ptr; + + VM_ASSERT(DATA_PTR(fiber_value) == NULL); + VM_ASSERT(fiber->cont.type == FIBER_CONTEXT); + VM_ASSERT(FIBER_RESUMED_P(fiber)); + + th->root_fiber = fiber; + DATA_PTR(fiber_value) = fiber; + fiber->cont.self = fiber_value; + + coroutine_initialize_main(&fiber->context); + + return fiber; +} + +static inline rb_fiber_t* +fiber_current(void) +{ + rb_execution_context_t *ec = GET_EC(); + if (ec->fiber_ptr->cont.self == 0) { + root_fiber_alloc(rb_ec_thread_ptr(ec)); + } + return ec->fiber_ptr; +} + +static inline VALUE +current_fiber_storage(void) +{ + rb_execution_context_t *ec = GET_EC(); + return ec->storage; +} + +static inline VALUE +inherit_fiber_storage(void) +{ + return rb_obj_dup(current_fiber_storage()); +} + +static inline void +fiber_storage_set(struct rb_fiber_struct *fiber, VALUE storage) +{ + fiber->cont.saved_ec.storage = storage; +} + +static inline VALUE +fiber_storage_get(rb_fiber_t *fiber) +{ + VALUE storage = fiber->cont.saved_ec.storage; + if (storage == Qnil) { + storage = rb_hash_new(); + fiber_storage_set(fiber, storage); + } + return storage; +} + +/** + * call-seq: Fiber.current.storage -> hash (dup) + * + * Returns a copy of the storage hash for the current fiber. + */ +static VALUE +rb_fiber_storage_get(VALUE self) +{ + return rb_obj_dup(fiber_storage_get(fiber_ptr(self))); +} + +static int +fiber_storage_validate_each(VALUE key, VALUE value, VALUE _argument) +{ + rb_check_id(&key); + + return ST_CONTINUE; +} + +static void +fiber_storage_validate(VALUE value) +{ + if (!RB_TYPE_P(value, T_HASH)) { + rb_raise(rb_eTypeError, "storage must be a hash"); + } + + rb_hash_foreach(value, fiber_storage_validate_each, Qundef); +} + +/** + * call-seq: Fiber.current.storage = hash + * + * Sets the storage hash for the current fiber. This feature is experimental + * and may change in the future. + * + * You should be careful about using this method as you may inadvertently clear + * important fiber-storage state. You should mostly prefer to assign specific + * keys in the storage using Fiber#[]=. + * + * You can also use Fiber.new(storage: nil) to create a fiber with an empty + * storage. + * + * Example: + * + * while request = request_queue.pop + * # Reset the per-request state: + * Fiber.current.storage = nil + * handle_request(request) + * end + */ static VALUE -fiber_initialize(VALUE self, VALUE proc, struct fiber_pool * fiber_pool, unsigned int blocking) +rb_fiber_storage_set(VALUE self, VALUE value) { + fiber_storage_validate(value); + + fiber_ptr(self)->cont.saved_ec.storage = rb_obj_dup(value); + return value; +} + +/** + * call-seq: Fiber[key] -> value + * + * Returns the value of the fiber-local variable identified by +key+. + * + * The +key+ must be a symbol, and the value is set by Fiber#[]= or + * Fiber#store. + * + * See also Fiber[]=. + */ +static VALUE +rb_fiber_storage_aref(VALUE class, VALUE key) +{ + ID id = rb_check_id(&key); + if (!id) return Qnil; + + VALUE storage = fiber_storage_get(fiber_current()); + + if (storage == Qnil) return Qnil; + + return rb_hash_aref(storage, key); +} + +/** + * call-seq: Fiber[key] = value + * + * Assign +value+ to the fiber-local variable identified by +key+. + * The variable is created if it doesn't exist. + * + * +key+ must be a Symbol, otherwise a TypeError is raised. + * + * See also Fiber[]. + */ +static VALUE +rb_fiber_storage_aset(VALUE class, VALUE key, VALUE value) +{ + ID id = rb_check_id(&key); + if (!id) return Qnil; + + VALUE storage = fiber_storage_get(fiber_current()); + + return rb_hash_aset(storage, key, value); +} + +static VALUE +fiber_initialize(VALUE self, VALUE proc, struct fiber_pool * fiber_pool, unsigned int blocking, VALUE storage) +{ + if (storage == Qundef || storage == Qtrue) { + // The default, inherit storage (dup) from the current fiber: + storage = inherit_fiber_storage(); + } + else if (storage == Qfalse) { + storage = current_fiber_storage(); + } + else /* nil, hash, etc. */ { + fiber_storage_validate(storage); + storage = rb_obj_dup(storage); + } + rb_fiber_t *fiber = fiber_t_alloc(self, blocking); + fiber->cont.saved_ec.storage = storage; fiber->first_proc = proc; fiber->stack.base = NULL; fiber->stack.pool = fiber_pool; @@ -2044,19 +2221,27 @@ rb_fiber_pool_default(VALUE pool) return &shared_fiber_pool; } +VALUE rb_fiber_inherit_storage(struct rb_execution_context_struct *ec, struct rb_fiber_struct *fiber) +{ + VALUE storage = rb_obj_dup(ec->storage); + fiber->cont.saved_ec.storage = storage; + return storage; +} + /* :nodoc: */ static VALUE rb_fiber_initialize_kw(int argc, VALUE* argv, VALUE self, int kw_splat) { VALUE pool = Qnil; VALUE blocking = Qfalse; + VALUE storage = Qundef; if (kw_splat != RB_NO_KEYWORDS) { VALUE options = Qnil; - VALUE arguments[2] = {Qundef}; + VALUE arguments[3] = {Qundef}; argc = rb_scan_args_kw(kw_splat, argc, argv, ":", &options); - rb_get_kwargs(options, fiber_initialize_keywords, 0, 2, arguments); + rb_get_kwargs(options, fiber_initialize_keywords, 0, 3, arguments); if (!UNDEF_P(arguments[0])) { blocking = arguments[0]; @@ -2065,33 +2250,73 @@ rb_fiber_initialize_kw(int argc, VALUE* argv, VALUE self, int kw_splat) if (!UNDEF_P(arguments[1])) { pool = arguments[1]; } + + storage = arguments[2]; } - return fiber_initialize(self, rb_block_proc(), rb_fiber_pool_default(pool), RTEST(blocking)); + return fiber_initialize(self, rb_block_proc(), rb_fiber_pool_default(pool), RTEST(blocking), storage); } /* * call-seq: - * Fiber.new(blocking: false) { |*args| ... } -> fiber + * Fiber.new(blocking: false, storage: true) { |*args| ... } -> fiber * - * Creates new Fiber. Initially, the fiber is not running and can be resumed with - * #resume. Arguments to the first #resume call will be passed to the block: + * Creates new Fiber. Initially, the fiber is not running and can be resumed + * with #resume. Arguments to the first #resume call will be passed to the + * block: * - * f = Fiber.new do |initial| - * current = initial - * loop do - * puts "current: #{current.inspect}" - * current = Fiber.yield - * end - * end - * f.resume(100) # prints: current: 100 - * f.resume(1, 2, 3) # prints: current: [1, 2, 3] - * f.resume # prints: current: nil - * # ... and so on ... - * - * If blocking: false is passed to Fiber.new, _and_ current thread - * has a Fiber.scheduler defined, the Fiber becomes non-blocking (see "Non-blocking - * Fibers" section in class docs). + * f = Fiber.new do |initial| + * current = initial + * loop do + * puts "current: #{current.inspect}" + * current = Fiber.yield + * end + * end + * f.resume(100) # prints: current: 100 + * f.resume(1, 2, 3) # prints: current: [1, 2, 3] + * f.resume # prints: current: nil + * # ... and so on ... + * + * If blocking: false is passed to Fiber.new, _and_ current + * thread has a Fiber.scheduler defined, the Fiber becomes non-blocking (see + * "Non-blocking Fibers" section in class docs). + * + * If the storage is unspecified, the default is to inherit a copy of + * the storage from the current fiber. This is the same as specifying + * storage: true. + * + * Fiber[:x] = 1 + * Fiber.new do + * Fiber[:x] # => 1 + * Fiber[:x] = 2 + * end.resume + * Fiber[:x] # => 1 + * + * If the storage is false, this function uses the current + * fiber's storage by reference. This is used for Enumerator to create + * hidden fiber. + * + * Fiber[:count] = 0 + * enumerator = Enumerator.new do |y| + * loop{y << (Fiber[:count] += 1)} + * end + * Fiber[:count] # => 0 + * enumerator.next # => 1 + * Fiber[:count] # => 1 + * + * If the given storage is nil, this function will lazy + * initialize the internal storage, which starts as an empty hash. + * + * Fiber[:x] = "Hello World" + * Fiber.new(storage: nil) do + * Fiber[:x] # nil + * end + * + * Otherwise, the given storage is used as the new fiber's storage, + * and it must be an instance of Hash. + * + * Explicitly using `storage: true/false` is currently experimental and may + * change in the future. */ static VALUE rb_fiber_initialize(int argc, VALUE* argv, VALUE self) @@ -2099,10 +2324,16 @@ rb_fiber_initialize(int argc, VALUE* argv, VALUE self) return rb_fiber_initialize_kw(argc, argv, self, rb_keyword_given_p()); } +VALUE +rb_fiber_new_storage(rb_block_call_func_t func, VALUE obj, VALUE storage) +{ + return fiber_initialize(fiber_alloc(rb_cFiber), rb_proc_new(func, obj), rb_fiber_pool_default(Qnil), 1, storage); +} + VALUE rb_fiber_new(rb_block_call_func_t func, VALUE obj) { - return fiber_initialize(fiber_alloc(rb_cFiber), rb_proc_new(func, obj), rb_fiber_pool_default(Qnil), 1); + return rb_fiber_new_storage(func, obj, Qtrue); } static VALUE @@ -2276,25 +2507,6 @@ rb_fiber_start(rb_fiber_t *fiber) rb_fiber_terminate(fiber, need_interrupt, err); } -static rb_fiber_t * -root_fiber_alloc(rb_thread_t *th) -{ - VALUE fiber_value = fiber_alloc(rb_cFiber); - rb_fiber_t *fiber = th->ec->fiber_ptr; - - VM_ASSERT(DATA_PTR(fiber_value) == NULL); - VM_ASSERT(fiber->cont.type == FIBER_CONTEXT); - VM_ASSERT(FIBER_RESUMED_P(fiber)); - - th->root_fiber = fiber; - DATA_PTR(fiber_value) = fiber; - fiber->cont.self = fiber_value; - - coroutine_initialize_main(&fiber->context); - - return fiber; -} - // Set up a "root fiber", which is the fiber that every Ractor has. void rb_threadptr_root_fiber_setup(rb_thread_t *th) @@ -2348,16 +2560,6 @@ rb_threadptr_root_fiber_terminate(rb_thread_t *th) rb_ec_clear_vm_stack(th->ec); } -static inline rb_fiber_t* -fiber_current(void) -{ - rb_execution_context_t *ec = GET_EC(); - if (ec->fiber_ptr->cont.self == 0) { - root_fiber_alloc(rb_ec_thread_ptr(ec)); - } - return ec->fiber_ptr; -} - static inline rb_fiber_t* return_fiber(bool terminate) { @@ -3146,6 +3348,7 @@ Init_Cont(void) fiber_initialize_keywords[0] = rb_intern_const("blocking"); fiber_initialize_keywords[1] = rb_intern_const("pool"); + fiber_initialize_keywords[2] = rb_intern_const("storage"); const char *fiber_shared_fiber_pool_free_stacks = getenv("RUBY_SHARED_FIBER_POOL_FREE_STACKS"); if (fiber_shared_fiber_pool_free_stacks) { @@ -3158,8 +3361,13 @@ Init_Cont(void) rb_define_singleton_method(rb_cFiber, "yield", rb_fiber_s_yield, -1); rb_define_singleton_method(rb_cFiber, "current", rb_fiber_s_current, 0); rb_define_singleton_method(rb_cFiber, "blocking", rb_fiber_blocking, 0); + rb_define_singleton_method(rb_cFiber, "[]", rb_fiber_storage_aref, 1); + rb_define_singleton_method(rb_cFiber, "[]=", rb_fiber_storage_aset, 2); + rb_define_method(rb_cFiber, "initialize", rb_fiber_initialize, -1); rb_define_method(rb_cFiber, "blocking?", rb_fiber_blocking_p, 0); + rb_define_method(rb_cFiber, "storage", rb_fiber_storage_get, 0); + rb_define_method(rb_cFiber, "storage=", rb_fiber_storage_set, 1); rb_define_method(rb_cFiber, "resume", rb_fiber_m_resume, -1); rb_define_method(rb_cFiber, "raise", rb_fiber_m_raise, -1); rb_define_method(rb_cFiber, "backtrace", rb_fiber_backtrace, -1); -- cgit v1.2.1