summaryrefslogtreecommitdiff
path: root/lib/net/ssh/multi/session.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/net/ssh/multi/session.rb')
-rw-r--r--lib/net/ssh/multi/session.rb108
1 files changed, 103 insertions, 5 deletions
diff --git a/lib/net/ssh/multi/session.rb b/lib/net/ssh/multi/session.rb
index cffe972..99819e1 100644
--- a/lib/net/ssh/multi/session.rb
+++ b/lib/net/ssh/multi/session.rb
@@ -1,6 +1,8 @@
+require 'thread'
require 'net/ssh/gateway'
require 'net/ssh/multi/server'
require 'net/ssh/multi/channel'
+require 'net/ssh/multi/pending_connection'
module Net; module SSH; module Multi
# Represents a collection of connections to various servers. It provides an
@@ -50,6 +52,13 @@ module Net; module SSH; module Multi
# corresponding Net::SSH::Multi::Server definitions.
attr_reader :groups
+ # The number of allowed concurrent connections. No more than this number
+ # of sessions will be open at any given time.
+ attr_accessor :concurrent_connections
+
+ # The number of connections that are currently open.
+ attr_reader :open_connections #:nodoc:
+
# The list of "open" groups, which will receive subsequent server definitions.
# See #use and #group.
attr_reader :open_groups #:nodoc:
@@ -61,12 +70,30 @@ module Net; module SSH; module Multi
# Creates a new Net::SSH::Multi::Session instance. Initially, it contains
# no server definitions, no group definitions, and no default gateway.
- def initialize
+ #
+ # You can set the #concurrent_connections property in the options. Setting
+ # it to +nil+ (the default) will cause Net::SSH::Multi to ignore any
+ # concurrent connection limit and allow all defined sessions to be open
+ # simultaneously. Setting it to an integer will cause Net::SSH::Multi to
+ # allow no more than that number of concurrently open sessions, opening
+ # subsequent sessions only when other sessions finish and close.
+ #
+ # Net::SSH::Multi.start(:concurrent_connections => 10) do |session|
+ # session.use ...
+ # end
+ def initialize(options={})
@servers = []
@groups = {}
@gateway = nil
@active_groups = {}
@open_groups = []
+ @connect_threads = []
+
+ @open_connections = 0
+ @pending_sessions = []
+ @session_mutex = Mutex.new
+
+ options.each { |opt, value| send("#{opt}=", value) }
end
# At its simplest, this associates a named group with a server definition.
@@ -144,7 +171,7 @@ module Net; module SSH; module Multi
# session.use 'host2', 'user2', :via => nil
# session.use 'host3', 'user3', :via => Net::SSH::Gateway.new('gateway.host', 'user')
def use(host, user, options={})
- server = Server.new(host, user, {:via => default_gateway}.merge(options))
+ server = Server.new(self, host, user, {:via => default_gateway}.merge(options))
exists = servers.index(server)
if exists
server = servers[exists]
@@ -313,6 +340,9 @@ module Net; module SSH; module Multi
# whether #process returns +true+ (the block did not return +false+), or
# +false+ (the block returned +false+).
def process(wait=nil, &block)
+ realize_pending_connections!
+ wait = @connect_threads.any? ? 0 : wait
+
return false unless preprocess(&block)
readers = servers.map { |s| s.readers }.flatten
@@ -320,7 +350,11 @@ module Net; module SSH; module Multi
readers, writers, = IO.select(readers, writers, nil, wait)
- return postprocess(readers, writers)
+ if readers
+ return postprocess(readers, writers)
+ else
+ return true
+ end
end
# Sends a global request to all active sessions (see #active_sessions).
@@ -365,8 +399,8 @@ module Net; module SSH; module Multi
def open_channel(type="session", *extra, &on_confirm)
channels = active_sessions.map do |ssh|
ssh.open_channel(type, *extra) do |c|
- c[:server] = ssh[:server]
- c[:host] = ssh[:server].host
+ c[:server] = c.connection[:server]
+ c[:host] = c.connection[:server].host
on_confirm[c] if on_confirm
end
end
@@ -448,5 +482,69 @@ module Net; module SSH; module Multi
servers.each { |server| server.postprocess(readers, writers) }
true
end
+
+ # Takes the #concurrent_connections property into account, and tries to
+ # return a new session for the given server. If the concurrent connections
+ # limit has been reached, then a Net::SSH::Multi::PendingConnection instance
+ # will be returned instead, which will be realized into an actual session
+ # as soon as a slot opens up.
+ #
+ # If +force+ is true, the concurrent_connections check is skipped and a real
+ # connection is always returned.
+ def next_session(server, force=false) #:nodoc:
+ @session_mutex.synchronize do
+ if !force && concurrent_connections && concurrent_connections <= open_connections
+ connection = PendingConnection.new(server)
+ @pending_sessions << connection
+ return connection
+ end
+
+ @open_connections += 1
+ end
+
+ begin
+ server.new_session
+ rescue Exception => e
+ @session_mutex.synchronize { @open_connections -= 1 }
+ raise
+ end
+ end
+
+ # Tells the session that the given server has closed its connection. The
+ # session indicates that a new connection slot is available, which may be
+ # filled by the next pending connection on the next event loop iteration.
+ def server_closed(server) #:nodoc:
+ @session_mutex.synchronize do
+ unless @pending_sessions.delete(server.session)
+ @open_connections -= 1
+ end
+ end
+ end
+
+ # Invoked by the event loop. If there is a concurrent_connections limit in
+ # effect, this will close any non-busy sessions and try to open as many
+ # new sessions as it can. It does this in threads, so that existing processing
+ # can continue.
+ #
+ # If there is no concurrent_connections limit in effect, then this method
+ # does nothing.
+ def realize_pending_connections! #:nodoc:
+ return unless concurrent_connections
+
+ servers.each do |s|
+ s.close if !s.busy?(true)
+ s.update_session!
+ end
+
+ @connect_threads.delete_if { |t| !t.alive? }
+
+ count = concurrent_connections ? (concurrent_connections - open_connections) : @pending_sessions.length
+ count.times do
+ session = @pending_sessions.pop or break
+ @connect_threads << Thread.new do
+ session.replace_with(next_session(session.server, true))
+ end
+ end
+ end
end
end; end; end \ No newline at end of file