summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJamis Buck <jamis@37signals.com>2008-03-28 21:56:47 -0600
committerJamis Buck <jamis@37signals.com>2008-03-28 21:56:47 -0600
commitd911a09547df9ec3d2d39b4eebdc5f79a2091070 (patch)
tree0aa625e1acad16d61d6028a49ab833f0a42e7869
parentb1439ec8f056365b712724b252cc324cbe745da7 (diff)
downloadnet-ssh-multi-d911a09547df9ec3d2d39b4eebdc5f79a2091070.tar.gz
connection groups
-rw-r--r--lib/net/ssh/multi.rb1
-rw-r--r--lib/net/ssh/multi/session.rb135
2 files changed, 103 insertions, 33 deletions
diff --git a/lib/net/ssh/multi.rb b/lib/net/ssh/multi.rb
index 84a87b0..ca6928e 100644
--- a/lib/net/ssh/multi.rb
+++ b/lib/net/ssh/multi.rb
@@ -9,7 +9,6 @@ module Net; module SSH
begin
yield session
session.loop
- ensure
session.close
end
else
diff --git a/lib/net/ssh/multi/session.rb b/lib/net/ssh/multi/session.rb
index b596b8f..497d391 100644
--- a/lib/net/ssh/multi/session.rb
+++ b/lib/net/ssh/multi/session.rb
@@ -7,24 +7,34 @@ module Net; module SSH; module Multi
class Session
attr_reader :connections
attr_reader :gateway
-
- class Collector
- attr_reader :specifications
-
- def initialize
- @specifications = []
- end
-
- def to(host, user, options={})
- @specifications << [host, user, options]
- self
- end
- end
+ attr_reader :groups
def initialize
@connections = []
+ @groups = {}
@gateway = nil
- @mutex = Mutex.new
+ @connections_mutex = Mutex.new
+ @groups_mutex = Mutex.new
+ @active_groups = []
+ end
+
+ def group(*args)
+ mapping = args.last.is_a?(Hash) ? args.pop : {}
+
+ begin
+ saved_groups = active_groups.dup
+ active_groups.concat(args.map { |a| a.to_sym }).uniq!
+
+ mapping.each do |key, value|
+ (active_groups + Array(key)).uniq.each do |grp|
+ (groups[grp.to_sym] ||= []).concat(Array(value))
+ end
+ end
+
+ yield self if block_given?
+ ensure
+ active_groups.replace(saved_groups)
+ end
end
def via(*args)
@@ -39,7 +49,8 @@ module Net; module SSH; module Multi
end
def use(*list)
- @connections += list.each { |c| c[:host] = c.host }
+ connections.concat(list.each { |c| c[:host] = c.host })
+ group(active_groups => list)
self
end
@@ -54,9 +65,8 @@ module Net; module SSH; module Multi
collector = Collector.new
yield collector
- mutex = Mutex.new
- threads = collector.specifications.map do |host, user, options|
- Thread.new { establish_connection(host, user, options) }
+ threads = collector.specifications.map do |spec|
+ Thread.new { establish_connection(spec.host, spec.user, spec.options, spec.groups) }
end
threads.each { |t| t.join }
@@ -65,6 +75,22 @@ module Net; module SSH; module Multi
self
end
+ def with(*groups)
+ saved_groups = active_groups.dup
+ active_groups.concat(groups).uniq!
+ yield self
+ ensure
+ active_groups.replace(saved_groups)
+ end
+
+ def active_connections
+ if active_groups.empty?
+ connections
+ else
+ active_groups.map { |group| groups[group] }.flatten.uniq
+ end
+ end
+
def close
connections.each { |connection| connection.channels.each { |id, channel| channel.close } }
loop(0) { busy?(true) }
@@ -73,7 +99,7 @@ module Net; module SSH; module Multi
end
def busy?(include_invisible=false)
- connections.any? { |connection| connection.busy?(include_invisible) }
+ @connections.any? { |connection| connection.busy?(include_invisible) }
end
alias :loop_forever :loop
@@ -84,11 +110,11 @@ module Net; module SSH; module Multi
end
def process(wait=nil, &block)
- connections.each { |c| return false unless c.preprocess(&block) }
+ @connections.each { |c| return false unless c.preprocess(&block) }
writers_by_connection, readers_by_connection = {}, {}
- writers = connections.map do |c|
+ writers = @connections.map do |c|
c.listeners.keys.select do |w|
writers_by_connection[c] ||= []
writers_by_connection[c] << w
@@ -96,11 +122,11 @@ module Net; module SSH; module Multi
end
end.flatten
- readers = connections.map { |c| readers_by_connection[c] = c.listeners.keys }.flatten
+ readers = @connections.map { |c| readers_by_connection[c] = c.listeners.keys }.flatten
readers, writers = IO.select(readers, writers, nil, wait)
- connections.each do |c|
+ @connections.each do |c|
readers_for_this = readers_by_connection[c] & (readers || [])
writers_for_this = writers_by_connection[c] & (writers || [])
return false unless c.postprocess(readers_for_this, writers_for_this)
@@ -110,12 +136,12 @@ module Net; module SSH; module Multi
end
def send_global_request(type, *extra, &callback)
- connections.each { |connection| connection.send_global_request(type, *extra, &callback) }
+ active_connections.each { |connection| connection.send_global_request(type, *extra, &callback) }
self
end
def open_channel(type="session", *extra, &on_confirm)
- channels = connections.map do |connection|
+ channels = active_connections.map do |connection|
channel = connection.open_channel(type, *extra, &on_confirm)
channel[:host] = connection[:host]
channel
@@ -168,26 +194,71 @@ module Net; module SSH; module Multi
return channel[:result]
end
- def send_message(message)
- connections.each { |connection| connection.send_message(message) }
- self
- end
-
private
+ def active_groups
+ @active_groups
+ end
+
def connection_specification?(args)
args.length == 2 || (args.length == 3 && args.last.is_a?(Hash))
end
- def establish_connection(host, user, options={})
+ def establish_connection(host, user, options, groups=[])
connection = gateway ? gateway.ssh(host, user, options) :
Net::SSH.start(host, user, options)
connection[:host] = host
- @mutex.synchronize { @connections.push(connection) }
+ @connections_mutex.synchronize { connections.push(connection) }
+ @groups_mutex.synchronize { group((active_groups + groups).uniq => connection) }
return connection
rescue Net::SSH::AuthenticationFailed => error
error.message << "@#{host}"
raise
end
+
+ class Collector
+ class Specification
+ attr_reader :host, :user, :options
+ attr_reader :groups
+
+ def initialize(host, user, options, groups)
+ @host, @user, @options = host, user, options.dup
+ @groups = groups.dup
+ end
+ end
+
+ attr_reader :specifications
+
+ def initialize
+ @specifications = []
+ @active_groups = []
+ end
+
+ def to(host, user, options={})
+ @specifications << Specification.new(host, user, options, @active_groups)
+ @specifications.length - 1
+ end
+
+ def group(*args)
+ mapping = args.last.is_a?(Hash) ? args.pop : {}
+
+ begin
+ saved_groups = @active_groups.dup
+ @active_groups.concat(args.map { |a| a.to_sym }).uniq!
+
+ mapping.each do |key, value|
+ groups = (Array(key).map { |v| v.to_sym } + @active_groups).uniq
+
+ Array(value).each do |id|
+ @specifications[id].groups.concat(groups).uniq!
+ end
+ end
+
+ yield self if block_given?
+ ensure
+ @active_groups.replace(saved_groups)
+ end
+ end
+ end
end
end; end; end \ No newline at end of file