diff options
Diffstat (limited to 'lib/net/ssh/multi/session.rb')
-rw-r--r-- | lib/net/ssh/multi/session.rb | 280 |
1 files changed, 260 insertions, 20 deletions
diff --git a/lib/net/ssh/multi/session.rb b/lib/net/ssh/multi/session.rb index f94028b..cffe972 100644 --- a/lib/net/ssh/multi/session.rb +++ b/lib/net/ssh/multi/session.rb @@ -3,14 +3,64 @@ require 'net/ssh/multi/server' require 'net/ssh/multi/channel' module Net; module SSH; module Multi + # Represents a collection of connections to various servers. It provides an + # interface for organizing the connections (#group), as well as a way to + # scope commands to a subset of all connections (#with). You can also provide + # a default gateway connection that servers should use when connecting + # (#via). It exposes an interface similar to Net::SSH::Connection::Session + # for opening SSH channels and executing commands, allowing for these + # operations to be done in parallel across multiple connections. + # + # Net::SSH::Multi.start do |session| + # # access servers via a gateway + # session.via 'gateway', 'gateway-user' + # + # # define the servers we want to use + # session.use 'host1', 'user1' + # session.use 'host2', 'user2' + # + # # define servers in groups for more granular access + # session.group :app do + # session.use 'app1', 'user' + # session.use 'app2', 'user' + # end + # + # # execute commands + # session.exec "uptime" + # + # # execute commands on a subset of servers + # session.with(:app) { session.exec "hostname" } + # + # # run the aggregated event loop + # session.loop + # end + # + # Note that connections are established lazily, as soon as they are needed. + # You can force the connections to be opened immediately, though, using the + # #connect! method. class Session + # The list of Net::SSH::Multi::Server definitions managed by this session. attr_reader :servers + + # The default Net::SSH::Gateway instance to use to connect to the servers. + # If +nil+, no default gateway will be used. attr_reader :default_gateway + + # The hash of group definitions, mapping each group name to the list of + # corresponding Net::SSH::Multi::Server definitions. attr_reader :groups - attr_reader :open_groups - attr_reader :active_groups + # The list of "open" groups, which will receive subsequent server definitions. + # See #use and #group. + attr_reader :open_groups #:nodoc: + + # The list of "active" groups, which will be used to restrict subsequent + # commands. This is actually a Hash, mapping group names to their corresponding + # constraints (see #with). + attr_reader :active_groups #:nodoc: + # Creates a new Net::SSH::Multi::Session instance. Initially, it contains + # no server definitions, no group definitions, and no default gateway. def initialize @servers = [] @groups = {} @@ -19,6 +69,32 @@ module Net; module SSH; module Multi @open_groups = [] end + # At its simplest, this associates a named group with a server definition. + # It can be used in either of two ways: + # + # First, you can use it to associate a group (or array of groups) with a + # server definition (or array of server definitions). The server definitions + # must already exist in the #servers array (typically by calling #use): + # + # server1 = session.use('host1', 'user1') + # server2 = session.use('host2', 'user2') + # session.group :app => server1, :web => server2 + # session.group :staging => [server1, server2] + # session.group %w(xen linux) => server2 + # session.group %w(rackspace backup) => [server1, server2] + # + # Secondly, instead of a mapping of groups to servers, you can just + # provide a list of group names, and then a block. Inside the block, any + # calls to #use will automatically associate the new server definition with + # those groups. You can nest #group calls, too, which will aggregate the + # group definitions. + # + # session.group :rackspace, :backup do + # session.use 'host1', 'user1' + # session.group :xen do + # session.use 'host2', 'user2' + # end + # end def group(*args) mapping = args.last.is_a?(Hash) ? args.pop : {} @@ -41,11 +117,32 @@ module Net; module SSH; module Multi end end + # Sets up a default gateway to use when establishing connections to servers. + # Note that any servers defined prior to this invocation will not use the + # default gateway; it only affects servers defined subsequently. + # + # session.via 'gateway.host', 'user' + # + # You may override the default gateway on a per-server basis by passing the + # :via key to the #use method; see #use for details. def via(host, user, options={}) @default_gateway = Net::SSH::Gateway.new(host, user, options) self end + # Defines a new server definition, to be managed by this session. The + # server is at the given +host+, and will be connected to as the given + # +user+. The other options are passed as-is to the Net::SSH session + # constructor. + # + # If a default gateway has been specified previously (with #via) it will + # be passed to the new server definition. You can override this by passing + # a different Net::SSH::Gateway instance (or +nil+) with the :via key in + # the +options+. + # + # session.use 'host', 'user' + # session.use 'host2', 'user2', :via => nil + # session.use 'host3', 'user3', :via => Net::SSH::Gateway.new('gateway.host', 'user') def use(host, user, options={}) server = Server.new(host, user, {:via => default_gateway}.merge(options)) exists = servers.index(server) @@ -58,6 +155,49 @@ module Net; module SSH; module Multi server end + # Restricts the set of servers that will be targeted by commands within + # the associated block. It can be used in either of two ways (or both ways + # used together). + # + # First, you can simply specify a list of group names. All servers in all + # named groups will be the target of the commands. (Nested calls to #with + # are cumulative.) + # + # # execute 'hostname' on all servers in the :app group, and 'uptime' + # # on all servers in either :app or :db. + # session.with(:app) do + # session.exec('hostname') + # session.with(:db) do + # session.exec('uptime') + # end + # end + # + # Secondly, you can specify a hash with group names as keys, and property + # constraints as the values. These property constraints are either "only" + # constraints (which restrict the set of servers to "only" those that match + # the given properties) or "except" constraints (which restrict the set of + # servers to those whose properties do _not_ match). Properties are described + # when the server is defined (via the :properties key): + # + # session.group :db do + # session.use 'dbmain', 'user', :properties => { :primary => true } + # session.use 'dbslave', 'user2' + # session.use 'dbslve2', 'user2' + # end + # + # # execute the given rake task ONLY on the servers in the :db group + # # which have the :primary property set to true. + # session.with :db => { :only => { :primary => true } } do + # session.exec "rake db:migrate" + # end + # + # You can, naturally, combine these methods: + # + # # all servers in :app and :web, and all servers in :db with the + # # :primary property set to true + # session.with :app, :web, :db => { :only => { :primary => true } } do + # # ... + # end def with(*groups) saved_groups = active_groups.dup @@ -81,6 +221,18 @@ module Net; module SSH; module Multi active_groups.replace(saved_groups) end + # Works as #with, but for specific servers rather than groups. In other + # words, you can use this to restrict actions within the block to only + # a specific list of servers. It works by creating an ad-hoc group, adding + # the servers to that group, and then making that group the only active + # group. (Note that because of this, you cannot nest #on within #with, + # though you could nest #with inside of #on.) + # + # srv = session.use('host', 'user') + # # ... + # session.on(srv) do + # session.exec('hostname') + # end def on(*servers) adhoc_group = "adhoc_group_#{servers.hash}_#{rand(0xffffffff)}".to_sym group(adhoc_group => servers) @@ -92,6 +244,10 @@ module Net; module SSH; module Multi groups.delete(adhoc_group) end + # Returns the list of Net::SSH sessions for all servers that match the + # current scope (e.g., the groups or servers named in the outer #with or + # #on calls). If any servers have not yet been connected to, this will + # block until the connections have been made. def active_sessions list = if active_groups.empty? servers @@ -105,14 +261,25 @@ module Net; module SSH; module Multi end end - sessions_for(list.uniq) + list.uniq! + threads = list.map { |server| Thread.new { server.session(true) } if server.session.nil? } + threads.each { |thread| thread.join if thread } + + list.map { |server| server.session } end + # Connections are normally established lazily, as soon as they are needed. + # This method forces all servers selected by the current scope to connect, + # if they have not yet been connected. def connect! active_sessions self end + # Closes the multi-session by shutting down all open server sessions, and + # the default gateway (if one was specified using #via). Note that other + # gateway connections (e.g., those passed to #use directly) will _not_ be + # closed by this method, and must be managed externally. def close servers.each { |server| server.close_channels } loop(0) { busy?(true) } @@ -120,28 +287,31 @@ module Net; module SSH; module Multi default_gateway.shutdown! if default_gateway end + # Returns +true+ if any server has an open SSH session that is currently + # processing any channels. If +include_invisible+ is +false+ (the default) + # then invisible channels (such as those created by port forwarding) will + # not be counted; otherwise, they will be. def busy?(include_invisible=false) servers.any? { |server| server.busy?(include_invisible) } end alias :loop_forever :loop + # Run the aggregated event loop for all open server sessions, until the given + # block returns +false+. If no block is given, the loop will run for as + # long as #busy? returns +true+ (in other words, for as long as there are + # any (non-invisible) channels open). def loop(wait=nil, &block) running = block || Proc.new { |c| busy? } loop_forever { break unless process(wait, &running) } end - def preprocess(&block) - return false if block && !block[self] - servers.each { |server| server.preprocess } - block.nil? || block[self] - end - - def postprocess(readers, writers) - servers.each { |server| server.postprocess(readers, writers) } - true - end - + # Run a single iteration of the aggregated event loop for all open server + # sessions. The +wait+ parameter indicates how long to wait for an event + # to appear on any of the different sessions; +nil+ (the default) means + # "wait forever". If the block is given, then it will be used to determine + # whether #process returns +true+ (the block did not return +false+), or + # +false+ (the block returned +false+). def process(wait=nil, &block) return false unless preprocess(&block) @@ -153,11 +323,45 @@ module Net; module SSH; module Multi return postprocess(readers, writers) end + # Sends a global request to all active sessions (see #active_sessions). + # This can be used to (e.g.) ping the remote servers to prevent them from + # timing out. + # + # session.send_global_request("keep-alive@openssh.com") + # + # If a block is given, it will be invoked when the server responds, with + # two arguments: the Net::SSH connection that is responding, and a boolean + # indicating whether the request succeeded or not. def send_global_request(type, *extra, &callback) active_sessions.each { |ssh| ssh.send_global_request(type, *extra, &callback) } self end + # Asks all active sessions (see #active_sessions) to open a new channel. + # When each server responds, the +on_confirm+ block will be invoked with + # a single argument, the channel object for that server. This means that + # the block will be invoked one time for each active session. + # + # All new channels will be collected and returned, aggregated into a new + # Net::SSH::Multi::Channel instance. + # + # Note that the channels are "enhanced" slightly--they have two properties + # set on them automatically, to make dealing with them in a multi-session + # environment slightly easier: + # + # * :server => the Net::SSH::Multi::Server instance that spawned the channel + # * :host => the host name of the server + # + # Having access to these things lets you more easily report which host + # (e.g.) data was received from: + # + # session.open_channel do |channel| + # channel.exec "command" do |ch, success| + # ch.on_data do |ch, data| + # puts "got data #{data} from #{ch[:host]}" + # end + # end + # end def open_channel(type="session", *extra, &on_confirm) channels = active_sessions.map do |ssh| ssh.open_channel(type, *extra) do |c| @@ -169,6 +373,34 @@ module Net; module SSH; module Multi Multi::Channel.new(self, channels) end + # A convenience method for executing a command on multiple hosts and + # either displaying or capturing the output. It opens a channel on all + # active sessions (see #open_channel and #active_sessions), and then + # executes a command on each channel (Net::SSH::Connection::Channel#exec). + # + # If a block is given, it will be invoked whenever data is received across + # the channel, with three arguments: the channel object, a symbol identifying + # which output stream the data was received on (+:stdout+ or +:stderr+) + # and a string containing the data that was received: + # + # session.exec("command") do |ch, stream, data| + # puts "[#{ch[:host]} : #{stream}] #{data}" + # end + # + # If no block is given, all output will be written to +$stdout+ or + # +$stderr+, as appropriate. + # + # Note that #exec will also capture the exit status of the process in the + # +:exit_status+ property of each channel. Since #exec returns all of the + # channels in a Net::SSH::Multi::Channel object, you can check for the + # exit status like this: + # + # channel = session.exec("command") { ... } + # channel.wait + # + # if channel.any? { |c| c[:exit_status] != 0 } + # puts "executing failed on at least one host!" + # end def exec(command, &block) open_channel do |channel| channel.exec(command) do |ch, success| @@ -201,12 +433,20 @@ module Net; module SSH; module Multi end end - private + # Runs the preprocess stage on all servers. Returns false if the block + # returns false, and true if there either is no block, or it returns true. + # This is called as part of the #process method. + def preprocess(&block) #:nodoc: + return false if block && !block[self] + servers.each { |server| server.preprocess } + block.nil? || block[self] + end - def sessions_for(servers) - threads = servers.map { |server| Thread.new { server.session(true) } if server.session.nil? } - threads.each { |thread| thread.join if thread } - servers.map { |server| server.session } - end + # Runs the postprocess stage on all servers. Always returns true. This is + # called as part of the #process method. + def postprocess(readers, writers) #:nodoc: + servers.each { |server| server.postprocess(readers, writers) } + true + end end end; end; end
\ No newline at end of file |