1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
class Chef
  module ChefFS
    # Runs a block over every element of an enumerable, spreading the work
    # across a pool of worker threads while still handing results back to the
    # caller in input order.
    #
    # The class-level API (+threads=+ / +parallelize+) manages one shared pool
    # that is built lazily. The thread that iterates a result set also
    # processes inputs itself, so work completes even with zero worker threads.
    class Parallelizer
      @@parallelizer = nil
      @@threads = 0

      # Sets the number of worker threads used by the shared pool.
      #
      # When the value changes, the current pool (if any) is asked to stop and
      # a new one is created on the next call to +parallelize+. The old pool's
      # workers finish the tasks they already have queued and then exit,
      # instead of polling forever.
      #
      # @param value [Integer] number of worker threads
      def self.threads=(value)
        return if @@threads == value

        @@threads = value
        @@parallelizer.stop if @@parallelizer
        @@parallelizer = nil
      end

      # Queues work on the shared pool, creating the pool on first use.
      #
      # @param enumerator [#to_a] inputs to process
      # @param options [Hash] +:flatten+ => true yields each element of every
      #   block result instead of the result itself
      # @yield [input] called once per input, possibly on a worker thread
      # @return [ParallelizedResults] enumerable of block results, input order
      def self.parallelize(enumerator, options = {}, &block)
        @@parallelizer ||= Parallelizer.new(@@threads)
        @@parallelizer.parallelize(enumerator, options, &block)
      end

      # Starts +threads+ worker threads that poll the task queue.
      #
      # @param threads [Integer] number of workers; 0 means all work is done
      #   by whichever thread iterates the results
      def initialize(threads)
        @tasks_mutex = Mutex.new
        @tasks = []
        @threads = []
        # Must be set before any worker starts, since workers read it.
        @stopping = false
        1.upto(threads) do
          @threads << Thread.new { worker_loop }
        end
      end

      # Queues a new task on this pool and returns its result set immediately.
      #
      # @param enumerator [#to_a] inputs to process
      # @param options [Hash] see Parallelizer.parallelize
      # @return [ParallelizedResults]
      def parallelize(enumerator, options = {}, &block)
        task = ParallelizedResults.new(enumerator, options, &block)
        @tasks_mutex.synchronize do
          @tasks << task
        end
        task
      end

      # Asks the worker threads to exit once the task queue is empty.
      #
      # Does not interrupt an input that is being processed and does not wait
      # for the workers to finish. Result sets already handed out stay usable
      # because iterating them processes any remaining inputs on the caller's
      # thread.
      def stop
        @stopping = true
      end

      # The lazily computed, ordered results of one +parallelize+ call.
      #
      # Per-input state lives in two arrays indexed like the inputs:
      # +@status+ (:started, :finished or :exception) and +@outputs+ (the
      # block's return value, or the exception it raised).
      class ParallelizedResults
        include Enumerable

        # @param enumerator [#to_a] inputs; materialised immediately
        # @param options [Hash, nil] nil is treated as no options
        def initialize(enumerator, options, &block)
          @inputs = enumerator.to_a
          @options = options || {}
          @block = block
          @mutex = Mutex.new
          @outputs = []
          @status = []
        end

        # Yields results in input order, processing inputs on the calling
        # thread whenever any are still unclaimed.
        #
        # If the block raised for some input, that exception is re-raised here
        # when iteration reaches that position.
        #
        # @yield [result] each block result (or each of its elements when the
        #   +:flatten+ option is set)
        # @return [Enumerator] when called without a block
        def each
          return enum_for(:each) unless block_given?

          next_index = 0
          # Deliberately `while true` rather than `loop`: `loop` would swallow
          # a StopIteration raised by the caller's block or a stored exception.
          while true
            # Report every result that is ready, stopping at the first slot
            # that is still being worked on.
            while next_index < @status.length
              case @status[next_index]
              when :finished
                if @options[:flatten]
                  @outputs[next_index].each do |entry|
                    yield entry
                  end
                else
                  yield @outputs[next_index]
                end
              when :exception
                raise @outputs[next_index]
              else
                break
              end
              next_index += 1
            end

            # Do a unit of work ourselves. This guarantees progress even when
            # there are no worker threads at all.
            unless process_input
              # Nothing left to claim: finish once everything was reported.
              break if next_index >= @status.length

              # Another thread is still working on a claimed input; poll again.
              sleep(0.05)
            end
          end
        end

        # Claims the next unprocessed input, runs the block on it and records
        # the outcome.
        #
        # @return [Integer, nil] index of the processed input, or nil when
        #   every input has already been claimed
        def process_input
          # Claiming a slot is the only step that needs the lock.
          index = @mutex.synchronize do
            candidate = @status.length
            if candidate < @inputs.length
              @status[candidate] = :started
              candidate
            end
          end
          return nil unless index

          begin
            @outputs[index] = @block.call(@inputs[index])
            @status[index] = :finished
          rescue Exception => error
            # Intentionally broad: a failure that is not recorded would leave
            # this slot :started forever and `each` would never terminate.
            @outputs[index] = error
            @status[index] = :exception
          end
          index
        end
      end

      private

      # Body of each worker thread: repeatedly works on the task at the head
      # of the queue, dropping a task once all of its inputs are claimed.
      # Exits when the pool has been stopped and the queue is empty.
      def worker_loop
        while true
          begin
            task = @tasks_mutex.synchronize { @tasks[0] }
            if task
              unless task.process_input
                @tasks_mutex.synchronize do
                  @tasks.delete(task)
                end
              end
            elsif @stopping
              break
            else
              # Idle: wait a bit to see if another task comes in.
              sleep(0.05)
            end
          rescue
            puts "ERROR #{$!}"
            puts $!.backtrace
          end
        end
      end
    end
  end
end
|