summaryrefslogtreecommitdiff
path: root/spec/unit/chef_fs/parallelizer.rb
diff options
context:
space:
mode:
Diffstat (limited to 'spec/unit/chef_fs/parallelizer.rb')
-rw-r--r--spec/unit/chef_fs/parallelizer.rb112
1 files changed, 56 insertions, 56 deletions
diff --git a/spec/unit/chef_fs/parallelizer.rb b/spec/unit/chef_fs/parallelizer.rb
index d43e48b7bc..541ce2593e 100644
--- a/spec/unit/chef_fs/parallelizer.rb
+++ b/spec/unit/chef_fs/parallelizer.rb
@@ -35,29 +35,29 @@ describe Chef::ChefFS::Parallelizer do
context "With :ordered => false (unordered output)" do
it "An empty input produces an empty output" do
- expect(parallelize([], ordered: false) do
+ expect(parallelize([], ordered: false) {
sleep 10
- end.to_a).to eql([])
+ }.to_a).to eql([])
expect(elapsed_time).to be < 0.1
end
it "10 sleep(0.2)s complete within 0.5 seconds" do
- expect(parallelize(1.upto(10), ordered: false) do |i|
+ expect(parallelize(1.upto(10), ordered: false) { |i|
sleep 0.2
"x"
- end.to_a).to eq(%w{x x x x x x x x x x})
+ }.to_a).to eq(%w{x x x x x x x x x x})
expect(elapsed_time).to be < 0.5
end
it "The output comes as soon as it is available" do
- enum = parallelize([0.5, 0.3, 0.1], ordered: false) do |val|
+ enum = parallelize([0.5, 0.3, 0.1], ordered: false) { |val|
sleep val
val
- end
- expect(enum.map do |value|
+ }
+ expect(enum.map { |value|
expect(elapsed_time).to be < value + 0.1
value
- end).to eq([ 0.1, 0.3, 0.5 ])
+ }).to eq([ 0.1, 0.3, 0.5 ])
end
it "An exception in input is passed through but does NOT stop processing" do
@@ -73,7 +73,7 @@ describe Chef::ChefFS::Parallelizer do
it "Exceptions in output are raised after all processing is done" do
processed = 0
- enum = parallelize([1, 2, "x", 3], ordered: false) do |x|
+ enum = parallelize([1, 2, "x", 3], ordered: false) { |x|
if x == "x"
sleep 0.1
raise "hi"
@@ -81,7 +81,7 @@ describe Chef::ChefFS::Parallelizer do
sleep 0.2
processed += 1
x
- end
+ }
results = []
expect { enum.each { |value| results << value } }.to raise_error "hi"
expect(results.sort).to eq([ 1, 2, 3 ])
@@ -91,7 +91,7 @@ describe Chef::ChefFS::Parallelizer do
it "Exceptions with :stop_on_exception are raised after all processing is done" do
processed = 0
- parallelized = parallelize([0.3, 0.3, "x", 0.3, 0.3, 0.3, 0.3, 0.3], ordered: false, stop_on_exception: true) do |x|
+ parallelized = parallelize([0.3, 0.3, "x", 0.3, 0.3, 0.3, 0.3, 0.3], ordered: false, stop_on_exception: true) { |x|
if x == "x"
sleep(0.1)
raise "hi"
@@ -99,7 +99,7 @@ describe Chef::ChefFS::Parallelizer do
sleep(x)
processed += 1
x
- end
+ }
expect { parallelized.to_a }.to raise_error "hi"
expect(processed).to eq(4)
end
@@ -107,25 +107,25 @@ describe Chef::ChefFS::Parallelizer do
context "With :ordered => true (ordered output)" do
it "An empty input produces an empty output" do
- expect(parallelize([]) do
+ expect(parallelize([]) {
sleep 10
- end.to_a).to eql([])
+ }.to_a).to eql([])
expect(elapsed_time).to be < 0.1
end
it "10 sleep(0.2)s complete within 0.5 seconds" do
- expect(parallelize(1.upto(10), ordered: true) do |i|
+ expect(parallelize(1.upto(10), ordered: true) { |i|
sleep 0.2
"x"
- end.to_a).to eq(%w{x x x x x x x x x x})
+ }.to_a).to eq(%w{x x x x x x x x x x})
expect(elapsed_time).to be < 0.5
end
it "Output comes in the order of the input" do
- enum = parallelize([0.5, 0.3, 0.1]) do |val|
+ enum = parallelize([0.5, 0.3, 0.1]) { |val|
sleep val
val
- end.enum_for(:each_with_index)
+ }.enum_for(:each_with_index)
expect(enum.next).to eq([ 0.5, 0 ])
expect(enum.next).to eq([ 0.3, 1 ])
expect(enum.next).to eq([ 0.1, 2 ])
@@ -145,7 +145,7 @@ describe Chef::ChefFS::Parallelizer do
it "Exceptions in output are raised in the correct sequence and running processes do NOT stop processing" do
processed = 0
- enum = parallelize([1, 2, "x", 3]) do |x|
+ enum = parallelize([1, 2, "x", 3]) { |x|
if x == "x"
sleep(0.1)
raise "hi"
@@ -153,7 +153,7 @@ describe Chef::ChefFS::Parallelizer do
sleep(0.2)
processed += 1
x
- end
+ }
results = []
expect { enum.each { |value| results << value } }.to raise_error "hi"
expect(results).to eq([ 1, 2 ])
@@ -163,7 +163,7 @@ describe Chef::ChefFS::Parallelizer do
it "Exceptions with :stop_on_exception are raised after all processing is done" do
processed = 0
- parallelized = parallelize([0.3, 0.3, "x", 0.3, 0.3, 0.3, 0.3, 0.3], ordered: false, stop_on_exception: true) do |x|
+ parallelized = parallelize([0.3, 0.3, "x", 0.3, 0.3, 0.3, 0.3, 0.3], ordered: false, stop_on_exception: true) { |x|
if x == "x"
sleep(0.1)
raise "hi"
@@ -171,7 +171,7 @@ describe Chef::ChefFS::Parallelizer do
sleep(x)
processed += 1
x
- end
+ }
expect { parallelized.to_a }.to raise_error "hi"
expect(processed).to eq(4)
end
@@ -187,10 +187,10 @@ describe Chef::ChefFS::Parallelizer do
sleep 0.1
end
enum = parallelize(input) { |x| x }
- expect(enum.map do |value|
+ expect(enum.map { |value|
expect(elapsed_time).to be < (value + 1) * 0.1
value
- end).to eq([ 1, 2, 3 ])
+ }).to eq([ 1, 2, 3 ])
end
end
@@ -205,11 +205,11 @@ describe Chef::ChefFS::Parallelizer do
started = false
@occupying_job_finished = occupying_job_finished = [ false ]
@thread = Thread.new do
- parallelizer.parallelize([0], main_thread_processing: false) do |x|
+ parallelizer.parallelize([0], main_thread_processing: false) { |x|
started = true
sleep(0.3)
occupying_job_finished[0] = true
- end.wait
+ }.wait
end
sleep(0.01) until started
end
@@ -219,18 +219,18 @@ describe Chef::ChefFS::Parallelizer do
end
it "parallelize with :main_thread_processing = true does not block" do
- expect(parallelizer.parallelize([1]) do |x|
+ expect(parallelizer.parallelize([1]) { |x|
sleep(0.1)
x
- end.to_a).to eq([ 1 ])
+ }.to_a).to eq([ 1 ])
expect(elapsed_time).to be < 0.2
end
it "parallelize with :main_thread_processing = false waits for the job to finish" do
- expect(parallelizer.parallelize([1], main_thread_processing: false) do |x|
+ expect(parallelizer.parallelize([1], main_thread_processing: false) { |x|
sleep(0.1)
x + 1
- end.to_a).to eq([ 2 ])
+ }.to_a).to eq([ 2 ])
expect(elapsed_time).to be > 0.3
end
@@ -264,11 +264,11 @@ describe Chef::ChefFS::Parallelizer do
it ".count does not process anything" do
outputs_processed = 0
input_mapper = TestEnumerable.new(1, 2, 3, 4, 5, 6)
- enum = parallelizer.parallelize(input_mapper) do |x|
+ enum = parallelizer.parallelize(input_mapper) { |x|
outputs_processed += 1
sleep(0.05) # Just enough to yield and get other inputs in the queue
x
- end
+ }
expect(enum.count).to eq(6)
expect(outputs_processed).to eq(0)
expect(input_mapper.num_processed).to eq(6)
@@ -277,10 +277,10 @@ describe Chef::ChefFS::Parallelizer do
it ".count with arguments works normally" do
outputs_processed = 0
input_mapper = TestEnumerable.new(1, 1, 1, 1, 2, 2, 2, 3, 3, 4)
- enum = parallelizer.parallelize(input_mapper) do |x|
+ enum = parallelizer.parallelize(input_mapper) { |x|
outputs_processed += 1
x
- end
+ }
expect(enum.count { |x| x > 1 }).to eq(6)
expect(enum.count(2)).to eq(3)
expect(outputs_processed).to eq(20)
@@ -290,11 +290,11 @@ describe Chef::ChefFS::Parallelizer do
it ".first does not enumerate anything other than the first result(s)" do
outputs_processed = 0
input_mapper = TestEnumerable.new(1, 2, 3, 4, 5, 6)
- enum = parallelizer.parallelize(input_mapper) do |x|
+ enum = parallelizer.parallelize(input_mapper) { |x|
outputs_processed += 1
sleep(0.05) # Just enough to yield and get other inputs in the queue
x
- end
+ }
expect(enum.first).to eq(1)
expect(enum.first(2)).to eq([1, 2])
expect(outputs_processed).to eq(3)
@@ -304,11 +304,11 @@ describe Chef::ChefFS::Parallelizer do
it ".take does not enumerate anything other than the first result(s)" do
outputs_processed = 0
input_mapper = TestEnumerable.new(1, 2, 3, 4, 5, 6)
- enum = parallelizer.parallelize(input_mapper) do |x|
+ enum = parallelizer.parallelize(input_mapper) { |x|
outputs_processed += 1
sleep(0.05) # Just enough to yield and get other inputs in the queue
x
- end
+ }
expect(enum.take(2)).to eq([1, 2])
expect(outputs_processed).to eq(2)
expect(input_mapper.num_processed).to eq(2)
@@ -317,11 +317,11 @@ describe Chef::ChefFS::Parallelizer do
it ".drop does not process anything other than the last result(s)" do
outputs_processed = 0
input_mapper = TestEnumerable.new(1, 2, 3, 4, 5, 6)
- enum = parallelizer.parallelize(input_mapper) do |x|
+ enum = parallelizer.parallelize(input_mapper) { |x|
outputs_processed += 1
sleep(0.05) # Just enough to yield and get other inputs in the queue
x
- end
+ }
expect(enum.drop(2)).to eq([3, 4, 5, 6])
expect(outputs_processed).to eq(4)
expect(input_mapper.num_processed).to eq(6)
@@ -331,11 +331,11 @@ describe Chef::ChefFS::Parallelizer do
it ".lazy.take does not enumerate anything other than the first result(s)" do
outputs_processed = 0
input_mapper = TestEnumerable.new(1, 2, 3, 4, 5, 6)
- enum = parallelizer.parallelize(input_mapper) do |x|
+ enum = parallelizer.parallelize(input_mapper) { |x|
outputs_processed += 1
sleep(0.05) # Just enough to yield and get other inputs in the queue
x
- end
+ }
expect(enum.lazy.take(2).to_a).to eq([1, 2])
expect(outputs_processed).to eq(2)
expect(input_mapper.num_processed).to eq(2)
@@ -344,11 +344,11 @@ describe Chef::ChefFS::Parallelizer do
it ".drop does not process anything other than the last result(s)" do
outputs_processed = 0
input_mapper = TestEnumerable.new(1, 2, 3, 4, 5, 6)
- enum = parallelizer.parallelize(input_mapper) do |x|
+ enum = parallelizer.parallelize(input_mapper) { |x|
outputs_processed += 1
sleep(0.05) # Just enough to yield and get other inputs in the queue
x
- end
+ }
expect(enum.lazy.drop(2).to_a).to eq([3, 4, 5, 6])
expect(outputs_processed).to eq(4)
expect(input_mapper.num_processed).to eq(6)
@@ -357,11 +357,11 @@ describe Chef::ChefFS::Parallelizer do
it "lazy enumerable is actually lazy" do
outputs_processed = 0
input_mapper = TestEnumerable.new(1, 2, 3, 4, 5, 6)
- enum = parallelizer.parallelize(input_mapper) do |x|
+ enum = parallelizer.parallelize(input_mapper) { |x|
outputs_processed += 1
sleep(0.05) # Just enough to yield and get other inputs in the queue
x
- end
+ }
enum.lazy.take(2)
enum.lazy.drop(2)
sleep(0.1)
@@ -375,10 +375,10 @@ describe Chef::ChefFS::Parallelizer do
it ".map twice on the same parallel enumerable returns the correct results and re-processes the input" do
outputs_processed = 0
input_mapper = TestEnumerable.new(1, 2, 3)
- enum = parallelizer.parallelize(input_mapper) do |x|
+ enum = parallelizer.parallelize(input_mapper) { |x|
outputs_processed += 1
x
- end
+ }
expect(enum.map { |x| x }).to eq([1, 2, 3])
expect(enum.map { |x| x }).to eq([1, 2, 3])
expect(outputs_processed).to eq(6)
@@ -388,10 +388,10 @@ describe Chef::ChefFS::Parallelizer do
it ".first and then .map on the same parallel enumerable returns the correct results and re-processes the input" do
outputs_processed = 0
input_mapper = TestEnumerable.new(1, 2, 3)
- enum = parallelizer.parallelize(input_mapper) do |x|
+ enum = parallelizer.parallelize(input_mapper) { |x|
outputs_processed += 1
x
- end
+ }
expect(enum.first).to eq(1)
expect(enum.map { |x| x }).to eq([1, 2, 3])
expect(outputs_processed).to be >= 4
@@ -402,10 +402,10 @@ describe Chef::ChefFS::Parallelizer do
enum = parallelizer.parallelize([1, 2, 3]) { |x| x }
a = enum.enum_for(:each)
a.next
- expect do
+ expect {
b = enum.enum_for(:each)
b.next
- end.to raise_error
+ }.to raise_error
end
end
end
@@ -436,13 +436,13 @@ describe Chef::ChefFS::Parallelizer do
end
it "does not have contention issues with large numbers of jobs and inputs with ordering off" do
- parallelizers = 0.upto(99).map do
+ parallelizers = 0.upto(99).map {
parallelizer.parallelize(1.upto(500)) { |x| x + 1 }
- end
+ }
outputs = []
- threads = 0.upto(99).map do |i|
+ threads = 0.upto(99).map { |i|
Thread.new { outputs[i] = parallelizers[i].to_a }
- end
+ }
threads.each(&:join)
outputs.each { |output| expect(output.sort).to eq(2.upto(501).to_a) }
end