summaryrefslogtreecommitdiff
path: root/lib/chef/util/threaded_job_queue.rb
diff options
context:
space:
mode:
authorNikhil Benesch <nikhil.benesch@gmail.com>2014-03-09 22:57:21 -0400
committerClaire McQuin <claire@getchef.com>2014-05-13 15:21:15 -0700
commit89d5d20dcd494c7d42a1f24f8a3ab7712735267e (patch)
treefbcaa2b854d0dc54405485ec9fa063d52e4ad060 /lib/chef/util/threaded_job_queue.rb
parentbea3c04f0bf8f89c637cc8073825188acb095bff (diff)
downloadchef-89d5d20dcd494c7d42a1f24f8a3ab7712735267e.tar.gz
CHEF-4423: extract cookbook upload threaded queue into utility class
Diffstat (limited to 'lib/chef/util/threaded_job_queue.rb')
-rw-r--r--lib/chef/util/threaded_job_queue.rb61
1 files changed, 61 insertions, 0 deletions
diff --git a/lib/chef/util/threaded_job_queue.rb b/lib/chef/util/threaded_job_queue.rb
new file mode 100644
index 0000000000..824cd0a3c4
--- /dev/null
+++ b/lib/chef/util/threaded_job_queue.rb
@@ -0,0 +1,61 @@
+# Copyright:: Copyright (c) 2014 Opscode, Inc.
+# License:: Apache License, Version 2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+require 'thread'
+
+class Chef
+ class Util
+ # A simple threaded job queue
+ #
+ # Create a queue:
+ #
+ # queue = ThreadedJobQueue.new
+ #
+ # Add jobs:
+ #
+ # queue << lambda { |lock| foo.the_bar }
+ #
+ # A job is a callable that optionally takes a Mutex instance as its only
+ # parameter.
+ #
+ # Then start processing jobs with +n+ threads:
+ #
+ # queue.process(n)
+ #
+ class ThreadedJobQueue
+ def initialize
+ @queue = Queue.new
+ @lock = Mutex.new
+ end
+
+ def <<(job)
+ @queue << job
+ end
+
+ def process(concurrency = 10)
+ workers = (1..concurrency).map do
+ Thread.new do
+ loop do
+ fn = @queue.pop
+ fn.arity == 1 ? fn.call(@lock) : fn.call
+ end
+ end
+ end
+ workers.each { |worker| self << Thread.method(:exit) }
+ workers.each { |worker| worker.join }
+ end
+ end
+ end
+end