diff options
Diffstat (limited to 'qpid/ruby/qpid/peer.rb')
-rw-r--r-- | qpid/ruby/qpid/peer.rb | 287 |
1 files changed, 287 insertions, 0 deletions
diff --git a/qpid/ruby/qpid/peer.rb b/qpid/ruby/qpid/peer.rb new file mode 100644 index 0000000000..320808fdc6 --- /dev/null +++ b/qpid/ruby/qpid/peer.rb @@ -0,0 +1,287 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require "thread" +require "qpid/queue" +require "qpid/connection" +require "qpid/fields" + +module Qpid + + class Peer + + def initialize(conn, delegate) + @conn = conn + @delegate = delegate + @outgoing = Queue.new() + @work = Queue.new() + @channels = {} + @mutex = Mutex.new() + end + + def channel(id) + @mutex.synchronize do + ch = @channels[id] + if ch.nil? + ch = Channel.new(id, self, @outgoing, @conn.spec) + @channels[id] = ch + end + return ch + end + end + + def channel_delete(id) + @channels.delete(id) + end + + def start() + spawn(:writer) + spawn(:reader) + spawn(:worker) + end + + def close() + @mutex.synchronize do + @channels.each_value do |ch| + ch.close() + @outgoing.close() + @work.close() + end + end + end + + private + + def spawn(method, *args) + Thread.new do + begin + send(method, *args) + # is this the standard way to catch any exception? + rescue Closed => e + puts "#{method} #{e}" + rescue Object => e + print e + e.backtrace.each do |line| + print "\n ", line + end + print "\n" + end + end + end + + def reader() + while true + frame = @conn.read() + ch = channel(frame.channel) + ch.dispatch(frame, @work) + end + end + + def writer() + while true + @conn.write(@outgoing.pop()) + end + end + + def worker() + while true + dispatch(@work.pop()) + end + end + + def dispatch(queue) + frame = queue.pop() + ch = channel(frame.channel) + payload = frame.payload + if payload.method.content? + content = Qpid::read_content(queue) + else + content = nil + end + + message = Message.new(payload.method, payload.args, content) + @delegate.dispatch(ch, message) + end + + end + + class Channel + def initialize(id, peer, outgoing, spec) + @id = id + @peer = peer + @outgoing = outgoing + @spec = spec + @incoming = Queue.new() + @responses = Queue.new() + @queue = nil + @closed = false + end + + attr_reader :id + + def closed?; @closed end + + def close() + return if closed? + @peer.channel_delete(@id) + @closed = true + @incoming.close() + @responses.close() + end + + def dispatch(frame, work) + payload = frame.payload + case payload + when Method + if payload.method.response? + @queue = @responses + else + @queue = @incoming + work << @incoming + end + end + @queue << frame + end + + def method_missing(name, *args) + method = @spec.find_method(name) + if method.nil? + raise NoMethodError.new("undefined method '#{name}' for #{self}:#{self.class}") + end + + if args.size == 1 and args[0].instance_of? Hash + kwargs = args[0] + invoke_args = method.fields.map do |f| + kwargs[f.name] + end + content = kwargs[:content] + else + invoke_args = [] + method.fields.each do |f| + if args.any? + invoke_args << args.shift() + else + invoke_args << f.default + end + end + if method.content? and args.any? + content = args.shift() + else + content = nil + end + if args.any? then raise ArgumentError.new("#{args.size} extr arguments") end + end + return invoke(method, invoke_args, content) + end + + def invoke(method, args, content = nil) + raise Closed() if closed? + frame = Frame.new(@id, Method.new(method, args)) + @outgoing << frame + + if method.content? + content = Content.new() if content.nil? + write_content(method.parent, content, @outgoing) + end + + nowait = false + f = method.fields[:"nowait"] + nowait = args[method.fields.index(f)] unless f.nil? + + unless nowait or method.responses.empty? + resp = @responses.pop().payload + if resp.method.content? + content = read_content(@responses) + else + content = nil + end + if method.responses.include? resp.method + return Message.new(resp.method, resp.args, content) + else + # XXX: ValueError doesn't actually exist + raise ValueError.new(resp) + end + end + end + + def write_content(klass, content, queue) + size = content.size + header = Frame.new(@id, Header.new(klass, content.weight, size, content.headers)) + queue << header + content.children.each {|child| write_content(klass, child, queue)} + queue << Frame.new(@id, Body.new(content.body)) if size > 0 + end + + end + + def Qpid.read_content(queue) + frame = queue.pop() + header = frame.payload + children = [] + 1.upto(header.weight) { children << read_content(queue) } + size = header.size + read = 0 + buf = "" + while read < size + body = queue.pop() + content = body.payload.content + buf << content + read += content.size + end + buf.freeze() + return Content.new(header.properties.clone(), buf, children) + end + + class Content + def initialize(headers = {}, body = "", children = []) + @headers = headers + @body = body + @children = children + end + + attr_reader :headers, :body, :children + + def size; body.size end + def weight; children.size end + + def [](key); @headers[key] end + def []=(key, value); @headers[key] = value end + end + + class Message + fields(:method, :args, :content) + + alias fields args + + def method_missing(name) + return args[@method.fields[name].id] + end + + def inspect() + "#{method.qname}(#{args.join(", ")})" + end + end + + module Delegate + def dispatch(ch, msg) + send(msg.method.qname, ch, msg) + end + end + +end |