summaryrefslogtreecommitdiff
path: root/qpid/ruby/lib/qpid/connection.rb
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/ruby/lib/qpid/connection.rb')
-rw-r--r--qpid/ruby/lib/qpid/connection.rb222
1 files changed, 222 insertions, 0 deletions
diff --git a/qpid/ruby/lib/qpid/connection.rb b/qpid/ruby/lib/qpid/connection.rb
new file mode 100644
index 0000000000..d2efbfb263
--- /dev/null
+++ b/qpid/ruby/lib/qpid/connection.rb
@@ -0,0 +1,222 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require 'monitor'
+
+module Qpid
+
+ class ChannelBusy< Exception ; end
+
+ class ChannelsBusy < Exception ; end
+
+ class SessionBusy < Exception ; end
+
+ class ConnectionFailed < Exception ; end
+
+ class Timeout < Exception ; end
+
+ class Connection < Assembler
+
+ include MonitorMixin
+
+ attr_reader :spec, :attached, :sessions, :thread
+ attr_accessor :opened, :failed, :close_code, :user_id
+
+ def initialize(sock, args={})
+ super(sock)
+
+ delegate = args[:delegate] || Qpid::Delegate::Client.method(:new)
+ spec = args[:spec] || nil
+
+ @spec = Qpid::Spec010::load(spec)
+ @track = @spec["track"]
+
+ @attached = {}
+ @sessions = {}
+
+ @condition = new_cond
+ @opened = false
+ @failed = false
+ @close_code = [nil, "connection aborted"]
+
+ @thread = nil
+
+ @channel_max = 65535
+ @user_id = nil
+
+ @delegate = delegate.call(self, args)
+ end
+
+ def attach(name, ch, delegate, force=false)
+ synchronize do
+ ssn = @attached[ch.id]
+ if ssn
+ raise ChannelBusy.new(ch, ssn) unless ssn.name == name
+ else
+ ssn = @sessions[name]
+ if ssn.nil?
+ ssn = Session.new(name, @spec, :delegate => delegate)
+ @sessions[name] = ssn
+ elsif ssn.channel
+ if force
+ @attached.delete(ssn.channel.id)
+ ssn.channel = nil
+ else
+ raise SessionBusy.new(ssn)
+ end
+ end
+ @attached[ch.id] = ssn
+ ssn.channel = ch
+ end
+ ch.session = ssn
+ return ssn
+ end
+ end
+
+ def detach(name, ch)
+ synchronize do
+ @attached.delete(ch.id)
+ ssn = @sessions.delete(name)
+ if ssn
+ ssn.channel = nil
+ ssn.closed
+ return ssn
+ end
+ end
+ end
+
+ def session(name, kwargs = {})
+ timeout = kwargs[:timeout]
+ delegate = kwargs[:delegate] || Qpid::Session::Client.method(:new)
+
+ # FIXME: Python has cryptic comment about 'ch 0 ?'
+ channel = (0..@channel_max).detect { |i| ! @attached.key?(i) }
+ raise ChannelsBusy unless channel
+
+ synchronize do
+ ch = Channel.new(self, channel)
+ ssn = attach(name, ch, delegate)
+ ssn.channel.session_attach(name)
+ if ssn.wait_for(timeout) { ssn.channel }
+ return ssn
+ else
+ detach(name, ch)
+ raise Timeout
+ end
+ end
+ end
+
+ def detach_all
+ synchronize do
+ attached.values.each do |ssn|
+ ssn.exceptions << @close_code unless @close_code[0] == 200
+ detach(ssn.name, ssn.channel)
+ end
+ end
+ end
+
+ def start(timeout=nil)
+ @delegate.start
+ @thread = Thread.new { run }
+ @thread[:name] = 'conn'
+ synchronize do
+ unless @condition.wait_for(timeout) { @opened || @failed }
+ raise Timeout
+ end
+ end
+ if @failed
+ raise ConnectionFailed.new(@close_code)
+ end
+ end
+
+ def run
+ # XXX: we don't really have a good way to exit this loop without
+ # getting the other end to kill the socket
+ loop do
+ begin
+ seg = read_segment
+ rescue Qpid::Closed => e
+ detach_all
+ break
+ end
+ @delegate.received(seg)
+ end
+ end
+
+ def close(timeout=nil)
+ return unless @opened
+ Channel.new(self, 0).connection_close(200)
+ synchronize do
+ unless @condition.wait_for(timeout) { ! @opened }
+ raise Timeout
+ end
+ end
+ @thread.join(timeout)
+ @thread = nil
+ end
+
+ def signal
+ synchronize { @condition.signal }
+ end
+
+ def to_s
+ # FIXME: We'd like to report something like HOST:PORT
+ return @sock.to_s
+ end
+
+ class Channel < Invoker
+
+ attr_reader :id, :connection
+ attr_accessor :session
+
+ def initialize(connection, id)
+ @connection = connection
+ @id = id
+ @session = nil
+ end
+
+ def resolve_method(name)
+ inst = @connection.spec[name]
+ if inst.is_a?(Qpid::Spec010::Control)
+ return invocation(:method, inst)
+ else
+ return invocation(:error, nil)
+ end
+ end
+
+ def invoke(type, args)
+ ctl = type.create(*args)
+ sc = StringCodec.new(@connection.spec)
+ sc.write_control(ctl)
+ @connection.write_segment(Segment.new(true, true, type.segment_type,
+ type.track, self.id, sc.encoded))
+
+ log = Qpid::logger["qpid.io.ctl"]
+ log.debug("SENT %s", ctl) if log
+ end
+
+ def to_s
+ return "#{@connection}[#{@id}]"
+ end
+
+ end
+
+ end
+
+end