summaryrefslogtreecommitdiff
path: root/chef-expander/lib
diff options
context:
space:
mode:
authorSeth Falcon <seth@opscode.com>2012-10-08 15:41:22 -0700
committerSeth Falcon <seth@opscode.com>2012-10-08 15:41:22 -0700
commitded6dda0bfec60c482d0cbf08f15c5086af78d6c (patch)
treec1c5c5b6d89d3dc76ea806522ab52ff73267dd10 /chef-expander/lib
parent7253b47582c2560ab15390d98a4ec3ae32bdfbf5 (diff)
downloadchef-ded6dda0bfec60c482d0cbf08f15c5086af78d6c.tar.gz
Remove chef-expander
The chef-expander code has moved to its own git repository located at: https://github.com/opscode/chef-expander
Diffstat (limited to 'chef-expander/lib')
-rw-r--r--chef-expander/lib/chef/expander.rb36
-rw-r--r--chef-expander/lib/chef/expander/cluster_supervisor.rb130
-rw-r--r--chef-expander/lib/chef/expander/configuration.rb320
-rw-r--r--chef-expander/lib/chef/expander/control.rb206
-rw-r--r--chef-expander/lib/chef/expander/daemonizable.rb150
-rw-r--r--chef-expander/lib/chef/expander/flattener.rb79
-rw-r--r--chef-expander/lib/chef/expander/loggable.rb40
-rw-r--r--chef-expander/lib/chef/expander/logger.rb135
-rw-r--r--chef-expander/lib/chef/expander/node.rb177
-rw-r--r--chef-expander/lib/chef/expander/solrizer.rb275
-rw-r--r--chef-expander/lib/chef/expander/version.rb41
-rw-r--r--chef-expander/lib/chef/expander/vnode.rb106
-rw-r--r--chef-expander/lib/chef/expander/vnode_supervisor.rb265
-rw-r--r--chef-expander/lib/chef/expander/vnode_table.rb83
14 files changed, 0 insertions, 2043 deletions
diff --git a/chef-expander/lib/chef/expander.rb b/chef-expander/lib/chef/expander.rb
deleted file mode 100644
index 9a58868a96..0000000000
--- a/chef-expander/lib/chef/expander.rb
+++ /dev/null
@@ -1,36 +0,0 @@
-#
-# Author:: Daniel DeLeo (<dan@opscode.com>)
-# Author:: Seth Falcon (<seth@opscode.com>)
-# Author:: Chris Walters (<cw@opscode.com>)
-# Copyright:: Copyright (c) 2010-2011 Opscode, Inc.
-# License:: Apache License, Version 2.0
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-module Chef
- module Expander
-
- # VNODES is the number of queues in rabbit that are available for subscribing.
- # The name comes from riak, where the data ring (160bits) is chunked into
- # many vnodes; vnodes outnumber physical nodes, so one node hosts several
- # vnodes. That is the same design we use here.
- #
- # See the notes on topic queue benchmarking before adjusting this value.
- VNODES = 1024
-
- SHARED_CONTROL_QUEUE_NAME = "chef-search-control--shared"
- BROADCAST_CONTROL_EXCHANGE_NAME = 'chef-search-control--broadcast'
-
- end
-end
diff --git a/chef-expander/lib/chef/expander/cluster_supervisor.rb b/chef-expander/lib/chef/expander/cluster_supervisor.rb
deleted file mode 100644
index 56d6e70d5f..0000000000
--- a/chef-expander/lib/chef/expander/cluster_supervisor.rb
+++ /dev/null
@@ -1,130 +0,0 @@
-#
-# Author:: Daniel DeLeo (<dan@opscode.com>)
-# Author:: Seth Falcon (<seth@opscode.com>)
-# Author:: Chris Walters (<cw@opscode.com>)
-# Copyright:: Copyright (c) 2010-2011 Opscode, Inc.
-# License:: Apache License, Version 2.0
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-require 'chef/expander/loggable'
-require 'chef/expander/daemonizable'
-require 'chef/expander/version'
-require 'chef/expander/configuration'
-require 'chef/expander/vnode_supervisor'
-
-module Chef
- module Expander
- #==ClusterSupervisor
- # Manages a cluster of chef-expander processes. Usually this class will
- # be instantiated from the chef-expander-cluster executable.
- #
- # ClusterSupervisor works by forking the desired number of processes, then
- # running VNodeSupervisor.start_cluster_worker within the forked process.
- # ClusterSupervisor keeps track of the process ids of its children, and will
- # periodically attempt to reap them in a non-blocking call. If they are
- # reaped, ClusterSupervisor knows they died and need to be respawned.
- #
- # The child processes are responsible for checking on the master process and
- # dying if the master has died (VNodeSupervisor does this when started in
- # with start_cluster_worker).
- #
- #===TODO:
- # * This implementation currently assumes there is only one cluster, so it
- # will claim all of the vnodes. It may be advantageous to allow multiple
- # clusters.
- # * There is no heartbeat implementation at this time, so a zombified child
- # process will not be automatically killed--This behavior is left to the
- # meatcloud for now.
- class ClusterSupervisor
- include Loggable
- include Daemonizable
-
- def initialize
- @workers = {}
- @running = true
- @kill = :TERM
- end
-
- def start
- trap(:INT) { stop(:INT) }
- trap(:TERM) { stop(:TERM)}
- Expander.init_config(ARGV)
-
- log.info("Chef Expander #{Expander.version} starting cluster with #{Expander.config.node_count} nodes")
- configure_process
- start_workers
- maintain_workers
- release_locks
- rescue Configuration::InvalidConfiguration => e
- log.fatal {"Configuration Error: " + e.message}
- exit(2)
- rescue Exception => e
- raise if SystemExit === e
-
- log.fatal {e}
- exit(1)
- end
-
- def start_workers
- Expander.config.node_count.times do |i|
- start_worker(i + 1)
- end
- end
-
- def start_worker(index)
- log.info { "Starting cluster worker #{index}" }
- worker_params = {:index => index}
- child_pid = fork do
- Expander.config.index = index
- VNodeSupervisor.start_cluster_worker
- end
- @workers[child_pid] = worker_params
- end
-
- def stop(signal)
- log.info { "Stopping cluster on signal (#{signal})" }
- @running = false
- @kill = signal
- end
-
- def maintain_workers
- while @running
- sleep 1
- workers_to_replace = {}
- @workers.each do |process_id, worker_params|
- if result = Process.waitpid2(process_id, Process::WNOHANG)
- log.error { "worker #{worker_params[:index]} (PID: #{process_id}) died with status #{result[1].exitstatus || '(no status)'}"}
- workers_to_replace[process_id] = worker_params
- end
- end
- workers_to_replace.each do |dead_pid, worker_params|
- @workers.delete(dead_pid)
- start_worker(worker_params[:index])
- end
- end
-
- @workers.each do |pid, worker_params|
- log.info { "Stopping worker #{worker_params[:index]} (PID: #{pid})"}
- Process.kill(@kill, pid)
- end
- @workers.each do |pid, worker_params|
- Process.waitpid2(pid)
- end
-
- end
-
- end
- end
-end
diff --git a/chef-expander/lib/chef/expander/configuration.rb b/chef-expander/lib/chef/expander/configuration.rb
deleted file mode 100644
index 8fbaba762a..0000000000
--- a/chef-expander/lib/chef/expander/configuration.rb
+++ /dev/null
@@ -1,320 +0,0 @@
-#
-# Author:: Daniel DeLeo (<dan@opscode.com>)
-# Author:: Seth Falcon (<seth@opscode.com>)
-# Author:: Chris Walters (<cw@opscode.com>)
-# Copyright:: Copyright (c) 2010-2011 Opscode, Inc.
-# License:: Apache License, Version 2.0
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-require 'pp'
-require 'optparse'
-require 'singleton'
-
-require 'chef/expander/flattener'
-require 'chef/expander/loggable'
-require 'chef/expander/version'
-
-module Chef
- module Expander
-
- def self.config
- @config ||= Configuration::Base.new
- end
-
- def self.init_config(argv)
- config.apply_defaults
- remaining_opts_after_parse = Configuration::CLI.parse_options(argv)
- # Need to be able to override the default config file location on the command line
- config_file_to_use = Configuration::CLI.config.config_file || config.config_file
- config.merge_config(Configuration::Base.from_chef_compat_config(config_file_to_use))
- # But for all other config options, the CLI config should win over config file
- config.merge_config(Configuration::CLI.config)
- config.validate!
- remaining_opts_after_parse
- end
-
- class ChefCompatibleConfig
-
- attr_reader :config_hash
-
- def initialize
- @config_hash = {}
- end
-
- def load(file)
- file = File.expand_path(file)
- instance_eval(IO.read(file), file, 1) if File.readable?(file)
- end
-
- def method_missing(method_name, *args, &block)
- if args.size == 1
- @config_hash[method_name] = args.first
- elsif args.empty?
- @config_hash[method_name] or super
- else
- super
- end
- end
-
- end
-
- module Configuration
-
- class InvalidConfiguration < StandardError
- end
-
- class Base
-
- DEFAULT_PIDFILE = Object.new
-
- include Loggable
-
- def self.from_chef_compat_config(file)
- config = ChefCompatibleConfig.new
- config.load(file)
- from_hash(config.config_hash)
- end
-
- def self.from_hash(config_hash)
- config = new
- config_hash.each do |setting, value|
- setter = "#{setting}=".to_sym
- if config.respond_to?(setter)
- config.send(setter, value)
- end
- end
- config
- end
-
- def self.configurables
- @configurables ||= []
- end
-
- def self.validations
- @validations ||= []
- end
-
- def self.defaults
- @defaults ||= {}
- end
-
- def self.configurable(setting, default=nil, &validation)
- attr_accessor(setting)
- configurables << setting
- defaults[setting] = default
- validations << validation if block_given?
-
- setting
- end
-
- configurable :config_file, "/etc/chef/solr.rb" do
- unless (config_file && File.exist?(config_file) && File.readable?(config_file))
- log.warn {"* " * 40}
- log.warn {"Config file #{config_file} does not exist or cannot be read by user (#{Process.euid})"}
- log.warn {"Default configuration settings will be used"}
- log.warn {"* " * 40}
- end
- end
-
- configurable :index do
- unless index.nil? # in single-cluster mode, this setting is not required.
- invalid("You must specify this node's position in the ring as an integer") unless index.kind_of?(Integer)
- invalid("The index cannot be larger than the cluster size (node-count)") unless (index.to_i <= node_count.to_i)
- end
- end
-
- configurable :node_count, 1 do
- invalid("You must specify the node_count as an integer") unless node_count.kind_of?(Integer)
- invalid("The node_count must be 1 or greater") unless node_count >= 1
- invalid("The node_count cannot be smaller than the index") unless node_count >= index.to_i
- end
-
- configurable :ps_tag, ""
-
- configurable :solr_url, "http://localhost:8983/solr"
-
- # override the setter for solr_url for backward compatibilty
- def solr_url=(url)
- if url && url == "http://localhost:8983"
- log.warn {"You seem to have a legacy setting for solr_url: did you mean #{url}/solr ?"}
- url = "#{url}/solr"
- end
- @solr_url = url
- end
-
- configurable :amqp_host, '0.0.0.0'
-
- configurable :amqp_port, 5672
-
- configurable :amqp_user, 'chef'
-
- configurable :amqp_pass, 'testing'
-
- configurable :amqp_vhost, '/chef'
-
- configurable :user, nil
-
- configurable :group, nil
-
- configurable :daemonize, false
-
- alias :daemonize? :daemonize
-
- configurable :pidfile, DEFAULT_PIDFILE
-
- def pidfile
- if @pidfile.equal?(DEFAULT_PIDFILE)
- Process.euid == 0 ? '/var/run/chef-expander.pid' : '/tmp/chef-expander.pid'
- else
- @pidfile
- end
- end
-
- configurable :log_level, :info
-
- # override the setter for log_level to also actually set the level
- def log_level=(level)
- if level #don't accept nil for an answer
- level = level.to_sym
- Loggable::LOGGER.level = level
- @log_level = log_level
- end
- level
- end
-
- configurable :log_location, STDOUT
-
- # override the setter for log_location to re-init the logger
- def log_location=(location)
- Loggable::LOGGER.init(location) unless location.nil?
- end
-
- def initialize
- reset!
- end
-
- def reset!(stdout=nil)
- self.class.configurables.each do |setting|
- send("#{setting}=".to_sym, nil)
- end
- @stdout = stdout || STDOUT
- end
-
- def apply_defaults
- self.class.defaults.each do |setting, value|
- self.send("#{setting}=".to_sym, value)
- end
- end
-
- def merge_config(other)
- self.class.configurables.each do |setting|
- value = other.send(setting)
- self.send("#{setting}=".to_sym, value) if value
- end
- end
-
- def fail_if_invalid
- validate!
- rescue InvalidConfiguration => e
- @stdout.puts("Invalid configuration: #{e.message}")
- exit(1)
- end
-
- def invalid(message)
- raise InvalidConfiguration, message
- end
-
- def validate!
- self.class.validations.each do |validation_proc|
- instance_eval(&validation_proc)
- end
- end
-
- def vnode_numbers
- vnodes_per_node = VNODES / node_count
- lower_bound = (index - 1) * vnodes_per_node
- upper_bound = lower_bound + vnodes_per_node
- upper_bound += VNODES % vnodes_per_node if index == node_count
- (lower_bound...upper_bound).to_a
- end
-
- def amqp_config
- {:host => amqp_host, :port => amqp_port, :user => amqp_user, :pass => amqp_pass, :vhost => amqp_vhost}
- end
-
- end
-
- module CLI
- @config = Configuration::Base.new
-
- @option_parser = OptionParser.new do |o|
- o.banner = "Usage: chef-expander [options]"
-
- o.on('-c', '--config CONFIG_FILE', 'a configuration file to use') do |conf|
- @config.config_file = File.expand_path(conf)
- end
-
- o.on('-i', '--index INDEX', 'the slot this node will occupy in the ring') do |i|
- @config.index = i.to_i
- end
-
- o.on('-n', '--node-count NUMBER', 'the number of nodes in the ring') do |n|
- @config.node_count = n.to_i
- end
-
- o.on('-l', '--log-level LOG_LEVEL', 'set the log level') do |l|
- @config.log_level = l
- end
-
- o.on('-L', '--logfile LOG_LOCATION', 'Logfile to use') do |l|
- @config.log_location = l
- end
-
- o.on('-d', '--daemonize', 'fork into the background') do
- @config.daemonize = true
- end
-
- o.on('-P', '--pid PIDFILE') do |p|
- @config.pidfile = p
- end
-
- o.on_tail('-h', '--help', 'show this message') do
- puts "chef-expander #{Expander.version}"
- puts ''
- puts o
- exit 1
- end
-
- o.on_tail('-v', '--version', 'show the version and exit') do
- puts "chef-expander #{Expander.version}"
- exit 0
- end
-
- end
-
- def self.parse_options(argv)
- @option_parser.parse!(argv.dup)
- end
-
- def self.config
- @config
- end
-
- end
-
- end
-
- end
-end
diff --git a/chef-expander/lib/chef/expander/control.rb b/chef-expander/lib/chef/expander/control.rb
deleted file mode 100644
index f8e3e99503..0000000000
--- a/chef-expander/lib/chef/expander/control.rb
+++ /dev/null
@@ -1,206 +0,0 @@
-#
-# Author:: Daniel DeLeo (<dan@opscode.com>)
-# Author:: Seth Falcon (<seth@opscode.com>)
-# Author:: Chris Walters (<cw@opscode.com>)
-# Copyright:: Copyright (c) 2010-2011 Opscode, Inc.
-# License:: Apache License, Version 2.0
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-require 'bunny'
-require 'yajl'
-require 'eventmachine'
-require 'amqp'
-require 'mq'
-require 'highline'
-
-require 'chef/expander/node'
-require 'chef/expander/configuration'
-
-require 'pp'
-
-module Chef
- module Expander
- class Control
-
- def self.run(argv)
- remaining_args_after_opts = Expander.init_config(ARGV)
- new(remaining_args_after_opts).run
- end
-
- def self.desc(description)
- @desc = description
- end
-
- def self.option(*args)
- #TODO
- end
-
- def self.arg(*args)
- #TODO
- end
-
- def self.descriptions
- @descriptions ||= []
- end
-
- def self.method_added(method_name)
- if @desc
- descriptions << [method_name, method_name.to_s.gsub('_', '-'), @desc]
- @desc = nil
- end
- end
-
- #--
- # TODO: this is confusing and unneeded. Just whitelist the methods
- # that map to commands and use +send+
- def self.compile
- run_method = "def run; case @argv.first;"
- descriptions.each do |method_name, command_name, desc|
- run_method << "when '#{command_name}';#{method_name};"
- end
- run_method << "else; help; end; end;"
- class_eval(run_method, __FILE__, __LINE__)
- end
-
- def initialize(argv)
- @argv = argv.dup
- end
-
- desc "Show this message"
- def help
- puts "Chef Expander #{Expander.version}"
- puts "Usage: chef-expanderctl COMMAND"
- puts
- puts "Commands:"
- self.class.descriptions.each do |method_name, command_name, desc|
- puts " #{command_name}".ljust(15) + desc
- end
- end
-
- desc "display the aggregate queue backlog"
- def queue_depth
- h = HighLine.new
- message_counts = []
-
- amqp_client = Bunny.new(Expander.config.amqp_config)
- amqp_client.start
-
- 0.upto(VNODES - 1) do |vnode|
- q = amqp_client.queue("vnode-#{vnode}", :durable => true)
- message_counts << q.status[:message_count]
- end
- total_messages = message_counts.inject(0) { |sum, count| sum + count }
- max = message_counts.max
- min = message_counts.min
-
- avg = total_messages.to_f / message_counts.size.to_f
-
- puts " total messages: #{total_messages}"
- puts " average queue depth: #{avg}"
- puts " max queue depth: #{max}"
- puts " min queue depth: #{min}"
- ensure
- amqp_client.stop if defined?(amqp_client) && amqp_client
- end
-
- desc "show the backlog and consumer count for each vnode queue"
- def queue_status
- h = HighLine.new
- queue_status = [h.color("VNode", :bold), h.color("Messages", :bold), h.color("Consumers", :bold)]
-
- total_messages = 0
-
- amqp_client = Bunny.new(Expander.config.amqp_config)
- amqp_client.start
-
- 0.upto(VNODES - 1) do |vnode|
- q = amqp_client.queue("vnode-#{vnode}", :durable => true)
- status = q.status
- # returns {:message_count => method.message_count, :consumer_count => method.consumer_count}
- queue_status << vnode.to_s << status[:message_count].to_s << status[:consumer_count].to_s
- total_messages += status[:message_count]
- end
- puts " total messages: #{total_messages}"
- puts
- puts h.list(queue_status, :columns_across, 3)
- ensure
- amqp_client.stop if defined?(amqp_client) && amqp_client
- end
-
- desc "show the status of the nodes in the cluster"
- def node_status
- status_mutex = Mutex.new
- h = ::HighLine.new
- node_status = [h.color("Host", :bold), h.color("PID", :bold), h.color("GUID", :bold), h.color("Vnodes", :bold)]
-
- print("Collecting status info from the cluster...")
-
- AMQP.start(Expander.config.amqp_config) do
- node = Expander::Node.local_node
- node.exclusive_control_queue.subscribe do |header, message|
- status = Yajl::Parser.parse(message)
- status_mutex.synchronize do
- node_status << status["hostname_f"]
- node_status << status["pid"].to_s
- node_status << status["guid"]
- # BIG ASSUMPTION HERE that nodes only have contiguous vnode ranges
- # will not be true once vnode recovery is implemented
- node_status << "#{status["vnodes"].min}-#{status["vnodes"].max}"
- end
- end
- node.broadcast_message(Yajl::Encoder.encode(:action => :status, :rsvp => node.exclusive_control_queue_name))
- EM.add_timer(2) { AMQP.stop;EM.stop }
- end
-
- puts "done"
- puts
- puts h.list(node_status, :columns_across, 4)
- puts
- end
-
- desc "sets the log level of all nodes in the cluster"
- def log_level
- @argv.shift
- level = @argv.first
- acceptable_levels = %w{debug info warn error fatal}
- unless acceptable_levels.include?(level)
- puts "Log level must be one of #{acceptable_levels.join(', ')}"
- exit 1
- end
-
- h = HighLine.new
- response_mutex = Mutex.new
-
- responses = [h.color("Host", :bold), h.color("PID", :bold), h.color("GUID", :bold), h.color("Log Level", :bold)]
- AMQP.start(Expander.config.amqp_config) do
- node = Expander::Node.local_node
- node.exclusive_control_queue.subscribe do |header, message|
- reply = Yajl::Parser.parse(message)
- n = reply['node']
- response_mutex.synchronize do
- responses << n["hostname_f"] << n["pid"].to_s << n["guid"] << reply["level"]
- end
- end
- node.broadcast_message(Yajl::Encoder.encode({:action => :set_log_level, :level => level, :rsvp => node.exclusive_control_queue_name}))
- EM.add_timer(2) { AMQP.stop; EM.stop }
- end
- puts h.list(responses, :columns_across, 4)
- end
-
-
- compile
- end
- end
-end
diff --git a/chef-expander/lib/chef/expander/daemonizable.rb b/chef-expander/lib/chef/expander/daemonizable.rb
deleted file mode 100644
index 0d76bd6b6e..0000000000
--- a/chef-expander/lib/chef/expander/daemonizable.rb
+++ /dev/null
@@ -1,150 +0,0 @@
-#
-# Author:: Daniel DeLeo (<dan@opscode.com>)
-# Copyright:: Copyright (c) 2011 Opscode, Inc.
-# License:: Apache License, Version 2.0
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-require 'etc'
-require 'chef/expander/loggable'
-
-module Chef
- module Expander
-
- class AlreadyRunning < RuntimeError
- end
-
- class NoSuchUser < ArgumentError
- end
-
- class NoSuchGroup < ArgumentError
- end
-
- module Daemonizable
- include Loggable
-
- # Daemonizes the process if configured to do so, and ensures that only one
- # copy of the process is running with a given config by obtaining an
- # exclusive lock on the pidfile. Also sets process user and group if so
- # configured.
- # ===Raises
- # * AlreadyRunning::: when another process has the exclusive lock on the pidfile
- # * NoSuchUser::: when a user is configured that doesn't exist
- # * NoSuchGroup::: when a group is configured that doesn't exist
- # * SystemCallError::: if there is an error creating the pidfile
- def configure_process
- Expander.config.daemonize? ? daemonize : ensure_exclusive
- set_user_and_group
- end
-
- def daemonize
- acquire_locks
- exit if fork
- Process.setsid
- exit if fork
- write_pid
- Dir.chdir('/')
- STDIN.reopen("/dev/null")
- STDOUT.reopen("/dev/null", "a")
- STDERR.reopen("/dev/null", "a")
- end
-
- # When not forking into the background, this ensures only one chef-expander
- # is running with a given config and writes the process id to the pidfile.
- def ensure_exclusive
- acquire_locks
- write_pid
- end
-
- def set_user_and_group
- return nil if Expander.config.user.nil?
-
- if Expander.config.group.nil?
- log.info {"Changing user to #{Expander.config.user}"}
- else
- log.info {"Changing user to #{Expander.config.user} and group to #{Expander.config.group}"}
- end
-
- unless (set_group && set_user)
- log.error {"Unable to change user to #{Expander.config.user} - Are you root?"}
- end
- end
-
- # Deletes the pidfile, releasing the exclusive lock on it in the process.
- def release_locks
- File.unlink(@pidfile.path) if File.exist?(@pidfile.path)
- @pidfile.close unless @pidfile.closed?
- end
-
- private
-
- def set_user
- Process::Sys.setuid(target_uid)
- true
- rescue Errno::EPERM => e
- log.debug {e}
- false
- end
-
- def set_group
- if gid = target_uid
- Process::Sys.setgid(gid)
- end
- true
- rescue Errno::EPERM
- log.debug {e}
- false
- end
-
- def target_uid
- user = Expander.config.user
- user.kind_of?(Fixnum) ? user : Etc.getpwnam(user).uid
- rescue ArgumentError => e
- log.debug {e}
- raise NoSuchUser, "Cannot change user to #{user} - failed to find the uid"
- end
-
- def target_gid
- if group = Expander.config.group
- group.kind_of?(Fixnum) ? group : Etc.getgrnam(group).gid
- else
- nil
- end
- rescue ArgumentError => e
- log.debug {e}
- raise NoSuchGroup, "Cannot change group to #{group} - failed to find the gid"
- end
-
- def acquire_locks
- @pidfile = File.open(Expander.config.pidfile, File::RDWR|File::CREAT, 0644)
- unless @pidfile.flock(File::LOCK_EX | File::LOCK_NB)
- pid = @pidfile.read.strip
- msg = "Another instance of chef-expander (pid: #{pid}) has a lock on the pidfile (#{Expander.config.pidfile}). \n"\
- "Configure a different pidfile to run multiple instances of chef-expander at once."
- raise AlreadyRunning, msg
- end
- rescue Exception
- @pidfile.close if @pidfile && !@pidfile.closed?
- raise
- end
-
- def write_pid
- @pidfile.truncate(0)
- @pidfile.print("#{Process.pid}\n")
- @pidfile.flush
- end
-
- end
- end
-end \ No newline at end of file
diff --git a/chef-expander/lib/chef/expander/flattener.rb b/chef-expander/lib/chef/expander/flattener.rb
deleted file mode 100644
index 90f7cd663e..0000000000
--- a/chef-expander/lib/chef/expander/flattener.rb
+++ /dev/null
@@ -1,79 +0,0 @@
-#
-# Author:: Daniel DeLeo (<dan@opscode.com>)
-# Author:: Seth Falcon (<seth@opscode.com>)
-# Author:: Chris Walters (<cw@opscode.com>)
-# Copyright:: Copyright (c) 2010-2011 Opscode, Inc.
-# License:: Apache License, Version 2.0
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-require 'chef/expander/configuration'
-
-module Chef
- module Expander
- # Flattens and expands nested Hashes representing Chef objects
- # (e.g, Nodes, Roles, DataBagItems, etc.) into flat Hashes so the
- # objects are suitable to be saved into Solr. This code is more or
- # less copy-pasted from chef/solr/index which may or may not be a
- # great idea, though that does minimize the dependencies and
- # hopefully minimize the memory use of chef-expander.
- class Flattener
- UNDERSCORE = '_'
- X = 'X'
-
- X_CHEF_id_CHEF_X = 'X_CHEF_id_CHEF_X'
- X_CHEF_database_CHEF_X = 'X_CHEF_database_CHEF_X'
- X_CHEF_type_CHEF_X = 'X_CHEF_type_CHEF_X'
-
- def initialize(item)
- @item = item
- end
-
- def flattened_item
- @flattened_item || flatten_and_expand
- end
-
- def flatten_and_expand
- @flattened_item = Hash.new {|hash, key| hash[key] = []}
-
- @item.each do |key, value|
- flatten_each([key.to_s], value)
- end
-
- @flattened_item.each_value { |values| values.uniq! }
- @flattened_item
- end
-
- def flatten_each(keys, values)
- case values
- when Hash
- values.each do |child_key, child_value|
- add_field_value(keys, child_key)
- flatten_each(keys + [child_key.to_s], child_value)
- end
- when Array
- values.each { |child_value| flatten_each(keys, child_value) }
- else
- add_field_value(keys, values)
- end
- end
-
- def add_field_value(keys, value)
- value = value.to_s
- @flattened_item[keys.join(UNDERSCORE)] << value
- @flattened_item[keys.last] << value
- end
- end
- end
-end
diff --git a/chef-expander/lib/chef/expander/loggable.rb b/chef-expander/lib/chef/expander/loggable.rb
deleted file mode 100644
index 5dd534dab5..0000000000
--- a/chef-expander/lib/chef/expander/loggable.rb
+++ /dev/null
@@ -1,40 +0,0 @@
-#
-# Author:: Daniel DeLeo (<dan@opscode.com>)
-# Author:: Seth Falcon (<seth@opscode.com>)
-# Author:: Chris Walters (<cw@opscode.com>)
-# Copyright:: Copyright (c) 2010-2011 Opscode, Inc.
-# License:: Apache License, Version 2.0
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-require 'chef/expander/logger'
-require 'mixlib/log'
-
-module Chef
- module Expander
- module Loggable
-
- # TODO: it's admittedly janky to set up the default logging this way.
- STDOUT.sync = true
- LOGGER = Logger.new(STDOUT)
- LOGGER.level = :debug
-
- def log
- LOGGER
- end
-
- end
- end
-end
-
diff --git a/chef-expander/lib/chef/expander/logger.rb b/chef-expander/lib/chef/expander/logger.rb
deleted file mode 100644
index 234344e0cb..0000000000
--- a/chef-expander/lib/chef/expander/logger.rb
+++ /dev/null
@@ -1,135 +0,0 @@
-#
-# Author:: Daniel DeLeo (<dan@opscode.com>)
-# Copyright:: Copyright (c) 2011 Opscode, Inc.
-# License:: Apache License, Version 2.0
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-require 'logger'
-
-module Chef
- module Expander
-
- class InvalidLogDevice < ArgumentError
- end
-
- class InvalidLogLevel < ArgumentError
- end
-
- # Customized Logger class that dispenses with the unnecessary mutexing.
- # As long as you write one line at a time, the OS will take care of keeping
- # your output in order. Expander commonly runs as a cluster of worker
- # processes so the mutexing wasn't actually helping us anyway.
- #
- # We don't use the program name field in the logger, so support for that
- # has been removed. The log format is also hardcoded since we don't ever
- # change the format.
- class Logger < ::Logger
-
- LEVELS = { :debug=>DEBUG, :info=>INFO, :warn=>WARN, :error=>ERROR, :fatal=>FATAL}
- LEVEL_INTEGERS = LEVELS.invert
- LEVEL_TO_STR = Hash[LEVEL_INTEGERS.map {|i,sym| [i,sym.to_s.upcase]}]
-
- LOG_DEVICES = []
-
- at_exit do
- LOG_DEVICES.each {|io| io.close if io.respond_to?(:closed?) && !io.closed?}
- end
-
- attr_reader :log_device
-
- # (re-)initialize the Logger with a new IO object or file to log to.
- def init(log_device)
- @log_device = initialize_log_device(log_device)
- end
-
- def initialize(log_device)
- @level = DEBUG
- init(log_device)
- end
-
- def level=(new_level)
- @level = if new_level.kind_of?(Fixnum) && LEVEL_INTEGERS.key?(new_level)
- new
- elsif LEVELS.key?(new_level)
- LEVELS[new_level]
- else
- raise InvalidLogLevel, "#{new_level} is not a valid log level. Valid log levels are [#{LEVEL_INTEGERS.keys.join(',')}] and [#{LEVELS.join(',')}]"
- end
- end
-
- def <<(msg)
- @log_device.print(msg)
- end
-
- def add(severity=UNKNOWN, message = nil, progname = nil, &block)
- return true unless severity >= @level
-
- message ||= progname # level methods (e.g, #debug) pass explicit message as progname
-
- if message.nil? && block_given?
- message = yield
- end
-
- self << sprintf("[%s] %s: %s\n", Time.new.rfc2822(), LEVEL_TO_STR[severity], msg2str(message))
- true
- end
-
- alias :log :add
-
- private
-
- def msg2str(msg)
- case msg
- when ::String
- msg
- when ::Exception
- "#{ msg.message } (#{ msg.class })\n" <<
- (msg.backtrace || []).join("\n")
- else
- msg.inspect
- end
- end
-
- def logging_at_severity?(severity=nil)
- end
-
- def initialize_log_device(dev)
- unless dev.respond_to? :sync=
- assert_valid_path!(dev)
- dev = File.open(dev.to_str, "a")
- LOG_DEVICES << dev
- end
-
- dev.sync = true
- dev
- end
-
- def assert_valid_path!(path)
- enclosing_directory = File.dirname(path)
- unless File.directory?(enclosing_directory)
- raise InvalidLogDevice, "You must create the enclosing directory #{enclosing_directory} before the log file #{path} can be created."
- end
- if File.exist?(path)
- unless File.writable?(path)
- raise InvalidLogDevice, "The log file you specified (#{path}) is not writable by user #{Process.euid}"
- end
- elsif !File.writable?(enclosing_directory)
- raise InvalidLogDevice, "You specified a log file #{path} but user #{Process.euid} is not permitted to create files there."
- end
- end
-
- end
- end
-end \ No newline at end of file
diff --git a/chef-expander/lib/chef/expander/node.rb b/chef-expander/lib/chef/expander/node.rb
deleted file mode 100644
index 23249e8685..0000000000
--- a/chef-expander/lib/chef/expander/node.rb
+++ /dev/null
@@ -1,177 +0,0 @@
-#
-# Author:: Daniel DeLeo (<dan@opscode.com>)
-# Author:: Seth Falcon (<seth@opscode.com>)
-# Author:: Chris Walters (<cw@opscode.com>)
-# Copyright:: Copyright (c) 2010-2011 Opscode, Inc.
-# License:: Apache License, Version 2.0
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-require 'uuidtools'
-require 'amqp'
-require 'mq'
-require 'open3'
-
-require 'chef/expander/loggable'
-
-module Chef
- module Expander
- class Node
-
- include Loggable
-
- def self.from_hash(node_info)
- new(node_info[:guid], node_info[:hostname_f], node_info[:pid])
- end
-
- def self.local_node
- new(guid, hostname_f, Process.pid)
- end
-
- def self.guid
- return @guid if @guid
- @guid = UUIDTools::UUID.random_create.to_s
- end
-
- def self.hostname_f
- @hostname ||= Open3.popen3("hostname -f") {|stdin, stdout, stderr| stdout.read }.strip
- end
-
- attr_reader :guid
-
- attr_reader :hostname_f
-
- attr_reader :pid
-
- def initialize(guid, hostname_f, pid)
- @guid, @hostname_f, @pid = guid, hostname_f, pid
- end
-
- def start(&message_handler)
- attach_to_queue(exclusive_control_queue, "exclusive control", &message_handler)
- attach_to_queue(shared_control_queue, "shared_control", &message_handler)
- attach_to_queue(broadcast_control_queue, "broadcast control", &message_handler)
- end
-
- def attach_to_queue(queue, colloquial_name, &message_handler)
- queue.subscribe(:ack => true) do |headers, payload|
- log.debug { "received message on #{colloquial_name} queue: #{payload}" }
- message_handler.call(payload)
- headers.ack
- end
- end
-
- def stop
- log.debug { "unsubscribing from broadcast control queue"}
- broadcast_control_queue.unsubscribe(:nowait => false)
-
- log.debug { "unsubscribing from shared control queue" }
- shared_control_queue.unsubscribe(:nowait => false)
-
- log.debug { "unsubscribing from exclusive control queue" }
- exclusive_control_queue.unsubscribe(:nowait => false)
- end
-
- def direct_message(message)
- log.debug { "publishing direct message to node #{identifier}: #{message}" }
- exclusive_control_queue.publish(message)
- end
-
- def shared_message(message)
- log.debug { "publishing shared message #{message}"}
- shared_control_queue.publish(message)
- end
-
- def broadcast_message(message)
- log.debug { "publishing broadcast message #{message}" }
- broadcast_control_exchange.publish(message)
- end
-
- # The exclusive control queue is for point-to-point messaging, i.e.,
- # messages directly addressed to this node
- def exclusive_control_queue
- @exclusive_control_queue ||= begin
- log.debug { "declaring exclusive control queue #{exclusive_control_queue_name}" }
- MQ.queue(exclusive_control_queue_name)
- end
- end
-
- # The shared control queue is for 1 to (1 of N) messaging, i.e.,
- # messages that can go to any one node.
- def shared_control_queue
- @shared_control_queue ||= begin
- log.debug { "declaring shared control queue #{shared_control_queue_name}" }
- MQ.queue(shared_control_queue_name)
- end
- end
-
- # The broadcast control queue is for 1 to N messaging, i.e.,
- # messages that go to every node
- def broadcast_control_queue
- @broadcast_control_queue ||= begin
- log.debug { "declaring broadcast control queue #{broadcast_control_queue_name}"}
- q = MQ.queue(broadcast_control_queue_name)
- log.debug { "binding broadcast control queue to broadcast control exchange"}
- q.bind(broadcast_control_exchange)
- q
- end
- end
-
- def broadcast_control_exchange
- @broadcast_control_exchange ||= begin
- log.debug { "declaring broadcast control exchange opscode-platfrom-control--broadcast" }
- MQ.fanout(broadcast_control_exchange_name, :nowait => false)
- end
- end
-
- def shared_control_queue_name
- SHARED_CONTROL_QUEUE_NAME
- end
-
- def broadcast_control_queue_name
- @broadcast_control_queue_name ||= "#{identifier}--broadcast"
- end
-
- def broadcast_control_exchange_name
- BROADCAST_CONTROL_EXCHANGE_NAME
- end
-
- def exclusive_control_queue_name
- @exclusive_control_queue_name ||= "#{identifier}--exclusive-control"
- end
-
- def identifier
- "#{hostname_f}--#{pid}--#{guid}"
- end
-
- def ==(other)
- other.respond_to?(:guid) && other.respond_to?(:hostname_f) && other.respond_to?(:pid) &&
- (other.guid == guid) && (other.hostname_f == hostname_f) && (other.pid == pid)
- end
-
- def eql?(other)
- (other.class == self.class) && (other.hash == hash)
- end
-
- def hash
- identifier.hash
- end
-
- def to_hash
- {:guid => @guid, :hostname_f => @hostname_f, :pid => @pid}
- end
-
- end
- end
-end
diff --git a/chef-expander/lib/chef/expander/solrizer.rb b/chef-expander/lib/chef/expander/solrizer.rb
deleted file mode 100644
index 1faf288674..0000000000
--- a/chef-expander/lib/chef/expander/solrizer.rb
+++ /dev/null
@@ -1,275 +0,0 @@
-#
-# Author:: Daniel DeLeo (<dan@opscode.com>)
-# Author:: Seth Falcon (<seth@opscode.com>)
-# Author:: Chris Walters (<cw@opscode.com>)
-# Copyright:: Copyright (c) 2010-2011 Opscode, Inc.
-# License:: Apache License, Version 2.0
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-require 'set'
-require 'yajl'
-require 'fast_xs'
-require 'em-http-request'
-require 'chef/expander/loggable'
-require 'chef/expander/flattener'
-
-module Chef
- module Expander
- class Solrizer
-
- @active_http_requests = Set.new
-
- def self.http_request_started(instance)
- @active_http_requests << instance
- end
-
- def self.http_request_completed(instance)
- @active_http_requests.delete(instance)
- end
-
- def self.http_requests_active?
- !@active_http_requests.empty?
- end
-
- def self.clear_http_requests
- @active_http_requests.clear
- end
-
- include Loggable
-
- ADD = "add"
- DELETE = "delete"
- SKIP = "skip"
-
- ITEM = "item"
- ID = "id"
- TYPE = "type"
- DATABASE = "database"
- ENQUEUED_AT = "enqueued_at"
-
- DATA_BAG_ITEM = "data_bag_item"
- DATA_BAG = "data_bag"
-
- X_CHEF_id_CHEF_X = 'X_CHEF_id_CHEF_X'
- X_CHEF_database_CHEF_X = 'X_CHEF_database_CHEF_X'
- X_CHEF_type_CHEF_X = 'X_CHEF_type_CHEF_X'
-
- CONTENT_TYPE_XML = {"Content-Type" => "text/xml"}
-
- attr_reader :action
-
- attr_reader :indexer_payload
-
- attr_reader :chef_object
-
- attr_reader :obj_id
-
- attr_reader :obj_type
-
- attr_reader :database
-
- attr_reader :enqueued_at
-
- def initialize(object_command_json, &on_completion_block)
- @start_time = Time.now.to_f
- @on_completion_block = on_completion_block
- if parsed_message = parse(object_command_json)
- @action = parsed_message["action"]
- @indexer_payload = parsed_message["payload"]
-
- extract_object_fields if @indexer_payload
- else
- @action = SKIP
- end
- end
-
- def extract_object_fields
- @chef_object = @indexer_payload[ITEM]
- @database = @indexer_payload[DATABASE]
- @obj_id = @indexer_payload[ID]
- @obj_type = @indexer_payload[TYPE]
- @enqueued_at = @indexer_payload[ENQUEUED_AT]
- @data_bag = @obj_type == DATA_BAG_ITEM ? @chef_object[DATA_BAG] : nil
- end
-
- def parse(serialized_object)
- Yajl::Parser.parse(serialized_object)
- rescue Yajl::ParseError
- log.error { "cannot index object because it is invalid JSON: #{serialized_object}" }
- end
-
- def run
- case @action
- when ADD
- add
- when DELETE
- delete
- when SKIP
- completed
- log.info { "not indexing this item because of malformed JSON"}
- else
- completed
- log.error { "cannot index object becuase it has an invalid action #{@action}" }
- end
- end
-
- def add
- post_to_solr(pointyize_add) do
- ["indexed #{indexed_object}",
- "transit,xml,solr-post |",
- [transit_time, @xml_time, @solr_post_time].join(","),
- "|"
- ].join(" ")
- end
- rescue Exception => e
- log.error { "#{e.class.name}: #{e.message}\n#{e.backtrace.join("\n")}"}
- end
-
- def delete
- post_to_solr(pointyize_delete) { "deleted #{indexed_object} transit-time[#{transit_time}s]"}
- rescue Exception => e
- log.error { "#{e.class.name}: #{e.message}\n#{e.backtrace.join("\n")}"}
- end
-
- def flattened_object
- flattened_object = Flattener.new(@chef_object).flattened_item
-
- flattened_object[X_CHEF_id_CHEF_X] = [@obj_id]
- flattened_object[X_CHEF_database_CHEF_X] = [@database]
- flattened_object[X_CHEF_type_CHEF_X] = [@obj_type]
-
- log.debug {"adding flattened object to Solr: #{flattened_object.inspect}"}
-
- flattened_object
- end
-
- START_XML = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
- ADD_DOC = "<add><doc>"
- DELETE_DOC = "<delete>"
- ID_OPEN = "<id>"
- ID_CLOSE = "</id>"
- END_ADD_DOC = "</doc></add>\n"
- END_DELETE = "</delete>\n"
- START_CONTENT = '<field name="content">'
- CLOSE_FIELD = "</field>"
-
- FLD_CHEF_ID_FMT = '<field name="X_CHEF_id_CHEF_X">%s</field>'
- FLD_CHEF_DB_FMT = '<field name="X_CHEF_database_CHEF_X">%s</field>'
- FLD_CHEF_TY_FMT = '<field name="X_CHEF_type_CHEF_X">%s</field>'
- FLD_DATA_BAG = '<field name="data_bag">%s</field>'
-
- KEYVAL_FMT = "%s__=__%s "
-
- # Takes a flattened hash where the values are arrays and converts it into
- # a dignified XML document suitable for POST to Solr.
- # The general structure of the output document is like this:
- # <?xml version="1.0" encoding="UTF-8"?>
- # <add>
- # <doc>
- # <field name="content">
- # key__=__value
- # key__=__another_value
- # other_key__=__yet another value
- # </field>
- # </doc>
- # </add>
- # The document as generated has minimal newlines and formatting, however.
- def pointyize_add
- xml = ""
- xml << START_XML << ADD_DOC
- xml << (FLD_CHEF_ID_FMT % @obj_id)
- xml << (FLD_CHEF_DB_FMT % @database)
- xml << (FLD_CHEF_TY_FMT % @obj_type)
- xml << START_CONTENT
- content = ""
- flattened_object.each do |field, values|
- values.each do |v|
- content << (KEYVAL_FMT % [field, v])
- end
- end
- xml << content.fast_xs
- xml << CLOSE_FIELD # ends content
- xml << (FLD_DATA_BAG % @data_bag.fast_xs) if @data_bag
- xml << END_ADD_DOC
- @xml_time = Time.now.to_f - @start_time
- xml
- end
-
- # Takes a succinct document id, like 2342, and turns it into something
- # even more compact, like
- # "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<delete><id>2342</id></delete>\n"
- def pointyize_delete
- xml = ""
- xml << START_XML
- xml << DELETE_DOC
- xml << ID_OPEN
- xml << @obj_id.to_s
- xml << ID_CLOSE
- xml << END_DELETE
- xml
- end
-
- def post_to_solr(document, &logger_block)
- log.debug("POSTing document to SOLR:\n#{document}")
- http_req = EventMachine::HttpRequest.new(solr_url).post(:body => document, :timeout => 1200, :head => CONTENT_TYPE_XML)
- http_request_started
-
- http_req.callback do
- completed
- if http_req.response_header.status == 200
- log.info(&logger_block)
- else
- log.error { "Failed to post to solr: #{indexed_object}" }
- end
- end
- http_req.errback do
- completed
- log.error { "Failed to post to solr (connection error): #{indexed_object}" }
- end
- end
-
- def completed
- @solr_post_time = Time.now.to_f - @start_time
- self.class.http_request_completed(self)
- @on_completion_block.call
- end
-
- def transit_time
- Time.now.utc.to_i - @enqueued_at
- end
-
- def solr_url
- "#{Expander.config.solr_url}/update"
- end
-
- def indexed_object
- "#{@obj_type}[#{@obj_id}] database[#{@database}]"
- end
-
- def http_request_started
- self.class.http_request_started(self)
- end
-
- def eql?(other)
- other.hash == hash
- end
-
- def hash
- "#{action}#{indexed_object}#@enqueued_at#{self.class.name}".hash
- end
-
- end
- end
-end
diff --git a/chef-expander/lib/chef/expander/version.rb b/chef-expander/lib/chef/expander/version.rb
deleted file mode 100644
index 5a0bac588d..0000000000
--- a/chef-expander/lib/chef/expander/version.rb
+++ /dev/null
@@ -1,41 +0,0 @@
-#
-# Author:: Daniel DeLeo (<dan@opscode.com>)
-# Author:: Seth Falcon (<seth@opscode.com>)
-# Author:: Chris Walters (<cw@opscode.com>)
-# Copyright:: Copyright (c) 2010-2011 Opscode, Inc.
-# License:: Apache License, Version 2.0
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-require 'open3'
-
-module Chef
- module Expander
-
- VERSION = "11.0.0.alpha"
-
- def self.version
- @rev ||= begin
- begin
- rev = Open3.popen3("git rev-parse HEAD") {|stdin, stdout, stderr| stdout.read }.strip
- rescue Errno::ENOENT
- rev = ""
- end
- rev.empty? ? nil : " (#{rev})"
- end
- "#{VERSION}#@rev"
- end
-
- end
-end
diff --git a/chef-expander/lib/chef/expander/vnode.rb b/chef-expander/lib/chef/expander/vnode.rb
deleted file mode 100644
index 32bb17da32..0000000000
--- a/chef-expander/lib/chef/expander/vnode.rb
+++ /dev/null
@@ -1,106 +0,0 @@
-#
-# Author:: Daniel DeLeo (<dan@opscode.com>)
-# Author:: Seth Falcon (<seth@opscode.com>)
-# Author:: Chris Walters (<cw@opscode.com>)
-# Copyright:: Copyright (c) 2010-2011 Opscode, Inc.
-# License:: Apache License, Version 2.0
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-require 'eventmachine'
-require 'amqp'
-require 'mq'
-
-require 'chef/expander/loggable'
-require 'chef/expander/solrizer'
-
-module Chef
- module Expander
- class VNode
- include Loggable
-
- attr_reader :vnode_number
-
- attr_reader :supervise_interval
-
- def initialize(vnode_number, supervisor, opts={})
- @vnode_number = vnode_number.to_i
- @supervisor = supervisor
- @queue = nil
- @stopped = false
- @supervise_interval = opts[:supervise_interval] || 30
- end
-
- def start
- @supervisor.vnode_added(self)
-
- subscription_confirmed = Proc.new do
- abort_on_multiple_subscribe
- supervise_consumer_count
- end
-
- queue.subscribe(:ack => true, :confirm => subscription_confirmed) do |headers, payload|
- log.debug {"got #{payload} size(#{payload.size} bytes) on queue #{queue_name}"}
- solrizer = Solrizer.new(payload) { headers.ack }
- solrizer.run
- end
-
- rescue MQ::Error => e
- log.error {"Failed to start subscriber on #{queue_name} #{e.class.name}: #{e.message}"}
- end
-
- def supervise_consumer_count
- EM.add_periodic_timer(supervise_interval) do
- abort_on_multiple_subscribe
- end
- end
-
- def abort_on_multiple_subscribe
- queue.status do |message_count, subscriber_count|
- if subscriber_count.to_i > 1
- log.error { "Detected extra consumers (#{subscriber_count} total) on queue #{queue_name}, cancelling subscription" }
- stop
- end
- end
- end
-
- def stop
- log.debug {"Cancelling subscription on queue #{queue_name.inspect}"}
- queue.unsubscribe if queue.subscribed?
- @supervisor.vnode_removed(self)
- @stopped = true
- end
-
- def stopped?
- @stopped
- end
-
- def queue
- @queue ||= begin
- log.debug { "declaring queue #{queue_name}" }
- MQ.queue(queue_name, :passive => false, :durable => true)
- end
- end
-
- def queue_name
- "vnode-#{@vnode_number}"
- end
-
- def control_queue_name
- "#{queue_name}-control"
- end
-
- end
- end
-end
diff --git a/chef-expander/lib/chef/expander/vnode_supervisor.rb b/chef-expander/lib/chef/expander/vnode_supervisor.rb
deleted file mode 100644
index e2e1dcbf8d..0000000000
--- a/chef-expander/lib/chef/expander/vnode_supervisor.rb
+++ /dev/null
@@ -1,265 +0,0 @@
-#
-# Author:: Daniel DeLeo (<dan@opscode.com>)
-# Author:: Seth Falcon (<seth@opscode.com>)
-# Author:: Chris Walters (<cw@opscode.com>)
-# Copyright:: Copyright (c) 2010-2011 Opscode, Inc.
-# License:: Apache License, Version 2.0
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-require 'yajl'
-require 'eventmachine'
-require 'amqp'
-require 'mq'
-require 'chef/expander/version'
-require 'chef/expander/loggable'
-require 'chef/expander/node'
-require 'chef/expander/vnode'
-require 'chef/expander/vnode_table'
-require 'chef/expander/configuration'
-
-module ::AMQP
- def self.hard_reset!
- MQ.reset rescue nil
- stop
- EM.stop rescue nil
- Thread.current[:mq], @conn = nil, nil
- end
-end
-
-module Chef
- module Expander
- class VNodeSupervisor
- include Loggable
- extend Loggable
-
- COULD_NOT_CONNECT = /Could not connect to server/.freeze
-
- def self.start_cluster_worker
- @vnode_supervisor = new
- @original_ppid = Process.ppid
- trap_signals
-
- vnodes = Expander.config.vnode_numbers
-
- $0 = "chef-expander#{Expander.config.ps_tag} worker ##{Expander.config.index} (vnodes #{vnodes.min}-#{vnodes.max})"
-
- AMQP.start(Expander.config.amqp_config) do
- start_consumers
- await_parent_death
- end
- end
-
- def self.await_parent_death
- @awaiting_parent_death = EM.add_periodic_timer(1) do
- unless Process.ppid == @original_ppid
- @awaiting_parent_death.cancel
- stop_immediately("master process death")
- end
- end
- end
-
- def self.start
- @vnode_supervisor = new
- trap_signals
-
- Expander.init_config(ARGV)
-
- log.info("Chef Search Expander #{Expander.version} starting up.")
-
- begin
- AMQP.start(Expander.config.amqp_config) do
- start_consumers
- end
- rescue AMQP::Error => e
- if e.message =~ COULD_NOT_CONNECT
- log.error { "Could not connect to rabbitmq. Make sure it is running and correctly configured." }
- log.error { e.message }
-
- AMQP.hard_reset!
-
- sleep 5
- retry
- else
- raise
- end
- end
- end
-
- def self.start_consumers
- log.debug { "Setting prefetch count to 1"}
- MQ.prefetch(1)
-
- vnodes = Expander.config.vnode_numbers
- log.info("Starting Consumers for vnodes #{vnodes.min}-#{vnodes.max}")
- @vnode_supervisor.start(vnodes)
- end
-
- def self.trap_signals
- Kernel.trap(:INT) { stop_immediately(:INT) }
- Kernel.trap(:TERM) { stop_gracefully(:TERM) }
- end
-
- def self.stop_immediately(signal)
- log.info { "Initiating immediate shutdown on signal (#{signal})" }
- @vnode_supervisor.stop
- EM.add_timer(1) do
- AMQP.stop
- EM.stop
- end
- end
-
- def self.stop_gracefully(signal)
- log.info { "Initiating graceful shutdown on signal (#{signal})" }
- @vnode_supervisor.stop
- wait_for_http_requests_to_complete
- end
-
- def self.wait_for_http_requests_to_complete
- if Expander::Solrizer.http_requests_active?
- log.info { "waiting for in progress HTTP Requests to complete"}
- EM.add_timer(1) do
- wait_for_http_requests_to_complete
- end
- else
- log.info { "HTTP requests completed, shutting down"}
- AMQP.stop
- EM.stop
- end
- end
-
- attr_reader :vnode_table
-
- attr_reader :local_node
-
- def initialize
- @vnodes = {}
- @vnode_table = VNodeTable.new(self)
- @local_node = Node.local_node
- @queue_name, @guid = nil, nil
- end
-
- def start(vnode_ids)
- @local_node.start do |message|
- process_control_message(message)
- end
-
- #start_vnode_table_publisher
-
- Array(vnode_ids).each { |vnode_id| spawn_vnode(vnode_id) }
- end
-
- def stop
- @local_node.stop
-
- #log.debug { "stopping vnode table updater" }
- #@vnode_table_publisher.cancel
-
- log.info { "Stopping VNode queue subscribers"}
- @vnodes.each do |vnode_number, vnode|
- log.debug { "Stopping consumer on VNode #{vnode_number}"}
- vnode.stop
- end
-
- end
-
- def vnode_added(vnode)
- log.debug { "vnode #{vnode.vnode_number} registered with supervisor" }
- @vnodes[vnode.vnode_number.to_i] = vnode
- end
-
- def vnode_removed(vnode)
- log.debug { "vnode #{vnode.vnode_number} unregistered from supervisor" }
- @vnodes.delete(vnode.vnode_number.to_i)
- end
-
- def vnodes
- @vnodes.keys.sort
- end
-
- def spawn_vnode(vnode_number)
- VNode.new(vnode_number, self).start
- end
-
- def release_vnode
- # TODO
- end
-
- def process_control_message(message)
- control_message = parse_symbolic(message)
- case control_message[:action]
- when "claim_vnode"
- spawn_vnode(control_message[:vnode_id])
- when "recover_vnode"
- recover_vnode(control_message[:vnode_id])
- when "release_vnodes"
- raise "todo"
- release_vnode()
- when "update_vnode_table"
- @vnode_table.update_table(control_message[:data])
- when "vnode_table_publish"
- publish_vnode_table
- when "status"
- publish_status_to(control_message[:rsvp])
- when "set_log_level"
- set_log_level(control_message[:level], control_message[:rsvp])
- else
- log.error { "invalid control message #{control_message.inspect}" }
- end
- rescue Exception => e
- log.error { "Error processing a control message."}
- log.error { "#{e.class.name}: #{e.message}\n#{e.backtrace.join("\n")}" }
- end
-
-
- def start_vnode_table_publisher
- @vnode_table_publisher = EM.add_periodic_timer(10) { publish_vnode_table }
- end
-
- def publish_vnode_table
- status_update = @local_node.to_hash
- status_update[:vnodes] = vnodes
- status_update[:update] = :add
- @local_node.broadcast_message(Yajl::Encoder.encode({:action => :update_vnode_table, :data => status_update}))
- end
-
- def publish_status_to(return_queue)
- status_update = @local_node.to_hash
- status_update[:vnodes] = vnodes
- MQ.queue(return_queue).publish(Yajl::Encoder.encode(status_update))
- end
-
- def set_log_level(level, rsvp_to)
- log.info { "setting log level to #{level} due to command from #{rsvp_to}" }
- new_log_level = (Expander.config.log_level = level.to_sym)
- reply = {:level => new_log_level, :node => @local_node.to_hash}
- MQ.queue(rsvp_to).publish(Yajl::Encoder.encode(reply))
- end
-
- def recover_vnode(vnode_id)
- if @vnode_table.local_node_is_leader?
- log.debug { "Recovering vnode: #{vnode_id}" }
- @local_node.shared_message(Yajl::Encoder.encode({:action => :claim_vnode, :vnode_id => vnode_id}))
- else
- log.debug { "Ignoring :recover_vnode message because this node is not the leader" }
- end
- end
-
- def parse_symbolic(message)
- Yajl::Parser.new(:symbolize_keys => true).parse(message)
- end
-
- end
- end
-end
diff --git a/chef-expander/lib/chef/expander/vnode_table.rb b/chef-expander/lib/chef/expander/vnode_table.rb
deleted file mode 100644
index 025812b49c..0000000000
--- a/chef-expander/lib/chef/expander/vnode_table.rb
+++ /dev/null
@@ -1,83 +0,0 @@
-#
-# Author:: Daniel DeLeo (<dan@opscode.com>)
-# Author:: Seth Falcon (<seth@opscode.com>)
-# Author:: Chris Walters (<cw@opscode.com>)
-# Copyright:: Copyright (c) 2010-2011 Opscode, Inc.
-# License:: Apache License, Version 2.0
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-require 'yajl'
-require 'chef/expander/node'
-require 'chef/expander/loggable'
-
-module Chef
- module Expander
- class VNodeTable
-
- include Loggable
-
- class InvalidVNodeTableUpdate < ArgumentError; end
-
- attr_reader :vnodes_by_node
-
- def initialize(vnode_supervisor)
- @node_update_mutex = Mutex.new
- @vnode_supervisor = vnode_supervisor
- @vnodes_by_node = {}
- end
-
- def nodes
- @vnodes_by_node.keys
- end
-
- def update_table(table_update)
- case table_update[:update]
- when "add", "update"
- update_node(table_update)
- when "remove"
- remove_node(table_update)
- else
- raise InvalidVNodeTableUpdate, "no action or action not acceptable: #{table_update.inspect}"
- end
- log.debug { "current vnode table: #{@vnodes_by_node.inspect}" }
- end
-
- def update_node(node_info)
- @node_update_mutex.synchronize do
- @vnodes_by_node[Node.from_hash(node_info)] = node_info[:vnodes]
- end
- end
-
- def remove_node(node_info)
- @node_update_mutex.synchronize do
- @vnodes_by_node.delete(Node.from_hash(node_info))
- end
- end
-
- def leader_node
- if @vnodes_by_node.empty?
- nil
- else
- Array(@vnodes_by_node).reject { |node| node[1].empty? }.sort { |a,b| a[1].min <=> b[1].min }.first[0]
- end
- end
-
- def local_node_is_leader?
- (Node.local_node == leader_node) || (@vnodes_by_node[Node.local_node].include?(0))
- end
-
- end
- end
-end