summaryrefslogtreecommitdiff
path: root/trunk/qpid/ruby/lib/qpid/connection.rb
diff options
context:
space:
mode:
Diffstat (limited to 'trunk/qpid/ruby/lib/qpid/connection.rb')
-rw-r--r--trunk/qpid/ruby/lib/qpid/connection.rb222
1 files changed, 0 insertions, 222 deletions
diff --git a/trunk/qpid/ruby/lib/qpid/connection.rb b/trunk/qpid/ruby/lib/qpid/connection.rb
deleted file mode 100644
index d2efbfb263..0000000000
--- a/trunk/qpid/ruby/lib/qpid/connection.rb
+++ /dev/null
@@ -1,222 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-require 'monitor'
-
-module Qpid
-
- class ChannelBusy< Exception ; end
-
- class ChannelsBusy < Exception ; end
-
- class SessionBusy < Exception ; end
-
- class ConnectionFailed < Exception ; end
-
- class Timeout < Exception ; end
-
- class Connection < Assembler
-
- include MonitorMixin
-
- attr_reader :spec, :attached, :sessions, :thread
- attr_accessor :opened, :failed, :close_code, :user_id
-
- def initialize(sock, args={})
- super(sock)
-
- delegate = args[:delegate] || Qpid::Delegate::Client.method(:new)
- spec = args[:spec] || nil
-
- @spec = Qpid::Spec010::load(spec)
- @track = @spec["track"]
-
- @attached = {}
- @sessions = {}
-
- @condition = new_cond
- @opened = false
- @failed = false
- @close_code = [nil, "connection aborted"]
-
- @thread = nil
-
- @channel_max = 65535
- @user_id = nil
-
- @delegate = delegate.call(self, args)
- end
-
- def attach(name, ch, delegate, force=false)
- synchronize do
- ssn = @attached[ch.id]
- if ssn
- raise ChannelBusy.new(ch, ssn) unless ssn.name == name
- else
- ssn = @sessions[name]
- if ssn.nil?
- ssn = Session.new(name, @spec, :delegate => delegate)
- @sessions[name] = ssn
- elsif ssn.channel
- if force
- @attached.delete(ssn.channel.id)
- ssn.channel = nil
- else
- raise SessionBusy.new(ssn)
- end
- end
- @attached[ch.id] = ssn
- ssn.channel = ch
- end
- ch.session = ssn
- return ssn
- end
- end
-
- def detach(name, ch)
- synchronize do
- @attached.delete(ch.id)
- ssn = @sessions.delete(name)
- if ssn
- ssn.channel = nil
- ssn.closed
- return ssn
- end
- end
- end
-
- def session(name, kwargs = {})
- timeout = kwargs[:timeout]
- delegate = kwargs[:delegate] || Qpid::Session::Client.method(:new)
-
- # FIXME: Python has cryptic comment about 'ch 0 ?'
- channel = (0..@channel_max).detect { |i| ! @attached.key?(i) }
- raise ChannelsBusy unless channel
-
- synchronize do
- ch = Channel.new(self, channel)
- ssn = attach(name, ch, delegate)
- ssn.channel.session_attach(name)
- if ssn.wait_for(timeout) { ssn.channel }
- return ssn
- else
- detach(name, ch)
- raise Timeout
- end
- end
- end
-
- def detach_all
- synchronize do
- attached.values.each do |ssn|
- ssn.exceptions << @close_code unless @close_code[0] == 200
- detach(ssn.name, ssn.channel)
- end
- end
- end
-
- def start(timeout=nil)
- @delegate.start
- @thread = Thread.new { run }
- @thread[:name] = 'conn'
- synchronize do
- unless @condition.wait_for(timeout) { @opened || @failed }
- raise Timeout
- end
- end
- if @failed
- raise ConnectionFailed.new(@close_code)
- end
- end
-
- def run
- # XXX: we don't really have a good way to exit this loop without
- # getting the other end to kill the socket
- loop do
- begin
- seg = read_segment
- rescue Qpid::Closed => e
- detach_all
- break
- end
- @delegate.received(seg)
- end
- end
-
- def close(timeout=nil)
- return unless @opened
- Channel.new(self, 0).connection_close(200)
- synchronize do
- unless @condition.wait_for(timeout) { ! @opened }
- raise Timeout
- end
- end
- @thread.join(timeout)
- @thread = nil
- end
-
- def signal
- synchronize { @condition.signal }
- end
-
- def to_s
- # FIXME: We'd like to report something like HOST:PORT
- return @sock.to_s
- end
-
- class Channel < Invoker
-
- attr_reader :id, :connection
- attr_accessor :session
-
- def initialize(connection, id)
- @connection = connection
- @id = id
- @session = nil
- end
-
- def resolve_method(name)
- inst = @connection.spec[name]
- if inst.is_a?(Qpid::Spec010::Control)
- return invocation(:method, inst)
- else
- return invocation(:error, nil)
- end
- end
-
- def invoke(type, args)
- ctl = type.create(*args)
- sc = StringCodec.new(@connection.spec)
- sc.write_control(ctl)
- @connection.write_segment(Segment.new(true, true, type.segment_type,
- type.track, self.id, sc.encoded))
-
- log = Qpid::logger["qpid.io.ctl"]
- log.debug("SENT %s", ctl) if log
- end
-
- def to_s
- return "#{@connection}[#{@id}]"
- end
-
- end
-
- end
-
-end