diff options
author | Jamis Buck <jamis@37signals.com> | 2008-03-28 21:56:47 -0600 |
---|---|---|
committer | Jamis Buck <jamis@37signals.com> | 2008-03-28 21:56:47 -0600 |
commit | d911a09547df9ec3d2d39b4eebdc5f79a2091070 (patch) | |
tree | 0aa625e1acad16d61d6028a49ab833f0a42e7869 | |
parent | b1439ec8f056365b712724b252cc324cbe745da7 (diff) | |
download | net-ssh-multi-d911a09547df9ec3d2d39b4eebdc5f79a2091070.tar.gz |
connection groups
-rw-r--r-- | lib/net/ssh/multi.rb | 1 | ||||
-rw-r--r-- | lib/net/ssh/multi/session.rb | 135 |
2 files changed, 103 insertions, 33 deletions
diff --git a/lib/net/ssh/multi.rb b/lib/net/ssh/multi.rb index 84a87b0..ca6928e 100644 --- a/lib/net/ssh/multi.rb +++ b/lib/net/ssh/multi.rb @@ -9,7 +9,6 @@ module Net; module SSH begin yield session session.loop - ensure session.close end else diff --git a/lib/net/ssh/multi/session.rb b/lib/net/ssh/multi/session.rb index b596b8f..497d391 100644 --- a/lib/net/ssh/multi/session.rb +++ b/lib/net/ssh/multi/session.rb @@ -7,24 +7,34 @@ module Net; module SSH; module Multi class Session attr_reader :connections attr_reader :gateway - - class Collector - attr_reader :specifications - - def initialize - @specifications = [] - end - - def to(host, user, options={}) - @specifications << [host, user, options] - self - end - end + attr_reader :groups def initialize @connections = [] + @groups = {} @gateway = nil - @mutex = Mutex.new + @connections_mutex = Mutex.new + @groups_mutex = Mutex.new + @active_groups = [] + end + + def group(*args) + mapping = args.last.is_a?(Hash) ? args.pop : {} + + begin + saved_groups = active_groups.dup + active_groups.concat(args.map { |a| a.to_sym }).uniq! + + mapping.each do |key, value| + (active_groups + Array(key)).uniq.each do |grp| + (groups[grp.to_sym] ||= []).concat(Array(value)) + end + end + + yield self if block_given? + ensure + active_groups.replace(saved_groups) + end end def via(*args) @@ -39,7 +49,8 @@ module Net; module SSH; module Multi end def use(*list) - @connections += list.each { |c| c[:host] = c.host } + connections.concat(list.each { |c| c[:host] = c.host }) + group(active_groups => list) self end @@ -54,9 +65,8 @@ module Net; module SSH; module Multi collector = Collector.new yield collector - mutex = Mutex.new - threads = collector.specifications.map do |host, user, options| - Thread.new { establish_connection(host, user, options) } + threads = collector.specifications.map do |spec| + Thread.new { establish_connection(spec.host, spec.user, spec.options, spec.groups) } end threads.each { |t| t.join } @@ -65,6 +75,22 @@ module Net; module SSH; module Multi self end + def with(*groups) + saved_groups = active_groups.dup + active_groups.concat(groups).uniq! + yield self + ensure + active_groups.replace(saved_groups) + end + + def active_connections + if active_groups.empty? + connections + else + active_groups.map { |group| groups[group] }.flatten.uniq + end + end + def close connections.each { |connection| connection.channels.each { |id, channel| channel.close } } loop(0) { busy?(true) } @@ -73,7 +99,7 @@ module Net; module SSH; module Multi end def busy?(include_invisible=false) - connections.any? { |connection| connection.busy?(include_invisible) } + @connections.any? { |connection| connection.busy?(include_invisible) } end alias :loop_forever :loop @@ -84,11 +110,11 @@ module Net; module SSH; module Multi end def process(wait=nil, &block) - connections.each { |c| return false unless c.preprocess(&block) } + @connections.each { |c| return false unless c.preprocess(&block) } writers_by_connection, readers_by_connection = {}, {} - writers = connections.map do |c| + writers = @connections.map do |c| c.listeners.keys.select do |w| writers_by_connection[c] ||= [] writers_by_connection[c] << w @@ -96,11 +122,11 @@ module Net; module SSH; module Multi end end.flatten - readers = connections.map { |c| readers_by_connection[c] = c.listeners.keys }.flatten + readers = @connections.map { |c| readers_by_connection[c] = c.listeners.keys }.flatten readers, writers = IO.select(readers, writers, nil, wait) - connections.each do |c| + @connections.each do |c| readers_for_this = readers_by_connection[c] & (readers || []) writers_for_this = writers_by_connection[c] & (writers || []) return false unless c.postprocess(readers_for_this, writers_for_this) @@ -110,12 +136,12 @@ module Net; module SSH; module Multi end def send_global_request(type, *extra, &callback) - connections.each { |connection| connection.send_global_request(type, *extra, &callback) } + active_connections.each { |connection| connection.send_global_request(type, *extra, &callback) } self end def open_channel(type="session", *extra, &on_confirm) - channels = connections.map do |connection| + channels = active_connections.map do |connection| channel = connection.open_channel(type, *extra, &on_confirm) channel[:host] = connection[:host] channel @@ -168,26 +194,71 @@ module Net; module SSH; module Multi return channel[:result] end - def send_message(message) - connections.each { |connection| connection.send_message(message) } - self - end - private + def active_groups + @active_groups + end + def connection_specification?(args) args.length == 2 || (args.length == 3 && args.last.is_a?(Hash)) end - def establish_connection(host, user, options={}) + def establish_connection(host, user, options, groups=[]) connection = gateway ? gateway.ssh(host, user, options) : Net::SSH.start(host, user, options) connection[:host] = host - @mutex.synchronize { @connections.push(connection) } + @connections_mutex.synchronize { connections.push(connection) } + @groups_mutex.synchronize { group((active_groups + groups).uniq => connection) } return connection rescue Net::SSH::AuthenticationFailed => error error.message << "@#{host}" raise end + + class Collector + class Specification + attr_reader :host, :user, :options + attr_reader :groups + + def initialize(host, user, options, groups) + @host, @user, @options = host, user, options.dup + @groups = groups.dup + end + end + + attr_reader :specifications + + def initialize + @specifications = [] + @active_groups = [] + end + + def to(host, user, options={}) + @specifications << Specification.new(host, user, options, @active_groups) + @specifications.length - 1 + end + + def group(*args) + mapping = args.last.is_a?(Hash) ? args.pop : {} + + begin + saved_groups = @active_groups.dup + @active_groups.concat(args.map { |a| a.to_sym }).uniq! + + mapping.each do |key, value| + groups = (Array(key).map { |v| v.to_sym } + @active_groups).uniq + + Array(value).each do |id| + @specifications[id].groups.concat(groups).uniq! + end + end + + yield self if block_given? + ensure + @active_groups.replace(saved_groups) + end + end + end end end; end; end
\ No newline at end of file |