diff options
author | Jamis Buck <jamis@37signals.com> | 2008-04-07 22:41:12 -0600 |
---|---|---|
committer | Jamis Buck <jamis@37signals.com> | 2008-04-07 22:41:12 -0600 |
commit | 9782ac60ccc82169a0e942c7d0eaba008bae44e8 (patch) | |
tree | e348c0f1c86e2a1f2e1c9cb149bab711ed3ec5f1 /lib/net/ssh/multi | |
parent | 97e3f9e56a6e8d7ad0027228a497d7f144402ee4 (diff) | |
download | net-ssh-multi-9782ac60ccc82169a0e942c7d0eaba008bae44e8.tar.gz |
make with() and on() yield a new subsession object that encapsulates the set of matching servers, and add a new servers_for method.
Diffstat (limited to 'lib/net/ssh/multi')
-rw-r--r-- | lib/net/ssh/multi/session.rb | 285 | ||||
-rw-r--r-- | lib/net/ssh/multi/session_actions.rb | 153 | ||||
-rw-r--r-- | lib/net/ssh/multi/subsession.rb | 48 |
3 files changed, 273 insertions, 213 deletions
diff --git a/lib/net/ssh/multi/session.rb b/lib/net/ssh/multi/session.rb index 99819e1..561f9d1 100644 --- a/lib/net/ssh/multi/session.rb +++ b/lib/net/ssh/multi/session.rb @@ -3,6 +3,8 @@ require 'net/ssh/gateway' require 'net/ssh/multi/server' require 'net/ssh/multi/channel' require 'net/ssh/multi/pending_connection' +require 'net/ssh/multi/session_actions' +require 'net/ssh/multi/subsession' module Net; module SSH; module Multi # Represents a collection of connections to various servers. It provides an @@ -27,11 +29,11 @@ module Net; module SSH; module Multi # session.use 'app2', 'user' # end # - # # execute commands + # # execute commands on all servers # session.exec "uptime" # # # execute commands on a subset of servers - # session.with(:app) { session.exec "hostname" } + # session.with(:app).exec "hostname" # # # run the aggregated event loop # session.loop @@ -41,6 +43,8 @@ module Net; module SSH; module Multi # You can force the connections to be opened immediately, though, using the # #connect! method. class Session + include SessionActions + # The list of Net::SSH::Multi::Server definitions managed by this session. attr_reader :servers @@ -63,11 +67,6 @@ module Net; module SSH; module Multi # See #use and #group. attr_reader :open_groups #:nodoc: - # The list of "active" groups, which will be used to restrict subsequent - # commands. This is actually a Hash, mapping group names to their corresponding - # constraints (see #with). - attr_reader :active_groups #:nodoc: - # Creates a new Net::SSH::Multi::Session instance. Initially, it contains # no server definitions, no group definitions, and no default gateway. # @@ -85,7 +84,6 @@ module Net; module SSH; module Multi @servers = [] @groups = {} @gateway = nil - @active_groups = {} @open_groups = [] @connect_threads = [] @@ -182,24 +180,22 @@ module Net; module SSH; module Multi server end - # Restricts the set of servers that will be targeted by commands within - # the associated block. It can be used in either of two ways (or both ways - # used together). + # Returns the set of servers that match the given criteria. It can be used + # in any (or all) of three ways. # - # First, you can simply specify a list of group names. All servers in all - # named groups will be the target of the commands. (Nested calls to #with - # are cumulative.) + # First, you can omit any arguments. In this case, the full list of servers + # will be returned. # - # # execute 'hostname' on all servers in the :app group, and 'uptime' - # # on all servers in either :app or :db. - # session.with(:app) do - # session.exec('hostname') - # session.with(:db) do - # session.exec('uptime') - # end - # end + # all = session.servers_for + # + # Second, you can simply specify a list of group names. All servers in all + # named groups will be returned. If a server belongs to multiple matching + # groups, then it will appear only once in the list (the resulting list + # will contain only unique servers). # - # Secondly, you can specify a hash with group names as keys, and property + # servers = session.servers_for(:app, :db) + # + # Last, you can specify a hash with group names as keys, and property # constraints as the values. These property constraints are either "only" # constraints (which restrict the set of servers to "only" those that match # the given properties) or "except" constraints (which restrict the set of @@ -212,95 +208,76 @@ module Net; module SSH; module Multi # session.use 'dbslve2', 'user2' # end # - # # execute the given rake task ONLY on the servers in the :db group - # # which have the :primary property set to true. - # session.with :db => { :only => { :primary => true } } do - # session.exec "rake db:migrate" - # end + # # return ONLY on the servers in the :db group which have the :primary + # # property set to true. + # primary = session.servers_for(:db => { :only => { :primary => true } }) # # You can, naturally, combine these methods: # # # all servers in :app and :web, and all servers in :db with the # # :primary property set to true - # session.with :app, :web, :db => { :only => { :primary => true } } do - # # ... - # end - def with(*groups) - saved_groups = active_groups.dup - - new_map = groups.inject({}) do |map, group| - if group.is_a?(Hash) - group.each do |gr, value| - raise ArgumentError, "the value for any group must be a Hash" unless value.is_a?(Hash) - bad_keys = value.keys - [:only, :except] - raise ArgumentError, "unknown constraint(s): #{bad_keys.inspect}" unless bad_keys.empty? - map[gr] = (active_groups[gr] || {}).merge(value) + # servers = session.servers_for(:app, :web, :db => { :only => { :primary => true } }) + def servers_for(*criteria) + if criteria.empty? + servers + else + # normalize the criteria list, so that every entry is a key to a + # criteria hash (possibly empty). + criteria = criteria.inject({}) do |hash, entry| + case entry + when Hash then hash.merge(entry) + else hash.merge(entry => {}) end - else - map[group] = active_groups[group] || {} end - map - end - - active_groups.update(new_map) - yield self - ensure - active_groups.replace(saved_groups) - end - # Works as #with, but for specific servers rather than groups. In other - # words, you can use this to restrict actions within the block to only - # a specific list of servers. It works by creating an ad-hoc group, adding - # the servers to that group, and then making that group the only active - # group. (Note that because of this, you cannot nest #on within #with, - # though you could nest #with inside of #on.) - # - # srv = session.use('host', 'user') - # # ... - # session.on(srv) do - # session.exec('hostname') - # end - def on(*servers) - adhoc_group = "adhoc_group_#{servers.hash}_#{rand(0xffffffff)}".to_sym - group(adhoc_group => servers) - saved_groups = active_groups.dup - active_groups.replace(adhoc_group => {}) - yield self - ensure - active_groups.replace(saved_groups) if saved_groups - groups.delete(adhoc_group) - end + list = criteria.inject([]) do |server_list, (group, properties)| + raise ArgumentError, "the value for any group must be a Hash, but got a #{properties.class} for #{group.inspect}" unless properties.is_a?(Hash) + bad_keys = properties.keys - [:only, :except] + raise ArgumentError, "unknown constraint(s) #{bad_keys.inspect} for #{group.inspect}" unless bad_keys.empty? - # Returns the list of Net::SSH sessions for all servers that match the - # current scope (e.g., the groups or servers named in the outer #with or - # #on calls). If any servers have not yet been connected to, this will - # block until the connections have been made. - def active_sessions - list = if active_groups.empty? - servers - else - active_groups.inject([]) do |list, (group, properties)| - servers = groups[group].select do |server| + servers = (groups[group] || []).select do |server| (properties[:only] || {}).all? { |prop, value| server[prop] == value } && !(properties[:except] || {}).any? { |prop, value| server[prop] == value } end - list.concat(servers) + server_list.concat(servers) end - end - list.uniq! - threads = list.map { |server| Thread.new { server.session(true) } if server.session.nil? } - threads.each { |thread| thread.join if thread } + list.uniq + end + end - list.map { |server| server.session } + # Returns a new Net::SSH::Multi::Subsession instance consisting of the + # servers that meet the given criteria. If a block is given, the + # subsession will be yielded to it. See #servers_for for a discussion of + # how these criteria are interpreted. + # + # session.with(:app).exec('hostname') + # + # session.with(:app, :db => { :primary => true }) do |s| + # s.exec 'date' + # s.exec 'uptime' + # end + def with(*groups) + subsession = Subsession.new(self, servers_for(*groups)) + yield subsession if block_given? + subsession end - # Connections are normally established lazily, as soon as they are needed. - # This method forces all servers selected by the current scope to connect, - # if they have not yet been connected. - def connect! - active_sessions - self + # Works as #with, but for specific servers rather than groups. It will + # return a new subsession (Net::SSH::Multi::Subsession) consisting of + # the given servers. (Note that it requires that the servers in question + # have been created via calls to #use on this session object, or things + # will not work quite right.) If a block is given, the new subsession + # will also be yielded to the block. + # + # srv1 = session.use('host1', 'user') + # srv2 = session.use('host2', 'user') + # # ... + # session.on(srv1, srv2).exec('hostname') + def on(*servers) + subsession = Subsession.new(self, servers) + yield subsession if block_given? + subsession end # Closes the multi-session by shutting down all open server sessions, and @@ -314,14 +291,6 @@ module Net; module SSH; module Multi default_gateway.shutdown! if default_gateway end - # Returns +true+ if any server has an open SSH session that is currently - # processing any channels. If +include_invisible+ is +false+ (the default) - # then invisible channels (such as those created by port forwarding) will - # not be counted; otherwise, they will be. - def busy?(include_invisible=false) - servers.any? { |server| server.busy?(include_invisible) } - end - alias :loop_forever :loop # Run the aggregated event loop for all open server sessions, until the given @@ -357,116 +326,6 @@ module Net; module SSH; module Multi end end - # Sends a global request to all active sessions (see #active_sessions). - # This can be used to (e.g.) ping the remote servers to prevent them from - # timing out. - # - # session.send_global_request("keep-alive@openssh.com") - # - # If a block is given, it will be invoked when the server responds, with - # two arguments: the Net::SSH connection that is responding, and a boolean - # indicating whether the request succeeded or not. - def send_global_request(type, *extra, &callback) - active_sessions.each { |ssh| ssh.send_global_request(type, *extra, &callback) } - self - end - - # Asks all active sessions (see #active_sessions) to open a new channel. - # When each server responds, the +on_confirm+ block will be invoked with - # a single argument, the channel object for that server. This means that - # the block will be invoked one time for each active session. - # - # All new channels will be collected and returned, aggregated into a new - # Net::SSH::Multi::Channel instance. - # - # Note that the channels are "enhanced" slightly--they have two properties - # set on them automatically, to make dealing with them in a multi-session - # environment slightly easier: - # - # * :server => the Net::SSH::Multi::Server instance that spawned the channel - # * :host => the host name of the server - # - # Having access to these things lets you more easily report which host - # (e.g.) data was received from: - # - # session.open_channel do |channel| - # channel.exec "command" do |ch, success| - # ch.on_data do |ch, data| - # puts "got data #{data} from #{ch[:host]}" - # end - # end - # end - def open_channel(type="session", *extra, &on_confirm) - channels = active_sessions.map do |ssh| - ssh.open_channel(type, *extra) do |c| - c[:server] = c.connection[:server] - c[:host] = c.connection[:server].host - on_confirm[c] if on_confirm - end - end - Multi::Channel.new(self, channels) - end - - # A convenience method for executing a command on multiple hosts and - # either displaying or capturing the output. It opens a channel on all - # active sessions (see #open_channel and #active_sessions), and then - # executes a command on each channel (Net::SSH::Connection::Channel#exec). - # - # If a block is given, it will be invoked whenever data is received across - # the channel, with three arguments: the channel object, a symbol identifying - # which output stream the data was received on (+:stdout+ or +:stderr+) - # and a string containing the data that was received: - # - # session.exec("command") do |ch, stream, data| - # puts "[#{ch[:host]} : #{stream}] #{data}" - # end - # - # If no block is given, all output will be written to +$stdout+ or - # +$stderr+, as appropriate. - # - # Note that #exec will also capture the exit status of the process in the - # +:exit_status+ property of each channel. Since #exec returns all of the - # channels in a Net::SSH::Multi::Channel object, you can check for the - # exit status like this: - # - # channel = session.exec("command") { ... } - # channel.wait - # - # if channel.any? { |c| c[:exit_status] != 0 } - # puts "executing failed on at least one host!" - # end - def exec(command, &block) - open_channel do |channel| - channel.exec(command) do |ch, success| - raise "could not execute command: #{command.inspect} (#{ch[:host]})" unless success - - channel.on_data do |ch, data| - if block - block.call(ch, :stdout, data) - else - data.chomp.each_line do |line| - $stdout.puts("[#{ch[:host]}] #{line}") - end - end - end - - channel.on_extended_data do |ch, type, data| - if block - block.call(ch, :stderr, data) - else - data.chomp.each_line do |line| - $stderr.puts("[#{ch[:host]}] #{line}") - end - end - end - - channel.on_request("exit-status") do |ch, data| - ch[:exit_status] = data.read_long - end - end - end - end - # Runs the preprocess stage on all servers. Returns false if the block # returns false, and true if there either is no block, or it returns true. # This is called as part of the #process method. diff --git a/lib/net/ssh/multi/session_actions.rb b/lib/net/ssh/multi/session_actions.rb new file mode 100644 index 0000000..35c69ed --- /dev/null +++ b/lib/net/ssh/multi/session_actions.rb @@ -0,0 +1,153 @@ +module Net; module SSH; module Multi + + # This module represents the actions that are available on session + # collections. Any class that includes this module needs only provide a + # +servers+ method that returns a list of Net::SSH::Multi::Server + # instances, and the rest just works. See Net::SSH::Multi::Session and + # Net::SSH::Multi::Subsession for consumers of this module. + module SessionActions + # Returns the session that is the "master". This defaults to +self+, but + # classes that include this module may wish to change this if they are + # subsessions that depend on a master session. + def master + self + end + + # Connections are normally established lazily, as soon as they are needed. + # This method forces all servers in the current container to have their + # connections established immediately, blocking until the connections have + # been made. + def connect! + sessions + self + end + + # Returns +true+ if any server in the current container has an open SSH + # session that is currently processing any channels. If +include_invisible+ + # is +false+ (the default) then invisible channels (such as those created + # by port forwarding) will not be counted; otherwise, they will be. + def busy?(include_invisible=false) + servers.any? { |server| server.busy?(include_invisible) } + end + + # Returns an array of all SSH sessions, blocking until all sessions have + # connected. + def sessions + threads = servers.map { |server| Thread.new { server.session(true) } if server.session.nil? } + threads.each { |thread| thread.join if thread } + servers.map { |server| server.session } + end + + # Sends a global request to the sessions for all contained servers + # (see #sessions). This can be used to (e.g.) ping the remote servers to + # prevent them from timing out. + # + # session.send_global_request("keep-alive@openssh.com") + # + # If a block is given, it will be invoked when the server responds, with + # two arguments: the Net::SSH connection that is responding, and a boolean + # indicating whether the request succeeded or not. + def send_global_request(type, *extra, &callback) + sessions.each { |ssh| ssh.send_global_request(type, *extra, &callback) } + self + end + + # Asks all sessions for all contained servers (see #sessions) to open a + # new channel. When each server responds, the +on_confirm+ block will be + # invoked with a single argument, the channel object for that server. This + # means that the block will be invoked one time for each session. + # + # All new channels will be collected and returned, aggregated into a new + # Net::SSH::Multi::Channel instance. + # + # Note that the channels are "enhanced" slightly--they have two properties + # set on them automatically, to make dealing with them in a multi-session + # environment slightly easier: + # + # * :server => the Net::SSH::Multi::Server instance that spawned the channel + # * :host => the host name of the server + # + # Having access to these things lets you more easily report which host + # (e.g.) data was received from: + # + # session.open_channel do |channel| + # channel.exec "command" do |ch, success| + # ch.on_data do |ch, data| + # puts "got data #{data} from #{ch[:host]}" + # end + # end + # end + def open_channel(type="session", *extra, &on_confirm) + channels = sessions.map do |ssh| + ssh.open_channel(type, *extra) do |c| + c[:server] = c.connection[:server] + c[:host] = c.connection[:server].host + on_confirm[c] if on_confirm + end + end + Multi::Channel.new(master, channels) + end + + # A convenience method for executing a command on multiple hosts and + # either displaying or capturing the output. It opens a channel on all + # active sessions (see #open_channel and #active_sessions), and then + # executes a command on each channel (Net::SSH::Connection::Channel#exec). + # + # If a block is given, it will be invoked whenever data is received across + # the channel, with three arguments: the channel object, a symbol identifying + # which output stream the data was received on (+:stdout+ or +:stderr+) + # and a string containing the data that was received: + # + # session.exec("command") do |ch, stream, data| + # puts "[#{ch[:host]} : #{stream}] #{data}" + # end + # + # If no block is given, all output will be written to +$stdout+ or + # +$stderr+, as appropriate. + # + # Note that #exec will also capture the exit status of the process in the + # +:exit_status+ property of each channel. Since #exec returns all of the + # channels in a Net::SSH::Multi::Channel object, you can check for the + # exit status like this: + # + # channel = session.exec("command") { ... } + # channel.wait + # + # if channel.any? { |c| c[:exit_status] != 0 } + # puts "executing failed on at least one host!" + # end + def exec(command, &block) + open_channel do |channel| + channel.exec(command) do |ch, success| + raise "could not execute command: #{command.inspect} (#{ch[:host]})" unless success + + channel.on_data do |ch, data| + if block + block.call(ch, :stdout, data) + else + data.chomp.each_line do |line| + $stdout.puts("[#{ch[:host]}] #{line}") + end + end + end + + channel.on_extended_data do |ch, type, data| + if block + block.call(ch, :stderr, data) + else + data.chomp.each_line do |line| + $stderr.puts("[#{ch[:host]}] #{line}") + end + end + end + + channel.on_request("exit-status") do |ch, data| + ch[:exit_status] = data.read_long + end + end + end + end + + end + +end; end; end
\ No newline at end of file diff --git a/lib/net/ssh/multi/subsession.rb b/lib/net/ssh/multi/subsession.rb new file mode 100644 index 0000000..29e7866 --- /dev/null +++ b/lib/net/ssh/multi/subsession.rb @@ -0,0 +1,48 @@ +require 'net/ssh/multi/session_actions' + +module Net; module SSH; module Multi + + # A trivial class for representing a subset of servers. It is used + # internally for restricting operations to a subset of all defined + # servers. + # + # subsession = session.with(:app) + # subsession.exec("hostname") + class Subsession + include SessionActions + + # The master session that spawned this subsession. + attr_reader :master + + # The list of servers that this subsession can operate on. + attr_reader :servers + + # Create a new subsession of the given +master+ session, that operates + # on the given +server_list+. + def initialize(master, server_list) + @master = master + @servers = server_list.uniq + end + + # Works as Array#slice, but returns a new subsession consisting of the + # given slice of servers in this subsession. The new subsession will have + # the same #master session as this subsession does. + # + # s1 = subsession.slice(0) + # s2 = subsession.slice(3, -1) + # s3 = subsession.slice(1..4) + def slice(*args) + Subsession.new(master, Array(servers.slice(*args))) + end + + # Returns a new subsession that consists of only the first server in the + # server list of the current subsession. This is just convenience for + # #slice(0): + # + # s1 = subsession.first + def first + slice(0) + end + end + +end; end; end
\ No newline at end of file |