# path: root/lib/chef/chef_fs/parallelizer.rb
# blob: 84f3d4d8706689088258baace1791ff807614249
class Chef
  module ChefFS
    # Applies a block to every element of an enumerable, optionally with the
    # help of a pool of worker threads, and hands the results back in input
    # order.
    #
    # Work is pulled rather than pushed: both the pool's workers and the thread
    # that iterates a result set claim inputs via
    # ParallelizedResults#process_input.  Iterating a result set therefore
    # always makes progress, even when the pool has zero worker threads (the
    # default).
    #
    # Example:
    #
    #   Chef::ChefFS::Parallelizer.threads = 4
    #   Chef::ChefFS::Parallelizer.parallelize([1, 2, 3]) { |x| x * 2 }.to_a
    #   # => [2, 4, 6]
    class Parallelizer
      @@parallelizer = nil
      @@threads = 0

      # Sets the number of worker threads used by the shared pool.
      #
      # Changing the value retires the current shared pool (its workers finish
      # whatever is already queued and then exit) so that the next call to
      # Parallelizer.parallelize builds a fresh pool of the requested size.
      # Setting the same value again is a no-op.
      def self.threads=(value)
        return if @@threads == value

        @@threads = value
        # Without this the old pool's workers would keep polling forever.
        @@parallelizer.stop if @@parallelizer
        @@parallelizer = nil
      end

      # Runs +block+ over +enumerator+ using the shared pool, creating the pool
      # on first use.  See #parallelize for the options and the return value.
      def self.parallelize(enumerator, options = {}, &block)
        @@parallelizer ||= Parallelizer.new(@@threads)
        @@parallelizer.parallelize(enumerator, options, &block)
      end

      # Creates a pool and immediately starts +threads+ worker threads.
      # A count of zero (or less) starts none; work is then performed lazily by
      # whichever thread iterates the results.
      def initialize(threads)
        @tasks_mutex = Mutex.new
        @tasks = []
        @stopped = false
        @threads = []
        1.upto(threads) do
          @threads << Thread.new { worker_loop }
        end
      end

      # Schedules +block+ to be called once for each element of +enumerator+.
      #
      # options:
      #   :flatten - when truthy, each block result is itself iterated with
      #              #each and its entries are yielded individually.
      #
      # Returns a ParallelizedResults, an Enumerable that yields the results in
      # input order and re-raises, at the corresponding position, any
      # exception the block raised.
      def parallelize(enumerator, options = {}, &block)
        task = ParallelizedResults.new(enumerator, options, &block)
        @tasks_mutex.synchronize do
          # Only workers ever remove tasks from the queue, so queueing a task
          # when there is nobody to drain it would retain it (and all of its
          # inputs and outputs) forever.  The caller can still obtain every
          # result by iterating the returned object.
          @tasks << task unless @stopped || @threads.empty?
        end
        task
      end

      # Asks the worker threads to exit once the tasks that are already queued
      # have been fully claimed.  Does not wait for them to finish.
      # Returns nil.
      def stop
        @tasks_mutex.synchronize { @stopped = true }
        nil
      end

      # The lazily computed, ordered results of a single parallelize call.
      class ParallelizedResults
        include Enumerable

        # enumerator - the inputs; converted to an array up front.
        # options    - see Parallelizer#parallelize.
        # block      - called with one input at a time to produce its output.
        def initialize(enumerator, options, &block)
          @inputs = enumerator.to_a
          @options = options
          @block = block

          @mutex = Mutex.new
          # @outputs[i] holds the block's return value (or the exception it
          # raised) for @inputs[i]; @status[i] is :started, :finished or
          # :exception.  Both grow as inputs are claimed.
          @outputs = []
          @status = []
        end

        # Yields every result in input order, processing inputs on the calling
        # thread whenever some are still unclaimed.  If the block raised for an
        # input, that exception is re-raised when iteration reaches it.
        #
        # Returns an Enumerator when called without a block, nil otherwise.
        def each
          return enum_for(:each) unless block_given?

          next_index = 0
          # Deliberately `while true` rather than Kernel#loop: loop rescues
          # StopIteration, which would silently swallow one raised by the work
          # block or by the consumer's block.
          while true
            # Report any results that already exist
            while completed?(next_index)
              raise @outputs[next_index] if @status[next_index] == :exception

              if @options[:flatten]
                @outputs[next_index].each { |entry| yield entry }
              else
                yield @outputs[next_index]
              end
              next_index += 1
            end

            # Pick up an input and process it, if there is one.  This ensures
            # we move forward even if there are *zero* worker threads available.
            next if process_input

            # Every input has been claimed; we are done once all of them have
            # also been reported.
            break if next_index >= @status.length

            # Another thread is still working on a claimed input.  Wait a bit
            # before checking again.
            sleep(0.05)
          end
        end

        # Claims the next unclaimed input, runs the block on it and records the
        # outcome.  Called both by worker threads and by #each.
        #
        # Returns the index that was processed, or nil when every input has
        # already been claimed.
        def process_input
          index = nil
          input = nil
          # Grab the next one to process
          @mutex.synchronize do
            index = @status.length
            return nil if index >= @inputs.length

            input = @inputs[index]
            @status[index] = :started
          end

          begin
            @outputs[index] = @block.call(input)
            @status[index] = :finished
          rescue Exception => e
            # Intentionally catches everything: the exception is handed to the
            # consumer (see #each), and leaving the slot at :started would make
            # the consumer wait forever.
            @outputs[index] = e
            @status[index] = :exception
          end
          index
        end

        private

        # True when the input at +index+ has been fully processed, whether it
        # succeeded or raised.  Unclaimed indexes have a nil status.
        def completed?(index)
          [:finished, :exception].include?(@status[index])
        end
      end

      private

      # Body of each worker thread: keeps processing inputs of the oldest
      # queued task, removes a task once all of its inputs have been claimed,
      # and exits when the pool has been stopped and the queue is empty.
      def worker_loop
        while true
          begin
            task, stopped = @tasks_mutex.synchronize { [@tasks.first, @stopped] }
            if task
              unless task.process_input
                @tasks_mutex.synchronize { @tasks.delete(task) }
              end
            elsif stopped
              break
            else
              # Nothing to do.  Wait a bit to see if another task comes in.
              sleep(0.05)
            end
          rescue => e
            puts "ERROR #{e}"
            puts e.backtrace
          end
        end
      end
    end
  end
end