summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJamis Buck <jamis@37signals.com>2008-03-30 16:42:42 -0600
committerJamis Buck <jamis@37signals.com>2008-03-30 16:42:42 -0600
commit45eee50ba0d47b5badf970ffbb10ca20bec1dc9e (patch)
tree0375645740d5e4509d3accb778563c6ad1c472d3
parent2ea62eb4cd9dbc82f8f24536d44318f55962998d (diff)
downloadnet-ssh-multi-45eee50ba0d47b5badf970ffbb10ca20bec1dc9e.tar.gz
session tests
-rw-r--r--lib/net/ssh/multi/server.rb5
-rw-r--r--lib/net/ssh/multi/session.rb95
-rw-r--r--test/server_test.rb10
-rw-r--r--test/session_test.rb334
4 files changed, 397 insertions, 47 deletions
diff --git a/lib/net/ssh/multi/server.rb b/lib/net/ssh/multi/server.rb
index 252304b..338a870 100644
--- a/lib/net/ssh/multi/server.rb
+++ b/lib/net/ssh/multi/server.rb
@@ -74,9 +74,8 @@ module Net; module SSH; module Multi
session && session.busy?(include_invisible)
end
- def preprocess(&block)
- return true unless session
- session.preprocess(&block)
+ def preprocess
+ session.preprocess if session
end
def readers
diff --git a/lib/net/ssh/multi/session.rb b/lib/net/ssh/multi/session.rb
index 9d44e71..f94028b 100644
--- a/lib/net/ssh/multi/session.rb
+++ b/lib/net/ssh/multi/session.rb
@@ -1,4 +1,3 @@
-require 'thread'
require 'net/ssh/gateway'
require 'net/ssh/multi/server'
require 'net/ssh/multi/channel'
@@ -9,12 +8,13 @@ module Net; module SSH; module Multi
attr_reader :default_gateway
attr_reader :groups
+ attr_reader :open_groups
+ attr_reader :active_groups
+
def initialize
@servers = []
@groups = {}
@gateway = nil
- @connections_mutex = Mutex.new
- @groups_mutex = Mutex.new
@active_groups = {}
@open_groups = []
end
@@ -26,16 +26,16 @@ module Net; module SSH; module Multi
raise ArgumentError, "must provide group mapping OR block, not both"
elsif block_given?
begin
- saved_groups = @open_groups.dup
- @open_groups.concat(args.map { |a| a.to_sym }).uniq!
+ saved_groups = open_groups.dup
+ open_groups.concat(args.map { |a| a.to_sym }).uniq!
yield self if block_given?
ensure
- @open_groups.replace(saved_groups)
+ open_groups.replace(saved_groups)
end
else
mapping.each do |key, value|
- (@open_groups + Array(key)).uniq.each do |grp|
- (groups[grp.to_sym] ||= []).concat(Array(value))
+ (open_groups + Array(key)).uniq.each do |grp|
+ (groups[grp.to_sym] ||= []).concat(Array(value)).uniq!
end
end
end
@@ -48,7 +48,10 @@ module Net; module SSH; module Multi
def use(host, user, options={})
server = Server.new(host, user, {:via => default_gateway}.merge(options))
- unless servers.include?(server)
+ exists = servers.index(server)
+ if exists
+ server = servers[exists]
+ else
servers << server
group [] => server
end
@@ -56,32 +59,49 @@ module Net; module SSH; module Multi
end
def with(*groups)
- saved_groups = @active_groups.dup
+ saved_groups = active_groups.dup
new_map = groups.inject({}) do |map, group|
if group.is_a?(Hash)
group.each do |gr, value|
raise ArgumentError, "the value for any group must be a Hash" unless value.is_a?(Hash)
- map[gr] = (@active_groups[gr] || {}).merge(value)
+ bad_keys = value.keys - [:only, :except]
+ raise ArgumentError, "unknown constraint(s): #{bad_keys.inspect}" unless bad_keys.empty?
+ map[gr] = (active_groups[gr] || {}).merge(value)
end
else
- map[group] = @active_groups[group] || {}
+ map[group] = active_groups[group] || {}
end
map
end
- @active_groups.update(new_map)
+ active_groups.update(new_map)
yield self
ensure
- @active_groups.replace(saved_groups)
+ active_groups.replace(saved_groups)
+ end
+
+ def on(*servers)
+ adhoc_group = "adhoc_group_#{servers.hash}_#{rand(0xffffffff)}".to_sym
+ group(adhoc_group => servers)
+ saved_groups = active_groups.dup
+ active_groups.replace(adhoc_group => {})
+ yield self
+ ensure
+ active_groups.replace(saved_groups) if saved_groups
+ groups.delete(adhoc_group)
end
def active_sessions
- list = if @active_groups.empty?
+ list = if active_groups.empty?
servers
else
- @active_groups.inject([]) do |list, (group, properties)|
- list.concat(groups[group].select { |server| properties.all? { |prop, value| server[prop] == value } })
+ active_groups.inject([]) do |list, (group, properties)|
+ servers = groups[group].select do |server|
+ (properties[:only] || {}).all? { |prop, value| server[prop] == value } &&
+ !(properties[:except] || {}).any? { |prop, value| server[prop] == value }
+ end
+ list.concat(servers)
end
end
@@ -111,15 +131,26 @@ module Net; module SSH; module Multi
loop_forever { break unless process(wait, &running) }
end
+ def preprocess(&block)
+ return false if block && !block[self]
+ servers.each { |server| server.preprocess }
+ block.nil? || block[self]
+ end
+
+ def postprocess(readers, writers)
+ servers.each { |server| server.postprocess(readers, writers) }
+ true
+ end
+
def process(wait=nil, &block)
- return false if servers.any? { |server| !server.preprocess(&block) }
+ return false unless preprocess(&block)
readers = servers.map { |s| s.readers }.flatten
writers = servers.map { |s| s.writers }.flatten
readers, writers, = IO.select(readers, writers, nil, wait)
- return servers.all? { |server| server.postprocess(readers, writers) }
+ return postprocess(readers, writers)
end
def send_global_request(type, *extra, &callback)
@@ -129,10 +160,11 @@ module Net; module SSH; module Multi
def open_channel(type="session", *extra, &on_confirm)
channels = active_sessions.map do |ssh|
- channel = ssh.open_channel(type, *extra, &on_confirm)
- channel[:server] = ssh[:server]
- channel[:host] = ssh[:server].host
- channel
+ ssh.open_channel(type, *extra) do |c|
+ c[:server] = ssh[:server]
+ c[:host] = ssh[:server].host
+ on_confirm[c] if on_confirm
+ end
end
Multi::Channel.new(self, channels)
end
@@ -169,24 +201,11 @@ module Net; module SSH; module Multi
end
end
- def exec!(command, &block)
- block ||= Proc.new do |ch, type, data|
- ch[:result] ||= {}
- ch[:result][ch[:server]] ||= ""
- ch[:result][ch[:server]] << data
- end
-
- channel = exec(command, &block)
- channel.wait
-
- return channel[:result]
- end
-
private
def sessions_for(servers)
- threads = servers.map { |server| Thread.new { server.session(true) } }
- threads.each { |thread| thread.join }
+ threads = servers.map { |server| Thread.new { server.session(true) } if server.session.nil? }
+ threads.each { |thread| thread.join if thread }
servers.map { |server| server.session }
end
end
diff --git a/test/server_test.rb b/test/server_test.rb
index b89066f..4c9d03c 100644
--- a/test/server_test.rb
+++ b/test/server_test.rb
@@ -153,19 +153,17 @@ class ServerTest < Test::Unit::TestCase
assert srv.busy?
end
- def test_preprocess_should_be_true_when_session_is_not_open
- assert_equal true, server('host', 'user').preprocess
+ def test_preprocess_should_be_nil_when_session_is_not_open
+ assert_nil server('host', 'user').preprocess
end
def test_preprocess_should_return_result_of_session_preprocess
srv = server('host', 'user')
session = {}
Net::SSH.expects(:start).returns(session)
- session.expects(:preprocess).returns(:result).yields(session)
- called = nil
+ session.expects(:preprocess).returns(:result)
srv.session(true)
- assert_equal :result, srv.preprocess { |s| called = s }
- assert_equal session, called
+ assert_equal :result, srv.preprocess
end
def test_readers_should_return_empty_array_when_session_is_not_open
diff --git a/test/session_test.rb b/test/session_test.rb
new file mode 100644
index 0000000..18afea9
--- /dev/null
+++ b/test/session_test.rb
@@ -0,0 +1,334 @@
+require 'common'
+require 'net/ssh/multi/session'
+
+class SessionTest < Test::Unit::TestCase
+ def setup
+ @session = Net::SSH::Multi::Session.new
+ end
+
+ def test_group_should_fail_when_given_both_mapping_and_block
+ assert_raises(ArgumentError) do
+ @session.group(:app => mock('server')) { |s| }
+ end
+ end
+
+ def test_group_with_block_should_use_groups_within_block_and_restore_on_exit
+ @session.open_groups.concat([:first, :second])
+ assert_equal [:first, :second], @session.open_groups
+ yielded = nil
+ @session.group(:third, :fourth) do |s|
+ yielded = s
+ assert_equal [:first, :second, :third, :fourth], @session.open_groups
+ end
+ assert_equal [:first, :second], @session.open_groups
+ assert_equal @session, yielded
+ end
+
+ def test_group_with_mapping_should_append_new_servers_to_specified_and_open_groups
+ @session.open_groups.concat([:first, :second])
+ @session.groups[:second] = [1]
+ @session.group %w(third fourth) => [2, 3], :fifth => 1, :sixth => [4]
+ assert_equal [1, 2, 3, 4], @session.groups[:first].sort
+ assert_equal [1, 2, 3, 4], @session.groups[:second].sort
+ assert_equal [2, 3], @session.groups[:third]
+ assert_equal [2, 3], @session.groups[:fourth]
+ assert_equal [1], @session.groups[:fifth]
+ assert_equal [4], @session.groups[:sixth]
+ end
+
+ def test_via_should_instantiate_and_set_default_gateway
+ Net::SSH::Gateway.expects(:new).with('host', 'user', :a => :b).returns(:gateway)
+ assert_equal @session, @session.via('host', 'user', :a => :b)
+ assert_equal :gateway, @session.default_gateway
+ end
+
+ def test_use_should_add_new_server_to_server_list
+ @session.open_groups.concat([:first, :second])
+ server = @session.use('host', 'user', :a => :b)
+ assert_equal [server], @session.servers
+ assert_equal 'host', server.host
+ assert_equal 'user', server.user
+ assert_equal({:a => :b}, server.options)
+ assert_nil server.gateway
+ end
+
+ def test_use_with_open_groups_should_add_new_server_to_server_list_and_groups
+ @session.open_groups.concat([:first, :second])
+ server = @session.use('host', 'user')
+ assert_equal [server], @session.groups[:first]
+ assert_equal [server], @session.groups[:second]
+ end
+
+ def test_use_with_default_gateway_should_set_gateway_on_server
+ Net::SSH::Gateway.expects(:new).with('host', 'user', {}).returns(:gateway)
+ @session.via('host', 'user')
+ server = @session.use('host2', 'user2')
+ assert_equal :gateway, server.gateway
+ end
+
+ def test_use_with_duplicate_server_will_not_add_server_twice
+ s1 = @session.use('host', 'user')
+ s2 = @session.use('host', 'user')
+ assert_equal 1, @session.servers.length
+ assert_equal s1.object_id, s2.object_id
+ end
+
+ def test_with_should_set_active_groups_and_yield_and_restore_active_groups
+ yielded = nil
+ @session.with(:app, :web) do |s|
+ yielded = s
+ assert_equal({:app => {}, :web => {}}, @session.active_groups)
+ end
+ assert_equal @session, yielded
+ assert_equal({}, @session.active_groups)
+ end
+
+ def test_with_with_unknown_constraint_should_raise_error
+ assert_raises(ArgumentError) do
+ @session.with(:app => { :all => :foo }) {}
+ end
+ end
+
+ def test_with_with_constraints_should_add_constraints_to_active_groups
+ @session.with(:app => { :only => { :primary => true }, :except => { :backup => true } }) do |s|
+ assert_equal({:app => {:only => {:primary => true}, :except => {:backup => true}}}, @session.active_groups)
+ end
+ end
+
+ def test_on_should_create_ad_hoc_group_and_make_that_group_the_only_active_group
+ s1 = @session.use('h1', 'u1')
+ s2 = @session.use('h2', 'u2')
+ yielded = nil
+ @session.active_groups[:g1] = []
+ @session.on(s1, s2) do |s|
+ yielded = s
+ assert_equal 1, @session.active_groups.size
+ assert_not_equal :g1, @session.active_groups.keys.first
+ assert_equal [s1, s2], @session.groups[@session.active_groups.keys.first]
+ end
+ assert_equal [:g1], @session.active_groups.keys
+ assert_equal @session, yielded
+ end
+
+ def test_active_sessions_should_return_sessions_for_all_servers_if_active_groups_is_empty
+ s1, s2, s3 = MockSession.new, MockSession.new, MockSession.new
+ srv1, srv2, srv3 = @session.use('h1', 'u1'), @session.use('h2', 'u2'), @session.use('h3', 'u3')
+ Net::SSH.expects(:start).with('h1', 'u1', {}).returns(s1)
+ Net::SSH.expects(:start).with('h2', 'u2', {}).returns(s2)
+ Net::SSH.expects(:start).with('h3', 'u3', {}).returns(s3)
+ assert_equal [s1, s2, s3], @session.active_sessions.sort
+ end
+
+ def test_active_sessions_should_return_sessions_only_for_active_groups_if_active_groups_exist
+ s1, s2, s3 = MockSession.new, MockSession.new, MockSession.new
+ srv1, srv2, srv3 = @session.use('h1', 'u1'), @session.use('h2', 'u2'), @session.use('h3', 'u3')
+ @session.group :app => [srv1, srv2], :db => [srv3]
+ Net::SSH.expects(:start).with('h1', 'u1', {}).returns(s1)
+ Net::SSH.expects(:start).with('h2', 'u2', {}).returns(s2)
+ @session.active_groups.replace(:app => {})
+ assert_equal [s1, s2], @session.active_sessions.sort
+ end
+
+ def test_active_sessions_should_not_return_duplicate_sessions
+ s1, s2, s3 = MockSession.new, MockSession.new, MockSession.new
+ srv1, srv2, srv3 = @session.use('h1', 'u1'), @session.use('h2', 'u2'), @session.use('h3', 'u3')
+ @session.group :app => [srv1, srv2], :db => [srv2, srv3]
+ Net::SSH.expects(:start).with('h1', 'u1', {}).returns(s1)
+ Net::SSH.expects(:start).with('h2', 'u2', {}).returns(s2)
+ Net::SSH.expects(:start).with('h3', 'u3', {}).returns(s3)
+ @session.active_groups.replace(:app => {}, :db => {})
+ assert_equal [s1, s2, s3], @session.active_sessions.sort
+ end
+
+ def test_active_sessions_should_correctly_apply_only_and_except_constraints
+ s1, s2, s3 = MockSession.new, MockSession.new, MockSession.new
+ srv1, srv2, srv3 = @session.use('h1', 'u1', :properties => {:a => 1}), @session.use('h2', 'u2', :properties => {:a => 1, :b => 2}), @session.use('h3', 'u3')
+ @session.group :app => [srv1, srv2, srv3]
+ Net::SSH.expects(:start).with('h1', 'u1', :properties => {:a => 1}).returns(s1)
+ @session.active_groups.replace(:app => {:only => {:a => 1}, :except => {:b => 2}})
+ assert_equal [s1], @session.active_sessions.sort
+ end
+
+ def test_connect_bang_should_call_active_sessions_and_return_self
+ @session.expects(:active_sessions)
+ assert_equal @session, @session.connect!
+ end
+
+ def test_close_should_close_server_sessions
+ srv1, srv2 = @session.use('h1', 'u1'), @session.use('h2', 'u2')
+ srv1.expects(:close_channels)
+ srv2.expects(:close_channels)
+ srv1.expects(:close)
+ srv2.expects(:close)
+ @session.close
+ end
+
+ def test_close_should_shutdown_default_gateway
+ gateway = mock('gateway')
+ gateway.expects(:shutdown!)
+ Net::SSH::Gateway.expects(:new).returns(gateway)
+ @session.via('host', 'user')
+ @session.close
+ end
+
+ def test_busy_should_be_true_if_any_server_is_busy
+ srv1, srv2, srv3 = @session.use('h1', 'u1', :properties => {:a => 1}), @session.use('h2', 'u2', :properties => {:a => 1, :b => 2}), @session.use('h3', 'u3')
+ srv1.stubs(:busy?).returns(false)
+ srv2.stubs(:busy?).returns(false)
+ srv3.stubs(:busy?).returns(true)
+ assert @session.busy?
+ end
+
+ def test_busy_should_be_false_if_all_servers_are_not_busy
+ srv1, srv2, srv3 = @session.use('h1', 'u1', :properties => {:a => 1}), @session.use('h2', 'u2', :properties => {:a => 1, :b => 2}), @session.use('h3', 'u3')
+ srv1.stubs(:busy?).returns(false)
+ srv2.stubs(:busy?).returns(false)
+ srv3.stubs(:busy?).returns(false)
+ assert !@session.busy?
+ end
+
+ def test_loop_should_loop_until_process_is_false
+ @session.expects(:process).with(5).times(4).returns(true,true,true,false).yields
+ yielded = false
+ @session.loop(5) { yielded = true }
+ assert yielded
+ end
+
+ def test_preprocess_should_immediately_return_false_if_block_returns_false
+ srv = @session.use('h1', 'u1')
+ srv.expects(:preprocess).never
+ assert_equal false, @session.preprocess { false }
+ end
+
+ def test_preprocess_should_call_preprocess_on_component_servers
+ srv = @session.use('h1', 'u1')
+ srv.expects(:preprocess)
+ assert_equal :hello, @session.preprocess { :hello }
+ end
+
+ def test_preprocess_should_succeed_even_without_block
+ srv = @session.use('h1', 'u1')
+ srv.expects(:preprocess)
+ assert_equal true, @session.preprocess
+ end
+
+ def test_postprocess_should_call_postprocess_on_component_servers
+ srv = @session.use('h1', 'u1')
+ srv.expects(:postprocess).with([:a], [:b])
+ assert_equal true, @session.postprocess([:a], [:b])
+ end
+
+ def test_process_should_return_false_if_preprocess_returns_false
+ assert_equal false, @session.process { false }
+ end
+
+ def test_process_should_call_select_on_combined_readers_and_writers_from_all_servers
+ @session.expects(:postprocess).with([:b, :c], [:a, :c])
+ srv1, srv2, srv3 = @session.use('h1', 'u1'), @session.use('h2', 'u2'), @session.use('h3', 'u3')
+ srv1.expects(:readers).returns([:a])
+ srv1.expects(:writers).returns([:a])
+ srv2.expects(:readers).returns([])
+ srv2.expects(:writers).returns([])
+ srv3.expects(:readers).returns([:b, :c])
+ srv3.expects(:writers).returns([:c])
+ IO.expects(:select).with([:a, :b, :c], [:a, :c], nil, 5).returns([[:b, :c], [:a, :c]])
+ @session.process(5)
+ end
+
+ def test_send_global_request_should_delegate_to_active_sessions
+ s1 = mock('ssh')
+ s2 = mock('ssh')
+ s1.expects(:send_global_request).with("a", "b", "c").yields
+ s2.expects(:send_global_request).with("a", "b", "c").yields
+ @session.expects(:active_sessions).returns([s1, s2])
+ calls = 0
+ @session.send_global_request("a", "b", "c") { calls += 1 }
+ assert_equal 2, calls
+ end
+
+ def test_open_channel_should_delegate_to_active_sessions_and_set_accessors_on_each_channel_and_return_multi_channel
+ srv1 = @session.use('h1', 'u1')
+ srv2 = @session.use('h2', 'u2')
+ s1 = { :server => srv1 }
+ s2 = { :server => srv2 }
+ c1 = {}
+ c2 = {}
+ @session.expects(:active_sessions).returns([s1, s2])
+ s1.expects(:open_channel).with("session").yields(c1).returns(c1)
+ s2.expects(:open_channel).with("session").yields(c2).returns(c2)
+ results = []
+ channel = @session.open_channel do |c|
+ results << c
+ end
+ assert_equal [c1, c2], results
+ assert_equal "h1", c1[:host]
+ assert_equal "h2", c2[:host]
+ assert_equal srv1, c1[:server]
+ assert_equal srv2, c2[:server]
+ assert_instance_of Net::SSH::Multi::Channel, channel
+ assert_equal [c1, c2], channel.channels
+ end
+
+ def test_exec_should_raise_exception_if_channel_cannot_exec_command
+ c = { :host => "host" }
+ @session.expects(:open_channel).yields(c).returns(c)
+ c.expects(:exec).with('something').yields(c, false)
+ assert_raises(RuntimeError) { @session.exec("something") }
+ end
+
+ def test_exec_with_block_should_pass_data_and_extended_data_to_block
+ c = { :host => "host" }
+ @session.expects(:open_channel).yields(c).returns(c)
+ c.expects(:exec).with('something').yields(c, true)
+ c.expects(:on_data).yields(c, "stdout")
+ c.expects(:on_extended_data).yields(c, 1, "stderr")
+ c.expects(:on_request)
+ results = {}
+ @session.exec("something") do |c, stream, data|
+ results[stream] = data
+ end
+ assert_equal({:stdout => "stdout", :stderr => "stderr"}, results)
+ end
+
+ def test_exec_without_block_should_write_data_and_extended_data_lines_to_stdout_and_stderr
+ c = { :host => "host" }
+ @session.expects(:open_channel).yields(c).returns(c)
+ c.expects(:exec).with('something').yields(c, true)
+ c.expects(:on_data).yields(c, "stdout 1\nstdout 2\n")
+ c.expects(:on_extended_data).yields(c, 1, "stderr 1\nstderr 2\n")
+ c.expects(:on_request)
+ $stdout.expects(:puts).with("[host] stdout 1\n")
+ $stdout.expects(:puts).with("[host] stdout 2")
+ $stderr.expects(:puts).with("[host] stderr 1\n")
+ $stderr.expects(:puts).with("[host] stderr 2")
+ @session.exec("something")
+ end
+
+ def test_exec_should_capture_exit_status_of_process
+ c = { :host => "host" }
+ @session.expects(:open_channel).yields(c).returns(c)
+ c.expects(:exec).with('something').yields(c, true)
+ c.expects(:on_data)
+ c.expects(:on_extended_data)
+ c.expects(:on_request).with("exit-status").yields(c, Net::SSH::Buffer.from(:long, 127))
+ @session.exec("something")
+ assert_equal 127, c[:exit_status]
+ end
+
+ private
+
+ class MockSession < Hash
+ include Comparable
+
+ @@next_id = 0
+ attr_reader :id
+
+ def initialize
+ @id = (@@next_id += 1)
+ end
+
+ def <=>(s)
+ id <=> s.id
+ end
+ end
+end \ No newline at end of file