diff options
Diffstat (limited to 'qpid/ruby/lib/qpid/connection.rb')
-rw-r--r-- | qpid/ruby/lib/qpid/connection.rb | 222 |
1 files changed, 222 insertions, 0 deletions
diff --git a/qpid/ruby/lib/qpid/connection.rb b/qpid/ruby/lib/qpid/connection.rb new file mode 100644 index 0000000000..d2efbfb263 --- /dev/null +++ b/qpid/ruby/lib/qpid/connection.rb @@ -0,0 +1,222 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require 'monitor' + +module Qpid + + class ChannelBusy< Exception ; end + + class ChannelsBusy < Exception ; end + + class SessionBusy < Exception ; end + + class ConnectionFailed < Exception ; end + + class Timeout < Exception ; end + + class Connection < Assembler + + include MonitorMixin + + attr_reader :spec, :attached, :sessions, :thread + attr_accessor :opened, :failed, :close_code, :user_id + + def initialize(sock, args={}) + super(sock) + + delegate = args[:delegate] || Qpid::Delegate::Client.method(:new) + spec = args[:spec] || nil + + @spec = Qpid::Spec010::load(spec) + @track = @spec["track"] + + @attached = {} + @sessions = {} + + @condition = new_cond + @opened = false + @failed = false + @close_code = [nil, "connection aborted"] + + @thread = nil + + @channel_max = 65535 + @user_id = nil + + @delegate = delegate.call(self, args) + end + + def attach(name, ch, delegate, force=false) + synchronize do + ssn = @attached[ch.id] + if ssn + raise ChannelBusy.new(ch, ssn) unless ssn.name == name + else + ssn = @sessions[name] + if ssn.nil? + ssn = Session.new(name, @spec, :delegate => delegate) + @sessions[name] = ssn + elsif ssn.channel + if force + @attached.delete(ssn.channel.id) + ssn.channel = nil + else + raise SessionBusy.new(ssn) + end + end + @attached[ch.id] = ssn + ssn.channel = ch + end + ch.session = ssn + return ssn + end + end + + def detach(name, ch) + synchronize do + @attached.delete(ch.id) + ssn = @sessions.delete(name) + if ssn + ssn.channel = nil + ssn.closed + return ssn + end + end + end + + def session(name, kwargs = {}) + timeout = kwargs[:timeout] + delegate = kwargs[:delegate] || Qpid::Session::Client.method(:new) + + # FIXME: Python has cryptic comment about 'ch 0 ?' + channel = (0..@channel_max).detect { |i| ! @attached.key?(i) } + raise ChannelsBusy unless channel + + synchronize do + ch = Channel.new(self, channel) + ssn = attach(name, ch, delegate) + ssn.channel.session_attach(name) + if ssn.wait_for(timeout) { ssn.channel } + return ssn + else + detach(name, ch) + raise Timeout + end + end + end + + def detach_all + synchronize do + attached.values.each do |ssn| + ssn.exceptions << @close_code unless @close_code[0] == 200 + detach(ssn.name, ssn.channel) + end + end + end + + def start(timeout=nil) + @delegate.start + @thread = Thread.new { run } + @thread[:name] = 'conn' + synchronize do + unless @condition.wait_for(timeout) { @opened || @failed } + raise Timeout + end + end + if @failed + raise ConnectionFailed.new(@close_code) + end + end + + def run + # XXX: we don't really have a good way to exit this loop without + # getting the other end to kill the socket + loop do + begin + seg = read_segment + rescue Qpid::Closed => e + detach_all + break + end + @delegate.received(seg) + end + end + + def close(timeout=nil) + return unless @opened + Channel.new(self, 0).connection_close(200) + synchronize do + unless @condition.wait_for(timeout) { ! @opened } + raise Timeout + end + end + @thread.join(timeout) + @thread = nil + end + + def signal + synchronize { @condition.signal } + end + + def to_s + # FIXME: We'd like to report something like HOST:PORT + return @sock.to_s + end + + class Channel < Invoker + + attr_reader :id, :connection + attr_accessor :session + + def initialize(connection, id) + @connection = connection + @id = id + @session = nil + end + + def resolve_method(name) + inst = @connection.spec[name] + if inst.is_a?(Qpid::Spec010::Control) + return invocation(:method, inst) + else + return invocation(:error, nil) + end + end + + def invoke(type, args) + ctl = type.create(*args) + sc = StringCodec.new(@connection.spec) + sc.write_control(ctl) + @connection.write_segment(Segment.new(true, true, type.segment_type, + type.track, self.id, sc.encoded)) + + log = Qpid::logger["qpid.io.ctl"] + log.debug("SENT %s", ctl) if log + end + + def to_s + return "#{@connection}[#{@id}]" + end + + end + + end + +end |