summaryrefslogtreecommitdiff
path: root/lib/net/ssh/multi/session.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/net/ssh/multi/session.rb')
-rw-r--r--lib/net/ssh/multi/session.rb95
1 files changed, 57 insertions, 38 deletions
diff --git a/lib/net/ssh/multi/session.rb b/lib/net/ssh/multi/session.rb
index 9d44e71..f94028b 100644
--- a/lib/net/ssh/multi/session.rb
+++ b/lib/net/ssh/multi/session.rb
@@ -1,4 +1,3 @@
-require 'thread'
require 'net/ssh/gateway'
require 'net/ssh/multi/server'
require 'net/ssh/multi/channel'
@@ -9,12 +8,13 @@ module Net; module SSH; module Multi
attr_reader :default_gateway
attr_reader :groups
+ attr_reader :open_groups
+ attr_reader :active_groups
+
def initialize
@servers = []
@groups = {}
@gateway = nil
- @connections_mutex = Mutex.new
- @groups_mutex = Mutex.new
@active_groups = {}
@open_groups = []
end
@@ -26,16 +26,16 @@ module Net; module SSH; module Multi
raise ArgumentError, "must provide group mapping OR block, not both"
elsif block_given?
begin
- saved_groups = @open_groups.dup
- @open_groups.concat(args.map { |a| a.to_sym }).uniq!
+ saved_groups = open_groups.dup
+ open_groups.concat(args.map { |a| a.to_sym }).uniq!
yield self if block_given?
ensure
- @open_groups.replace(saved_groups)
+ open_groups.replace(saved_groups)
end
else
mapping.each do |key, value|
- (@open_groups + Array(key)).uniq.each do |grp|
- (groups[grp.to_sym] ||= []).concat(Array(value))
+ (open_groups + Array(key)).uniq.each do |grp|
+ (groups[grp.to_sym] ||= []).concat(Array(value)).uniq!
end
end
end
@@ -48,7 +48,10 @@ module Net; module SSH; module Multi
def use(host, user, options={})
server = Server.new(host, user, {:via => default_gateway}.merge(options))
- unless servers.include?(server)
+ exists = servers.index(server)
+ if exists
+ server = servers[exists]
+ else
servers << server
group [] => server
end
@@ -56,32 +59,49 @@ module Net; module SSH; module Multi
end
def with(*groups)
- saved_groups = @active_groups.dup
+ saved_groups = active_groups.dup
new_map = groups.inject({}) do |map, group|
if group.is_a?(Hash)
group.each do |gr, value|
raise ArgumentError, "the value for any group must be a Hash" unless value.is_a?(Hash)
- map[gr] = (@active_groups[gr] || {}).merge(value)
+ bad_keys = value.keys - [:only, :except]
+ raise ArgumentError, "unknown constraint(s): #{bad_keys.inspect}" unless bad_keys.empty?
+ map[gr] = (active_groups[gr] || {}).merge(value)
end
else
- map[group] = @active_groups[group] || {}
+ map[group] = active_groups[group] || {}
end
map
end
- @active_groups.update(new_map)
+ active_groups.update(new_map)
yield self
ensure
- @active_groups.replace(saved_groups)
+ active_groups.replace(saved_groups)
+ end
+
+ def on(*servers)
+ adhoc_group = "adhoc_group_#{servers.hash}_#{rand(0xffffffff)}".to_sym
+ group(adhoc_group => servers)
+ saved_groups = active_groups.dup
+ active_groups.replace(adhoc_group => {})
+ yield self
+ ensure
+ active_groups.replace(saved_groups) if saved_groups
+ groups.delete(adhoc_group)
end
def active_sessions
- list = if @active_groups.empty?
+ list = if active_groups.empty?
servers
else
- @active_groups.inject([]) do |list, (group, properties)|
- list.concat(groups[group].select { |server| properties.all? { |prop, value| server[prop] == value } })
+ active_groups.inject([]) do |list, (group, properties)|
+ servers = groups[group].select do |server|
+ (properties[:only] || {}).all? { |prop, value| server[prop] == value } &&
+ !(properties[:except] || {}).any? { |prop, value| server[prop] == value }
+ end
+ list.concat(servers)
end
end
@@ -111,15 +131,26 @@ module Net; module SSH; module Multi
loop_forever { break unless process(wait, &running) }
end
+ def preprocess(&block)
+ return false if block && !block[self]
+ servers.each { |server| server.preprocess }
+ block.nil? || block[self]
+ end
+
+ def postprocess(readers, writers)
+ servers.each { |server| server.postprocess(readers, writers) }
+ true
+ end
+
def process(wait=nil, &block)
- return false if servers.any? { |server| !server.preprocess(&block) }
+ return false unless preprocess(&block)
readers = servers.map { |s| s.readers }.flatten
writers = servers.map { |s| s.writers }.flatten
readers, writers, = IO.select(readers, writers, nil, wait)
- return servers.all? { |server| server.postprocess(readers, writers) }
+ return postprocess(readers, writers)
end
def send_global_request(type, *extra, &callback)
@@ -129,10 +160,11 @@ module Net; module SSH; module Multi
def open_channel(type="session", *extra, &on_confirm)
channels = active_sessions.map do |ssh|
- channel = ssh.open_channel(type, *extra, &on_confirm)
- channel[:server] = ssh[:server]
- channel[:host] = ssh[:server].host
- channel
+ ssh.open_channel(type, *extra) do |c|
+ c[:server] = ssh[:server]
+ c[:host] = ssh[:server].host
+ on_confirm[c] if on_confirm
+ end
end
Multi::Channel.new(self, channels)
end
@@ -169,24 +201,11 @@ module Net; module SSH; module Multi
end
end
- def exec!(command, &block)
- block ||= Proc.new do |ch, type, data|
- ch[:result] ||= {}
- ch[:result][ch[:server]] ||= ""
- ch[:result][ch[:server]] << data
- end
-
- channel = exec(command, &block)
- channel.wait
-
- return channel[:result]
- end
-
private
def sessions_for(servers)
- threads = servers.map { |server| Thread.new { server.session(true) } }
- threads.each { |thread| thread.join }
+ threads = servers.map { |server| Thread.new { server.session(true) } if server.session.nil? }
+ threads.each { |thread| thread.join if thread }
servers.map { |server| server.session }
end
end