summaryrefslogtreecommitdiff
path: root/spec/support/helpers/concurrent_helpers.rb
blob: 4eecc2133e7086a6033f5f68cad10950c7528bbd (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# frozen_string_literal: true

module ConcurrentHelpers
  # Raised by #run_parallel when a task has not completed within its wait
  # time and was therefore cancelled.
  Cancelled = Class.new(StandardError)

  # To test for contention, we may need to run some actions in parallel. This
  # helper takes an array of blocks and schedules them all on different threads
  # in a fixed-size thread pool.
  #
  # Tasks are waited on in submission order. The first problem observed (a
  # task failure, or a task that did not finish in time) is raised once every
  # task has been waited on; later problems are discarded.
  #
  # @param blocks [Array<Proc>] the tasks to run
  # @param task_wait_time [Numeric] time to wait for each task (upper bound on
  #   reasonable task execution time); handed to `Concurrent::Future#wait`
  # @param max_concurrency [Integer] maximum number of tasks to run at once;
  #   the pool is always given at least two threads
  # @raise [Cancelled] if the first problem seen was a task that timed out
  # @raise [Exception] the failure reason of the first task that failed
  # @return [nil] when every task completed without error
  #
  def run_parallel(blocks, task_wait_time: 20.seconds, max_concurrency: Concurrent.processor_count - 1)
    # The queue is sized so that every task can be enqueued up front, even
    # when there are more tasks than worker threads.
    thread_pool = Concurrent::FixedThreadPool.new(
      [2, max_concurrency].max, { max_queue: blocks.size }
    )
    opts = { executor: thread_pool }

    # Only this (calling) thread records the error, so a plain local is
    # sufficient; `||=` keeps the first error and ignores subsequent ones.
    error = nil

    blocks.map { |block| Concurrent::Future.execute(opts, &block) }.each do |future|
      future.wait(task_wait_time)

      if future.complete?
        error ||= future.reason
      else
        future.cancel
        error ||= Cancelled.new
      end
    end

    raise error if error
  ensure
    # thread_pool is nil if constructing the pool itself raised; skipping the
    # cleanup then avoids masking that error with a NoMethodError.
    if thread_pool
      thread_pool.shutdown
      # wait_for_termination returns false when the grace period elapses with
      # workers still busy. `running?` is already false once `shutdown` has
      # been called, so it cannot be used to detect stragglers.
      thread_pool.kill unless thread_pool.wait_for_termination(10)
    end
  end
end