diff options
Diffstat (limited to 'chef-expander/lib')
-rw-r--r-- | chef-expander/lib/chef/expander.rb | 36 | ||||
-rw-r--r-- | chef-expander/lib/chef/expander/cluster_supervisor.rb | 119 | ||||
-rw-r--r-- | chef-expander/lib/chef/expander/configuration.rb | 261 | ||||
-rw-r--r-- | chef-expander/lib/chef/expander/control.rb | 206 | ||||
-rw-r--r-- | chef-expander/lib/chef/expander/flattener.rb | 79 | ||||
-rw-r--r-- | chef-expander/lib/chef/expander/loggable.rb | 56 | ||||
-rw-r--r-- | chef-expander/lib/chef/expander/node.rb | 177 | ||||
-rw-r--r-- | chef-expander/lib/chef/expander/solrizer.rb | 275 | ||||
-rw-r--r-- | chef-expander/lib/chef/expander/version.rb | 37 | ||||
-rw-r--r-- | chef-expander/lib/chef/expander/vnode.rb | 106 | ||||
-rw-r--r-- | chef-expander/lib/chef/expander/vnode_supervisor.rb | 265 | ||||
-rw-r--r-- | chef-expander/lib/chef/expander/vnode_table.rb | 83 |
12 files changed, 1700 insertions, 0 deletions
diff --git a/chef-expander/lib/chef/expander.rb b/chef-expander/lib/chef/expander.rb new file mode 100644 index 0000000000..9a58868a96 --- /dev/null +++ b/chef-expander/lib/chef/expander.rb @@ -0,0 +1,36 @@ +# +# Author:: Daniel DeLeo (<dan@opscode.com>) +# Author:: Seth Falcon (<seth@opscode.com>) +# Author:: Chris Walters (<cw@opscode.com>) +# Copyright:: Copyright (c) 2010-2011 Opscode, Inc. +# License:: Apache License, Version 2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Chef + module Expander + + # VNODES is the number of queues in rabbit that are available for subscribing. + # The name comes from riak, where the data ring (160bits) is chunked into + # many vnodes; vnodes outnumber physical nodes, so one node hosts several + # vnodes. That is the same design we use here. + # + # See the notes on topic queue benchmarking before adjusting this value. + VNODES = 1024 + + SHARED_CONTROL_QUEUE_NAME = "chef-search-control--shared" + BROADCAST_CONTROL_EXCHANGE_NAME = 'chef-search-control--broadcast' + + end +end diff --git a/chef-expander/lib/chef/expander/cluster_supervisor.rb b/chef-expander/lib/chef/expander/cluster_supervisor.rb new file mode 100644 index 0000000000..ccc5a9e730 --- /dev/null +++ b/chef-expander/lib/chef/expander/cluster_supervisor.rb @@ -0,0 +1,119 @@ +# +# Author:: Daniel DeLeo (<dan@opscode.com>) +# Author:: Seth Falcon (<seth@opscode.com>) +# Author:: Chris Walters (<cw@opscode.com>) +# Copyright:: Copyright (c) 2010-2011 Opscode, Inc. +# License:: Apache License, Version 2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'chef/expander/loggable' +require 'chef/expander/version' +require 'chef/expander/configuration' +require 'chef/expander/vnode_supervisor' + +module Chef + module Expander + #==ClusterSupervisor + # Manages a cluster of chef-expander processes. Usually this class will + # be instantiated from the chef-expander-cluster executable. + # + # ClusterSupervisor works by forking the desired number of processes, then + # running VNodeSupervisor.start_cluster_worker within the forked process. + # ClusterSupervisor keeps track of the process ids of its children, and will + # periodically attempt to reap them in a non-blocking call. If they are + # reaped, ClusterSupervisor knows they died and need to be respawned. + # + # The child processes are responsible for checking on the master process and + # dying if the master has died (VNodeSupervisor does this when started in + # with start_cluster_worker). + # + #===TODO: + # * This implementation currently assumes there is only one cluster, so it + # will claim all of the vnodes. It may be advantageous to allow multiple + # clusters. + # * There is no heartbeat implementation at this time, so a zombified child + # process will not be automatically killed--This behavior is left to the + # meatcloud for now. + class ClusterSupervisor + include Loggable + + def initialize + @workers = {} + @running = true + @kill = :TERM + end + + def start + trap(:INT) { stop(:INT) } + trap(:TERM) { stop(:TERM)} + Expander.init_config(ARGV) + + log.info("Chef Expander #{Expander.version} starting cluster with #{Expander.config.node_count} nodes") + + start_workers + maintain_workers + end + + def start_workers + Expander.config.node_count.times do |i| + start_worker(i + 1) + end + end + + def start_worker(index) + log.info { "Starting cluster worker #{index}" } + worker_params = {:index => index} + child_pid = fork do + Expander.config.index = index + VNodeSupervisor.start_cluster_worker + end + @workers[child_pid] = worker_params + end + + def stop(signal) + log.info { "Stopping cluster on signal (#{signal})" } + @running = false + @kill = signal + end + + def maintain_workers + while @running + sleep 1 + workers_to_replace = {} + @workers.each do |process_id, worker_params| + if result = Process.waitpid2(process_id, Process::WNOHANG) + log.error { "worker #{worker_params[:index]} (PID: #{process_id}) died with status #{result[1].exitstatus || '(no status)'}"} + workers_to_replace[process_id] = worker_params + end + end + workers_to_replace.each do |dead_pid, worker_params| + @workers.delete(dead_pid) + start_worker(worker_params[:index]) + end + end + + @workers.each do |pid, worker_params| + log.info { "Stopping worker #{worker_params[:index]} (PID: #{pid})"} + Process.kill(@kill, pid) + end + @workers.each do |pid, worker_params| + Process.waitpid2(pid) + end + + end + + end + end +end diff --git a/chef-expander/lib/chef/expander/configuration.rb b/chef-expander/lib/chef/expander/configuration.rb new file mode 100644 index 0000000000..1888c56811 --- /dev/null +++ b/chef-expander/lib/chef/expander/configuration.rb @@ -0,0 +1,261 @@ +# +# Author:: Daniel DeLeo (<dan@opscode.com>) +# Author:: Seth Falcon (<seth@opscode.com>) +# Author:: Chris Walters (<cw@opscode.com>) +# Copyright:: Copyright (c) 2010-2011 Opscode, Inc. +# License:: Apache License, Version 2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'pp' +require 'optparse' +require 'singleton' + +require 'chef/expander/flattener' +require 'chef/expander/loggable' +require 'chef/expander/version' + +module Chef + module Expander + + def self.config + @config ||= Configuration::Base.new + end + + def self.init_config(argv) + config.apply_defaults + remaining_opts_after_parse = Configuration::CLI.parse_options(argv) + # Need to be able to override the default config file location on the command line + config_file_to_use = Configuration::CLI.config.config_file || config.config_file + config.merge_config(Configuration::Base.from_chef_compat_config(config_file_to_use)) + # But for all other config options, the CLI config should win over config file + config.merge_config(Configuration::CLI.config) + remaining_opts_after_parse + end + + class ChefCompatibleConfig + + attr_reader :config_hash + + def initialize + @config_hash = {} + end + + def load(file) + file = File.expand_path(file) + instance_eval(IO.read(file), file, 1) + end + + def method_missing(method_name, *args, &block) + if args.size == 1 + @config_hash[method_name] = args.first + elsif args.empty? + @config_hash[method_name] or super + else + super + end + end + + end + + module Configuration + + class InvalidConfiguration < StandardError + end + + class Base + + include Loggable + + def self.from_chef_compat_config(file) + config = ChefCompatibleConfig.new + config.load(file) + from_hash(config.config_hash) + end + + def self.from_hash(config_hash) + config = new + config_hash.each do |setting, value| + setter = "#{setting}=".to_sym + if config.respond_to?(setter) + config.send(setter, value) + end + end + config + end + + def self.configurables + @configurables ||= [] + end + + def self.validations + @validations ||= [] + end + + def self.defaults + @defaults ||= {} + end + + def self.configurable(setting, default=nil, &validation) + attr_accessor(setting) + configurables << setting + defaults[setting] = default + validations << validation if block_given? + + setting + end + + configurable :config_file, File.expand_path(File.dirname(__FILE__) + '/../../../conf/chef-expander.rb') + + configurable :index do + invalid("You must specify this node's position in the ring as an integer") unless index.kind_of?(Integer) + invalid("The index cannot be larger than the cluster size (node-count)") unless (index <= node_count.to_i) + end + + configurable :node_count do + invalid("You must specify the cluster size as an integer") unless node_count.kind_of?(Integer) + invalid("The cluster size (node-count) cannot be smaller than the index") unless node_count >= index.to_i + end + + configurable :ps_tag, "" + + configurable :solr_url, "http://localhost:8983" + + configurable :amqp_host, '0.0.0.0' + + configurable :amqp_port, '5672' + + configurable :amqp_user, 'chef' + + configurable :amqp_pass, 'testing' + + configurable :amqp_vhost, '/chef' + + configurable :log_level, :info + + # override the setter for log_level to also actually set the level + def log_level=(level) + if level #don't accept nil for an answer + level = level.to_sym + Loggable::LOGGER.level = level + @log_level = log_level + end + level + end + + def initialize + reset! + end + + def reset!(stdout=nil) + self.class.configurables.each do |setting| + send("#{setting}=".to_sym, nil) + end + @stdout = stdout || STDOUT + end + + def apply_defaults + self.class.defaults.each do |setting, value| + self.send("#{setting}=".to_sym, value) + end + end + + def merge_config(other) + self.class.configurables.each do |setting| + value = other.send(setting) + self.send("#{setting}=".to_sym, value) if value + end + end + + def fail_if_invalid + validate! + rescue InvalidConfiguration => e + @stdout.puts("Invalid configuration: #{e.message}") + exit(1) + end + + def invalid(message) + raise InvalidConfiguration, message + end + + def validate! + self.class.validations.each do |validation_proc| + instance_eval(&validation_proc) + end + end + + def vnode_numbers + vnodes_per_node = VNODES / node_count + lower_bound = (index - 1) * vnodes_per_node + upper_bound = lower_bound + vnodes_per_node + upper_bound += VNODES % vnodes_per_node if index == node_count + (lower_bound...upper_bound).to_a + end + + def amqp_config + {:host => amqp_host, :port => amqp_port, :user => amqp_user, :pass => amqp_pass, :vhost => amqp_vhost} + end + + end + + module CLI + @config = Configuration::Base.new + + @option_parser = OptionParser.new do |o| + o.banner = "Usage: chef-expander [options]" + + o.on('-c', '--config CONFIG_FILE', 'a configuration file to use') do |conf| + @config.config_file = File.expand_path(conf) + end + + o.on('-i', '--index INDEX', 'the slot this node will occupy in the ring') do |i| + @config.index = i.to_i + end + + o.on('-n', '--node-count NUMBER', 'the number of nodes in the ring') do |n| + @config.node_count = n.to_i + end + + o.on('-l', '--log-level LOG_LEVEL', 'set the log level') do |l| + @config.log_level = l + end + + o.on_tail('-h', '--help', 'show this message') do + puts "chef-expander #{Expander.version}" + puts '' + puts o + exit 1 + end + + o.on_tail('-v', '--version', 'show the version and exit') do + puts "chef-expander #{Expander.version}" + exit 0 + end + + end + + def self.parse_options(argv) + @option_parser.parse!(argv.dup) + end + + def self.config + @config + end + + end + + end + + end +end diff --git a/chef-expander/lib/chef/expander/control.rb b/chef-expander/lib/chef/expander/control.rb new file mode 100644 index 0000000000..f8e3e99503 --- /dev/null +++ b/chef-expander/lib/chef/expander/control.rb @@ -0,0 +1,206 @@ +# +# Author:: Daniel DeLeo (<dan@opscode.com>) +# Author:: Seth Falcon (<seth@opscode.com>) +# Author:: Chris Walters (<cw@opscode.com>) +# Copyright:: Copyright (c) 2010-2011 Opscode, Inc. +# License:: Apache License, Version 2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'bunny' +require 'yajl' +require 'eventmachine' +require 'amqp' +require 'mq' +require 'highline' + +require 'chef/expander/node' +require 'chef/expander/configuration' + +require 'pp' + +module Chef + module Expander + class Control + + def self.run(argv) + remaining_args_after_opts = Expander.init_config(ARGV) + new(remaining_args_after_opts).run + end + + def self.desc(description) + @desc = description + end + + def self.option(*args) + #TODO + end + + def self.arg(*args) + #TODO + end + + def self.descriptions + @descriptions ||= [] + end + + def self.method_added(method_name) + if @desc + descriptions << [method_name, method_name.to_s.gsub('_', '-'), @desc] + @desc = nil + end + end + + #-- + # TODO: this is confusing and unneeded. Just whitelist the methods + # that map to commands and use +send+ + def self.compile + run_method = "def run; case @argv.first;" + descriptions.each do |method_name, command_name, desc| + run_method << "when '#{command_name}';#{method_name};" + end + run_method << "else; help; end; end;" + class_eval(run_method, __FILE__, __LINE__) + end + + def initialize(argv) + @argv = argv.dup + end + + desc "Show this message" + def help + puts "Chef Expander #{Expander.version}" + puts "Usage: chef-expanderctl COMMAND" + puts + puts "Commands:" + self.class.descriptions.each do |method_name, command_name, desc| + puts " #{command_name}".ljust(15) + desc + end + end + + desc "display the aggregate queue backlog" + def queue_depth + h = HighLine.new + message_counts = [] + + amqp_client = Bunny.new(Expander.config.amqp_config) + amqp_client.start + + 0.upto(VNODES - 1) do |vnode| + q = amqp_client.queue("vnode-#{vnode}", :durable => true) + message_counts << q.status[:message_count] + end + total_messages = message_counts.inject(0) { |sum, count| sum + count } + max = message_counts.max + min = message_counts.min + + avg = total_messages.to_f / message_counts.size.to_f + + puts " total messages: #{total_messages}" + puts " average queue depth: #{avg}" + puts " max queue depth: #{max}" + puts " min queue depth: #{min}" + ensure + amqp_client.stop if defined?(amqp_client) && amqp_client + end + + desc "show the backlog and consumer count for each vnode queue" + def queue_status + h = HighLine.new + queue_status = [h.color("VNode", :bold), h.color("Messages", :bold), h.color("Consumers", :bold)] + + total_messages = 0 + + amqp_client = Bunny.new(Expander.config.amqp_config) + amqp_client.start + + 0.upto(VNODES - 1) do |vnode| + q = amqp_client.queue("vnode-#{vnode}", :durable => true) + status = q.status + # returns {:message_count => method.message_count, :consumer_count => method.consumer_count} + queue_status << vnode.to_s << status[:message_count].to_s << status[:consumer_count].to_s + total_messages += status[:message_count] + end + puts " total messages: #{total_messages}" + puts + puts h.list(queue_status, :columns_across, 3) + ensure + amqp_client.stop if defined?(amqp_client) && amqp_client + end + + desc "show the status of the nodes in the cluster" + def node_status + status_mutex = Mutex.new + h = ::HighLine.new + node_status = [h.color("Host", :bold), h.color("PID", :bold), h.color("GUID", :bold), h.color("Vnodes", :bold)] + + print("Collecting status info from the cluster...") + + AMQP.start(Expander.config.amqp_config) do + node = Expander::Node.local_node + node.exclusive_control_queue.subscribe do |header, message| + status = Yajl::Parser.parse(message) + status_mutex.synchronize do + node_status << status["hostname_f"] + node_status << status["pid"].to_s + node_status << status["guid"] + # BIG ASSUMPTION HERE that nodes only have contiguous vnode ranges + # will not be true once vnode recovery is implemented + node_status << "#{status["vnodes"].min}-#{status["vnodes"].max}" + end + end + node.broadcast_message(Yajl::Encoder.encode(:action => :status, :rsvp => node.exclusive_control_queue_name)) + EM.add_timer(2) { AMQP.stop;EM.stop } + end + + puts "done" + puts + puts h.list(node_status, :columns_across, 4) + puts + end + + desc "sets the log level of all nodes in the cluster" + def log_level + @argv.shift + level = @argv.first + acceptable_levels = %w{debug info warn error fatal} + unless acceptable_levels.include?(level) + puts "Log level must be one of #{acceptable_levels.join(', ')}" + exit 1 + end + + h = HighLine.new + response_mutex = Mutex.new + + responses = [h.color("Host", :bold), h.color("PID", :bold), h.color("GUID", :bold), h.color("Log Level", :bold)] + AMQP.start(Expander.config.amqp_config) do + node = Expander::Node.local_node + node.exclusive_control_queue.subscribe do |header, message| + reply = Yajl::Parser.parse(message) + n = reply['node'] + response_mutex.synchronize do + responses << n["hostname_f"] << n["pid"].to_s << n["guid"] << reply["level"] + end + end + node.broadcast_message(Yajl::Encoder.encode({:action => :set_log_level, :level => level, :rsvp => node.exclusive_control_queue_name})) + EM.add_timer(2) { AMQP.stop; EM.stop } + end + puts h.list(responses, :columns_across, 4) + end + + + compile + end + end +end diff --git a/chef-expander/lib/chef/expander/flattener.rb b/chef-expander/lib/chef/expander/flattener.rb new file mode 100644 index 0000000000..90f7cd663e --- /dev/null +++ b/chef-expander/lib/chef/expander/flattener.rb @@ -0,0 +1,79 @@ +# +# Author:: Daniel DeLeo (<dan@opscode.com>) +# Author:: Seth Falcon (<seth@opscode.com>) +# Author:: Chris Walters (<cw@opscode.com>) +# Copyright:: Copyright (c) 2010-2011 Opscode, Inc. +# License:: Apache License, Version 2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'chef/expander/configuration' + +module Chef + module Expander + # Flattens and expands nested Hashes representing Chef objects + # (e.g, Nodes, Roles, DataBagItems, etc.) into flat Hashes so the + # objects are suitable to be saved into Solr. This code is more or + # less copy-pasted from chef/solr/index which may or may not be a + # great idea, though that does minimize the dependencies and + # hopefully minimize the memory use of chef-expander. + class Flattener + UNDERSCORE = '_' + X = 'X' + + X_CHEF_id_CHEF_X = 'X_CHEF_id_CHEF_X' + X_CHEF_database_CHEF_X = 'X_CHEF_database_CHEF_X' + X_CHEF_type_CHEF_X = 'X_CHEF_type_CHEF_X' + + def initialize(item) + @item = item + end + + def flattened_item + @flattened_item || flatten_and_expand + end + + def flatten_and_expand + @flattened_item = Hash.new {|hash, key| hash[key] = []} + + @item.each do |key, value| + flatten_each([key.to_s], value) + end + + @flattened_item.each_value { |values| values.uniq! } + @flattened_item + end + + def flatten_each(keys, values) + case values + when Hash + values.each do |child_key, child_value| + add_field_value(keys, child_key) + flatten_each(keys + [child_key.to_s], child_value) + end + when Array + values.each { |child_value| flatten_each(keys, child_value) } + else + add_field_value(keys, values) + end + end + + def add_field_value(keys, value) + value = value.to_s + @flattened_item[keys.join(UNDERSCORE)] << value + @flattened_item[keys.last] << value + end + end + end +end diff --git a/chef-expander/lib/chef/expander/loggable.rb b/chef-expander/lib/chef/expander/loggable.rb new file mode 100644 index 0000000000..5bfb941ecb --- /dev/null +++ b/chef-expander/lib/chef/expander/loggable.rb @@ -0,0 +1,56 @@ +# +# Author:: Daniel DeLeo (<dan@opscode.com>) +# Author:: Seth Falcon (<seth@opscode.com>) +# Author:: Chris Walters (<cw@opscode.com>) +# Copyright:: Copyright (c) 2010-2011 Opscode, Inc. +# License:: Apache License, Version 2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'mixlib/log' + +module Chef + module Expander + module Loggable + class Logger + include Mixlib::Log + + def init(*args) + @logger = nil + super + end + + [:debug,:info,:warn,:error, :fatal].each do |level| + class_eval(<<-LOG_METHOD, __FILE__, __LINE__) + def #{level}(message=nil, &block) + @logger.#{level}(message, &block) + end + LOG_METHOD + end + end + + # TODO: it's admittedly janky to set up the default logging this way. + STDOUT.sync = true + LOGGER = Logger.new + LOGGER.init + LOGGER.level = :debug + + def log + LOGGER + end + + end + end +end + diff --git a/chef-expander/lib/chef/expander/node.rb b/chef-expander/lib/chef/expander/node.rb new file mode 100644 index 0000000000..23249e8685 --- /dev/null +++ b/chef-expander/lib/chef/expander/node.rb @@ -0,0 +1,177 @@ +# +# Author:: Daniel DeLeo (<dan@opscode.com>) +# Author:: Seth Falcon (<seth@opscode.com>) +# Author:: Chris Walters (<cw@opscode.com>) +# Copyright:: Copyright (c) 2010-2011 Opscode, Inc. +# License:: Apache License, Version 2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'uuidtools' +require 'amqp' +require 'mq' +require 'open3' + +require 'chef/expander/loggable' + +module Chef + module Expander + class Node + + include Loggable + + def self.from_hash(node_info) + new(node_info[:guid], node_info[:hostname_f], node_info[:pid]) + end + + def self.local_node + new(guid, hostname_f, Process.pid) + end + + def self.guid + return @guid if @guid + @guid = UUIDTools::UUID.random_create.to_s + end + + def self.hostname_f + @hostname ||= Open3.popen3("hostname -f") {|stdin, stdout, stderr| stdout.read }.strip + end + + attr_reader :guid + + attr_reader :hostname_f + + attr_reader :pid + + def initialize(guid, hostname_f, pid) + @guid, @hostname_f, @pid = guid, hostname_f, pid + end + + def start(&message_handler) + attach_to_queue(exclusive_control_queue, "exclusive control", &message_handler) + attach_to_queue(shared_control_queue, "shared_control", &message_handler) + attach_to_queue(broadcast_control_queue, "broadcast control", &message_handler) + end + + def attach_to_queue(queue, colloquial_name, &message_handler) + queue.subscribe(:ack => true) do |headers, payload| + log.debug { "received message on #{colloquial_name} queue: #{payload}" } + message_handler.call(payload) + headers.ack + end + end + + def stop + log.debug { "unsubscribing from broadcast control queue"} + broadcast_control_queue.unsubscribe(:nowait => false) + + log.debug { "unsubscribing from shared control queue" } + shared_control_queue.unsubscribe(:nowait => false) + + log.debug { "unsubscribing from exclusive control queue" } + exclusive_control_queue.unsubscribe(:nowait => false) + end + + def direct_message(message) + log.debug { "publishing direct message to node #{identifier}: #{message}" } + exclusive_control_queue.publish(message) + end + + def shared_message(message) + log.debug { "publishing shared message #{message}"} + shared_control_queue.publish(message) + end + + def broadcast_message(message) + log.debug { "publishing broadcast message #{message}" } + broadcast_control_exchange.publish(message) + end + + # The exclusive control queue is for point-to-point messaging, i.e., + # messages directly addressed to this node + def exclusive_control_queue + @exclusive_control_queue ||= begin + log.debug { "declaring exclusive control queue #{exclusive_control_queue_name}" } + MQ.queue(exclusive_control_queue_name) + end + end + + # The shared control queue is for 1 to (1 of N) messaging, i.e., + # messages that can go to any one node. + def shared_control_queue + @shared_control_queue ||= begin + log.debug { "declaring shared control queue #{shared_control_queue_name}" } + MQ.queue(shared_control_queue_name) + end + end + + # The broadcast control queue is for 1 to N messaging, i.e., + # messages that go to every node + def broadcast_control_queue + @broadcast_control_queue ||= begin + log.debug { "declaring broadcast control queue #{broadcast_control_queue_name}"} + q = MQ.queue(broadcast_control_queue_name) + log.debug { "binding broadcast control queue to broadcast control exchange"} + q.bind(broadcast_control_exchange) + q + end + end + + def broadcast_control_exchange + @broadcast_control_exchange ||= begin + log.debug { "declaring broadcast control exchange opscode-platfrom-control--broadcast" } + MQ.fanout(broadcast_control_exchange_name, :nowait => false) + end + end + + def shared_control_queue_name + SHARED_CONTROL_QUEUE_NAME + end + + def broadcast_control_queue_name + @broadcast_control_queue_name ||= "#{identifier}--broadcast" + end + + def broadcast_control_exchange_name + BROADCAST_CONTROL_EXCHANGE_NAME + end + + def exclusive_control_queue_name + @exclusive_control_queue_name ||= "#{identifier}--exclusive-control" + end + + def identifier + "#{hostname_f}--#{pid}--#{guid}" + end + + def ==(other) + other.respond_to?(:guid) && other.respond_to?(:hostname_f) && other.respond_to?(:pid) && + (other.guid == guid) && (other.hostname_f == hostname_f) && (other.pid == pid) + end + + def eql?(other) + (other.class == self.class) && (other.hash == hash) + end + + def hash + identifier.hash + end + + def to_hash + {:guid => @guid, :hostname_f => @hostname_f, :pid => @pid} + end + + end + end +end diff --git a/chef-expander/lib/chef/expander/solrizer.rb b/chef-expander/lib/chef/expander/solrizer.rb new file mode 100644 index 0000000000..1a6fed9521 --- /dev/null +++ b/chef-expander/lib/chef/expander/solrizer.rb @@ -0,0 +1,275 @@ +# +# Author:: Daniel DeLeo (<dan@opscode.com>) +# Author:: Seth Falcon (<seth@opscode.com>) +# Author:: Chris Walters (<cw@opscode.com>) +# Copyright:: Copyright (c) 2010-2011 Opscode, Inc. +# License:: Apache License, Version 2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'set' +require 'yajl' +require 'fast_xs' +require 'em-http-request' +require 'chef/expander/loggable' +require 'chef/expander/flattener' + +module Chef + module Expander + class Solrizer + + @active_http_requests = Set.new + + def self.http_request_started(instance) + @active_http_requests << instance + end + + def self.http_request_completed(instance) + @active_http_requests.delete(instance) + end + + def self.http_requests_active? + !@active_http_requests.empty? + end + + def self.clear_http_requests + @active_http_requests.clear + end + + include Loggable + + ADD = "add" + DELETE = "delete" + SKIP = "skip" + + ITEM = "item" + ID = "id" + TYPE = "type" + DATABASE = "database" + ENQUEUED_AT = "enqueued_at" + + DATA_BAG_ITEM = "data_bag_item" + DATA_BAG = "data_bag" + + X_CHEF_id_CHEF_X = 'X_CHEF_id_CHEF_X' + X_CHEF_database_CHEF_X = 'X_CHEF_database_CHEF_X' + X_CHEF_type_CHEF_X = 'X_CHEF_type_CHEF_X' + + CONTENT_TYPE_XML = {"Content-Type" => "text/xml"} + + attr_reader :action + + attr_reader :indexer_payload + + attr_reader :chef_object + + attr_reader :obj_id + + attr_reader :obj_type + + attr_reader :database + + attr_reader :enqueued_at + + def initialize(object_command_json, &on_completion_block) + @start_time = Time.now.to_f + @on_completion_block = on_completion_block + if parsed_message = parse(object_command_json) + @action = parsed_message["action"] + @indexer_payload = parsed_message["payload"] + + extract_object_fields if @indexer_payload + else + @action = SKIP + end + end + + def extract_object_fields + @chef_object = @indexer_payload[ITEM] + @database = @indexer_payload[DATABASE] + @obj_id = @indexer_payload[ID] + @obj_type = @indexer_payload[TYPE] + @enqueued_at = @indexer_payload[ENQUEUED_AT] + @data_bag = @obj_type == DATA_BAG_ITEM ? @chef_object[DATA_BAG] : nil + end + + def parse(serialized_object) + Yajl::Parser.parse(serialized_object) + rescue Yajl::ParseError + log.error { "cannot index object because it is invalid JSON: #{serialized_object}" } + end + + def run + case @action + when ADD + add + when DELETE + delete + when SKIP + completed + log.info { "not indexing this item because of malformed JSON"} + else + completed + log.error { "cannot index object becuase it has an invalid action #{@action}" } + end + end + + def add + post_to_solr(pointyize_add) do + ["indexed #{indexed_object}", + "transit,xml,solr-post |", + [transit_time, @xml_time, @solr_post_time].join(","), + "|" + ].join(" ") + end + rescue Exception => e + log.error { "#{e.class.name}: #{e.message}\n#{e.backtrace.join("\n")}"} + end + + def delete + post_to_solr(pointyize_delete) { "deleted #{indexed_object} transit-time[#{transit_time}s]"} + rescue Exception => e + log.error { "#{e.class.name}: #{e.message}\n#{e.backtrace.join("\n")}"} + end + + def flattened_object + flattened_object = Flattener.new(@chef_object).flattened_item + + flattened_object[X_CHEF_id_CHEF_X] = [@obj_id] + flattened_object[X_CHEF_database_CHEF_X] = [@database] + flattened_object[X_CHEF_type_CHEF_X] = [@obj_type] + + log.debug {"adding flattened object to Solr: #{flattened_object.inspect}"} + + flattened_object + end + + START_XML = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" + ADD_DOC = "<add><doc>" + DELETE_DOC = "<delete>" + ID_OPEN = "<id>" + ID_CLOSE = "</id>" + END_ADD_DOC = "</doc></add>\n" + END_DELETE = "</delete>\n" + START_CONTENT = '<field name="content">' + CLOSE_FIELD = "</field>" + + FLD_CHEF_ID_FMT = '<field name="X_CHEF_id_CHEF_X">%s</field>' + FLD_CHEF_DB_FMT = '<field name="X_CHEF_database_CHEF_X">%s</field>' + FLD_CHEF_TY_FMT = '<field name="X_CHEF_type_CHEF_X">%s</field>' + FLD_DATA_BAG = '<field name="data_bag">%s</field>' + + KEYVAL_FMT = "%s__=__%s " + + # Takes a flattened hash where the values are arrays and converts it into + # a dignified XML document suitable for POST to Solr. + # The general structure of the output document is like this: + # <?xml version="1.0" encoding="UTF-8"?> + # <add> + # <doc> + # <field name="content"> + # key__=__value + # key__=__another_value + # other_key__=__yet another value + # </field> + # </doc> + # </add> + # The document as generated has minimal newlines and formatting, however. + def pointyize_add + xml = "" + xml << START_XML << ADD_DOC + xml << (FLD_CHEF_ID_FMT % @obj_id) + xml << (FLD_CHEF_DB_FMT % @database) + xml << (FLD_CHEF_TY_FMT % @obj_type) + xml << START_CONTENT + content = "" + flattened_object.each do |field, values| + values.each do |v| + content << (KEYVAL_FMT % [field, v]) + end + end + xml << content.fast_xs + xml << CLOSE_FIELD # ends content + xml << (FLD_DATA_BAG % @data_bag.fast_xs) if @data_bag + xml << END_ADD_DOC + @xml_time = Time.now.to_f - @start_time + xml + end + + # Takes a succinct document id, like 2342, and turns it into something + # even more compact, like + # "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<delete><id>2342</id></delete>\n" + def pointyize_delete + xml = "" + xml << START_XML + xml << DELETE_DOC + xml << ID_OPEN + xml << @obj_id.to_s + xml << ID_CLOSE + xml << END_DELETE + xml + end + + def post_to_solr(document, &logger_block) + log.debug("POSTing document to SOLR:\n#{document}") + http_req = EventMachine::HttpRequest.new(solr_url).post(:body => document, :timeout => 1200, :head => CONTENT_TYPE_XML) + http_request_started + + http_req.callback do + completed + if http_req.response_header.status == 200 + log.info(&logger_block) + else + log.error { "Failed to post to solr: #{indexed_object}" } + end + end + http_req.errback do + completed + log.error { "Failed to post to solr (connection error): #{indexed_object}" } + end + end + + def completed + @solr_post_time = Time.now.to_f - @start_time + self.class.http_request_completed(self) + @on_completion_block.call + end + + def transit_time + Time.now.utc.to_i - @enqueued_at + end + + def solr_url + 'http://127.0.0.1:8983/solr/update' + end + + def indexed_object + "#{@obj_type}[#{@obj_id}] database[#{@database}]" + end + + def http_request_started + self.class.http_request_started(self) + end + + def eql?(other) + other.hash == hash + end + + def hash + "#{action}#{indexed_object}#@enqueued_at#{self.class.name}".hash + end + + end + end +end diff --git a/chef-expander/lib/chef/expander/version.rb b/chef-expander/lib/chef/expander/version.rb new file mode 100644 index 0000000000..9b0f814465 --- /dev/null +++ b/chef-expander/lib/chef/expander/version.rb @@ -0,0 +1,37 @@ +# +# Author:: Daniel DeLeo (<dan@opscode.com>) +# Author:: Seth Falcon (<seth@opscode.com>) +# Author:: Chris Walters (<cw@opscode.com>) +# Copyright:: Copyright (c) 2010-2011 Opscode, Inc. +# License:: Apache License, Version 2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'open3' + +module Chef + module Expander + + VERSION = "0.1.0" + + def self.version + @rev ||= begin + rev = Open3.popen3("git rev-parse HEAD") {|stdin, stdout, stderr| stdout.read }.strip + rev.empty? ? nil : " (#{rev})" + end + "#{VERSION}#@rev" + end + + end +end diff --git a/chef-expander/lib/chef/expander/vnode.rb b/chef-expander/lib/chef/expander/vnode.rb new file mode 100644 index 0000000000..32bb17da32 --- /dev/null +++ b/chef-expander/lib/chef/expander/vnode.rb @@ -0,0 +1,106 @@ +# +# Author:: Daniel DeLeo (<dan@opscode.com>) +# Author:: Seth Falcon (<seth@opscode.com>) +# Author:: Chris Walters (<cw@opscode.com>) +# Copyright:: Copyright (c) 2010-2011 Opscode, Inc. +# License:: Apache License, Version 2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'eventmachine' +require 'amqp' +require 'mq' + +require 'chef/expander/loggable' +require 'chef/expander/solrizer' + +module Chef + module Expander + class VNode + include Loggable + + attr_reader :vnode_number + + attr_reader :supervise_interval + + def initialize(vnode_number, supervisor, opts={}) + @vnode_number = vnode_number.to_i + @supervisor = supervisor + @queue = nil + @stopped = false + @supervise_interval = opts[:supervise_interval] || 30 + end + + def start + @supervisor.vnode_added(self) + + subscription_confirmed = Proc.new do + abort_on_multiple_subscribe + supervise_consumer_count + end + + queue.subscribe(:ack => true, :confirm => subscription_confirmed) do |headers, payload| + log.debug {"got #{payload} size(#{payload.size} bytes) on queue #{queue_name}"} + solrizer = Solrizer.new(payload) { headers.ack } + solrizer.run + end + + rescue MQ::Error => e + log.error {"Failed to start subscriber on #{queue_name} #{e.class.name}: #{e.message}"} + end + + def supervise_consumer_count + EM.add_periodic_timer(supervise_interval) do + abort_on_multiple_subscribe + end + end + + def abort_on_multiple_subscribe + queue.status do |message_count, subscriber_count| + if subscriber_count.to_i > 1 + log.error { "Detected extra consumers (#{subscriber_count} total) on queue #{queue_name}, cancelling subscription" } + stop + end + end + end + + def stop + log.debug {"Cancelling subscription on queue #{queue_name.inspect}"} + queue.unsubscribe if queue.subscribed? + @supervisor.vnode_removed(self) + @stopped = true + end + + def stopped? + @stopped + end + + def queue + @queue ||= begin + log.debug { "declaring queue #{queue_name}" } + MQ.queue(queue_name, :passive => false, :durable => true) + end + end + + def queue_name + "vnode-#{@vnode_number}" + end + + def control_queue_name + "#{queue_name}-control" + end + + end + end +end diff --git a/chef-expander/lib/chef/expander/vnode_supervisor.rb b/chef-expander/lib/chef/expander/vnode_supervisor.rb new file mode 100644 index 0000000000..40e9b62817 --- /dev/null +++ b/chef-expander/lib/chef/expander/vnode_supervisor.rb @@ -0,0 +1,265 @@ +# +# Author:: Daniel DeLeo (<dan@opscode.com>) +# Author:: Seth Falcon (<seth@opscode.com>) +# Author:: Chris Walters (<cw@opscode.com>) +# Copyright:: Copyright (c) 2010-2011 Opscode, Inc. +# License:: Apache License, Version 2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'yajl' +require 'eventmachine' +require 'amqp' +require 'mq' +require 'chef/expander/version' +require 'chef/expander/loggable' +require 'chef/expander/node' +require 'chef/expander/vnode' +require 'chef/expander/vnode_table' +require 'chef/expander/configuration' + +module ::AMQP + def self.hard_reset! + MQ.reset rescue nil + stop + EM.stop rescue nil + Thread.current[:mq], @conn = nil, nil + end +end + +module Chef + module Expander + class VNodeSupervisor + include Loggable + extend Loggable + + COULD_NOT_CONNECT = /Could not connect to server/.freeze + + def self.start_cluster_worker + @vnode_supervisor = new + @original_ppid = Process.ppid + trap_signals + + vnodes = Expander.config.vnode_numbers + + $0 = "chef-expander#{Expander.config.ps_tag} worker ##{Expander.config.index} (vnodes #{vnodes.min}-#{vnodes.max})" + + AMQP.start(Expander.config.amqp_config) do + start_consumers + await_parent_death + end + end + + def self.await_parent_death + @awaiting_parent_death = EM.add_periodic_timer(1) do + unless Process.ppid == @original_ppid + @awaiting_parent_death.cancel + stop("master process death") + end + end + end + + def self.start + @vnode_supervisor = new + trap_signals + + Expander.init_config(ARGV) + + log.info("Chef Search Expander #{Expander.version} starting up.") + + begin + AMQP.start(Expander.config.amqp_config) do + start_consumers + end + rescue AMQP::Error => e + if e.message =~ COULD_NOT_CONNECT + log.error { "Could not connect to rabbitmq. Make sure it is running and correctly configured." } + log.error { e.message } + + AMQP.hard_reset! + + sleep 5 + retry + else + raise + end + end + end + + def self.start_consumers + log.debug { "Setting prefetch count to 1"} + MQ.prefetch(1) + + vnodes = Expander.config.vnode_numbers + log.info("Starting Consumers for vnodes #{vnodes.min}-#{vnodes.max}") + @vnode_supervisor.start(vnodes) + end + + def self.trap_signals + Kernel.trap(:INT) { stop_immediately(:INT) } + Kernel.trap(:TERM) { stop_gracefully(:TERM) } + end + + def self.stop_immediately(signal) + log.info { "Initiating immediate shutdown on signal (#{signal})" } + @vnode_supervisor.stop + EM.add_timer(1) do + AMQP.stop + EM.stop + end + end + + def self.stop_gracefully(signal) + log.info { "Initiating graceful shutdown on signal (#{signal})" } + @vnode_supervisor.stop + wait_for_http_requests_to_complete + end + + def self.wait_for_http_requests_to_complete + if Expander::Solrizer.http_requests_active? + log.info { "waiting for in progress HTTP Requests to complete"} + EM.add_timer(1) do + wait_for_http_requests_to_complete + end + else + log.info { "HTTP requests completed, shutting down"} + AMQP.stop + EM.stop + end + end + + attr_reader :vnode_table + + attr_reader :local_node + + def initialize + @vnodes = {} + @vnode_table = VNodeTable.new(self) + @local_node = Node.local_node + @queue_name, @guid = nil, nil + end + + def start(vnode_ids) + @local_node.start do |message| + process_control_message(message) + end + + #start_vnode_table_publisher + + Array(vnode_ids).each { |vnode_id| spawn_vnode(vnode_id) } + end + + def stop + @local_node.stop + + #log.debug { "stopping vnode table updater" } + #@vnode_table_publisher.cancel + + log.info { "Stopping VNode queue subscribers"} + @vnodes.each do |vnode_number, vnode| + log.debug { "Stopping consumer on VNode #{vnode_number}"} + vnode.stop + end + + end + + def vnode_added(vnode) + log.debug { "vnode #{vnode.vnode_number} registered with supervisor" } + @vnodes[vnode.vnode_number.to_i] = vnode + end + + def vnode_removed(vnode) + log.debug { "vnode #{vnode.vnode_number} unregistered from supervisor" } + @vnodes.delete(vnode.vnode_number.to_i) + end + + def vnodes + @vnodes.keys.sort + end + + def spawn_vnode(vnode_number) + VNode.new(vnode_number, self).start + end + + def release_vnode + # TODO + end + + def process_control_message(message) + control_message = parse_symbolic(message) + case control_message[:action] + when "claim_vnode" + spawn_vnode(control_message[:vnode_id]) + when "recover_vnode" + recover_vnode(control_message[:vnode_id]) + when "release_vnodes" + raise "todo" + release_vnode() + when "update_vnode_table" + @vnode_table.update_table(control_message[:data]) + when "vnode_table_publish" + publish_vnode_table + when "status" + publish_status_to(control_message[:rsvp]) + when "set_log_level" + set_log_level(control_message[:level], control_message[:rsvp]) + else + log.error { "invalid control message #{control_message.inspect}" } + end + rescue Exception => e + log.error { "Error processing a control message."} + log.error { "#{e.class.name}: #{e.message}\n#{e.backtrace.join("\n")}" } + end + + + def start_vnode_table_publisher + @vnode_table_publisher = EM.add_periodic_timer(10) { publish_vnode_table } + end + + def publish_vnode_table + status_update = @local_node.to_hash + status_update[:vnodes] = vnodes + status_update[:update] = :add + @local_node.broadcast_message(Yajl::Encoder.encode({:action => :update_vnode_table, :data => status_update})) + end + + def publish_status_to(return_queue) + status_update = @local_node.to_hash + status_update[:vnodes] = vnodes + MQ.queue(return_queue).publish(Yajl::Encoder.encode(status_update)) + end + + def set_log_level(level, rsvp_to) + log.info { "setting log level to #{level} due to command from #{rsvp_to}" } + new_log_level = (Expander.config.log_level = level.to_sym) + reply = {:level => new_log_level, :node => @local_node.to_hash} + MQ.queue(rsvp_to).publish(Yajl::Encoder.encode(reply)) + end + + def recover_vnode(vnode_id) + if @vnode_table.local_node_is_leader? + log.debug { "Recovering vnode: #{vnode_id}" } + @local_node.shared_message(Yajl::Encoder.encode({:action => :claim_vnode, :vnode_id => vnode_id})) + else + log.debug { "Ignoring :recover_vnode message because this node is not the leader" } + end + end + + def parse_symbolic(message) + Yajl::Parser.new(:symbolize_keys => true).parse(message) + end + + end + end +end diff --git a/chef-expander/lib/chef/expander/vnode_table.rb b/chef-expander/lib/chef/expander/vnode_table.rb new file mode 100644 index 0000000000..025812b49c --- /dev/null +++ b/chef-expander/lib/chef/expander/vnode_table.rb @@ -0,0 +1,83 @@ +# +# Author:: Daniel DeLeo (<dan@opscode.com>) +# Author:: Seth Falcon (<seth@opscode.com>) +# Author:: Chris Walters (<cw@opscode.com>) +# Copyright:: Copyright (c) 2010-2011 Opscode, Inc. +# License:: Apache License, Version 2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'yajl' +require 'chef/expander/node' +require 'chef/expander/loggable' + +module Chef + module Expander + class VNodeTable + + include Loggable + + class InvalidVNodeTableUpdate < ArgumentError; end + + attr_reader :vnodes_by_node + + def initialize(vnode_supervisor) + @node_update_mutex = Mutex.new + @vnode_supervisor = vnode_supervisor + @vnodes_by_node = {} + end + + def nodes + @vnodes_by_node.keys + end + + def update_table(table_update) + case table_update[:update] + when "add", "update" + update_node(table_update) + when "remove" + remove_node(table_update) + else + raise InvalidVNodeTableUpdate, "no action or action not acceptable: #{table_update.inspect}" + end + log.debug { "current vnode table: #{@vnodes_by_node.inspect}" } + end + + def update_node(node_info) + @node_update_mutex.synchronize do + @vnodes_by_node[Node.from_hash(node_info)] = node_info[:vnodes] + end + end + + def remove_node(node_info) + @node_update_mutex.synchronize do + @vnodes_by_node.delete(Node.from_hash(node_info)) + end + end + + def leader_node + if @vnodes_by_node.empty? + nil + else + Array(@vnodes_by_node).reject { |node| node[1].empty? }.sort { |a,b| a[1].min <=> b[1].min }.first[0] + end + end + + def local_node_is_leader? + (Node.local_node == leader_node) || (@vnodes_by_node[Node.local_node].include?(0)) + end + + end + end +end |