1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
|
# frozen_string_literal: true
# This is an example and simplified scheduler for test purposes.
# - It is not efficient for a large number of file descriptors as it uses
# IO.select().
# - It does not correctly handle multiple calls to `wait` with the same file
# descriptor and overlapping events.
# - Production fiber schedulers should use epoll/kqueue/etc. Consider using the
# [`io-event`](https://github.com/socketry/io-event) gem instead of this
# scheduler if you want something simple to build on.
require 'fiber'
require 'socket'
begin
require 'io/nonblock'
rescue LoadError
# Ignore.
end
# A deliberately small fiber scheduler for tests, driven by `IO.select`.
#
# Per-instance state:
# - `@readable` / `@writable`: map an IO to the fiber waiting for that readiness.
# - `@waiting`: maps a fiber to the monotonic time at which it should be resumed.
# - `@blocking`: identity-keyed set of fibers parked in `block` without a timeout.
# - `@ready`: fibers queued by `unblock`; access is guarded by `@lock`.
# - `@urgent`: a pipe that `unblock` writes to in order to wake a pending `IO.select`.
class Scheduler
  # Instantiate an `IO::Buffer` once with experimental warnings disabled, then
  # restore the previous setting.
  # NOTE(review): presumably this keeps the experimental warning from being
  # emitted later during tests - confirm.
  experimental = Warning[:experimental]
  begin
    Warning[:experimental] = false
    IO::Buffer.new(0)
  ensure
    Warning[:experimental] = experimental
  end

  def initialize
    @readable = {}
    @writable = {}
    @waiting = {}

    @closed = false

    @lock = Thread::Mutex.new
    @blocking = {}.compare_by_identity
    @ready = []

    @urgent = IO.pipe
  end

  attr_reader :readable, :writable, :waiting

  # Seconds until the earliest deadline in `@waiting`.
  #
  # Returns `nil` when no fiber is waiting on a deadline, `0` when the earliest
  # deadline has already passed, and the remaining time otherwise.
  def next_timeout
    _fiber, timeout = @waiting.min_by { |_key, value| value }
    return unless timeout

    offset = timeout - current_time
    offset < 0 ? 0 : offset
  end

  # The event loop. Runs until no fiber is waiting on IO, on a deadline, or in
  # `block`. Each iteration resumes, in this order: fibers whose IO became ready,
  # fibers whose deadline passed, and fibers queued by `unblock`.
  def run
    while @readable.any? || @writable.any? || @waiting.any? || @blocking.any?
      # May only handle file descriptors up to 1024...
      readable, writable = IO.select(@readable.keys + [@urgent.first], @writable.keys, [], next_timeout)

      # Maps each fiber to the bitmask of events it is being woken for:
      selected = {}

      readable&.each do |io|
        if (fiber = @readable.delete(io))
          @writable.delete(io) if @writable[io] == fiber
          selected[fiber] = IO::READABLE
        elsif io == @urgent.first
          # Drain the wake-up bytes written by `unblock`:
          @urgent.first.read_nonblock(1024)
        end
      end

      writable&.each do |io|
        if (fiber = @writable.delete(io))
          @readable.delete(io) if @readable[io] == fiber
          selected[fiber] = selected.fetch(fiber, 0) | IO::WRITABLE
        end
      end

      selected.each do |fiber, events|
        fiber.resume(events)
      end

      if @waiting.any?
        time = current_time

        # Swap in a fresh hash so fibers resumed below can register new deadlines
        # while the old set is being iterated:
        waiting, @waiting = @waiting, {}

        waiting.each do |fiber, timeout|
          next unless fiber.alive?

          if timeout <= time
            fiber.resume
          else
            @waiting[fiber] = timeout
          end
        end
      end

      if @ready.any?
        ready = nil

        @lock.synchronize do
          ready, @ready = @ready, []
        end

        ready.each do |fiber|
          # A fiber queued by `unblock` may already have finished, e.g. because
          # its `block` timeout expired earlier in this same iteration. Resuming
          # a terminated fiber raises `FiberError`, so skip it:
          fiber.resume if fiber.alive?
        end
      end
    end
  end

  # A fiber scheduler hook, invoked when the scheduler goes out of scope.
  def scheduler_close
    close(true)
  end

  # If the `scheduler_close` hook does not exist, this method `close` will be
  # invoked instead when the fiber scheduler goes out of scope. This is legacy
  # behaviour, you should almost certainly use `scheduler_close`. The reason for
  # this, is `scheduler_close` is called when the scheduler goes out of scope,
  # while `close` may be called by the user.
  #
  # Runs the event loop to completion, closes the wake-up pipe and freezes the
  # scheduler. Raises `RuntimeError` if the scheduler was already closed.
  def close(internal = false)
    # A user-initiated close of the currently installed scheduler is handed to
    # `Fiber.set_scheduler(nil)` rather than being performed here:
    return Fiber.set_scheduler(nil) if !internal && Fiber.scheduler == self

    raise "Scheduler already closed!" if @closed

    run
  ensure
    if @urgent
      @urgent.each(&:close)
      @urgent = nil
    end

    # `||=` performs no assignment once `@closed` is true, so this stays safe
    # when the object has already been frozen by an earlier pass:
    @closed ||= true

    # We freeze to detect any unintended modifications after the scheduler is closed:
    freeze
  end

  def closed?
    @closed
  end

  # The monotonic clock reading used for all deadlines.
  def current_time
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  # This hook is invoked by `Timeout.timeout` and related code.
  #
  # Starts a helper fiber that sleeps for `duration` and then raises
  # `klass`/`message` in the calling fiber if it is still alive, and yields
  # `duration` to the given block.
  def timeout_after(duration, klass, message, &block)
    fiber = Fiber.current

    self.fiber do
      sleep(duration)

      fiber.raise(klass, message) if fiber&.alive?
    end

    begin
      yield(duration)
    ensure
      # Clearing the captured local disarms the helper fiber above:
      fiber = nil
    end
  end

  # This hook is invoked by `Process.wait`, `system`, and backticks.
  def process_wait(pid, flags)
    # This is a very simple way to implement a non-blocking wait:
    Thread.new do
      Process::Status.wait(pid, flags)
    end.value
  end

  # This hook is invoked by `IO#read` and `IO#write` in the case that `io_read`
  # and `io_write` hooks are not available. This implementation is not
  # completely general, in the sense that calling `io_wait` multiple times with
  # the same `io` and `events` will not work, which is okay for tests but not
  # for real code. Correct fiber schedulers should not have this limitation.
  #
  # Returns whatever the event loop passes to `resume`: the ready event mask
  # when woken for IO, or `nil` when woken because `duration` elapsed.
  def io_wait(io, events, duration)
    fiber = Fiber.current

    unless (events & IO::READABLE).zero?
      @readable[io] = fiber
      readable = true
    end

    unless (events & IO::WRITABLE).zero?
      @writable[io] = fiber
      writable = true
    end

    @waiting[fiber] = current_time + duration if duration

    Fiber.yield
  ensure
    # Undo only the registrations made by this call:
    @waiting.delete(fiber) if duration
    @readable.delete(io) if readable
    @writable.delete(io) if writable
  end

  # This hook is invoked by `IO.select`. Using a thread ensures that the
  # operation does not block the fiber scheduler.
  def io_select(...)
    # Emulate the operation using a non-blocking thread:
    Thread.new do
      IO.select(...)
    end.value
  end

  # This hook is invoked by `Kernel#sleep` and `Thread::Mutex#sleep`.
  def kernel_sleep(duration = nil)
    block(:sleep, duration)

    true
  end

  # This hook is invoked by blocking options such as `Thread::Mutex#lock`,
  # `Thread::Queue#pop` and `Thread::SizedQueue#push`, which are unblocked by
  # other threads/fibers. To unblock a blocked fiber, you should call `unblock`
  # with the same `blocker` and `fiber` arguments.
  #
  # With a `timeout` the fiber is registered in `@waiting`, otherwise in
  # `@blocking`; in both cases the registration is removed when the fiber resumes.
  def block(blocker, timeout = nil)
    fiber = Fiber.current

    if timeout
      @waiting[fiber] = current_time + timeout
      begin
        Fiber.yield
      ensure
        # Remove from @waiting in the case #unblock was called before the timeout expired:
        @waiting.delete(fiber)
      end
    else
      @blocking[fiber] = true
      begin
        Fiber.yield
      ensure
        @blocking.delete(fiber)
      end
    end
  end

  # This method is invoked from a thread or fiber to unblock a fiber that is
  # blocked by `block`. It is expected to be thread safe.
  def unblock(blocker, fiber)
    @lock.synchronize do
      @ready << fiber
    end

    # Wake up the event loop. If the pipe is full, a wake-up is already pending,
    # so a write that would block is skipped instead of raising
    # `IO::WaitWritable`:
    io = @urgent.last
    io.write_nonblock('.', exception: false)
  end

  # This hook is invoked by `Fiber.schedule`. Strictly speaking, you should use
  # it to create scheduled fibers, but it is not required in practice;
  # `Fiber.new` is usually sufficient.
  #
  # Creates a non-blocking fiber, resumes it immediately and returns it.
  def fiber(&block)
    fiber = Fiber.new(blocking: false, &block)

    fiber.resume

    fiber
  end

  # This hook is invoked by `Addrinfo.getaddrinfo`. Using a thread ensures that
  # the operation does not block the fiber scheduler, since `getaddrinfo` is
  # usually provided by `libc` and is blocking.
  #
  # Returns the unique IP address strings resolved for `hostname`.
  def address_resolve(hostname)
    Thread.new do
      Addrinfo.getaddrinfo(hostname, nil).map(&:ip_address).uniq
    end.value
  end
end
# This scheduler class implements `io_read` and `io_write` hooks which require
# `IO::Buffer`.
class IOBufferScheduler < Scheduler
  # The negated `EAGAIN` errno, compared against the result of the buffer
  # operations below to detect "would block".
  EAGAIN = -Errno::EAGAIN::Errno

  # Reads from `io` into `buffer` starting at `offset` until at least `length`
  # bytes were transferred, the operation reports zero bytes, or it fails.
  #
  # Returns the total number of bytes transferred, or the negative result of
  # the failing operation.
  def io_read(io, buffer, length, offset)
    perform_io(io, buffer, length, offset, IO::READABLE) do |size, position, _transferred|
      buffer.read(io, size, position)
    end
  end

  # Writes from `buffer` (starting at `offset`) to `io`; same contract as `io_read`.
  def io_write(io, buffer, length, offset)
    perform_io(io, buffer, length, offset, IO::WRITABLE) do |size, position, _transferred|
      buffer.write(io, size, position)
    end
  end

  # Like `io_read`, but reads from file position `from`, which advances with
  # the number of bytes already transferred.
  def io_pread(io, buffer, from, length, offset)
    perform_io(io, buffer, length, offset, IO::READABLE) do |size, position, transferred|
      buffer.pread(io, from + transferred, size, position)
    end
  end

  # Like `io_write`, but writes at file position `from`, which advances with
  # the number of bytes already transferred.
  def io_pwrite(io, buffer, from, length, offset)
    perform_io(io, buffer, length, offset, IO::WRITABLE) do |size, position, transferred|
      buffer.pwrite(io, from + transferred, size, position)
    end
  end

  # Runs the block via `Fiber.blocking`.
  def blocking(&block)
    Fiber.blocking(&block)
  end

  private

  # Shared transfer loop behind the four hooks above.
  #
  # Puts `io` into non-blocking mode, then repeatedly yields
  # `(maximum_size, offset, total)` - the remaining buffer space, the current
  # buffer offset and the bytes transferred so far - to the given block, which
  # performs one buffer operation inside `blocking` and returns its result.
  #
  # - A positive result is accumulated; the loop ends once `total >= length`.
  # - A zero result ends the loop.
  # - `EAGAIN` waits for `events` on `io` when `length > 0`, otherwise it is
  #   returned as-is.
  # - Any other negative result is returned as-is.
  #
  # Returns the total number of bytes transferred unless a result was returned
  # early as described above.
  def perform_io(io, buffer, length, offset, events)
    total = 0
    io.nonblock = true

    loop do
      maximum_size = buffer.size - offset
      result = blocking { yield(maximum_size, offset, total) }

      if result > 0
        total += result
        offset += result
        break if total >= length
      elsif result == 0
        break
      elsif result == EAGAIN
        return result unless length > 0

        io_wait(io, events, nil)
      else
        # Remaining case: a negative result other than EAGAIN.
        return result
      end
    end

    total
  end
end
# This scheduler has a broken implementation of `unblock` in the sense that it
# raises an exception. This is used to test the behavior of the scheduler when
# unblock raises an exception.
class BrokenUnblockScheduler < Scheduler
  # Performs the inherited unblock first, then unconditionally fails.
  def unblock(blocker, fiber)
    super

    raise RuntimeError, "Broken unblock!"
  end
end
# This scheduler has a broken implementation of `unblock` in the sense that it
# sleeps. This is used to test the behavior of the scheduler when unblock
# messes with the internal thread state in an unexpected way.
class SleepingUnblockScheduler < Scheduler
  # This method is invoked when the thread is exiting.
  def unblock(blocker, fiber)
    super

    # Sleeping here changes the current thread state to `THREAD_RUNNING`, which
    # causes `thread_join_sleep` to hang.
    sleep 0.1
  end
end
# This scheduler has a broken implementation of `kernel_sleep` in the sense that
# it invokes a blocking sleep which can cause a deadlock in some cases.
class SleepingBlockingScheduler < Scheduler
  # Same contract as the inherited hook, preceded by a short blocking sleep.
  def kernel_sleep(duration = nil)
    # Deliberately sleep in a blocking state, which can trigger a deadlock if
    # the implementation is not correct.
    Fiber.blocking { sleep(0.0001) }

    block(:sleep, duration)

    true
  end
end
|