diff options
author | Miklós Fazekas <mfazekas@szemafor.com> | 2016-05-02 20:45:13 +0200 |
---|---|---|
committer | Miklós Fazekas <mfazekas@szemafor.com> | 2016-05-02 20:45:13 +0200 |
commit | 9ca28da6ee80354e86a606fe391a655b985b1676 (patch) | |
tree | 88f0b334157634f5f7a21f5437e3c4a86886660f | |
parent | d539101da49dcfb535c70a2f36ecfe49516b6a4a (diff) | |
parent | 1e47189698a7624a4f5986b69bc622f900ee664f (diff) | |
download | net-ssh-9ca28da6ee80354e86a606fe391a655b985b1676.tar.gz |
Merge pull request #309 from mfazekas/event-loop
EventLoop abstraction
-rw-r--r-- | lib/net/ssh/connection/event_loop.rb | 110 | ||||
-rw-r--r-- | lib/net/ssh/connection/keepalive.rb | 4 | ||||
-rw-r--r-- | lib/net/ssh/connection/session.rb | 56 | ||||
-rw-r--r-- | test/connection/test_session.rb | 5 | ||||
-rw-r--r-- | test/integration/test_proxy.rb | 2 |
5 files changed, 155 insertions, 22 deletions
diff --git a/lib/net/ssh/connection/event_loop.rb b/lib/net/ssh/connection/event_loop.rb new file mode 100644 index 0000000..5892bce --- /dev/null +++ b/lib/net/ssh/connection/event_loop.rb @@ -0,0 +1,110 @@ +require 'net/ssh/loggable' +require 'net/ssh/ruby_compat' + +module Net; module SSH; module Connection + # EventLoop can be shared across multiple sessions + # + # one issue is with blocks passed to loop, etc. + # they should get current session as parameter, but in + # case you're using multiple sessions in an event loop it doesnt makes sense + # and we don't pass session. + class EventLoop + include Loggable + + def initialize(logger=nil) + self.logger = logger + @sessions = [] + end + + def register(session) + @sessions << session + end + + # process until timeout + # if a block is given a session will be removed from loop + # if block returns false for that session + def process(wait = nil, &block) + return false unless ev_preprocess(&block) + + ev_select_and_postprocess(wait) + end + + # process the event loop but only for the sepcified session + def process_only(session, wait = nil) + orig_sessions = @sessions + begin + @sessions = [session] + return false unless ev_preprocess + ev_select_and_postprocess(wait) + ensure + @sessions = orig_sessions + end + end + + # Call preprocess on each session. If block given and that + # block retuns false then we exit the processing + def ev_preprocess(&block) + return false if block_given? && !yield(self) + @sessions.each(&:ev_preprocess) + return false if block_given? && !yield(self) + return true + end + + def ev_select_and_postprocess(wait) + owners = {} + r = [] + w = [] + minwait = nil + @sessions.each do |session| + sr,sw,actwait = session.ev_do_calculate_rw_wait(wait) + minwait = actwait if actwait && (minwait.nil? || actwait < minwait) + r.push(*sr) + w.push(*sw) + sr.each { |ri| owners[ri] = session } + sw.each { |wi| owners[wi] = session } + end + + readers, writers, = Net::SSH::Compat.io_select(r, w, nil, minwait) + + fired_sessions = {} + + readers.each do |reader| + session = owners[reader] + (fired_sessions[session] ||= {r: [],w: []})[:r] << reader + end if readers + writers.each do |writer| + session = owners[writer] + (fired_sessions[session] ||= {r: [],w: []})[:w] << writer + end if writers + + fired_sessions.each do |s,rw| + s.ev_do_handle_events(rw[:r],rw[:w]) + end + + @sessions.each { |s| s.ev_do_postprocess(fired_sessions.key?(s)) } + true + end + end + + # optimized version for a single session + class SingleSessionEventLoop < EventLoop + # Compatibility for original single session event loops: + # we call block with session as argument + def ev_preprocess(&block) + return false if block_given? && !yield(@sessions.first) + @sessions.each(&:ev_preprocess) + return false if block_given? && !yield(@sessions.first) + return true + end + + def ev_select_and_postprocess(wait) + raise "Only one session expected" unless @sessions.count == 1 + session = @sessions.first + sr,sw,actwait = session.ev_do_calculate_rw_wait(wait) + readers, writers, = Net::SSH::Compat.io_select(sr, sw, nil, actwait) + + session.ev_do_handle_events(readers,writers) + session.ev_do_postprocess(!((readers.nil? || readers.empty?) && (writers.nil? || writers.empty?))) + end + end +end; end; end diff --git a/lib/net/ssh/connection/keepalive.rb b/lib/net/ssh/connection/keepalive.rb index 242071d..641ae57 100644 --- a/lib/net/ssh/connection/keepalive.rb +++ b/lib/net/ssh/connection/keepalive.rb @@ -33,8 +33,8 @@ class Keepalive (options[:keepalive_maxcount] || 3).to_i end - def send_as_needed(readers, writers) - return unless readers.nil? && writers.nil? + def send_as_needed(was_events) + return if was_events return unless should_send? info { "sending keepalive #{@unresponded_keepalive_count}" } diff --git a/lib/net/ssh/connection/session.rb b/lib/net/ssh/connection/session.rb index d717070..4f1bf7c 100644 --- a/lib/net/ssh/connection/session.rb +++ b/lib/net/ssh/connection/session.rb @@ -4,6 +4,7 @@ require 'net/ssh/connection/channel' require 'net/ssh/connection/constants' require 'net/ssh/service/forward' require 'net/ssh/connection/keepalive' +require 'net/ssh/connection/event_loop' module Net; module SSH; module Connection @@ -81,6 +82,9 @@ module Net; module SSH; module Connection @max_win_size = (options.has_key?(:max_win_size) ? options[:max_win_size] : 0x20000) @keepalive = Keepalive.new(self) + + @event_loop = options[:event_loop] || SingleSessionEventLoop.new + @event_loop.register(self) end # Retrieves a custom property from this instance. This can be used to @@ -186,6 +190,8 @@ module Net; module SSH; module Connection # This will also cause all active channels to be processed once each (see # Net::SSH::Connection::Channel#on_process). # + # TODO revise example + # # # process multiple Net::SSH connections in parallel # connections = [ # Net::SSH.start("host1", ...), @@ -203,13 +209,7 @@ module Net; module SSH; module Connection # break if connections.empty? # end def process(wait=nil, &block) - return false unless preprocess(&block) - - r = listeners.keys - w = r.select { |w2| w2.respond_to?(:pending_write?) && w2.pending_write? } - readers, writers, = Net::SSH::Compat.io_select(r, w, nil, io_select_wait(wait)) - - postprocess(readers, writers) + @event_loop.process(wait, &block) rescue force_channel_cleanup_on_close if closed? raise @@ -220,19 +220,38 @@ module Net; module SSH; module Connection # for any active channels. If a block is given, it is invoked at the # start of the method and again at the end, and if the block ever returns # false, this method returns false. Otherwise, it returns true. - def preprocess + def preprocess(&block) return false if block_given? && !yield(self) - dispatch_incoming_packets - each_channel { |id, channel| channel.process unless channel.local_closed? } + ev_preprocess(&block) return false if block_given? && !yield(self) return true end - # This is called internally as part of #process. It loops over the given - # arrays of reader IO's and writer IO's, processing them as needed, and + # Called by event loop to process available data before going to + # event multiplexing + def ev_preprocess(&block) + dispatch_incoming_packets + each_channel { |id, channel| channel.process unless channel.local_closed? } + end + + # Returns the file descriptors the event loop should wait for read/write events, + # we also return the max wait + def ev_do_calculate_rw_wait(wait) + r = listeners.keys + w = r.select { |w2| w2.respond_to?(:pending_write?) && w2.pending_write? } + [r,w,io_select_wait(wait)] + end + + # This is called internally as part of #process. + def postprocess(readers, writers) + ev_do_handle_events(readers, writers) + end + + # It loops over the given arrays of reader IO's and writer IO's, + # processing them as needed, and # then calls Net::SSH::Transport::Session#rekey_as_needed to allow the # transport layer to rekey. Then returns true. - def postprocess(readers, writers) + def ev_do_handle_events(readers, writers) Array(readers).each do |reader| if listeners[reader] listeners[reader].call(reader) @@ -247,11 +266,14 @@ module Net; module SSH; module Connection Array(writers).each do |writer| writer.send_pending end + end - @keepalive.send_as_needed(readers, writers) + # calls Net::SSH::Transport::Session#rekey_as_needed to allow the + # transport layer to rekey + def ev_do_postprocess(was_events) + @keepalive.send_as_needed(was_events) transport.rekey_as_needed - - return true + true end # Send a global request of the given type. The +extra+ parameters must @@ -333,7 +355,7 @@ module Net; module SSH; module Connection open_channel do |channel| channel.exec(command) do |ch, success| raise "could not execute command: #{command.inspect}" unless success - + channel.on_data do |ch2, data| if block block.call(ch2, :stdout, data) diff --git a/test/connection/test_session.rb b/test/connection/test_session.rb index 2a8a772..32b795e 100644 --- a/test/connection/test_session.rb +++ b/test/connection/test_session.rb @@ -1,4 +1,4 @@ -require 'common' +require_relative '../common' require 'net/ssh/connection/session' module Connection @@ -413,7 +413,8 @@ module Connection def test_process_should_not_call_enqueue_message_unless_io_select_timed_out timeout = Net::SSH::Connection::Session::DEFAULT_IO_SELECT_TIMEOUT options = { :keepalive => true } - IO.stubs(:select).with([socket],[],nil,timeout).returns([[],[],[]]) + IO.stubs(:select).with([socket],[],nil,timeout).returns([[socket],[],[]]) + socket.stubs(:recv).returns("x") transport.expects(:enqueue_message).never session(options).process end diff --git a/test/integration/test_proxy.rb b/test/integration/test_proxy.rb index 3286f34..8c54388 100644 --- a/test/integration/test_proxy.rb +++ b/test/integration/test_proxy.rb @@ -90,4 +90,4 @@ class TestProxy < NetSSHTest end end -end
\ No newline at end of file +end |