diff options
Diffstat (limited to 'qpid/ruby/lib/qpid/session.rb')
-rw-r--r-- | qpid/ruby/lib/qpid/session.rb | 458 |
1 files changed, 458 insertions, 0 deletions
diff --git a/qpid/ruby/lib/qpid/session.rb b/qpid/ruby/lib/qpid/session.rb new file mode 100644 index 0000000000..d693b722c2 --- /dev/null +++ b/qpid/ruby/lib/qpid/session.rb @@ -0,0 +1,458 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require 'monitor' + +module Qpid + + class Session < Invoker + + def log; Qpid::logger["qpid.io.cmd"]; end + def msg; Qpid::logger["qpid.io.msg"]; end + + + class Exception < RuntimeError; end + class Closed < Qpid::Session::Exception; end + class Detached < Qpid::Session::Exception; end + + + INCOMPLETE = Object.new + + def self.client(*args) + return Qpid::Client(*args) + end + + def self.server(*args) + return Server(*args) + end + + attr_reader :name, :spec, :auto_sync, :timeout, :channel + attr_reader :results, :exceptions + attr_accessor :channel, :auto_sync, :send_id, :receiver, :sender + + # FIXME: Pass delegate through a block ? + def initialize(name, spec, kwargs = {}) + auto_sync = true + auto_sync = kwargs[:auto_sync] if kwargs.key?(:auto_sync) + timeout = kwargs[:timeout] || 10 + delegate = kwargs[:delegate] + + @name = name + @spec = spec + @auto_sync = auto_sync + @timeout = timeout + @invoke_lock = Monitor.new + @closing = false + @closed = false + + @cond_lock = Monitor.new + @condition = @cond_lock.new_cond + + @send_id = true + @receiver = Receiver.new(self) + @sender = Sender.new(self) + + @lock = Monitor.new + @incoming = {} + @results = {} + @exceptions = [] + + @assembly = nil + + @delegate = delegate.call(self) if delegate + + @ctl_seg = spec[:segment_type].enum[:control].value + @cmd_seg = spec[:segment_type].enum[:command].value + @hdr_seg = spec[:segment_type].enum[:header].value + @body_seg = spec[:segment_type].enum[:body].value + end + + def incoming(destination) + @lock.synchronize do + queue = @incoming[destination] + unless queue + queue = Incoming.new(self, destination) + @incoming[destination] = queue + end + return queue + end + end + + def error? + @exceptions.size > 0 + end + + def sync(timeout=nil) + if channel && Thread.current == channel.connection.thread + raise Qpid::Session::Exception, "deadlock detected" + end + unless @auto_sync + execution_sync(:sync => true) + end + last = @sender.next_id - 1 + @cond_lock.synchronize do + unless @condition.wait_for(timeout) { + @sender.completed.include?(last) || error? + } + raise Qpid::Timeout + end + end + if error? + raise Qpid::Session::Exception, @exceptions + end + end + + def close(timeout=nil) + @invoke_lock.synchronize do + @closing = true + channel.session_detach(name) + end + @cond_lock.synchronize do + unless @condition.wait_for(timeout) { @closed } + raise Qpid::Timeout + end + end + end + + def closed + @lock.synchronize do + return if @closed + + @results.each { |id, f| f.error(exceptions) } + @results.clear + + @incoming.values.each { |q| q.close(exceptions) } + @closed = true + @cond_lock.synchronize { @condition.signal } + end + end + + def resolve_method(name) + o = @spec.children[name] + case o + when Qpid::Spec010::Command + return invocation(:method, o) + when Qpid::Spec010::Struct + return invocation(:method, o) + when Qpid::Spec010::Domain + return invocation(:value, o.enum) unless o.enum.nil? + end + + matches = @spec.children.select { |x| + x.name.to_s.include?(name.to_s) + }.collect { |x| x.name.to_s }.sort + if matches.size == 0 + msg = nil + elsif matches.size == 1 + msg = "Did you mean #{matches[0]} ? " + else + msg = "Did you mean one of #{matches.join(",")} ? " + end + return invocation(:error, msg) + end + + def invoke(type, args) + # XXX + unless type.respond_to?(:track) + return type.create(*args) + end + @invoke_lock.synchronize do + return do_invoke(type, args) + end + end + + def do_invoke(type, args) + raise Qpid::Session::Closed if @closing + raise Qpid::Session::Detached unless channel + + # Clumsy simulation of Python's keyword args + kwargs = {} + if args.size > 0 && args[-1].is_a?(Hash) + if args.size > type.fields.size + kwargs = args.pop + elsif type.fields[args.size - 1].type != @spec[:map] + kwargs = args.pop + end + end + + if type.payload + if args.size == type.fields.size + 1 + message = args.pop + else + message = kwargs.delete(:message) # XXX Really ? + end + else + message = nil + end + + hdr = Qpid::struct(@spec[:header]) + hdr.sync = @auto_sync || kwargs.delete(:sync) + + cmd = type.create(*args.push(kwargs)) + sc = Qpid::StringCodec.new(@spec) + sc.write_command(hdr, cmd) + + seg = Segment.new(true, (message.nil? || + (message.headers.nil? && message.body.nil?)), + type.segment_type, type.track, @channel.id, sc.encoded) + + unless type.result.nil? + result = Future.new(exception=Exception) + @results[@sender.next_id] = result + end + emit(seg) + + log.debug("SENT %s %s %s" % [seg.id, hdr, cmd]) if log + + unless message.nil? + unless message.headers.nil? + sc = Qpid::StringCodec.new(@spec) + message.headers.each { |st| sc.write_struct32(st) } + + seg = Segment.new(false, message.body.nil?, @hdr_seg, + type.track, @channel.id, sc.encoded) + emit(seg) + end + unless message.body.nil? + seg = Segment.new(false, true, @body_seg, type.track, + @channel.id, message.body) + emit(seg) + end + msg.debug("SENT %s" % message) if msg + end + + if !type.result.nil? + return @auto_sync ? result.get(@timeout) : result + elsif @auto_sync + sync(@timeout) + end + end + + def received(seg) + @receiver.received(seg) + if seg.first_segment? + raise Qpid::Session::Exception unless @assembly.nil? + @assembly = [] + end + @assembly << seg + if seg.last_segment? + dispatch(@assembly) + @assembly = nil + end + end + + def dispatch(assembly) + hdr = nil + cmd = nil + header = nil + body = nil + assembly.each do |seg| + d = seg.decode(@spec) + case seg.type + when @cmd_seg + hdr, cmd = d + when @hdr_seg + header = d + when @body_seg + body = d + else + raise Qpid::Session::Exception + end + end + log.debug("RECV %s %s %s" % [cmd.id, hdr, cmd]) if log + + if cmd.st_type.payload + result = @delegate.send(cmd.st_type.name, cmd, header, body) + else + result = @delegate.send(cmd.st_type.name, cmd) + end + + unless cmd.st_type.result.nil? + execution_result(cmd.id, result) + end + + if result != INCOMPLETE + assembly.each do |seg| + @receiver.has_completed(seg) + # XXX: don't forget to obey sync for manual completion as well + if hdr.sync + @channel.session_completed(@receiver.completed) + end + end + end + end + + # Python calls this 'send', but that has a special meaning + # in Ruby, so we call it 'emit' + def emit(seg) + @sender.emit(seg) + end + + def signal + @cond_lock.synchronize { @condition.signal } + end + + def wait_for(timeout = nil, &block) + @cond_lock.synchronize { @condition.wait_for(timeout, &block) } + end + + def to_s + "<Session: #{name}, #{channel}>" + end + + class Receiver + + attr_reader :completed + attr_accessor :next_id, :next_offset + + def initialize(session) + @session = session + @next_id = nil + @next_offset = nil + @completed = Qpid::RangedSet.new() + end + + def received(seg) + if @next_id.nil? || @next_offset.nil? + raise Exception, "todo" + end + seg.id = @next_id + seg.offset = @next_offset + if seg.last_segment? + @next_id += 1 + @next_offset = 0 + else + @next_offset += seg.payload.size + end + end + + def has_completed(seg) + if seg.id.nil? + raise ArgumentError, "cannot complete unidentified segment" + end + if seg.last_segment? + @completed.add(seg.id) + end + end + + def known_completed(commands) + completed = Qpid::RangedSet.new() + @completed.ranges.each do |c| + unless commands.ranges.find { |kc| + kc.contains(c.lower) && kc.contains(c.upper) + } + completed.add_range(c) + end + end + @completed = completed + end + end + + class Sender + + def initialize(session) + @session = session + @next_id = 0.to_serial + @next_offset = 0 + @segments = [] + @completed = RangedSet.new() + end + + attr_reader :next_id, :completed + + def emit(seg) + seg.id = @next_id + seg.offset = @next_offset + if seg.last_segment? + @next_id += 1 + @next_offset = 0 + else + @next_offset += seg.payload.size + end + @segments << seg + if @session.send_id + @session.send_id = false + @session.channel.session_command_point(seg.id, seg.offset) + end + @session.channel.connection.write_segment(seg) + end + + def has_completed(commands) + @segments = @segments.reject { |seg| commands.include?(seg.id) } + commands.ranges.each do |range| + @completed.add(range.lower, range.upper) + end + end + end + + class Incoming < Qpid::Queue + + def initialize(session, destination) + super() + @session = session + @destination = destination + end + + def start + @session.message_credit_unit.choices.each do |unit| + @session.message_flow(@destination, unit.value, 0xFFFFFFFF) + end + end + + def stop + @session.message_cancel(@destination) + listen # Kill the listener + end + end + + class Delegate + + def initialize(session) + @session = session + end + + #XXX: do something with incoming accepts + def message_accept(ma) nil; end + + def execution_result(er) + future = @session.results.delete(er.command_id) + future.set(er.value) + end + + def execution_exception(ex) + @session.exceptions << ex + end + end + + class Client < Delegate + + def log ; Qpid::logger["qpid.io.msg"]; end + + def message_transfer(cmd, headers, body) + m = Qpid::Message.new(body) + m.headers = headers + m.id = cmd.id + messages = @session.incoming(cmd.destination) + messages.put(m) + log.debug("RECV %s" % m) if log + return INCOMPLETE + end + end + end +end |