diff options
Diffstat (limited to 'ruby/qpid')
-rw-r--r-- | ruby/qpid/client.rb | 106 | ||||
-rw-r--r-- | ruby/qpid/codec.rb | 253 | ||||
-rw-r--r-- | ruby/qpid/connection.rb | 142 | ||||
-rw-r--r-- | ruby/qpid/fields.rb | 46 | ||||
-rw-r--r-- | ruby/qpid/peer.rb | 246 | ||||
-rw-r--r-- | ruby/qpid/queue.rb | 49 | ||||
-rw-r--r-- | ruby/qpid/spec.rb | 290 | ||||
-rw-r--r-- | ruby/qpid/traverse.rb | 61 |
8 files changed, 1193 insertions, 0 deletions
diff --git a/ruby/qpid/client.rb b/ruby/qpid/client.rb new file mode 100644 index 0000000000..6aa91855ce --- /dev/null +++ b/ruby/qpid/client.rb @@ -0,0 +1,106 @@ +# +# Copyright (c) 2006 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require "thread" +require "qpid/peer" +require "qpid/queue" + +module Qpid + + class Client + def initialize(host, port, spec, vhost = nil) + @host = host + @port = port + @spec = spec + @vhost = if vhost.nil?; host else vhost end + + @mechanism = nil + @response = nil + @locale = nil + + @queues = {} + @mutex = Mutex.new() + + @closed = false + @started = ConditionVariable.new() + + @conn = Connection.new(@host, @port, @spec) + @peer = Peer.new(@conn, ClientDelegate.new(self)) + end + + attr_reader :mechanism, :response, :locale + + def closed?; @closed end + + def wait() + @mutex.synchronize do + @started.wait(@mutex) + end + raise EOFError.new() if closed? + end + + def signal_start() + @started.broadcast() + end + + def queue(key) + @mutex.synchronize do + q = @queues[key] + if q.nil? + q = Queue.new() + @queues[key] = q + end + return q + end + end + + def start(response, mechanism="AMQPLAIN", locale="en_US") + @response = response + @mechanism = mechanism + @locale = locale + + @conn.connect() + @conn.init() + @peer.start() + wait() + channel(0).connection_open(@vhost) + end + + def channel(id) + return @peer.channel(id) + end + end + + class ClientDelegate + include Delegate + + def initialize(client) + @client = client + end + + def connection_start(ch, msg) + ch.connection_start_ok(:mechanism => @client.mechanism, + :response => @client.response, + :locale => @client.locale) + end + + def connection_tune(ch, msg) + ch.connection_tune_ok(*msg.fields) + @client.signal_start() + end + end + +end diff --git a/ruby/qpid/codec.rb b/ruby/qpid/codec.rb new file mode 100644 index 0000000000..e5f3c846fc --- /dev/null +++ b/ruby/qpid/codec.rb @@ -0,0 +1,253 @@ +# +# Copyright (c) 2006 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Codec + # is there a better way to do this? + class StringWriter + + def initialize(str = "") + @str = str + end + + def write(value) + @str << value + end + + def to_s() + return @str + end + + end + + class EOF < Exception; end + + class Encoder + + def initialize(out) + @out = out + @bits = [] + end + + attr_reader(:out) + + def encode(type, value) + send(type, value) + end + + def bit(b) + @bits << b + end + + def octet(o) + pack("C", o) + end + + def short(s) + pack("n", s) + end + + def long(l) + pack("N", l) + end + + def longlong(l) + # is this the right byte order? + lower = l & 0xffffffff + upper = (l & ~0xffffffff) >> 32 + long(lower) + long(upper) + end + + def shortstr(s) + # shortstr is actually octetstr + octet(s.length) + write(s) + end + + def longstr(s) + case s + when Hash + table(s) + else + long(s.length) + write(s) + end + end + + def table(t) + t = {} if t.nil? + enc = Encoder.new(StringWriter.new()) + t.each {|key, value| + enc.shortstr(key) + # I offer this chicken to the gods of polymorphism. May they + # choke on it. + case value + when String + type = :longstr + desc = "S" + when Numeric + type = :long + desc = "I" + else + raise Exception.new("unknown table value: #{value.class}") + end + enc.write(desc) + enc.encode(type, value) + } + longstr(enc.out.to_s()) + end + + def write(str) + flushbits() + @out.write(str) + end + + def pack(fmt, *args) + write(args.pack(fmt)) + end + + def flush() + flushbits() + end + + private + + def flushbits() + if @bits.empty? then return end + + bytes = [] + index = 0 + @bits.each {|b| + bytes << 0 if index == 0 + if b then bytes[-1] |= 1 << index end + index = (index + 1) % 8 + } + @bits.clear() + bytes.each {|b| + octet(b) + } + end + + end + + class StringReader + + def initialize(str) + @str = str + @index = 0 + end + + def read(n) + result = @str[@index, n] + @index += result.length + return result + end + + end + + class Decoder + + def initialize(_in) + @in = _in + @bits = [] + end + + def decode(type) + return send(type) + end + + def bit() + if @bits.empty? + byte = octet() + 7.downto(0) {|i| + @bits << (byte[i] == 1) + } + end + return @bits.pop() + end + + def octet() + return unpack("C", 1) + end + + def short() + return unpack("n", 2) + end + + def long() + return unpack("N", 4) + end + + def longlong() + upper = long() + lower = long() + return upper << 32 | lower + end + + def shortstr() + # shortstr is actually octetstr + return read(octet()) + end + + def longstr() + return read(long()) + end + + def table() + dec = Decoder.new(StringReader.new(longstr())) + result = {} + while true + begin + key = dec.shortstr() + rescue EOF + break + end + desc = dec.read(1) + case desc + when "S" + value = dec.longstr() + when "I" + value = dec.long() + else + raise Exception.new("unrecognized descriminator: #{desc.inspect()}") + end + result[key] = value + end + return result + end + + def read(n) + return "" if n == 0 + result = @in.read(n) + if result.nil? or result.empty? + raise EOF.new() + else + return result + end + end + + def unpack(fmt, size) + result = read(size).unpack(fmt) + if result.length == 1 + return result[0] + else + return result + end + end + + end + +end diff --git a/ruby/qpid/connection.rb b/ruby/qpid/connection.rb new file mode 100644 index 0000000000..da2b0e1de7 --- /dev/null +++ b/ruby/qpid/connection.rb @@ -0,0 +1,142 @@ +# +# Copyright (c) 2006 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require "socket" +require "qpid/codec" + +include Codec + +module Qpid + + class Connection + + def initialize(host, port, spec) + @host = host + @port = port + @spec = spec + end + + attr_reader(:host, :port, :spec) + + def connect() + @sock = TCPSocket.open(@host, @port) + @out = Encoder.new(@sock) + @in = Decoder.new(@sock) + end + + def init() + @out.write("AMQP") + [1, 1, @spec.major, @spec.minor].each {|o| + @out.octet(o) + } + end + + def write(frame) + @out.octet(@spec.constants[frame.payload.type].id) + @out.short(frame.channel) + frame.payload.encode(@out) + @out.octet(frame_end) + end + + def read() + type = @spec.constants[@in.octet()].name + channel = @in.short() + payload = Payload.decode(type, @spec, @in) + oct = @in.octet() + if oct != frame_end + raise Exception.new("framing error: expected #{frame_end}, got #{oct}") + end + Frame.new(channel, payload) + end + + private + + def frame_end + @spec.constants[:"frame end"].id + end + + end + + class Frame + + def initialize(channel, payload) + @channel = channel + @payload = payload + end + + attr_reader(:channel, :payload) + + end + + class Payload + + TYPES = {} + + def Payload.singleton_method_added(name) + if name == :type + TYPES[type] = self + end + end + + def Payload.decode(type, spec, dec) + klass = TYPES[type] + klass.decode(spec, dec) + end + + end + + class Method < Payload + + def initialize(method, args) + if args.size != method.fields.size + raise ArgumentError.new("argument mismatch #{method} #{args}") + end + @method = method + @args = args + end + + attr_reader(:method, :args) + + def Method.type + :"frame method" + end + + def type; Method.type end + + def encode(encoder) + buf = StringWriter.new() + enc = Encoder.new(buf) + enc.short(@method.parent.id) + enc.short(@method.id) + @method.fields.zip(self.args).each {|f, a| + enc.encode(f.type, a) + } + enc.flush() + encoder.longstr(buf.to_s) + end + + def Method.decode(spec, decoder) + buf = decoder.longstr() + dec = Decoder.new(StringReader.new(buf)) + klass = spec.classes[dec.short()] + meth = klass.methods[dec.short()] + args = meth.fields.map {|f| dec.decode(f.type)} + return Method.new(meth, args) + end + + end + +end diff --git a/ruby/qpid/fields.rb b/ruby/qpid/fields.rb new file mode 100644 index 0000000000..c46ba645ee --- /dev/null +++ b/ruby/qpid/fields.rb @@ -0,0 +1,46 @@ +# +# Copyright (c) 2006 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +class Class + def fields(*fields) + module_eval { + def initialize(*args, &block) + args = init_fields(*args) + + if respond_to? :init + init(*args) {|*a| yield(*a)} + elsif args.any? + raise ArgumentException.new("extra arguments: #{args}") + end + end + } + + vars = fields.map {|f| :"@#{f.to_s().chomp("?")}"} + + define_method(:init_fields) {|*args| + vars.each {|v| + instance_variable_set(v, args.shift()) + } + args + } + + vars.each_index {|i| + define_method(fields[i]) { + instance_variable_get(vars[i]) + } + } + end +end diff --git a/ruby/qpid/peer.rb b/ruby/qpid/peer.rb new file mode 100644 index 0000000000..3dc19dd8f9 --- /dev/null +++ b/ruby/qpid/peer.rb @@ -0,0 +1,246 @@ +# +# Copyright (c) 2006 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require "thread" +require "qpid/queue" +require "qpid/connection" +require "qpid/fields" + +module Qpid + + class Peer + + def initialize(conn, delegate) + @conn = conn + @delegate = delegate + @outgoing = Queue.new() + @work = Queue.new() + @channels = {} + @mutex = Mutex.new() + end + + def channel(id) + @mutex.synchronize do + ch = @channels[id] + if ch.nil? + ch = Channel.new(id, @outgoing, @conn.spec) + @channels[id] = ch + end + return ch + end + end + + def start() + spawn(:writer) + spawn(:reader) + spawn(:worker) + end + + private + + def spawn(method, *args) + Thread.new do + begin + send(method, *args) + # is this the standard way to catch any exception? + rescue Object => e + print e + e.backtrace.each do |line| + print "\n ", line + end + print "\n" + end + end + end + + def reader() + while true + frame = @conn.read() + ch = channel(frame.channel) + ch.dispatch(frame, @work) + end + end + + def writer() + while true + @conn.write(@outgoing.pop()) + end + end + + def worker() + while true + dispatch(@work.pop()) + end + end + + def dispatch(queue) + frame = queue.pop() + ch = channel(frame.channel) + payload = frame.payload + if payload.method.content? + content = read_content(queue) + else + content = nil + end + + message = Message.new(payload.method, payload.args, content) + @delegate.dispatch(ch, message) + end + + end + + class Channel + def initialize(id, outgoing, spec) + @id = id + @outgoing = outgoing + @spec = spec + @incoming = Queue.new() + @responses = Queue.new() + @queue = nil + @closed = false + @reason = nil + end + + def closed?; @closed end + + def close(reason) + return if closed? + @closed = true + @reason = reason + @incoming.close() + @responses .close() + end + + def dispatch(frame, work) + payload = frame.payload + case payload + when Method + if payload.method.response? + @queue = @responses + else + @queue = @incoming + work << @incoming + end + end + @queue << frame + end + + def method_missing(name, *args) + method = @spec.ruby_method(name) + if method.nil? + raise NoMethodError.new("undefined method '#{name}' for #{self}:#{self.class}") + end + + if args.size == 1 and args[0].instance_of? Hash + kwargs = args[0] + invoke_args = method.fields.map do |f| + kwargs[f.ruby_name] + end + content = kwargs[:content] + else + invoke_args = [] + method.fields.each do |f| + if args.any? + invoke_args << args.shift() + else + invoke_args << f.default + end + end + if method.content? and args.any? + content = args.shift() + else + content = nil + end + if args.any? then raise ArgumentError.new("#{args.size} extr arguments") end + end + return invoke(method, invoke_args, content) + end + + def invoke(method, args, content = nil) + raise Closed(@reason) if closed? + frame = Frame.new(@id, Method.new(method, args)) + @outgoing << frame + + if method.content? + content = Content.new() if content.nil? + write_content(method.klass, content, @outgoing) + end + + nowait = false + f = method.fields[:"nowait"] + nowait = args[method.fields.index(f)] unless f.nil? + + unless nowait or method.responses.empty? + resp = @responses.pop().payload + if resp.method.content? + content = read_content(@responses) + else + content = nil + end + if method.responses.include? resp.method + return Message.new(resp.method, resp.args, content) + else + # XXX: ValueError doesn't actually exist + raise ValueError.new(resp) + end + end + end + + def write_content(klass, content, queue) + size = content.size + header = Frame.new(@id, Header.new(klass, content.weight, size)) + queue << header + content.children.each {|child| write_content(klass, child, queue)} + queue << Frame.new(@id, Body.new(content.body)) if size > 0 + end + + end + + def read_content(queue) + frame = queue.pop() + header = frame.payload + children = [] + 1.upto(header.weight) { children << read_content(queue) } + size = header.size + read = 0 + buf = "" + while read << size + body = queue.get() + content = body.payload.content + buf << content + read += content.size + end + buf.freeze() + return Content.new(buf, children, header.properties.clone()) + end + + class Message + fields(:method, :args, :content) + + alias fields args + + def inspect() + "#{method.ruby_name}(#{args.join(", ")})" + end + end + + module Delegate + def dispatch(ch, msg) + send(msg.method.ruby_name, ch, msg) + end + end + +end diff --git a/ruby/qpid/queue.rb b/ruby/qpid/queue.rb new file mode 100644 index 0000000000..037ecd25b7 --- /dev/null +++ b/ruby/qpid/queue.rb @@ -0,0 +1,49 @@ +# +# Copyright (c) 2006 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require "thread" + +module Qpid + + class Closed < Exception; end + + class Queue < Queue + + @@END = Object.new() + + def close() + # sentinal to indicate the end of the queue + self << @@END + end + + def pop(*args) + result = super(*args) + if @@END.equal? result + # we put another sentinal on the end in case there are + # subsequent calls to pop by this or other threads + self << @@END + raise Closed.new() + else + return result + end + end + + alias shift pop + alias deq pop + + end + +end diff --git a/ruby/qpid/spec.rb b/ruby/qpid/spec.rb new file mode 100644 index 0000000000..fda2fa6600 --- /dev/null +++ b/ruby/qpid/spec.rb @@ -0,0 +1,290 @@ +# +# Copyright (c) 2006 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require "set" +require "rexml/document" +require "qpid/fields" +require "qpid/traverse" + +module Spec + + include REXML + + class Container < Array + + def initialize() + @cache = {} + end + + def [](key) + return @cache[key] if @cache.include?(key) + + case key + when String + value = find {|x| x.name == key.intern()} + when Symbol + value = find {|x| x.name == key} + when Integer + value = find {|x| x.id == key} + else + raise Exception.new("invalid key: #{key}") + end + + @cache[key] = value + return value + end + + end + + class Root + fields(:major, :minor, :classes, :constants, :domains) + + def ruby_method(name) + classes.each do |c| + c.methods.each do |m| + if name == m.ruby_name + return m + end + end + end + end + end + + class Constant + fields(:name, :id, :type, :docs) + end + + class Domain + fields(:name, :type) + end + + class Class + fields(:name, :id, :handler, :fields, :methods, :docs) + end + + class Method + fields(:name, :id, :content?, :responses, :synchronous?, :fields, + :docs) + + def init() + @response = false + end + + attr :parent, true + + def response?; @response end + def response=(b); @response = b end + + def ruby_name + Spec.rubyize(:"#{parent.name}_#{name}") + end + end + + class Field + fields(:name, :id, :type, :docs) + + def ruby_name + Spec.rubyize(name) + end + + def default + case type + when :bit then false + when :octet, :short, :long, :longlong then 0 + when :shortstr, :longstr then "" + when :table then {} + end + end + + end + + class Doc + fields(:type, :text) + end + + class Reference + + fields(:name) + + def init(&block) + @resolver = block + end + + def resolve(spec, klass) + @resolver.call(spec, klass) + end + + end + + class Loader + + def initialize() + @stack = [] + end + + def load(obj) + case obj + when String + elem = @stack[-1] + result = Container.new() + elem.elements.each(obj) {|e| + @index = result.size + result << load(e) + } + @index = nil + return result + else + elem = obj + @stack << elem + begin + result = send(:"load_#{elem.name}") + ensure + @stack.pop() + end + return result + end + end + + def element + @stack[-1] + end + + def text + element.text + end + + def attr(name, type = :string, default = nil) + value = element.attributes[name] + value = value.strip() unless value.nil? + value = nil unless value.nil? or value.any? + if value.nil? and not default.nil? then + default + else + send(:"parse_#{type}", value) + end + end + + def parse_int(value) + value.to_i + end + + TRUE = ["yes", "true", "1"].to_set + FALSE = ["no", "false", "0", nil].to_set + + def parse_bool(value) + if TRUE.include?(value) + true + elsif FALSE.include?(value) + false + else + raise Exception.new("parse error, expecting boolean: #{value}") + end + end + + def parse_string(value) + value.to_s + end + + def parse_symbol(value) + value.intern() unless value.nil? + end + + def load_amqp() + Root.new(attr("major", :int), attr("minor", :int), load("class"), + load("constant"), load("domain")) + end + + def load_class() + Class.new(attr("name", :symbol), attr("index", :int), attr("handler", :symbol), + load("field"), load("method"), load("doc")) + end + + def load_method() + Method.new(attr("name", :symbol), attr("index", :int), + attr("content", :bool), load("response"), + attr("synchronous", :bool), load("field"), load("docs")) + end + + def load_response() + name = attr("name", :symbol) + Reference.new {|spec, klass| + response = klass.methods[name] + if response.nil? + raise Exception.new("no such method: #{name}") + end + response + } + end + + def load_field() + type = attr("type", :symbol) + if type.nil? + domain = attr("domain", :symbol) + type = Reference.new {|spec, klass| + spec.domains[domain].type + } + end + Field.new(attr("name", :symbol), @index, type, load("docs")) + end + + def load_constant() + Constant.new(attr("name", :symbol), attr("value", :int), attr("class", :symbol), + load("doc")) + end + + def load_domain() + Domain.new(attr("name", :symbol), attr("type", :symbol)) + end + + def load_doc() + Doc.new(attr("type", :symbol), text) + end + + end + + def Spec.load(spec) + case spec + when String + spec = File.new(spec) + end + doc = Document.new(spec) + spec = Loader.new().load(doc.root) + spec.classes.each do |klass| + klass.traverse! do |o| + case o + when Reference + o.resolve(spec, klass) + else + o + end + end + klass.methods.each do |m| + m.parent = klass + m.responses.each do |r| + r.response = true + end + end + end + spec + end + + private + + def Spec.rubyize(name) + name.to_s.gsub(/[\s-]/, '_').intern() + end + +end diff --git a/ruby/qpid/traverse.rb b/ruby/qpid/traverse.rb new file mode 100644 index 0000000000..85551a55d5 --- /dev/null +++ b/ruby/qpid/traverse.rb @@ -0,0 +1,61 @@ +# +# Copyright (c) 2006 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +class Object + + public + + def traverse() + traverse! {|o| yield(o); o} + end + + def traverse_children!() + instance_variables.each {|v| + value = instance_variable_get(v) + replacement = yield(value) + instance_variable_set(v, replacement) unless replacement.equal? value + } + end + + def traverse!(replacements = {}) + return replacements[__id__] if replacements.has_key? __id__ + replacement = yield(self) + replacements[__id__] = replacement + traverse_children! {|o| o.traverse!(replacements) {|c| yield(c)}} + return replacement + end + +end + +class Array + def traverse_children!() + map! {|o| yield(o)} + end +end + +class Hash + def traverse_children!() + mods = {} + each_pair {|k, v| + key = yield(k) + value = yield(v) + mods[key] = value unless key.equal? k and value.equal? v + delete(k) unless key.equal? k + } + + merge!(mods) + end +end |