diff options
Diffstat (limited to 'lib/net/ssh/multi/session.rb')
-rw-r--r-- | lib/net/ssh/multi/session.rb | 95 |
1 files changed, 57 insertions, 38 deletions
diff --git a/lib/net/ssh/multi/session.rb b/lib/net/ssh/multi/session.rb index 9d44e71..f94028b 100644 --- a/lib/net/ssh/multi/session.rb +++ b/lib/net/ssh/multi/session.rb @@ -1,4 +1,3 @@ -require 'thread' require 'net/ssh/gateway' require 'net/ssh/multi/server' require 'net/ssh/multi/channel' @@ -9,12 +8,13 @@ module Net; module SSH; module Multi attr_reader :default_gateway attr_reader :groups + attr_reader :open_groups + attr_reader :active_groups + def initialize @servers = [] @groups = {} @gateway = nil - @connections_mutex = Mutex.new - @groups_mutex = Mutex.new @active_groups = {} @open_groups = [] end @@ -26,16 +26,16 @@ module Net; module SSH; module Multi raise ArgumentError, "must provide group mapping OR block, not both" elsif block_given? begin - saved_groups = @open_groups.dup - @open_groups.concat(args.map { |a| a.to_sym }).uniq! + saved_groups = open_groups.dup + open_groups.concat(args.map { |a| a.to_sym }).uniq! yield self if block_given? ensure - @open_groups.replace(saved_groups) + open_groups.replace(saved_groups) end else mapping.each do |key, value| - (@open_groups + Array(key)).uniq.each do |grp| - (groups[grp.to_sym] ||= []).concat(Array(value)) + (open_groups + Array(key)).uniq.each do |grp| + (groups[grp.to_sym] ||= []).concat(Array(value)).uniq! end end end @@ -48,7 +48,10 @@ module Net; module SSH; module Multi def use(host, user, options={}) server = Server.new(host, user, {:via => default_gateway}.merge(options)) - unless servers.include?(server) + exists = servers.index(server) + if exists + server = servers[exists] + else servers << server group [] => server end @@ -56,32 +59,49 @@ module Net; module SSH; module Multi end def with(*groups) - saved_groups = @active_groups.dup + saved_groups = active_groups.dup new_map = groups.inject({}) do |map, group| if group.is_a?(Hash) group.each do |gr, value| raise ArgumentError, "the value for any group must be a Hash" unless value.is_a?(Hash) - map[gr] = (@active_groups[gr] || {}).merge(value) + bad_keys = value.keys - [:only, :except] + raise ArgumentError, "unknown constraint(s): #{bad_keys.inspect}" unless bad_keys.empty? + map[gr] = (active_groups[gr] || {}).merge(value) end else - map[group] = @active_groups[group] || {} + map[group] = active_groups[group] || {} end map end - @active_groups.update(new_map) + active_groups.update(new_map) yield self ensure - @active_groups.replace(saved_groups) + active_groups.replace(saved_groups) + end + + def on(*servers) + adhoc_group = "adhoc_group_#{servers.hash}_#{rand(0xffffffff)}".to_sym + group(adhoc_group => servers) + saved_groups = active_groups.dup + active_groups.replace(adhoc_group => {}) + yield self + ensure + active_groups.replace(saved_groups) if saved_groups + groups.delete(adhoc_group) end def active_sessions - list = if @active_groups.empty? + list = if active_groups.empty? servers else - @active_groups.inject([]) do |list, (group, properties)| - list.concat(groups[group].select { |server| properties.all? { |prop, value| server[prop] == value } }) + active_groups.inject([]) do |list, (group, properties)| + servers = groups[group].select do |server| + (properties[:only] || {}).all? { |prop, value| server[prop] == value } && + !(properties[:except] || {}).any? { |prop, value| server[prop] == value } + end + list.concat(servers) end end @@ -111,15 +131,26 @@ module Net; module SSH; module Multi loop_forever { break unless process(wait, &running) } end + def preprocess(&block) + return false if block && !block[self] + servers.each { |server| server.preprocess } + block.nil? || block[self] + end + + def postprocess(readers, writers) + servers.each { |server| server.postprocess(readers, writers) } + true + end + def process(wait=nil, &block) - return false if servers.any? { |server| !server.preprocess(&block) } + return false unless preprocess(&block) readers = servers.map { |s| s.readers }.flatten writers = servers.map { |s| s.writers }.flatten readers, writers, = IO.select(readers, writers, nil, wait) - return servers.all? { |server| server.postprocess(readers, writers) } + return postprocess(readers, writers) end def send_global_request(type, *extra, &callback) @@ -129,10 +160,11 @@ module Net; module SSH; module Multi def open_channel(type="session", *extra, &on_confirm) channels = active_sessions.map do |ssh| - channel = ssh.open_channel(type, *extra, &on_confirm) - channel[:server] = ssh[:server] - channel[:host] = ssh[:server].host - channel + ssh.open_channel(type, *extra) do |c| + c[:server] = ssh[:server] + c[:host] = ssh[:server].host + on_confirm[c] if on_confirm + end end Multi::Channel.new(self, channels) end @@ -169,24 +201,11 @@ module Net; module SSH; module Multi end end - def exec!(command, &block) - block ||= Proc.new do |ch, type, data| - ch[:result] ||= {} - ch[:result][ch[:server]] ||= "" - ch[:result][ch[:server]] << data - end - - channel = exec(command, &block) - channel.wait - - return channel[:result] - end - private def sessions_for(servers) - threads = servers.map { |server| Thread.new { server.session(true) } } - threads.each { |thread| thread.join } + threads = servers.map { |server| Thread.new { server.session(true) } if server.session.nil? } + threads.each { |thread| thread.join if thread } servers.map { |server| server.session } end end |