summaryrefslogtreecommitdiff
path: root/chef-utils/lib/chef-utils/parallel_map.rb
diff options
context:
space:
mode:
Diffstat (limited to 'chef-utils/lib/chef-utils/parallel_map.rb')
-rw-r--r--chef-utils/lib/chef-utils/parallel_map.rb131
1 files changed, 131 insertions, 0 deletions
diff --git a/chef-utils/lib/chef-utils/parallel_map.rb b/chef-utils/lib/chef-utils/parallel_map.rb
new file mode 100644
index 0000000000..3c1be22006
--- /dev/null
+++ b/chef-utils/lib/chef-utils/parallel_map.rb
@@ -0,0 +1,131 @@
+# frozen_string_literal: true
+#
+# Copyright:: Copyright (c) Chef Software Inc.
+# License:: Apache License, Version 2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+require "concurrent/executors"
+require "concurrent/future"
+require "singleton" unless defined?(Singleton)
+
+module ChefUtils
+ #
+ # This module contains ruby refinements that adds several methods to the Enumerable
+ # class which are useful for parallel processing.
+ #
+ module ParallelMap
+ refine Enumerable do
+
+ # Enumerates through the collection in parallel using the thread pool provided
+ # or the default thread pool. By using the default thread pool this supports
+ # recursively calling the method without deadlocking while using a globally
+ # fixed number of workers. This method supports lazy collections. It returns
+ # synchronously, waiting until all the work is done. Failures are only reported
+ # after the collection has executed and only the first exception is raised.
+ #
+ # (0..).lazy.parallel_map { |i| i*i }.first(5)
+ #
+ # @return [Array] output results
+ #
+ def parallel_map(pool: nil)
+ return self unless block_given?
+
+ pool ||= ChefUtils::DefaultThreadPool.instance.pool
+
+ futures = map do |item|
+ Concurrent::Future.execute(executor: pool) do
+ yield item
+ end
+ end
+
+ futures.map(&:value!)
+ end
+
+ # This has the same behavior as parallel_map but returns the enumerator instead of
+ # the return values.
+ #
+ # @return [Enumerable] the enumerable for method chaining
+ #
+ def parallel_each(pool: nil, &block)
+ return self unless block_given?
+
+ parallel_map(pool: pool, &block)
+
+ self
+ end
+
+ # The flat_each method is tightly coupled to the usage of parallel_map within the
+ # ChefFS implementation. It is not itself a parallel method, but it is used to
+ # iterate through the 2nd level of nested structure, which is tied to the nested
+ # structures that ChefFS returns.
+ #
+ # This is different from Enumerable#flat_map because that behaves like map.flatten(1) while
+ # this behaves more like flatten(1).each. We need this on an Enumerable, so we have no
+ # Enumerable#flatten method to call.
+ #
+ # [ [ 1, 2 ], [ 3, 4 ] ].flat_each(&block) calls block four times with 1, 2, 3, 4
+ #
+ # [ [ 1, 2 ], [ 3, 4 ] ].flat_map(&block) calls block twice with [1, 2] and [3,4]
+ #
+ def flat_each(&block)
+ map do |value|
+ if value.is_a?(Enumerable)
+ value.each(&block)
+ else
+ yield value
+ end
+ end
+ end
+ end
+ end
+
+ # The DefaultThreadPool has a fixed thread size and has no
+ # queue of work and the behavior on failure to find a thread is for the
+ # caller to run the work. This contract means that the thread pool can
+ # be called recursively without deadlocking and while keeping the fixed
+ # number of threads (and not exponentially growing the thread pool with
+ # the depth of recursion).
+ #
+ class DefaultThreadPool
+ include Singleton
+
+ DEFAULT_THREAD_SIZE = 10
+
+ # Size of the thread pool, must be set before getting the thread pool or
+ # calling parallel_map/parallel_each. Does not (but could be modified to)
+ # support dynamic resizing. To get fully synchronous behavior set this equal to
+ # zero rather than one since the caller will get work if the threads are
+ # busy.
+ #
+ # @return [Integer] number of threads
+ attr_accessor :threads
+
+ # Memoizing accessor for the thread pool
+ #
+ # @return [Concurrent::ThreadPoolExecutor] the thread pool
+ def pool
+ @pool ||= Concurrent::ThreadPoolExecutor.new(
+ min_threads: threads || DEFAULT_THREAD_SIZE,
+ max_threads: threads || DEFAULT_THREAD_SIZE,
+ max_queue: 0,
+ # "synchronous" redefines the 0 in max_queue to mean 'no queue' instead of 'infinite queue'
+ # it does not mean synchronous execution (no threads) but synchronous offload to the threads.
+ synchronous: true,
+ # this prevents deadlocks on recursive parallel usage
+ fallback_policy: :caller_runs
+ )
+ end
+ end
+end