From 411d0eec11f3618066a293ee72810bf48168adc8 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Mon, 26 Sep 2022 19:37:28 +1300 Subject: Update `IO::Buffer` read/write to use rb_thread_io_blocking_region. (#6438) --- io_buffer.c | 161 +++++++++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 121 insertions(+), 40 deletions(-) (limited to 'io_buffer.c') diff --git a/io_buffer.c b/io_buffer.c index 8ec5dc984b..4326d21def 100644 --- a/io_buffer.c +++ b/io_buffer.c @@ -6,15 +6,16 @@ **********************************************************************/ -#include "internal/array.h" #include "ruby/io.h" #include "ruby/io/buffer.h" #include "ruby/fiber/scheduler.h" #include "internal.h" -#include "internal/string.h" +#include "internal/array.h" #include "internal/bits.h" #include "internal/error.h" +#include "internal/string.h" +#include "internal/thread.h" VALUE rb_cIOBuffer; VALUE rb_eIOBufferLockedError; @@ -2281,6 +2282,20 @@ io_buffer_default_size(size_t page_size) return platform_agnostic_default_size; } +struct io_buffer_read_internal_argument { + int descriptor; + void *base; + size_t size; +}; + +static VALUE +io_buffer_read_internal(void *_argument) +{ + struct io_buffer_read_internal_argument *argument = _argument; + ssize_t result = read(argument->descriptor, argument->base, argument->size); + return rb_fiber_scheduler_io_result(result, errno); +} + VALUE rb_io_buffer_read(VALUE self, VALUE io, size_t length) { @@ -2304,9 +2319,13 @@ rb_io_buffer_read(VALUE self, VALUE io, size_t length) size_t size; io_buffer_get_bytes_for_writing(data, &base, &size); - ssize_t result = read(descriptor, base, size); + struct io_buffer_read_internal_argument argument = { + .descriptor = descriptor, + .base = base, + .size = length, + }; - return rb_fiber_scheduler_io_result(result, errno); + return rb_thread_io_blocking_region(io_buffer_read_internal, &argument, descriptor); } static VALUE @@ -2315,6 +2334,38 @@ io_buffer_read(VALUE self, VALUE io, VALUE length) return rb_io_buffer_read(self, io, RB_NUM2SIZE(length)); } +struct io_buffer_pread_internal_argument { + int descriptor; + void *base; + size_t size; + off_t offset; +}; + +static VALUE +io_buffer_pread_internal(void *_argument) +{ + struct io_buffer_pread_internal_argument *argument = _argument; + +#if defined(HAVE_PREAD) + ssize_t result = pread(argument->descriptor, argument->base, argument->size, argument->offset); +#else + // This emulation is not thread safe. + rb_off_t offset = lseek(argument->descriptor, 0, SEEK_CUR); + if (offset == (rb_off_t)-1) + return rb_fiber_scheduler_io_result(-1, errno); + + if (lseek(argument->descriptor, argument->offset, SEEK_SET) == (rb_off_t)-1) + return rb_fiber_scheduler_io_result(-1, errno); + + ssize_t result = read(argument->descriptor, argument->base, argument->size); + + if (lseek(argument->descriptor, offset, SEEK_SET) == (rb_off_t)-1) + return rb_fiber_scheduler_io_result(-1, errno); +#endif + + return rb_fiber_scheduler_io_result(result, errno); +} + VALUE rb_io_buffer_pread(VALUE self, VALUE io, size_t length, rb_off_t offset) { @@ -2338,24 +2389,14 @@ rb_io_buffer_pread(VALUE self, VALUE io, size_t length, rb_off_t offset) size_t size; io_buffer_get_bytes_for_writing(data, &base, &size); -#if defined(HAVE_PREAD) - ssize_t result = pread(descriptor, base, size, offset); -#else - // This emulation is not thread safe, but the GVL means it's unlikely to be a problem. - rb_off_t current_offset = lseek(descriptor, 0, SEEK_CUR); - if (current_offset == (rb_off_t)-1) - return rb_fiber_scheduler_io_result(-1, errno); - - if (lseek(descriptor, offset, SEEK_SET) == (rb_off_t)-1) - return rb_fiber_scheduler_io_result(-1, errno); - - ssize_t result = read(descriptor, base, size); - - if (lseek(descriptor, current_offset, SEEK_SET) == (rb_off_t)-1) - return rb_fiber_scheduler_io_result(-1, errno); -#endif + struct io_buffer_pread_internal_argument argument = { + .descriptor = descriptor, + .base = base, + .size = length, + .offset = offset, + }; - return rb_fiber_scheduler_io_result(result, errno); + return rb_thread_io_blocking_region(io_buffer_pread_internal, &argument, descriptor); } static VALUE @@ -2364,6 +2405,20 @@ io_buffer_pread(VALUE self, VALUE io, VALUE length, VALUE offset) return rb_io_buffer_pread(self, io, RB_NUM2SIZE(length), NUM2OFFT(offset)); } +struct io_buffer_write_internal_argument { + int descriptor; + const void *base; + size_t size; +}; + +static VALUE +io_buffer_write_internal(void *_argument) +{ + struct io_buffer_write_internal_argument *argument = _argument; + ssize_t result = write(argument->descriptor, argument->base, argument->size); + return rb_fiber_scheduler_io_result(result, errno); +} + VALUE rb_io_buffer_write(VALUE self, VALUE io, size_t length) { @@ -2387,9 +2442,13 @@ rb_io_buffer_write(VALUE self, VALUE io, size_t length) size_t size; io_buffer_get_bytes_for_reading(data, &base, &size); - ssize_t result = write(descriptor, base, length); + struct io_buffer_write_internal_argument argument = { + .descriptor = descriptor, + .base = base, + .size = length, + }; - return rb_fiber_scheduler_io_result(result, errno); + return rb_thread_io_blocking_region(io_buffer_write_internal, &argument, descriptor); } static VALUE @@ -2398,6 +2457,38 @@ io_buffer_write(VALUE self, VALUE io, VALUE length) return rb_io_buffer_write(self, io, RB_NUM2SIZE(length)); } +struct io_buffer_pwrite_internal_argument { + int descriptor; + const void *base; + size_t size; + off_t offset; +}; + +static VALUE +io_buffer_pwrite_internal(void *_argument) +{ + struct io_buffer_pwrite_internal_argument *argument = _argument; + +#if defined(HAVE_PWRITE) + ssize_t result = pwrite(argument->descriptor, argument->base, argument->size, argument->offset); +#else + // This emulation is not thread safe. + rb_off_t offset = lseek(argument->descriptor, 0, SEEK_CUR); + if (offset == (rb_off_t)-1) + return rb_fiber_scheduler_io_result(-1, errno); + + if (lseek(argument->descriptor, argument->offset, SEEK_SET) == (rb_off_t)-1) + return rb_fiber_scheduler_io_result(-1, errno); + + ssize_t result = write(argument->descriptor, argument->base, argument->size); + + if (lseek(argument->descriptor, offset, SEEK_SET) == (rb_off_t)-1) + return rb_fiber_scheduler_io_result(-1, errno); +#endif + + return rb_fiber_scheduler_io_result(result, errno); +} + VALUE rb_io_buffer_pwrite(VALUE self, VALUE io, size_t length, rb_off_t offset) { @@ -2421,24 +2512,14 @@ rb_io_buffer_pwrite(VALUE self, VALUE io, size_t length, rb_off_t offset) size_t size; io_buffer_get_bytes_for_reading(data, &base, &size); -#if defined(HAVE_PWRITE) - ssize_t result = pwrite(descriptor, base, length, offset); -#else - // This emulation is not thread safe, but the GVL means it's unlikely to be a problem. - rb_off_t current_offset = lseek(descriptor, 0, SEEK_CUR); - if (current_offset == (rb_off_t)-1) - return rb_fiber_scheduler_io_result(-1, errno); - - if (lseek(descriptor, offset, SEEK_SET) == (rb_off_t)-1) - return rb_fiber_scheduler_io_result(-1, errno); + struct io_buffer_pwrite_internal_argument argument = { + .descriptor = descriptor, + .base = base, + .size = length, + .offset = offset, + }; - ssize_t result = write(descriptor, base, length); - - if (lseek(descriptor, current_offset, SEEK_SET) == (rb_off_t)-1) - return rb_fiber_scheduler_io_result(-1, errno); -#endif - - return rb_fiber_scheduler_io_result(result, errno); + return rb_thread_io_blocking_region(io_buffer_pwrite_internal, &argument, descriptor); } static VALUE -- cgit v1.2.1