summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorJamis Buck <jamis@37signals.com>2008-04-06 08:13:18 -0600
committerJamis Buck <jamis@37signals.com>2008-04-06 08:13:18 -0600
commit97e3f9e56a6e8d7ad0027228a497d7f144402ee4 (patch)
tree117071f8c4cca485aa1ed06672ca0fc0e5de289b /lib
parent6214d1ec6796ffab41b9413840e733a1d5cf70dd (diff)
downloadnet-ssh-multi-97e3f9e56a6e8d7ad0027228a497d7f144402ee4.tar.gz
connection limiting
Diffstat (limited to 'lib')
-rw-r--r--lib/net/ssh/multi.rb6
-rw-r--r--lib/net/ssh/multi/channel_proxy.rb24
-rw-r--r--lib/net/ssh/multi/pending_connection.rb35
-rw-r--r--lib/net/ssh/multi/server.rb83
-rw-r--r--lib/net/ssh/multi/session.rb108
5 files changed, 224 insertions, 32 deletions
diff --git a/lib/net/ssh/multi.rb b/lib/net/ssh/multi.rb
index fe60802..ae24f99 100644
--- a/lib/net/ssh/multi.rb
+++ b/lib/net/ssh/multi.rb
@@ -52,8 +52,10 @@ module Net; module SSH
# session = Net::SSH::Multi.start
# # ...
# session.close
- def self.start
- session = Session.new
+ #
+ # Any options are passed directly to Net::SSH::Multi::Session.new (q.v.).
+ def self.start(options={})
+ session = Session.new(options)
if block_given?
begin
diff --git a/lib/net/ssh/multi/channel_proxy.rb b/lib/net/ssh/multi/channel_proxy.rb
index 99f7986..df5c04c 100644
--- a/lib/net/ssh/multi/channel_proxy.rb
+++ b/lib/net/ssh/multi/channel_proxy.rb
@@ -1,13 +1,32 @@
module Net; module SSH; module Multi
+
+ # The ChannelProxy is a delegate class that represents a channel that has
+ # not yet been opened. It is only used when Net::SSH::Multi is running with
+ # with a concurrent connections limit (see Net::SSH::Multi::Session#concurrent_connections).
+ #
+ # You'll never need to instantiate one of these directly, and will probably
+ # (if all goes well!) never even notice when one of these is in use. Essentially,
+ # it is spawned by a Net::SSH::Multi::PendingConnection when the pending
+ # connection is asked to open a channel. Any actions performed on the
+ # channel proxy will then be recorded, until a real channel is set as the
+ # delegate (see #delegate_to). At that point, all recorded actions will be
+ # replayed on the channel, and any subsequent actions will be immediately
+ # delegated to the channel.
class ChannelProxy
+ # This is the "on confirm" callback that gets called when the real channel
+ # is opened.
attr_reader :on_confirm
+ # Instantiates a new channel proxy with the given +on_confirm+ callback.
def initialize(&on_confirm)
@on_confirm = on_confirm
@recordings = []
@channel = nil
end
+ # Instructs the proxy to delegate all further actions to the given +channel+
+ # (which must be an instance of Net::SSH::Connection::Channel). All recorded
+ # actions are immediately replayed, in order, against the delegate channel.
def delegate_to(channel)
@channel = channel
@recordings.each do |sym, args, block|
@@ -15,6 +34,10 @@ module Net; module SSH; module Multi
end
end
+ # If a channel delegate has been specified (see #delegate_to), the method
+ # will be immediately sent to the delegate. Otherwise, the call is added
+ # to the list of recorded method calls, to be played back when a delegate
+ # is specified.
def method_missing(sym, *args, &block)
if @channel
@channel.__send__(sym, *args, &block)
@@ -23,4 +46,5 @@ module Net; module SSH; module Multi
end
end
end
+
end; end; end \ No newline at end of file
diff --git a/lib/net/ssh/multi/pending_connection.rb b/lib/net/ssh/multi/pending_connection.rb
index fcebcba..6af0fac 100644
--- a/lib/net/ssh/multi/pending_connection.rb
+++ b/lib/net/ssh/multi/pending_connection.rb
@@ -1,7 +1,23 @@
require 'net/ssh/multi/channel_proxy'
module Net; module SSH; module Multi
+
+ # A PendingConnection instance mimics a Net::SSH::Connection::Session instance,
+ # without actually being an open connection to a server. It is used by
+ # Net::SSH::Multi::Session when a concurrent connection limit is in effect,
+ # so that a server can hang on to a "connection" that isn't really a connection.
+ #
+ # Any requests against this connection (like #open_channel or #send_global_request)
+ # are not actually sent, but are added to a list of recordings. When the real
+ # session is opened and replaces this pending connection, all recorded actions
+ # will be replayed against that session.
+ #
+ # You'll never need to initialize one of these directly, and (if all goes well!)
+ # should never even notice that one of these is in use. Net::SSH::Multi::Session
+ # will instantiate these as needed, and only when there is a concurrent
+ # connection limit.
class PendingConnection
+ # Represents a #open_channel action.
class ChannelOpenRecording #:nodoc:
attr_reader :type, :extras, :channel
@@ -15,6 +31,7 @@ module Net; module SSH; module Multi
end
end
+ # Represents a #send_global_request action.
class SendGlobalRequestRecording #:nodoc:
attr_reader :type, :extra, :callback
@@ -27,51 +44,69 @@ module Net; module SSH; module Multi
end
end
+ # The Net::SSH::Multi::Server object that "owns" this pending connection.
attr_reader :server
+ # Instantiates a new pending connection for the given Net::SSH::Multi::Server
+ # object.
def initialize(server)
@server = server
@recordings = []
end
+ # Instructs the pending session to replay all of its recordings against the
+ # given +session+, and to then replace itself with the given session.
def replace_with(session)
@recordings.each { |recording| recording.replay_on(session) }
@server.replace_session(session)
end
+ # Records that a channel open request has been made, and returns a new
+ # Net::SSH::Multi::ChannelProxy object to represent the (as yet unopened)
+ # channel.
def open_channel(type="session", *extras, &on_confirm)
channel = ChannelProxy.new(&on_confirm)
@recordings << ChannelOpenRecording.new(type, extras, channel)
return channel
end
+ # Records that a global request has been made. The request is not actually
+ # sent, and won't be until #replace_with is called.
def send_global_request(type, *extra, &callback)
@recordings << SendGlobalRequestRecording.new(type, extra, callback)
self
end
+ # Always returns +true+, so that the pending connection looks active until
+ # it can be truly opened and replaced with a real connection.
def busy?(include_invisible=false)
true
end
+ # Does nothing, except to make a pending connection quack like a real connection.
def close
self
end
+ # Returns an empty array, since a pending connection cannot have any real channels.
def channels
[]
end
+ # Returns +true+, and does nothing else.
def preprocess
true
end
+ # Returns +true+, and does nothing else.
def postprocess(readers, writers)
true
end
+ # Returns an empty hash, since a pending connection has no real listeners.
def listeners
{}
end
end
+
end; end; end \ No newline at end of file
diff --git a/lib/net/ssh/multi/server.rb b/lib/net/ssh/multi/server.rb
index 65967ab..999e788 100644
--- a/lib/net/ssh/multi/server.rb
+++ b/lib/net/ssh/multi/server.rb
@@ -6,6 +6,9 @@ module Net; module SSH; module Multi
# need to instantiate one of these directly: instead, you should use
# Net::SSH::Multi::Session#use.
class Server
+ # The Net::SSH::Multi::Session instance that manages this server instance.
+ attr_reader :master
+
# The host name (or IP address) of the server to connect to.
attr_reader :host
@@ -21,12 +24,14 @@ module Net; module SSH; module Multi
attr_reader :gateway
# Creates a new Server instance with the given connection information. The
- # +options+ hash must conform to the options described for Net::SSH::start,
- # with one addition:
+ # +master+ argument must be a reference to the Net::SSH::Multi::Session
+ # instance that will manage this server reference. The +options+ hash must
+ # conform to the options described for Net::SSH::start, with one addition:
#
# * :via => a Net::SSH::Gateway instance to use when establishing a
# connection to this server.
- def initialize(host, user, options={})
+ def initialize(master, host, user, options={})
+ @master = master
@host = host
@user = user
@options = options.dup
@@ -77,9 +82,9 @@ module Net; module SSH; module Multi
@inspect ||= "#<%s:0x%x %s>" % [self.class.name, object_id, to_s]
end
- # Returns the Net::SSH session object for this server. If +ensure_open+
+ # Returns the Net::SSH session object for this server. If +require_session+
# is false and the session has not previously been created, this will
- # return +nil+. If +ensure_open+ is true, the session will be instantiated
+ # return +nil+. If +require_session+ is true, the session will be instantiated
# if it has not already been instantiated, via the +gateway+ if one is
# given, or directly (via Net::SSH::start) otherwise.
#
@@ -94,9 +99,52 @@ module Net; module SSH; module Multi
# (the Server instance that spawned them).
#
# assert_equal server, server.session[:server]
- def session(ensure_open=false)
- return @session if @session || !ensure_open
- @session ||= begin
+ def session(require_session=false)
+ return @session if @session || !require_session
+ @session ||= master.next_session(self)
+ end
+
+ # Returns +true+ if the session has been opened, and the session is currently
+ # busy (as defined by Net::SSH::Connection::Session#busy?).
+ def busy?(include_invisible=false)
+ session && session.busy?(include_invisible)
+ end
+
+ # Closes this server's session. If the session has not yet been opened,
+ # this does nothing.
+ def close
+ session.close if session
+ ensure
+ master.server_closed(self) if session
+ @session = nil
+ end
+
+ public # but not published, e.g., these are used internally only...
+
+ # Indicate that the session currently in use by this server instance
+ # should be replaced by the given +session+ argument. This is used when
+ # a pending session has been "realized". Note that this does not
+ # actually replace the session--see #update_session! for that.
+ def replace_session(session) #:nodoc:
+ @ready_session = session
+ end
+
+ # If a new session has been made ready (see #replace_session), this
+ # will replace the current session with the "ready" session. This
+ # method is called from the event loop to ensure that sessions are
+ # swapped in at the appropriate point (instead of in the middle of an
+ # event poll).
+ def update_session! #:nodoc:
+ if @ready_session
+ @session, @ready_session = @ready_session, nil
+ end
+ end
+
+ # Returns a new session object based on this server's connection
+ # criteria. Note that this will not associate the session with the
+ # server, and should not be called directly; it is called internally
+ # from Net::SSH::Multi::Session when a new session is required.
+ def new_session #:nodoc:
session = if gateway
gateway.ssh(host, user, options)
else
@@ -105,31 +153,16 @@ module Net; module SSH; module Multi
session[:server] = self
session
+ rescue Net::SSH::AuthenticationFailed => error
+ raise Net::SSH::AuthenticationFailed.new("#{error.message}@#{host}")
end
- rescue Net::SSH::AuthenticationFailed => error
- raise Net::SSH::AuthenticationFailed.new("#{error.message}@#{host}")
- end
-
- # Returns +true+ if the session has been opened, and the session is currently
- # busy (as defined by Net::SSH::Connection::Session#busy?).
- def busy?(include_invisible=false)
- session && session.busy?(include_invisible)
- end
- public # but not published, e.g., these are used internally only...
-
# Closes all open channels on this server's session. If the session has
# not yet been opened, this does nothing.
def close_channels #:nodoc:
session.channels.each { |id, channel| channel.close } if session
end
- # Closes this server's session's transport layer. If the session has not
- # yet been opened, this does nothing.
- def close #:nodoc:
- session.transport.close if session
- end
-
# Runs the session's preprocess action, if the session has been opened.
def preprocess #:nodoc:
session.preprocess if session
diff --git a/lib/net/ssh/multi/session.rb b/lib/net/ssh/multi/session.rb
index cffe972..99819e1 100644
--- a/lib/net/ssh/multi/session.rb
+++ b/lib/net/ssh/multi/session.rb
@@ -1,6 +1,8 @@
+require 'thread'
require 'net/ssh/gateway'
require 'net/ssh/multi/server'
require 'net/ssh/multi/channel'
+require 'net/ssh/multi/pending_connection'
module Net; module SSH; module Multi
# Represents a collection of connections to various servers. It provides an
@@ -50,6 +52,13 @@ module Net; module SSH; module Multi
# corresponding Net::SSH::Multi::Server definitions.
attr_reader :groups
+ # The number of allowed concurrent connections. No more than this number
+ # of sessions will be open at any given time.
+ attr_accessor :concurrent_connections
+
+ # The number of connections that are currently open.
+ attr_reader :open_connections #:nodoc:
+
# The list of "open" groups, which will receive subsequent server definitions.
# See #use and #group.
attr_reader :open_groups #:nodoc:
@@ -61,12 +70,30 @@ module Net; module SSH; module Multi
# Creates a new Net::SSH::Multi::Session instance. Initially, it contains
# no server definitions, no group definitions, and no default gateway.
- def initialize
+ #
+ # You can set the #concurrent_connections property in the options. Setting
+ # it to +nil+ (the default) will cause Net::SSH::Multi to ignore any
+ # concurrent connection limit and allow all defined sessions to be open
+ # simultaneously. Setting it to an integer will cause Net::SSH::Multi to
+ # allow no more than that number of concurrently open sessions, opening
+ # subsequent sessions only when other sessions finish and close.
+ #
+ # Net::SSH::Multi.start(:concurrent_connections => 10) do |session|
+ # session.use ...
+ # end
+ def initialize(options={})
@servers = []
@groups = {}
@gateway = nil
@active_groups = {}
@open_groups = []
+ @connect_threads = []
+
+ @open_connections = 0
+ @pending_sessions = []
+ @session_mutex = Mutex.new
+
+ options.each { |opt, value| send("#{opt}=", value) }
end
# At its simplest, this associates a named group with a server definition.
@@ -144,7 +171,7 @@ module Net; module SSH; module Multi
# session.use 'host2', 'user2', :via => nil
# session.use 'host3', 'user3', :via => Net::SSH::Gateway.new('gateway.host', 'user')
def use(host, user, options={})
- server = Server.new(host, user, {:via => default_gateway}.merge(options))
+ server = Server.new(self, host, user, {:via => default_gateway}.merge(options))
exists = servers.index(server)
if exists
server = servers[exists]
@@ -313,6 +340,9 @@ module Net; module SSH; module Multi
# whether #process returns +true+ (the block did not return +false+), or
# +false+ (the block returned +false+).
def process(wait=nil, &block)
+ realize_pending_connections!
+ wait = @connect_threads.any? ? 0 : wait
+
return false unless preprocess(&block)
readers = servers.map { |s| s.readers }.flatten
@@ -320,7 +350,11 @@ module Net; module SSH; module Multi
readers, writers, = IO.select(readers, writers, nil, wait)
- return postprocess(readers, writers)
+ if readers
+ return postprocess(readers, writers)
+ else
+ return true
+ end
end
# Sends a global request to all active sessions (see #active_sessions).
@@ -365,8 +399,8 @@ module Net; module SSH; module Multi
def open_channel(type="session", *extra, &on_confirm)
channels = active_sessions.map do |ssh|
ssh.open_channel(type, *extra) do |c|
- c[:server] = ssh[:server]
- c[:host] = ssh[:server].host
+ c[:server] = c.connection[:server]
+ c[:host] = c.connection[:server].host
on_confirm[c] if on_confirm
end
end
@@ -448,5 +482,69 @@ module Net; module SSH; module Multi
servers.each { |server| server.postprocess(readers, writers) }
true
end
+
+ # Takes the #concurrent_connections property into account, and tries to
+ # return a new session for the given server. If the concurrent connections
+ # limit has been reached, then a Net::SSH::Multi::PendingConnection instance
+ # will be returned instead, which will be realized into an actual session
+ # as soon as a slot opens up.
+ #
+ # If +force+ is true, the concurrent_connections check is skipped and a real
+ # connection is always returned.
+ def next_session(server, force=false) #:nodoc:
+ @session_mutex.synchronize do
+ if !force && concurrent_connections && concurrent_connections <= open_connections
+ connection = PendingConnection.new(server)
+ @pending_sessions << connection
+ return connection
+ end
+
+ @open_connections += 1
+ end
+
+ begin
+ server.new_session
+ rescue Exception => e
+ @session_mutex.synchronize { @open_connections -= 1 }
+ raise
+ end
+ end
+
+ # Tells the session that the given server has closed its connection. The
+ # session indicates that a new connection slot is available, which may be
+ # filled by the next pending connection on the next event loop iteration.
+ def server_closed(server) #:nodoc:
+ @session_mutex.synchronize do
+ unless @pending_sessions.delete(server.session)
+ @open_connections -= 1
+ end
+ end
+ end
+
+ # Invoked by the event loop. If there is a concurrent_connections limit in
+ # effect, this will close any non-busy sessions and try to open as many
+ # new sessions as it can. It does this in threads, so that existing processing
+ # can continue.
+ #
+ # If there is no concurrent_connections limit in effect, then this method
+ # does nothing.
+ def realize_pending_connections! #:nodoc:
+ return unless concurrent_connections
+
+ servers.each do |s|
+ s.close if !s.busy?(true)
+ s.update_session!
+ end
+
+ @connect_threads.delete_if { |t| !t.alive? }
+
+ count = concurrent_connections ? (concurrent_connections - open_connections) : @pending_sessions.length
+ count.times do
+ session = @pending_sessions.pop or break
+ @connect_threads << Thread.new do
+ session.replace_with(next_session(session.server, true))
+ end
+ end
+ end
end
end; end; end \ No newline at end of file