summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJamis Buck <jamis@37signals.com>2008-04-06 08:13:18 -0600
committerJamis Buck <jamis@37signals.com>2008-04-06 08:13:18 -0600
commit97e3f9e56a6e8d7ad0027228a497d7f144402ee4 (patch)
tree117071f8c4cca485aa1ed06672ca0fc0e5de289b
parent6214d1ec6796ffab41b9413840e733a1d5cf70dd (diff)
downloadnet-ssh-multi-97e3f9e56a6e8d7ad0027228a497d7f144402ee4.tar.gz
connection limiting
-rw-r--r--lib/net/ssh/multi.rb6
-rw-r--r--lib/net/ssh/multi/channel_proxy.rb24
-rw-r--r--lib/net/ssh/multi/pending_connection.rb35
-rw-r--r--lib/net/ssh/multi/server.rb83
-rw-r--r--lib/net/ssh/multi/session.rb108
-rw-r--r--test/server_test.rb58
-rw-r--r--test/session_test.rb4
7 files changed, 252 insertions, 66 deletions
diff --git a/lib/net/ssh/multi.rb b/lib/net/ssh/multi.rb
index fe60802..ae24f99 100644
--- a/lib/net/ssh/multi.rb
+++ b/lib/net/ssh/multi.rb
@@ -52,8 +52,10 @@ module Net; module SSH
# session = Net::SSH::Multi.start
# # ...
# session.close
- def self.start
- session = Session.new
+ #
+ # Any options are passed directly to Net::SSH::Multi::Session.new (q.v.).
+ def self.start(options={})
+ session = Session.new(options)
if block_given?
begin
diff --git a/lib/net/ssh/multi/channel_proxy.rb b/lib/net/ssh/multi/channel_proxy.rb
index 99f7986..df5c04c 100644
--- a/lib/net/ssh/multi/channel_proxy.rb
+++ b/lib/net/ssh/multi/channel_proxy.rb
@@ -1,13 +1,32 @@
module Net; module SSH; module Multi
+
+ # The ChannelProxy is a delegate class that represents a channel that has
+ # not yet been opened. It is only used when Net::SSH::Multi is running with
+ # with a concurrent connections limit (see Net::SSH::Multi::Session#concurrent_connections).
+ #
+ # You'll never need to instantiate one of these directly, and will probably
+ # (if all goes well!) never even notice when one of these is in use. Essentially,
+ # it is spawned by a Net::SSH::Multi::PendingConnection when the pending
+ # connection is asked to open a channel. Any actions performed on the
+ # channel proxy will then be recorded, until a real channel is set as the
+ # delegate (see #delegate_to). At that point, all recorded actions will be
+ # replayed on the channel, and any subsequent actions will be immediately
+ # delegated to the channel.
class ChannelProxy
+ # This is the "on confirm" callback that gets called when the real channel
+ # is opened.
attr_reader :on_confirm
+ # Instantiates a new channel proxy with the given +on_confirm+ callback.
def initialize(&on_confirm)
@on_confirm = on_confirm
@recordings = []
@channel = nil
end
+ # Instructs the proxy to delegate all further actions to the given +channel+
+ # (which must be an instance of Net::SSH::Connection::Channel). All recorded
+ # actions are immediately replayed, in order, against the delegate channel.
def delegate_to(channel)
@channel = channel
@recordings.each do |sym, args, block|
@@ -15,6 +34,10 @@ module Net; module SSH; module Multi
end
end
+ # If a channel delegate has been specified (see #delegate_to), the method
+ # will be immediately sent to the delegate. Otherwise, the call is added
+ # to the list of recorded method calls, to be played back when a delegate
+ # is specified.
def method_missing(sym, *args, &block)
if @channel
@channel.__send__(sym, *args, &block)
@@ -23,4 +46,5 @@ module Net; module SSH; module Multi
end
end
end
+
end; end; end \ No newline at end of file
diff --git a/lib/net/ssh/multi/pending_connection.rb b/lib/net/ssh/multi/pending_connection.rb
index fcebcba..6af0fac 100644
--- a/lib/net/ssh/multi/pending_connection.rb
+++ b/lib/net/ssh/multi/pending_connection.rb
@@ -1,7 +1,23 @@
require 'net/ssh/multi/channel_proxy'
module Net; module SSH; module Multi
+
+ # A PendingConnection instance mimics a Net::SSH::Connection::Session instance,
+ # without actually being an open connection to a server. It is used by
+ # Net::SSH::Multi::Session when a concurrent connection limit is in effect,
+ # so that a server can hang on to a "connection" that isn't really a connection.
+ #
+ # Any requests against this connection (like #open_channel or #send_global_request)
+ # are not actually sent, but are added to a list of recordings. When the real
+ # session is opened and replaces this pending connection, all recorded actions
+ # will be replayed against that session.
+ #
+ # You'll never need to initialize one of these directly, and (if all goes well!)
+ # should never even notice that one of these is in use. Net::SSH::Multi::Session
+ # will instantiate these as needed, and only when there is a concurrent
+ # connection limit.
class PendingConnection
+ # Represents a #open_channel action.
class ChannelOpenRecording #:nodoc:
attr_reader :type, :extras, :channel
@@ -15,6 +31,7 @@ module Net; module SSH; module Multi
end
end
+ # Represents a #send_global_request action.
class SendGlobalRequestRecording #:nodoc:
attr_reader :type, :extra, :callback
@@ -27,51 +44,69 @@ module Net; module SSH; module Multi
end
end
+ # The Net::SSH::Multi::Server object that "owns" this pending connection.
attr_reader :server
+ # Instantiates a new pending connection for the given Net::SSH::Multi::Server
+ # object.
def initialize(server)
@server = server
@recordings = []
end
+ # Instructs the pending session to replay all of its recordings against the
+ # given +session+, and to then replace itself with the given session.
def replace_with(session)
@recordings.each { |recording| recording.replay_on(session) }
@server.replace_session(session)
end
+ # Records that a channel open request has been made, and returns a new
+ # Net::SSH::Multi::ChannelProxy object to represent the (as yet unopened)
+ # channel.
def open_channel(type="session", *extras, &on_confirm)
channel = ChannelProxy.new(&on_confirm)
@recordings << ChannelOpenRecording.new(type, extras, channel)
return channel
end
+ # Records that a global request has been made. The request is not actually
+ # sent, and won't be until #replace_with is called.
def send_global_request(type, *extra, &callback)
@recordings << SendGlobalRequestRecording.new(type, extra, callback)
self
end
+ # Always returns +true+, so that the pending connection looks active until
+ # it can be truly opened and replaced with a real connection.
def busy?(include_invisible=false)
true
end
+ # Does nothing, except to make a pending connection quack like a real connection.
def close
self
end
+ # Returns an empty array, since a pending connection cannot have any real channels.
def channels
[]
end
+ # Returns +true+, and does nothing else.
def preprocess
true
end
+ # Returns +true+, and does nothing else.
def postprocess(readers, writers)
true
end
+ # Returns an empty hash, since a pending connection has no real listeners.
def listeners
{}
end
end
+
end; end; end \ No newline at end of file
diff --git a/lib/net/ssh/multi/server.rb b/lib/net/ssh/multi/server.rb
index 65967ab..999e788 100644
--- a/lib/net/ssh/multi/server.rb
+++ b/lib/net/ssh/multi/server.rb
@@ -6,6 +6,9 @@ module Net; module SSH; module Multi
# need to instantiate one of these directly: instead, you should use
# Net::SSH::Multi::Session#use.
class Server
+ # The Net::SSH::Multi::Session instance that manages this server instance.
+ attr_reader :master
+
# The host name (or IP address) of the server to connect to.
attr_reader :host
@@ -21,12 +24,14 @@ module Net; module SSH; module Multi
attr_reader :gateway
# Creates a new Server instance with the given connection information. The
- # +options+ hash must conform to the options described for Net::SSH::start,
- # with one addition:
+ # +master+ argument must be a reference to the Net::SSH::Multi::Session
+ # instance that will manage this server reference. The +options+ hash must
+ # conform to the options described for Net::SSH::start, with one addition:
#
# * :via => a Net::SSH::Gateway instance to use when establishing a
# connection to this server.
- def initialize(host, user, options={})
+ def initialize(master, host, user, options={})
+ @master = master
@host = host
@user = user
@options = options.dup
@@ -77,9 +82,9 @@ module Net; module SSH; module Multi
@inspect ||= "#<%s:0x%x %s>" % [self.class.name, object_id, to_s]
end
- # Returns the Net::SSH session object for this server. If +ensure_open+
+ # Returns the Net::SSH session object for this server. If +require_session+
# is false and the session has not previously been created, this will
- # return +nil+. If +ensure_open+ is true, the session will be instantiated
+ # return +nil+. If +require_session+ is true, the session will be instantiated
# if it has not already been instantiated, via the +gateway+ if one is
# given, or directly (via Net::SSH::start) otherwise.
#
@@ -94,9 +99,52 @@ module Net; module SSH; module Multi
# (the Server instance that spawned them).
#
# assert_equal server, server.session[:server]
- def session(ensure_open=false)
- return @session if @session || !ensure_open
- @session ||= begin
+ def session(require_session=false)
+ return @session if @session || !require_session
+ @session ||= master.next_session(self)
+ end
+
+ # Returns +true+ if the session has been opened, and the session is currently
+ # busy (as defined by Net::SSH::Connection::Session#busy?).
+ def busy?(include_invisible=false)
+ session && session.busy?(include_invisible)
+ end
+
+ # Closes this server's session. If the session has not yet been opened,
+ # this does nothing.
+ def close
+ session.close if session
+ ensure
+ master.server_closed(self) if session
+ @session = nil
+ end
+
+ public # but not published, e.g., these are used internally only...
+
+ # Indicate that the session currently in use by this server instance
+ # should be replaced by the given +session+ argument. This is used when
+ # a pending session has been "realized". Note that this does not
+ # actually replace the session--see #update_session! for that.
+ def replace_session(session) #:nodoc:
+ @ready_session = session
+ end
+
+ # If a new session has been made ready (see #replace_session), this
+ # will replace the current session with the "ready" session. This
+ # method is called from the event loop to ensure that sessions are
+ # swapped in at the appropriate point (instead of in the middle of an
+ # event poll).
+ def update_session! #:nodoc:
+ if @ready_session
+ @session, @ready_session = @ready_session, nil
+ end
+ end
+
+ # Returns a new session object based on this server's connection
+ # criteria. Note that this will not associate the session with the
+ # server, and should not be called directly; it is called internally
+ # from Net::SSH::Multi::Session when a new session is required.
+ def new_session #:nodoc:
session = if gateway
gateway.ssh(host, user, options)
else
@@ -105,31 +153,16 @@ module Net; module SSH; module Multi
session[:server] = self
session
+ rescue Net::SSH::AuthenticationFailed => error
+ raise Net::SSH::AuthenticationFailed.new("#{error.message}@#{host}")
end
- rescue Net::SSH::AuthenticationFailed => error
- raise Net::SSH::AuthenticationFailed.new("#{error.message}@#{host}")
- end
-
- # Returns +true+ if the session has been opened, and the session is currently
- # busy (as defined by Net::SSH::Connection::Session#busy?).
- def busy?(include_invisible=false)
- session && session.busy?(include_invisible)
- end
- public # but not published, e.g., these are used internally only...
-
# Closes all open channels on this server's session. If the session has
# not yet been opened, this does nothing.
def close_channels #:nodoc:
session.channels.each { |id, channel| channel.close } if session
end
- # Closes this server's session's transport layer. If the session has not
- # yet been opened, this does nothing.
- def close #:nodoc:
- session.transport.close if session
- end
-
# Runs the session's preprocess action, if the session has been opened.
def preprocess #:nodoc:
session.preprocess if session
diff --git a/lib/net/ssh/multi/session.rb b/lib/net/ssh/multi/session.rb
index cffe972..99819e1 100644
--- a/lib/net/ssh/multi/session.rb
+++ b/lib/net/ssh/multi/session.rb
@@ -1,6 +1,8 @@
+require 'thread'
require 'net/ssh/gateway'
require 'net/ssh/multi/server'
require 'net/ssh/multi/channel'
+require 'net/ssh/multi/pending_connection'
module Net; module SSH; module Multi
# Represents a collection of connections to various servers. It provides an
@@ -50,6 +52,13 @@ module Net; module SSH; module Multi
# corresponding Net::SSH::Multi::Server definitions.
attr_reader :groups
+ # The number of allowed concurrent connections. No more than this number
+ # of sessions will be open at any given time.
+ attr_accessor :concurrent_connections
+
+ # The number of connections that are currently open.
+ attr_reader :open_connections #:nodoc:
+
# The list of "open" groups, which will receive subsequent server definitions.
# See #use and #group.
attr_reader :open_groups #:nodoc:
@@ -61,12 +70,30 @@ module Net; module SSH; module Multi
# Creates a new Net::SSH::Multi::Session instance. Initially, it contains
# no server definitions, no group definitions, and no default gateway.
- def initialize
+ #
+ # You can set the #concurrent_connections property in the options. Setting
+ # it to +nil+ (the default) will cause Net::SSH::Multi to ignore any
+ # concurrent connection limit and allow all defined sessions to be open
+ # simultaneously. Setting it to an integer will cause Net::SSH::Multi to
+ # allow no more than that number of concurrently open sessions, opening
+ # subsequent sessions only when other sessions finish and close.
+ #
+ # Net::SSH::Multi.start(:concurrent_connections => 10) do |session|
+ # session.use ...
+ # end
+ def initialize(options={})
@servers = []
@groups = {}
@gateway = nil
@active_groups = {}
@open_groups = []
+ @connect_threads = []
+
+ @open_connections = 0
+ @pending_sessions = []
+ @session_mutex = Mutex.new
+
+ options.each { |opt, value| send("#{opt}=", value) }
end
# At its simplest, this associates a named group with a server definition.
@@ -144,7 +171,7 @@ module Net; module SSH; module Multi
# session.use 'host2', 'user2', :via => nil
# session.use 'host3', 'user3', :via => Net::SSH::Gateway.new('gateway.host', 'user')
def use(host, user, options={})
- server = Server.new(host, user, {:via => default_gateway}.merge(options))
+ server = Server.new(self, host, user, {:via => default_gateway}.merge(options))
exists = servers.index(server)
if exists
server = servers[exists]
@@ -313,6 +340,9 @@ module Net; module SSH; module Multi
# whether #process returns +true+ (the block did not return +false+), or
# +false+ (the block returned +false+).
def process(wait=nil, &block)
+ realize_pending_connections!
+ wait = @connect_threads.any? ? 0 : wait
+
return false unless preprocess(&block)
readers = servers.map { |s| s.readers }.flatten
@@ -320,7 +350,11 @@ module Net; module SSH; module Multi
readers, writers, = IO.select(readers, writers, nil, wait)
- return postprocess(readers, writers)
+ if readers
+ return postprocess(readers, writers)
+ else
+ return true
+ end
end
# Sends a global request to all active sessions (see #active_sessions).
@@ -365,8 +399,8 @@ module Net; module SSH; module Multi
def open_channel(type="session", *extra, &on_confirm)
channels = active_sessions.map do |ssh|
ssh.open_channel(type, *extra) do |c|
- c[:server] = ssh[:server]
- c[:host] = ssh[:server].host
+ c[:server] = c.connection[:server]
+ c[:host] = c.connection[:server].host
on_confirm[c] if on_confirm
end
end
@@ -448,5 +482,69 @@ module Net; module SSH; module Multi
servers.each { |server| server.postprocess(readers, writers) }
true
end
+
+ # Takes the #concurrent_connections property into account, and tries to
+ # return a new session for the given server. If the concurrent connections
+ # limit has been reached, then a Net::SSH::Multi::PendingConnection instance
+ # will be returned instead, which will be realized into an actual session
+ # as soon as a slot opens up.
+ #
+ # If +force+ is true, the concurrent_connections check is skipped and a real
+ # connection is always returned.
+ def next_session(server, force=false) #:nodoc:
+ @session_mutex.synchronize do
+ if !force && concurrent_connections && concurrent_connections <= open_connections
+ connection = PendingConnection.new(server)
+ @pending_sessions << connection
+ return connection
+ end
+
+ @open_connections += 1
+ end
+
+ begin
+ server.new_session
+ rescue Exception => e
+ @session_mutex.synchronize { @open_connections -= 1 }
+ raise
+ end
+ end
+
+ # Tells the session that the given server has closed its connection. The
+ # session indicates that a new connection slot is available, which may be
+ # filled by the next pending connection on the next event loop iteration.
+ def server_closed(server) #:nodoc:
+ @session_mutex.synchronize do
+ unless @pending_sessions.delete(server.session)
+ @open_connections -= 1
+ end
+ end
+ end
+
+ # Invoked by the event loop. If there is a concurrent_connections limit in
+ # effect, this will close any non-busy sessions and try to open as many
+ # new sessions as it can. It does this in threads, so that existing processing
+ # can continue.
+ #
+ # If there is no concurrent_connections limit in effect, then this method
+ # does nothing.
+ def realize_pending_connections! #:nodoc:
+ return unless concurrent_connections
+
+ servers.each do |s|
+ s.close if !s.busy?(true)
+ s.update_session!
+ end
+
+ @connect_threads.delete_if { |t| !t.alive? }
+
+ count = concurrent_connections ? (concurrent_connections - open_connections) : @pending_sessions.length
+ count.times do
+ session = @pending_sessions.pop or break
+ @connect_threads << Thread.new do
+ session.replace_with(next_session(session.server, true))
+ end
+ end
+ end
end
end; end; end \ No newline at end of file
diff --git a/test/server_test.rb b/test/server_test.rb
index 4c9d03c..caca334 100644
--- a/test/server_test.rb
+++ b/test/server_test.rb
@@ -2,6 +2,10 @@ require 'common'
require 'net/ssh/multi/server'
class ServerTest < Test::Unit::TestCase
+ def setup
+ @master = mock('multi-session')
+ end
+
def test_accessor_without_properties_should_access_empty_hash
assert_nil server('host', 'user')[:foo]
end
@@ -72,19 +76,8 @@ class ServerTest < Test::Unit::TestCase
end
def test_session_with_true_argument_should_instantiate_and_cache_session
- session = {}
srv = server('host', 'user', :port => 1234)
- Net::SSH.expects(:start).with('host', 'user', {:port => 1234}).once.returns(session)
- assert_equal session, srv.session(true)
- assert_equal session, srv.session(true)
- assert_equal session, srv.session
- end
-
- def test_session_via_gateway_with_true_argument_should_instantiate_and_cache_session
- session = {}
- gateway = mock('gateway')
- srv = server('host', 'user', :port => 1234, :via => gateway)
- gateway.expects(:ssh).with('host', 'user', {:port => 1234}).once.returns(session)
+ session = expect_connection_to(srv)
assert_equal session, srv.session(true)
assert_equal session, srv.session(true)
assert_equal session, srv.session
@@ -92,10 +85,10 @@ class ServerTest < Test::Unit::TestCase
def test_session_that_cannot_authenticate_adds_host_to_exception_message
srv = server('host', 'user')
- Net::SSH.expects(:start).raises(Net::SSH::AuthenticationFailed.new('user'))
+ Net::SSH.expects(:start).with('host', 'user', {}).raises(Net::SSH::AuthenticationFailed.new('user'))
begin
- srv.session(true)
+ srv.new_session
flunk
rescue Net::SSH::AuthenticationFailed => e
assert_equal "user@host", e.message
@@ -108,8 +101,7 @@ class ServerTest < Test::Unit::TestCase
def test_close_channels_when_session_is_open_should_iterate_over_open_channels_and_close_them
srv = server('host', 'user')
- session = {}
- Net::SSH.expects(:start).returns(session)
+ session = expect_connection_to(srv)
c1 = mock('channel', :close => nil)
c2 = mock('channel', :close => nil)
c3 = mock('channel', :close => nil)
@@ -122,11 +114,11 @@ class ServerTest < Test::Unit::TestCase
assert_nothing_raised { server('host', 'user').close }
end
- def test_close_when_session_is_open_should_close_transport_layer
+ def test_close_when_session_is_open_should_close_session
srv = server('host', 'user')
- session = {}
- Net::SSH.expects(:start).returns(session)
- session.expects(:transport).returns(mock('transport', :close => nil))
+ session = expect_connection_to(srv)
+ session.expects(:close)
+ @master.expects(:server_closed).with(srv)
assert_equal session, srv.session(true)
srv.close
end
@@ -137,8 +129,7 @@ class ServerTest < Test::Unit::TestCase
def test_busy_should_be_false_when_session_is_not_busy
srv = server('host', 'user')
- session = {}
- Net::SSH.expects(:start).returns(session)
+ session = expect_connection_to(srv)
session.expects(:busy?).returns(false)
srv.session(true)
assert !srv.busy?
@@ -146,8 +137,7 @@ class ServerTest < Test::Unit::TestCase
def test_busy_should_be_true_when_session_is_busy
srv = server('host', 'user')
- session = {}
- Net::SSH.expects(:start).returns(session)
+ session = expect_connection_to(srv)
session.expects(:busy?).returns(true)
srv.session(true)
assert srv.busy?
@@ -159,8 +149,7 @@ class ServerTest < Test::Unit::TestCase
def test_preprocess_should_return_result_of_session_preprocess
srv = server('host', 'user')
- session = {}
- Net::SSH.expects(:start).returns(session)
+ session = expect_connection_to(srv)
session.expects(:preprocess).returns(:result)
srv.session(true)
assert_equal :result, srv.preprocess
@@ -172,8 +161,7 @@ class ServerTest < Test::Unit::TestCase
def test_readers_should_return_all_listeners_when_session_is_open
srv = server('host', 'user')
- session = {}
- Net::SSH.expects(:start).returns(session)
+ session = expect_connection_to(srv)
session.expects(:listeners).returns(1 => 2, 3 => 4, 5 => 6, 7 => 8)
srv.session(true)
assert_equal [1, 3, 5, 7], srv.readers.sort
@@ -185,8 +173,7 @@ class ServerTest < Test::Unit::TestCase
def test_writers_should_return_all_listeners_that_are_pending_writes_when_session_is_open
srv = server('host', 'user')
- session = {}
- Net::SSH.expects(:start).returns(session)
+ session = expect_connection_to(srv)
listeners = { writer(:ready) => 1, writer(:reader) => 2,
writer(:reader) => 3, writer(:idle) => 4, writer(:ready) => 5 }
session.expects(:listeners).returns(listeners)
@@ -200,8 +187,7 @@ class ServerTest < Test::Unit::TestCase
def test_postprocess_should_call_session_postprocess_with_ios_belonging_to_session
srv = server('host', 'user')
- session = {}
- Net::SSH.expects(:start).returns(session)
+ session = expect_connection_to(srv)
session.expects(:listeners).returns(1 => 2, 3 => 4, 5 => 6, 7 => 8)
session.expects(:postprocess).with([1,3], [7]).returns(:result)
srv.session(true)
@@ -211,7 +197,13 @@ class ServerTest < Test::Unit::TestCase
private
def server(host, user, options={})
- Net::SSH::Multi::Server.new(host, user, options)
+ Net::SSH::Multi::Server.new(@master, host, user, options)
+ end
+
+ def expect_connection_to(server)
+ session = {}
+ @master.expects(:next_session).with(server).returns(session)
+ return session
end
def writer(mode)
diff --git a/test/session_test.rb b/test/session_test.rb
index 18afea9..8aebccc 100644
--- a/test/session_test.rb
+++ b/test/session_test.rb
@@ -251,8 +251,10 @@ class SessionTest < Test::Unit::TestCase
srv2 = @session.use('h2', 'u2')
s1 = { :server => srv1 }
s2 = { :server => srv2 }
- c1 = {}
+ c1 = { :stub => :value }
c2 = {}
+ c1.stubs(:connection).returns(s1)
+ c2.stubs(:connection).returns(s2)
@session.expects(:active_sessions).returns([s1, s2])
s1.expects(:open_channel).with("session").yields(c1).returns(c1)
s2.expects(:open_channel).with("session").yields(c2).returns(c2)