From 97e3f9e56a6e8d7ad0027228a497d7f144402ee4 Mon Sep 17 00:00:00 2001 From: Jamis Buck Date: Sun, 6 Apr 2008 08:13:18 -0600 Subject: connection limiting --- lib/net/ssh/multi.rb | 6 +- lib/net/ssh/multi/channel_proxy.rb | 24 +++++++ lib/net/ssh/multi/pending_connection.rb | 35 +++++++++++ lib/net/ssh/multi/server.rb | 83 ++++++++++++++++-------- lib/net/ssh/multi/session.rb | 108 ++++++++++++++++++++++++++++++-- test/server_test.rb | 58 ++++++++--------- test/session_test.rb | 4 +- 7 files changed, 252 insertions(+), 66 deletions(-) diff --git a/lib/net/ssh/multi.rb b/lib/net/ssh/multi.rb index fe60802..ae24f99 100644 --- a/lib/net/ssh/multi.rb +++ b/lib/net/ssh/multi.rb @@ -52,8 +52,10 @@ module Net; module SSH # session = Net::SSH::Multi.start # # ... # session.close - def self.start - session = Session.new + # + # Any options are passed directly to Net::SSH::Multi::Session.new (q.v.). + def self.start(options={}) + session = Session.new(options) if block_given? begin diff --git a/lib/net/ssh/multi/channel_proxy.rb b/lib/net/ssh/multi/channel_proxy.rb index 99f7986..df5c04c 100644 --- a/lib/net/ssh/multi/channel_proxy.rb +++ b/lib/net/ssh/multi/channel_proxy.rb @@ -1,13 +1,32 @@ module Net; module SSH; module Multi + + # The ChannelProxy is a delegate class that represents a channel that has + # not yet been opened. It is only used when Net::SSH::Multi is running with + # with a concurrent connections limit (see Net::SSH::Multi::Session#concurrent_connections). + # + # You'll never need to instantiate one of these directly, and will probably + # (if all goes well!) never even notice when one of these is in use. Essentially, + # it is spawned by a Net::SSH::Multi::PendingConnection when the pending + # connection is asked to open a channel. Any actions performed on the + # channel proxy will then be recorded, until a real channel is set as the + # delegate (see #delegate_to). At that point, all recorded actions will be + # replayed on the channel, and any subsequent actions will be immediately + # delegated to the channel. class ChannelProxy + # This is the "on confirm" callback that gets called when the real channel + # is opened. attr_reader :on_confirm + # Instantiates a new channel proxy with the given +on_confirm+ callback. def initialize(&on_confirm) @on_confirm = on_confirm @recordings = [] @channel = nil end + # Instructs the proxy to delegate all further actions to the given +channel+ + # (which must be an instance of Net::SSH::Connection::Channel). All recorded + # actions are immediately replayed, in order, against the delegate channel. def delegate_to(channel) @channel = channel @recordings.each do |sym, args, block| @@ -15,6 +34,10 @@ module Net; module SSH; module Multi end end + # If a channel delegate has been specified (see #delegate_to), the method + # will be immediately sent to the delegate. Otherwise, the call is added + # to the list of recorded method calls, to be played back when a delegate + # is specified. def method_missing(sym, *args, &block) if @channel @channel.__send__(sym, *args, &block) @@ -23,4 +46,5 @@ module Net; module SSH; module Multi end end end + end; end; end \ No newline at end of file diff --git a/lib/net/ssh/multi/pending_connection.rb b/lib/net/ssh/multi/pending_connection.rb index fcebcba..6af0fac 100644 --- a/lib/net/ssh/multi/pending_connection.rb +++ b/lib/net/ssh/multi/pending_connection.rb @@ -1,7 +1,23 @@ require 'net/ssh/multi/channel_proxy' module Net; module SSH; module Multi + + # A PendingConnection instance mimics a Net::SSH::Connection::Session instance, + # without actually being an open connection to a server. It is used by + # Net::SSH::Multi::Session when a concurrent connection limit is in effect, + # so that a server can hang on to a "connection" that isn't really a connection. + # + # Any requests against this connection (like #open_channel or #send_global_request) + # are not actually sent, but are added to a list of recordings. When the real + # session is opened and replaces this pending connection, all recorded actions + # will be replayed against that session. + # + # You'll never need to initialize one of these directly, and (if all goes well!) + # should never even notice that one of these is in use. Net::SSH::Multi::Session + # will instantiate these as needed, and only when there is a concurrent + # connection limit. class PendingConnection + # Represents a #open_channel action. class ChannelOpenRecording #:nodoc: attr_reader :type, :extras, :channel @@ -15,6 +31,7 @@ module Net; module SSH; module Multi end end + # Represents a #send_global_request action. class SendGlobalRequestRecording #:nodoc: attr_reader :type, :extra, :callback @@ -27,51 +44,69 @@ module Net; module SSH; module Multi end end + # The Net::SSH::Multi::Server object that "owns" this pending connection. attr_reader :server + # Instantiates a new pending connection for the given Net::SSH::Multi::Server + # object. def initialize(server) @server = server @recordings = [] end + # Instructs the pending session to replay all of its recordings against the + # given +session+, and to then replace itself with the given session. def replace_with(session) @recordings.each { |recording| recording.replay_on(session) } @server.replace_session(session) end + # Records that a channel open request has been made, and returns a new + # Net::SSH::Multi::ChannelProxy object to represent the (as yet unopened) + # channel. def open_channel(type="session", *extras, &on_confirm) channel = ChannelProxy.new(&on_confirm) @recordings << ChannelOpenRecording.new(type, extras, channel) return channel end + # Records that a global request has been made. The request is not actually + # sent, and won't be until #replace_with is called. def send_global_request(type, *extra, &callback) @recordings << SendGlobalRequestRecording.new(type, extra, callback) self end + # Always returns +true+, so that the pending connection looks active until + # it can be truly opened and replaced with a real connection. def busy?(include_invisible=false) true end + # Does nothing, except to make a pending connection quack like a real connection. def close self end + # Returns an empty array, since a pending connection cannot have any real channels. def channels [] end + # Returns +true+, and does nothing else. def preprocess true end + # Returns +true+, and does nothing else. def postprocess(readers, writers) true end + # Returns an empty hash, since a pending connection has no real listeners. def listeners {} end end + end; end; end \ No newline at end of file diff --git a/lib/net/ssh/multi/server.rb b/lib/net/ssh/multi/server.rb index 65967ab..999e788 100644 --- a/lib/net/ssh/multi/server.rb +++ b/lib/net/ssh/multi/server.rb @@ -6,6 +6,9 @@ module Net; module SSH; module Multi # need to instantiate one of these directly: instead, you should use # Net::SSH::Multi::Session#use. class Server + # The Net::SSH::Multi::Session instance that manages this server instance. + attr_reader :master + # The host name (or IP address) of the server to connect to. attr_reader :host @@ -21,12 +24,14 @@ module Net; module SSH; module Multi attr_reader :gateway # Creates a new Server instance with the given connection information. The - # +options+ hash must conform to the options described for Net::SSH::start, - # with one addition: + # +master+ argument must be a reference to the Net::SSH::Multi::Session + # instance that will manage this server reference. The +options+ hash must + # conform to the options described for Net::SSH::start, with one addition: # # * :via => a Net::SSH::Gateway instance to use when establishing a # connection to this server. - def initialize(host, user, options={}) + def initialize(master, host, user, options={}) + @master = master @host = host @user = user @options = options.dup @@ -77,9 +82,9 @@ module Net; module SSH; module Multi @inspect ||= "#<%s:0x%x %s>" % [self.class.name, object_id, to_s] end - # Returns the Net::SSH session object for this server. If +ensure_open+ + # Returns the Net::SSH session object for this server. If +require_session+ # is false and the session has not previously been created, this will - # return +nil+. If +ensure_open+ is true, the session will be instantiated + # return +nil+. If +require_session+ is true, the session will be instantiated # if it has not already been instantiated, via the +gateway+ if one is # given, or directly (via Net::SSH::start) otherwise. # @@ -94,9 +99,52 @@ module Net; module SSH; module Multi # (the Server instance that spawned them). # # assert_equal server, server.session[:server] - def session(ensure_open=false) - return @session if @session || !ensure_open - @session ||= begin + def session(require_session=false) + return @session if @session || !require_session + @session ||= master.next_session(self) + end + + # Returns +true+ if the session has been opened, and the session is currently + # busy (as defined by Net::SSH::Connection::Session#busy?). + def busy?(include_invisible=false) + session && session.busy?(include_invisible) + end + + # Closes this server's session. If the session has not yet been opened, + # this does nothing. + def close + session.close if session + ensure + master.server_closed(self) if session + @session = nil + end + + public # but not published, e.g., these are used internally only... + + # Indicate that the session currently in use by this server instance + # should be replaced by the given +session+ argument. This is used when + # a pending session has been "realized". Note that this does not + # actually replace the session--see #update_session! for that. + def replace_session(session) #:nodoc: + @ready_session = session + end + + # If a new session has been made ready (see #replace_session), this + # will replace the current session with the "ready" session. This + # method is called from the event loop to ensure that sessions are + # swapped in at the appropriate point (instead of in the middle of an + # event poll). + def update_session! #:nodoc: + if @ready_session + @session, @ready_session = @ready_session, nil + end + end + + # Returns a new session object based on this server's connection + # criteria. Note that this will not associate the session with the + # server, and should not be called directly; it is called internally + # from Net::SSH::Multi::Session when a new session is required. + def new_session #:nodoc: session = if gateway gateway.ssh(host, user, options) else @@ -105,31 +153,16 @@ module Net; module SSH; module Multi session[:server] = self session + rescue Net::SSH::AuthenticationFailed => error + raise Net::SSH::AuthenticationFailed.new("#{error.message}@#{host}") end - rescue Net::SSH::AuthenticationFailed => error - raise Net::SSH::AuthenticationFailed.new("#{error.message}@#{host}") - end - - # Returns +true+ if the session has been opened, and the session is currently - # busy (as defined by Net::SSH::Connection::Session#busy?). - def busy?(include_invisible=false) - session && session.busy?(include_invisible) - end - public # but not published, e.g., these are used internally only... - # Closes all open channels on this server's session. If the session has # not yet been opened, this does nothing. def close_channels #:nodoc: session.channels.each { |id, channel| channel.close } if session end - # Closes this server's session's transport layer. If the session has not - # yet been opened, this does nothing. - def close #:nodoc: - session.transport.close if session - end - # Runs the session's preprocess action, if the session has been opened. def preprocess #:nodoc: session.preprocess if session diff --git a/lib/net/ssh/multi/session.rb b/lib/net/ssh/multi/session.rb index cffe972..99819e1 100644 --- a/lib/net/ssh/multi/session.rb +++ b/lib/net/ssh/multi/session.rb @@ -1,6 +1,8 @@ +require 'thread' require 'net/ssh/gateway' require 'net/ssh/multi/server' require 'net/ssh/multi/channel' +require 'net/ssh/multi/pending_connection' module Net; module SSH; module Multi # Represents a collection of connections to various servers. It provides an @@ -50,6 +52,13 @@ module Net; module SSH; module Multi # corresponding Net::SSH::Multi::Server definitions. attr_reader :groups + # The number of allowed concurrent connections. No more than this number + # of sessions will be open at any given time. + attr_accessor :concurrent_connections + + # The number of connections that are currently open. + attr_reader :open_connections #:nodoc: + # The list of "open" groups, which will receive subsequent server definitions. # See #use and #group. attr_reader :open_groups #:nodoc: @@ -61,12 +70,30 @@ module Net; module SSH; module Multi # Creates a new Net::SSH::Multi::Session instance. Initially, it contains # no server definitions, no group definitions, and no default gateway. - def initialize + # + # You can set the #concurrent_connections property in the options. Setting + # it to +nil+ (the default) will cause Net::SSH::Multi to ignore any + # concurrent connection limit and allow all defined sessions to be open + # simultaneously. Setting it to an integer will cause Net::SSH::Multi to + # allow no more than that number of concurrently open sessions, opening + # subsequent sessions only when other sessions finish and close. + # + # Net::SSH::Multi.start(:concurrent_connections => 10) do |session| + # session.use ... + # end + def initialize(options={}) @servers = [] @groups = {} @gateway = nil @active_groups = {} @open_groups = [] + @connect_threads = [] + + @open_connections = 0 + @pending_sessions = [] + @session_mutex = Mutex.new + + options.each { |opt, value| send("#{opt}=", value) } end # At its simplest, this associates a named group with a server definition. @@ -144,7 +171,7 @@ module Net; module SSH; module Multi # session.use 'host2', 'user2', :via => nil # session.use 'host3', 'user3', :via => Net::SSH::Gateway.new('gateway.host', 'user') def use(host, user, options={}) - server = Server.new(host, user, {:via => default_gateway}.merge(options)) + server = Server.new(self, host, user, {:via => default_gateway}.merge(options)) exists = servers.index(server) if exists server = servers[exists] @@ -313,6 +340,9 @@ module Net; module SSH; module Multi # whether #process returns +true+ (the block did not return +false+), or # +false+ (the block returned +false+). def process(wait=nil, &block) + realize_pending_connections! + wait = @connect_threads.any? ? 0 : wait + return false unless preprocess(&block) readers = servers.map { |s| s.readers }.flatten @@ -320,7 +350,11 @@ module Net; module SSH; module Multi readers, writers, = IO.select(readers, writers, nil, wait) - return postprocess(readers, writers) + if readers + return postprocess(readers, writers) + else + return true + end end # Sends a global request to all active sessions (see #active_sessions). @@ -365,8 +399,8 @@ module Net; module SSH; module Multi def open_channel(type="session", *extra, &on_confirm) channels = active_sessions.map do |ssh| ssh.open_channel(type, *extra) do |c| - c[:server] = ssh[:server] - c[:host] = ssh[:server].host + c[:server] = c.connection[:server] + c[:host] = c.connection[:server].host on_confirm[c] if on_confirm end end @@ -448,5 +482,69 @@ module Net; module SSH; module Multi servers.each { |server| server.postprocess(readers, writers) } true end + + # Takes the #concurrent_connections property into account, and tries to + # return a new session for the given server. If the concurrent connections + # limit has been reached, then a Net::SSH::Multi::PendingConnection instance + # will be returned instead, which will be realized into an actual session + # as soon as a slot opens up. + # + # If +force+ is true, the concurrent_connections check is skipped and a real + # connection is always returned. + def next_session(server, force=false) #:nodoc: + @session_mutex.synchronize do + if !force && concurrent_connections && concurrent_connections <= open_connections + connection = PendingConnection.new(server) + @pending_sessions << connection + return connection + end + + @open_connections += 1 + end + + begin + server.new_session + rescue Exception => e + @session_mutex.synchronize { @open_connections -= 1 } + raise + end + end + + # Tells the session that the given server has closed its connection. The + # session indicates that a new connection slot is available, which may be + # filled by the next pending connection on the next event loop iteration. + def server_closed(server) #:nodoc: + @session_mutex.synchronize do + unless @pending_sessions.delete(server.session) + @open_connections -= 1 + end + end + end + + # Invoked by the event loop. If there is a concurrent_connections limit in + # effect, this will close any non-busy sessions and try to open as many + # new sessions as it can. It does this in threads, so that existing processing + # can continue. + # + # If there is no concurrent_connections limit in effect, then this method + # does nothing. + def realize_pending_connections! #:nodoc: + return unless concurrent_connections + + servers.each do |s| + s.close if !s.busy?(true) + s.update_session! + end + + @connect_threads.delete_if { |t| !t.alive? } + + count = concurrent_connections ? (concurrent_connections - open_connections) : @pending_sessions.length + count.times do + session = @pending_sessions.pop or break + @connect_threads << Thread.new do + session.replace_with(next_session(session.server, true)) + end + end + end end end; end; end \ No newline at end of file diff --git a/test/server_test.rb b/test/server_test.rb index 4c9d03c..caca334 100644 --- a/test/server_test.rb +++ b/test/server_test.rb @@ -2,6 +2,10 @@ require 'common' require 'net/ssh/multi/server' class ServerTest < Test::Unit::TestCase + def setup + @master = mock('multi-session') + end + def test_accessor_without_properties_should_access_empty_hash assert_nil server('host', 'user')[:foo] end @@ -72,19 +76,8 @@ class ServerTest < Test::Unit::TestCase end def test_session_with_true_argument_should_instantiate_and_cache_session - session = {} srv = server('host', 'user', :port => 1234) - Net::SSH.expects(:start).with('host', 'user', {:port => 1234}).once.returns(session) - assert_equal session, srv.session(true) - assert_equal session, srv.session(true) - assert_equal session, srv.session - end - - def test_session_via_gateway_with_true_argument_should_instantiate_and_cache_session - session = {} - gateway = mock('gateway') - srv = server('host', 'user', :port => 1234, :via => gateway) - gateway.expects(:ssh).with('host', 'user', {:port => 1234}).once.returns(session) + session = expect_connection_to(srv) assert_equal session, srv.session(true) assert_equal session, srv.session(true) assert_equal session, srv.session @@ -92,10 +85,10 @@ class ServerTest < Test::Unit::TestCase def test_session_that_cannot_authenticate_adds_host_to_exception_message srv = server('host', 'user') - Net::SSH.expects(:start).raises(Net::SSH::AuthenticationFailed.new('user')) + Net::SSH.expects(:start).with('host', 'user', {}).raises(Net::SSH::AuthenticationFailed.new('user')) begin - srv.session(true) + srv.new_session flunk rescue Net::SSH::AuthenticationFailed => e assert_equal "user@host", e.message @@ -108,8 +101,7 @@ class ServerTest < Test::Unit::TestCase def test_close_channels_when_session_is_open_should_iterate_over_open_channels_and_close_them srv = server('host', 'user') - session = {} - Net::SSH.expects(:start).returns(session) + session = expect_connection_to(srv) c1 = mock('channel', :close => nil) c2 = mock('channel', :close => nil) c3 = mock('channel', :close => nil) @@ -122,11 +114,11 @@ class ServerTest < Test::Unit::TestCase assert_nothing_raised { server('host', 'user').close } end - def test_close_when_session_is_open_should_close_transport_layer + def test_close_when_session_is_open_should_close_session srv = server('host', 'user') - session = {} - Net::SSH.expects(:start).returns(session) - session.expects(:transport).returns(mock('transport', :close => nil)) + session = expect_connection_to(srv) + session.expects(:close) + @master.expects(:server_closed).with(srv) assert_equal session, srv.session(true) srv.close end @@ -137,8 +129,7 @@ class ServerTest < Test::Unit::TestCase def test_busy_should_be_false_when_session_is_not_busy srv = server('host', 'user') - session = {} - Net::SSH.expects(:start).returns(session) + session = expect_connection_to(srv) session.expects(:busy?).returns(false) srv.session(true) assert !srv.busy? @@ -146,8 +137,7 @@ class ServerTest < Test::Unit::TestCase def test_busy_should_be_true_when_session_is_busy srv = server('host', 'user') - session = {} - Net::SSH.expects(:start).returns(session) + session = expect_connection_to(srv) session.expects(:busy?).returns(true) srv.session(true) assert srv.busy? @@ -159,8 +149,7 @@ class ServerTest < Test::Unit::TestCase def test_preprocess_should_return_result_of_session_preprocess srv = server('host', 'user') - session = {} - Net::SSH.expects(:start).returns(session) + session = expect_connection_to(srv) session.expects(:preprocess).returns(:result) srv.session(true) assert_equal :result, srv.preprocess @@ -172,8 +161,7 @@ class ServerTest < Test::Unit::TestCase def test_readers_should_return_all_listeners_when_session_is_open srv = server('host', 'user') - session = {} - Net::SSH.expects(:start).returns(session) + session = expect_connection_to(srv) session.expects(:listeners).returns(1 => 2, 3 => 4, 5 => 6, 7 => 8) srv.session(true) assert_equal [1, 3, 5, 7], srv.readers.sort @@ -185,8 +173,7 @@ class ServerTest < Test::Unit::TestCase def test_writers_should_return_all_listeners_that_are_pending_writes_when_session_is_open srv = server('host', 'user') - session = {} - Net::SSH.expects(:start).returns(session) + session = expect_connection_to(srv) listeners = { writer(:ready) => 1, writer(:reader) => 2, writer(:reader) => 3, writer(:idle) => 4, writer(:ready) => 5 } session.expects(:listeners).returns(listeners) @@ -200,8 +187,7 @@ class ServerTest < Test::Unit::TestCase def test_postprocess_should_call_session_postprocess_with_ios_belonging_to_session srv = server('host', 'user') - session = {} - Net::SSH.expects(:start).returns(session) + session = expect_connection_to(srv) session.expects(:listeners).returns(1 => 2, 3 => 4, 5 => 6, 7 => 8) session.expects(:postprocess).with([1,3], [7]).returns(:result) srv.session(true) @@ -211,7 +197,13 @@ class ServerTest < Test::Unit::TestCase private def server(host, user, options={}) - Net::SSH::Multi::Server.new(host, user, options) + Net::SSH::Multi::Server.new(@master, host, user, options) + end + + def expect_connection_to(server) + session = {} + @master.expects(:next_session).with(server).returns(session) + return session end def writer(mode) diff --git a/test/session_test.rb b/test/session_test.rb index 18afea9..8aebccc 100644 --- a/test/session_test.rb +++ b/test/session_test.rb @@ -251,8 +251,10 @@ class SessionTest < Test::Unit::TestCase srv2 = @session.use('h2', 'u2') s1 = { :server => srv1 } s2 = { :server => srv2 } - c1 = {} + c1 = { :stub => :value } c2 = {} + c1.stubs(:connection).returns(s1) + c2.stubs(:connection).returns(s2) @session.expects(:active_sessions).returns([s1, s2]) s1.expects(:open_channel).with("session").yields(c1).returns(c1) s2.expects(:open_channel).with("session").yields(c2).returns(c2) -- cgit v1.2.1