summaryrefslogtreecommitdiff
path: root/trunk/qpid/ruby/lib/qpid/peer.rb
diff options
context:
space:
mode:
Diffstat (limited to 'trunk/qpid/ruby/lib/qpid/peer.rb')
-rw-r--r--trunk/qpid/ruby/lib/qpid/peer.rb289
1 files changed, 0 insertions, 289 deletions
diff --git a/trunk/qpid/ruby/lib/qpid/peer.rb b/trunk/qpid/ruby/lib/qpid/peer.rb
deleted file mode 100644
index cdb962169b..0000000000
--- a/trunk/qpid/ruby/lib/qpid/peer.rb
+++ /dev/null
@@ -1,289 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-require "thread"
-require "qpid/queue"
-require "qpid/connection08"
-require "qpid/fields"
-
-module Qpid08
-
- Queue = Qpid::Queue
-
- class Peer
-
- def initialize(conn, delegate)
- @conn = conn
- @delegate = delegate
- @outgoing = Queue.new()
- @work = Queue.new()
- @channels = {}
- @mutex = Mutex.new()
- end
-
- def channel(id)
- @mutex.synchronize do
- ch = @channels[id]
- if ch.nil?
- ch = Channel.new(id, self, @outgoing, @conn.spec)
- @channels[id] = ch
- end
- return ch
- end
- end
-
- def channel_delete(id)
- @channels.delete(id)
- end
-
- def start()
- spawn(:writer)
- spawn(:reader)
- spawn(:worker)
- end
-
- def close()
- @mutex.synchronize do
- @channels.each_value do |ch|
- ch.close()
- end
- @outgoing.close()
- @work.close()
- end
- end
-
- private
-
- def spawn(method, *args)
- Thread.new do
- begin
- send(method, *args)
- # is this the standard way to catch any exception?
- rescue Closed => e
- puts "#{method} #{e}"
- rescue Object => e
- print e
- e.backtrace.each do |line|
- print "\n ", line
- end
- print "\n"
- end
- end
- end
-
- def reader()
- while true
- frame = @conn.read()
- ch = channel(frame.channel)
- ch.dispatch(frame, @work)
- end
- end
-
- def writer()
- while true
- @conn.write(@outgoing.get())
- end
- end
-
- def worker()
- while true
- dispatch(@work.get())
- end
- end
-
- def dispatch(queue)
- frame = queue.get()
- ch = channel(frame.channel)
- payload = frame.payload
- if payload.method.content?
- content = Qpid08::read_content(queue)
- else
- content = nil
- end
-
- message = Message.new(payload.method, payload.args, content)
- @delegate.dispatch(ch, message)
- end
-
- end
-
- class Channel
- def initialize(id, peer, outgoing, spec)
- @id = id
- @peer = peer
- @outgoing = outgoing
- @spec = spec
- @incoming = Queue.new()
- @responses = Queue.new()
- @queue = nil
- @closed = false
- end
-
- attr_reader :id
-
- def closed?; @closed end
-
- def close()
- return if closed?
- @peer.channel_delete(@id)
- @closed = true
- @incoming.close()
- @responses.close()
- end
-
- def dispatch(frame, work)
- payload = frame.payload
- case payload
- when Method
- if payload.method.response?
- @queue = @responses
- else
- @queue = @incoming
- work << @incoming
- end
- end
- @queue << frame
- end
-
- def method_missing(name, *args)
- method = @spec.find_method(name)
- if method.nil?
- raise NoMethodError.new("undefined method '#{name}' for #{self}:#{self.class}")
- end
-
- if args.size == 1 and args[0].instance_of? Hash
- kwargs = args[0]
- invoke_args = method.fields.map do |f|
- kwargs[f.name]
- end
- content = kwargs[:content]
- else
- invoke_args = []
- method.fields.each do |f|
- if args.any?
- invoke_args << args.shift()
- else
- invoke_args << f.default
- end
- end
- if method.content? and args.any?
- content = args.shift()
- else
- content = nil
- end
- if args.any? then raise ArgumentError.new("#{args.size} extr arguments") end
- end
- return invoke(method, invoke_args, content)
- end
-
- def invoke(method, args, content = nil)
- raise Closed() if closed?
- frame = Frame.new(@id, Method.new(method, args))
- @outgoing << frame
-
- if method.content?
- content = Content.new() if content.nil?
- write_content(method.parent, content, @outgoing)
- end
-
- nowait = false
- f = method.fields[:"nowait"]
- nowait = args[method.fields.index(f)] unless f.nil?
-
- unless nowait or method.responses.empty?
- resp = @responses.get().payload
- if resp.method.content?
- content = read_content(@responses)
- else
- content = nil
- end
- if method.responses.include? resp.method
- return Message.new(resp.method, resp.args, content)
- else
- # XXX: ValueError doesn't actually exist
- raise ValueError.new(resp)
- end
- end
- end
-
- def write_content(klass, content, queue)
- size = content.size
- header = Frame.new(@id, Header.new(klass, content.weight, size, content.headers))
- queue << header
- content.children.each {|child| write_content(klass, child, queue)}
- queue << Frame.new(@id, Body.new(content.body)) if size > 0
- end
-
- end
-
- def Qpid08.read_content(queue)
- frame = queue.get()
- header = frame.payload
- children = []
- 1.upto(header.weight) { children << read_content(queue) }
- size = header.size
- read = 0
- buf = ""
- while read < size
- body = queue.get()
- content = body.payload.content
- buf << content
- read += content.size
- end
- buf.freeze()
- return Content.new(header.properties.clone(), buf, children)
- end
-
- class Content
- def initialize(headers = {}, body = "", children = [])
- @headers = headers
- @body = body
- @children = children
- end
-
- attr_reader :headers, :body, :children
-
- def size; body.size end
- def weight; children.size end
-
- def [](key); @headers[key] end
- def []=(key, value); @headers[key] = value end
- end
-
- class Message
- fields(:method, :args, :content)
-
- alias fields args
-
- def method_missing(name)
- return args[@method.fields[name].id]
- end
-
- def inspect()
- "#{method.qname}(#{args.join(", ")})"
- end
- end
-
- module Delegate
- def dispatch(ch, msg)
- send(msg.method.qname, ch, msg)
- end
- end
-
-end