summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Walters <cw@opscode.com>2010-12-08 14:12:44 -0800
committerDaniel DeLeo <dan@opscode.com>2011-02-11 15:57:41 -0800
commitaa1e2da7391f7739cc3bd5c38ecd9fb4d4c28be2 (patch)
tree0f6c059f8e9f598467d13b281cafc0b4bd607d2a
parent6e4cbe9eccea89f7bd48e7952fe7b5c04b4393b1 (diff)
downloadchef-aa1e2da7391f7739cc3bd5c38ecd9fb4d4c28be2.tar.gz
Add optional rabbitmq persistence
-rw-r--r--chef/lib/chef/index_queue/amqp_client.rb1
-rw-r--r--chef/lib/chef/index_queue/indexable.rb43
-rw-r--r--chef/spec/spec_helper.rb3
-rw-r--r--chef/spec/unit/index_queue_spec.rb163
4 files changed, 168 insertions, 42 deletions
diff --git a/chef/lib/chef/index_queue/amqp_client.rb b/chef/lib/chef/index_queue/amqp_client.rb
index dbc5769e81..a7d155f4d1 100644
--- a/chef/lib/chef/index_queue/amqp_client.rb
+++ b/chef/lib/chef/index_queue/amqp_client.rb
@@ -113,3 +113,4 @@ class Chef
end
end
end
+
diff --git a/chef/lib/chef/index_queue/indexable.rb b/chef/lib/chef/index_queue/indexable.rb
index 03fb0916f1..498539f7d4 100644
--- a/chef/lib/chef/index_queue/indexable.rb
+++ b/chef/lib/chef/index_queue/indexable.rb
@@ -64,27 +64,46 @@ class Chef
raise ArgumentError, "Type, Id, or Database missing in index operation: #{with_metadata.inspect}" if (with_metadata["id"].nil? or with_metadata["type"].nil?)
with_metadata
end
-
+
def add_to_index(metadata={})
- Chef::Log.debug("pushing item to index queue for addition: #{self.with_indexer_metadata(metadata)}")
- object_with_metadata = with_indexer_metadata(metadata)
- obj_id = object_with_metadata["id"]
- AmqpClient.instance.queue_for_object(obj_id) do |queue|
- obj = {:action => :add, :payload => self.with_indexer_metadata(metadata)}
- queue.publish(Chef::JSON.to_json(obj))
- end
+ Chef::Log.debug("pushing item to index queue for addition: #{self.with_indexer_metadata(metadata)}")
+ object_with_metadata = with_indexer_metadata(metadata)
+ obj_id = object_with_metadata["id"]
+ obj = {:action => :add, :payload => self.with_indexer_metadata(metadata)}
+
+ publish_object(obj_id, obj)
end
def delete_from_index(metadata={})
Chef::Log.debug("pushing item to index queue for deletion: #{self.with_indexer_metadata(metadata)}")
object_with_metadata = with_indexer_metadata(metadata)
obj_id = object_with_metadata["id"]
- AmqpClient.instance.queue_for_object(obj_id) do |queue|
- obj = {:action => :delete, :payload => self.with_indexer_metadata(metadata)}
- queue.publish(Chef::JSON.to_json(obj))
+ obj = {:action => :delete, :payload => self.with_indexer_metadata(metadata)}
+
+ publish_object(obj_id, obj)
+ end
+
+ private
+
+ # Uses the publisher to update the object's queue. If
+ # Chef::Config[:persistent_queue] is true, the update is wrapped
+ # in a transaction.
+ def publish_object(object_id, object)
+ publisher = AmqpClient.instance
+ begin
+ publisher.amqp_client.tx_select if Chef::Config[:persistent_queue]
+ publisher.queue_for_object(object_id) do |queue|
+ queue.publish(Chef::JSON.to_json(object), :persistent => Chef::Config[:persistent_queue])
+ end
+ publisher.amqp_client.tx_commit if Chef::Config[:persistent_queue]
+ rescue
+ publisher.amqp_client.tx_rollback if Chef::Config[:persistent_queue]
+ raise
end
+
+ true
end
-
+
end
end
end
diff --git a/chef/spec/spec_helper.rb b/chef/spec/spec_helper.rb
index c0e05cf7f0..332a8c809d 100644
--- a/chef/spec/spec_helper.rb
+++ b/chef/spec/spec_helper.rb
@@ -44,7 +44,8 @@ Dir[File.join(File.dirname(__FILE__), 'lib', '**', '*.rb')].sort.each { |lib| re
Chef::Config[:log_level] = :fatal
Chef::Config[:cache_type] = "Memory"
-Chef::Config[:cache_options] = { }
+Chef::Config[:cache_options] = { }
+Chef::Config[:persistent_queue] = false
Chef::Log.level(Chef::Config.log_level)
Chef::Config.solo(false)
diff --git a/chef/spec/unit/index_queue_spec.rb b/chef/spec/unit/index_queue_spec.rb
index 0aac03e5d4..43c7325f4f 100644
--- a/chef/spec/unit/index_queue_spec.rb
+++ b/chef/spec/unit/index_queue_spec.rb
@@ -37,17 +37,22 @@ class Chef
end
end
+class IndexQueueSpecError < RuntimeError ; end
+
class FauxQueue
- attr_reader :published_message
+ attr_reader :published_message, :publish_options
+ # Note: If publish is not called, this published_message will cause
+ # JSON parsing to die with "can't convert Symbol into String"
def initialize
@published_message = :epic_fail!
+ @publish_options = :epic_fail!
end
- def publish(message)
+ def publish(message, options=nil)
@published_message = message
-
+ @publish_options = options
end
end
@@ -109,31 +114,134 @@ describe Chef::IndexQueue::Indexable do
metadata_id.should == expected_uuid
end
- it "adds items to the index" do
- @queue = FauxQueue.new
- @publisher.should_receive(:queue_for_object).with("0000000-1111-2222-3333-444444444444").and_yield(@queue)
- @indexable_obj.add_to_index(:database => "couchdb@localhost,etc.", :id=>"0000000-1111-2222-3333-444444444444")
- published_message = Chef::JSON.from_json(@queue.published_message)
- published_message.should == {"action" => "add", "payload" => {"item" => @item_as_hash,
- "type" => "indexable_test_harness",
- "database" => "couchdb@localhost,etc.",
- "id" => "0000000-1111-2222-3333-444444444444",
- "enqueued_at" => @now.utc.to_i}}
- end
+ describe "adds and removes items to and from the index and respects Chef::Config[:persistent_queue]" do
+ before do
+ @exchange = mock("Bunny::Exchange")
+ @amqp_client = mock("Bunny::Client", :start => true, :exchange => @exchange)
+ @publisher.stub!(:amqp_client).and_return(@amqp_client)
+ @queue = FauxQueue.new
+ @publisher.should_receive(:queue_for_object).with("0000000-1111-2222-3333-444444444444").and_yield(@queue)
+ end
+
+ it "adds items to the index" do
+ @amqp_client.should_not_receive(:tx_select)
+ @amqp_client.should_not_receive(:tx_commit)
+ @amqp_client.should_not_receive(:tx_rollback)
+
+ @indexable_obj.add_to_index(:database => "couchdb@localhost,etc.", :id=>"0000000-1111-2222-3333-444444444444")
+
+ published_message = Chef::JSON.from_json(@queue.published_message)
+ published_message.should == {"action" => "add", "payload" => {"item" => @item_as_hash,
+ "type" => "indexable_test_harness",
+ "database" => "couchdb@localhost,etc.",
+ "id" => "0000000-1111-2222-3333-444444444444",
+ "enqueued_at" => @now.utc.to_i}}
+ @queue.publish_options[:persistent].should == false
+ end
+
+ it "adds items to the index transactionactionally when Chef::Config[:persistent_queue] == true" do
+ @amqp_client.should_receive(:tx_select)
+ @amqp_client.should_receive(:tx_commit)
+ @amqp_client.should_not_receive(:tx_rollback)
+
+ # set and restore Chef::Config[:persistent_queue] to true
+ orig_value = Chef::Config[:persistent_queue]
+ Chef::Config[:persistent_queue] = true
+ begin
+ @indexable_obj.add_to_index(:database => "couchdb@localhost,etc.", :id=>"0000000-1111-2222-3333-444444444444")
+ ensure
+ Chef::Config[:persistent_queue] = orig_value
+ end
+
+ published_message = Chef::JSON.from_json(@queue.published_message)
+ published_message.should == {"action" => "add", "payload" => {"item" => @item_as_hash,
+ "type" => "indexable_test_harness",
+ "database" => "couchdb@localhost,etc.",
+ "id" => "0000000-1111-2222-3333-444444444444",
+ "enqueued_at" => @now.utc.to_i}}
+ @queue.publish_options[:persistent].should == true
+ end
+
+ it "adds items to the index transactionally when Chef::Config[:persistent_queue] == true and rolls it back when there is a failure" do
+ @amqp_client.should_receive(:tx_select)
+ @amqp_client.should_receive(:tx_rollback)
+ @amqp_client.should_not_receive(:tx_commit)
+
+ # cause the publish to fail, and make sure the failure is our own
+ # by using a specific class
+ @queue.should_receive(:publish).and_raise(IndexQueueSpecError)
+
+ # set and restore Chef::Config[:persistent_queue] to true
+ orig_value = Chef::Config[:persistent_queue]
+ Chef::Config[:persistent_queue] = true
+ begin
+ lambda{
+ @indexable_obj.add_to_index(:database => "couchdb@localhost,etc.", :id=>"0000000-1111-2222-3333-444444444444")
+ }.should raise_error(IndexQueueSpecError)
+ ensure
+ Chef::Config[:persistent_queue] = orig_value
+ end
+ end
+
+ it "removes items from the index" do
+ @amqp_client.should_not_receive(:tx_select)
+ @amqp_client.should_not_receive(:tx_commit)
+ @amqp_client.should_not_receive(:tx_rollback)
+
+ @indexable_obj.delete_from_index(:database => "couchdb2@localhost", :id=>"0000000-1111-2222-3333-444444444444")
+ published_message = Chef::JSON.from_json(@queue.published_message)
+ published_message.should == {"action" => "delete", "payload" => { "item" => @item_as_hash,
+ "type" => "indexable_test_harness",
+ "database" => "couchdb2@localhost",
+ "id" => "0000000-1111-2222-3333-444444444444",
+ "enqueued_at" => @now.utc.to_i}}
+ @queue.publish_options[:persistent].should == false
+ end
- it "removes items from the index" do
- @queue = FauxQueue.new
- @publisher.should_receive(:queue_for_object).with("0000000-1111-2222-3333-444444444444").and_yield(@queue)
-
- @indexable_obj.delete_from_index(:database => "couchdb2@localhost", :id=>"0000000-1111-2222-3333-444444444444")
- published_message = Chef::JSON.from_json(@queue.published_message)
- published_message.should == {"action" => "delete", "payload" => { "item" => @item_as_hash,
- "type" => "indexable_test_harness",
- "database" => "couchdb2@localhost",
- "id" => "0000000-1111-2222-3333-444444444444",
- "enqueued_at" => @now.utc.to_i}}
+ it "removes items from the index transactionactionally when Chef::Config[:persistent_queue] == true" do
+ @amqp_client.should_receive(:tx_select)
+ @amqp_client.should_receive(:tx_commit)
+ @amqp_client.should_not_receive(:tx_rollback)
+
+ # set and restore Chef::Config[:persistent_queue] to true
+ orig_value = Chef::Config[:persistent_queue]
+ Chef::Config[:persistent_queue] = true
+ begin
+ @indexable_obj.delete_from_index(:database => "couchdb2@localhost", :id=>"0000000-1111-2222-3333-444444444444")
+ ensure
+ Chef::Config[:persistent_queue] = orig_value
+ end
+
+ published_message = Chef::JSON.from_json(@queue.published_message)
+ published_message.should == {"action" => "delete", "payload" => { "item" => @item_as_hash,
+ "type" => "indexable_test_harness",
+ "database" => "couchdb2@localhost",
+ "id" => "0000000-1111-2222-3333-444444444444",
+ "enqueued_at" => @now.utc.to_i}}
+ @queue.publish_options[:persistent].should == true
+ end
+
+ it "remove items from the index transactionally when Chef::Config[:persistent_queue] == true and rolls it back when there is a failure" do
+ @amqp_client.should_receive(:tx_select)
+ @amqp_client.should_receive(:tx_rollback)
+ @amqp_client.should_not_receive(:tx_commit)
+
+ # cause the publish to fail, and make sure the failure is our own
+ # by using a specific class
+ @queue.should_receive(:publish).and_raise(IndexQueueSpecError)
+
+ # set and restore Chef::Config[:persistent_queue] to true
+ orig_value = Chef::Config[:persistent_queue]
+ Chef::Config[:persistent_queue] = true
+ begin
+ lambda{
+ @indexable_obj.delete_from_index(:database => "couchdb2@localhost", :id=>"0000000-1111-2222-3333-444444444444") }.should raise_error(IndexQueueSpecError)
+ ensure
+ Chef::Config[:persistent_queue] = orig_value
+ end
+ end
end
-
+
end
describe Chef::IndexQueue::Consumer do
@@ -280,6 +388,3 @@ describe Chef::IndexQueue::AmqpClient do
end
end
-
-
-