diff options
author | Jamis Buck <jamis@37signals.com> | 2008-03-29 13:13:16 -0600 |
---|---|---|
committer | Jamis Buck <jamis@37signals.com> | 2008-03-29 13:13:16 -0600 |
commit | 606a0a2092c2b605d03722efb14cb9039bb8dcb8 (patch) | |
tree | 281882567edb45c82d2b6b463f3c08b6a3941c62 | |
parent | d911a09547df9ec3d2d39b4eebdc5f79a2091070 (diff) | |
download | net-ssh-multi-606a0a2092c2b605d03722efb14cb9039bb8dcb8.tar.gz |
push server management logic into its own class
-rw-r--r-- | lib/net/ssh/multi/server.rb | 92 | ||||
-rw-r--r-- | lib/net/ssh/multi/session.rb | 189 |
2 files changed, 145 insertions, 136 deletions
diff --git a/lib/net/ssh/multi/server.rb b/lib/net/ssh/multi/server.rb new file mode 100644 index 0000000..e6f337b --- /dev/null +++ b/lib/net/ssh/multi/server.rb @@ -0,0 +1,92 @@ +require 'net/ssh' + +module Net; module SSH; module Multi + class Server + attr_reader :host + attr_reader :user + attr_reader :options + attr_reader :gateway + + def initialize(host, user, options={}) + @host = host + @user = user + @options = options.dup + @gateway = @options.delete(:via) + end + + def port + options[:port] || 22 + end + + def eql?(server) + host == server.host && + user == server.user && + port == server.port + end + + alias :== :eql? + + def hash + @hash ||= [host, user, port].hash + end + + def to_s + @to_s ||= begin + s = "#{user}@#{host}" + s << ":#{options[:port]}" if options[:port] + s + end + end + + def session(ensure_open=false) + return @session if @session || !ensure_open + @session ||= begin + session = if gateway + gateway.ssh(host, user, options) + else + Net::SSH.start(host, user, options) + end + + session[:server] = self + session + end + rescue Net::SSH::AuthenticationFailed => error + raise Net::SSH::AuthenticationFailed.new("#{error.message}@#{host}") + end + + def close_channels + session.channels.each { |id, channel| channel.close } if session + end + + def close + session.transport.close if session + end + + def busy?(include_invisible=false) + session && session.busy?(include_invisible) + end + + def preprocess(&block) + return true unless session + session.preprocess(&block) + end + + def readers + return [] unless session + session.listeners.keys + end + + def writers + return [] unless session + session.listeners.keys.select do |io| + io.respond_to?(:pending_write?) && io.pending_write? + end + end + + def postprocess(readers, writers) + return true unless session + listeners = session.listeners.keys + session.postprocess(listeners & readers, listeners & writers) + end + end +end; end; end
\ No newline at end of file diff --git a/lib/net/ssh/multi/session.rb b/lib/net/ssh/multi/session.rb index 497d391..efcd7fe 100644 --- a/lib/net/ssh/multi/session.rb +++ b/lib/net/ssh/multi/session.rb @@ -1,16 +1,16 @@ require 'thread' -require 'net/ssh' require 'net/ssh/gateway' +require 'net/ssh/multi/server' require 'net/ssh/multi/channel' module Net; module SSH; module Multi class Session - attr_reader :connections - attr_reader :gateway + attr_reader :servers + attr_reader :default_gateway attr_reader :groups def initialize - @connections = [] + @servers = [] @groups = {} @gateway = nil @connections_mutex = Mutex.new @@ -21,58 +21,37 @@ module Net; module SSH; module Multi def group(*args) mapping = args.last.is_a?(Hash) ? args.pop : {} - begin - saved_groups = active_groups.dup - active_groups.concat(args.map { |a| a.to_sym }).uniq! - + if mapping.any? && block_given? + raise ArgumentError, "must provide group mapping OR block, not both" + elsif block_given? + begin + saved_groups = active_groups.dup + active_groups.concat(args.map { |a| a.to_sym }).uniq! + yield self if block_given? + ensure + active_groups.replace(saved_groups) + end + else mapping.each do |key, value| (active_groups + Array(key)).uniq.each do |grp| (groups[grp.to_sym] ||= []).concat(Array(value)) end end - - yield self if block_given? - ensure - active_groups.replace(saved_groups) - end - end - - def via(*args) - if connection_specification?(args) - @gateway = Net::SSH::Gateway.new(*args) - elsif args.length == 1 - @gateway = args.first - else - raise ArgumentError, "expected either a connection specification or a Net::SSH::Gateway instance" end - self end - def use(*list) - connections.concat(list.each { |c| c[:host] = c.host }) - group(active_groups => list) + def via(host, user, options={}) + @default_gateway = Net::SSH::Gateway.new(host, user, options) self end - def connect(*args) - if connection_specification?(args) - establish_connection(*args) - elsif args.any? - raise ArgumentError, "expected either a connection specification or a block" + def use(host, user, options={}) + server = Server.new(host, user, {:via => default_gateway}.merge(options)) + unless servers.include?(server) + servers << server + group [] => server end - - if block_given? - collector = Collector.new - yield collector - - threads = collector.specifications.map do |spec| - Thread.new { establish_connection(spec.host, spec.user, spec.options, spec.groups) } - end - - threads.each { |t| t.join } - end - - self + server end def with(*groups) @@ -83,23 +62,30 @@ module Net; module SSH; module Multi active_groups.replace(saved_groups) end - def active_connections - if active_groups.empty? - connections + def active_sessions + list = if active_groups.empty? + servers else active_groups.map { |group| groups[group] }.flatten.uniq end + + sessions_for(list) + end + + def connect! + active_sessions + self end def close - connections.each { |connection| connection.channels.each { |id, channel| channel.close } } + servers.each { |server| server.close_channels } loop(0) { busy?(true) } - connections.each { |connection| connection.transport.close } - gateway.shutdown! if gateway + servers.each { |server| server.close } + default_gateway.shutdown! if default_gateway end def busy?(include_invisible=false) - @connections.any? { |connection| connection.busy?(include_invisible) } + servers.any? { |server| server.busy?(include_invisible) } end alias :loop_forever :loop @@ -110,40 +96,26 @@ module Net; module SSH; module Multi end def process(wait=nil, &block) - @connections.each { |c| return false unless c.preprocess(&block) } - - writers_by_connection, readers_by_connection = {}, {} - - writers = @connections.map do |c| - c.listeners.keys.select do |w| - writers_by_connection[c] ||= [] - writers_by_connection[c] << w - w.respond_to?(:pending_write?) && w.pending_write? - end - end.flatten + return false if servers.any? { |server| !server.preprocess(&block) } - readers = @connections.map { |c| readers_by_connection[c] = c.listeners.keys }.flatten + readers = servers.map { |s| s.readers }.flatten + writers = servers.map { |s| s.writers }.flatten - readers, writers = IO.select(readers, writers, nil, wait) + readers, writers, = IO.select(readers, writers, nil, wait) - @connections.each do |c| - readers_for_this = readers_by_connection[c] & (readers || []) - writers_for_this = writers_by_connection[c] & (writers || []) - return false unless c.postprocess(readers_for_this, writers_for_this) - end - - return true + return servers.all? { |server| server.postprocess(readers, writers) } end def send_global_request(type, *extra, &callback) - active_connections.each { |connection| connection.send_global_request(type, *extra, &callback) } + active_sessions.each { |ssh| ssh.send_global_request(type, *extra, &callback) } self end def open_channel(type="session", *extra, &on_confirm) - channels = active_connections.map do |connection| - channel = connection.open_channel(type, *extra, &on_confirm) - channel[:host] = connection[:host] + channels = active_sessions.map do |ssh| + channel = ssh.open_channel(type, *extra, &on_confirm) + channel[:server] = ssh[:server] + channel[:host] = ssh[:server].host channel end Multi::Channel.new(self, channels) @@ -184,8 +156,8 @@ module Net; module SSH; module Multi def exec!(command, &block) block ||= Proc.new do |ch, type, data| ch[:result] ||= {} - ch[:result][ch.connection[:host]] ||= "" - ch[:result][ch.connection[:host]] << data + ch[:result][ch[:server]] ||= "" + ch[:result][ch[:server]] << data end channel = exec(command, &block) @@ -200,65 +172,10 @@ module Net; module SSH; module Multi @active_groups end - def connection_specification?(args) - args.length == 2 || (args.length == 3 && args.last.is_a?(Hash)) - end - - def establish_connection(host, user, options, groups=[]) - connection = gateway ? gateway.ssh(host, user, options) : - Net::SSH.start(host, user, options) - connection[:host] = host - @connections_mutex.synchronize { connections.push(connection) } - @groups_mutex.synchronize { group((active_groups + groups).uniq => connection) } - return connection - rescue Net::SSH::AuthenticationFailed => error - error.message << "@#{host}" - raise - end - - class Collector - class Specification - attr_reader :host, :user, :options - attr_reader :groups - - def initialize(host, user, options, groups) - @host, @user, @options = host, user, options.dup - @groups = groups.dup - end - end - - attr_reader :specifications - - def initialize - @specifications = [] - @active_groups = [] - end - - def to(host, user, options={}) - @specifications << Specification.new(host, user, options, @active_groups) - @specifications.length - 1 - end - - def group(*args) - mapping = args.last.is_a?(Hash) ? args.pop : {} - - begin - saved_groups = @active_groups.dup - @active_groups.concat(args.map { |a| a.to_sym }).uniq! - - mapping.each do |key, value| - groups = (Array(key).map { |v| v.to_sym } + @active_groups).uniq - - Array(value).each do |id| - @specifications[id].groups.concat(groups).uniq! - end - end - - yield self if block_given? - ensure - @active_groups.replace(saved_groups) - end - end + def sessions_for(servers) + threads = servers.map { |server| Thread.new { server.session(true) } } + threads.each { |thread| thread.join } + servers.map { |server| server.session } end end end; end; end
\ No newline at end of file |