summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniel DeLeo <dan@opscode.com>2010-09-02 22:41:48 -0700
committersdelano <stephen@opscode.com>2010-11-04 14:00:51 -0700
commit9c1f7c6969301491979651b62926d8ebdcfe6a2c (patch)
treeaff33c47cbdc5212c7abb5f6d9f76ddfc132a52e
parenta1725f9de1d8495583f361a005af022ac9d1e77d (diff)
downloadchef-9c1f7c6969301491979651b62926d8ebdcfe6a2c.tar.gz
[parallel-indexing] update the client to publish to parallel queues
The backend consumer is now based on a "vnode" design where there are 1024 queues distributed among N nodes. The publishers need to be aware of this and publish items to the various queues based on the database ids.
-rw-r--r--chef/lib/chef/index_queue/amqp_client.rb8
-rw-r--r--chef/lib/chef/index_queue/indexable.rb13
-rw-r--r--chef/spec/unit/index_queue_spec.rb80
3 files changed, 69 insertions, 32 deletions
diff --git a/chef/lib/chef/index_queue/amqp_client.rb b/chef/lib/chef/index_queue/amqp_client.rb
index c1aa805795..0666a10833 100644
--- a/chef/lib/chef/index_queue/amqp_client.rb
+++ b/chef/lib/chef/index_queue/amqp_client.rb
@@ -19,6 +19,8 @@
class Chef
module IndexQueue
class AmqpClient
+ VNODES = 1024
+
include Singleton
def initialize
@@ -73,10 +75,12 @@ class Chef
reset!
end
- def send_action(action, data)
+ def queue_for_object(obj_id)
+ vnode_tag = UUIDTools::UUID.parse(obj_id).to_i % VNODES
+ queue = amqp_client.queue("vnode-#{vnode_tag}")
retries = 0
begin
- exchange.publish({"action" => action.to_s, "payload" => data}.to_json)
+ yield queue
rescue Bunny::ServerDownError, Bunny::ConnectionError, Errno::ECONNRESET
disconnected!
if (retries += 1) < 2
diff --git a/chef/lib/chef/index_queue/indexable.rb b/chef/lib/chef/index_queue/indexable.rb
index 1413a8c583..c01bb0f748 100644
--- a/chef/lib/chef/index_queue/indexable.rb
+++ b/chef/lib/chef/index_queue/indexable.rb
@@ -56,6 +56,7 @@ class Chef
with_metadata["id"] ||= self.index_id
with_metadata["database"] ||= Chef::Config[:couchdb_database]
with_metadata["item"] ||= self.to_hash
+ with_metadata["enqueued_at"] ||= Time.now.utc.to_i
raise ArgumentError, "Type, Id, or Database missing in index operation: #{with_metadata.inspect}" if (with_metadata["id"].nil? or with_metadata["type"].nil?)
with_metadata
@@ -63,12 +64,20 @@ class Chef
def add_to_index(metadata={})
Chef::Log.debug("pushing item to index queue for addition: #{self.with_indexer_metadata(metadata)}")
- AmqpClient.instance.send_action(:add, self.with_indexer_metadata(metadata))
+ object_with_metadata = with_indexer_metadata(metadata)
+ obj_id = object_with_metadata["id"]
+ AmqpClient.instance.queue_for_object(obj_id) do |queue|
+ queue.publish({:action => :add, :payload => self.with_indexer_metadata(metadata)}.to_json)
+ end
end
def delete_from_index(metadata={})
Chef::Log.debug("pushing item to index queue for deletion: #{self.with_indexer_metadata(metadata)}")
- AmqpClient.instance.send_action(:delete, self.with_indexer_metadata(metadata))
+ object_with_metadata = with_indexer_metadata(metadata)
+ obj_id = object_with_metadata["id"]
+ AmqpClient.instance.queue_for_object(obj_id) do |queue|
+ queue.publish({:action => :delete, :payload => self.with_indexer_metadata(metadata)}.to_json)
+ end
end
end
diff --git a/chef/spec/unit/index_queue_spec.rb b/chef/spec/unit/index_queue_spec.rb
index 03be92b918..254a8a8e1c 100644
--- a/chef/spec/unit/index_queue_spec.rb
+++ b/chef/spec/unit/index_queue_spec.rb
@@ -31,12 +31,26 @@ class Chef
end
def to_hash
- {:ohai_world => "I am IndexableTestHarness", :object_id => object_id}
+ {"ohai_world" => "I am IndexableTestHarness", "object_id" => object_id}
end
end
end
+class FauxQueue
+
+ attr_reader :published_message
+
+ def initialize
+ @published_message = :epic_fail!
+ end
+
+ def publish(message)
+ @published_message = message
+
+ end
+end
+
class IndexConsumerTestHarness
include Chef::IndexQueue::Consumer
@@ -62,7 +76,10 @@ describe Chef::IndexQueue::Indexable do
Chef::IndexableTestHarness.reset_index_metadata!
@publisher = Chef::IndexQueue::AmqpClient.instance
@indexable_obj = Chef::IndexableTestHarness.new
- @item_as_hash = {:ohai_world => "I am IndexableTestHarness", :object_id => @indexable_obj.object_id}
+ @item_as_hash = {"ohai_world" => "I am IndexableTestHarness", "object_id" => @indexable_obj.object_id}
+
+ @now = Time.now
+ Time.stub!(:now).and_return(@now)
end
it "downcases the class name for the index_object_type when it's not explicitly set" do
@@ -76,12 +93,13 @@ describe Chef::IndexQueue::Indexable do
it "adds 'database', 'type', and 'id' (UUID) keys to the published object" do
with_metadata = @indexable_obj.with_indexer_metadata(:database => "foo", :id=>UUIDTools::UUID.random_create.to_s)
- with_metadata.should have(4).keys
- with_metadata.keys.should include("type", "id", "item", "database")
+ with_metadata.should have(5).keys
+ with_metadata.keys.should include("type", "id", "item", "database", "enqueued_at")
with_metadata["type"].should == "indexable_test_harness"
with_metadata["database"].should == "foo"
with_metadata["item"].should == @item_as_hash
with_metadata["id"].should match(a_uuid)
+ with_metadata["enqueued_at"].should == @now.utc.to_i
end
it "uses the couchdb_id if available" do
@@ -90,21 +108,30 @@ describe Chef::IndexQueue::Indexable do
metadata_id = @indexable_obj.with_indexer_metadata["id"]
metadata_id.should == expected_uuid
end
-
- it "sends ``add'' actions" do
- @publisher.should_receive(:send_action).with(:add, {"item" => @item_as_hash,
- "type" => "indexable_test_harness",
- "database" => "couchdb@localhost,etc.",
- "id" => an_instance_of(String)})
- @indexable_obj.add_to_index(:database => "couchdb@localhost,etc.", :id=>UUIDTools::UUID.random_create.to_s)
+
+ it "adds items to the index" do
+ @queue = FauxQueue.new
+ @publisher.should_receive(:queue_for_object).with("0000000-1111-2222-3333-444444444444").and_yield(@queue)
+ @indexable_obj.add_to_index(:database => "couchdb@localhost,etc.", :id=>"0000000-1111-2222-3333-444444444444")
+ published_message = JSON.parse(@queue.published_message)
+ published_message.should == {"action" => "add", "payload" => {"item" => @item_as_hash,
+ "type" => "indexable_test_harness",
+ "database" => "couchdb@localhost,etc.",
+ "id" => "0000000-1111-2222-3333-444444444444",
+ "enqueued_at" => @now.utc.to_i}}
end
- it "sends ``delete'' actions" do
- @publisher.should_receive(:send_action).with(:delete, { "item" => @item_as_hash,
- "type" => "indexable_test_harness",
- "database" => "couchdb2@localhost",
- "id" => an_instance_of(String)})
- @indexable_obj.delete_from_index(:database => "couchdb2@localhost", :id=>UUIDTools::UUID.random_create.to_s)
+ it "removes items from the index" do
+ @queue = FauxQueue.new
+ @publisher.should_receive(:queue_for_object).with("0000000-1111-2222-3333-444444444444").and_yield(@queue)
+
+ @indexable_obj.delete_from_index(:database => "couchdb2@localhost", :id=>"0000000-1111-2222-3333-444444444444")
+ published_message = JSON.parse(@queue.published_message)
+ published_message.should == {"action" => "delete", "payload" => { "item" => @item_as_hash,
+ "type" => "indexable_test_harness",
+ "database" => "couchdb2@localhost",
+ "id" => "0000000-1111-2222-3333-444444444444",
+ "enqueued_at" => @now.utc.to_i}}
end
end
@@ -201,31 +228,25 @@ describe Chef::IndexQueue::AmqpClient do
describe "publishing" do
before do
+ @queue = FauxQueue.new
@amqp_client.stub!(:qos)
+ @amqp_client.stub!(:queue).and_return(@queue)
@data = {"some_data" => "in_a_hash"}
end
- it "publishes an action to the exchange" do
- @exchange.should_receive(:publish).with({"action" => "hot_chef_on_queue", "payload" => @data}.to_json)
- @publisher.send_action(:hot_chef_on_queue, @data)
- end
-
it "resets the client upon a Bunny::ServerDownError when publishing" do
- @exchange.should_receive(:publish).twice.and_raise(Bunny::ServerDownError)
@publisher.should_receive(:disconnected!).at_least(3).times
- lambda {@publisher.send_action(:hot_chef_on_queue, @data)}.should raise_error(Bunny::ServerDownError)
+ lambda {@publisher.queue_for_object("00000000-1111-2222-3333-444444444444") {|q| raise Bunny::ServerDownError}}.should raise_error(Bunny::ServerDownError)
end
it "resets the client upon a Bunny::ConnectionError when publishing" do
- @exchange.should_receive(:publish).twice.and_raise(Bunny::ConnectionError)
@publisher.should_receive(:disconnected!).at_least(3).times
- lambda {@publisher.send_action(:hot_chef_on_queue, @data)}.should raise_error(Bunny::ConnectionError)
+ lambda {@publisher.queue_for_object("00000000-1111-2222-3333-444444444444") {|q| raise Bunny::ConnectionError}}.should raise_error(Bunny::ConnectionError)
end
it "resets the client upon a Errno::ECONNRESET when publishing" do
- @exchange.should_receive(:publish).twice.and_raise(Errno::ECONNRESET)
@publisher.should_receive(:disconnected!).at_least(3).times
- lambda {@publisher.send_action(:hot_chef_on_queue, @data)}.should raise_error(Errno::ECONNRESET)
+ lambda {@publisher.queue_for_object("00000000-1111-2222-3333-444444444444") {|q| raise Errno::ECONNRESET}}.should raise_error(Errno::ECONNRESET)
end
end
@@ -264,3 +285,6 @@ describe Chef::IndexQueue::AmqpClient do
end
end
+
+
+