diff options
author | Daniel DeLeo <dan@opscode.com> | 2010-09-02 22:41:48 -0700 |
---|---|---|
committer | Daniel DeLeo <dan@opscode.com> | 2010-09-09 15:48:56 -0700 |
commit | e0bb0f7fdb0d0441bf50850d299deb3414d5c430 (patch) | |
tree | 5338423d63c79ea56f6a13a5ca2d95ed7b72d277 | |
parent | c738308991207b02b581e668dfea10de24e73d88 (diff) | |
download | chef-e0bb0f7fdb0d0441bf50850d299deb3414d5c430.tar.gz |
[parallel-indexing] update the client to publish to parallel queues
The backend consumer is now based on a "vnode" design where there are
1024 queues distributed among N nodes. The publishers need to be aware
of this and publish items to the various queues based on the database
ids.
-rw-r--r-- | chef/lib/chef/index_queue/amqp_client.rb | 20 | ||||
-rw-r--r-- | chef/lib/chef/index_queue/indexable.rb | 15 | ||||
-rw-r--r-- | chef/spec/unit/index_queue_spec.rb | 87 |
3 files changed, 85 insertions, 37 deletions
diff --git a/chef/lib/chef/index_queue/amqp_client.rb b/chef/lib/chef/index_queue/amqp_client.rb index 8f88e1e592..be60c4574e 100644 --- a/chef/lib/chef/index_queue/amqp_client.rb +++ b/chef/lib/chef/index_queue/amqp_client.rb @@ -19,6 +19,8 @@ class Chef module IndexQueue class AmqpClient + VNODES = 1024 + include Singleton def initialize @@ -72,13 +74,21 @@ class Chef reset! end - def send_action(action, data) + def queue_for_object(obj_id) + vnode_tag = UUIDTools::UUID.parse(obj_id).to_i % VNODES + queue = amqp_client.queue("vnode-#{vnode_tag}") + retries = 0 begin - exchange.publish({"action" => action.to_s, "payload" => data}.to_json) - rescue Bunny::ServerDownError, Bunny::ConnectionError, Errno::ECONNRESET => e - Chef::Log.error("Disconnected from the AMQP Broker, cannot queue data to the indexer") + yield queue + rescue Bunny::ServerDownError, Bunny::ConnectionError, Errno::ECONNRESET disconnected! - raise e + if (retries += 1) < 2 + Chef::Log.info("Attempting to reconnect to the AMQP broker") + retry + else + Chef::Log.fatal("Could not re-connect to the AMQP broker, giving up") + raise + end end end diff --git a/chef/lib/chef/index_queue/indexable.rb b/chef/lib/chef/index_queue/indexable.rb index f027497f13..c01bb0f748 100644 --- a/chef/lib/chef/index_queue/indexable.rb +++ b/chef/lib/chef/index_queue/indexable.rb @@ -55,7 +55,8 @@ class Chef with_metadata["type"] ||= self.index_object_type with_metadata["id"] ||= self.index_id with_metadata["database"] ||= Chef::Config[:couchdb_database] - with_metadata["item"] ||= self + with_metadata["item"] ||= self.to_hash + with_metadata["enqueued_at"] ||= Time.now.utc.to_i raise ArgumentError, "Type, Id, or Database missing in index operation: #{with_metadata.inspect}" if (with_metadata["id"].nil? or with_metadata["type"].nil?) with_metadata @@ -63,12 +64,20 @@ class Chef def add_to_index(metadata={}) Chef::Log.debug("pushing item to index queue for addition: #{self.with_indexer_metadata(metadata)}") - AmqpClient.instance.send_action(:add, self.with_indexer_metadata(metadata)) + object_with_metadata = with_indexer_metadata(metadata) + obj_id = object_with_metadata["id"] + AmqpClient.instance.queue_for_object(obj_id) do |queue| + queue.publish({:action => :add, :payload => self.with_indexer_metadata(metadata)}.to_json) + end end def delete_from_index(metadata={}) Chef::Log.debug("pushing item to index queue for deletion: #{self.with_indexer_metadata(metadata)}") - AmqpClient.instance.send_action(:delete, self.with_indexer_metadata(metadata)) + object_with_metadata = with_indexer_metadata(metadata) + obj_id = object_with_metadata["id"] + AmqpClient.instance.queue_for_object(obj_id) do |queue| + queue.publish({:action => :delete, :payload => self.with_indexer_metadata(metadata)}.to_json) + end end end diff --git a/chef/spec/unit/index_queue_spec.rb b/chef/spec/unit/index_queue_spec.rb index 18fe4b15f2..a5250da6b2 100644 --- a/chef/spec/unit/index_queue_spec.rb +++ b/chef/spec/unit/index_queue_spec.rb @@ -29,6 +29,24 @@ class Chef def index_id=(value) @index_id = value end + + def to_hash + {"ohai_world" => "I am IndexableTestHarness", "object_id" => object_id} + end + + end +end + +class FauxQueue + + attr_reader :published_message + + def initialize + @published_message = :epic_fail! + end + + def publish(message) + @published_message = message end end @@ -57,6 +75,10 @@ describe Chef::IndexQueue::Indexable do Chef::IndexableTestHarness.reset_index_metadata! @publisher = Chef::IndexQueue::AmqpClient.instance @indexable_obj = Chef::IndexableTestHarness.new + @item_as_hash = {"ohai_world" => "I am IndexableTestHarness", "object_id" => @indexable_obj.object_id} + + @now = Time.now + Time.stub!(:now).and_return(@now) end it "downcases the class name for the index_object_type when it's not explicitly set" do @@ -70,12 +92,13 @@ describe Chef::IndexQueue::Indexable do it "adds 'database', 'type', and 'id' (UUID) keys to the published object" do with_metadata = @indexable_obj.with_indexer_metadata(:database => "foo", :id=>UUIDTools::UUID.random_create.to_s) - with_metadata.should have(4).keys - with_metadata.keys.should include("type", "id", "item", "database") + with_metadata.should have(5).keys + with_metadata.keys.should include("type", "id", "item", "database", "enqueued_at") with_metadata["type"].should == "indexable_test_harness" with_metadata["database"].should == "foo" - with_metadata["item"].should == @indexable_obj + with_metadata["item"].should == @item_as_hash with_metadata["id"].should match(a_uuid) + with_metadata["enqueued_at"].should == @now.utc.to_i end it "uses the couchdb_id if available" do @@ -85,20 +108,29 @@ describe Chef::IndexQueue::Indexable do metadata_id.should == expected_uuid end - it "sends ``add'' actions" do - @publisher.should_receive(:send_action).with(:add, {"item" => @indexable_obj, - "type" => "indexable_test_harness", - "database" => "couchdb@localhost,etc.", - "id" => an_instance_of(String)}) - @indexable_obj.add_to_index(:database => "couchdb@localhost,etc.", :id=>UUIDTools::UUID.random_create.to_s) + it "adds items to the index" do + @queue = FauxQueue.new + @publisher.should_receive(:queue_for_object).with("0000000-1111-2222-3333-444444444444").and_yield(@queue) + @indexable_obj.add_to_index(:database => "couchdb@localhost,etc.", :id=>"0000000-1111-2222-3333-444444444444") + published_message = JSON.parse(@queue.published_message) + published_message.should == {"action" => "add", "payload" => {"item" => @item_as_hash, + "type" => "indexable_test_harness", + "database" => "couchdb@localhost,etc.", + "id" => "0000000-1111-2222-3333-444444444444", + "enqueued_at" => @now.utc.to_i}} end - it "sends ``delete'' actions" do - @publisher.should_receive(:send_action).with(:delete, { "item" => @indexable_obj, - "type" => "indexable_test_harness", - "database" => "couchdb2@localhost", - "id" => an_instance_of(String)}) - @indexable_obj.delete_from_index(:database => "couchdb2@localhost", :id=>UUIDTools::UUID.random_create.to_s) + it "removes items from the index" do + @queue = FauxQueue.new + @publisher.should_receive(:queue_for_object).with("0000000-1111-2222-3333-444444444444").and_yield(@queue) + + @indexable_obj.delete_from_index(:database => "couchdb2@localhost", :id=>"0000000-1111-2222-3333-444444444444") + published_message = JSON.parse(@queue.published_message) + published_message.should == {"action" => "delete", "payload" => { "item" => @item_as_hash, + "type" => "indexable_test_harness", + "database" => "couchdb2@localhost", + "id" => "0000000-1111-2222-3333-444444444444", + "enqueued_at" => @now.utc.to_i}} end end @@ -195,31 +227,25 @@ describe Chef::IndexQueue::AmqpClient do describe "publishing" do before do + @queue = FauxQueue.new @amqp_client.stub!(:qos) + @amqp_client.stub!(:queue).and_return(@queue) @data = {"some_data" => "in_a_hash"} end - it "publishes an action to the exchange" do - @exchange.should_receive(:publish).with({"action" => "hot_chef_on_queue", "payload" => @data}.to_json) - @publisher.send_action(:hot_chef_on_queue, @data) - end - it "resets the client upon a Bunny::ServerDownError when publishing" do - @exchange.should_receive(:publish).and_raise(Bunny::ServerDownError) - @publisher.should_receive(:disconnected!).twice - lambda {@publisher.send_action(:hot_chef_on_queue, @data)}.should raise_error(Bunny::ServerDownError) + @publisher.should_receive(:disconnected!).at_least(3).times + lambda {@publisher.queue_for_object("00000000-1111-2222-3333-444444444444") {|q| raise Bunny::ServerDownError}}.should raise_error(Bunny::ServerDownError) end it "resets the client upon a Bunny::ConnectionError when publishing" do - @exchange.should_receive(:publish).and_raise(Bunny::ConnectionError) - @publisher.should_receive(:disconnected!).twice - lambda {@publisher.send_action(:hot_chef_on_queue, @data)}.should raise_error(Bunny::ConnectionError) + @publisher.should_receive(:disconnected!).at_least(3).times + lambda {@publisher.queue_for_object("00000000-1111-2222-3333-444444444444") {|q| raise Bunny::ConnectionError}}.should raise_error(Bunny::ConnectionError) end it "resets the client upon a Errno::ECONNRESET when publishing" do - @exchange.should_receive(:publish).and_raise(Errno::ECONNRESET) - @publisher.should_receive(:disconnected!).twice - lambda {@publisher.send_action(:hot_chef_on_queue, @data)}.should raise_error(Errno::ECONNRESET) + @publisher.should_receive(:disconnected!).at_least(3).times + lambda {@publisher.queue_for_object("00000000-1111-2222-3333-444444444444") {|q| raise Errno::ECONNRESET}}.should raise_error(Errno::ECONNRESET) end end @@ -258,3 +284,6 @@ describe Chef::IndexQueue::AmqpClient do end end + + + |