1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
|
/**********************************************************************
scheduler.c
$Author$
Copyright (C) 2020 Samuel Grant Dawson Williams
**********************************************************************/
#include "vm_core.h"
#include "ruby/fiber/scheduler.h"
#include "ruby/io.h"
#include "ruby/io/buffer.h"
#include "internal/thread.h"
// Interned method names of the scheduler hooks. They are assigned once in
// Init_Fiber_Scheduler() and then used by the dispatch functions in this file.

// Lifecycle hooks.
static ID id_close, id_scheduler_close;

// Blocking, sleeping and process hooks.
static ID id_block, id_unblock;
static ID id_timeout_after;
static ID id_kernel_sleep;
static ID id_process_wait;

// I/O hooks.
static ID id_io_read;
static ID id_io_pread;
static ID id_io_write;
static ID id_io_pwrite;
static ID id_io_wait, id_io_select, id_io_close;

// Name resolution hook.
static ID id_address_resolve;

// Interned as "fiber"; used by rb_fiber_scheduler_fiber() to call Scheduler#fiber.
static ID id_fiber_schedule;
/*
* Document-class: Fiber::Scheduler
*
* This is not an existing class, but documentation of the interface that a Scheduler
* object should comply with in order to be used as the argument to Fiber.set_scheduler
* and handle non-blocking fibers. See also the "Non-blocking fibers" section in the
* Fiber class docs for explanations of some concepts.
*
* Scheduler's behavior and usage are expected to be as follows:
*
* * When the execution in the non-blocking Fiber reaches some blocking operation (like
* sleep, wait for a process, or a non-ready I/O), it calls some of the scheduler's
* hook methods, listed below.
* * Scheduler somehow registers what the current fiber is waiting on, and yields control
* to other fibers with Fiber.yield (so the fiber would be suspended while expecting its
* wait to end, and other fibers in the same thread can perform)
* * At the end of the current thread execution, the scheduler's method #scheduler_close is called
* * The scheduler runs into a wait loop, checking all the blocked fibers (which it has
* registered on hook calls) and resuming them when the awaited resource is ready
* (e.g. I/O ready or sleep time elapsed).
*
* This way concurrent execution will be achieved transparently for every
* individual Fiber's code.
*
* Scheduler implementations are provided by gems, like
* Async[https://github.com/socketry/async].
*
* Hook methods are:
*
* * #io_wait, #io_read, #io_write, #io_pread, #io_pwrite, #io_select, and #io_close
* * #process_wait
* * #kernel_sleep
* * #timeout_after
* * #address_resolve
* * #block and #unblock
* * (the list is expanded as Ruby developers make more methods having non-blocking calls)
*
* When not specified otherwise, the hook implementations are mandatory: if they are not
* implemented, the methods trying to call hook will fail. To provide backward compatibility,
* in the future hooks will be optional (if they are not implemented, due to the scheduler
* being created for the older Ruby version, the code which needs this hook will not fail,
* and will just behave in a blocking fashion).
*
* It is also strongly recommended that the scheduler implements the #fiber method, which is
* delegated to by Fiber.schedule.
*
* Sample _toy_ implementation of the scheduler can be found in Ruby's code, in
* <tt>test/fiber/scheduler.rb</tt>
*
*/
void
Init_Fiber_Scheduler(void)
{
    /*
     * Intern the method name of every scheduler hook into the file-static IDs
     * used by the dispatch functions in this file.
     */
    id_close = rb_intern_const("close");
    id_scheduler_close = rb_intern_const("scheduler_close");
    id_block = rb_intern_const("block");
    id_unblock = rb_intern_const("unblock");
    id_timeout_after = rb_intern_const("timeout_after");
    id_kernel_sleep = rb_intern_const("kernel_sleep");
    id_process_wait = rb_intern_const("process_wait");
    id_io_read = rb_intern_const("io_read");
    id_io_pread = rb_intern_const("io_pread");
    id_io_write = rb_intern_const("io_write");
    id_io_pwrite = rb_intern_const("io_pwrite");
    id_io_wait = rb_intern_const("io_wait");
    id_io_select = rb_intern_const("io_select");
    id_io_close = rb_intern_const("io_close");
    id_address_resolve = rb_intern_const("address_resolve");
    id_fiber_schedule = rb_intern_const("fiber");

    /*
     * Never compiled: this section only tells RDoc which hook methods the
     * documented Fiber::Scheduler interface has. Each entry must name a
     * function that exists in this file.
     */
#if 0 /* for RDoc */
    rb_cFiberScheduler = rb_define_class_under(rb_cFiber, "Scheduler", rb_cObject);
    rb_define_method(rb_cFiberScheduler, "close", rb_fiber_scheduler_close, 0);
    rb_define_method(rb_cFiberScheduler, "process_wait", rb_fiber_scheduler_process_wait, 2);
    rb_define_method(rb_cFiberScheduler, "io_wait", rb_fiber_scheduler_io_wait, 3);
    rb_define_method(rb_cFiberScheduler, "io_read", rb_fiber_scheduler_io_read, 4);
    rb_define_method(rb_cFiberScheduler, "io_pread", rb_fiber_scheduler_io_pread, 5);
    rb_define_method(rb_cFiberScheduler, "io_write", rb_fiber_scheduler_io_write, 4);
    rb_define_method(rb_cFiberScheduler, "io_pwrite", rb_fiber_scheduler_io_pwrite, 5);
    rb_define_method(rb_cFiberScheduler, "io_select", rb_fiber_scheduler_io_select, 4);
    rb_define_method(rb_cFiberScheduler, "io_close", rb_fiber_scheduler_io_close, 1);
    rb_define_method(rb_cFiberScheduler, "kernel_sleep", rb_fiber_scheduler_kernel_sleep, 1);
    rb_define_method(rb_cFiberScheduler, "address_resolve", rb_fiber_scheduler_address_resolve, 1);
    rb_define_method(rb_cFiberScheduler, "timeout_after", rb_fiber_scheduler_timeout_after, 3);
    rb_define_method(rb_cFiberScheduler, "block", rb_fiber_scheduler_block, 2);
    rb_define_method(rb_cFiberScheduler, "unblock", rb_fiber_scheduler_unblock, 2);
    rb_define_method(rb_cFiberScheduler, "fiber", rb_fiber_scheduler_fiber, -2);
#endif
}
/*
 * Return the scheduler registered on the current thread (possibly Qnil),
 * without consulting the thread's blocking state. The caller is expected to
 * hold the GVL (checked with VM_ASSERT).
 */
VALUE
rb_fiber_scheduler_get(void)
{
    VM_ASSERT(ruby_thread_has_gvl_p());

    rb_thread_t *current = GET_THREAD();
    VM_ASSERT(current);

    return current->scheduler;
}
/*
 * Raise ArgumentError unless +scheduler+ responds to every hook that this
 * file invokes unconditionally via rb_funcall: #block, #unblock,
 * #kernel_sleep and #io_wait. The checks run in that order, so the first
 * missing hook is the one reported.
 */
static void
verify_interface(VALUE scheduler)
{
    const struct {
        ID method;
        const char *message;
    } required[] = {
        {id_block, "Scheduler must implement #block"},
        {id_unblock, "Scheduler must implement #unblock"},
        {id_kernel_sleep, "Scheduler must implement #kernel_sleep"},
        {id_io_wait, "Scheduler must implement #io_wait"},
    };

    for (size_t i = 0; i < sizeof(required) / sizeof(required[0]); i++) {
        if (!rb_respond_to(scheduler, required[i].method)) {
            rb_raise(rb_eArgError, "%s", required[i].message);
        }
    }
}
/*
 * Install +scheduler+ (or Qnil) as the current thread's fiber scheduler and
 * return it.
 *
 * A non-nil scheduler is validated with verify_interface() first. An
 * incomplete scheduler therefore raises before the current one is touched.
 * Any previously installed scheduler is closed through
 * rb_fiber_scheduler_close() before the new value is stored.
 */
VALUE
rb_fiber_scheduler_set(VALUE scheduler)
{
    VM_ASSERT(ruby_thread_has_gvl_p());

    rb_thread_t *current = GET_THREAD();
    VM_ASSERT(current);

    if (scheduler != Qnil) verify_interface(scheduler);

    // Closing the outgoing scheduler lets it run to completion first. A Fiber
    // belonging to the old scheduler then never has to interact with the new one.
    VALUE previous = current->scheduler;
    if (previous != Qnil) rb_fiber_scheduler_close(previous);

    current->scheduler = scheduler;

    return current->scheduler;
}
/*
 * Return the scheduler that applies to +thread+ right now. This is the
 * registered scheduler when thread->blocking is zero, and Qnil otherwise.
 */
static VALUE
rb_fiber_scheduler_current_for_threadptr(rb_thread_t *thread)
{
    VM_ASSERT(thread);

    return thread->blocking == 0 ? thread->scheduler : Qnil;
}
VALUE
rb_fiber_scheduler_current(void)
{
return rb_fiber_scheduler_current_for_threadptr(GET_THREAD());
}
/* Like rb_fiber_scheduler_current(), but for the given Thread object. */
VALUE
rb_fiber_scheduler_current_for_thread(VALUE thread)
{
    rb_thread_t *target = rb_thread_ptr(thread);

    return rb_fiber_scheduler_current_for_threadptr(target);
}
/*
*
* Document-method: Fiber::Scheduler#close
*
* Called when the current thread exits. The scheduler is expected to implement this
* method in order to allow all waiting fibers to finalize their execution.
*
* The suggested pattern is to implement the main event loop in the #close method.
*
*/
VALUE
rb_fiber_scheduler_close(VALUE scheduler)
{
    VM_ASSERT(ruby_thread_has_gvl_p());

    // `scheduler_close` is tried before `close` for the sake of legacy
    // schedulers. Those implement `close` and expect the user to call it, and
    // that method in turn calls `Fiber.set_scheduler(nil)`, which ends up
    // back here. If `close` were tried first, that would loop forever.
    const ID hooks[] = {id_scheduler_close, id_close};

    for (size_t i = 0; i < sizeof(hooks) / sizeof(hooks[0]); i++) {
        VALUE result = rb_check_funcall(scheduler, hooks[i], 0, NULL);

        // Qundef: this hook is not implemented, so fall through to the next one.
        if (result != Qundef) return result;
    }

    // Neither hook is implemented.
    return Qnil;
}
/*
 * Convert +timeout+ into a Float number of seconds for passing to scheduler
 * hooks. Returns Qnil when +timeout+ is NULL.
 */
VALUE
rb_fiber_scheduler_make_timeout(struct timeval *timeout)
{
    if (!timeout) {
        return Qnil;
    }

    // Keep all arithmetic in double. A float constant such as 0.000001f would
    // push the microsecond term through single precision and drop digits.
    // Dividing by the exactly representable 1e6 gives a correctly rounded
    // fraction (e.g. 500000 us -> exactly 0.5).
    double seconds = (double)timeout->tv_sec + (double)timeout->tv_usec / 1000000.0;

    return rb_float_new(seconds);
}
/*
* Document-method: Fiber::Scheduler#kernel_sleep
* call-seq: kernel_sleep(duration = nil)
*
* Invoked by Kernel#sleep and Mutex#sleep and is expected to provide
* an implementation of sleeping in a non-blocking way. Implementation might
* register the current fiber in some list of "which fiber wait until what
* moment", call Fiber.yield to pass control, and then in #close resume
* the fibers whose wait period has elapsed.
*
*/
VALUE
rb_fiber_scheduler_kernel_sleep(VALUE scheduler, VALUE timeout)
{
    // #kernel_sleep is required by verify_interface(), so it is called
    // directly rather than through rb_check_funcall.
    VALUE result = rb_funcall(scheduler, id_kernel_sleep, 1, timeout);

    return result;
}
/*
 * Variadic form of rb_fiber_scheduler_kernel_sleep(). Forwards +argc+/+argv+
 * unchanged to Scheduler#kernel_sleep.
 */
VALUE
rb_fiber_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE *argv)
{
    VALUE result = rb_funcallv(scheduler, id_kernel_sleep, argc, argv);

    return result;
}
#if 0
/*
* Document-method: Fiber::Scheduler#timeout_after
* call-seq: timeout_after(duration, exception_class, *exception_arguments, &block) -> result of block
*
* Invoked by Timeout.timeout to execute the given +block+ within the given
* +duration+. It can also be invoked directly by the scheduler or user code.
*
* Attempt to limit the execution time of a given +block+ to the given
* +duration+ if possible. When a non-blocking operation causes the +block+'s
* execution time to exceed the specified +duration+, that non-blocking
* operation should be interrupted by raising the specified +exception_class+
* constructed with the given +exception_arguments+.
*
* General execution timeouts are often considered risky. This implementation
* will only interrupt non-blocking operations. This is by design because it's
* expected that non-blocking operations can fail for a variety of
* unpredictable reasons, so applications should already be robust in handling
* these conditions and by implication timeouts.
*
* However, as a result of this design, if the +block+ does not invoke any
* non-blocking operations, it will be impossible to interrupt it. If you
* desire to provide predictable points for timeouts, consider adding
* +sleep(0)+.
*
* If the block is executed successfully, its result will be returned.
*
* The exception will typically be raised using Fiber#raise.
*/
VALUE
rb_fiber_scheduler_timeout_after(VALUE scheduler, VALUE timeout, VALUE exception, VALUE message)
{
    // Note: this definition sits inside a disabled `#if 0` region.
    // Optional hook: rb_check_funcall returns Qundef when #timeout_after is missing.
    VALUE arguments[3];
    arguments[0] = timeout;
    arguments[1] = exception;
    arguments[2] = message;

    return rb_check_funcall(scheduler, id_timeout_after, 3, arguments);
}
/* Variadic form of rb_fiber_scheduler_timeout_after(); Qundef if the hook is missing. */
VALUE
rb_fiber_scheduler_timeout_afterv(VALUE scheduler, int argc, VALUE *argv)
{
    VALUE result = rb_check_funcall(scheduler, id_timeout_after, argc, argv);

    return result;
}
#endif
/*
* Document-method: Fiber::Scheduler#process_wait
* call-seq: process_wait(pid, flags)
*
* Invoked by Process::Status.wait in order to wait for a specified process.
* See that method description for arguments description.
*
* Suggested minimal implementation:
*
* Thread.new do
* Process::Status.wait(pid, flags)
* end.value
*
* This hook is optional: if it is not present in the current scheduler,
* Process::Status.wait will behave as a blocking method.
*
* Expected to return a Process::Status instance.
*/
VALUE
rb_fiber_scheduler_process_wait(VALUE scheduler, rb_pid_t pid, int flags)
{
    // Optional hook: Qundef comes back when the scheduler has no
    // #process_wait, so the caller can fall back to a blocking wait.
    VALUE arguments[2];
    arguments[0] = PIDT2NUM(pid);
    arguments[1] = RB_INT2NUM(flags);

    return rb_check_funcall(scheduler, id_process_wait, 2, arguments);
}
/*
* Document-method: Fiber::Scheduler#block
* call-seq: block(blocker, timeout = nil)
*
* Invoked by methods like Thread.join, and by Mutex, to signify that current
* Fiber is blocked until further notice (e.g. #unblock) or until +timeout+ has
* elapsed.
*
* +blocker+ is what we are waiting on, informational only (for debugging and
* logging). There is no guarantee about its value.
*
* Expected to return boolean, specifying whether the blocking operation was
* successful or not.
*/
VALUE
rb_fiber_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout)
{
    // #block is required by verify_interface(), so it is called directly.
    VALUE result = rb_funcall(scheduler, id_block, 2, blocker, timeout);

    return result;
}
/*
* Document-method: Fiber::Scheduler#unblock
* call-seq: unblock(blocker, fiber)
*
* Invoked to wake up Fiber previously blocked with #block (for example, Mutex#lock
* calls #block and Mutex#unlock calls #unblock). The scheduler should use
* the +fiber+ parameter to understand which fiber is unblocked.
*
* +blocker+ is what was awaited for, but it is informational only (for debugging
* and logging), and it is not guaranteed to be the same value as the +blocker+ for
* #block.
*
*/
VALUE
rb_fiber_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber)
{
    // +fiber+ is expected to be a Fiber; this is only checked via VM_ASSERT.
    VM_ASSERT(rb_obj_is_fiber(fiber));

    // #unblock is required by verify_interface(), so it is called directly.
    VALUE result = rb_funcall(scheduler, id_unblock, 2, blocker, fiber);

    return result;
}
/*
* Document-method: Fiber::Scheduler#io_wait
* call-seq: io_wait(io, events, timeout)
*
* Invoked by IO#wait, IO#wait_readable, IO#wait_writable to ask whether the
* specified descriptor is ready for specified events within
* the specified +timeout+.
*
* +events+ is a bit mask of <tt>IO::READABLE</tt>, <tt>IO::WRITABLE</tt>, and
* <tt>IO::PRIORITY</tt>.
*
* Suggested implementation should register which Fiber is waiting for which
* resources and immediately call Fiber.yield to pass control to other
* fibers. Then, in the #close method, the scheduler might dispatch all the
* I/O resources to the fibers waiting for them.
*
* Expected to return the subset of events that are ready immediately.
*
*/
VALUE
rb_fiber_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout)
{
    // #io_wait is required by verify_interface(), so it is called directly
    // rather than through rb_check_funcall.
    VALUE ready = rb_funcall(scheduler, id_io_wait, 3, io, events, timeout);

    return ready;
}
/*
 * Wait via the scheduler until +io+ is readable. Uses the timeout configured
 * on +io+, as returned by rb_io_timeout().
 */
VALUE
rb_fiber_scheduler_io_wait_readable(VALUE scheduler, VALUE io)
{
    VALUE events = RB_UINT2NUM(RUBY_IO_READABLE);
    VALUE timeout = rb_io_timeout(io);

    return rb_fiber_scheduler_io_wait(scheduler, io, events, timeout);
}
/*
 * Wait via the scheduler until +io+ is writable. Uses the timeout configured
 * on +io+, as returned by rb_io_timeout().
 */
VALUE
rb_fiber_scheduler_io_wait_writable(VALUE scheduler, VALUE io)
{
    VALUE events = RB_UINT2NUM(RUBY_IO_WRITABLE);
    VALUE timeout = rb_io_timeout(io);

    return rb_fiber_scheduler_io_wait(scheduler, io, events, timeout);
}
/*
* Document-method: Fiber::Scheduler#io_select
* call-seq: io_select(readables, writables, exceptables, timeout)
*
* Invoked by IO.select to ask whether the specified descriptors are ready for
* specified events within the specified +timeout+.
*
* Expected to return a 3-tuple of Arrays of IOs that are ready.
*
*/
VALUE
rb_fiber_scheduler_io_select(VALUE scheduler, VALUE readables, VALUE writables, VALUE exceptables, VALUE timeout)
{
    // Pack the fixed-arity form and delegate to the variadic implementation.
    VALUE arguments[4];
    arguments[0] = readables;
    arguments[1] = writables;
    arguments[2] = exceptables;
    arguments[3] = timeout;

    return rb_fiber_scheduler_io_selectv(scheduler, 4, arguments);
}
/*
 * Variadic form of rb_fiber_scheduler_io_select(). Forwards +argc+/+argv+
 * unchanged to Scheduler#io_select and returns Qundef if that hook is missing.
 */
VALUE
rb_fiber_scheduler_io_selectv(VALUE scheduler, int argc, VALUE *argv)
{
    // One option would be to inspect argv and route a single-IO select to
    // `io_wait`. That is deliberately not done: it would take a fair amount
    // of work and make it hard to keep the exact semantics of IO.select.
    VALUE result = rb_check_funcall(scheduler, id_io_select, argc, argv);

    return result;
}
/*
* Document-method: Fiber::Scheduler#io_read
* call-seq: io_read(io, buffer, length, offset) -> read length or -errno
*
* Invoked by IO#read to read +length+ bytes from +io+ into a specified
* +buffer+ (see IO::Buffer) at the given +offset+.
*
* The +length+ argument is the "minimum length to be read".
* If the IO buffer size is 8KiB, but the +length+ is +1024+ (1KiB), up to
* 8KiB might be read, but at least 1KiB will be.
* Generally, the only case where less data than +length+ will be read is if
* there is an error reading the data.
*
* Specifying a +length+ of 0 is valid and means try reading at least once
* and return any available data.
*
* Suggested implementation should try to read from +io+ in a non-blocking
* manner and call #io_wait if the +io+ is not ready (which will yield control
* to other fibers).
*
* See IO::Buffer for an interface available to return data.
*
* Expected to return number of bytes read, or, in case of an error, <tt>-errno</tt>
* (negated number corresponding to system's error code).
*
* The method should be considered _experimental_.
*/
VALUE
rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
{
    // Optional hook: Qundef is returned when #io_read is not implemented.
    VALUE arguments[4];
    arguments[0] = io;
    arguments[1] = buffer;
    arguments[2] = SIZET2NUM(length);
    arguments[3] = SIZET2NUM(offset);

    return rb_check_funcall(scheduler, id_io_read, 4, arguments);
}
/*
 * Dispatch to the optional Scheduler#io_pread hook with
 * (io, buffer, from, length, offset). Returns Qundef if the hook is missing.
 */
VALUE
rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
{
    VALUE arguments[5];
    arguments[0] = io;
    arguments[1] = buffer;
    arguments[2] = OFFT2NUM(from);
    arguments[3] = SIZET2NUM(length);
    arguments[4] = SIZET2NUM(offset);

    return rb_check_funcall(scheduler, id_io_pread, 5, arguments);
}
/*
* Document-method: Fiber::Scheduler#io_write
* call-seq: io_write(io, buffer, length, offset) -> written length or -errno
*
* Invoked by IO#write to write +length+ bytes to +io+ from
* a specified +buffer+ (see IO::Buffer) at the given +offset+.
*
* The +length+ argument is the "(minimum) length to be written".
* If the IO buffer size is 8KiB, but the +length+ specified is 1024 (1KiB),
* at most 8KiB will be written, but at least 1KiB will be.
* Generally, the only case where less data than +length+ will be written is if
* there is an error writing the data.
*
* Specifying a +length+ of 0 is valid and means try writing at least once,
* as much data as possible.
*
* Suggested implementation should try to write to +io+ in a non-blocking
* manner and call #io_wait if the +io+ is not ready (which will yield control
* to other fibers).
*
* See IO::Buffer for an interface available to get data from buffer efficiently.
*
* Expected to return number of bytes written, or, in case of an error, <tt>-errno</tt>
* (negated number corresponding to system's error code).
*
* The method should be considered _experimental_.
*/
VALUE
rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
{
    // Optional hook: Qundef is returned when #io_write is not implemented.
    VALUE arguments[4];
    arguments[0] = io;
    arguments[1] = buffer;
    arguments[2] = SIZET2NUM(length);
    arguments[3] = SIZET2NUM(offset);

    return rb_check_funcall(scheduler, id_io_write, 4, arguments);
}
/*
 * Dispatch to the optional Scheduler#io_pwrite hook with
 * (io, buffer, from, length, offset). Returns Qundef if the hook is missing.
 */
VALUE
rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
{
    VALUE arguments[5];
    arguments[0] = io;
    arguments[1] = buffer;
    arguments[2] = OFFT2NUM(from);
    arguments[3] = SIZET2NUM(length);
    arguments[4] = SIZET2NUM(offset);

    return rb_check_funcall(scheduler, id_io_pwrite, 5, arguments);
}
/*
 * Read into caller-owned memory. The function wraps +base+/+size+ in a
 * temporary IO::Buffer created with RB_IO_BUFFER_LOCKED and dispatches
 * #io_read with offset 0. It then unlocks and frees the wrapper and returns
 * the hook's result (Qundef if the hook is missing).
 */
VALUE
rb_fiber_scheduler_io_read_memory(VALUE scheduler, VALUE io, void *base, size_t size, size_t length)
{
    VALUE wrapper = rb_io_buffer_new(base, size, RB_IO_BUFFER_LOCKED);
    VALUE result = rb_fiber_scheduler_io_read(scheduler, io, wrapper, length, 0);

    // NOTE(review): these calls are skipped if the hook raises (no rb_ensure).
    rb_io_buffer_unlock(wrapper);
    rb_io_buffer_free(wrapper);

    return result;
}
/*
 * Write from caller-owned memory. The function wraps +base+/+size+ in a
 * temporary IO::Buffer created with RB_IO_BUFFER_LOCKED and
 * RB_IO_BUFFER_READONLY and dispatches #io_write with offset 0. It then
 * unlocks and frees the wrapper and returns the hook's result (Qundef if the
 * hook is missing).
 */
VALUE
rb_fiber_scheduler_io_write_memory(VALUE scheduler, VALUE io, const void *base, size_t size, size_t length)
{
    // The const is cast away only because rb_io_buffer_new takes void *;
    // the READONLY flag marks the wrapper as not writable.
    const int flags = RB_IO_BUFFER_LOCKED | RB_IO_BUFFER_READONLY;
    VALUE wrapper = rb_io_buffer_new((void *)base, size, flags);
    VALUE result = rb_fiber_scheduler_io_write(scheduler, io, wrapper, length, 0);

    // NOTE(review): these calls are skipped if the hook raises (no rb_ensure).
    rb_io_buffer_unlock(wrapper);
    rb_io_buffer_free(wrapper);

    return result;
}
/* Dispatch to the optional Scheduler#io_close hook; Qundef if it is missing. */
VALUE
rb_fiber_scheduler_io_close(VALUE scheduler, VALUE io)
{
    // A single argument needs no array: pass the parameter's address directly.
    return rb_check_funcall(scheduler, id_io_close, 1, &io);
}
/*
* Document-method: Fiber::Scheduler#address_resolve
* call-seq: address_resolve(hostname) -> array_of_strings or nil
*
* Invoked by any method that performs a non-reverse DNS lookup. The most
* notable method is Addrinfo.getaddrinfo, but there are many others.
*
* The method is expected to return an array of strings corresponding to IP
* addresses the +hostname+ is resolved to, or +nil+ if it cannot be resolved.
*
* Fairly exhaustive list of all possible call-sites:
*
* - Addrinfo.getaddrinfo
* - Addrinfo.tcp
* - Addrinfo.udp
* - Addrinfo.ip
* - Addrinfo.new
* - Addrinfo.marshal_load
* - SOCKSSocket.new
* - TCPServer.new
* - TCPSocket.new
* - IPSocket.getaddress
* - TCPSocket.gethostbyname
* - UDPSocket#connect
* - UDPSocket#bind
* - UDPSocket#send
* - Socket.getaddrinfo
* - Socket.gethostbyname
* - Socket.pack_sockaddr_in
* - Socket.sockaddr_in
* - Socket.unpack_sockaddr_in
*/
VALUE
rb_fiber_scheduler_address_resolve(VALUE scheduler, VALUE hostname)
{
    // Optional hook: Qundef signals that #address_resolve is not implemented.
    // A single argument needs no array: pass the parameter's address directly.
    return rb_check_funcall(scheduler, id_address_resolve, 1, &hostname);
}
/*
* Document-method: Fiber::Scheduler#fiber
* call-seq: fiber(&block)
*
* Implementation of the Fiber.schedule. The method is <em>expected</em> to immediately
* run the given block of code in a separate non-blocking fiber, and to return that Fiber.
*
* Minimal suggested implementation is:
*
* def fiber(&block)
* fiber = Fiber.new(blocking: false, &block)
* fiber.resume
* fiber
* end
*/
VALUE
rb_fiber_scheduler_fiber(VALUE scheduler, int argc, VALUE *argv, int kw_splat)
{
    // Forward the arguments, the keyword-splat flag and the caller's block
    // to Scheduler#fiber (interned as id_fiber_schedule).
    VALUE fiber = rb_funcall_passing_block_kw(scheduler, id_fiber_schedule, argc, argv, kw_splat);

    return fiber;
}
|