diff options
Diffstat (limited to 'trunk/qpid/ruby/lib/qpid/connection.rb')
-rw-r--r-- | trunk/qpid/ruby/lib/qpid/connection.rb | 222 |
1 files changed, 0 insertions, 222 deletions
diff --git a/trunk/qpid/ruby/lib/qpid/connection.rb b/trunk/qpid/ruby/lib/qpid/connection.rb deleted file mode 100644 index d2efbfb263..0000000000 --- a/trunk/qpid/ruby/lib/qpid/connection.rb +++ /dev/null @@ -1,222 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -require 'monitor' - -module Qpid - - class ChannelBusy< Exception ; end - - class ChannelsBusy < Exception ; end - - class SessionBusy < Exception ; end - - class ConnectionFailed < Exception ; end - - class Timeout < Exception ; end - - class Connection < Assembler - - include MonitorMixin - - attr_reader :spec, :attached, :sessions, :thread - attr_accessor :opened, :failed, :close_code, :user_id - - def initialize(sock, args={}) - super(sock) - - delegate = args[:delegate] || Qpid::Delegate::Client.method(:new) - spec = args[:spec] || nil - - @spec = Qpid::Spec010::load(spec) - @track = @spec["track"] - - @attached = {} - @sessions = {} - - @condition = new_cond - @opened = false - @failed = false - @close_code = [nil, "connection aborted"] - - @thread = nil - - @channel_max = 65535 - @user_id = nil - - @delegate = delegate.call(self, args) - end - - def attach(name, ch, delegate, force=false) - synchronize do - ssn = @attached[ch.id] - if ssn - raise ChannelBusy.new(ch, ssn) unless ssn.name == name - else - ssn = @sessions[name] - if ssn.nil? - ssn = Session.new(name, @spec, :delegate => delegate) - @sessions[name] = ssn - elsif ssn.channel - if force - @attached.delete(ssn.channel.id) - ssn.channel = nil - else - raise SessionBusy.new(ssn) - end - end - @attached[ch.id] = ssn - ssn.channel = ch - end - ch.session = ssn - return ssn - end - end - - def detach(name, ch) - synchronize do - @attached.delete(ch.id) - ssn = @sessions.delete(name) - if ssn - ssn.channel = nil - ssn.closed - return ssn - end - end - end - - def session(name, kwargs = {}) - timeout = kwargs[:timeout] - delegate = kwargs[:delegate] || Qpid::Session::Client.method(:new) - - # FIXME: Python has cryptic comment about 'ch 0 ?' - channel = (0..@channel_max).detect { |i| ! @attached.key?(i) } - raise ChannelsBusy unless channel - - synchronize do - ch = Channel.new(self, channel) - ssn = attach(name, ch, delegate) - ssn.channel.session_attach(name) - if ssn.wait_for(timeout) { ssn.channel } - return ssn - else - detach(name, ch) - raise Timeout - end - end - end - - def detach_all - synchronize do - attached.values.each do |ssn| - ssn.exceptions << @close_code unless @close_code[0] == 200 - detach(ssn.name, ssn.channel) - end - end - end - - def start(timeout=nil) - @delegate.start - @thread = Thread.new { run } - @thread[:name] = 'conn' - synchronize do - unless @condition.wait_for(timeout) { @opened || @failed } - raise Timeout - end - end - if @failed - raise ConnectionFailed.new(@close_code) - end - end - - def run - # XXX: we don't really have a good way to exit this loop without - # getting the other end to kill the socket - loop do - begin - seg = read_segment - rescue Qpid::Closed => e - detach_all - break - end - @delegate.received(seg) - end - end - - def close(timeout=nil) - return unless @opened - Channel.new(self, 0).connection_close(200) - synchronize do - unless @condition.wait_for(timeout) { ! @opened } - raise Timeout - end - end - @thread.join(timeout) - @thread = nil - end - - def signal - synchronize { @condition.signal } - end - - def to_s - # FIXME: We'd like to report something like HOST:PORT - return @sock.to_s - end - - class Channel < Invoker - - attr_reader :id, :connection - attr_accessor :session - - def initialize(connection, id) - @connection = connection - @id = id - @session = nil - end - - def resolve_method(name) - inst = @connection.spec[name] - if inst.is_a?(Qpid::Spec010::Control) - return invocation(:method, inst) - else - return invocation(:error, nil) - end - end - - def invoke(type, args) - ctl = type.create(*args) - sc = StringCodec.new(@connection.spec) - sc.write_control(ctl) - @connection.write_segment(Segment.new(true, true, type.segment_type, - type.track, self.id, sc.encoded)) - - log = Qpid::logger["qpid.io.ctl"] - log.debug("SENT %s", ctl) if log - end - - def to_s - return "#{@connection}[#{@id}]" - end - - end - - end - -end |