summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMiklós Fazekas <mfazekas@szemafor.com>2016-05-02 20:45:13 +0200
committerMiklós Fazekas <mfazekas@szemafor.com>2016-05-02 20:45:13 +0200
commit9ca28da6ee80354e86a606fe391a655b985b1676 (patch)
tree88f0b334157634f5f7a21f5437e3c4a86886660f
parentd539101da49dcfb535c70a2f36ecfe49516b6a4a (diff)
parent1e47189698a7624a4f5986b69bc622f900ee664f (diff)
downloadnet-ssh-9ca28da6ee80354e86a606fe391a655b985b1676.tar.gz
Merge pull request #309 from mfazekas/event-loop
EventLoop abstraction
-rw-r--r--lib/net/ssh/connection/event_loop.rb110
-rw-r--r--lib/net/ssh/connection/keepalive.rb4
-rw-r--r--lib/net/ssh/connection/session.rb56
-rw-r--r--test/connection/test_session.rb5
-rw-r--r--test/integration/test_proxy.rb2
5 files changed, 155 insertions, 22 deletions
diff --git a/lib/net/ssh/connection/event_loop.rb b/lib/net/ssh/connection/event_loop.rb
new file mode 100644
index 0000000..5892bce
--- /dev/null
+++ b/lib/net/ssh/connection/event_loop.rb
@@ -0,0 +1,110 @@
+require 'net/ssh/loggable'
+require 'net/ssh/ruby_compat'
+
+module Net; module SSH; module Connection
+ # EventLoop can be shared across multiple sessions
+ #
+ # one issue is with blocks passed to loop, etc.
+ # they should get current session as parameter, but in
+ # case you're using multiple sessions in an event loop it doesnt makes sense
+ # and we don't pass session.
+ class EventLoop
+ include Loggable
+
+ def initialize(logger=nil)
+ self.logger = logger
+ @sessions = []
+ end
+
+ def register(session)
+ @sessions << session
+ end
+
+ # process until timeout
+ # if a block is given a session will be removed from loop
+ # if block returns false for that session
+ def process(wait = nil, &block)
+ return false unless ev_preprocess(&block)
+
+ ev_select_and_postprocess(wait)
+ end
+
+ # process the event loop but only for the sepcified session
+ def process_only(session, wait = nil)
+ orig_sessions = @sessions
+ begin
+ @sessions = [session]
+ return false unless ev_preprocess
+ ev_select_and_postprocess(wait)
+ ensure
+ @sessions = orig_sessions
+ end
+ end
+
+ # Call preprocess on each session. If block given and that
+ # block retuns false then we exit the processing
+ def ev_preprocess(&block)
+ return false if block_given? && !yield(self)
+ @sessions.each(&:ev_preprocess)
+ return false if block_given? && !yield(self)
+ return true
+ end
+
+ def ev_select_and_postprocess(wait)
+ owners = {}
+ r = []
+ w = []
+ minwait = nil
+ @sessions.each do |session|
+ sr,sw,actwait = session.ev_do_calculate_rw_wait(wait)
+ minwait = actwait if actwait && (minwait.nil? || actwait < minwait)
+ r.push(*sr)
+ w.push(*sw)
+ sr.each { |ri| owners[ri] = session }
+ sw.each { |wi| owners[wi] = session }
+ end
+
+ readers, writers, = Net::SSH::Compat.io_select(r, w, nil, minwait)
+
+ fired_sessions = {}
+
+ readers.each do |reader|
+ session = owners[reader]
+ (fired_sessions[session] ||= {r: [],w: []})[:r] << reader
+ end if readers
+ writers.each do |writer|
+ session = owners[writer]
+ (fired_sessions[session] ||= {r: [],w: []})[:w] << writer
+ end if writers
+
+ fired_sessions.each do |s,rw|
+ s.ev_do_handle_events(rw[:r],rw[:w])
+ end
+
+ @sessions.each { |s| s.ev_do_postprocess(fired_sessions.key?(s)) }
+ true
+ end
+ end
+
+ # optimized version for a single session
+ class SingleSessionEventLoop < EventLoop
+ # Compatibility for original single session event loops:
+ # we call block with session as argument
+ def ev_preprocess(&block)
+ return false if block_given? && !yield(@sessions.first)
+ @sessions.each(&:ev_preprocess)
+ return false if block_given? && !yield(@sessions.first)
+ return true
+ end
+
+ def ev_select_and_postprocess(wait)
+ raise "Only one session expected" unless @sessions.count == 1
+ session = @sessions.first
+ sr,sw,actwait = session.ev_do_calculate_rw_wait(wait)
+ readers, writers, = Net::SSH::Compat.io_select(sr, sw, nil, actwait)
+
+ session.ev_do_handle_events(readers,writers)
+ session.ev_do_postprocess(!((readers.nil? || readers.empty?) && (writers.nil? || writers.empty?)))
+ end
+ end
+end; end; end
diff --git a/lib/net/ssh/connection/keepalive.rb b/lib/net/ssh/connection/keepalive.rb
index 242071d..641ae57 100644
--- a/lib/net/ssh/connection/keepalive.rb
+++ b/lib/net/ssh/connection/keepalive.rb
@@ -33,8 +33,8 @@ class Keepalive
(options[:keepalive_maxcount] || 3).to_i
end
- def send_as_needed(readers, writers)
- return unless readers.nil? && writers.nil?
+ def send_as_needed(was_events)
+ return if was_events
return unless should_send?
info { "sending keepalive #{@unresponded_keepalive_count}" }
diff --git a/lib/net/ssh/connection/session.rb b/lib/net/ssh/connection/session.rb
index d717070..4f1bf7c 100644
--- a/lib/net/ssh/connection/session.rb
+++ b/lib/net/ssh/connection/session.rb
@@ -4,6 +4,7 @@ require 'net/ssh/connection/channel'
require 'net/ssh/connection/constants'
require 'net/ssh/service/forward'
require 'net/ssh/connection/keepalive'
+require 'net/ssh/connection/event_loop'
module Net; module SSH; module Connection
@@ -81,6 +82,9 @@ module Net; module SSH; module Connection
@max_win_size = (options.has_key?(:max_win_size) ? options[:max_win_size] : 0x20000)
@keepalive = Keepalive.new(self)
+
+ @event_loop = options[:event_loop] || SingleSessionEventLoop.new
+ @event_loop.register(self)
end
# Retrieves a custom property from this instance. This can be used to
@@ -186,6 +190,8 @@ module Net; module SSH; module Connection
# This will also cause all active channels to be processed once each (see
# Net::SSH::Connection::Channel#on_process).
#
+ # TODO revise example
+ #
# # process multiple Net::SSH connections in parallel
# connections = [
# Net::SSH.start("host1", ...),
@@ -203,13 +209,7 @@ module Net; module SSH; module Connection
# break if connections.empty?
# end
def process(wait=nil, &block)
- return false unless preprocess(&block)
-
- r = listeners.keys
- w = r.select { |w2| w2.respond_to?(:pending_write?) && w2.pending_write? }
- readers, writers, = Net::SSH::Compat.io_select(r, w, nil, io_select_wait(wait))
-
- postprocess(readers, writers)
+ @event_loop.process(wait, &block)
rescue
force_channel_cleanup_on_close if closed?
raise
@@ -220,19 +220,38 @@ module Net; module SSH; module Connection
# for any active channels. If a block is given, it is invoked at the
# start of the method and again at the end, and if the block ever returns
# false, this method returns false. Otherwise, it returns true.
- def preprocess
+ def preprocess(&block)
return false if block_given? && !yield(self)
- dispatch_incoming_packets
- each_channel { |id, channel| channel.process unless channel.local_closed? }
+ ev_preprocess(&block)
return false if block_given? && !yield(self)
return true
end
- # This is called internally as part of #process. It loops over the given
- # arrays of reader IO's and writer IO's, processing them as needed, and
+ # Called by event loop to process available data before going to
+ # event multiplexing
+ def ev_preprocess(&block)
+ dispatch_incoming_packets
+ each_channel { |id, channel| channel.process unless channel.local_closed? }
+ end
+
+ # Returns the file descriptors the event loop should wait for read/write events,
+ # we also return the max wait
+ def ev_do_calculate_rw_wait(wait)
+ r = listeners.keys
+ w = r.select { |w2| w2.respond_to?(:pending_write?) && w2.pending_write? }
+ [r,w,io_select_wait(wait)]
+ end
+
+ # This is called internally as part of #process.
+ def postprocess(readers, writers)
+ ev_do_handle_events(readers, writers)
+ end
+
+ # It loops over the given arrays of reader IO's and writer IO's,
+ # processing them as needed, and
# then calls Net::SSH::Transport::Session#rekey_as_needed to allow the
# transport layer to rekey. Then returns true.
- def postprocess(readers, writers)
+ def ev_do_handle_events(readers, writers)
Array(readers).each do |reader|
if listeners[reader]
listeners[reader].call(reader)
@@ -247,11 +266,14 @@ module Net; module SSH; module Connection
Array(writers).each do |writer|
writer.send_pending
end
+ end
- @keepalive.send_as_needed(readers, writers)
+ # calls Net::SSH::Transport::Session#rekey_as_needed to allow the
+ # transport layer to rekey
+ def ev_do_postprocess(was_events)
+ @keepalive.send_as_needed(was_events)
transport.rekey_as_needed
-
- return true
+ true
end
# Send a global request of the given type. The +extra+ parameters must
@@ -333,7 +355,7 @@ module Net; module SSH; module Connection
open_channel do |channel|
channel.exec(command) do |ch, success|
raise "could not execute command: #{command.inspect}" unless success
-
+
channel.on_data do |ch2, data|
if block
block.call(ch2, :stdout, data)
diff --git a/test/connection/test_session.rb b/test/connection/test_session.rb
index 2a8a772..32b795e 100644
--- a/test/connection/test_session.rb
+++ b/test/connection/test_session.rb
@@ -1,4 +1,4 @@
-require 'common'
+require_relative '../common'
require 'net/ssh/connection/session'
module Connection
@@ -413,7 +413,8 @@ module Connection
def test_process_should_not_call_enqueue_message_unless_io_select_timed_out
timeout = Net::SSH::Connection::Session::DEFAULT_IO_SELECT_TIMEOUT
options = { :keepalive => true }
- IO.stubs(:select).with([socket],[],nil,timeout).returns([[],[],[]])
+ IO.stubs(:select).with([socket],[],nil,timeout).returns([[socket],[],[]])
+ socket.stubs(:recv).returns("x")
transport.expects(:enqueue_message).never
session(options).process
end
diff --git a/test/integration/test_proxy.rb b/test/integration/test_proxy.rb
index 3286f34..8c54388 100644
--- a/test/integration/test_proxy.rb
+++ b/test/integration/test_proxy.rb
@@ -90,4 +90,4 @@ class TestProxy < NetSSHTest
end
end
-end \ No newline at end of file
+end