diff options
Diffstat (limited to 'Final/python/qpid')
-rw-r--r-- | Final/python/qpid/__init__.py | 20 | ||||
-rw-r--r-- | Final/python/qpid/client.py | 114 | ||||
-rw-r--r-- | Final/python/qpid/codec.py | 224 | ||||
-rw-r--r-- | Final/python/qpid/connection.py | 270 | ||||
-rw-r--r-- | Final/python/qpid/content.py | 50 | ||||
-rw-r--r-- | Final/python/qpid/delegate.py | 54 | ||||
-rw-r--r-- | Final/python/qpid/message.py | 84 | ||||
-rw-r--r-- | Final/python/qpid/peer.py | 210 | ||||
-rw-r--r-- | Final/python/qpid/queue.py | 45 | ||||
-rw-r--r-- | Final/python/qpid/spec.py | 358 | ||||
-rw-r--r-- | Final/python/qpid/testlib.py | 237 | ||||
-rw-r--r-- | Final/python/qpid/xmlutil.py | 119 |
12 files changed, 1785 insertions, 0 deletions
diff --git a/Final/python/qpid/__init__.py b/Final/python/qpid/__init__.py new file mode 100644 index 0000000000..4363f175fb --- /dev/null +++ b/Final/python/qpid/__init__.py @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import spec, codec, connection, content, peer, delegate, client diff --git a/Final/python/qpid/client.py b/Final/python/qpid/client.py new file mode 100644 index 0000000000..b4a282f251 --- /dev/null +++ b/Final/python/qpid/client.py @@ -0,0 +1,114 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +An AQMP client implementation that uses a custom delegate for +interacting with the server. +""" + +import threading +from peer import Peer, Closed +from delegate import Delegate +from connection import Connection, Frame +from spec import load +from queue import Queue + + +class Client: + + def __init__(self, host, port, spec, vhost = None): + self.host = host + self.port = port + self.spec = spec + + self.mechanism = None + self.response = None + self.locale = None + + self.vhost = vhost + if self.vhost == None: + self.vhost = self.host + + self.queues = {} + self.lock = threading.Lock() + + self.closed = False + self.started = threading.Event() + + self.conn = Connection(self.host, self.port, self.spec) + self.peer = Peer(self.conn, ClientDelegate(self)) + + def wait(self): + self.started.wait() + if self.closed: + raise EOFError() + + def queue(self, key): + self.lock.acquire() + try: + try: + q = self.queues[key] + except KeyError: + q = Queue(0) + self.queues[key] = q + finally: + self.lock.release() + return q + + def start(self, response, mechanism="AMQPLAIN", locale="en_US"): + self.mechanism = mechanism + self.response = response + self.locale = locale + + self.conn.connect() + self.conn.init() + self.peer.start() + self.wait() + self.channel(0).connection_open(self.vhost) + + def channel(self, id): + return self.peer.channel(id) + +class ClientDelegate(Delegate): + + def __init__(self, client): + Delegate.__init__(self) + self.client = client + + def connection_start(self, ch, msg): + ch.connection_start_ok(mechanism=self.client.mechanism, + response=self.client.response, + locale=self.client.locale) + + def connection_tune(self, ch, msg): + ch.connection_tune_ok(*msg.fields) + self.client.started.set() + + def basic_deliver(self, ch, msg): + self.client.queue(msg.consumer_tag).put(msg) + + def channel_close(self, ch, msg): + ch.close(msg) + + def connection_close(self, ch, msg): + self.client.peer.close(msg) + + def close(self, reason): + self.client.closed = True + self.client.started.set() diff --git a/Final/python/qpid/codec.py b/Final/python/qpid/codec.py new file mode 100644 index 0000000000..69c7ca8afa --- /dev/null +++ b/Final/python/qpid/codec.py @@ -0,0 +1,224 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +Utility code to translate between python objects and AMQP encoded data +fields. +""" + +from cStringIO import StringIO +from struct import * + +class EOF(Exception): + pass + +class Codec: + + def __init__(self, stream): + self.stream = stream + self.nwrote = 0 + self.nread = 0 + self.incoming_bits = [] + self.outgoing_bits = [] + + def read(self, n): + data = self.stream.read(n) + if n > 0 and len(data) == 0: + raise EOF() + self.nread += len(data) + return data + + def write(self, s): + self.flushbits() + self.stream.write(s) + self.nwrote += len(s) + + def flush(self): + self.flushbits() + self.stream.flush() + + def flushbits(self): + if len(self.outgoing_bits) > 0: + bytes = [] + index = 0 + for b in self.outgoing_bits: + if index == 0: bytes.append(0) + if b: bytes[-1] |= 1 << index + index = (index + 1) % 8 + del self.outgoing_bits[:] + for byte in bytes: + self.encode_octet(byte) + + def pack(self, fmt, *args): + self.write(pack(fmt, *args)) + + def unpack(self, fmt): + size = calcsize(fmt) + data = self.read(size) + values = unpack(fmt, data) + if len(values) == 1: + return values[0] + else: + return values + + def encode(self, type, value): + getattr(self, "encode_" + type)(value) + + def decode(self, type): + return getattr(self, "decode_" + type)() + + # bit + def encode_bit(self, o): + if o: + self.outgoing_bits.append(True) + else: + self.outgoing_bits.append(False) + + def decode_bit(self): + if len(self.incoming_bits) == 0: + bits = self.decode_octet() + for i in range(8): + self.incoming_bits.append(bits >> i & 1 != 0) + return self.incoming_bits.pop(0) + + # octet + def encode_octet(self, o): + self.pack("!B", o) + + def decode_octet(self): + return self.unpack("!B") + + # short + def encode_short(self, o): + self.pack("!H", o) + + def decode_short(self): + return self.unpack("!H") + + # long + def encode_long(self, o): + self.pack("!L", o) + + def decode_long(self): + return self.unpack("!L") + + # longlong + def encode_longlong(self, o): + self.pack("!Q", o) + + def decode_longlong(self): + return self.unpack("!Q") + + def enc_str(self, fmt, s): + size = len(s) + self.pack(fmt, size) + self.write(s) + + def dec_str(self, fmt): + size = self.unpack(fmt) + return self.read(size) + + # shortstr + def encode_shortstr(self, s): + self.enc_str("!B", s) + + def decode_shortstr(self): + return self.dec_str("!B") + + # longstr + def encode_longstr(self, s): + if isinstance(s, dict): + self.encode_table(s) + else: + self.enc_str("!L", s) + + def decode_longstr(self): + return self.dec_str("!L") + + # table + def encode_table(self, tbl): + enc = StringIO() + codec = Codec(enc) + for key, value in tbl.items(): + codec.encode_shortstr(key) + if isinstance(value, basestring): + codec.write("S") + codec.encode_longstr(value) + else: + codec.write("I") + codec.encode_long(value) + s = enc.getvalue() + self.encode_long(len(s)) + self.write(s) + + def decode_table(self): + size = self.decode_long() + start = self.nread + result = {} + while self.nread - start < size: + key = self.decode_shortstr() + type = self.read(1) + if type == "S": + value = self.decode_longstr() + elif type == "I": + value = self.decode_long() + else: + raise ValueError(repr(type)) + result[key] = value + return result + +def test(type, value): + if isinstance(value, (list, tuple)): + values = value + else: + values = [value] + stream = StringIO() + codec = Codec(stream) + for v in values: + codec.encode(type, v) + codec.flush() + enc = stream.getvalue() + stream.reset() + dup = [] + for i in xrange(len(values)): + dup.append(codec.decode(type)) + if values != dup: + raise AssertionError("%r --> %r --> %r" % (values, enc, dup)) + +if __name__ == "__main__": + def dotest(type, value): + args = (type, value) + test(*args) + + for value in ("1", "0", "110", "011", "11001", "10101", "10011"): + for i in range(10): + dotest("bit", map(lambda x: x == "1", value*i)) + + for value in ({}, {"asdf": "fdsa", "fdsa": 1, "three": 3}, {"one": 1}): + dotest("table", value) + + for type in ("octet", "short", "long", "longlong"): + for value in range(0, 256): + dotest(type, value) + + for type in ("shortstr", "longstr"): + for value in ("", "a", "asdf"): + dotest(type, value) diff --git a/Final/python/qpid/connection.py b/Final/python/qpid/connection.py new file mode 100644 index 0000000000..0b788e091b --- /dev/null +++ b/Final/python/qpid/connection.py @@ -0,0 +1,270 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +A Connection class containing socket code that uses the spec metadata +to read and write Frame objects. This could be used by a client, +server, or even a proxy implementation. +""" + +import socket, codec,logging +from cStringIO import StringIO +from spec import load, pythonize +from codec import EOF + +class SockIO: + + def __init__(self, sock): + self.sock = sock + + def write(self, buf): +# print "OUT: %r" % buf + self.sock.sendall(buf) + + def read(self, n): + data = "" + while len(data) < n: + try: + s = self.sock.recv(n - len(data)) + except socket.error: + break + if len(s) == 0: + break +# print "IN: %r" % s + data += s + return data + + def flush(self): + pass + +class Connection: + + def __init__(self, host, port, spec): + self.host = host + self.port = port + self.spec = spec + self.FRAME_END = self.spec.constants.bypyname["frame_end"].id + + def connect(self): + sock = socket.socket() + sock.connect((self.host, self.port)) + sock.setblocking(1) + self.codec = codec.Codec(SockIO(sock)) + + def flush(self): + self.codec.flush() + + INIT="!4s4B" + + def init(self): + self.codec.pack(Connection.INIT, "AMQP", 1, 1, self.spec.major, + self.spec.minor) + + def write(self, frame): + c = self.codec + c.encode_octet(self.spec.constants.bypyname[frame.payload.type].id) + c.encode_short(frame.channel) + frame.payload.encode(c) + c.encode_octet(self.FRAME_END) + + def read(self): + c = self.codec + type = pythonize(self.spec.constants.byid[c.decode_octet()].name) + channel = c.decode_short() + payload = Frame.DECODERS[type].decode(self.spec, c) + end = c.decode_octet() + if end != self.FRAME_END: + raise "frame error: expected %r, got %r" % (self.FRAME_END, end) + frame = Frame(channel, payload) + return frame + +class Frame: + + METHOD = "frame_method" + HEADER = "frame_header" + BODY = "frame_body" + OOB_METHOD = "frame_oob_method" + OOB_HEADER = "frame_oob_header" + OOB_BODY = "frame_oob_body" + TRACE = "frame_trace" + HEARTBEAT = "frame_heartbeat" + + DECODERS = {} + + def __init__(self, channel, payload): + self.channel = channel + self.payload = payload + + def __str__(self): + return "[%d] %s" % (self.channel, self.payload) + +class Payload: + + class __metaclass__(type): + + def __new__(cls, name, bases, dict): + for req in ("encode", "decode", "type"): + if not dict.has_key(req): + raise TypeError("%s must define %s" % (name, req)) + dict["decode"] = staticmethod(dict["decode"]) + t = type.__new__(cls, name, bases, dict) + if t.type != None: + Frame.DECODERS[t.type] = t + return t + + type = None + + def encode(self, enc): abstract + + def decode(spec, dec): abstract + +class Method(Payload): + + type = Frame.METHOD + + def __init__(self, method, *args): + if len(args) != len(method.fields): + argspec = ["%s: %s" % (pythonize(f.name), f.type) + for f in method.fields] + raise TypeError("%s.%s expecting (%s), got %s" % + (pythonize(method.klass.name), + pythonize(method.name), ", ".join(argspec), args)) + self.method = method + self.args = args + + def encode(self, enc): + buf = StringIO() + c = codec.Codec(buf) + c.encode_short(self.method.klass.id) + c.encode_short(self.method.id) + for field, arg in zip(self.method.fields, self.args): + c.encode(field.type, arg) + c.flush() + enc.encode_longstr(buf.getvalue()) + + def decode(spec, dec): + enc = dec.decode_longstr() + c = codec.Codec(StringIO(enc)) + klass = spec.classes.byid[c.decode_short()] + meth = klass.methods.byid[c.decode_short()] + args = tuple([c.decode(f.type) for f in meth.fields]) + return Method(meth, *args) + + def __str__(self): + return "%s %s" % (self.method, ", ".join([str(a) for a in self.args])) + +class Header(Payload): + + type = Frame.HEADER + + def __init__(self, klass, weight, size, **properties): + self.klass = klass + self.weight = weight + self.size = size + self.properties = properties + + def __getitem__(self, name): + return self.properties[name] + + def __setitem__(self, name, value): + self.properties[name] = value + + def __delitem__(self, name): + del self.properties[name] + + def encode(self, enc): + buf = StringIO() + c = codec.Codec(buf) + c.encode_short(self.klass.id) + c.encode_short(self.weight) + c.encode_longlong(self.size) + + # property flags + nprops = len(self.klass.fields) + flags = 0 + for i in range(nprops): + f = self.klass.fields.items[i] + flags <<= 1 + if self.properties.get(f.name) != None: + flags |= 1 + # the last bit indicates more flags + if i > 0 and (i % 15) == 0: + flags <<= 1 + if nprops > (i + 1): + flags |= 1 + c.encode_short(flags) + flags = 0 + flags <<= ((16 - (nprops % 15)) % 16) + c.encode_short(flags) + + # properties + for f in self.klass.fields: + v = self.properties.get(f.name) + if v != None: + c.encode(f.type, v) + c.flush() + enc.encode_longstr(buf.getvalue()) + + def decode(spec, dec): + c = codec.Codec(StringIO(dec.decode_longstr())) + klass = spec.classes.byid[c.decode_short()] + weight = c.decode_short() + size = c.decode_longlong() + + # property flags + bits = [] + while True: + flags = c.decode_short() + for i in range(15, 0, -1): + if flags >> i & 0x1 != 0: + bits.append(True) + else: + bits.append(False) + if flags & 0x1 == 0: + break + + # properties + properties = {} + for b, f in zip(bits, klass.fields): + if b: + # Note: decode returns a unicode u'' string but only + # plain '' strings can be used as keywords so we need to + # stringify the names. + properties[str(f.name)] = c.decode(f.type) + return Header(klass, weight, size, **properties) + + def __str__(self): + return "%s %s %s %s" % (self.klass, self.weight, self.size, + self.properties) + +class Body(Payload): + + type = Frame.BODY + + def __init__(self, content): + self.content = content + + def encode(self, enc): + enc.encode_longstr(self.content) + + def decode(spec, dec): + return Body(dec.decode_longstr()) + + def __str__(self): + return "Body(%r)" % self.content diff --git a/Final/python/qpid/content.py b/Final/python/qpid/content.py new file mode 100644 index 0000000000..bcbea1697c --- /dev/null +++ b/Final/python/qpid/content.py @@ -0,0 +1,50 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +A simple python representation for AMQP content. +""" + +def default(val, defval): + if val == None: + return defval + else: + return val + +class Content: + + def __init__(self, body = "", children = None, properties = None): + self.body = body + self.children = default(children, []) + self.properties = default(properties, {}) + + def size(self): + return len(self.body) + + def weight(self): + return len(self.children) + + def __getitem__(self, name): + return self.properties[name] + + def __setitem__(self, name, value): + self.properties[name] = value + + def __delitem__(self, name): + del self.properties[name] diff --git a/Final/python/qpid/delegate.py b/Final/python/qpid/delegate.py new file mode 100644 index 0000000000..035bb3c476 --- /dev/null +++ b/Final/python/qpid/delegate.py @@ -0,0 +1,54 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +Delegate implementation intended for use with the peer module. +""" + +import threading, inspect +from spec import pythonize + +class Delegate: + + def __init__(self): + self.handlers = {} + self.invokers = {} + # initialize all the mixins + self.invoke_all("init") + + def invoke_all(self, meth, *args, **kwargs): + for cls in inspect.getmro(self.__class__): + if hasattr(cls, meth): + getattr(cls, meth)(self, *args, **kwargs) + + def dispatch(self, channel, message): + method = message.method + + try: + handler = self.handlers[method] + except KeyError: + name = "%s_%s" % (pythonize(method.klass.name), + pythonize(method.name)) + handler = getattr(self, name) + self.handlers[method] = handler + + return handler(channel, message) + + def close(self, reason): + self.invoke_all("close", reason) diff --git a/Final/python/qpid/message.py b/Final/python/qpid/message.py new file mode 100644 index 0000000000..914b878147 --- /dev/null +++ b/Final/python/qpid/message.py @@ -0,0 +1,84 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from sets import Set + +class Message: + + COMMON_FIELDS = Set(("content", "method", "fields")) + + def __init__(self, method, fields, content = None): + self.method = method + self.fields = fields + self.content = content + + def __len__(self): + l = len(self.fields) + if self.method.content: + l += 1 + return len(self.fields) + + def _idx(self, idx): + if idx < 0: idx += len(self) + if idx < 0 or idx > len(self): + raise IndexError(idx) + return idx + + def __getitem__(self, idx): + idx = self._idx(idx) + if idx == len(self.fields): + return self.content + else: + return self.fields[idx] + + def __setitem__(self, idx, value): + idx = self._idx(idx) + if idx == len(self.fields): + self.content = value + else: + self.fields[idx] = value + + def _slot(self, attr): + if attr in Message.COMMON_FIELDS: + env = self.__dict__ + key = attr + else: + env = self.fields + try: + field = self.method.fields.bypyname[attr] + key = self.method.fields.index(field) + except KeyError: + raise AttributeError(attr) + return env, key + + def __getattr__(self, attr): + env, key = self._slot(attr) + return env[key] + + def __setattr__(self, attr, value): + env, key = self._slot(attr) + env[attr] = value + + STR = "%s %s content = %s" + REPR = STR.replace("%s", "%r") + + def __str__(self): + return Message.STR % (self.method, self.fields, self.content) + + def __repr__(self): + return Message.REPR % (self.method, self.fields, self.content) diff --git a/Final/python/qpid/peer.py b/Final/python/qpid/peer.py new file mode 100644 index 0000000000..7c6cf91dea --- /dev/null +++ b/Final/python/qpid/peer.py @@ -0,0 +1,210 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +This module contains a skeletal peer implementation useful for +implementing an AMQP server, client, or proxy. The peer implementation +sorts incoming frames to their intended channels, and dispatches +incoming method frames to a delegate. +""" + +import thread, traceback, socket, sys, logging +from connection import Frame, EOF, Method, Header, Body +from message import Message +from queue import Queue, Closed as QueueClosed +from content import Content +from cStringIO import StringIO + +class Peer: + + def __init__(self, conn, delegate): + self.conn = conn + self.delegate = delegate + self.outgoing = Queue(0) + self.work = Queue(0) + self.channels = {} + self.Channel = type("Channel%s" % conn.spec.klass.__name__, + (Channel, conn.spec.klass), {}) + self.lock = thread.allocate_lock() + + def channel(self, id): + self.lock.acquire() + try: + try: + ch = self.channels[id] + except KeyError: + ch = self.Channel(id, self.outgoing) + self.channels[id] = ch + finally: + self.lock.release() + return ch + + def start(self): + thread.start_new_thread(self.writer, ()) + thread.start_new_thread(self.reader, ()) + thread.start_new_thread(self.worker, ()) + + def fatal(self, message=None): + """Call when an unexpected exception occurs that will kill a thread.""" + if message: print >> sys.stderr, message + self.close("Fatal error: %s\n%s" % (message or "", traceback.format_exc())) + + def reader(self): + try: + while True: + try: + frame = self.conn.read() + except EOF, e: + self.work.close() + break + ch = self.channel(frame.channel) + ch.dispatch(frame, self.work) + except: + self.fatal() + + def close(self, reason): + for ch in self.channels.values(): + ch.close(reason) + self.delegate.close(reason) + + def writer(self): + try: + while True: + try: + message = self.outgoing.get() + self.conn.write(message) + except socket.error, e: + self.close(e) + break + self.conn.flush() + except: + self.fatal() + + def worker(self): + try: + while True: + self.dispatch(self.work.get()) + except QueueClosed, e: + self.close(e) + except: + self.fatal() + + def dispatch(self, queue): + frame = queue.get() + channel = self.channel(frame.channel) + payload = frame.payload + if payload.method.content: + content = read_content(queue) + else: + content = None + # Let the caller deal with exceptions thrown here. + message = Message(payload.method, payload.args, content) + self.delegate.dispatch(channel, message) + +class Closed(Exception): pass + +class Channel: + + def __init__(self, id, outgoing): + self.id = id + self.outgoing = outgoing + self.incoming = Queue(0) + self.responses = Queue(0) + self.queue = None + self.closed = False + self.reason = None + + def close(self, reason): + if self.closed: + return + self.closed = True + self.reason = reason + self.incoming.close() + self.responses.close() + + def dispatch(self, frame, work): + payload = frame.payload + if isinstance(payload, Method): + if payload.method.response: + self.queue = self.responses + else: + self.queue = self.incoming + work.put(self.incoming) + self.queue.put(frame) + + def invoke(self, method, args, content = None): + if self.closed: + raise Closed(self.reason) + frame = Frame(self.id, Method(method, *args)) + self.outgoing.put(frame) + + if method.content: + if content == None: + content = Content() + self.write_content(method.klass, content, self.outgoing) + + try: + # here we depend on all nowait fields being named nowait + f = method.fields.byname["nowait"] + nowait = args[method.fields.index(f)] + except KeyError: + nowait = False + + try: + if not nowait and method.responses: + resp = self.responses.get().payload + if resp.method.content: + content = read_content(self.responses) + else: + content = None + if resp.method in method.responses: + return Message(resp.method, resp.args, content) + else: + raise ValueError(resp) + except QueueClosed, e: + if self.closed: + raise Closed(self.reason) + else: + raise e + + def write_content(self, klass, content, queue): + size = content.size() + header = Frame(self.id, Header(klass, content.weight(), size, **content.properties)) + queue.put(header) + for child in content.children: + self.write_content(klass, child, queue) + # should split up if content.body exceeds max frame size + if size > 0: + queue.put(Frame(self.id, Body(content.body))) + +def read_content(queue): + frame = queue.get() + header = frame.payload + children = [] + for i in range(header.weight): + children.append(read_content(queue)) + size = header.size + read = 0 + buf = StringIO() + while read < size: + body = queue.get() + content = body.payload.content + buf.write(content) + read += len(content) + return Content(buf.getvalue(), children, header.properties.copy()) diff --git a/Final/python/qpid/queue.py b/Final/python/qpid/queue.py new file mode 100644 index 0000000000..5438b328ab --- /dev/null +++ b/Final/python/qpid/queue.py @@ -0,0 +1,45 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +This module augments the standard python multithreaded Queue +implementation to add a close() method so that threads blocking on the +content of a queue can be notified if the queue is no longer in use. +""" + +from Queue import Queue as BaseQueue, Empty, Full + +class Closed(Exception): pass + +class Queue(BaseQueue): + + END = object() + + def close(self): + self.put(Queue.END) + + def get(self, block = True, timeout = None): + result = BaseQueue.get(self, block, timeout) + if result == Queue.END: + # this guarantees that any other waiting threads or any future + # calls to get will also result in a Closed exception + self.put(Queue.END) + raise Closed() + else: + return result diff --git a/Final/python/qpid/spec.py b/Final/python/qpid/spec.py new file mode 100644 index 0000000000..0e3a477066 --- /dev/null +++ b/Final/python/qpid/spec.py @@ -0,0 +1,358 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +This module loads protocol metadata into python objects. It provides +access to spec metadata via a python object model, and can also +dynamically creating python methods, classes, and modules based on the +spec metadata. All the generated methods have proper signatures and +doc strings based on the spec metadata so the python help system can +be used to browse the spec documentation. The generated methods all +dispatch to the self.invoke(meth, args) callback of the containing +class so that the generated code can be reused in a variety of +situations. +""" + +import re, textwrap, new, xmlutil + +class SpecContainer: + + def __init__(self): + self.items = [] + self.byname = {} + self.byid = {} + self.indexes = {} + self.bypyname = {} + + def add(self, item): + if self.byname.has_key(item.name): + raise ValueError("duplicate name: %s" % item) + if self.byid.has_key(item.id): + raise ValueError("duplicate id: %s" % item) + pyname = pythonize(item.name) + if self.bypyname.has_key(pyname): + raise ValueError("duplicate pyname: %s" % item) + self.indexes[item] = len(self.items) + self.items.append(item) + self.byname[item.name] = item + self.byid[item.id] = item + self.bypyname[pyname] = item + + def index(self, item): + try: + return self.indexes[item] + except KeyError: + raise ValueError(item) + + def __iter__(self): + return iter(self.items) + + def __len__(self): + return len(self.items) + +class Metadata: + + PRINT = [] + + def __init__(self): + pass + + def __str__(self): + args = map(lambda f: "%s=%s" % (f, getattr(self, f)), self.PRINT) + return "%s(%s)" % (self.__class__.__name__, ", ".join(args)) + + def __repr__(self): + return str(self) + +class Spec(Metadata): + + PRINT=["major", "minor", "file"] + + def __init__(self, major, minor, file): + Metadata.__init__(self) + self.major = major + self.minor = minor + self.file = file + self.constants = SpecContainer() + self.classes = SpecContainer() + + def post_load(self): + self.module = self.define_module("amqp%s%s" % (self.major, self.minor)) + self.klass = self.define_class("Amqp%s%s" % (self.major, self.minor)) + + def parse_method(self, name): + parts = re.split(r"\s*\.\s*", name) + if len(parts) != 2: + raise ValueError(name) + klass, meth = parts + return self.classes.byname[klass].methods.byname[meth] + + def define_module(self, name, doc = None): + module = new.module(name, doc) + module.__file__ = self.file + for c in self.classes: + classname = pythonize(c.name) + cls = c.define_class(classname) + cls.__module__ = module.__name__ + setattr(module, classname, cls) + return module + + def define_class(self, name): + methods = {} + for c in self.classes: + for m in c.methods: + meth = pythonize(m.klass.name + "_" + m.name) + methods[meth] = m.define_method(meth) + return type(name, (), methods) + +class Constant(Metadata): + + PRINT=["name", "id"] + + def __init__(self, spec, name, id, klass, docs): + Metadata.__init__(self) + self.spec = spec + self.name = name + self.id = id + self.klass = klass + self.docs = docs + +class Class(Metadata): + + PRINT=["name", "id"] + + def __init__(self, spec, name, id, handler, docs): + Metadata.__init__(self) + self.spec = spec + self.name = name + self.id = id + self.handler = handler + self.fields = SpecContainer() + self.methods = SpecContainer() + self.docs = docs + + def define_class(self, name): + methods = {} + for m in self.methods: + meth = pythonize(m.name) + methods[meth] = m.define_method(meth) + return type(name, (), methods) + +class Method(Metadata): + + PRINT=["name", "id"] + + def __init__(self, klass, name, id, content, responses, synchronous, + description, docs): + Metadata.__init__(self) + self.klass = klass + self.name = name + self.id = id + self.content = content + self.responses = responses + self.synchronous = synchronous + self.fields = SpecContainer() + self.description = description + self.docs = docs + self.response = False + + def docstring(self): + s = "\n\n".join([fill(d, 2) for d in [self.description] + self.docs]) + for f in self.fields: + if f.docs: + s += "\n\n" + "\n\n".join([fill(f.docs[0], 4, pythonize(f.name))] + + [fill(d, 4) for d in f.docs[1:]]) + return s + + METHOD = "__method__" + DEFAULTS = {"bit": False, + "shortstr": "", + "longstr": "", + "table": {}, + "octet": 0, + "short": 0, + "long": 0, + "longlong": 0, + "timestamp": 0, + "content": None} + + def define_method(self, name): + g = {Method.METHOD: self} + l = {} + args = [(pythonize(f.name), Method.DEFAULTS[f.type]) for f in self.fields] + if self.content: + args += [("content", None)] + code = "def %s(self, %s):\n" % \ + (name, ", ".join(["%s = %r" % a for a in args])) + code += " %r\n" % self.docstring() + if self.content: + methargs = args[:-1] + else: + methargs = args + argnames = ", ".join([a[0] for a in methargs]) + code += " return self.invoke(%s" % Method.METHOD + if argnames: + code += ", (%s,)" % argnames + else: + code += ", ()" + if self.content: + code += ", content" + code += ")" + exec code in g, l + return l[name] + +class Field(Metadata): + + PRINT=["name", "id", "type"] + + def __init__(self, name, id, type, docs): + Metadata.__init__(self) + self.name = name + self.id = id + self.type = type + self.docs = docs + +def get_docs(nd): + return [n.text for n in nd["doc"]] + +def load_fields(nd, l, domains): + for f_nd in nd["field"]: + try: + type = f_nd["@domain"] + except KeyError: + type = f_nd["@type"] + while domains.has_key(type) and domains[type] != type: + type = domains[type] + l.add(Field(f_nd["@name"], f_nd.index(), type, get_docs(f_nd))) + +def load(specfile): + doc = xmlutil.parse(specfile) + root = doc["amqp"][0] + spec = Spec(int(root["@major"]), int(root["@minor"]), specfile) + + # constants + for nd in root["constant"]: + const = Constant(spec, nd["@name"], int(nd["@value"]), nd.get("@class"), + get_docs(nd)) + spec.constants.add(const) + + # domains are typedefs + domains = {} + for nd in root["domain"]: + domains[nd["@name"]] = nd["@type"] + + # classes + for c_nd in root["class"]: + klass = Class(spec, c_nd["@name"], int(c_nd["@index"]), c_nd["@handler"], + get_docs(c_nd)) + load_fields(c_nd, klass.fields, domains) + for m_nd in c_nd["method"]: + meth = Method(klass, m_nd["@name"], + int(m_nd["@index"]), + m_nd.get_bool("@content", False), + [nd["@name"] for nd in m_nd["response"]], + m_nd.get_bool("@synchronous", False), + m_nd.text, + get_docs(m_nd)) + load_fields(m_nd, meth.fields, domains) + klass.methods.add(meth) + # resolve the responses + for m in klass.methods: + m.responses = [klass.methods.byname[r] for r in m.responses] + for resp in m.responses: + resp.response = True + spec.classes.add(klass) + spec.post_load() + return spec + +REPLACE = {" ": "_", "-": "_"} +KEYWORDS = {"global": "global_", + "return": "return_"} + +def pythonize(name): + name = str(name) + for key, val in REPLACE.items(): + name = name.replace(key, val) + try: + name = KEYWORDS[name] + except KeyError: + pass + return name + +def fill(text, indent, heading = None): + sub = indent * " " + if heading: + init = (indent - 2) * " " + heading + " -- " + else: + init = sub + w = textwrap.TextWrapper(initial_indent = init, subsequent_indent = sub) + return w.fill(" ".join(text.split())) + +class Rule(Metadata): + + PRINT = ["text", "implement", "tests"] + + def __init__(self, text, implement, tests, path): + self.text = text + self.implement = implement + self.tests = tests + self.path = path + +def find_rules(node, rules): + if node.name == "rule": + rules.append(Rule(node.text, node.get("@implement"), + [ch.text for ch in node if ch.name == "test"], + node.path())) + if node.name == "doc" and node.get("@name") == "rule": + tests = [] + if node.has("@test"): + tests.append(node["@test"]) + rules.append(Rule(node.text, None, tests, node.path())) + for child in node: + find_rules(child, rules) + +def load_rules(specfile): + rules = [] + find_rules(xmlutil.parse(specfile), rules) + return rules + +def test_summary(): + template = """ + <html><head><title>AMQP Tests</title></head> + <body> + <table width="80%%" align="center"> + %s + </table> + </body> + </html> + """ + rows = [] + for rule in load_rules("amqp.org/specs/amqp7.xml"): + if rule.tests: + tests = ", ".join(rule.tests) + else: + tests = " " + rows.append('<tr bgcolor="#EEEEEE"><td><b>Path:</b> %s</td>' + '<td><b>Implement:</b> %s</td>' + '<td><b>Tests:</b> %s</td></tr>' % + (rule.path[len("/root/amqp"):], rule.implement, tests)) + rows.append('<tr><td colspan="3">%s</td></tr>' % rule.text) + rows.append('<tr><td colspan="3"> </td></tr>') + + print template % "\n".join(rows) diff --git a/Final/python/qpid/testlib.py b/Final/python/qpid/testlib.py new file mode 100644 index 0000000000..39bad75b86 --- /dev/null +++ b/Final/python/qpid/testlib.py @@ -0,0 +1,237 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# +# Support library for qpid python tests. +# + +import sys, re, unittest, os, random, logging +import qpid.client, qpid.spec +import Queue +from getopt import getopt, GetoptError +from qpid.content import Content + +def findmodules(root): + """Find potential python modules under directory root""" + found = [] + for dirpath, subdirs, files in os.walk(root): + modpath = dirpath.replace(os.sep, '.') + if not re.match(r'\.svn$', dirpath): # Avoid SVN directories + for f in files: + match = re.match(r'(.+)\.py$', f) + if match and f != '__init__.py': + found.append('.'.join([modpath, match.group(1)])) + return found + +def default(value, default): + if (value == None): return default + else: return value + +class TestRunner: + """Runs unit tests. + + Parses command line arguments, provides utility functions for tests, + runs the selected test suite. + """ + + def _die(self, message = None): + if message: print message + print """ +run-tests [options] [test*] +The name of a test is package.module.ClassName.testMethod +Options: + -?/-h/--help : this message + -s/--spec <spec.xml> : file containing amqp XML spec + -b/--broker [<user>[/<password>]@]<host>[:<port>] : broker to connect to + -v/--verbose : verbose - lists tests as they are run. + -d/--debug : enable debug logging. + -i/--ignore <test> : ignore the named test. + -I/--ignore-file : file containing patterns to ignore. + """ + sys.exit(1) + + def setBroker(self, broker): + rex = re.compile(r""" + # [ <user> [ / <password> ] @] <host> [ :<port> ] + ^ (?: ([^/]*) (?: / ([^@]*) )? @)? ([^:]+) (?: :([0-9]+))?$""", re.X) + match = rex.match(broker) + if not match: self._die("'%s' is not a valid broker" % (broker)) + self.user, self.password, self.host, self.port = match.groups() + self.port = int(default(self.port, 5672)) + self.user = default(self.user, "guest") + self.password = default(self.password, "guest") + + def __init__(self): + # Defaults + self.setBroker("localhost") + self.spec = "../specs/amqp.0-8.xml" + self.verbose = 1 + self.ignore = [] + + def ignoreFile(self, filename): + f = file(filename) + for line in f.readlines(): self.ignore.append(line.strip()) + f.close() + + def _parseargs(self, args): + try: + opts, self.tests = getopt(args, "s:b:h?dvi:I:", ["help", "spec", "server", "verbose", "ignore", "ignore-file"]) + except GetoptError, e: + self._die(str(e)) + for opt, value in opts: + if opt in ("-?", "-h", "--help"): self._die() + if opt in ("-s", "--spec"): self.spec = value + if opt in ("-b", "--broker"): self.setBroker(value) + if opt in ("-v", "--verbose"): self.verbose = 2 + if opt in ("-d", "--debug"): logging.basicConfig(level=logging.DEBUG) + if opt in ("-i", "--ignore"): self.ignore.append(value) + if opt in ("-I", "--ignore-file"): self.ignoreFile(value) + + if len(self.tests) == 0: self.tests=findmodules("tests") + + def testSuite(self): + class IgnoringTestSuite(unittest.TestSuite): + def addTest(self, test): + if isinstance(test, unittest.TestCase) and test.id() in testrunner.ignore: + return + unittest.TestSuite.addTest(self, test) + + # Use our IgnoringTestSuite in the test loader. + unittest.TestLoader.suiteClass = IgnoringTestSuite + return unittest.defaultTestLoader.loadTestsFromNames(self.tests) + + def run(self, args=sys.argv[1:]): + self._parseargs(args) + runner = unittest.TextTestRunner(descriptions=False, + verbosity=self.verbose) + result = runner.run(self.testSuite()) + if (self.ignore): + print "=======================================" + print "NOTE: the following tests were ignored:" + for t in self.ignore: print t + print "=======================================" + return result.wasSuccessful() + + def connect(self, host=None, port=None, spec=None, user=None, password=None): + """Connect to the broker, returns a qpid.client.Client""" + host = host or self.host + port = port or self.port + spec = spec or self.spec + user = user or self.user + password = password or self.password + client = qpid.client.Client(host, port, qpid.spec.load(spec)) + client.start({"LOGIN": user, "PASSWORD": password}) + return client + + +# Global instance for tests to call connect. +testrunner = TestRunner() + + +class TestBase(unittest.TestCase): + """Base class for Qpid test cases. + + self.client is automatically connected with channel 1 open before + the test methods are run. + + Deletes queues and exchanges after. Tests call + self.queue_declare(channel, ...) and self.exchange_declare(chanel, + ...) which are wrappers for the Channel functions that note + resources to clean up later. + """ + + def setUp(self): + self.queues = [] + self.exchanges = [] + self.client = self.connect() + self.channel = self.client.channel(1) + self.channel.channel_open() + + def tearDown(self): + for ch, q in self.queues: + ch.queue_delete(queue=q) + for ch, ex in self.exchanges: + ch.exchange_delete(exchange=ex) + + def connect(self, *args, **keys): + """Create a new connction, return the Client object""" + return testrunner.connect(*args, **keys) + + def queue_declare(self, channel=None, *args, **keys): + channel = channel or self.channel + reply = channel.queue_declare(*args, **keys) + self.queues.append((channel, reply.queue)) + return reply + + def exchange_declare(self, channel=None, ticket=0, exchange='', + type='', passive=False, durable=False, + auto_delete=False, internal=False, nowait=False, + arguments={}): + channel = channel or self.channel + reply = channel.exchange_declare(ticket, exchange, type, passive, durable, auto_delete, internal, nowait, arguments) + self.exchanges.append((channel,exchange)) + return reply + + def uniqueString(self): + """Generate a unique string, unique for this TestBase instance""" + if not "uniqueCounter" in dir(self): self.uniqueCounter = 1; + return "Test Message " + str(self.uniqueCounter) + + def consume(self, queueName): + """Consume from named queue returns the Queue object.""" + reply = self.channel.basic_consume(queue=queueName, no_ack=True) + return self.client.queue(reply.consumer_tag) + + def assertEmpty(self, queue): + """Assert that the queue is empty""" + try: + queue.get(timeout=1) + self.fail("Queue is not empty.") + except Queue.Empty: None # Ignore + + def assertPublishGet(self, queue, exchange="", routing_key="", properties=None): + """ + Publish to exchange and assert queue.get() returns the same message. + """ + body = self.uniqueString() + self.channel.basic_publish(exchange=exchange, + content=Content(body, properties=properties), + routing_key=routing_key) + msg = queue.get(timeout=1) + self.assertEqual(body, msg.content.body) + if (properties): self.assertEqual(properties, msg.content.properties) + + def assertPublishConsume(self, queue="", exchange="", routing_key="", properties=None): + """ + Publish a message and consume it, assert it comes back intact. + Return the Queue object used to consume. + """ + self.assertPublishGet(self.consume(queue), exchange, routing_key, properties) + + def assertChannelException(self, expectedCode, message): + self.assertEqual("channel", message.method.klass.name) + self.assertEqual("close", message.method.name) + self.assertEqual(expectedCode, message.reply_code) + + + def assertConnectionException(self, expectedCode, message): + self.assertEqual("connection", message.method.klass.name) + self.assertEqual("close", message.method.name) + self.assertEqual(expectedCode, message.reply_code) + diff --git a/Final/python/qpid/xmlutil.py b/Final/python/qpid/xmlutil.py new file mode 100644 index 0000000000..585516b44f --- /dev/null +++ b/Final/python/qpid/xmlutil.py @@ -0,0 +1,119 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +XML utilities used by spec.py +""" + +import xml.sax +from xml.sax.handler import ContentHandler + +def parse(file): + doc = Node("root") + xml.sax.parse(file, Builder(doc)) + return doc + +class Node: + + def __init__(self, name, attrs = None, text = None, parent = None): + self.name = name + self.attrs = attrs + self.text = text + self.parent = parent + self.children = [] + if parent != None: + parent.children.append(self) + + def get_bool(self, key, default = False): + v = self.get(key) + if v == None: + return default + else: + return bool(int(v)) + + def index(self): + if self.parent: + return self.parent.children.index(self) + else: + return 0 + + def has(self, key): + try: + result = self[key] + return True + except KeyError: + return False + except IndexError: + return False + + def get(self, key, default = None): + if self.has(key): + return self[key] + else: + return default + + def __getitem__(self, key): + if callable(key): + return filter(key, self.children) + else: + t = key.__class__ + meth = "__get%s__" % t.__name__ + if hasattr(self, meth): + return getattr(self, meth)(key) + else: + raise KeyError(key) + + def __getstr__(self, name): + if name[:1] == "@": + return self.attrs[name[1:]] + else: + return self[lambda nd: nd.name == name] + + def __getint__(self, index): + return self.children[index] + + def __iter__(self): + return iter(self.children) + + def path(self): + if self.parent == None: + return "/%s" % self.name + else: + return "%s/%s" % (self.parent.path(), self.name) + +class Builder(ContentHandler): + + def __init__(self, start = None): + self.node = start + + def __setitem__(self, element, type): + self.types[element] = type + + def startElement(self, name, attrs): + self.node = Node(name, attrs, None, self.node) + + def endElement(self, name): + self.node = self.node.parent + + def characters(self, content): + if self.node.text == None: + self.node.text = content + else: + self.node.text += content + |