diff options
author | Chris Walters <cw@opscode.com> | 2010-12-08 14:12:44 -0800 |
---|---|---|
committer | Daniel DeLeo <dan@opscode.com> | 2011-02-11 15:57:41 -0800 |
commit | aa1e2da7391f7739cc3bd5c38ecd9fb4d4c28be2 (patch) | |
tree | 0f6c059f8e9f598467d13b281cafc0b4bd607d2a | |
parent | 6e4cbe9eccea89f7bd48e7952fe7b5c04b4393b1 (diff) | |
download | chef-aa1e2da7391f7739cc3bd5c38ecd9fb4d4c28be2.tar.gz |
Add optional rabbitmq persistence
-rw-r--r-- | chef/lib/chef/index_queue/amqp_client.rb | 1 | ||||
-rw-r--r-- | chef/lib/chef/index_queue/indexable.rb | 43 | ||||
-rw-r--r-- | chef/spec/spec_helper.rb | 3 | ||||
-rw-r--r-- | chef/spec/unit/index_queue_spec.rb | 163 |
4 files changed, 168 insertions, 42 deletions
diff --git a/chef/lib/chef/index_queue/amqp_client.rb b/chef/lib/chef/index_queue/amqp_client.rb index dbc5769e81..a7d155f4d1 100644 --- a/chef/lib/chef/index_queue/amqp_client.rb +++ b/chef/lib/chef/index_queue/amqp_client.rb @@ -113,3 +113,4 @@ class Chef end end end + diff --git a/chef/lib/chef/index_queue/indexable.rb b/chef/lib/chef/index_queue/indexable.rb index 03fb0916f1..498539f7d4 100644 --- a/chef/lib/chef/index_queue/indexable.rb +++ b/chef/lib/chef/index_queue/indexable.rb @@ -64,27 +64,46 @@ class Chef raise ArgumentError, "Type, Id, or Database missing in index operation: #{with_metadata.inspect}" if (with_metadata["id"].nil? or with_metadata["type"].nil?) with_metadata end - + def add_to_index(metadata={}) - Chef::Log.debug("pushing item to index queue for addition: #{self.with_indexer_metadata(metadata)}") - object_with_metadata = with_indexer_metadata(metadata) - obj_id = object_with_metadata["id"] - AmqpClient.instance.queue_for_object(obj_id) do |queue| - obj = {:action => :add, :payload => self.with_indexer_metadata(metadata)} - queue.publish(Chef::JSON.to_json(obj)) - end + Chef::Log.debug("pushing item to index queue for addition: #{self.with_indexer_metadata(metadata)}") + object_with_metadata = with_indexer_metadata(metadata) + obj_id = object_with_metadata["id"] + obj = {:action => :add, :payload => self.with_indexer_metadata(metadata)} + + publish_object(obj_id, obj) end def delete_from_index(metadata={}) Chef::Log.debug("pushing item to index queue for deletion: #{self.with_indexer_metadata(metadata)}") object_with_metadata = with_indexer_metadata(metadata) obj_id = object_with_metadata["id"] - AmqpClient.instance.queue_for_object(obj_id) do |queue| - obj = {:action => :delete, :payload => self.with_indexer_metadata(metadata)} - queue.publish(Chef::JSON.to_json(obj)) + obj = {:action => :delete, :payload => self.with_indexer_metadata(metadata)} + + publish_object(obj_id, obj) + end + + private + + # Uses the publisher to update the object's queue. If + # Chef::Config[:persistent_queue] is true, the update is wrapped + # in a transaction. + def publish_object(object_id, object) + publisher = AmqpClient.instance + begin + publisher.amqp_client.tx_select if Chef::Config[:persistent_queue] + publisher.queue_for_object(object_id) do |queue| + queue.publish(Chef::JSON.to_json(object), :persistent => Chef::Config[:persistent_queue]) + end + publisher.amqp_client.tx_commit if Chef::Config[:persistent_queue] + rescue + publisher.amqp_client.tx_rollback if Chef::Config[:persistent_queue] + raise end + + true end - + end end end diff --git a/chef/spec/spec_helper.rb b/chef/spec/spec_helper.rb index c0e05cf7f0..332a8c809d 100644 --- a/chef/spec/spec_helper.rb +++ b/chef/spec/spec_helper.rb @@ -44,7 +44,8 @@ Dir[File.join(File.dirname(__FILE__), 'lib', '**', '*.rb')].sort.each { |lib| re Chef::Config[:log_level] = :fatal Chef::Config[:cache_type] = "Memory" -Chef::Config[:cache_options] = { } +Chef::Config[:cache_options] = { } +Chef::Config[:persistent_queue] = false Chef::Log.level(Chef::Config.log_level) Chef::Config.solo(false) diff --git a/chef/spec/unit/index_queue_spec.rb b/chef/spec/unit/index_queue_spec.rb index 0aac03e5d4..43c7325f4f 100644 --- a/chef/spec/unit/index_queue_spec.rb +++ b/chef/spec/unit/index_queue_spec.rb @@ -37,17 +37,22 @@ class Chef end end +class IndexQueueSpecError < RuntimeError ; end + class FauxQueue - attr_reader :published_message + attr_reader :published_message, :publish_options + # Note: If publish is not called, this published_message will cause + # JSON parsing to die with "can't convert Symbol into String" def initialize @published_message = :epic_fail! + @publish_options = :epic_fail! end - def publish(message) + def publish(message, options=nil) @published_message = message - + @publish_options = options end end @@ -109,31 +114,134 @@ describe Chef::IndexQueue::Indexable do metadata_id.should == expected_uuid end - it "adds items to the index" do - @queue = FauxQueue.new - @publisher.should_receive(:queue_for_object).with("0000000-1111-2222-3333-444444444444").and_yield(@queue) - @indexable_obj.add_to_index(:database => "couchdb@localhost,etc.", :id=>"0000000-1111-2222-3333-444444444444") - published_message = Chef::JSON.from_json(@queue.published_message) - published_message.should == {"action" => "add", "payload" => {"item" => @item_as_hash, - "type" => "indexable_test_harness", - "database" => "couchdb@localhost,etc.", - "id" => "0000000-1111-2222-3333-444444444444", - "enqueued_at" => @now.utc.to_i}} - end + describe "adds and removes items to and from the index and respects Chef::Config[:persistent_queue]" do + before do + @exchange = mock("Bunny::Exchange") + @amqp_client = mock("Bunny::Client", :start => true, :exchange => @exchange) + @publisher.stub!(:amqp_client).and_return(@amqp_client) + @queue = FauxQueue.new + @publisher.should_receive(:queue_for_object).with("0000000-1111-2222-3333-444444444444").and_yield(@queue) + end + + it "adds items to the index" do + @amqp_client.should_not_receive(:tx_select) + @amqp_client.should_not_receive(:tx_commit) + @amqp_client.should_not_receive(:tx_rollback) + + @indexable_obj.add_to_index(:database => "couchdb@localhost,etc.", :id=>"0000000-1111-2222-3333-444444444444") + + published_message = Chef::JSON.from_json(@queue.published_message) + published_message.should == {"action" => "add", "payload" => {"item" => @item_as_hash, + "type" => "indexable_test_harness", + "database" => "couchdb@localhost,etc.", + "id" => "0000000-1111-2222-3333-444444444444", + "enqueued_at" => @now.utc.to_i}} + @queue.publish_options[:persistent].should == false + end + + it "adds items to the index transactionactionally when Chef::Config[:persistent_queue] == true" do + @amqp_client.should_receive(:tx_select) + @amqp_client.should_receive(:tx_commit) + @amqp_client.should_not_receive(:tx_rollback) + + # set and restore Chef::Config[:persistent_queue] to true + orig_value = Chef::Config[:persistent_queue] + Chef::Config[:persistent_queue] = true + begin + @indexable_obj.add_to_index(:database => "couchdb@localhost,etc.", :id=>"0000000-1111-2222-3333-444444444444") + ensure + Chef::Config[:persistent_queue] = orig_value + end + + published_message = Chef::JSON.from_json(@queue.published_message) + published_message.should == {"action" => "add", "payload" => {"item" => @item_as_hash, + "type" => "indexable_test_harness", + "database" => "couchdb@localhost,etc.", + "id" => "0000000-1111-2222-3333-444444444444", + "enqueued_at" => @now.utc.to_i}} + @queue.publish_options[:persistent].should == true + end + + it "adds items to the index transactionally when Chef::Config[:persistent_queue] == true and rolls it back when there is a failure" do + @amqp_client.should_receive(:tx_select) + @amqp_client.should_receive(:tx_rollback) + @amqp_client.should_not_receive(:tx_commit) + + # cause the publish to fail, and make sure the failure is our own + # by using a specific class + @queue.should_receive(:publish).and_raise(IndexQueueSpecError) + + # set and restore Chef::Config[:persistent_queue] to true + orig_value = Chef::Config[:persistent_queue] + Chef::Config[:persistent_queue] = true + begin + lambda{ + @indexable_obj.add_to_index(:database => "couchdb@localhost,etc.", :id=>"0000000-1111-2222-3333-444444444444") + }.should raise_error(IndexQueueSpecError) + ensure + Chef::Config[:persistent_queue] = orig_value + end + end + + it "removes items from the index" do + @amqp_client.should_not_receive(:tx_select) + @amqp_client.should_not_receive(:tx_commit) + @amqp_client.should_not_receive(:tx_rollback) + + @indexable_obj.delete_from_index(:database => "couchdb2@localhost", :id=>"0000000-1111-2222-3333-444444444444") + published_message = Chef::JSON.from_json(@queue.published_message) + published_message.should == {"action" => "delete", "payload" => { "item" => @item_as_hash, + "type" => "indexable_test_harness", + "database" => "couchdb2@localhost", + "id" => "0000000-1111-2222-3333-444444444444", + "enqueued_at" => @now.utc.to_i}} + @queue.publish_options[:persistent].should == false + end - it "removes items from the index" do - @queue = FauxQueue.new - @publisher.should_receive(:queue_for_object).with("0000000-1111-2222-3333-444444444444").and_yield(@queue) - - @indexable_obj.delete_from_index(:database => "couchdb2@localhost", :id=>"0000000-1111-2222-3333-444444444444") - published_message = Chef::JSON.from_json(@queue.published_message) - published_message.should == {"action" => "delete", "payload" => { "item" => @item_as_hash, - "type" => "indexable_test_harness", - "database" => "couchdb2@localhost", - "id" => "0000000-1111-2222-3333-444444444444", - "enqueued_at" => @now.utc.to_i}} + it "removes items from the index transactionactionally when Chef::Config[:persistent_queue] == true" do + @amqp_client.should_receive(:tx_select) + @amqp_client.should_receive(:tx_commit) + @amqp_client.should_not_receive(:tx_rollback) + + # set and restore Chef::Config[:persistent_queue] to true + orig_value = Chef::Config[:persistent_queue] + Chef::Config[:persistent_queue] = true + begin + @indexable_obj.delete_from_index(:database => "couchdb2@localhost", :id=>"0000000-1111-2222-3333-444444444444") + ensure + Chef::Config[:persistent_queue] = orig_value + end + + published_message = Chef::JSON.from_json(@queue.published_message) + published_message.should == {"action" => "delete", "payload" => { "item" => @item_as_hash, + "type" => "indexable_test_harness", + "database" => "couchdb2@localhost", + "id" => "0000000-1111-2222-3333-444444444444", + "enqueued_at" => @now.utc.to_i}} + @queue.publish_options[:persistent].should == true + end + + it "remove items from the index transactionally when Chef::Config[:persistent_queue] == true and rolls it back when there is a failure" do + @amqp_client.should_receive(:tx_select) + @amqp_client.should_receive(:tx_rollback) + @amqp_client.should_not_receive(:tx_commit) + + # cause the publish to fail, and make sure the failure is our own + # by using a specific class + @queue.should_receive(:publish).and_raise(IndexQueueSpecError) + + # set and restore Chef::Config[:persistent_queue] to true + orig_value = Chef::Config[:persistent_queue] + Chef::Config[:persistent_queue] = true + begin + lambda{ + @indexable_obj.delete_from_index(:database => "couchdb2@localhost", :id=>"0000000-1111-2222-3333-444444444444") }.should raise_error(IndexQueueSpecError) + ensure + Chef::Config[:persistent_queue] = orig_value + end + end end - + end describe Chef::IndexQueue::Consumer do @@ -280,6 +388,3 @@ describe Chef::IndexQueue::AmqpClient do end end - - - |