diff options
author | Jamis Buck <jamis@37signals.com> | 2008-04-09 14:23:06 -0600 |
---|---|---|
committer | Jamis Buck <jamis@37signals.com> | 2008-04-09 14:23:06 -0600 |
commit | 81d215f0764e473842dccd9b98b98aa08383cc7d (patch) | |
tree | d0caedf48ed711119857d91a6d7ac81c323716dc | |
parent | b97e7c775049cd5eac3656af04eab5dc7f42ee92 (diff) | |
download | net-ssh-multi-81d215f0764e473842dccd9b98b98aa08383cc7d.tar.gz |
deferred server evaluation via session.use(&block)
-rw-r--r-- | lib/net/ssh/multi/dynamic_server.rb | 37 | ||||
-rw-r--r-- | lib/net/ssh/multi/server_list.rb | 61 | ||||
-rw-r--r-- | lib/net/ssh/multi/session.rb | 68 | ||||
-rw-r--r-- | test/session_test.rb | 21 |
4 files changed, 147 insertions, 40 deletions
diff --git a/lib/net/ssh/multi/dynamic_server.rb b/lib/net/ssh/multi/dynamic_server.rb new file mode 100644 index 0000000..92af7a5 --- /dev/null +++ b/lib/net/ssh/multi/dynamic_server.rb @@ -0,0 +1,37 @@ +require 'net/ssh/multi/server' + +module Net; module SSH; module Multi + + class DynamicServer + attr_reader :master + attr_reader :callback + attr_reader :options + + def initialize(master, options, callback) + @master, @options, @callback = master, options, callback + @servers = nil + end + + def [](key) + (options[:properties] || {})[key] + end + + def each + (@servers || []).each { |server| yield server } + end + + def evaluate! + @servers ||= Array(callback[options]).map do |server| + case server + when String then Net::SSH::Multi::Server.new(master, server, options) + else server + end + end + end + + def to_ary + evaluate! + end + end + +end; end; end
\ No newline at end of file diff --git a/lib/net/ssh/multi/server_list.rb b/lib/net/ssh/multi/server_list.rb new file mode 100644 index 0000000..1f83be3 --- /dev/null +++ b/lib/net/ssh/multi/server_list.rb @@ -0,0 +1,61 @@ +require 'net/ssh/multi/server' +require 'net/ssh/multi/dynamic_server' + +module Net; module SSH; module Multi + + class ServerList + include Enumerable + + def initialize(list=[]) + @list = list.uniq + end + + def add(server) + index = @list.index(server) + if index + server = @list[index] + else + @list.push(server) + end + server + end + + def concat(servers) + servers.each { |server| add(server) } + self + end + + def each + @list.each do |server| + case server + when Server then yield server + when DynamicServer then server.each { |item| yield item } + else raise ArgumentError, "server list contains non-server: #{server.class}" + end + end + self + end + + def select + subset = @list.select { |i| yield i } + ServerList.new(subset) + end + + def flatten + result = @list.inject([]) do |aggregator, server| + case server + when Server then aggregator.push(server) + when DynamicServer then aggregator.concat(server) + else raise ArgumentError, "server list contains non-server: #{server.class}" + end + end + + result.uniq + end + + def to_ary + flatten + end + end + +end; end; end
\ No newline at end of file diff --git a/lib/net/ssh/multi/session.rb b/lib/net/ssh/multi/session.rb index 0f9df0b..103e1c4 100644 --- a/lib/net/ssh/multi/session.rb +++ b/lib/net/ssh/multi/session.rb @@ -1,6 +1,8 @@ require 'thread' require 'net/ssh/gateway' require 'net/ssh/multi/server' +require 'net/ssh/multi/dynamic_server' +require 'net/ssh/multi/server_list' require 'net/ssh/multi/channel' require 'net/ssh/multi/pending_connection' require 'net/ssh/multi/session_actions' @@ -45,15 +47,15 @@ module Net; module SSH; module Multi class Session include SessionActions - # The list of Net::SSH::Multi::Server definitions managed by this session. - attr_reader :servers + # The Net::SSH::Multi::ServerList managed by this session. + attr_reader :server_list # The default Net::SSH::Gateway instance to use to connect to the servers. # If +nil+, no default gateway will be used. attr_reader :default_gateway - # The hash of group definitions, mapping each group name to the list of - # corresponding Net::SSH::Multi::Server definitions. + # The hash of group definitions, mapping each group name to a corresponding + # Net::SSH::Multi::ServerList. attr_reader :groups # The number of allowed concurrent connections. No more than this number @@ -92,8 +94,8 @@ module Net; module SSH; module Multi # session.use ... # end def initialize(options={}) - @servers = [] - @groups = {} + @server_list = ServerList.new + @groups = Hash.new { |h,k| h[k] = ServerList.new } @gateway = nil @open_groups = [] @connect_threads = [] @@ -112,7 +114,7 @@ module Net; module SSH; module Multi # # First, you can use it to associate a group (or array of groups) with a # server definition (or array of server definitions). The server definitions - # must already exist in the #servers array (typically by calling #use): + # must already exist in the #server_list array (typically by calling #use): # # server1 = session.use('host1', 'user1') # server2 = session.use('host2', 'user2') @@ -149,7 +151,7 @@ module Net; module SSH; module Multi else mapping.each do |key, value| (open_groups + Array(key)).uniq.each do |grp| - (groups[grp.to_sym] ||= []).concat(Array(value)).uniq! + groups[grp.to_sym].concat(Array(value)) end end end @@ -187,24 +189,29 @@ module Net; module SSH; module Multi # server instances will be returned. # # server1, server2 = session.use "host1", "host2" - def use(*hosts) + def use(*hosts, &block) options = hosts.last.is_a?(Hash) ? hosts.pop : {} + options = { :via => default_gateway }.merge(options) results = hosts.map do |host| - server = Server.new(self, host, {:via => default_gateway}.merge(options)) - exists = servers.index(server) - if exists - server = servers[exists] - else - servers << server - end - server + server_list.add(Server.new(self, host, options)) + end + + if block + results << server_list.add(DynamicServer.new(self, options, block)) end group [] => results results.length > 1 ? results : results.first end + # Essentially an alias for #servers_for without any arguments. This is used + # primarily to satistfy the expectations of the Net::SSH::Multi::SessionActions + # module. + def servers + servers_for + end + # Returns the set of servers that match the given criteria. It can be used # in any (or all) of three ways. # @@ -244,7 +251,7 @@ module Net; module SSH; module Multi # servers = session.servers_for(:app, :web, :db => { :only => { :primary => true } }) def servers_for(*criteria) if criteria.empty? - servers + server_list.flatten else # normalize the criteria list, so that every entry is a key to a # criteria hash (possibly empty). @@ -255,16 +262,17 @@ module Net; module SSH; module Multi end end - list = criteria.inject([]) do |server_list, (group, properties)| + list = criteria.inject([]) do |aggregator, (group, properties)| raise ArgumentError, "the value for any group must be a Hash, but got a #{properties.class} for #{group.inspect}" unless properties.is_a?(Hash) bad_keys = properties.keys - [:only, :except] raise ArgumentError, "unknown constraint(s) #{bad_keys.inspect} for #{group.inspect}" unless bad_keys.empty? - servers = (groups[group] || []).select do |server| + servers = groups[group].select do |server| (properties[:only] || {}).all? { |prop, value| server[prop] == value } && !(properties[:except] || {}).any? { |prop, value| server[prop] == value } end - server_list.concat(servers) + + aggregator.concat(servers) end list.uniq @@ -310,9 +318,9 @@ module Net; module SSH; module Multi # gateway connections (e.g., those passed to #use directly) will _not_ be # closed by this method, and must be managed externally. def close - servers.each { |server| server.close_channels } + server_list.each { |server| server.close_channels } loop(0) { busy?(true) } - servers.each { |server| server.close } + server_list.each { |server| server.close } default_gateway.shutdown! if default_gateway end @@ -339,8 +347,8 @@ module Net; module SSH; module Multi return false unless preprocess(&block) - readers = servers.map { |s| s.readers }.flatten - writers = servers.map { |s| s.writers }.flatten + readers = server_list.map { |s| s.readers }.flatten + writers = server_list.map { |s| s.writers }.flatten readers, writers, = IO.select(readers, writers, nil, wait) @@ -356,14 +364,14 @@ module Net; module SSH; module Multi # This is called as part of the #process method. def preprocess(&block) #:nodoc: return false if block && !block[self] - servers.each { |server| server.preprocess } + server_list.each { |server| server.preprocess } block.nil? || block[self] end # Runs the postprocess stage on all servers. Always returns true. This is # called as part of the #process method. def postprocess(readers, writers) #:nodoc: - servers.each { |server| server.postprocess(readers, writers) } + server_list.each { |server| server.postprocess(readers, writers) } true end @@ -437,9 +445,9 @@ module Net; module SSH; module Multi def realize_pending_connections! #:nodoc: return unless concurrent_connections - servers.each do |s| - s.close if !s.busy?(true) - s.update_session! + server_list.each do |server| + server.close if !server.busy?(true) + server.update_session! end @connect_threads.delete_if { |t| !t.alive? } diff --git a/test/session_test.rb b/test/session_test.rb index 5a948d8..c453a97 100644 --- a/test/session_test.rb +++ b/test/session_test.rb @@ -25,15 +25,16 @@ class SessionTest < Test::Unit::TestCase end def test_group_with_mapping_should_append_new_servers_to_specified_and_open_groups + s1, s2, s3, s4 = @session.use('h1', 'h2', 'h3', 'h4') + @session.group :second => s1 @session.open_groups.concat([:first, :second]) - @session.groups[:second] = [1] - @session.group %w(third fourth) => [2, 3], :fifth => 1, :sixth => [4] - assert_equal [1, 2, 3, 4], @session.groups[:first].sort - assert_equal [1, 2, 3, 4], @session.groups[:second].sort - assert_equal [2, 3], @session.groups[:third] - assert_equal [2, 3], @session.groups[:fourth] - assert_equal [1], @session.groups[:fifth] - assert_equal [4], @session.groups[:sixth] + @session.group %w(third fourth) => [s2, s3], :fifth => s1, :sixth => [s4] + assert_equal [s1, s2, s3, s4], @session.groups[:first].sort + assert_equal [s1, s2, s3, s4], @session.groups[:second].sort + assert_equal [s2, s3], @session.groups[:third].sort + assert_equal [s2, s3], @session.groups[:fourth].sort + assert_equal [s1], @session.groups[:fifth].sort + assert_equal [s4], @session.groups[:sixth].sort end def test_via_should_instantiate_and_set_default_gateway @@ -55,8 +56,8 @@ class SessionTest < Test::Unit::TestCase def test_use_with_open_groups_should_add_new_server_to_server_list_and_groups @session.open_groups.concat([:first, :second]) server = @session.use('host') - assert_equal [server], @session.groups[:first] - assert_equal [server], @session.groups[:second] + assert_equal [server], @session.groups[:first].sort + assert_equal [server], @session.groups[:second].sort end def test_use_with_default_gateway_should_set_gateway_on_server |