summaryrefslogtreecommitdiff
path: root/lib/net/ssh/multi/session.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/net/ssh/multi/session.rb')
-rw-r--r--lib/net/ssh/multi/session.rb285
1 files changed, 72 insertions, 213 deletions
diff --git a/lib/net/ssh/multi/session.rb b/lib/net/ssh/multi/session.rb
index 99819e1..561f9d1 100644
--- a/lib/net/ssh/multi/session.rb
+++ b/lib/net/ssh/multi/session.rb
@@ -3,6 +3,8 @@ require 'net/ssh/gateway'
require 'net/ssh/multi/server'
require 'net/ssh/multi/channel'
require 'net/ssh/multi/pending_connection'
+require 'net/ssh/multi/session_actions'
+require 'net/ssh/multi/subsession'
module Net; module SSH; module Multi
# Represents a collection of connections to various servers. It provides an
@@ -27,11 +29,11 @@ module Net; module SSH; module Multi
# session.use 'app2', 'user'
# end
#
- # # execute commands
+ # # execute commands on all servers
# session.exec "uptime"
#
# # execute commands on a subset of servers
- # session.with(:app) { session.exec "hostname" }
+ # session.with(:app).exec "hostname"
#
# # run the aggregated event loop
# session.loop
@@ -41,6 +43,8 @@ module Net; module SSH; module Multi
# You can force the connections to be opened immediately, though, using the
# #connect! method.
class Session
+ include SessionActions
+
# The list of Net::SSH::Multi::Server definitions managed by this session.
attr_reader :servers
@@ -63,11 +67,6 @@ module Net; module SSH; module Multi
# See #use and #group.
attr_reader :open_groups #:nodoc:
- # The list of "active" groups, which will be used to restrict subsequent
- # commands. This is actually a Hash, mapping group names to their corresponding
- # constraints (see #with).
- attr_reader :active_groups #:nodoc:
-
# Creates a new Net::SSH::Multi::Session instance. Initially, it contains
# no server definitions, no group definitions, and no default gateway.
#
@@ -85,7 +84,6 @@ module Net; module SSH; module Multi
@servers = []
@groups = {}
@gateway = nil
- @active_groups = {}
@open_groups = []
@connect_threads = []
@@ -182,24 +180,22 @@ module Net; module SSH; module Multi
server
end
- # Restricts the set of servers that will be targeted by commands within
- # the associated block. It can be used in either of two ways (or both ways
- # used together).
+ # Returns the set of servers that match the given criteria. It can be used
+ # in any (or all) of three ways.
#
- # First, you can simply specify a list of group names. All servers in all
- # named groups will be the target of the commands. (Nested calls to #with
- # are cumulative.)
+ # First, you can omit any arguments. In this case, the full list of servers
+ # will be returned.
#
- # # execute 'hostname' on all servers in the :app group, and 'uptime'
- # # on all servers in either :app or :db.
- # session.with(:app) do
- # session.exec('hostname')
- # session.with(:db) do
- # session.exec('uptime')
- # end
- # end
+ # all = session.servers_for
+ #
+ # Second, you can simply specify a list of group names. All servers in all
+ # named groups will be returned. If a server belongs to multiple matching
+ # groups, then it will appear only once in the list (the resulting list
+ # will contain only unique servers).
#
- # Secondly, you can specify a hash with group names as keys, and property
+ # servers = session.servers_for(:app, :db)
+ #
+ # Last, you can specify a hash with group names as keys, and property
# constraints as the values. These property constraints are either "only"
# constraints (which restrict the set of servers to "only" those that match
# the given properties) or "except" constraints (which restrict the set of
@@ -212,95 +208,76 @@ module Net; module SSH; module Multi
# session.use 'dbslve2', 'user2'
# end
#
- # # execute the given rake task ONLY on the servers in the :db group
- # # which have the :primary property set to true.
- # session.with :db => { :only => { :primary => true } } do
- # session.exec "rake db:migrate"
- # end
+ # # return ONLY on the servers in the :db group which have the :primary
+ # # property set to true.
+ # primary = session.servers_for(:db => { :only => { :primary => true } })
#
# You can, naturally, combine these methods:
#
# # all servers in :app and :web, and all servers in :db with the
# # :primary property set to true
- # session.with :app, :web, :db => { :only => { :primary => true } } do
- # # ...
- # end
- def with(*groups)
- saved_groups = active_groups.dup
-
- new_map = groups.inject({}) do |map, group|
- if group.is_a?(Hash)
- group.each do |gr, value|
- raise ArgumentError, "the value for any group must be a Hash" unless value.is_a?(Hash)
- bad_keys = value.keys - [:only, :except]
- raise ArgumentError, "unknown constraint(s): #{bad_keys.inspect}" unless bad_keys.empty?
- map[gr] = (active_groups[gr] || {}).merge(value)
+ # servers = session.servers_for(:app, :web, :db => { :only => { :primary => true } })
+ def servers_for(*criteria)
+ if criteria.empty?
+ servers
+ else
+ # normalize the criteria list, so that every entry is a key to a
+ # criteria hash (possibly empty).
+ criteria = criteria.inject({}) do |hash, entry|
+ case entry
+ when Hash then hash.merge(entry)
+ else hash.merge(entry => {})
end
- else
- map[group] = active_groups[group] || {}
end
- map
- end
-
- active_groups.update(new_map)
- yield self
- ensure
- active_groups.replace(saved_groups)
- end
- # Works as #with, but for specific servers rather than groups. In other
- # words, you can use this to restrict actions within the block to only
- # a specific list of servers. It works by creating an ad-hoc group, adding
- # the servers to that group, and then making that group the only active
- # group. (Note that because of this, you cannot nest #on within #with,
- # though you could nest #with inside of #on.)
- #
- # srv = session.use('host', 'user')
- # # ...
- # session.on(srv) do
- # session.exec('hostname')
- # end
- def on(*servers)
- adhoc_group = "adhoc_group_#{servers.hash}_#{rand(0xffffffff)}".to_sym
- group(adhoc_group => servers)
- saved_groups = active_groups.dup
- active_groups.replace(adhoc_group => {})
- yield self
- ensure
- active_groups.replace(saved_groups) if saved_groups
- groups.delete(adhoc_group)
- end
+ list = criteria.inject([]) do |server_list, (group, properties)|
+ raise ArgumentError, "the value for any group must be a Hash, but got a #{properties.class} for #{group.inspect}" unless properties.is_a?(Hash)
+ bad_keys = properties.keys - [:only, :except]
+ raise ArgumentError, "unknown constraint(s) #{bad_keys.inspect} for #{group.inspect}" unless bad_keys.empty?
- # Returns the list of Net::SSH sessions for all servers that match the
- # current scope (e.g., the groups or servers named in the outer #with or
- # #on calls). If any servers have not yet been connected to, this will
- # block until the connections have been made.
- def active_sessions
- list = if active_groups.empty?
- servers
- else
- active_groups.inject([]) do |list, (group, properties)|
- servers = groups[group].select do |server|
+ servers = (groups[group] || []).select do |server|
(properties[:only] || {}).all? { |prop, value| server[prop] == value } &&
!(properties[:except] || {}).any? { |prop, value| server[prop] == value }
end
- list.concat(servers)
+ server_list.concat(servers)
end
- end
- list.uniq!
- threads = list.map { |server| Thread.new { server.session(true) } if server.session.nil? }
- threads.each { |thread| thread.join if thread }
+ list.uniq
+ end
+ end
- list.map { |server| server.session }
+ # Returns a new Net::SSH::Multi::Subsession instance consisting of the
+ # servers that meet the given criteria. If a block is given, the
+ # subsession will be yielded to it. See #servers_for for a discussion of
+ # how these criteria are interpreted.
+ #
+ # session.with(:app).exec('hostname')
+ #
+ # session.with(:app, :db => { :primary => true }) do |s|
+ # s.exec 'date'
+ # s.exec 'uptime'
+ # end
+ def with(*groups)
+ subsession = Subsession.new(self, servers_for(*groups))
+ yield subsession if block_given?
+ subsession
end
- # Connections are normally established lazily, as soon as they are needed.
- # This method forces all servers selected by the current scope to connect,
- # if they have not yet been connected.
- def connect!
- active_sessions
- self
+ # Works as #with, but for specific servers rather than groups. It will
+ # return a new subsession (Net::SSH::Multi::Subsession) consisting of
+ # the given servers. (Note that it requires that the servers in question
+ # have been created via calls to #use on this session object, or things
+ # will not work quite right.) If a block is given, the new subsession
+ # will also be yielded to the block.
+ #
+ # srv1 = session.use('host1', 'user')
+ # srv2 = session.use('host2', 'user')
+ # # ...
+ # session.on(srv1, srv2).exec('hostname')
+ def on(*servers)
+ subsession = Subsession.new(self, servers)
+ yield subsession if block_given?
+ subsession
end
# Closes the multi-session by shutting down all open server sessions, and
@@ -314,14 +291,6 @@ module Net; module SSH; module Multi
default_gateway.shutdown! if default_gateway
end
- # Returns +true+ if any server has an open SSH session that is currently
- # processing any channels. If +include_invisible+ is +false+ (the default)
- # then invisible channels (such as those created by port forwarding) will
- # not be counted; otherwise, they will be.
- def busy?(include_invisible=false)
- servers.any? { |server| server.busy?(include_invisible) }
- end
-
alias :loop_forever :loop
# Run the aggregated event loop for all open server sessions, until the given
@@ -357,116 +326,6 @@ module Net; module SSH; module Multi
end
end
- # Sends a global request to all active sessions (see #active_sessions).
- # This can be used to (e.g.) ping the remote servers to prevent them from
- # timing out.
- #
- # session.send_global_request("keep-alive@openssh.com")
- #
- # If a block is given, it will be invoked when the server responds, with
- # two arguments: the Net::SSH connection that is responding, and a boolean
- # indicating whether the request succeeded or not.
- def send_global_request(type, *extra, &callback)
- active_sessions.each { |ssh| ssh.send_global_request(type, *extra, &callback) }
- self
- end
-
- # Asks all active sessions (see #active_sessions) to open a new channel.
- # When each server responds, the +on_confirm+ block will be invoked with
- # a single argument, the channel object for that server. This means that
- # the block will be invoked one time for each active session.
- #
- # All new channels will be collected and returned, aggregated into a new
- # Net::SSH::Multi::Channel instance.
- #
- # Note that the channels are "enhanced" slightly--they have two properties
- # set on them automatically, to make dealing with them in a multi-session
- # environment slightly easier:
- #
- # * :server => the Net::SSH::Multi::Server instance that spawned the channel
- # * :host => the host name of the server
- #
- # Having access to these things lets you more easily report which host
- # (e.g.) data was received from:
- #
- # session.open_channel do |channel|
- # channel.exec "command" do |ch, success|
- # ch.on_data do |ch, data|
- # puts "got data #{data} from #{ch[:host]}"
- # end
- # end
- # end
- def open_channel(type="session", *extra, &on_confirm)
- channels = active_sessions.map do |ssh|
- ssh.open_channel(type, *extra) do |c|
- c[:server] = c.connection[:server]
- c[:host] = c.connection[:server].host
- on_confirm[c] if on_confirm
- end
- end
- Multi::Channel.new(self, channels)
- end
-
- # A convenience method for executing a command on multiple hosts and
- # either displaying or capturing the output. It opens a channel on all
- # active sessions (see #open_channel and #active_sessions), and then
- # executes a command on each channel (Net::SSH::Connection::Channel#exec).
- #
- # If a block is given, it will be invoked whenever data is received across
- # the channel, with three arguments: the channel object, a symbol identifying
- # which output stream the data was received on (+:stdout+ or +:stderr+)
- # and a string containing the data that was received:
- #
- # session.exec("command") do |ch, stream, data|
- # puts "[#{ch[:host]} : #{stream}] #{data}"
- # end
- #
- # If no block is given, all output will be written to +$stdout+ or
- # +$stderr+, as appropriate.
- #
- # Note that #exec will also capture the exit status of the process in the
- # +:exit_status+ property of each channel. Since #exec returns all of the
- # channels in a Net::SSH::Multi::Channel object, you can check for the
- # exit status like this:
- #
- # channel = session.exec("command") { ... }
- # channel.wait
- #
- # if channel.any? { |c| c[:exit_status] != 0 }
- # puts "executing failed on at least one host!"
- # end
- def exec(command, &block)
- open_channel do |channel|
- channel.exec(command) do |ch, success|
- raise "could not execute command: #{command.inspect} (#{ch[:host]})" unless success
-
- channel.on_data do |ch, data|
- if block
- block.call(ch, :stdout, data)
- else
- data.chomp.each_line do |line|
- $stdout.puts("[#{ch[:host]}] #{line}")
- end
- end
- end
-
- channel.on_extended_data do |ch, type, data|
- if block
- block.call(ch, :stderr, data)
- else
- data.chomp.each_line do |line|
- $stderr.puts("[#{ch[:host]}] #{line}")
- end
- end
- end
-
- channel.on_request("exit-status") do |ch, data|
- ch[:exit_status] = data.read_long
- end
- end
- end
- end
-
# Runs the preprocess stage on all servers. Returns false if the block
# returns false, and true if there either is no block, or it returns true.
# This is called as part of the #process method.