diff options
author | Nikhil Benesch <nikhil.benesch@gmail.com> | 2014-03-09 22:57:21 -0400 |
---|---|---|
committer | Claire McQuin <claire@getchef.com> | 2014-05-13 15:21:15 -0700 |
commit | 89d5d20dcd494c7d42a1f24f8a3ab7712735267e (patch) | |
tree | fbcaa2b854d0dc54405485ec9fa063d52e4ad060 /lib/chef/util/threaded_job_queue.rb | |
parent | bea3c04f0bf8f89c637cc8073825188acb095bff (diff) | |
download | chef-89d5d20dcd494c7d42a1f24f8a3ab7712735267e.tar.gz |
CHEF-4423: extract cookbook upload threaded queue into utility class
Diffstat (limited to 'lib/chef/util/threaded_job_queue.rb')
-rw-r--r-- | lib/chef/util/threaded_job_queue.rb | 61 |
1 files changed, 61 insertions, 0 deletions
diff --git a/lib/chef/util/threaded_job_queue.rb b/lib/chef/util/threaded_job_queue.rb new file mode 100644 index 0000000000..824cd0a3c4 --- /dev/null +++ b/lib/chef/util/threaded_job_queue.rb @@ -0,0 +1,61 @@ +# Copyright:: Copyright (c) 2014 Opscode, Inc. +# License:: Apache License, Version 2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +require 'thread' + +class Chef + class Util + # A simple threaded job queue + # + # Create a queue: + # + # queue = ThreadedJobQueue.new + # + # Add jobs: + # + # queue << lambda { |lock| foo.the_bar } + # + # A job is a callable that optionally takes a Mutex instance as its only + # parameter. + # + # Then start processing jobs with +n+ threads: + # + # queue.process(n) + # + class ThreadedJobQueue + def initialize + @queue = Queue.new + @lock = Mutex.new + end + + def <<(job) + @queue << job + end + + def process(concurrency = 10) + workers = (1..concurrency).map do + Thread.new do + loop do + fn = @queue.pop + fn.arity == 1 ? fn.call(@lock) : fn.call + end + end + end + workers.each { |worker| self << Thread.method(:exit) } + workers.each { |worker| worker.join } + end + end + end +end |