diff options
Diffstat (limited to 'ext/ffi_c/Function.c')
-rw-r--r-- | ext/ffi_c/Function.c | 234 |
1 files changed, 162 insertions, 72 deletions
diff --git a/ext/ffi_c/Function.c b/ext/ffi_c/Function.c index 0e68722..7810056 100644 --- a/ext/ffi_c/Function.c +++ b/ext/ffi_c/Function.c @@ -42,6 +42,10 @@ #include <ruby.h> #include <ruby/thread.h> +#if HAVE_RB_EXT_RACTOR_SAFE +#include <ruby/ractor.h> +#endif + #include <ffi.h> #if defined(HAVE_NATIVETHREAD) && !defined(_WIN32) #include <pthread.h> @@ -65,6 +69,9 @@ #include "MethodHandle.h" #include "Function.h" +#define DEFER_ASYNC_CALLBACK 1 + +struct async_cb_dispatcher; typedef struct Function_ { Pointer base; FunctionType* info; @@ -73,6 +80,9 @@ typedef struct Function_ { Closure* closure; VALUE rbProc; VALUE rbFunctionInfo; +#if defined(DEFER_ASYNC_CALLBACK) + struct async_cb_dispatcher *dispatcher; +#endif } Function; static void function_mark(void *data); @@ -86,9 +96,6 @@ static void* callback_with_gvl(void* data); static VALUE invoke_callback(VALUE data); static VALUE save_callback_exception(VALUE data, VALUE exc); -#define DEFER_ASYNC_CALLBACK 1 - - #if defined(DEFER_ASYNC_CALLBACK) static VALUE async_cb_event(void *); static VALUE async_cb_call(void *); @@ -108,15 +115,11 @@ static const rb_data_type_t function_data_type = { .parent = &rbffi_pointer_data_type, // IMPORTANT: WB_PROTECTED objects must only use the RB_OBJ_WRITE() // macro to update VALUE references, as to trigger write barriers. - .flags = RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED + .flags = RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED | FFI_RUBY_TYPED_FROZEN_SHAREABLE }; VALUE rbffi_FunctionClass = Qnil; -#if defined(DEFER_ASYNC_CALLBACK) -static VALUE async_cb_thread = Qnil; -#endif - static ID id_call = 0, id_to_native = 0, id_from_native = 0, id_cbtable = 0, id_cb_ref = 0; struct gvl_callback { @@ -126,7 +129,10 @@ struct gvl_callback { bool done; rbffi_frame_t *frame; #if defined(DEFER_ASYNC_CALLBACK) + struct async_cb_dispatcher *dispatcher; struct gvl_callback* next; + + /* Signal when the callback has finished and retval is set */ # ifndef _WIN32 pthread_cond_t async_cond; pthread_mutex_t async_mutex; @@ -138,16 +144,74 @@ struct gvl_callback { #if defined(DEFER_ASYNC_CALLBACK) -static struct gvl_callback* async_cb_list = NULL; +struct async_cb_dispatcher { + /* the Ractor-local dispatcher thread */ + VALUE thread; + + /* single linked list of pending callbacks */ + struct gvl_callback* async_cb_list; + + /* Signal new entries in async_cb_list */ # ifndef _WIN32 - static pthread_mutex_t async_cb_mutex = PTHREAD_MUTEX_INITIALIZER; - static pthread_cond_t async_cb_cond = PTHREAD_COND_INITIALIZER; + pthread_mutex_t async_cb_mutex; + pthread_cond_t async_cb_cond; # else - static HANDLE async_cb_cond; - static CRITICAL_SECTION async_cb_lock; + HANDLE async_cb_cond; + CRITICAL_SECTION async_cb_lock; # endif -#endif +}; + +#if HAVE_RB_EXT_RACTOR_SAFE +static void +async_cb_dispatcher_mark(void *ptr) +{ + struct async_cb_dispatcher *ctx = (struct async_cb_dispatcher *)ptr; + rb_gc_mark(ctx->thread); +} + +static void +async_cb_dispatcher_free(void *ptr) +{ + struct async_cb_dispatcher *ctx = (struct async_cb_dispatcher *)ptr; + xfree(ctx); +} + +struct rb_ractor_local_storage_type async_cb_dispatcher_key_type = { + async_cb_dispatcher_mark, + async_cb_dispatcher_free, +}; + +static rb_ractor_local_key_t async_cb_dispatcher_key; + +static struct async_cb_dispatcher * +async_cb_dispatcher_get(void) +{ + struct async_cb_dispatcher *ctx = (struct async_cb_dispatcher *)rb_ractor_local_storage_ptr(async_cb_dispatcher_key); + return ctx; +} +static void +async_cb_dispatcher_set(struct async_cb_dispatcher *ctx) +{ + rb_ractor_local_storage_ptr_set(async_cb_dispatcher_key, ctx); +} +#else +// for ruby 2.x +static struct async_cb_dispatcher *async_cb_dispatcher = NULL; + +static struct async_cb_dispatcher * +async_cb_dispatcher_get(void) +{ + return async_cb_dispatcher; +} + +static void +async_cb_dispatcher_set(struct async_cb_dispatcher *ctx) +{ + async_cb_dispatcher = ctx; +} +#endif +#endif static VALUE function_allocate(VALUE klass) @@ -328,9 +392,7 @@ static void after_fork_callback(void) { /* Ensure that a new dispatcher thread is started in a forked process */ - async_cb_thread = Qnil; - pthread_mutex_init(&async_cb_mutex, NULL); - pthread_cond_init(&async_cb_cond, NULL); + async_cb_dispatcher_set(NULL); } #endif @@ -360,17 +422,30 @@ function_init(VALUE self, VALUE rbFunctionInfo, VALUE rbProc) } #if defined(DEFER_ASYNC_CALLBACK) - if (async_cb_thread == Qnil) { + { + struct async_cb_dispatcher *ctx = async_cb_dispatcher_get(); + if (ctx == NULL) { + ctx = (struct async_cb_dispatcher*)ALLOC(struct async_cb_dispatcher); + ctx->async_cb_list = NULL; #if !defined(_WIN32) - if( pthread_atfork(NULL, NULL, after_fork_callback) ){ - rb_warn("FFI: unable to register fork callback"); - } + pthread_mutex_init(&ctx->async_cb_mutex, NULL); + pthread_cond_init(&ctx->async_cb_cond, NULL); + if( pthread_atfork(NULL, NULL, after_fork_callback) ){ + rb_warn("FFI: unable to register fork callback"); + } +#else + InitializeCriticalSection(&ctx->async_cb_lock); + ctx->async_cb_cond = CreateEvent(NULL, FALSE, FALSE, NULL); #endif + ctx->thread = rb_thread_create(async_cb_event, ctx); - async_cb_thread = rb_thread_create(async_cb_event, NULL); - /* Name thread, for better debugging */ - rb_funcall(async_cb_thread, rb_intern("name="), 1, rb_str_new2("FFI Callback Dispatcher")); + /* Name thread, for better debugging */ + rb_funcall(ctx->thread, rb_intern("name="), 1, rb_str_new2("FFI Callback Dispatcher")); + + async_cb_dispatcher_set(ctx); + } + fn->dispatcher = ctx; } #endif @@ -417,8 +492,8 @@ static VALUE function_attach(VALUE self, VALUE module, VALUE name) { Function* fn; - char var[1024]; + StringValue(name); TypedData_Get_Struct(self, Function, &function_data_type, fn); if (fn->info->parameterCount == -1) { @@ -435,12 +510,6 @@ function_attach(VALUE self, VALUE module, VALUE name) fn->methodHandle = rbffi_MethodHandle_Alloc(fn->info, fn->base.memory.address); } - /* - * Stash the Function in a module variable so it does not get garbage collected - */ - snprintf(var, sizeof(var), "@@%s", StringValueCStr(name)); - rb_cv_set(module, var, self); - rb_define_singleton_method(module, StringValueCStr(name), rbffi_MethodHandle_CodeAddress(fn->methodHandle), -1); @@ -462,6 +531,7 @@ function_set_autorelease(VALUE self, VALUE autorelease) { Function* fn; + rb_check_frozen(self); TypedData_Get_Struct(self, Function, &function_data_type, fn); fn->autorelease = RTEST(autorelease); @@ -479,6 +549,16 @@ function_autorelease_p(VALUE self) return fn->autorelease ? Qtrue : Qfalse; } +static VALUE +function_type(VALUE self) +{ + Function* fn; + + TypedData_Get_Struct(self, Function, &function_data_type, fn); + + return fn->rbFunctionInfo; +} + /* * call-seq: free * @return [self] @@ -504,6 +584,7 @@ function_release(VALUE self) static void callback_invoke(ffi_cif* cif, void* retval, void** parameters, void* user_data) { + Function* fn; struct gvl_callback cb = { 0 }; cb.closure = (Closure *) user_data; @@ -511,6 +592,7 @@ callback_invoke(ffi_cif* cif, void* retval, void** parameters, void* user_data) cb.parameters = parameters; cb.done = false; cb.frame = rbffi_frame_current(); + fn = (Function *) cb.closure->info; if (cb.frame != NULL) cb.frame->exc = Qnil; @@ -523,18 +605,19 @@ callback_invoke(ffi_cif* cif, void* retval, void** parameters, void* user_data) #if defined(DEFER_ASYNC_CALLBACK) && !defined(_WIN32) } else { bool empty = false; + struct async_cb_dispatcher *ctx = fn->dispatcher; pthread_mutex_init(&cb.async_mutex, NULL); pthread_cond_init(&cb.async_cond, NULL); - /* Now signal the async callback thread */ - pthread_mutex_lock(&async_cb_mutex); - empty = async_cb_list == NULL; - cb.next = async_cb_list; - async_cb_list = &cb; + /* Now signal the async callback dispatcher thread */ + pthread_mutex_lock(&ctx->async_cb_mutex); + empty = ctx->async_cb_list == NULL; + cb.next = ctx->async_cb_list; + ctx->async_cb_list = &cb; - pthread_cond_signal(&async_cb_cond); - pthread_mutex_unlock(&async_cb_mutex); + pthread_cond_signal(&ctx->async_cb_cond); + pthread_mutex_unlock(&ctx->async_cb_mutex); /* Wait for the thread executing the ruby callback to signal it is done */ pthread_mutex_lock(&cb.async_mutex); @@ -548,17 +631,18 @@ callback_invoke(ffi_cif* cif, void* retval, void** parameters, void* user_data) #elif defined(DEFER_ASYNC_CALLBACK) && defined(_WIN32) } else { bool empty = false; + struct async_cb_dispatcher *ctx = fn->dispatcher; cb.async_event = CreateEvent(NULL, FALSE, FALSE, NULL); - /* Now signal the async callback thread */ - EnterCriticalSection(&async_cb_lock); - empty = async_cb_list == NULL; - cb.next = async_cb_list; - async_cb_list = &cb; - LeaveCriticalSection(&async_cb_lock); + /* Now signal the async callback dispatcher thread */ + EnterCriticalSection(&ctx->async_cb_lock); + empty = ctx->async_cb_list == NULL; + cb.next = ctx->async_cb_list; + ctx->async_cb_list = &cb; + LeaveCriticalSection(&ctx->async_cb_lock); - SetEvent(async_cb_cond); + SetEvent(ctx->async_cb_cond); /* Wait for the thread executing the ruby callback to signal it is done */ WaitForSingleObject(cb.async_event, INFINITE); @@ -569,6 +653,7 @@ callback_invoke(ffi_cif* cif, void* retval, void** parameters, void* user_data) #if defined(DEFER_ASYNC_CALLBACK) struct async_wait { + struct async_cb_dispatcher *dispatcher; void* cb; bool stop; }; @@ -577,9 +662,10 @@ static void * async_cb_wait(void *); static void async_cb_stop(void *); static VALUE -async_cb_event(void* unused) +async_cb_event(void* ptr) { - struct async_wait w = { 0 }; + struct async_cb_dispatcher *ctx = (struct async_cb_dispatcher *)ptr; + struct async_wait w = { ctx }; w.stop = false; while (!w.stop) { @@ -600,23 +686,24 @@ static void * async_cb_wait(void *data) { struct async_wait* w = (struct async_wait *) data; + struct async_cb_dispatcher *ctx = w->dispatcher; w->cb = NULL; - EnterCriticalSection(&async_cb_lock); + EnterCriticalSection(&ctx->async_cb_lock); - while (!w->stop && async_cb_list == NULL) { - LeaveCriticalSection(&async_cb_lock); - WaitForSingleObject(async_cb_cond, INFINITE); - EnterCriticalSection(&async_cb_lock); + while (!w->stop && ctx->async_cb_list == NULL) { + LeaveCriticalSection(&ctx->async_cb_lock); + WaitForSingleObject(ctx->async_cb_cond, INFINITE); + EnterCriticalSection(&ctx->async_cb_lock); } - if (async_cb_list != NULL) { - w->cb = async_cb_list; - async_cb_list = async_cb_list->next; + if (ctx->async_cb_list != NULL) { + w->cb = ctx->async_cb_list; + ctx->async_cb_list = ctx->async_cb_list->next; } - LeaveCriticalSection(&async_cb_lock); + LeaveCriticalSection(&ctx->async_cb_lock); return NULL; } @@ -625,11 +712,12 @@ static void async_cb_stop(void *data) { struct async_wait* w = (struct async_wait *) data; + struct async_cb_dispatcher *ctx = w->dispatcher; - EnterCriticalSection(&async_cb_lock); + EnterCriticalSection(&ctx->async_cb_lock); w->stop = true; - LeaveCriticalSection(&async_cb_lock); - SetEvent(async_cb_cond); + LeaveCriticalSection(&ctx->async_cb_lock); + SetEvent(ctx->async_cb_cond); } #else @@ -637,21 +725,22 @@ static void * async_cb_wait(void *data) { struct async_wait* w = (struct async_wait *) data; + struct async_cb_dispatcher *ctx = w->dispatcher; w->cb = NULL; - pthread_mutex_lock(&async_cb_mutex); + pthread_mutex_lock(&ctx->async_cb_mutex); - while (!w->stop && async_cb_list == NULL) { - pthread_cond_wait(&async_cb_cond, &async_cb_mutex); + while (!w->stop && ctx->async_cb_list == NULL) { + pthread_cond_wait(&ctx->async_cb_cond, &ctx->async_cb_mutex); } - if (async_cb_list != NULL) { - w->cb = async_cb_list; - async_cb_list = async_cb_list->next; + if (ctx->async_cb_list != NULL) { + w->cb = ctx->async_cb_list; + ctx->async_cb_list = ctx->async_cb_list->next; } - pthread_mutex_unlock(&async_cb_mutex); + pthread_mutex_unlock(&ctx->async_cb_mutex); return NULL; } @@ -660,11 +749,12 @@ static void async_cb_stop(void *data) { struct async_wait* w = (struct async_wait *) data; + struct async_cb_dispatcher *ctx = w->dispatcher; - pthread_mutex_lock(&async_cb_mutex); + pthread_mutex_lock(&ctx->async_cb_mutex); w->stop = true; - pthread_cond_signal(&async_cb_cond); - pthread_mutex_unlock(&async_cb_mutex); + pthread_cond_signal(&ctx->async_cb_cond); + pthread_mutex_unlock(&ctx->async_cb_mutex); } #endif @@ -941,6 +1031,7 @@ rbffi_Function_Init(VALUE moduleFFI) rb_define_method(rbffi_FunctionClass, "attach", function_attach, 2); rb_define_method(rbffi_FunctionClass, "free", function_release, 0); rb_define_method(rbffi_FunctionClass, "autorelease=", function_set_autorelease, 1); + rb_define_private_method(rbffi_FunctionClass, "type", function_type, 0); /* * call-seq: autorelease * @return [Boolean] @@ -960,8 +1051,7 @@ rbffi_Function_Init(VALUE moduleFFI) id_cb_ref = rb_intern("@__ffi_callback__"); id_to_native = rb_intern("to_native"); id_from_native = rb_intern("from_native"); -#if defined(_WIN32) - InitializeCriticalSection(&async_cb_lock); - async_cb_cond = CreateEvent(NULL, FALSE, FALSE, NULL); +#if defined(DEFER_ASYNC_CALLBACK) && defined(HAVE_RB_EXT_RACTOR_SAFE) + async_cb_dispatcher_key = rb_ractor_local_storage_ptr_newkey(&async_cb_dispatcher_key_type); #endif } |