From bed920f0731a1a89a0e5fc7a7428d21be3ffb8a0 Mon Sep 17 00:00:00 2001
From: Samuel Williams
Date: Thu, 23 Dec 2021 12:20:09 +1300
Subject: Add fiber scheduler hooks for `pread`/`pwrite`, and add support to `IO::Buffer`.

---
 common.mk                      |   1 +
 include/ruby/fiber/scheduler.h |  26 +++++++
 include/ruby/io/buffer.h       |   6 ++
 io_buffer.c                    | 197 +++++++++++++++++++++++++++++++++++++++++
 scheduler.c                    |  27 ++++++-
 test/ruby/test_io_buffer.rb    |  57 ++++++++++++++
 6 files changed, 312 insertions(+), 2 deletions(-)

diff --git a/common.mk b/common.mk
index 8fc0a590f8..8791069af7 100644
--- a/common.mk
+++ b/common.mk
@@ -7585,6 +7585,7 @@ io_buffer.$(OBJEXT): {$(VPATH)}backward/2/stdarg.h
 io_buffer.$(OBJEXT): {$(VPATH)}config.h
 io_buffer.$(OBJEXT): {$(VPATH)}defines.h
 io_buffer.$(OBJEXT): {$(VPATH)}encoding.h
+io_buffer.$(OBJEXT): {$(VPATH)}fiber/scheduler.h
 io_buffer.$(OBJEXT): {$(VPATH)}intern.h
 io_buffer.$(OBJEXT): {$(VPATH)}internal.h
 io_buffer.$(OBJEXT): {$(VPATH)}internal/anyargs.h
diff --git a/include/ruby/fiber/scheduler.h b/include/ruby/fiber/scheduler.h
index ff587e28c0..a255a1a712 100644
--- a/include/ruby/fiber/scheduler.h
+++ b/include/ruby/fiber/scheduler.h
@@ -261,6 +261,32 @@ VALUE rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t
  */
 VALUE rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length);
 
+/**
+ * Nonblocking read from the passed IO at the specified offset.
+ *
+ * @param[in]   scheduler    Target scheduler.
+ * @param[out]  io           An io object to read from.
+ * @param[out]  buffer       Return buffer.
+ * @param[in]   length       Requested number of bytes to read.
+ * @param[in]   offset       The offset in the given IO to read the data from.
+ * @retval      RUBY_Qundef  `scheduler` doesn't have `#io_pread`.
+ * @return      otherwise    What `scheduler.io_pread` returns.
+ */
+VALUE rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, VALUE buffer, size_t length, off_t offset);
+
+/**
+ * Nonblocking write to the passed IO at the specified offset.
+ *
+ * @param[in]   scheduler    Target scheduler.
+ * @param[out]  io           An io object to write to.
+ * @param[in]   buffer       What to write.
+ * @param[in]   length       Number of bytes to write.
+ * @param[in]   offset       The offset in the given IO to write the data to.
+ * @retval      RUBY_Qundef  `scheduler` doesn't have `#io_pwrite`.
+ * @return      otherwise    What `scheduler.io_pwrite` returns.
+ */
+VALUE rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, VALUE buffer, size_t length, off_t offset);
+
 /**
  * Nonblocking read from the passed IO using a native buffer.
  *
diff --git a/include/ruby/io/buffer.h b/include/ruby/io/buffer.h
index 4826a7a76f..907fec20bb 100644
--- a/include/ruby/io/buffer.h
+++ b/include/ruby/io/buffer.h
@@ -80,6 +80,12 @@ VALUE rb_io_buffer_transfer(VALUE self);
 void rb_io_buffer_resize(VALUE self, size_t size);
 void rb_io_buffer_clear(VALUE self, uint8_t value, size_t offset, size_t length);
 
+// The length is the minimum required length.
+VALUE rb_io_buffer_read(VALUE self, VALUE io, size_t length);
+VALUE rb_io_buffer_pread(VALUE self, VALUE io, size_t length, off_t offset);
+VALUE rb_io_buffer_write(VALUE self, VALUE io, size_t length);
+VALUE rb_io_buffer_pwrite(VALUE self, VALUE io, size_t length, off_t offset);
+
 RBIMPL_SYMBOL_EXPORT_END()
 
 #endif  /* RUBY_IO_BUFFER_T */
diff --git a/io_buffer.c b/io_buffer.c
index 4487cab773..fa28c59b32 100644
--- a/io_buffer.c
+++ b/io_buffer.c
@@ -8,6 +8,7 @@
 
 #include "ruby/io.h"
 #include "ruby/io/buffer.h"
+#include "ruby/fiber/scheduler.h"
 
 #include "internal.h"
 #include "internal/string.h"
@@ -1864,6 +1865,196 @@ size_t io_buffer_default_size(size_t page_size) {
     return platform_agnostic_default_size;
 }
 
+/*
+ * Read from `io` into the buffer `self`. An active fiber scheduler gets the
+ * first chance via its `io_read` hook; when there is none, or the hook is
+ * missing (Qundef), fall back to one read(2) of up to the size reported by
+ * io_buffer_get_bytes_for_writing(). The range 0...length is validated first.
+ */
+VALUE
+rb_io_buffer_read(VALUE self, VALUE io, size_t length)
+{
+    VALUE scheduler = rb_fiber_scheduler_current();
+    if (scheduler != Qnil) {
+        VALUE result = rb_fiber_scheduler_io_read(scheduler, io, self, length);
+
+        if (result != Qundef) {
+            return result;
+        }
+    }
+
+    struct rb_io_buffer *data = NULL;
+    TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data);
+
+    io_buffer_validate_range(data, 0, length);
+
+    int descriptor = rb_io_descriptor(io);
+
+    void * base;
+    size_t size;
+    io_buffer_get_bytes_for_writing(data, &base, &size);
+
+    ssize_t result = read(descriptor, base, size);
+
+    return rb_fiber_scheduler_io_result(result, errno);
+}
+
+static VALUE
+io_buffer_read(VALUE self, VALUE io, VALUE length)
+{
+    return rb_io_buffer_read(self, io, RB_NUM2SIZE(length));
+}
+
+/*
+ * Like rb_io_buffer_read(), but reads at `offset` and uses the scheduler's
+ * `io_pread` hook. Without pread(2) (HAVE_PREAD undefined) the call is
+ * emulated by seeking to `offset`, reading, and seeking back to the
+ * previous position; a failing lseek is reported through its errno.
+ */
+VALUE
+rb_io_buffer_pread(VALUE self, VALUE io, size_t length, off_t offset)
+{
+    VALUE scheduler = rb_fiber_scheduler_current();
+    if (scheduler != Qnil) {
+        VALUE result = rb_fiber_scheduler_io_pread(scheduler, io, self, length, offset);
+
+        if (result != Qundef) {
+            return result;
+        }
+    }
+
+    struct rb_io_buffer *data = NULL;
+    TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data);
+
+    io_buffer_validate_range(data, 0, length);
+
+    int descriptor = rb_io_descriptor(io);
+
+    void * base;
+    size_t size;
+    io_buffer_get_bytes_for_writing(data, &base, &size);
+
+#if defined(HAVE_PREAD)
+    ssize_t result = pread(descriptor, base, size, offset);
+#else
+    // This emulation is not thread safe, but the GVL means it's unlikely to be a problem.
+    off_t current_offset = lseek(descriptor, 0, SEEK_CUR);
+    if (current_offset == (off_t)-1)
+        return rb_fiber_scheduler_io_result(-1, errno);
+
+    if (lseek(descriptor, offset, SEEK_SET) == (off_t)-1)
+        return rb_fiber_scheduler_io_result(-1, errno);
+
+    ssize_t result = read(descriptor, base, size);
+
+    if (lseek(descriptor, current_offset, SEEK_SET) == (off_t)-1)
+        return rb_fiber_scheduler_io_result(-1, errno);
+#endif
+
+    return rb_fiber_scheduler_io_result(result, errno);
+}
+
+static VALUE
+io_buffer_pread(VALUE self, VALUE io, VALUE length, VALUE offset)
+{
+    return rb_io_buffer_pread(self, io, RB_NUM2SIZE(length), NUM2OFFT(offset));
+}
+
+/*
+ * Write the first `length` bytes of the buffer `self` to `io`. An active
+ * fiber scheduler gets the first chance via its `io_write` hook; when there
+ * is none, or the hook is missing (Qundef), fall back to one write(2).
+ * The range 0...length is validated against the buffer beforehand.
+ */
+VALUE
+rb_io_buffer_write(VALUE self, VALUE io, size_t length)
+{
+    VALUE scheduler = rb_fiber_scheduler_current();
+    if (scheduler != Qnil) {
+        VALUE result = rb_fiber_scheduler_io_write(scheduler, io, self, length);
+
+        if (result != Qundef) {
+            return result;
+        }
+    }
+
+    struct rb_io_buffer *data = NULL;
+    TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data);
+
+    io_buffer_validate_range(data, 0, length);
+
+    int descriptor = rb_io_descriptor(io);
+
+    const void * base;
+    size_t size;
+    io_buffer_get_bytes_for_reading(data, &base, &size);
+
+    ssize_t result = write(descriptor, base, length);
+
+    return rb_fiber_scheduler_io_result(result, errno);
+}
+
+static VALUE
+io_buffer_write(VALUE self, VALUE io, VALUE length)
+{
+    return rb_io_buffer_write(self, io, RB_NUM2SIZE(length));
+}
+
+/*
+ * Like rb_io_buffer_write(), but writes at `offset` and uses the scheduler's
+ * `io_pwrite` hook, which receives the raw off_t (it is converted to a Ruby
+ * Integer inside rb_fiber_scheduler_io_pwrite()). Without pwrite(2) the call
+ * is emulated with lseek + write + lseek, restoring the previous position.
+ */
+VALUE
+rb_io_buffer_pwrite(VALUE self, VALUE io, size_t length, off_t offset)
+{
+    VALUE scheduler = rb_fiber_scheduler_current();
+    if (scheduler != Qnil) {
+        VALUE result = rb_fiber_scheduler_io_pwrite(scheduler, io, self, length, offset);
+
+        if (result != Qundef) {
+            return result;
+        }
+    }
+
+    struct rb_io_buffer *data = NULL;
+    TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data);
+
+    io_buffer_validate_range(data, 0, length);
+
+    int descriptor = rb_io_descriptor(io);
+
+    const void * base;
+    size_t size;
+    io_buffer_get_bytes_for_reading(data, &base, &size);
+
+#if defined(HAVE_PWRITE)
+    ssize_t result = pwrite(descriptor, base, length, offset);
+#else
+    // This emulation is not thread safe, but the GVL means it's unlikely to be a problem.
+    off_t current_offset = lseek(descriptor, 0, SEEK_CUR);
+    if (current_offset == (off_t)-1)
+        return rb_fiber_scheduler_io_result(-1, errno);
+
+    if (lseek(descriptor, offset, SEEK_SET) == (off_t)-1)
+        return rb_fiber_scheduler_io_result(-1, errno);
+
+    ssize_t result = write(descriptor, base, length);
+
+    if (lseek(descriptor, current_offset, SEEK_SET) == (off_t)-1)
+        return rb_fiber_scheduler_io_result(-1, errno);
+#endif
+
+    return rb_fiber_scheduler_io_result(result, errno);
+}
+
+static VALUE
+io_buffer_pwrite(VALUE self, VALUE io, VALUE length, VALUE offset)
+{
+    return rb_io_buffer_pwrite(self, io, RB_NUM2SIZE(length), NUM2OFFT(offset));
+}
+
 /*
  *  Document-class: IO::Buffer
  *
@@ -2038,4 +2229,10 @@ Init_IO_Buffer(void)
 
     rb_define_method(rb_cIOBuffer, "get_string", io_buffer_get_string, -1);
     rb_define_method(rb_cIOBuffer, "set_string", io_buffer_set_string, -1);
+
+    // IO operations:
+    rb_define_method(rb_cIOBuffer, "read", io_buffer_read, 2);
+    rb_define_method(rb_cIOBuffer, "pread", io_buffer_pread, 3);
+    rb_define_method(rb_cIOBuffer, "write", io_buffer_write, 2);
+    rb_define_method(rb_cIOBuffer, "pwrite", io_buffer_pwrite, 3);
 }
diff --git a/scheduler.c b/scheduler.c
index 51696ab18f..06658356b1 100644
--- a/scheduler.c
+++ b/scheduler.c
@@ -25,8 +25,8 @@ static ID id_timeout_after;
 static ID id_kernel_sleep;
 static ID id_process_wait;
 
-static ID id_io_read;
-static ID id_io_write;
+static ID id_io_read, id_io_pread;
+static ID id_io_write, id_io_pwrite;
 static ID id_io_wait;
 static ID id_io_close;
 
@@ -46,7 +46,10 @@ Init_Fiber_Scheduler(void)
     id_process_wait = rb_intern_const("process_wait");
 
     id_io_read = rb_intern_const("io_read");
+    id_io_pread = rb_intern_const("io_pread");
     id_io_write = rb_intern_const("io_write");
+    id_io_pwrite = rb_intern_const("io_pwrite");
+
     id_io_wait = rb_intern_const("io_wait");
     id_io_close = rb_intern_const("io_close");
 
@@ -238,6 +241,16 @@ rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t lengt
     return rb_check_funcall(scheduler, id_io_read, 3, arguments);
 }
 
+VALUE
+rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, VALUE buffer, size_t length, off_t offset)
+{
+    VALUE arguments[] = {
+        io, buffer, SIZET2NUM(length), OFFT2NUM(offset)
+    };
+
+    return rb_check_funcall(scheduler, id_io_pread, 4, arguments);
+}
+
 VALUE
 rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length)
 {
@@ -248,6 +261,16 @@ rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t leng
     return rb_check_funcall(scheduler, id_io_write, 3, arguments);
 }
 
+VALUE
+rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, VALUE buffer, size_t length, off_t offset)
+{
+    VALUE arguments[] = {
+        io, buffer, SIZET2NUM(length), OFFT2NUM(offset)
+    };
+
+    return rb_check_funcall(scheduler, id_io_pwrite, 4, arguments);
+}
+
 VALUE
 rb_fiber_scheduler_io_read_memory(VALUE scheduler, VALUE io, void *base, size_t size, size_t length)
 {
diff --git a/test/ruby/test_io_buffer.rb b/test/ruby/test_io_buffer.rb
index 362845ec2a..7e3b467ed5 100644
--- a/test/ruby/test_io_buffer.rb
+++ b/test/ruby/test_io_buffer.rb
@@ -1,5 +1,7 @@
 # frozen_string_literal: false
 
+require 'tempfile'
+
 class TestIOBuffer < Test::Unit::TestCase
   experimental = Warning[:experimental]
   begin
@@ -270,4 +272,59 @@ class TestIOBuffer < Test::Unit::TestCase
 
     input.close
   end
+
+  def test_read
+    io = Tempfile.new
+    io.write("Hello World")
+    io.seek(0)
+
+    buffer = IO::Buffer.new(128)
+    buffer.read(io, 5)
+
+    assert_equal "Hello", buffer.get_string(0, 5)
+  ensure
+    io.close!
+  end
+
+  def test_write
+    io = Tempfile.new
+
+    buffer = IO::Buffer.new(128)
+    buffer.set_string("Hello")
+    buffer.write(io, 5)
+
+    io.seek(0)
+    assert_equal "Hello", io.read(5)
+  ensure
+    io.close!
+  end
+
+  def test_pread
+    io = Tempfile.new
+    io.write("Hello World")
+    io.seek(0)
+
+    buffer = IO::Buffer.new(128)
+    buffer.pread(io, 5, 6)
+
+    assert_equal "World", buffer.get_string(0, 5)
+    assert_equal 0, io.tell
+  ensure
+    io.close!
+  end
+
+  def test_pwrite
+    io = Tempfile.new
+
+    buffer = IO::Buffer.new(128)
+    buffer.set_string("World")
+    buffer.pwrite(io, 5, 6)
+
+    assert_equal 0, io.tell
+
+    io.seek(6)
+    assert_equal "World", io.read(5)
+  ensure
+    io.close!
+  end
 end
-- 
cgit v1.2.1