summaryrefslogtreecommitdiff
path: root/chef-expander/lib/chef
diff options
context:
space:
mode:
Diffstat (limited to 'chef-expander/lib/chef')
-rw-r--r--chef-expander/lib/chef/expander.rb36
-rw-r--r--chef-expander/lib/chef/expander/cluster_supervisor.rb119
-rw-r--r--chef-expander/lib/chef/expander/configuration.rb261
-rw-r--r--chef-expander/lib/chef/expander/control.rb206
-rw-r--r--chef-expander/lib/chef/expander/flattener.rb79
-rw-r--r--chef-expander/lib/chef/expander/loggable.rb56
-rw-r--r--chef-expander/lib/chef/expander/node.rb177
-rw-r--r--chef-expander/lib/chef/expander/solrizer.rb275
-rw-r--r--chef-expander/lib/chef/expander/version.rb37
-rw-r--r--chef-expander/lib/chef/expander/vnode.rb106
-rw-r--r--chef-expander/lib/chef/expander/vnode_supervisor.rb265
-rw-r--r--chef-expander/lib/chef/expander/vnode_table.rb83
12 files changed, 1700 insertions, 0 deletions
diff --git a/chef-expander/lib/chef/expander.rb b/chef-expander/lib/chef/expander.rb
new file mode 100644
index 0000000000..9a58868a96
--- /dev/null
+++ b/chef-expander/lib/chef/expander.rb
@@ -0,0 +1,36 @@
+#
+# Author:: Daniel DeLeo (<dan@opscode.com>)
+# Author:: Seth Falcon (<seth@opscode.com>)
+# Author:: Chris Walters (<cw@opscode.com>)
+# Copyright:: Copyright (c) 2010-2011 Opscode, Inc.
+# License:: Apache License, Version 2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Chef
+ module Expander
+
+ # VNODES is the number of queues in rabbit that are available for subscribing.
+ # The name comes from riak, where the data ring (160bits) is chunked into
+ # many vnodes; vnodes outnumber physical nodes, so one node hosts several
+ # vnodes. That is the same design we use here.
+ #
+ # See the notes on topic queue benchmarking before adjusting this value.
+ VNODES = 1024
+
+ SHARED_CONTROL_QUEUE_NAME = "chef-search-control--shared"
+ BROADCAST_CONTROL_EXCHANGE_NAME = 'chef-search-control--broadcast'
+
+ end
+end
diff --git a/chef-expander/lib/chef/expander/cluster_supervisor.rb b/chef-expander/lib/chef/expander/cluster_supervisor.rb
new file mode 100644
index 0000000000..ccc5a9e730
--- /dev/null
+++ b/chef-expander/lib/chef/expander/cluster_supervisor.rb
@@ -0,0 +1,119 @@
+#
+# Author:: Daniel DeLeo (<dan@opscode.com>)
+# Author:: Seth Falcon (<seth@opscode.com>)
+# Author:: Chris Walters (<cw@opscode.com>)
+# Copyright:: Copyright (c) 2010-2011 Opscode, Inc.
+# License:: Apache License, Version 2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+require 'chef/expander/loggable'
+require 'chef/expander/version'
+require 'chef/expander/configuration'
+require 'chef/expander/vnode_supervisor'
+
+module Chef
+ module Expander
+ #==ClusterSupervisor
+ # Manages a cluster of chef-expander processes. Usually this class will
+ # be instantiated from the chef-expander-cluster executable.
+ #
+ # ClusterSupervisor works by forking the desired number of processes, then
+ # running VNodeSupervisor.start_cluster_worker within the forked process.
+ # ClusterSupervisor keeps track of the process ids of its children, and will
+ # periodically attempt to reap them in a non-blocking call. If they are
+ # reaped, ClusterSupervisor knows they died and need to be respawned.
+ #
+ # The child processes are responsible for checking on the master process and
+ # dying if the master has died (VNodeSupervisor does this when started in
+ # with start_cluster_worker).
+ #
+ #===TODO:
+ # * This implementation currently assumes there is only one cluster, so it
+ # will claim all of the vnodes. It may be advantageous to allow multiple
+ # clusters.
+ # * There is no heartbeat implementation at this time, so a zombified child
+ # process will not be automatically killed--This behavior is left to the
+ # meatcloud for now.
+ class ClusterSupervisor
+ include Loggable
+
+ def initialize
+ @workers = {}
+ @running = true
+ @kill = :TERM
+ end
+
+ def start
+ trap(:INT) { stop(:INT) }
+ trap(:TERM) { stop(:TERM)}
+ Expander.init_config(ARGV)
+
+ log.info("Chef Expander #{Expander.version} starting cluster with #{Expander.config.node_count} nodes")
+
+ start_workers
+ maintain_workers
+ end
+
+ def start_workers
+ Expander.config.node_count.times do |i|
+ start_worker(i + 1)
+ end
+ end
+
+ def start_worker(index)
+ log.info { "Starting cluster worker #{index}" }
+ worker_params = {:index => index}
+ child_pid = fork do
+ Expander.config.index = index
+ VNodeSupervisor.start_cluster_worker
+ end
+ @workers[child_pid] = worker_params
+ end
+
+ def stop(signal)
+ log.info { "Stopping cluster on signal (#{signal})" }
+ @running = false
+ @kill = signal
+ end
+
+ def maintain_workers
+ while @running
+ sleep 1
+ workers_to_replace = {}
+ @workers.each do |process_id, worker_params|
+ if result = Process.waitpid2(process_id, Process::WNOHANG)
+ log.error { "worker #{worker_params[:index]} (PID: #{process_id}) died with status #{result[1].exitstatus || '(no status)'}"}
+ workers_to_replace[process_id] = worker_params
+ end
+ end
+ workers_to_replace.each do |dead_pid, worker_params|
+ @workers.delete(dead_pid)
+ start_worker(worker_params[:index])
+ end
+ end
+
+ @workers.each do |pid, worker_params|
+ log.info { "Stopping worker #{worker_params[:index]} (PID: #{pid})"}
+ Process.kill(@kill, pid)
+ end
+ @workers.each do |pid, worker_params|
+ Process.waitpid2(pid)
+ end
+
+ end
+
+ end
+ end
+end
diff --git a/chef-expander/lib/chef/expander/configuration.rb b/chef-expander/lib/chef/expander/configuration.rb
new file mode 100644
index 0000000000..1888c56811
--- /dev/null
+++ b/chef-expander/lib/chef/expander/configuration.rb
@@ -0,0 +1,261 @@
+#
+# Author:: Daniel DeLeo (<dan@opscode.com>)
+# Author:: Seth Falcon (<seth@opscode.com>)
+# Author:: Chris Walters (<cw@opscode.com>)
+# Copyright:: Copyright (c) 2010-2011 Opscode, Inc.
+# License:: Apache License, Version 2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+require 'pp'
+require 'optparse'
+require 'singleton'
+
+require 'chef/expander/flattener'
+require 'chef/expander/loggable'
+require 'chef/expander/version'
+
+module Chef
+ module Expander
+
+ def self.config
+ @config ||= Configuration::Base.new
+ end
+
+ def self.init_config(argv)
+ config.apply_defaults
+ remaining_opts_after_parse = Configuration::CLI.parse_options(argv)
+ # Need to be able to override the default config file location on the command line
+ config_file_to_use = Configuration::CLI.config.config_file || config.config_file
+ config.merge_config(Configuration::Base.from_chef_compat_config(config_file_to_use))
+ # But for all other config options, the CLI config should win over config file
+ config.merge_config(Configuration::CLI.config)
+ remaining_opts_after_parse
+ end
+
+ class ChefCompatibleConfig
+
+ attr_reader :config_hash
+
+ def initialize
+ @config_hash = {}
+ end
+
+ def load(file)
+ file = File.expand_path(file)
+ instance_eval(IO.read(file), file, 1)
+ end
+
+ def method_missing(method_name, *args, &block)
+ if args.size == 1
+ @config_hash[method_name] = args.first
+ elsif args.empty?
+ @config_hash[method_name] or super
+ else
+ super
+ end
+ end
+
+ end
+
+ module Configuration
+
+ class InvalidConfiguration < StandardError
+ end
+
+ class Base
+
+ include Loggable
+
+ def self.from_chef_compat_config(file)
+ config = ChefCompatibleConfig.new
+ config.load(file)
+ from_hash(config.config_hash)
+ end
+
+ def self.from_hash(config_hash)
+ config = new
+ config_hash.each do |setting, value|
+ setter = "#{setting}=".to_sym
+ if config.respond_to?(setter)
+ config.send(setter, value)
+ end
+ end
+ config
+ end
+
+ def self.configurables
+ @configurables ||= []
+ end
+
+ def self.validations
+ @validations ||= []
+ end
+
+ def self.defaults
+ @defaults ||= {}
+ end
+
+ def self.configurable(setting, default=nil, &validation)
+ attr_accessor(setting)
+ configurables << setting
+ defaults[setting] = default
+ validations << validation if block_given?
+
+ setting
+ end
+
+ configurable :config_file, File.expand_path(File.dirname(__FILE__) + '/../../../conf/chef-expander.rb')
+
+ configurable :index do
+ invalid("You must specify this node's position in the ring as an integer") unless index.kind_of?(Integer)
+ invalid("The index cannot be larger than the cluster size (node-count)") unless (index <= node_count.to_i)
+ end
+
+ configurable :node_count do
+ invalid("You must specify the cluster size as an integer") unless node_count.kind_of?(Integer)
+ invalid("The cluster size (node-count) cannot be smaller than the index") unless node_count >= index.to_i
+ end
+
+ configurable :ps_tag, ""
+
+ configurable :solr_url, "http://localhost:8983"
+
+ configurable :amqp_host, '0.0.0.0'
+
+ configurable :amqp_port, '5672'
+
+ configurable :amqp_user, 'chef'
+
+ configurable :amqp_pass, 'testing'
+
+ configurable :amqp_vhost, '/chef'
+
+ configurable :log_level, :info
+
+ # override the setter for log_level to also actually set the level
+ def log_level=(level)
+ if level #don't accept nil for an answer
+ level = level.to_sym
+ Loggable::LOGGER.level = level
+ @log_level = log_level
+ end
+ level
+ end
+
+ def initialize
+ reset!
+ end
+
+ def reset!(stdout=nil)
+ self.class.configurables.each do |setting|
+ send("#{setting}=".to_sym, nil)
+ end
+ @stdout = stdout || STDOUT
+ end
+
+ def apply_defaults
+ self.class.defaults.each do |setting, value|
+ self.send("#{setting}=".to_sym, value)
+ end
+ end
+
+ def merge_config(other)
+ self.class.configurables.each do |setting|
+ value = other.send(setting)
+ self.send("#{setting}=".to_sym, value) if value
+ end
+ end
+
+ def fail_if_invalid
+ validate!
+ rescue InvalidConfiguration => e
+ @stdout.puts("Invalid configuration: #{e.message}")
+ exit(1)
+ end
+
+ def invalid(message)
+ raise InvalidConfiguration, message
+ end
+
+ def validate!
+ self.class.validations.each do |validation_proc|
+ instance_eval(&validation_proc)
+ end
+ end
+
+ def vnode_numbers
+ vnodes_per_node = VNODES / node_count
+ lower_bound = (index - 1) * vnodes_per_node
+ upper_bound = lower_bound + vnodes_per_node
+ upper_bound += VNODES % vnodes_per_node if index == node_count
+ (lower_bound...upper_bound).to_a
+ end
+
+ def amqp_config
+ {:host => amqp_host, :port => amqp_port, :user => amqp_user, :pass => amqp_pass, :vhost => amqp_vhost}
+ end
+
+ end
+
+ module CLI
+ @config = Configuration::Base.new
+
+ @option_parser = OptionParser.new do |o|
+ o.banner = "Usage: chef-expander [options]"
+
+ o.on('-c', '--config CONFIG_FILE', 'a configuration file to use') do |conf|
+ @config.config_file = File.expand_path(conf)
+ end
+
+ o.on('-i', '--index INDEX', 'the slot this node will occupy in the ring') do |i|
+ @config.index = i.to_i
+ end
+
+ o.on('-n', '--node-count NUMBER', 'the number of nodes in the ring') do |n|
+ @config.node_count = n.to_i
+ end
+
+ o.on('-l', '--log-level LOG_LEVEL', 'set the log level') do |l|
+ @config.log_level = l
+ end
+
+ o.on_tail('-h', '--help', 'show this message') do
+ puts "chef-expander #{Expander.version}"
+ puts ''
+ puts o
+ exit 1
+ end
+
+ o.on_tail('-v', '--version', 'show the version and exit') do
+ puts "chef-expander #{Expander.version}"
+ exit 0
+ end
+
+ end
+
+ def self.parse_options(argv)
+ @option_parser.parse!(argv.dup)
+ end
+
+ def self.config
+ @config
+ end
+
+ end
+
+ end
+
+ end
+end
diff --git a/chef-expander/lib/chef/expander/control.rb b/chef-expander/lib/chef/expander/control.rb
new file mode 100644
index 0000000000..f8e3e99503
--- /dev/null
+++ b/chef-expander/lib/chef/expander/control.rb
@@ -0,0 +1,206 @@
+#
+# Author:: Daniel DeLeo (<dan@opscode.com>)
+# Author:: Seth Falcon (<seth@opscode.com>)
+# Author:: Chris Walters (<cw@opscode.com>)
+# Copyright:: Copyright (c) 2010-2011 Opscode, Inc.
+# License:: Apache License, Version 2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+require 'bunny'
+require 'yajl'
+require 'eventmachine'
+require 'amqp'
+require 'mq'
+require 'highline'
+
+require 'chef/expander/node'
+require 'chef/expander/configuration'
+
+require 'pp'
+
+module Chef
+ module Expander
+ class Control
+
+ def self.run(argv)
+ remaining_args_after_opts = Expander.init_config(ARGV)
+ new(remaining_args_after_opts).run
+ end
+
+ def self.desc(description)
+ @desc = description
+ end
+
+ def self.option(*args)
+ #TODO
+ end
+
+ def self.arg(*args)
+ #TODO
+ end
+
+ def self.descriptions
+ @descriptions ||= []
+ end
+
+ def self.method_added(method_name)
+ if @desc
+ descriptions << [method_name, method_name.to_s.gsub('_', '-'), @desc]
+ @desc = nil
+ end
+ end
+
+ #--
+ # TODO: this is confusing and unneeded. Just whitelist the methods
+ # that map to commands and use +send+
+ def self.compile
+ run_method = "def run; case @argv.first;"
+ descriptions.each do |method_name, command_name, desc|
+ run_method << "when '#{command_name}';#{method_name};"
+ end
+ run_method << "else; help; end; end;"
+ class_eval(run_method, __FILE__, __LINE__)
+ end
+
+ def initialize(argv)
+ @argv = argv.dup
+ end
+
+ desc "Show this message"
+ def help
+ puts "Chef Expander #{Expander.version}"
+ puts "Usage: chef-expanderctl COMMAND"
+ puts
+ puts "Commands:"
+ self.class.descriptions.each do |method_name, command_name, desc|
+ puts " #{command_name}".ljust(15) + desc
+ end
+ end
+
+ desc "display the aggregate queue backlog"
+ def queue_depth
+ h = HighLine.new
+ message_counts = []
+
+ amqp_client = Bunny.new(Expander.config.amqp_config)
+ amqp_client.start
+
+ 0.upto(VNODES - 1) do |vnode|
+ q = amqp_client.queue("vnode-#{vnode}", :durable => true)
+ message_counts << q.status[:message_count]
+ end
+ total_messages = message_counts.inject(0) { |sum, count| sum + count }
+ max = message_counts.max
+ min = message_counts.min
+
+ avg = total_messages.to_f / message_counts.size.to_f
+
+ puts " total messages: #{total_messages}"
+ puts " average queue depth: #{avg}"
+ puts " max queue depth: #{max}"
+ puts " min queue depth: #{min}"
+ ensure
+ amqp_client.stop if defined?(amqp_client) && amqp_client
+ end
+
+ desc "show the backlog and consumer count for each vnode queue"
+ def queue_status
+ h = HighLine.new
+ queue_status = [h.color("VNode", :bold), h.color("Messages", :bold), h.color("Consumers", :bold)]
+
+ total_messages = 0
+
+ amqp_client = Bunny.new(Expander.config.amqp_config)
+ amqp_client.start
+
+ 0.upto(VNODES - 1) do |vnode|
+ q = amqp_client.queue("vnode-#{vnode}", :durable => true)
+ status = q.status
+ # returns {:message_count => method.message_count, :consumer_count => method.consumer_count}
+ queue_status << vnode.to_s << status[:message_count].to_s << status[:consumer_count].to_s
+ total_messages += status[:message_count]
+ end
+ puts " total messages: #{total_messages}"
+ puts
+ puts h.list(queue_status, :columns_across, 3)
+ ensure
+ amqp_client.stop if defined?(amqp_client) && amqp_client
+ end
+
+ desc "show the status of the nodes in the cluster"
+ def node_status
+ status_mutex = Mutex.new
+ h = ::HighLine.new
+ node_status = [h.color("Host", :bold), h.color("PID", :bold), h.color("GUID", :bold), h.color("Vnodes", :bold)]
+
+ print("Collecting status info from the cluster...")
+
+ AMQP.start(Expander.config.amqp_config) do
+ node = Expander::Node.local_node
+ node.exclusive_control_queue.subscribe do |header, message|
+ status = Yajl::Parser.parse(message)
+ status_mutex.synchronize do
+ node_status << status["hostname_f"]
+ node_status << status["pid"].to_s
+ node_status << status["guid"]
+ # BIG ASSUMPTION HERE that nodes only have contiguous vnode ranges
+ # will not be true once vnode recovery is implemented
+ node_status << "#{status["vnodes"].min}-#{status["vnodes"].max}"
+ end
+ end
+ node.broadcast_message(Yajl::Encoder.encode(:action => :status, :rsvp => node.exclusive_control_queue_name))
+ EM.add_timer(2) { AMQP.stop;EM.stop }
+ end
+
+ puts "done"
+ puts
+ puts h.list(node_status, :columns_across, 4)
+ puts
+ end
+
+ desc "sets the log level of all nodes in the cluster"
+ def log_level
+ @argv.shift
+ level = @argv.first
+ acceptable_levels = %w{debug info warn error fatal}
+ unless acceptable_levels.include?(level)
+ puts "Log level must be one of #{acceptable_levels.join(', ')}"
+ exit 1
+ end
+
+ h = HighLine.new
+ response_mutex = Mutex.new
+
+ responses = [h.color("Host", :bold), h.color("PID", :bold), h.color("GUID", :bold), h.color("Log Level", :bold)]
+ AMQP.start(Expander.config.amqp_config) do
+ node = Expander::Node.local_node
+ node.exclusive_control_queue.subscribe do |header, message|
+ reply = Yajl::Parser.parse(message)
+ n = reply['node']
+ response_mutex.synchronize do
+ responses << n["hostname_f"] << n["pid"].to_s << n["guid"] << reply["level"]
+ end
+ end
+ node.broadcast_message(Yajl::Encoder.encode({:action => :set_log_level, :level => level, :rsvp => node.exclusive_control_queue_name}))
+ EM.add_timer(2) { AMQP.stop; EM.stop }
+ end
+ puts h.list(responses, :columns_across, 4)
+ end
+
+
+ compile
+ end
+ end
+end
diff --git a/chef-expander/lib/chef/expander/flattener.rb b/chef-expander/lib/chef/expander/flattener.rb
new file mode 100644
index 0000000000..90f7cd663e
--- /dev/null
+++ b/chef-expander/lib/chef/expander/flattener.rb
@@ -0,0 +1,79 @@
+#
+# Author:: Daniel DeLeo (<dan@opscode.com>)
+# Author:: Seth Falcon (<seth@opscode.com>)
+# Author:: Chris Walters (<cw@opscode.com>)
+# Copyright:: Copyright (c) 2010-2011 Opscode, Inc.
+# License:: Apache License, Version 2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+require 'chef/expander/configuration'
+
+module Chef
+ module Expander
+ # Flattens and expands nested Hashes representing Chef objects
+ # (e.g, Nodes, Roles, DataBagItems, etc.) into flat Hashes so the
+ # objects are suitable to be saved into Solr. This code is more or
+ # less copy-pasted from chef/solr/index which may or may not be a
+ # great idea, though that does minimize the dependencies and
+ # hopefully minimize the memory use of chef-expander.
+ class Flattener
+ UNDERSCORE = '_'
+ X = 'X'
+
+ X_CHEF_id_CHEF_X = 'X_CHEF_id_CHEF_X'
+ X_CHEF_database_CHEF_X = 'X_CHEF_database_CHEF_X'
+ X_CHEF_type_CHEF_X = 'X_CHEF_type_CHEF_X'
+
+ def initialize(item)
+ @item = item
+ end
+
+ def flattened_item
+ @flattened_item || flatten_and_expand
+ end
+
+ def flatten_and_expand
+ @flattened_item = Hash.new {|hash, key| hash[key] = []}
+
+ @item.each do |key, value|
+ flatten_each([key.to_s], value)
+ end
+
+ @flattened_item.each_value { |values| values.uniq! }
+ @flattened_item
+ end
+
+ def flatten_each(keys, values)
+ case values
+ when Hash
+ values.each do |child_key, child_value|
+ add_field_value(keys, child_key)
+ flatten_each(keys + [child_key.to_s], child_value)
+ end
+ when Array
+ values.each { |child_value| flatten_each(keys, child_value) }
+ else
+ add_field_value(keys, values)
+ end
+ end
+
+ def add_field_value(keys, value)
+ value = value.to_s
+ @flattened_item[keys.join(UNDERSCORE)] << value
+ @flattened_item[keys.last] << value
+ end
+ end
+ end
+end
diff --git a/chef-expander/lib/chef/expander/loggable.rb b/chef-expander/lib/chef/expander/loggable.rb
new file mode 100644
index 0000000000..5bfb941ecb
--- /dev/null
+++ b/chef-expander/lib/chef/expander/loggable.rb
@@ -0,0 +1,56 @@
+#
+# Author:: Daniel DeLeo (<dan@opscode.com>)
+# Author:: Seth Falcon (<seth@opscode.com>)
+# Author:: Chris Walters (<cw@opscode.com>)
+# Copyright:: Copyright (c) 2010-2011 Opscode, Inc.
+# License:: Apache License, Version 2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+require 'mixlib/log'
+
+module Chef
+ module Expander
+ module Loggable
+ class Logger
+ include Mixlib::Log
+
+ def init(*args)
+ @logger = nil
+ super
+ end
+
+ [:debug,:info,:warn,:error, :fatal].each do |level|
+ class_eval(<<-LOG_METHOD, __FILE__, __LINE__)
+ def #{level}(message=nil, &block)
+ @logger.#{level}(message, &block)
+ end
+ LOG_METHOD
+ end
+ end
+
+ # TODO: it's admittedly janky to set up the default logging this way.
+ STDOUT.sync = true
+ LOGGER = Logger.new
+ LOGGER.init
+ LOGGER.level = :debug
+
+ def log
+ LOGGER
+ end
+
+ end
+ end
+end
+
diff --git a/chef-expander/lib/chef/expander/node.rb b/chef-expander/lib/chef/expander/node.rb
new file mode 100644
index 0000000000..23249e8685
--- /dev/null
+++ b/chef-expander/lib/chef/expander/node.rb
@@ -0,0 +1,177 @@
+#
+# Author:: Daniel DeLeo (<dan@opscode.com>)
+# Author:: Seth Falcon (<seth@opscode.com>)
+# Author:: Chris Walters (<cw@opscode.com>)
+# Copyright:: Copyright (c) 2010-2011 Opscode, Inc.
+# License:: Apache License, Version 2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+require 'uuidtools'
+require 'amqp'
+require 'mq'
+require 'open3'
+
+require 'chef/expander/loggable'
+
+module Chef
+ module Expander
+ class Node
+
+ include Loggable
+
+ def self.from_hash(node_info)
+ new(node_info[:guid], node_info[:hostname_f], node_info[:pid])
+ end
+
+ def self.local_node
+ new(guid, hostname_f, Process.pid)
+ end
+
+ def self.guid
+ return @guid if @guid
+ @guid = UUIDTools::UUID.random_create.to_s
+ end
+
+ def self.hostname_f
+ @hostname ||= Open3.popen3("hostname -f") {|stdin, stdout, stderr| stdout.read }.strip
+ end
+
+ attr_reader :guid
+
+ attr_reader :hostname_f
+
+ attr_reader :pid
+
+ def initialize(guid, hostname_f, pid)
+ @guid, @hostname_f, @pid = guid, hostname_f, pid
+ end
+
+ def start(&message_handler)
+ attach_to_queue(exclusive_control_queue, "exclusive control", &message_handler)
+ attach_to_queue(shared_control_queue, "shared_control", &message_handler)
+ attach_to_queue(broadcast_control_queue, "broadcast control", &message_handler)
+ end
+
+ def attach_to_queue(queue, colloquial_name, &message_handler)
+ queue.subscribe(:ack => true) do |headers, payload|
+ log.debug { "received message on #{colloquial_name} queue: #{payload}" }
+ message_handler.call(payload)
+ headers.ack
+ end
+ end
+
+ def stop
+ log.debug { "unsubscribing from broadcast control queue"}
+ broadcast_control_queue.unsubscribe(:nowait => false)
+
+ log.debug { "unsubscribing from shared control queue" }
+ shared_control_queue.unsubscribe(:nowait => false)
+
+ log.debug { "unsubscribing from exclusive control queue" }
+ exclusive_control_queue.unsubscribe(:nowait => false)
+ end
+
+ def direct_message(message)
+ log.debug { "publishing direct message to node #{identifier}: #{message}" }
+ exclusive_control_queue.publish(message)
+ end
+
+ def shared_message(message)
+ log.debug { "publishing shared message #{message}"}
+ shared_control_queue.publish(message)
+ end
+
+ def broadcast_message(message)
+ log.debug { "publishing broadcast message #{message}" }
+ broadcast_control_exchange.publish(message)
+ end
+
+ # The exclusive control queue is for point-to-point messaging, i.e.,
+ # messages directly addressed to this node
+ def exclusive_control_queue
+ @exclusive_control_queue ||= begin
+ log.debug { "declaring exclusive control queue #{exclusive_control_queue_name}" }
+ MQ.queue(exclusive_control_queue_name)
+ end
+ end
+
+ # The shared control queue is for 1 to (1 of N) messaging, i.e.,
+ # messages that can go to any one node.
+ def shared_control_queue
+ @shared_control_queue ||= begin
+ log.debug { "declaring shared control queue #{shared_control_queue_name}" }
+ MQ.queue(shared_control_queue_name)
+ end
+ end
+
+ # The broadcast control queue is for 1 to N messaging, i.e.,
+ # messages that go to every node
+ def broadcast_control_queue
+ @broadcast_control_queue ||= begin
+ log.debug { "declaring broadcast control queue #{broadcast_control_queue_name}"}
+ q = MQ.queue(broadcast_control_queue_name)
+ log.debug { "binding broadcast control queue to broadcast control exchange"}
+ q.bind(broadcast_control_exchange)
+ q
+ end
+ end
+
+ def broadcast_control_exchange
+ @broadcast_control_exchange ||= begin
+ log.debug { "declaring broadcast control exchange opscode-platfrom-control--broadcast" }
+ MQ.fanout(broadcast_control_exchange_name, :nowait => false)
+ end
+ end
+
+ def shared_control_queue_name
+ SHARED_CONTROL_QUEUE_NAME
+ end
+
+ def broadcast_control_queue_name
+ @broadcast_control_queue_name ||= "#{identifier}--broadcast"
+ end
+
+ def broadcast_control_exchange_name
+ BROADCAST_CONTROL_EXCHANGE_NAME
+ end
+
+ def exclusive_control_queue_name
+ @exclusive_control_queue_name ||= "#{identifier}--exclusive-control"
+ end
+
+ def identifier
+ "#{hostname_f}--#{pid}--#{guid}"
+ end
+
+ def ==(other)
+ other.respond_to?(:guid) && other.respond_to?(:hostname_f) && other.respond_to?(:pid) &&
+ (other.guid == guid) && (other.hostname_f == hostname_f) && (other.pid == pid)
+ end
+
+ def eql?(other)
+ (other.class == self.class) && (other.hash == hash)
+ end
+
+ def hash
+ identifier.hash
+ end
+
+ def to_hash
+ {:guid => @guid, :hostname_f => @hostname_f, :pid => @pid}
+ end
+
+ end
+ end
+end
diff --git a/chef-expander/lib/chef/expander/solrizer.rb b/chef-expander/lib/chef/expander/solrizer.rb
new file mode 100644
index 0000000000..1a6fed9521
--- /dev/null
+++ b/chef-expander/lib/chef/expander/solrizer.rb
@@ -0,0 +1,275 @@
+#
+# Author:: Daniel DeLeo (<dan@opscode.com>)
+# Author:: Seth Falcon (<seth@opscode.com>)
+# Author:: Chris Walters (<cw@opscode.com>)
+# Copyright:: Copyright (c) 2010-2011 Opscode, Inc.
+# License:: Apache License, Version 2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+require 'set'
+require 'yajl'
+require 'fast_xs'
+require 'em-http-request'
+require 'chef/expander/loggable'
+require 'chef/expander/flattener'
+
+module Chef
+ module Expander
+ class Solrizer
+
+ @active_http_requests = Set.new
+
+ def self.http_request_started(instance)
+ @active_http_requests << instance
+ end
+
+ def self.http_request_completed(instance)
+ @active_http_requests.delete(instance)
+ end
+
+ def self.http_requests_active?
+ !@active_http_requests.empty?
+ end
+
+ def self.clear_http_requests
+ @active_http_requests.clear
+ end
+
+ include Loggable
+
+ ADD = "add"
+ DELETE = "delete"
+ SKIP = "skip"
+
+ ITEM = "item"
+ ID = "id"
+ TYPE = "type"
+ DATABASE = "database"
+ ENQUEUED_AT = "enqueued_at"
+
+ DATA_BAG_ITEM = "data_bag_item"
+ DATA_BAG = "data_bag"
+
+ X_CHEF_id_CHEF_X = 'X_CHEF_id_CHEF_X'
+ X_CHEF_database_CHEF_X = 'X_CHEF_database_CHEF_X'
+ X_CHEF_type_CHEF_X = 'X_CHEF_type_CHEF_X'
+
+ CONTENT_TYPE_XML = {"Content-Type" => "text/xml"}
+
+ attr_reader :action
+
+ attr_reader :indexer_payload
+
+ attr_reader :chef_object
+
+ attr_reader :obj_id
+
+ attr_reader :obj_type
+
+ attr_reader :database
+
+ attr_reader :enqueued_at
+
+ def initialize(object_command_json, &on_completion_block)
+ @start_time = Time.now.to_f
+ @on_completion_block = on_completion_block
+ if parsed_message = parse(object_command_json)
+ @action = parsed_message["action"]
+ @indexer_payload = parsed_message["payload"]
+
+ extract_object_fields if @indexer_payload
+ else
+ @action = SKIP
+ end
+ end
+
+ def extract_object_fields
+ @chef_object = @indexer_payload[ITEM]
+ @database = @indexer_payload[DATABASE]
+ @obj_id = @indexer_payload[ID]
+ @obj_type = @indexer_payload[TYPE]
+ @enqueued_at = @indexer_payload[ENQUEUED_AT]
+ @data_bag = @obj_type == DATA_BAG_ITEM ? @chef_object[DATA_BAG] : nil
+ end
+
+ def parse(serialized_object)
+ Yajl::Parser.parse(serialized_object)
+ rescue Yajl::ParseError
+ log.error { "cannot index object because it is invalid JSON: #{serialized_object}" }
+ end
+
+ def run
+ case @action
+ when ADD
+ add
+ when DELETE
+ delete
+ when SKIP
+ completed
+ log.info { "not indexing this item because of malformed JSON"}
+ else
+ completed
+ log.error { "cannot index object becuase it has an invalid action #{@action}" }
+ end
+ end
+
+ def add
+ post_to_solr(pointyize_add) do
+ ["indexed #{indexed_object}",
+ "transit,xml,solr-post |",
+ [transit_time, @xml_time, @solr_post_time].join(","),
+ "|"
+ ].join(" ")
+ end
+ rescue Exception => e
+ log.error { "#{e.class.name}: #{e.message}\n#{e.backtrace.join("\n")}"}
+ end
+
+ def delete
+ post_to_solr(pointyize_delete) { "deleted #{indexed_object} transit-time[#{transit_time}s]"}
+ rescue Exception => e
+ log.error { "#{e.class.name}: #{e.message}\n#{e.backtrace.join("\n")}"}
+ end
+
+ def flattened_object
+ flattened_object = Flattener.new(@chef_object).flattened_item
+
+ flattened_object[X_CHEF_id_CHEF_X] = [@obj_id]
+ flattened_object[X_CHEF_database_CHEF_X] = [@database]
+ flattened_object[X_CHEF_type_CHEF_X] = [@obj_type]
+
+ log.debug {"adding flattened object to Solr: #{flattened_object.inspect}"}
+
+ flattened_object
+ end
+
+ START_XML = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
+ ADD_DOC = "<add><doc>"
+ DELETE_DOC = "<delete>"
+ ID_OPEN = "<id>"
+ ID_CLOSE = "</id>"
+ END_ADD_DOC = "</doc></add>\n"
+ END_DELETE = "</delete>\n"
+ START_CONTENT = '<field name="content">'
+ CLOSE_FIELD = "</field>"
+
+ FLD_CHEF_ID_FMT = '<field name="X_CHEF_id_CHEF_X">%s</field>'
+ FLD_CHEF_DB_FMT = '<field name="X_CHEF_database_CHEF_X">%s</field>'
+ FLD_CHEF_TY_FMT = '<field name="X_CHEF_type_CHEF_X">%s</field>'
+ FLD_DATA_BAG = '<field name="data_bag">%s</field>'
+
+ KEYVAL_FMT = "%s__=__%s "
+
+ # Takes a flattened hash where the values are arrays and converts it into
+ # a dignified XML document suitable for POST to Solr.
+ # The general structure of the output document is like this:
+ # <?xml version="1.0" encoding="UTF-8"?>
+ # <add>
+ # <doc>
+ # <field name="content">
+ # key__=__value
+ # key__=__another_value
+ # other_key__=__yet another value
+ # </field>
+ # </doc>
+ # </add>
+ # The document as generated has minimal newlines and formatting, however.
+ def pointyize_add
+ xml = ""
+ xml << START_XML << ADD_DOC
+ xml << (FLD_CHEF_ID_FMT % @obj_id)
+ xml << (FLD_CHEF_DB_FMT % @database)
+ xml << (FLD_CHEF_TY_FMT % @obj_type)
+ xml << START_CONTENT
+ content = ""
+ flattened_object.each do |field, values|
+ values.each do |v|
+ content << (KEYVAL_FMT % [field, v])
+ end
+ end
+ xml << content.fast_xs
+ xml << CLOSE_FIELD # ends content
+ xml << (FLD_DATA_BAG % @data_bag.fast_xs) if @data_bag
+ xml << END_ADD_DOC
+ @xml_time = Time.now.to_f - @start_time
+ xml
+ end
+
+ # Takes a succinct document id, like 2342, and turns it into something
+ # even more compact, like
+ # "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<delete><id>2342</id></delete>\n"
+ def pointyize_delete
+ xml = ""
+ xml << START_XML
+ xml << DELETE_DOC
+ xml << ID_OPEN
+ xml << @obj_id.to_s
+ xml << ID_CLOSE
+ xml << END_DELETE
+ xml
+ end
+
+ def post_to_solr(document, &logger_block)
+ log.debug("POSTing document to SOLR:\n#{document}")
+ http_req = EventMachine::HttpRequest.new(solr_url).post(:body => document, :timeout => 1200, :head => CONTENT_TYPE_XML)
+ http_request_started
+
+ http_req.callback do
+ completed
+ if http_req.response_header.status == 200
+ log.info(&logger_block)
+ else
+ log.error { "Failed to post to solr: #{indexed_object}" }
+ end
+ end
+ http_req.errback do
+ completed
+ log.error { "Failed to post to solr (connection error): #{indexed_object}" }
+ end
+ end
+
+ def completed
+ @solr_post_time = Time.now.to_f - @start_time
+ self.class.http_request_completed(self)
+ @on_completion_block.call
+ end
+
+ def transit_time
+ Time.now.utc.to_i - @enqueued_at
+ end
+
+ def solr_url
+ 'http://127.0.0.1:8983/solr/update'
+ end
+
+ def indexed_object
+ "#{@obj_type}[#{@obj_id}] database[#{@database}]"
+ end
+
+ def http_request_started
+ self.class.http_request_started(self)
+ end
+
+ def eql?(other)
+ other.hash == hash
+ end
+
+ def hash
+ "#{action}#{indexed_object}#@enqueued_at#{self.class.name}".hash
+ end
+
+ end
+ end
+end
diff --git a/chef-expander/lib/chef/expander/version.rb b/chef-expander/lib/chef/expander/version.rb
new file mode 100644
index 0000000000..9b0f814465
--- /dev/null
+++ b/chef-expander/lib/chef/expander/version.rb
@@ -0,0 +1,37 @@
+#
+# Author:: Daniel DeLeo (<dan@opscode.com>)
+# Author:: Seth Falcon (<seth@opscode.com>)
+# Author:: Chris Walters (<cw@opscode.com>)
+# Copyright:: Copyright (c) 2010-2011 Opscode, Inc.
+# License:: Apache License, Version 2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+require 'open3'
+
+module Chef
+ module Expander
+
+ VERSION = "0.1.0"
+
+ def self.version
+ @rev ||= begin
+ rev = Open3.popen3("git rev-parse HEAD") {|stdin, stdout, stderr| stdout.read }.strip
+ rev.empty? ? nil : " (#{rev})"
+ end
+ "#{VERSION}#@rev"
+ end
+
+ end
+end
diff --git a/chef-expander/lib/chef/expander/vnode.rb b/chef-expander/lib/chef/expander/vnode.rb
new file mode 100644
index 0000000000..32bb17da32
--- /dev/null
+++ b/chef-expander/lib/chef/expander/vnode.rb
@@ -0,0 +1,106 @@
+#
+# Author:: Daniel DeLeo (<dan@opscode.com>)
+# Author:: Seth Falcon (<seth@opscode.com>)
+# Author:: Chris Walters (<cw@opscode.com>)
+# Copyright:: Copyright (c) 2010-2011 Opscode, Inc.
+# License:: Apache License, Version 2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+require 'eventmachine'
+require 'amqp'
+require 'mq'
+
+require 'chef/expander/loggable'
+require 'chef/expander/solrizer'
+
+module Chef
+ module Expander
+ class VNode
+ include Loggable
+
+ attr_reader :vnode_number
+
+ attr_reader :supervise_interval
+
+ def initialize(vnode_number, supervisor, opts={})
+ @vnode_number = vnode_number.to_i
+ @supervisor = supervisor
+ @queue = nil
+ @stopped = false
+ @supervise_interval = opts[:supervise_interval] || 30
+ end
+
+ def start
+ @supervisor.vnode_added(self)
+
+ subscription_confirmed = Proc.new do
+ abort_on_multiple_subscribe
+ supervise_consumer_count
+ end
+
+ queue.subscribe(:ack => true, :confirm => subscription_confirmed) do |headers, payload|
+ log.debug {"got #{payload} size(#{payload.size} bytes) on queue #{queue_name}"}
+ solrizer = Solrizer.new(payload) { headers.ack }
+ solrizer.run
+ end
+
+ rescue MQ::Error => e
+ log.error {"Failed to start subscriber on #{queue_name} #{e.class.name}: #{e.message}"}
+ end
+
+ def supervise_consumer_count
+ EM.add_periodic_timer(supervise_interval) do
+ abort_on_multiple_subscribe
+ end
+ end
+
+ def abort_on_multiple_subscribe
+ queue.status do |message_count, subscriber_count|
+ if subscriber_count.to_i > 1
+ log.error { "Detected extra consumers (#{subscriber_count} total) on queue #{queue_name}, cancelling subscription" }
+ stop
+ end
+ end
+ end
+
+ def stop
+ log.debug {"Cancelling subscription on queue #{queue_name.inspect}"}
+ queue.unsubscribe if queue.subscribed?
+ @supervisor.vnode_removed(self)
+ @stopped = true
+ end
+
+ def stopped?
+ @stopped
+ end
+
+ def queue
+ @queue ||= begin
+ log.debug { "declaring queue #{queue_name}" }
+ MQ.queue(queue_name, :passive => false, :durable => true)
+ end
+ end
+
+ def queue_name
+ "vnode-#{@vnode_number}"
+ end
+
+ def control_queue_name
+ "#{queue_name}-control"
+ end
+
+ end
+ end
+end
diff --git a/chef-expander/lib/chef/expander/vnode_supervisor.rb b/chef-expander/lib/chef/expander/vnode_supervisor.rb
new file mode 100644
index 0000000000..40e9b62817
--- /dev/null
+++ b/chef-expander/lib/chef/expander/vnode_supervisor.rb
@@ -0,0 +1,265 @@
+#
+# Author:: Daniel DeLeo (<dan@opscode.com>)
+# Author:: Seth Falcon (<seth@opscode.com>)
+# Author:: Chris Walters (<cw@opscode.com>)
+# Copyright:: Copyright (c) 2010-2011 Opscode, Inc.
+# License:: Apache License, Version 2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+require 'yajl'
+require 'eventmachine'
+require 'amqp'
+require 'mq'
+require 'chef/expander/version'
+require 'chef/expander/loggable'
+require 'chef/expander/node'
+require 'chef/expander/vnode'
+require 'chef/expander/vnode_table'
+require 'chef/expander/configuration'
+
+module ::AMQP
+ def self.hard_reset!
+ MQ.reset rescue nil
+ stop
+ EM.stop rescue nil
+ Thread.current[:mq], @conn = nil, nil
+ end
+end
+
+module Chef
+ module Expander
+ class VNodeSupervisor
+ include Loggable
+ extend Loggable
+
+ COULD_NOT_CONNECT = /Could not connect to server/.freeze
+
+ def self.start_cluster_worker
+ @vnode_supervisor = new
+ @original_ppid = Process.ppid
+ trap_signals
+
+ vnodes = Expander.config.vnode_numbers
+
+ $0 = "chef-expander#{Expander.config.ps_tag} worker ##{Expander.config.index} (vnodes #{vnodes.min}-#{vnodes.max})"
+
+ AMQP.start(Expander.config.amqp_config) do
+ start_consumers
+ await_parent_death
+ end
+ end
+
+ def self.await_parent_death
+ @awaiting_parent_death = EM.add_periodic_timer(1) do
+ unless Process.ppid == @original_ppid
+ @awaiting_parent_death.cancel
+ stop("master process death")
+ end
+ end
+ end
+
+ def self.start
+ @vnode_supervisor = new
+ trap_signals
+
+ Expander.init_config(ARGV)
+
+ log.info("Chef Search Expander #{Expander.version} starting up.")
+
+ begin
+ AMQP.start(Expander.config.amqp_config) do
+ start_consumers
+ end
+ rescue AMQP::Error => e
+ if e.message =~ COULD_NOT_CONNECT
+ log.error { "Could not connect to rabbitmq. Make sure it is running and correctly configured." }
+ log.error { e.message }
+
+ AMQP.hard_reset!
+
+ sleep 5
+ retry
+ else
+ raise
+ end
+ end
+ end
+
+ def self.start_consumers
+ log.debug { "Setting prefetch count to 1"}
+ MQ.prefetch(1)
+
+ vnodes = Expander.config.vnode_numbers
+ log.info("Starting Consumers for vnodes #{vnodes.min}-#{vnodes.max}")
+ @vnode_supervisor.start(vnodes)
+ end
+
+ def self.trap_signals
+ Kernel.trap(:INT) { stop_immediately(:INT) }
+ Kernel.trap(:TERM) { stop_gracefully(:TERM) }
+ end
+
+ def self.stop_immediately(signal)
+ log.info { "Initiating immediate shutdown on signal (#{signal})" }
+ @vnode_supervisor.stop
+ EM.add_timer(1) do
+ AMQP.stop
+ EM.stop
+ end
+ end
+
+ def self.stop_gracefully(signal)
+ log.info { "Initiating graceful shutdown on signal (#{signal})" }
+ @vnode_supervisor.stop
+ wait_for_http_requests_to_complete
+ end
+
+ def self.wait_for_http_requests_to_complete
+ if Expander::Solrizer.http_requests_active?
+ log.info { "waiting for in progress HTTP Requests to complete"}
+ EM.add_timer(1) do
+ wait_for_http_requests_to_complete
+ end
+ else
+ log.info { "HTTP requests completed, shutting down"}
+ AMQP.stop
+ EM.stop
+ end
+ end
+
+ attr_reader :vnode_table
+
+ attr_reader :local_node
+
+ def initialize
+ @vnodes = {}
+ @vnode_table = VNodeTable.new(self)
+ @local_node = Node.local_node
+ @queue_name, @guid = nil, nil
+ end
+
+ def start(vnode_ids)
+ @local_node.start do |message|
+ process_control_message(message)
+ end
+
+ #start_vnode_table_publisher
+
+ Array(vnode_ids).each { |vnode_id| spawn_vnode(vnode_id) }
+ end
+
+ def stop
+ @local_node.stop
+
+ #log.debug { "stopping vnode table updater" }
+ #@vnode_table_publisher.cancel
+
+ log.info { "Stopping VNode queue subscribers"}
+ @vnodes.each do |vnode_number, vnode|
+ log.debug { "Stopping consumer on VNode #{vnode_number}"}
+ vnode.stop
+ end
+
+ end
+
+ def vnode_added(vnode)
+ log.debug { "vnode #{vnode.vnode_number} registered with supervisor" }
+ @vnodes[vnode.vnode_number.to_i] = vnode
+ end
+
+ def vnode_removed(vnode)
+ log.debug { "vnode #{vnode.vnode_number} unregistered from supervisor" }
+ @vnodes.delete(vnode.vnode_number.to_i)
+ end
+
+ def vnodes
+ @vnodes.keys.sort
+ end
+
+ def spawn_vnode(vnode_number)
+ VNode.new(vnode_number, self).start
+ end
+
+ def release_vnode
+ # TODO
+ end
+
+ def process_control_message(message)
+ control_message = parse_symbolic(message)
+ case control_message[:action]
+ when "claim_vnode"
+ spawn_vnode(control_message[:vnode_id])
+ when "recover_vnode"
+ recover_vnode(control_message[:vnode_id])
+ when "release_vnodes"
+ raise "todo"
+ release_vnode()
+ when "update_vnode_table"
+ @vnode_table.update_table(control_message[:data])
+ when "vnode_table_publish"
+ publish_vnode_table
+ when "status"
+ publish_status_to(control_message[:rsvp])
+ when "set_log_level"
+ set_log_level(control_message[:level], control_message[:rsvp])
+ else
+ log.error { "invalid control message #{control_message.inspect}" }
+ end
+ rescue Exception => e
+ log.error { "Error processing a control message."}
+ log.error { "#{e.class.name}: #{e.message}\n#{e.backtrace.join("\n")}" }
+ end
+
+
+ def start_vnode_table_publisher
+ @vnode_table_publisher = EM.add_periodic_timer(10) { publish_vnode_table }
+ end
+
+ def publish_vnode_table
+ status_update = @local_node.to_hash
+ status_update[:vnodes] = vnodes
+ status_update[:update] = :add
+ @local_node.broadcast_message(Yajl::Encoder.encode({:action => :update_vnode_table, :data => status_update}))
+ end
+
+ def publish_status_to(return_queue)
+ status_update = @local_node.to_hash
+ status_update[:vnodes] = vnodes
+ MQ.queue(return_queue).publish(Yajl::Encoder.encode(status_update))
+ end
+
+ def set_log_level(level, rsvp_to)
+ log.info { "setting log level to #{level} due to command from #{rsvp_to}" }
+ new_log_level = (Expander.config.log_level = level.to_sym)
+ reply = {:level => new_log_level, :node => @local_node.to_hash}
+ MQ.queue(rsvp_to).publish(Yajl::Encoder.encode(reply))
+ end
+
+ def recover_vnode(vnode_id)
+ if @vnode_table.local_node_is_leader?
+ log.debug { "Recovering vnode: #{vnode_id}" }
+ @local_node.shared_message(Yajl::Encoder.encode({:action => :claim_vnode, :vnode_id => vnode_id}))
+ else
+ log.debug { "Ignoring :recover_vnode message because this node is not the leader" }
+ end
+ end
+
+ def parse_symbolic(message)
+ Yajl::Parser.new(:symbolize_keys => true).parse(message)
+ end
+
+ end
+ end
+end
diff --git a/chef-expander/lib/chef/expander/vnode_table.rb b/chef-expander/lib/chef/expander/vnode_table.rb
new file mode 100644
index 0000000000..025812b49c
--- /dev/null
+++ b/chef-expander/lib/chef/expander/vnode_table.rb
@@ -0,0 +1,83 @@
+#
+# Author:: Daniel DeLeo (<dan@opscode.com>)
+# Author:: Seth Falcon (<seth@opscode.com>)
+# Author:: Chris Walters (<cw@opscode.com>)
+# Copyright:: Copyright (c) 2010-2011 Opscode, Inc.
+# License:: Apache License, Version 2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+require 'yajl'
+require 'chef/expander/node'
+require 'chef/expander/loggable'
+
+module Chef
+ module Expander
+ class VNodeTable
+
+ include Loggable
+
+ class InvalidVNodeTableUpdate < ArgumentError; end
+
+ attr_reader :vnodes_by_node
+
+ def initialize(vnode_supervisor)
+ @node_update_mutex = Mutex.new
+ @vnode_supervisor = vnode_supervisor
+ @vnodes_by_node = {}
+ end
+
+ def nodes
+ @vnodes_by_node.keys
+ end
+
+ def update_table(table_update)
+ case table_update[:update]
+ when "add", "update"
+ update_node(table_update)
+ when "remove"
+ remove_node(table_update)
+ else
+ raise InvalidVNodeTableUpdate, "no action or action not acceptable: #{table_update.inspect}"
+ end
+ log.debug { "current vnode table: #{@vnodes_by_node.inspect}" }
+ end
+
+ def update_node(node_info)
+ @node_update_mutex.synchronize do
+ @vnodes_by_node[Node.from_hash(node_info)] = node_info[:vnodes]
+ end
+ end
+
+ def remove_node(node_info)
+ @node_update_mutex.synchronize do
+ @vnodes_by_node.delete(Node.from_hash(node_info))
+ end
+ end
+
+ def leader_node
+ if @vnodes_by_node.empty?
+ nil
+ else
+ Array(@vnodes_by_node).reject { |node| node[1].empty? }.sort { |a,b| a[1].min <=> b[1].min }.first[0]
+ end
+ end
+
+ def local_node_is_leader?
+ (Node.local_node == leader_node) || (@vnodes_by_node[Node.local_node].include?(0))
+ end
+
+ end
+ end
+end