summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJamis Buck <jamis@37signals.com>2008-03-27 22:06:41 -0600
committerJamis Buck <jamis@37signals.com>2008-03-27 22:06:41 -0600
commit7a9dd0a0aa61fb0d7cdab472d2fabab2b8c91df1 (patch)
tree0b6828422ceb9ae2829138878d61dfa364f1bc48
downloadnet-ssh-multi-7a9dd0a0aa61fb0d7cdab472d2fabab2b8c91df1.tar.gz
initial commit of multi-session implementation
-rw-r--r--lib/net/ssh/multi.rb20
-rw-r--r--lib/net/ssh/multi/channel.rb96
-rw-r--r--lib/net/ssh/multi/session.rb190
3 files changed, 306 insertions, 0 deletions
diff --git a/lib/net/ssh/multi.rb b/lib/net/ssh/multi.rb
new file mode 100644
index 0000000..84a87b0
--- /dev/null
+++ b/lib/net/ssh/multi.rb
@@ -0,0 +1,20 @@
+require 'net/ssh/multi/session'
+
+module Net; module SSH
+ module Multi
+ def self.start
+ session = Session.new
+
+ if block_given?
+ begin
+ yield session
+ session.loop
+ ensure
+ session.close
+ end
+ else
+ return session
+ end
+ end
+ end
+end; end \ No newline at end of file
diff --git a/lib/net/ssh/multi/channel.rb b/lib/net/ssh/multi/channel.rb
new file mode 100644
index 0000000..c82ded1
--- /dev/null
+++ b/lib/net/ssh/multi/channel.rb
@@ -0,0 +1,96 @@
+module Net; module SSH; module Multi
+ class Channel
+ include Enumerable
+
+ attr_reader :connection
+ attr_reader :channels
+ attr_reader :properties
+
+ def initialize(connection, channels)
+ @connection = connection
+ @channels = channels
+ @properties = {}
+ end
+
+ def each
+ @channels.each { |channel| yield channel }
+ end
+
+ def [](key)
+ @properties[key.to_sym]
+ end
+
+ def []=(key, value)
+ @properties[key.to_sym] = value
+ end
+
+ def exec(command, &block)
+ channels.each { |channel| channel.exec(command, &block) }
+ self
+ end
+
+ def subsystem(subsystem, &block)
+ channels.each { |channel| channel.subsystem(subsystem, &block) }
+ self
+ end
+
+ def request_pty(opts={}, &block)
+ channels.each { |channel| channel.request_pty(opts, &block) }
+ self
+ end
+
+ def send_data(data)
+ channels.each { |channel| channel.send_data(data) }
+ self
+ end
+
+ def active?
+ channels.any? { |channel| channel.active? }
+ end
+
+ def wait
+ connection.loop { active? }
+ self
+ end
+
+ def close
+ channels.each { |channel| channel.close }
+ self
+ end
+
+ def eof!
+ channels.each { |channel| channel.eof! }
+ self
+ end
+
+ def on_data(&block)
+ channels.each { |channel| channel.on_data(&block) }
+ self
+ end
+
+ def on_extended_data(&block)
+ channels.each { |channel| channel.on_extended_data(&block) }
+ self
+ end
+
+ def on_process(&block)
+ channels.each { |channel| channel.on_process(&block) }
+ self
+ end
+
+ def on_close(&block)
+ channels.each { |channel| channel.on_close(&block) }
+ self
+ end
+
+ def on_eof(&block)
+ channels.each { |channel| channel.on_eof(&block) }
+ self
+ end
+
+ def on_request(type, &block)
+ channels.each { |channel| channel.on_request(type, &block) }
+ self
+ end
+ end
+end; end; end \ No newline at end of file
diff --git a/lib/net/ssh/multi/session.rb b/lib/net/ssh/multi/session.rb
new file mode 100644
index 0000000..0a5c73e
--- /dev/null
+++ b/lib/net/ssh/multi/session.rb
@@ -0,0 +1,190 @@
+require 'thread'
+require 'net/ssh'
+require 'net/ssh/gateway'
+require 'net/ssh/multi/channel'
+
+module Net; module SSH; module Multi
+ class Session
+ attr_reader :connections
+ attr_reader :gateway
+
+ class Collector
+ attr_reader :specifications
+
+ def initialize
+ @specifications = []
+ end
+
+ def to(host, user, options={})
+ @specifications << [host, user, options]
+ self
+ end
+ end
+
+ def initialize
+ @connections = []
+ @gateway = nil
+ @mutex = Mutex.new
+ end
+
+ def via(*args)
+ if connection_specification?(args)
+ @gateway = Net::SSH::Gateway.new(*args)
+ elsif args.length == 1
+ @gateway = args.first
+ else
+ raise ArgumentError, "expected either a connection specification or a Net::SSH::Gateway instance"
+ end
+ self
+ end
+
+ def use(*list)
+ @connections += list.each { |c| c[:host] = c.host }
+ self
+ end
+
+ def connect(*args)
+ if connection_specification?(args)
+ establish_connection(*args)
+ elsif args.any?
+ raise ArgumentError, "expected either a connection specification or a block"
+ end
+
+ if block_given?
+ collector = Collector.new
+ yield collector
+
+ mutex = Mutex.new
+ threads = collector.specifications.map do |host, user, options|
+ Thread.new { establish_connection(host, user, options) }
+ end
+
+ threads.each { |t| t.join }
+ end
+
+ self
+ end
+
+ def close
+ connections.each { |connection| connection.channels.each { |id, channel| channel.close } }
+ loop(0) { busy?(true) }
+ connections.each { |connection| connection.transport.close }
+ gateway.shutdown! if gateway
+ end
+
+ def busy?(include_invisible=false)
+ connections.any? { |connection| connection.busy?(include_invisible) }
+ end
+
+ alias :loop_forever :loop
+
+ def loop(wait=nil, &block)
+ running = block || Proc.new { |c| busy? }
+ loop_forever { break unless process(wait, &running) }
+ end
+
+ def process(wait=nil, &block)
+ connections.each { |c| return false unless c.preprocess(&block) }
+
+ writers_by_connection, readers_by_connection = {}, {}
+
+ writers = connections.map do |c|
+ c.listeners.keys.select do |w|
+ writers_by_connection[c] ||= []
+ writers_by_connection[c] << w
+ w.respond_to?(:pending_write?) && w.pending_write?
+ end
+ end.flatten
+
+ readers = connections.map { |c| readers_by_connection[c] = c.listeners.keys }.flatten
+
+ readers, writers = IO.select(readers, writers, nil, wait)
+
+ connections.each do |c|
+ readers_for_this = readers_by_connection[c] & (readers || [])
+ writers_for_this = writers_by_connection[c] & (writers || [])
+ return false unless c.postprocess(readers_for_this, writers_for_this)
+ end
+
+ return true
+ end
+
+ def send_global_request(type, *extra, &callback)
+ connections.each { |connection| connection.send_global_request(type, *extra, &callback) }
+ self
+ end
+
+ def open_channel(type="session", *extra, &on_confirm)
+ channels = connections.map do |connection|
+ channel = connection.open_channel(type, *extra, &on_confirm)
+ channel[:host] = connection[:host]
+ channel
+ end
+ Multi::Channel.new(self, channels)
+ end
+
+ def exec(command, &block)
+ open_channel do |channel|
+ channel.exec(command) do |ch, success|
+ raise "could not execute command: #{command.inspect} (#{ch[:host]})" unless success
+
+ channel.on_data do |ch, data|
+ if block
+ block.call(ch, :stdout, data)
+ else
+ data.chomp.each_line do |line|
+ $stdout.puts("[#{ch[:host]}] #{line}")
+ end
+ end
+ end
+
+ channel.on_extended_data do |ch, type, data|
+ if block
+ block.call(ch, :stderr, data)
+ else
+ data.chomp.each_line do |line|
+ $stderr.puts("[#{ch[:host]}] #{line}")
+ end
+ end
+ end
+
+ channel.on_request("exit-status") do |ch, data|
+ ch[:exit_status] = data.read_long
+ end
+ end
+ end
+ end
+
+ def exec!(command, &block)
+ block ||= Proc.new do |ch, type, data|
+ ch[:result] ||= {}
+ ch[:result][ch.connection[:host]] ||= ""
+ ch[:result][ch.connection[:host]] << data
+ end
+
+ channel = exec(command, &block)
+ channel.wait
+
+ return channel[:result]
+ end
+
+ def send_message(message)
+ connections.each { |connection| connection.send_message(message) }
+ self
+ end
+
+ private
+
+ def connection_specification?(args)
+ args.length == 2 || (args.length == 3 && args.last.is_a?(Hash))
+ end
+
+ def establish_connection(host, user, options={})
+ connection = gateway ? gateway.ssh(host, user, options) :
+ Net::SSH.start(host, user, options)
+ connection[:host] = host
+ @mutex.synchronize { @connections.push(connection) }
+ return connection
+ end
+ end
+end; end; end \ No newline at end of file