From 7a9dd0a0aa61fb0d7cdab472d2fabab2b8c91df1 Mon Sep 17 00:00:00 2001 From: Jamis Buck Date: Thu, 27 Mar 2008 22:06:41 -0600 Subject: initial commit of multi-session implementation --- lib/net/ssh/multi.rb | 20 +++++ lib/net/ssh/multi/channel.rb | 96 ++++++++++++++++++++++ lib/net/ssh/multi/session.rb | 190 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 306 insertions(+) create mode 100644 lib/net/ssh/multi.rb create mode 100644 lib/net/ssh/multi/channel.rb create mode 100644 lib/net/ssh/multi/session.rb diff --git a/lib/net/ssh/multi.rb b/lib/net/ssh/multi.rb new file mode 100644 index 0000000..84a87b0 --- /dev/null +++ b/lib/net/ssh/multi.rb @@ -0,0 +1,20 @@ +require 'net/ssh/multi/session' + +module Net; module SSH + module Multi + def self.start + session = Session.new + + if block_given? + begin + yield session + session.loop + ensure + session.close + end + else + return session + end + end + end +end; end \ No newline at end of file diff --git a/lib/net/ssh/multi/channel.rb b/lib/net/ssh/multi/channel.rb new file mode 100644 index 0000000..c82ded1 --- /dev/null +++ b/lib/net/ssh/multi/channel.rb @@ -0,0 +1,96 @@ +module Net; module SSH; module Multi + class Channel + include Enumerable + + attr_reader :connection + attr_reader :channels + attr_reader :properties + + def initialize(connection, channels) + @connection = connection + @channels = channels + @properties = {} + end + + def each + @channels.each { |channel| yield channel } + end + + def [](key) + @properties[key.to_sym] + end + + def []=(key, value) + @properties[key.to_sym] = value + end + + def exec(command, &block) + channels.each { |channel| channel.exec(command, &block) } + self + end + + def subsystem(subsystem, &block) + channels.each { |channel| channel.subsystem(subsystem, &block) } + self + end + + def request_pty(opts={}, &block) + channels.each { |channel| channel.request_pty(opts, &block) } + self + end + + def send_data(data) + channels.each { |channel| channel.send_data(data) } + self + end + + def active? + channels.any? { |channel| channel.active? } + end + + def wait + connection.loop { active? } + self + end + + def close + channels.each { |channel| channel.close } + self + end + + def eof! + channels.each { |channel| channel.eof! } + self + end + + def on_data(&block) + channels.each { |channel| channel.on_data(&block) } + self + end + + def on_extended_data(&block) + channels.each { |channel| channel.on_extended_data(&block) } + self + end + + def on_process(&block) + channels.each { |channel| channel.on_process(&block) } + self + end + + def on_close(&block) + channels.each { |channel| channel.on_close(&block) } + self + end + + def on_eof(&block) + channels.each { |channel| channel.on_eof(&block) } + self + end + + def on_request(type, &block) + channels.each { |channel| channel.on_request(type, &block) } + self + end + end +end; end; end \ No newline at end of file diff --git a/lib/net/ssh/multi/session.rb b/lib/net/ssh/multi/session.rb new file mode 100644 index 0000000..0a5c73e --- /dev/null +++ b/lib/net/ssh/multi/session.rb @@ -0,0 +1,190 @@ +require 'thread' +require 'net/ssh' +require 'net/ssh/gateway' +require 'net/ssh/multi/channel' + +module Net; module SSH; module Multi + class Session + attr_reader :connections + attr_reader :gateway + + class Collector + attr_reader :specifications + + def initialize + @specifications = [] + end + + def to(host, user, options={}) + @specifications << [host, user, options] + self + end + end + + def initialize + @connections = [] + @gateway = nil + @mutex = Mutex.new + end + + def via(*args) + if connection_specification?(args) + @gateway = Net::SSH::Gateway.new(*args) + elsif args.length == 1 + @gateway = args.first + else + raise ArgumentError, "expected either a connection specification or a Net::SSH::Gateway instance" + end + self + end + + def use(*list) + @connections += list.each { |c| c[:host] = c.host } + self + end + + def connect(*args) + if connection_specification?(args) + establish_connection(*args) + elsif args.any? + raise ArgumentError, "expected either a connection specification or a block" + end + + if block_given? + collector = Collector.new + yield collector + + mutex = Mutex.new + threads = collector.specifications.map do |host, user, options| + Thread.new { establish_connection(host, user, options) } + end + + threads.each { |t| t.join } + end + + self + end + + def close + connections.each { |connection| connection.channels.each { |id, channel| channel.close } } + loop(0) { busy?(true) } + connections.each { |connection| connection.transport.close } + gateway.shutdown! if gateway + end + + def busy?(include_invisible=false) + connections.any? { |connection| connection.busy?(include_invisible) } + end + + alias :loop_forever :loop + + def loop(wait=nil, &block) + running = block || Proc.new { |c| busy? } + loop_forever { break unless process(wait, &running) } + end + + def process(wait=nil, &block) + connections.each { |c| return false unless c.preprocess(&block) } + + writers_by_connection, readers_by_connection = {}, {} + + writers = connections.map do |c| + c.listeners.keys.select do |w| + writers_by_connection[c] ||= [] + writers_by_connection[c] << w + w.respond_to?(:pending_write?) && w.pending_write? + end + end.flatten + + readers = connections.map { |c| readers_by_connection[c] = c.listeners.keys }.flatten + + readers, writers = IO.select(readers, writers, nil, wait) + + connections.each do |c| + readers_for_this = readers_by_connection[c] & (readers || []) + writers_for_this = writers_by_connection[c] & (writers || []) + return false unless c.postprocess(readers_for_this, writers_for_this) + end + + return true + end + + def send_global_request(type, *extra, &callback) + connections.each { |connection| connection.send_global_request(type, *extra, &callback) } + self + end + + def open_channel(type="session", *extra, &on_confirm) + channels = connections.map do |connection| + channel = connection.open_channel(type, *extra, &on_confirm) + channel[:host] = connection[:host] + channel + end + Multi::Channel.new(self, channels) + end + + def exec(command, &block) + open_channel do |channel| + channel.exec(command) do |ch, success| + raise "could not execute command: #{command.inspect} (#{ch[:host]})" unless success + + channel.on_data do |ch, data| + if block + block.call(ch, :stdout, data) + else + data.chomp.each_line do |line| + $stdout.puts("[#{ch[:host]}] #{line}") + end + end + end + + channel.on_extended_data do |ch, type, data| + if block + block.call(ch, :stderr, data) + else + data.chomp.each_line do |line| + $stderr.puts("[#{ch[:host]}] #{line}") + end + end + end + + channel.on_request("exit-status") do |ch, data| + ch[:exit_status] = data.read_long + end + end + end + end + + def exec!(command, &block) + block ||= Proc.new do |ch, type, data| + ch[:result] ||= {} + ch[:result][ch.connection[:host]] ||= "" + ch[:result][ch.connection[:host]] << data + end + + channel = exec(command, &block) + channel.wait + + return channel[:result] + end + + def send_message(message) + connections.each { |connection| connection.send_message(message) } + self + end + + private + + def connection_specification?(args) + args.length == 2 || (args.length == 3 && args.last.is_a?(Hash)) + end + + def establish_connection(host, user, options={}) + connection = gateway ? gateway.ssh(host, user, options) : + Net::SSH.start(host, user, options) + connection[:host] = host + @mutex.synchronize { @connections.push(connection) } + return connection + end + end +end; end; end \ No newline at end of file -- cgit v1.2.1