summaryrefslogtreecommitdiff
path: root/lib/chef/chef_fs/parallelizer.rb
blob: 32de61e0482a7e9be843437073bad6cba60f5664 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
require "thread"
require "chef/chef_fs/parallelizer/parallel_enumerable"

class Chef
  module ChefFS
    # Tries to balance several guarantees, in order of priority:
    # - don't get deadlocked
    # - provide results in desired order
    # - provide results as soon as they are available
    # - process input as soon as possible
    class Parallelizer
      # Shared instance used by the class-level helpers (created lazily by
      # .parallelizer) and the worker count it is created with.
      @@parallelizer = nil
      @@threads = 0

      # Sets the worker count used for the shared instance. If the shared
      # instance already exists it is resized right away (using resize's
      # defaults, i.e. waiting for surplus workers to stop).
      def self.threads=(value)
        @@threads = value
        @@parallelizer.resize(value) if @@parallelizer
      end

      # Returns the shared instance, creating it on first use with the
      # currently configured worker count.
      def self.parallelizer
        @@parallelizer ||= Parallelizer.new(@@threads)
      end

      # Class-level shortcut for #parallelize on the shared instance.
      def self.parallelize(enumerable, options = {}, &block)
        parallelizer.parallelize(enumerable, options, &block)
      end

      # Class-level shortcut for #parallel_do on the shared instance.
      def self.parallel_do(enumerable, options = {}, &block)
        parallelizer.parallel_do(enumerable, options, &block)
      end

      # Creates a pool with its own task queue and starts num_threads workers.
      def initialize(num_threads)
        @tasks = Queue.new
        @threads = []
        # Maps a worker thread to true once it has been asked to stop.
        @stop_thread = {}
        resize(num_threads)
      end

      # Number of worker threads currently owned by this pool. Workers that
      # were asked to stop are no longer counted, even if still running.
      def num_threads
        @threads.size
      end

      # Wraps enumerable in a ParallelEnumerable bound to this pool's task
      # queue; options and the block are passed through unchanged.
      def parallelize(enumerable, options = {}, &block)
        ParallelEnumerable.new(@tasks, enumerable, options, &block)
      end

      # Same as #parallelize but forces :ordered => false and calls #wait on
      # the resulting ParallelEnumerable, returning whatever #wait returns.
      def parallel_do(enumerable, options = {}, &block)
        ParallelEnumerable.new(@tasks, enumerable, options.merge(:ordered => false), &block).wait
      end

      # Asks every worker to stop; equivalent to resize(0, wait, timeout).
      def stop(wait = true, timeout = nil)
        resize(0, wait, timeout)
      end

      # Grows or shrinks the pool to to_threads workers.
      #
      # to_threads - desired worker count; must not be negative.
      # wait       - when shrinking, join the surplus workers before returning.
      # timeout    - overall limit in seconds for those joins (nil = no limit).
      #
      # Raises ArgumentError if to_threads is negative.
      #
      # NOTE: workers only look at their stop flag between tasks, so a worker
      # that is blocked waiting on an empty queue does not exit until it has
      # received and run one more task. With wait = true and no timeout this
      # call blocks until that happens.
      def resize(to_threads, wait = true, timeout = nil)
        # Reject negative sizes before touching any state: slicing with a
        # negative length yields nil and would leave @threads unusable.
        if to_threads < 0
          raise ArgumentError, "to_threads must be non-negative, got #{to_threads.inspect}"
        end

        if to_threads < num_threads
          surplus = @threads[to_threads..-1]
          @threads = @threads.slice(0, to_threads)
          surplus.each { |thread| @stop_thread[thread] = true }
          join_threads(surplus, timeout) if wait
        else
          num_threads.upto(to_threads - 1) do |i|
            @threads[i] = Thread.new(&method(:worker_loop))
          end
        end
      end

      # Forcibly kills every worker currently owned by the pool and empties
      # it. Workers already detached by an earlier resize/stop are not touched.
      def kill
        @threads.each do |thread|
          Thread.kill(thread)
          @stop_thread.delete(thread)
        end
        @threads = []
      end

      private

      # Joins each thread, sharing a single overall timeout between them.
      # A nil timeout waits indefinitely for every thread.
      def join_threads(threads, timeout)
        deadline = timeout ? monotonic_now + timeout : nil
        threads.each do |thread|
          # Clamp at zero so an already-expired deadline polls instead of
          # handing Thread#join a negative limit.
          remaining = deadline ? [deadline - monotonic_now, 0].max : nil
          thread.join(remaining)
        end
      end

      # Seconds from a clock that is unaffected by wall-clock adjustments.
      def monotonic_now
        Process.clock_gettime(Process::CLOCK_MONOTONIC)
      end

      # Body of every worker thread: pop a task, run it, repeat until this
      # thread has been flagged to stop. The flag is only checked between
      # tasks; @tasks.pop blocks while the queue is empty.
      def worker_loop
        until @stop_thread[Thread.current]
          begin
            task = @tasks.pop
            task.call
          rescue => e
            # A failing task must not take the worker down; report and go on.
            puts "ERROR #{e}"
            puts e.backtrace
          end
        end
      ensure
        # Runs on normal exit and when the thread is killed.
        @stop_thread.delete(Thread.current)
      end
    end
  end
end