summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/ruby/fiber/scheduler.h78
-rw-r--r--include/ruby/io/buffer.h2
-rw-r--r--io.c42
-rw-r--r--io_buffer.c27
-rw-r--r--scheduler.c76
-rw-r--r--test/fiber/scheduler.rb58
-rw-r--r--test/fiber/test_io_buffer.rb41
7 files changed, 264 insertions, 60 deletions
diff --git a/include/ruby/fiber/scheduler.h b/include/ruby/fiber/scheduler.h
index 250b39b6df..ad3d2d7483 100644
--- a/include/ruby/fiber/scheduler.h
+++ b/include/ruby/fiber/scheduler.h
@@ -267,10 +267,10 @@ VALUE rb_fiber_scheduler_io_selectv(VALUE scheduler, int argc, VALUE *argv);
* Non-blocking read from the passed IO.
*
* @param[in] scheduler Target scheduler.
- * @param[out] io An io object to read from.
- * @param[out] buffer Return buffer.
- * @param[in] length Requested number of bytes to read.
- * @param[in] offset The offset in the buffer to read to.
+ * @param[in] io An io object to read from.
+ * @param[in] buffer The buffer to read to.
+ * @param[in] length The minimum number of bytes to read.
+ * @param[in] offset The offset in the buffer to read from.
* @retval RUBY_Qundef `scheduler` doesn't have `#io_read`.
* @return otherwise What `scheduler.io_read` returns `[-errno, size]`.
*/
@@ -280,9 +280,9 @@ VALUE rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t
* Non-blocking write to the passed IO.
*
* @param[in] scheduler Target scheduler.
- * @param[out] io An io object to write to.
- * @param[in] buffer What to write.
- * @param[in] length Number of bytes to write.
+ * @param[in] io An io object to write to.
+ * @param[in] buffer The buffer to write from.
+ * @param[in] length The minimum number of bytes to write.
* @param[in] offset The offset in the buffer to write from.
* @retval RUBY_Qundef `scheduler` doesn't have `#io_write`.
* @return otherwise What `scheduler.io_write` returns `[-errno, size]`.
@@ -293,10 +293,10 @@ VALUE rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_
* Non-blocking read from the passed IO at the specified offset.
*
* @param[in] scheduler Target scheduler.
- * @param[out] io An io object to read from.
- * @param[in] from The offset in the given IO to read the data from.
- * @param[out] buffer The buffer to read the data to.
- * @param[in] length Requested number of bytes to read.
+ * @param[in] io An io object to read from.
+ * @param[in] from The offset to read from.
+ * @param[in] buffer The buffer to read to.
+ * @param[in] length The minimum number of bytes to read.
* @param[in] offset The offset in the buffer to read to.
* @retval RUBY_Qundef `scheduler` doesn't have `#io_read`.
* @return otherwise What `scheduler.io_read` returns.
@@ -307,10 +307,10 @@ VALUE rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALU
* Non-blocking write to the passed IO at the specified offset.
*
* @param[in] scheduler Target scheduler.
- * @param[out] io An io object to write to.
- * @param[in] from The offset in the given IO to write the data to.
- * @param[in] buffer The buffer to write the data from.
- * @param[in] length Number of bytes to write.
+ * @param[in] io An io object to write to.
+ * @param[in] from The offset to write to.
+ * @param[in] buffer The buffer to write from.
+ * @param[in] length The minimum number of bytes to write.
* @param[in] offset The offset in the buffer to write from.
* @retval RUBY_Qundef `scheduler` doesn't have `#io_write`.
* @return otherwise What `scheduler.io_write` returns.
@@ -321,27 +321,55 @@ VALUE rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, rb_off_t from, VAL
* Non-blocking read from the passed IO using a native buffer.
*
* @param[in] scheduler Target scheduler.
- * @param[out] io An io object to read from.
- * @param[out] buffer Return buffer.
- * @param[in] size Size of the return buffer.
- * @param[in] length Requested number of bytes to read.
+ * @param[in] io An io object to read from.
+ * @param[in] base The memory to read to.
+ * @param[in] size Size of the memory.
+ * @param[in] length The minimum number of bytes to read.
* @retval RUBY_Qundef `scheduler` doesn't have `#io_read`.
* @return otherwise What `scheduler.io_read` returns.
*/
-VALUE rb_fiber_scheduler_io_read_memory(VALUE scheduler, VALUE io, void *buffer, size_t size, size_t length);
+VALUE rb_fiber_scheduler_io_read_memory(VALUE scheduler, VALUE io, void *base, size_t size, size_t length);
/**
* Non-blocking write to the passed IO using a native buffer.
*
* @param[in] scheduler Target scheduler.
- * @param[out] io An io object to write to.
- * @param[in] buffer What to write.
- * @param[in] size Size of the buffer.
- * @param[in] length Number of bytes to write.
+ * @param[in] io An io object to write to.
+ * @param[in] base The memory to write from.
+ * @param[in] size Size of the memory.
+ * @param[in] length The minimum number of bytes to write.
+ * @retval RUBY_Qundef `scheduler` doesn't have `#io_write`.
+ * @return otherwise What `scheduler.io_write` returns.
+ */
+VALUE rb_fiber_scheduler_io_write_memory(VALUE scheduler, VALUE io, const void *base, size_t size, size_t length);
+
+/**
+ * Non-blocking pread from the passed IO using a native buffer.
+ *
+ * @param[in] scheduler Target scheduler.
+ * @param[in] io An io object to read from.
+ * @param[in] from The offset to read from.
+ * @param[in] base The memory to read to.
+ * @param[in] size Size of the memory.
+ * @param[in] length The minimum number of bytes to read.
+ * @retval RUBY_Qundef `scheduler` doesn't have `#io_read`.
+ * @return otherwise What `scheduler.io_read` returns.
+ */
+VALUE rb_fiber_scheduler_io_pread_memory(VALUE scheduler, VALUE io, rb_off_t from, void *base, size_t size, size_t length);
+
+/**
+ * Non-blocking pwrite to the passed IO using a native buffer.
+ *
+ * @param[in] scheduler Target scheduler.
+ * @param[in] io An io object to write to.
+ * @param[in] from The offset to write from.
+ * @param[in] base The memory to write from.
+ * @param[in] size Size of the memory.
+ * @param[in] length The minimum number of bytes to write.
* @retval RUBY_Qundef `scheduler` doesn't have `#io_write`.
* @return otherwise What `scheduler.io_write` returns.
*/
-VALUE rb_fiber_scheduler_io_write_memory(VALUE scheduler, VALUE io, const void *buffer, size_t size, size_t length);
+VALUE rb_fiber_scheduler_io_pwrite_memory(VALUE scheduler, VALUE io, rb_off_t from, const void *base, size_t size, size_t length);
/**
* Non-blocking close the given IO.
diff --git a/include/ruby/io/buffer.h b/include/ruby/io/buffer.h
index 88e5598066..737fafe518 100644
--- a/include/ruby/io/buffer.h
+++ b/include/ruby/io/buffer.h
@@ -75,7 +75,9 @@ VALUE rb_io_buffer_map(VALUE io, size_t size, rb_off_t offset, enum rb_io_buffer
VALUE rb_io_buffer_lock(VALUE self);
VALUE rb_io_buffer_unlock(VALUE self);
int rb_io_buffer_try_unlock(VALUE self);
+
VALUE rb_io_buffer_free(VALUE self);
+VALUE rb_io_buffer_free_locked(VALUE self);
int rb_io_buffer_get_bytes(VALUE self, void **base, size_t *size);
void rb_io_buffer_get_bytes_for_reading(VALUE self, const void **base, size_t *size);
diff --git a/io.c b/io.c
index a023039209..be5d479099 100644
--- a/io.c
+++ b/io.c
@@ -6066,6 +6066,7 @@ rb_io_sysread(int argc, VALUE *argv, VALUE io)
#if defined(HAVE_PREAD) || defined(HAVE_PWRITE)
struct prdwr_internal_arg {
+ VALUE io;
int fd;
void *buf;
size_t count;
@@ -6075,17 +6076,28 @@ struct prdwr_internal_arg {
#if defined(HAVE_PREAD)
static VALUE
-internal_pread_func(void *arg)
+internal_pread_func(void *_arg)
{
- struct prdwr_internal_arg *p = arg;
- return (VALUE)pread(p->fd, p->buf, p->count, p->offset);
+ struct prdwr_internal_arg *arg = _arg;
+
+ return (VALUE)pread(arg->fd, arg->buf, arg->count, arg->offset);
}
static VALUE
-pread_internal_call(VALUE arg)
+pread_internal_call(VALUE _arg)
{
- struct prdwr_internal_arg *p = (struct prdwr_internal_arg *)arg;
- return rb_thread_io_blocking_region(internal_pread_func, p, p->fd);
+ struct prdwr_internal_arg *arg = (struct prdwr_internal_arg *)_arg;
+
+ VALUE scheduler = rb_fiber_scheduler_current();
+ if (scheduler != Qnil) {
+ VALUE result = rb_fiber_scheduler_io_pread_memory(scheduler, arg->io, arg->offset, arg->buf, arg->count, 0);
+
+ if (!UNDEF_P(result)) {
+ return rb_fiber_scheduler_io_result_apply(result);
+ }
+ }
+
+ return rb_thread_io_blocking_region(internal_pread_func, arg, arg->fd);
}
/*
@@ -6122,7 +6134,7 @@ rb_io_pread(int argc, VALUE *argv, VALUE io)
VALUE len, offset, str;
rb_io_t *fptr;
ssize_t n;
- struct prdwr_internal_arg arg;
+ struct prdwr_internal_arg arg = {.io = io};
int shrinkable;
rb_scan_args(argc, argv, "21", &len, &offset, &str);
@@ -6158,9 +6170,19 @@ rb_io_pread(int argc, VALUE *argv, VALUE io)
#if defined(HAVE_PWRITE)
static VALUE
-internal_pwrite_func(void *ptr)
+internal_pwrite_func(void *_arg)
{
- struct prdwr_internal_arg *arg = ptr;
+ struct prdwr_internal_arg *arg = _arg;
+
+ VALUE scheduler = rb_fiber_scheduler_current();
+ if (scheduler != Qnil) {
+ VALUE result = rb_fiber_scheduler_io_pwrite_memory(scheduler, arg->io, arg->offset, arg->buf, arg->count, 0);
+
+ if (!UNDEF_P(result)) {
+ return rb_fiber_scheduler_io_result_apply(result);
+ }
+ }
+
return (VALUE)pwrite(arg->fd, arg->buf, arg->count, arg->offset);
}
@@ -6195,7 +6217,7 @@ rb_io_pwrite(VALUE io, VALUE str, VALUE offset)
{
rb_io_t *fptr;
ssize_t n;
- struct prdwr_internal_arg arg;
+ struct prdwr_internal_arg arg = {.io = io};
VALUE tmp;
if (!RB_TYPE_P(str, T_STRING))
diff --git a/io_buffer.c b/io_buffer.c
index 2fc7ac8a80..4a08811185 100644
--- a/io_buffer.c
+++ b/io_buffer.c
@@ -1001,17 +1001,23 @@ rb_io_buffer_lock(VALUE self)
return self;
}
-VALUE
-rb_io_buffer_unlock(VALUE self)
+static void
+io_buffer_unlock(struct rb_io_buffer *data)
{
- struct rb_io_buffer *data = NULL;
- TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data);
-
if (!(data->flags & RB_IO_BUFFER_LOCKED)) {
rb_raise(rb_eIOBufferLockedError, "Buffer not locked!");
}
data->flags &= ~RB_IO_BUFFER_LOCKED;
+}
+
+VALUE
+rb_io_buffer_unlock(VALUE self)
+{
+ struct rb_io_buffer *data = NULL;
+ TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data);
+
+ io_buffer_unlock(data);
return self;
}
@@ -1123,6 +1129,17 @@ rb_io_buffer_free(VALUE self)
return self;
}
+VALUE rb_io_buffer_free_locked(VALUE self)
+{
+ struct rb_io_buffer *data = NULL;
+ TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data);
+
+ io_buffer_unlock(data);
+ io_buffer_free(data);
+
+ return self;
+}
+
// Validate that access to the buffer is within bounds, assuming you want to
// access length bytes from the specified offset.
static inline void
diff --git a/scheduler.c b/scheduler.c
index 477f11c03c..866e53993f 100644
--- a/scheduler.c
+++ b/scheduler.c
@@ -458,15 +458,15 @@ VALUE rb_fiber_scheduler_io_selectv(VALUE scheduler, int argc, VALUE *argv)
/*
* Document-method: Fiber::Scheduler#io_read
- * call-seq: io_read(io, buffer, minimum_length) -> read length or -errno
+ * call-seq: io_read(io, buffer, length) -> read length or -errno
*
* Invoked by IO#read or IO#Buffer.read to read +length+ bytes from +io+ into a
- * specified +buffer+ (see IO::Buffer).
+ * specified +buffer+ (see IO::Buffer) at the given +offset+.
*
- * The +minimum_length+ argument is the "minimum length to be read". If the IO
- * buffer size is 8KiB, but the +length+ is +1024+ (1KiB), up to 8KiB might be
- * read, but at least 1KiB will be. Generally, the only case where less data
- * than +length+ will be read is if there is an error reading the data.
+ * The +length+ argument is the "minimum length to be read". If the IO buffer
+ * size is 8KiB, but the +length+ is +1024+ (1KiB), up to 8KiB might be read,
+ * but at least 1KiB will be. Generally, the only case where less data than
+ * +length+ will be read is if there is an error reading the data.
*
* Specifying a +length+ of 0 is valid and means try reading at least once and
* return any available data.
@@ -492,13 +492,19 @@ rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t lengt
return rb_check_funcall(scheduler, id_io_read, 4, arguments);
}
-
/*
* Document-method: Fiber::Scheduler#io_read
* call-seq: io_pread(io, buffer, from, length, offset) -> read length or -errno
*
- * Invoked by IO::Buffer#pread. See that method for description of arguments.
+ * Invoked by IO#pread or IO::Buffer#pread to read +length+ bytes from +io+
+ * at offset +from+ into a specified +buffer+ (see IO::Buffer) at the given
+ * +offset+.
+ *
+ * This method is semantically the same as #io_read, but it allows to specify
+ * the offset to read from and is often better for asynchronous IO on the same
+ * file.
*
+ * The method should be considered _experimental_.
*/
VALUE
rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
@@ -512,16 +518,16 @@ rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buff
/*
* Document-method: Scheduler#io_write
- * call-seq: io_write(io, buffer, minimum_length) -> written length or -errno
+ * call-seq: io_write(io, buffer, length) -> written length or -errno
*
* Invoked by IO#write or IO::Buffer#write to write +length+ bytes to +io+ from
- * from a specified +buffer+ (see IO::Buffer).
+ * from a specified +buffer+ (see IO::Buffer) at the given +offset+.
*
- * The +minimum_length+ argument is the "minimum length to be written". If the
- * IO buffer size is 8KiB, but the +length+ specified is 1024 (1KiB), at most
- * 8KiB will be written, but at least 1KiB will be. Generally, the only case
- * where less data than +minimum_length+ will be written is if there is an
- * error writing the data.
+ * The +length+ argument is the "minimum length to be written". If the IO
+ * buffer size is 8KiB, but the +length+ specified is 1024 (1KiB), at most 8KiB
+ * will be written, but at least 1KiB will be. Generally, the only case where
+ * less data than +length+ will be written is if there is an error writing the
+ * data.
*
* Specifying a +length+ of 0 is valid and means try writing at least once, as
* much data as possible.
@@ -552,7 +558,15 @@ rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t leng
* Document-method: Fiber::Scheduler#io_pwrite
* call-seq: io_pwrite(io, buffer, from, length, offset) -> written length or -errno
*
- * Invoked by IO::Buffer#pwrite. See that method for description of arguments.
+ * Invoked by IO#pwrite or IO::Buffer#pwrite to write +length+ bytes to +io+
+ * at offset +from+ into a specified +buffer+ (see IO::Buffer) at the given
+ * +offset+.
+ *
+ * This method is semantically the same as #io_write, but it allows to specify
+ * the offset to write to and is often better for asynchronous IO on the same
+ * file.
+ *
+ * The method should be considered _experimental_.
*
*/
VALUE
@@ -572,8 +586,7 @@ rb_fiber_scheduler_io_read_memory(VALUE scheduler, VALUE io, void *base, size_t
VALUE result = rb_fiber_scheduler_io_read(scheduler, io, buffer, length, 0);
- rb_io_buffer_unlock(buffer);
- rb_io_buffer_free(buffer);
+ rb_io_buffer_free_locked(buffer);
return result;
}
@@ -585,8 +598,31 @@ rb_fiber_scheduler_io_write_memory(VALUE scheduler, VALUE io, const void *base,
VALUE result = rb_fiber_scheduler_io_write(scheduler, io, buffer, length, 0);
- rb_io_buffer_unlock(buffer);
- rb_io_buffer_free(buffer);
+ rb_io_buffer_free_locked(buffer);
+
+ return result;
+}
+
+VALUE
+rb_fiber_scheduler_io_pread_memory(VALUE scheduler, VALUE io, rb_off_t from, void *base, size_t size, size_t length)
+{
+ VALUE buffer = rb_io_buffer_new(base, size, RB_IO_BUFFER_LOCKED);
+
+ VALUE result = rb_fiber_scheduler_io_pread(scheduler, io, from, buffer, length, 0);
+
+ rb_io_buffer_free_locked(buffer);
+
+ return result;
+}
+
+VALUE
+rb_fiber_scheduler_io_pwrite_memory(VALUE scheduler, VALUE io, rb_off_t from, const void *base, size_t size, size_t length)
+{
+ VALUE buffer = rb_io_buffer_new((void*)base, size, RB_IO_BUFFER_LOCKED|RB_IO_BUFFER_READONLY);
+
+ VALUE result = rb_fiber_scheduler_io_pwrite(scheduler, io, from, buffer, length, 0);
+
+ rb_io_buffer_free_locked(buffer);
return result;
}
diff --git a/test/fiber/scheduler.rb b/test/fiber/scheduler.rb
index 71ca4d2789..5090271db1 100644
--- a/test/fiber/scheduler.rb
+++ b/test/fiber/scheduler.rb
@@ -366,6 +366,64 @@ class IOBufferScheduler < Scheduler
return total
end
+ def io_pread(io, buffer, from, length, offset)
+ total = 0
+ io.nonblock = true
+
+ while true
+ maximum_size = buffer.size - offset
+ result = blocking{buffer.pread(io, from, maximum_size, offset)}
+
+ if result > 0
+ total += result
+ offset += result
+ from += result
+ break if total >= length
+ elsif result == 0
+ break
+ elsif result == EAGAIN
+ if length > 0
+ self.io_wait(io, IO::READABLE, nil)
+ else
+ return result
+ end
+ elsif result < 0
+ return result
+ end
+ end
+
+ return total
+ end
+
+ def io_pwrite(io, buffer, from, length, offset)
+ total = 0
+ io.nonblock = true
+
+ while true
+ maximum_size = buffer.size - offset
+ result = blocking{buffer.pwrite(io, from, maximum_size, offset)}
+
+ if result > 0
+ total += result
+ offset += result
+ from += result
+ break if total >= length
+ elsif result == 0
+ break
+ elsif result == EAGAIN
+ if length > 0
+ self.io_wait(io, IO::WRITABLE, nil)
+ else
+ return result
+ end
+ elsif result < 0
+ return result
+ end
+ end
+
+ return total
+ end
+
def blocking(&block)
Fiber.blocking(&block)
end
diff --git a/test/fiber/test_io_buffer.rb b/test/fiber/test_io_buffer.rb
index 3de70200d5..a08b1ce1a9 100644
--- a/test/fiber/test_io_buffer.rb
+++ b/test/fiber/test_io_buffer.rb
@@ -155,4 +155,45 @@ class TestFiberIOBuffer < Test::Unit::TestCase
i&.close
o&.close
end
+
+ def nonblockable?(io)
+ io.nonblock{}
+ true
+ rescue
+ false
+ end
+
+ def test_io_buffer_pread_pwrite
+ file = Tempfile.new("test_io_buffer_pread_pwrite")
+
+ omit "Non-blocking file IO is not supported" unless nonblockable?(file)
+
+ source_buffer = IO::Buffer.for("Hello World!")
+ destination_buffer = IO::Buffer.new(source_buffer.size)
+
+ # Test non-scheduler code path:
+ source_buffer.pwrite(file, 1, source_buffer.size)
+ destination_buffer.pread(file, 1, source_buffer.size)
+ assert_equal source_buffer, destination_buffer
+
+ # Test scheduler code path:
+ destination_buffer.clear
+ file.truncate(0)
+
+ thread = Thread.new do
+ scheduler = IOBufferScheduler.new
+ Fiber.set_scheduler scheduler
+
+ Fiber.schedule do
+ source_buffer.pwrite(file, 1, source_buffer.size)
+ destination_buffer.pread(file, 1, source_buffer.size)
+ end
+ end
+
+ thread.join
+
+ assert_equal source_buffer, destination_buffer
+ ensure
+ file&.close!
+ end
end