1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
|
require "thread"
require "chef/chef_fs/parallelizer/parallel_enumerable"
class Chef
  module ChefFS
    # Tries to balance several guarantees, in order of priority:
    # - don't get deadlocked
    # - provide results in desired order
    # - provide results as soon as they are available
    # - process input as soon as possible
    #
    # A Parallelizer owns a pool of worker threads. Each worker repeatedly
    # pops an object off a shared queue and invokes +call+ on it. The queue is
    # handed to ParallelEnumerable (defined elsewhere; presumably it is what
    # enqueues the callables -- confirm against that class).
    class Parallelizer
      # No-op task pushed onto the queue purely to wake a worker that is
      # blocked in Queue#pop so that it re-checks its stop flag.
      WAKE_UP_TASK = proc {}
      # Seconds to wait on a stopping thread before nudging idle workers again.
      STOP_POLL_INTERVAL = 0.01
      private_constant :WAKE_UP_TASK, :STOP_POLL_INTERVAL

      # Process-wide shared instance (created lazily) and its configured size.
      @@parallelizer = nil
      @@threads = 0

      # Sets the worker count used by the shared instance, resizing it
      # immediately if it has already been created.
      def self.threads=(value)
        @@threads = value
        @@parallelizer.resize(value) if @@parallelizer
      end

      # Returns the shared instance, creating it on first use with the
      # currently configured thread count.
      def self.parallelizer
        @@parallelizer ||= Parallelizer.new(@@threads)
      end

      # Delegates to #parallelize on the shared instance.
      def self.parallelize(enumerable, options = {}, &block)
        parallelizer.parallelize(enumerable, options, &block)
      end

      # Delegates to #parallel_do on the shared instance.
      def self.parallel_do(enumerable, options = {}, &block)
        parallelizer.parallel_do(enumerable, options, &block)
      end

      # Creates a pool and starts +num_threads+ workers.
      def initialize(num_threads)
        @tasks = Queue.new
        @threads = []
        # Thread => true for every worker that has been asked to exit.
        @stop_thread = {}
        resize(num_threads)
      end

      # Number of workers currently in the pool. Workers that were asked to
      # stop are no longer counted, even if they have not exited yet.
      def num_threads
        @threads.size
      end

      # Builds a ParallelEnumerable over +enumerable+ that shares this pool's
      # task queue. +options+ and the block are passed through untouched.
      def parallelize(enumerable, options = {}, &block)
        ParallelEnumerable.new(@tasks, enumerable, options, &block)
      end

      # Same as #parallelize but forces :ordered => false and returns the
      # result of calling +wait+ on the enumerable.
      def parallel_do(enumerable, options = {}, &block)
        ParallelEnumerable.new(@tasks, enumerable, options.merge(:ordered => false), &block).wait
      end

      # Shrinks the pool to zero workers. See #resize for +wait+ / +timeout+.
      def stop(wait = true, timeout = nil)
        resize(0, wait, timeout)
      end

      # Grows or shrinks the pool to +to_threads+ workers.
      #
      # When shrinking, the surplus workers (those at the end of the list) are
      # removed from the pool and flagged to exit once they finish the task
      # they are running. Idle workers are woken with no-op tasks so they
      # notice the flag instead of sleeping in Queue#pop indefinitely.
      #
      # to_threads:: desired worker count; must not be negative.
      # wait::       when true, block until the removed workers have exited.
      # timeout::    overall number of seconds to wait for all removed
      #              workers together, or nil to wait without limit.
      #
      # Raises ArgumentError if +to_threads+ is negative.
      def resize(to_threads, wait = true, timeout = nil)
        # A negative count used to leave @threads set to nil.
        raise ArgumentError, "to_threads must not be negative (got #{to_threads})" if to_threads < 0

        if to_threads < num_threads
          threads_to_stop = @threads[to_threads..-1]
          @threads = @threads.slice(0, to_threads)
          threads_to_stop.each { |thread| @stop_thread[thread] = true }

          # Best effort: give every worker currently parked on the queue a
          # reason to wake up. Survivors simply run the no-op and go back to
          # waiting; flagged workers run it and then exit.
          @tasks.num_waiting.times { @tasks << WAKE_UP_TASK }

          join_stopping_threads(threads_to_stop, timeout) if wait
        else
          num_threads.upto(to_threads - 1) do |i|
            @threads[i] = Thread.new(&method(:worker_loop))
          end
        end
      end

      # Forcibly terminates every worker currently in the pool, without
      # waiting for running tasks, and empties the pool. Workers already
      # removed by #resize are not touched.
      def kill
        @threads.each do |thread|
          thread.kill
          @stop_thread.delete(thread)
        end
        @threads = []
      end

      private

      # Joins each thread in +threads+, sharing a single +timeout+ budget
      # (seconds, nil for unlimited) across all of them. Every thread is
      # joined at least once, so an exception that terminated a worker is
      # re-raised here exactly as a plain Thread#join would.
      def join_stopping_threads(threads, timeout)
        deadline = timeout ? monotonic_now + timeout : nil
        threads.each do |thread|
          loop do
            wait_for = STOP_POLL_INTERVAL
            if deadline
              remaining = deadline - monotonic_now
              wait_for = remaining if remaining < wait_for
              wait_for = 0 if wait_for < 0
            end
            break if thread.join(wait_for)
            break if deadline && monotonic_now >= deadline

            # The thread is still alive. If something is parked on an empty
            # queue it may be this thread (an earlier wake-up can be consumed
            # by a surviving worker), so nudge again. Only pushing when the
            # queue is empty keeps at most one spare no-op around.
            @tasks << WAKE_UP_TASK if @tasks.empty? && @tasks.num_waiting > 0
          end
        end
      end

      # Monotonic timestamp in seconds; unaffected by wall-clock adjustments.
      def monotonic_now
        Process.clock_gettime(Process::CLOCK_MONOTONIC)
      end

      # Body of every worker thread: pop a task, call it, repeat until this
      # thread has been flagged in @stop_thread. The flag is only examined
      # between tasks, so a running task is always allowed to finish.
      def worker_loop
        until @stop_thread[Thread.current]
          begin
            @tasks.pop.call
          rescue StandardError => e
            # A failing task must not take the worker down; report and go on.
            puts "ERROR #{e}"
            puts e.backtrace
          end
        end
      ensure
        # Drop the bookkeeping entry however the thread ends.
        @stop_thread.delete(Thread.current)
      end
    end
  end
end
|